allow setting initialization to be partially dynamic and run in parallel

This commit is contained in:
Mark Harrah 2011-08-14 10:53:37 -04:00
parent 0b5e6484ba
commit 8ce9950327
5 changed files with 302 additions and 34 deletions

172
util/collection/INode.scala Normal file
View File

@ -0,0 +1,172 @@
package sbt
import java.lang.Runnable
import java.util.concurrent.{atomic, Executor, LinkedBlockingQueue}
import atomic.{AtomicBoolean, AtomicInteger}
import Types.{:+:, Id}
object EvaluationState extends Enumeration {
val New, Blocked, Ready, Calling, Evaluated = Value
}
abstract class EvaluateSettings[Scope]
{
protected val init: Init[Scope]
import init._
protected def executor: Executor
protected def compiledSettings: Seq[Compiled[_]]
import EvaluationState.{Value => EvaluationState, _}
private[this] val complete = new LinkedBlockingQueue[Option[Throwable]]
private[this] val static = PMap.empty[ScopedKey, INode]
private[this] def getStatic[T](key: ScopedKey[T]): INode[T] = static get key getOrElse error("Illegal reference to key " + key)
private[this] val transform: Initialize ~> INode = new (Initialize ~> INode) { def apply[T](i: Initialize[T]): INode[T] = i match {
case k: Keyed[s, T] => single(getStatic(k.scopedKey), k.transform)
case a: Apply[hl,T] => new MixedNode(a.inputs transform transform, a.f)
case u: Uniform[s, T] => new UniformNode(u.inputs map transform.fn[s], u.f)
case b: Bind[s,T] => new BindNode[s,T]( transform(b.in), x => transform(b.f(x)))
case v: Value[T] => constant(v.value)
case o: Optional[s,T] => o.a match {
case None => constant( () => o.f(None) )
case Some(i) => single[s,T](transform(i), x => o.f(Some(x)))
}
}}
private[this] val roots: Seq[INode[_]] = compiledSettings flatMap { cs =>
(cs.settings map { s =>
val t = transform(s.init)
static(s.key) = t
t
}): Seq[INode[_]]
}
private[this] var running = new AtomicInteger
private[this] var cancel = new AtomicBoolean(false)
def run(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
{
assert(running.get() == 0, "Already running")
startWork()
roots.foreach( _.registerIfNew() )
workComplete()
complete.take() foreach { ex =>
cancel.set(true)
throw ex
}
getResults(delegates)
}
private[this] def getResults(implicit delegates: Scope => Seq[Scope]) = (empty /: static.toTypedSeq) { case (ss, static.TPair(key, node)) => ss.set(key.scope, key.key, node.get) }
private[this] val getValue = new (INode ~> Id) { def apply[T](node: INode[T]) = node.get }
private[this] def submitEvaluate(node: INode[_]) = submit(node.evaluate())
private[this] def submitCallComplete[T](node: BindNode[_, T], value: T) = submit(node.callComplete(value))
private[this] def submit(work: => Unit): Unit =
{
startWork()
executor.execute(new Runnable { def run = if(!cancel.get()) run0(work) })
}
private[this] def run0(work: => Unit): Unit =
{
try { work } catch { case e => complete.put( Some(e) ) }
workComplete()
}
private[this] def startWork(): Unit = running.incrementAndGet()
private[this] def workComplete(): Unit =
if(running.decrementAndGet() == 0)
complete.put( None )
private[this] sealed abstract class INode[T]
{
private[this] var state: EvaluationState = New
private[this] var value: T = _
private[this] val blocking = new collection.mutable.ListBuffer[INode[_]]
private[this] var blockedOn: Int = 0
private[this] val calledBy = new collection.mutable.ListBuffer[BindNode[_, T]]
override def toString = getClass.getName + " (state=" + state + ",blockedOn=" + blockedOn + ",calledBy=" + calledBy.size + ",blocking=" + blocking.size + "): " +
( (static.toSeq.flatMap { case (key, value) => if(value eq this) key.toString :: Nil else Nil }).headOption getOrElse "non-static")
final def get: T = synchronized {
assert(value != null, toString + " not evaluated")
value
}
final def doneOrBlock(from: INode[_]): Boolean = synchronized {
val ready = state == Evaluated
if(!ready) blocking += from
registerIfNew()
ready
}
final def isDone: Boolean = synchronized { state == Evaluated }
final def isNew: Boolean = synchronized { state == New }
final def isCalling: Boolean = synchronized { state == Calling }
final def registerIfNew(): Unit = synchronized { if(state == New) register() }
private[this] def register()
{
assert(state == New, "Already registered and: " + toString)
val deps = dependsOn
blockedOn = deps.size - deps.count(_.doneOrBlock(this))
if(blockedOn == 0)
schedule()
else
state = Blocked
}
final def schedule(): Unit = synchronized {
assert(state == New || state == Blocked, "Invalid state for schedule() call: " + toString)
state = Ready
submitEvaluate(this)
}
final def unblocked(): Unit = synchronized {
assert(state == Blocked, "Invalid state for unblocked() call: " + toString)
blockedOn -= 1
assert(blockedOn >= 0, "Negative blockedOn: " + blockedOn + " for " + toString)
if(blockedOn == 0) schedule()
}
final def evaluate(): Unit = synchronized { evaluate0() }
protected final def makeCall(source: BindNode[_, T], target: INode[T]) {
assert(state == Ready, "Invalid state for call to makeCall: " + toString)
state = Calling
target.call(source)
}
protected final def setValue(v: T) {
assert(state != Evaluated, "Already evaluated (trying to set value to " + v + "): " + toString)
value = v
state = Evaluated
blocking foreach { _.unblocked() }
blocking.clear()
calledBy foreach { node => submitCallComplete(node, value) }
calledBy.clear()
}
final def call(by: BindNode[_, T]): Unit = synchronized {
registerIfNew()
state match {
case Evaluated => submitCallComplete(by, value)
case _ => calledBy += by
}
}
protected def dependsOn: Seq[INode[_]]
protected def evaluate0(): Unit
}
private[this] def constant[T](f: () => T): INode[T] = new MixedNode[HNil, T](KNil, _ => f())
private[this] def single[S,T](in: INode[S], f: S => T): INode[T] = new MixedNode[S :+: HNil, T](in :^: KNil, hl => f(hl.head))
private[this] final class BindNode[S,T](in: INode[S], f: S => INode[T]) extends INode[T]
{
protected def dependsOn = in :: Nil
protected def evaluate0(): Unit = makeCall(this, f(in.get) )
def callComplete(value: T): Unit = synchronized {
assert(isCalling, "Invalid state for callComplete(" + value + "): " + toString)
setValue(value)
}
}
private[this] final class UniformNode[S,T](in: Seq[INode[S]], f: Seq[S] => T) extends INode[T]
{
protected def dependsOn = in
protected def evaluate0(): Unit = setValue( f(in.map(_.get)) )
}
private[this] final class MixedNode[HL <: HList, T](in: KList[INode, HL], f: HL => T) extends INode[T]
{
protected def dependsOn = in.toList
protected def evaluate0(): Unit = setValue( f( in down getValue ) )
}
}

View File

@ -12,9 +12,12 @@ trait RMap[K[_], V[_]]
def get[T](k: K[T]): Option[V[T]]
def contains[T](k: K[T]): Boolean
def toSeq: Seq[(K[_], V[_])]
def toTypedSeq = toSeq.map{ case (k: K[t],v) => TPair[t](k,v.asInstanceOf[V[t]]) }
def keys: Iterable[K[_]]
def values: Iterable[V[_]]
def isEmpty: Boolean
final case class TPair[T](key: K[T], value: V[T])
}
trait IMap[K[_], V[_]] extends (K ~> V) with RMap[K,V]

View File

@ -50,7 +50,7 @@ trait Init[Scope]
type SettingSeq[T] = Seq[Setting[T]]
type ScopedMap = IMap[ScopedKey, SettingSeq]
type CompiledMap = Map[ScopedKey[_], Compiled]
type CompiledMap = Map[ScopedKey[_], Compiled[_]]
type MapScoped = ScopedKey ~> ScopedKey
type ValidatedRef[T] = Either[Undefined, ScopedKey[T]]
type ValidatedInit[T] = Either[Seq[Undefined], Initialize[T]]
@ -62,6 +62,7 @@ trait Init[Scope]
def value[T](value: => T): Initialize[T] = new Value(value _)
def optional[T,U](i: Initialize[T])(f: Option[T] => U): Initialize[U] = new Optional(Some(i), f)
def update[T](key: ScopedKey[T])(f: T => T): Setting[T] = new Setting[T](key, app(key :^: KNil)(hl => f(hl.head)))
def bind[S,T](in: Initialize[S])(f: S => Initialize[T]): Initialize[T] = new Bind(f, in)
def app[HL <: HList, T](inputs: KList[Initialize, HL])(f: HL => T): Initialize[T] = new Apply(f, inputs)
def uniform[S,T](inputs: Seq[Initialize[S]])(f: Seq[S] => T): Initialize[T] = new Uniform(f, inputs)
@ -87,18 +88,18 @@ trait Init[Scope]
{
val cMap = compiled(init)(delegates, scopeLocal, display)
// order the initializations. cyclic references are detected here.
val ordered: Seq[Compiled] = sort(cMap)
val ordered: Seq[Compiled[_]] = sort(cMap)
// evaluation: apply the initializations.
applyInits(ordered)
try { applyInits(ordered) }
catch { case rru: RuntimeUndefined => throw Uninitialized(cMap.keys.toSeq, delegates, rru.undefined, true) }
}
def sort(cMap: CompiledMap): Seq[Compiled] =
def sort(cMap: CompiledMap): Seq[Compiled[_]] =
Dag.topologicalSort(cMap.values)(_.dependencies.map(cMap))
def compile(sMap: ScopedMap): CompiledMap =
sMap.toSeq.map { case (k, ss) =>
sMap.toTypedSeq.map { case sMap.TPair(k, ss) =>
val deps = ss flatMap { _.dependsOn } toSet;
val eval = (settings: Settings[Scope]) => (settings /: ss)(applySetting)
(k, new Compiled(k, deps, eval))
(k, new Compiled(k, deps, ss))
} toMap;
def grouped(init: Seq[Setting[_]]): ScopedMap =
@ -144,14 +145,17 @@ trait Init[Scope]
resolve(scopes)
}
private[this] def applyInits(ordered: Seq[Compiled])(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
(empty /: ordered){ (m, comp) => comp.eval(m) }
private[this] def applySetting[T](map: Settings[Scope], setting: Setting[T]): Settings[Scope] =
private[this] def applyInits(ordered: Seq[Compiled[_]])(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
{
val value = setting.init.evaluate(map)
val key = setting.key
map.set(key.scope, key.key, value)
val x = java.util.concurrent.Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
try {
val eval: EvaluateSettings[Scope] = new EvaluateSettings[Scope] {
override val init: Init.this.type = Init.this
def compiledSettings = ordered
def executor = x
}
eval.run
} finally { x.shutdown() }
}
def showUndefined(u: Undefined, validKeys: Seq[ScopedKey[_]], delegates: Scope => Seq[Scope])(implicit display: Show[ScopedKey[_]]): String =
@ -185,7 +189,7 @@ trait Init[Scope]
val keysString = keys.map(u => showUndefined(u, validKeys, delegates)).mkString("\n\n ", "\n\n ", "")
new Uninitialized(keys, prefix + suffix + " to undefined setting" + suffix + ": " + keysString + "\n ")
}
final class Compiled(val key: ScopedKey[_], val dependencies: Iterable[ScopedKey[_]], val eval: Settings[Scope] => Settings[Scope])
final class Compiled[T](val key: ScopedKey[T], val dependencies: Iterable[ScopedKey[_]], val settings: Seq[Setting[T]])
{
override def toString = showFullKey(key)
}
@ -253,7 +257,7 @@ trait Init[Scope]
sealed trait Keyed[S, T] extends Initialize[T]
{
def scopedKey: ScopedKey[S]
protected def transform: S => T
def transform: S => T
final def dependsOn = scopedKey :: Nil
final def apply[Z](g: T => Z): Initialize[Z] = new GetValue(scopedKey, g compose transform)
final def evaluate(ss: Settings[Scope]): T = transform(getValue(ss, scopedKey))
@ -271,10 +275,24 @@ trait Init[Scope]
}
private[this] final class GetValue[S,T](val scopedKey: ScopedKey[S], val transform: S => T) extends Keyed[S, T]
trait KeyedInitialize[T] extends Keyed[T, T] {
protected final val transform = idFun[T]
final val transform = idFun[T]
}
private[this] final class Optional[S,T](a: Option[Initialize[S]], f: Option[S] => T) extends Initialize[T]
private[sbt] final class Bind[S,T](val f: S => Initialize[T], val in: Initialize[S]) extends Initialize[T]
{
def dependsOn = in.dependsOn
def apply[Z](g: T => Z): Initialize[Z] = new Bind[S,Z](s => f(s)(g), in)
def evaluate(ss: Settings[Scope]): T = f(in evaluate ss) evaluate ss
def mapReferenced(g: MapScoped) = new Bind[S,T](s => f(s) mapReferenced g, in mapReferenced g)
def validateReferenced(g: ValidateRef) = (in validateReferenced g).right.map { validIn =>
new Bind[S,T](s => handleUndefined( f(s) validateReferenced g), validIn)
}
def handleUndefined(vr: ValidatedInit[T]): Initialize[T] = vr match {
case Left(undefs) => throw new RuntimeUndefined(undefs)
case Right(x) => x
}
def mapConstant(g: MapConstant) = new Bind[S,T](s => f(s) mapConstant g, in mapConstant g)
}
private[sbt] final class Optional[S,T](val a: Option[Initialize[S]], val f: Option[S] => T) extends Initialize[T]
{
def dependsOn = dependencies(a.toList)
def apply[Z](g: T => Z): Initialize[Z] = new Optional[S,Z](a, g compose f)
@ -283,7 +301,7 @@ trait Init[Scope]
def validateReferenced(g: ValidateRef) = Right( new Optional(a flatMap { _.validateReferenced(g).right.toOption }, f) )
def mapConstant(g: MapConstant): Initialize[T] = new Optional(a map mapConstantT(g).fn, f)
}
private[this] final class Value[T](value: () => T) extends Initialize[T]
private[sbt] final class Value[T](val value: () => T) extends Initialize[T]
{
def dependsOn = Nil
def mapReferenced(g: MapScoped) = this
@ -292,7 +310,7 @@ trait Init[Scope]
def mapConstant(g: MapConstant) = this
def evaluate(map: Settings[Scope]): T = value()
}
private[this] final class Apply[HL <: HList, T](val f: HL => T, val inputs: KList[Initialize, HL]) extends Initialize[T]
private[sbt] final class Apply[HL <: HList, T](val f: HL => T, val inputs: KList[Initialize, HL]) extends Initialize[T]
{
def dependsOn = dependencies(inputs.toList)
def mapReferenced(g: MapScoped) = mapInputs( mapReferencedT(g) )
@ -308,7 +326,7 @@ trait Init[Scope]
if(undefs.isEmpty) Right(new Apply(f, tx transform get)) else Left(undefs)
}
}
private[this] final class Uniform[S, T](val f: Seq[S] => T, val inputs: Seq[Initialize[S]]) extends Initialize[T]
private[sbt] final class Uniform[S, T](val f: Seq[S] => T, val inputs: Seq[Initialize[S]]) extends Initialize[T]
{
def dependsOn = dependencies(inputs)
def mapReferenced(g: MapScoped) = new Uniform(f, inputs map mapReferencedT(g).fn)

View File

@ -59,9 +59,9 @@ object SettingsUsage
val applied: Settings[Scope] = make(mySettings)(delegates, scopeLocal, showFullKey)
// Show results.
for(i <- 0 to 5; k <- Seq(a, b)) {
/* for(i <- 0 to 5; k <- Seq(a, b)) {
println( k.label + i + " = " + applied.get( Scope(i), k) )
}
}*/
/** Output:
* For the None results, we never defined the value and there was no value to delegate to.

View File

@ -3,21 +3,96 @@ package sbt
import org.scalacheck._
import Prop._
import SettingsUsage._
import SettingsExample._
object SettingsTest extends Properties("settings")
{
def tests =
for(i <- 0 to 5; k <- Seq(a, b)) yield {
val value = applied.get( Scope(i), k)
val expected = expectedValues(2*i + (if(k == a) 0 else 1))
("Index: " + i) |:
("Key: " + k.label) |:
("Value: " + value) |:
("Expected: " + expected) |:
(value == expected)
}
final val ChainMax = 5000
lazy val chainLengthGen = Gen.choose(1, ChainMax)
property("Basic settings test") = secure( all( tests: _*) )
property("Basic chain") = forAll(chainLengthGen) { (i: Int) =>
val abs = math.abs(i)
singleIntTest( chain( abs, value(0)), abs )
}
property("Basic bind chain") = forAll(chainLengthGen) { (i: Int) =>
val abs = math.abs(i)
singleIntTest( chainBind(value(abs)), 0 )
}
property("Allows references to completed settings") = forAllNoShrink(30) { allowedReference _ }
final def allowedReference(intermediate: Int): Prop =
{
val top = value(intermediate)
def iterate(init: Initialize[Int]): Initialize[Int] =
bind(init) { t =>
if(t <= 0)
top
else
iterate(value(t-1) )
}
try { evaluate( setting(chk, iterate(top)) :: Nil); true }
catch { case e: Exception => ("Unexpected exception: " + e) |: false }
}
// Circular (dynamic) references currently loop infinitely.
// This is the expected behavior (detecting dynamic cycles is expensive),
// but it may be necessary to provide an option to detect them (with a performance hit)
// This would test that cycle detection.
// property("Catches circular references") = forAll(chainLengthGen) { checkCircularReferences _ }
final def checkCircularReferences(intermediate: Int): Prop =
{
val ccr = new CCR(intermediate)
try { evaluate( setting(chk, ccr.top) :: Nil); false }
catch { case e: Exception => true }
}
def tests =
for(i <- 0 to 5; k <- Seq(a, b)) yield {
val expected = expectedValues(2*i + (if(k == a) 0 else 1))
checkKey[Int]( ScopedKey( Scope(i), k ), expected, applied)
}
lazy val expectedValues = None :: None :: None :: None :: None :: None :: Some(3) :: None :: Some(3) :: Some(9) :: Some(4) :: Some(9) :: Nil
lazy val ch = AttributeKey[Int]("ch")
lazy val chk = ScopedKey( Scope(0), ch)
def chain(i: Int, prev: Initialize[Int]): Initialize[Int] =
if(i <= 0) prev else chain(i - 1, prev(_ + 1))
def chainBind(prev: Initialize[Int]): Initialize[Int] =
bind(prev) { v =>
if(v <= 0) prev else chainBind(value(v - 1) )
}
def singleIntTest(i: Initialize[Int], expected: Int) =
{
val eval = evaluate( setting( chk, i ) :: Nil )
checkKey( chk, Some(expected), eval )
}
def checkKey[T](key: ScopedKey[T], expected: Option[T], settings: Settings[Scope]) =
{
val value = settings.get( key.scope, key.key)
("Key: " + key) |:
("Value: " + value) |:
("Expected: " + expected) |:
(value == expected)
}
def evaluate(settings: Seq[Setting[_]]): Settings[Scope] =
try { make(settings)(delegates, scopeLocal, showFullKey) }
catch { case e => e.printStackTrace; throw e }
}
// This setup is a workaround for module synchronization issues
final class CCR(intermediate: Int)
{
lazy val top = iterate(value(intermediate), intermediate)
def iterate(init: Initialize[Int], i: Int): Initialize[Int] =
bind(init) { t =>
if(t <= 0)
top
else
iterate(value(t - 1), t-1)
}
}