allow setting initialization to be partially dynamic and run in parallel

This commit is contained in:
Mark Harrah 2011-08-14 10:53:37 -04:00
parent f795f70752
commit d36e02ea22
10 changed files with 342 additions and 60 deletions

View File

@ -212,9 +212,7 @@ object Defaults extends BuildCommon
} join
}
def watchTransitiveSourcesTask: Initialize[Task[Seq[File]]] =
(state, thisProjectRef) flatMap { (s, base) =>
inAllDependencies(base, watchSources.task, Project structure s).join.map(_.flatten)
}
inDependencies[Task[Seq[File]]](watchSources.task, const(std.TaskExtra.constant(Nil)), includeRoot = true) apply { _.join.map(_.flatten) }
def watchSourcesTask: Initialize[Task[Seq[File]]] =
Seq(unmanagedSources, unmanagedResources).map(inAllConfigurations).join { _.join.map(_.flatten.flatten.distinct) }
@ -512,19 +510,21 @@ object Defaults extends BuildCommon
recurse ?? Nil
}
def inAllDependencies[T](base: ProjectRef, key: ScopedSetting[T], structure: Load.BuildStructure): Seq[T] =
{
def deps(ref: ProjectRef): Seq[ProjectRef] =
Project.getProject(ref, structure).toList.flatMap { p =>
p.dependencies.map(_.project) ++ p.aggregate
}
def inDependencies[T](key: ScopedSetting[T], default: ProjectRef => T, includeRoot: Boolean = true, classpath: Boolean = true, aggregate: Boolean = false): Initialize[Seq[T]] =
Project.bind( (loadedBuild, thisProjectRef).identity ) { case (lb, base) =>
transitiveDependencies(base, lb, includeRoot, classpath, aggregate) map ( ref => (key in ref) ?? default(ref) ) join ;
}
inAllDeps(base, deps, key, structure.data)
def transitiveDependencies(base: ProjectRef, structure: Load.LoadedBuild, includeRoot: Boolean, classpath: Boolean = true, aggregate: Boolean = false): Seq[ProjectRef] =
{
val full = Dag.topologicalSort(base)(getDependencies(structure, classpath, aggregate))
if(includeRoot) full else full.dropRight(1)
}
def inAllDeps[T](base: ProjectRef, deps: ProjectRef => Seq[ProjectRef], key: ScopedSetting[T], data: Settings[Scope]): Seq[T] =
inAllProjects(Dag.topologicalSort(base)(deps), key, data)
def inAllProjects[T](allProjects: Seq[Reference], key: ScopedSetting[T], data: Settings[Scope]): Seq[T] =
allProjects.flatMap { p => key in p get data }
def getDependencies(structure: Load.LoadedBuild, classpath: Boolean = true, aggregate: Boolean = false): ProjectRef => Seq[ProjectRef] =
ref => Project.getProject(ref, structure).toList flatMap { p =>
(if(classpath) p.dependencies.map(_.project) else Nil) ++
(if(aggregate) p.aggregate else Nil)
}
val CompletionsID = "completions"

View File

@ -149,8 +149,10 @@ object Project extends Init[Scope] with ProjectExtra
def getProjectForReference(ref: Reference, structure: BuildStructure): Option[ResolvedProject] =
ref match { case pr: ProjectRef => getProject(pr, structure); case _ => None }
def getProject(ref: ProjectRef, structure: BuildStructure): Option[ResolvedProject] =
(structure.units get ref.build).flatMap(_.defined get ref.project)
def getProject(ref: ProjectRef, structure: BuildStructure): Option[ResolvedProject] = getProject(ref, structure.units)
def getProject(ref: ProjectRef, structure: Load.LoadedBuild): Option[ResolvedProject] = getProject(ref, structure.units)
def getProject(ref: ProjectRef, units: Map[URI, Load.LoadedBuildUnit]): Option[ResolvedProject] =
(units get ref.build).flatMap(_.defined get ref.project)
def setProject(session: SessionSettings, structure: BuildStructure, s: State): State =
{

View File

@ -0,0 +1,14 @@
// check that a plain File can be appended to Classpath
unmanagedJars in Compile += file("doesnotexist")
unmanagedJars in Compile ++= Seq( file("doesnotexist1"), file("doesnotexist2") )
// check that an Attributed File can be appended to Classpath
unmanagedJars in Compile += Attributed.blank(file("doesnotexist"))
unmanagedJars in Compile ++= Attributed.blankSeq( Seq( file("doesnotexist1"), file("doesnotexist2") ) )
maxErrors += 1
name += "-demo"

View File

@ -0,0 +1,2 @@
# this test contains source compatibility checks for settings
> test

View File

@ -26,16 +26,12 @@ object TaskRunnerCircularTest extends Properties("TaskRunner Circular")
{
lazy val top = iterate(pure("bottom", intermediate), intermediate)
def iterate(task: Task[Int], i: Int): Task[Int] =
{
lazy val it: Task[Int] =
task flatMap { t =>
if(t <= 0)
top
else
iterate(pure((t-1).toString, t-1), i-1)
}
it
}
task flatMap { t =>
if(t <= 0)
top
else
iterate(pure((t-1).toString, t-1), i-1)
}
try { tryRun(top, true, workers); false }
catch { case i: Incomplete => cyclic(i) }
}

172
util/collection/INode.scala Normal file
View File

@ -0,0 +1,172 @@
package sbt
import java.lang.Runnable
import java.util.concurrent.{atomic, Executor, LinkedBlockingQueue}
import atomic.{AtomicBoolean, AtomicInteger}
import Types.{:+:, Id}
object EvaluationState extends Enumeration {
val New, Blocked, Ready, Calling, Evaluated = Value
}
abstract class EvaluateSettings[Scope]
{
protected val init: Init[Scope]
import init._
protected def executor: Executor
protected def compiledSettings: Seq[Compiled[_]]
import EvaluationState.{Value => EvaluationState, _}
private[this] val complete = new LinkedBlockingQueue[Option[Throwable]]
private[this] val static = PMap.empty[ScopedKey, INode]
private[this] def getStatic[T](key: ScopedKey[T]): INode[T] = static get key getOrElse error("Illegal reference to key " + key)
private[this] val transform: Initialize ~> INode = new (Initialize ~> INode) { def apply[T](i: Initialize[T]): INode[T] = i match {
case k: Keyed[s, T] => single(getStatic(k.scopedKey), k.transform)
case a: Apply[hl,T] => new MixedNode(a.inputs transform transform, a.f)
case u: Uniform[s, T] => new UniformNode(u.inputs map transform.fn[s], u.f)
case b: Bind[s,T] => new BindNode[s,T]( transform(b.in), x => transform(b.f(x)))
case v: Value[T] => constant(v.value)
case o: Optional[s,T] => o.a match {
case None => constant( () => o.f(None) )
case Some(i) => single[s,T](transform(i), x => o.f(Some(x)))
}
}}
private[this] val roots: Seq[INode[_]] = compiledSettings flatMap { cs =>
(cs.settings map { s =>
val t = transform(s.init)
static(s.key) = t
t
}): Seq[INode[_]]
}
private[this] var running = new AtomicInteger
private[this] var cancel = new AtomicBoolean(false)
def run(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
{
assert(running.get() == 0, "Already running")
startWork()
roots.foreach( _.registerIfNew() )
workComplete()
complete.take() foreach { ex =>
cancel.set(true)
throw ex
}
getResults(delegates)
}
private[this] def getResults(implicit delegates: Scope => Seq[Scope]) = (empty /: static.toTypedSeq) { case (ss, static.TPair(key, node)) => ss.set(key.scope, key.key, node.get) }
private[this] val getValue = new (INode ~> Id) { def apply[T](node: INode[T]) = node.get }
private[this] def submitEvaluate(node: INode[_]) = submit(node.evaluate())
private[this] def submitCallComplete[T](node: BindNode[_, T], value: T) = submit(node.callComplete(value))
private[this] def submit(work: => Unit): Unit =
{
startWork()
executor.execute(new Runnable { def run = if(!cancel.get()) run0(work) })
}
private[this] def run0(work: => Unit): Unit =
{
try { work } catch { case e => complete.put( Some(e) ) }
workComplete()
}
private[this] def startWork(): Unit = running.incrementAndGet()
private[this] def workComplete(): Unit =
if(running.decrementAndGet() == 0)
complete.put( None )
private[this] sealed abstract class INode[T]
{
private[this] var state: EvaluationState = New
private[this] var value: T = _
private[this] val blocking = new collection.mutable.ListBuffer[INode[_]]
private[this] var blockedOn: Int = 0
private[this] val calledBy = new collection.mutable.ListBuffer[BindNode[_, T]]
override def toString = getClass.getName + " (state=" + state + ",blockedOn=" + blockedOn + ",calledBy=" + calledBy.size + ",blocking=" + blocking.size + "): " +
( (static.toSeq.flatMap { case (key, value) => if(value eq this) key.toString :: Nil else Nil }).headOption getOrElse "non-static")
final def get: T = synchronized {
assert(value != null, toString + " not evaluated")
value
}
final def doneOrBlock(from: INode[_]): Boolean = synchronized {
val ready = state == Evaluated
if(!ready) blocking += from
registerIfNew()
ready
}
final def isDone: Boolean = synchronized { state == Evaluated }
final def isNew: Boolean = synchronized { state == New }
final def isCalling: Boolean = synchronized { state == Calling }
final def registerIfNew(): Unit = synchronized { if(state == New) register() }
private[this] def register()
{
assert(state == New, "Already registered and: " + toString)
val deps = dependsOn
blockedOn = deps.size - deps.count(_.doneOrBlock(this))
if(blockedOn == 0)
schedule()
else
state = Blocked
}
final def schedule(): Unit = synchronized {
assert(state == New || state == Blocked, "Invalid state for schedule() call: " + toString)
state = Ready
submitEvaluate(this)
}
final def unblocked(): Unit = synchronized {
assert(state == Blocked, "Invalid state for unblocked() call: " + toString)
blockedOn -= 1
assert(blockedOn >= 0, "Negative blockedOn: " + blockedOn + " for " + toString)
if(blockedOn == 0) schedule()
}
final def evaluate(): Unit = synchronized { evaluate0() }
protected final def makeCall(source: BindNode[_, T], target: INode[T]) {
assert(state == Ready, "Invalid state for call to makeCall: " + toString)
state = Calling
target.call(source)
}
protected final def setValue(v: T) {
assert(state != Evaluated, "Already evaluated (trying to set value to " + v + "): " + toString)
value = v
state = Evaluated
blocking foreach { _.unblocked() }
blocking.clear()
calledBy foreach { node => submitCallComplete(node, value) }
calledBy.clear()
}
final def call(by: BindNode[_, T]): Unit = synchronized {
registerIfNew()
state match {
case Evaluated => submitCallComplete(by, value)
case _ => calledBy += by
}
}
protected def dependsOn: Seq[INode[_]]
protected def evaluate0(): Unit
}
private[this] def constant[T](f: () => T): INode[T] = new MixedNode[HNil, T](KNil, _ => f())
private[this] def single[S,T](in: INode[S], f: S => T): INode[T] = new MixedNode[S :+: HNil, T](in :^: KNil, hl => f(hl.head))
private[this] final class BindNode[S,T](in: INode[S], f: S => INode[T]) extends INode[T]
{
protected def dependsOn = in :: Nil
protected def evaluate0(): Unit = makeCall(this, f(in.get) )
def callComplete(value: T): Unit = synchronized {
assert(isCalling, "Invalid state for callComplete(" + value + "): " + toString)
setValue(value)
}
}
private[this] final class UniformNode[S,T](in: Seq[INode[S]], f: Seq[S] => T) extends INode[T]
{
protected def dependsOn = in
protected def evaluate0(): Unit = setValue( f(in.map(_.get)) )
}
private[this] final class MixedNode[HL <: HList, T](in: KList[INode, HL], f: HL => T) extends INode[T]
{
protected def dependsOn = in.toList
protected def evaluate0(): Unit = setValue( f( in down getValue ) )
}
}

View File

@ -12,9 +12,12 @@ trait RMap[K[_], V[_]]
def get[T](k: K[T]): Option[V[T]]
def contains[T](k: K[T]): Boolean
def toSeq: Seq[(K[_], V[_])]
def toTypedSeq = toSeq.map{ case (k: K[t],v) => TPair[t](k,v.asInstanceOf[V[t]]) }
def keys: Iterable[K[_]]
def values: Iterable[V[_]]
def isEmpty: Boolean
final case class TPair[T](key: K[T], value: V[T])
}
trait IMap[K[_], V[_]] extends (K ~> V) with RMap[K,V]

View File

@ -50,7 +50,7 @@ trait Init[Scope]
type SettingSeq[T] = Seq[Setting[T]]
type ScopedMap = IMap[ScopedKey, SettingSeq]
type CompiledMap = Map[ScopedKey[_], Compiled]
type CompiledMap = Map[ScopedKey[_], Compiled[_]]
type MapScoped = ScopedKey ~> ScopedKey
type ValidatedRef[T] = Either[Undefined, ScopedKey[T]]
type ValidatedInit[T] = Either[Seq[Undefined], Initialize[T]]
@ -62,6 +62,7 @@ trait Init[Scope]
def value[T](value: => T): Initialize[T] = new Value(value _)
def optional[T,U](i: Initialize[T])(f: Option[T] => U): Initialize[U] = new Optional(Some(i), f)
def update[T](key: ScopedKey[T])(f: T => T): Setting[T] = new Setting[T](key, app(key :^: KNil)(hl => f(hl.head)))
def bind[S,T](in: Initialize[S])(f: S => Initialize[T]): Initialize[T] = new Bind(f, in)
def app[HL <: HList, T](inputs: KList[Initialize, HL])(f: HL => T): Initialize[T] = new Apply(f, inputs)
def uniform[S,T](inputs: Seq[Initialize[S]])(f: Seq[S] => T): Initialize[T] = new Uniform(f, inputs)
@ -87,18 +88,18 @@ trait Init[Scope]
{
val cMap = compiled(init)(delegates, scopeLocal, display)
// order the initializations. cyclic references are detected here.
val ordered: Seq[Compiled] = sort(cMap)
val ordered: Seq[Compiled[_]] = sort(cMap)
// evaluation: apply the initializations.
applyInits(ordered)
try { applyInits(ordered) }
catch { case rru: RuntimeUndefined => throw Uninitialized(cMap.keys.toSeq, delegates, rru.undefined, true) }
}
def sort(cMap: CompiledMap): Seq[Compiled] =
def sort(cMap: CompiledMap): Seq[Compiled[_]] =
Dag.topologicalSort(cMap.values)(_.dependencies.map(cMap))
def compile(sMap: ScopedMap): CompiledMap =
sMap.toSeq.map { case (k, ss) =>
sMap.toTypedSeq.map { case sMap.TPair(k, ss) =>
val deps = ss flatMap { _.dependsOn } toSet;
val eval = (settings: Settings[Scope]) => (settings /: ss)(applySetting)
(k, new Compiled(k, deps, eval))
(k, new Compiled(k, deps, ss))
} toMap;
def grouped(init: Seq[Setting[_]]): ScopedMap =
@ -144,14 +145,17 @@ trait Init[Scope]
resolve(scopes)
}
private[this] def applyInits(ordered: Seq[Compiled])(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
(empty /: ordered){ (m, comp) => comp.eval(m) }
private[this] def applySetting[T](map: Settings[Scope], setting: Setting[T]): Settings[Scope] =
private[this] def applyInits(ordered: Seq[Compiled[_]])(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
{
val value = setting.init.evaluate(map)
val key = setting.key
map.set(key.scope, key.key, value)
val x = java.util.concurrent.Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
try {
val eval: EvaluateSettings[Scope] = new EvaluateSettings[Scope] {
override val init: Init.this.type = Init.this
def compiledSettings = ordered
def executor = x
}
eval.run
} finally { x.shutdown() }
}
def showUndefined(u: Undefined, validKeys: Seq[ScopedKey[_]], delegates: Scope => Seq[Scope])(implicit display: Show[ScopedKey[_]]): String =
@ -185,7 +189,7 @@ trait Init[Scope]
val keysString = keys.map(u => showUndefined(u, validKeys, delegates)).mkString("\n\n ", "\n\n ", "")
new Uninitialized(keys, prefix + suffix + " to undefined setting" + suffix + ": " + keysString + "\n ")
}
final class Compiled(val key: ScopedKey[_], val dependencies: Iterable[ScopedKey[_]], val eval: Settings[Scope] => Settings[Scope])
final class Compiled[T](val key: ScopedKey[T], val dependencies: Iterable[ScopedKey[_]], val settings: Seq[Setting[T]])
{
override def toString = showFullKey(key)
}
@ -253,7 +257,7 @@ trait Init[Scope]
sealed trait Keyed[S, T] extends Initialize[T]
{
def scopedKey: ScopedKey[S]
protected def transform: S => T
def transform: S => T
final def dependsOn = scopedKey :: Nil
final def apply[Z](g: T => Z): Initialize[Z] = new GetValue(scopedKey, g compose transform)
final def evaluate(ss: Settings[Scope]): T = transform(getValue(ss, scopedKey))
@ -271,10 +275,24 @@ trait Init[Scope]
}
private[this] final class GetValue[S,T](val scopedKey: ScopedKey[S], val transform: S => T) extends Keyed[S, T]
trait KeyedInitialize[T] extends Keyed[T, T] {
protected final val transform = idFun[T]
final val transform = idFun[T]
}
private[this] final class Optional[S,T](a: Option[Initialize[S]], f: Option[S] => T) extends Initialize[T]
private[sbt] final class Bind[S,T](val f: S => Initialize[T], val in: Initialize[S]) extends Initialize[T]
{
def dependsOn = in.dependsOn
def apply[Z](g: T => Z): Initialize[Z] = new Bind[S,Z](s => f(s)(g), in)
def evaluate(ss: Settings[Scope]): T = f(in evaluate ss) evaluate ss
def mapReferenced(g: MapScoped) = new Bind[S,T](s => f(s) mapReferenced g, in mapReferenced g)
def validateReferenced(g: ValidateRef) = (in validateReferenced g).right.map { validIn =>
new Bind[S,T](s => handleUndefined( f(s) validateReferenced g), validIn)
}
def handleUndefined(vr: ValidatedInit[T]): Initialize[T] = vr match {
case Left(undefs) => throw new RuntimeUndefined(undefs)
case Right(x) => x
}
def mapConstant(g: MapConstant) = new Bind[S,T](s => f(s) mapConstant g, in mapConstant g)
}
private[sbt] final class Optional[S,T](val a: Option[Initialize[S]], val f: Option[S] => T) extends Initialize[T]
{
def dependsOn = dependencies(a.toList)
def apply[Z](g: T => Z): Initialize[Z] = new Optional[S,Z](a, g compose f)
@ -283,7 +301,7 @@ trait Init[Scope]
def validateReferenced(g: ValidateRef) = Right( new Optional(a flatMap { _.validateReferenced(g).right.toOption }, f) )
def mapConstant(g: MapConstant): Initialize[T] = new Optional(a map mapConstantT(g).fn, f)
}
private[this] final class Value[T](value: () => T) extends Initialize[T]
private[sbt] final class Value[T](val value: () => T) extends Initialize[T]
{
def dependsOn = Nil
def mapReferenced(g: MapScoped) = this
@ -292,7 +310,7 @@ trait Init[Scope]
def mapConstant(g: MapConstant) = this
def evaluate(map: Settings[Scope]): T = value()
}
private[this] final class Apply[HL <: HList, T](val f: HL => T, val inputs: KList[Initialize, HL]) extends Initialize[T]
private[sbt] final class Apply[HL <: HList, T](val f: HL => T, val inputs: KList[Initialize, HL]) extends Initialize[T]
{
def dependsOn = dependencies(inputs.toList)
def mapReferenced(g: MapScoped) = mapInputs( mapReferencedT(g) )
@ -308,7 +326,7 @@ trait Init[Scope]
if(undefs.isEmpty) Right(new Apply(f, tx transform get)) else Left(undefs)
}
}
private[this] final class Uniform[S, T](val f: Seq[S] => T, val inputs: Seq[Initialize[S]]) extends Initialize[T]
private[sbt] final class Uniform[S, T](val f: Seq[S] => T, val inputs: Seq[Initialize[S]]) extends Initialize[T]
{
def dependsOn = dependencies(inputs)
def mapReferenced(g: MapScoped) = new Uniform(f, inputs map mapReferencedT(g).fn)

View File

@ -59,9 +59,9 @@ object SettingsUsage
val applied: Settings[Scope] = make(mySettings)(delegates, scopeLocal, showFullKey)
// Show results.
for(i <- 0 to 5; k <- Seq(a, b)) {
/* for(i <- 0 to 5; k <- Seq(a, b)) {
println( k.label + i + " = " + applied.get( Scope(i), k) )
}
}*/
/** Output:
* For the None results, we never defined the value and there was no value to delegate to.

View File

@ -3,21 +3,96 @@ package sbt
import org.scalacheck._
import Prop._
import SettingsUsage._
import SettingsExample._
object SettingsTest extends Properties("settings")
{
def tests =
for(i <- 0 to 5; k <- Seq(a, b)) yield {
val value = applied.get( Scope(i), k)
val expected = expectedValues(2*i + (if(k == a) 0 else 1))
("Index: " + i) |:
("Key: " + k.label) |:
("Value: " + value) |:
("Expected: " + expected) |:
(value == expected)
}
final val ChainMax = 5000
lazy val chainLengthGen = Gen.choose(1, ChainMax)
property("Basic settings test") = secure( all( tests: _*) )
property("Basic chain") = forAll(chainLengthGen) { (i: Int) =>
val abs = math.abs(i)
singleIntTest( chain( abs, value(0)), abs )
}
property("Basic bind chain") = forAll(chainLengthGen) { (i: Int) =>
val abs = math.abs(i)
singleIntTest( chainBind(value(abs)), 0 )
}
property("Allows references to completed settings") = forAllNoShrink(30) { allowedReference _ }
final def allowedReference(intermediate: Int): Prop =
{
val top = value(intermediate)
def iterate(init: Initialize[Int]): Initialize[Int] =
bind(init) { t =>
if(t <= 0)
top
else
iterate(value(t-1) )
}
try { evaluate( setting(chk, iterate(top)) :: Nil); true }
catch { case e: Exception => ("Unexpected exception: " + e) |: false }
}
// Circular (dynamic) references currently loop infinitely.
// This is the expected behavior (detecting dynamic cycles is expensive),
// but it may be necessary to provide an option to detect them (with a performance hit)
// This would test that cycle detection.
// property("Catches circular references") = forAll(chainLengthGen) { checkCircularReferences _ }
final def checkCircularReferences(intermediate: Int): Prop =
{
val ccr = new CCR(intermediate)
try { evaluate( setting(chk, ccr.top) :: Nil); false }
catch { case e: Exception => true }
}
def tests =
for(i <- 0 to 5; k <- Seq(a, b)) yield {
val expected = expectedValues(2*i + (if(k == a) 0 else 1))
checkKey[Int]( ScopedKey( Scope(i), k ), expected, applied)
}
lazy val expectedValues = None :: None :: None :: None :: None :: None :: Some(3) :: None :: Some(3) :: Some(9) :: Some(4) :: Some(9) :: Nil
lazy val ch = AttributeKey[Int]("ch")
lazy val chk = ScopedKey( Scope(0), ch)
def chain(i: Int, prev: Initialize[Int]): Initialize[Int] =
if(i <= 0) prev else chain(i - 1, prev(_ + 1))
def chainBind(prev: Initialize[Int]): Initialize[Int] =
bind(prev) { v =>
if(v <= 0) prev else chainBind(value(v - 1) )
}
def singleIntTest(i: Initialize[Int], expected: Int) =
{
val eval = evaluate( setting( chk, i ) :: Nil )
checkKey( chk, Some(expected), eval )
}
def checkKey[T](key: ScopedKey[T], expected: Option[T], settings: Settings[Scope]) =
{
val value = settings.get( key.scope, key.key)
("Key: " + key) |:
("Value: " + value) |:
("Expected: " + expected) |:
(value == expected)
}
def evaluate(settings: Seq[Setting[_]]): Settings[Scope] =
try { make(settings)(delegates, scopeLocal, showFullKey) }
catch { case e => e.printStackTrace; throw e }
}
// This setup is a workaround for module synchronization issues
final class CCR(intermediate: Int)
{
lazy val top = iterate(value(intermediate), intermediate)
def iterate(init: Initialize[Int], i: Int): Initialize[Int] =
bind(init) { t =>
if(t <= 0)
top
else
iterate(value(t - 1), t-1)
}
}