overhaul Streams injection

This commit is contained in:
Mark Harrah 2011-02-05 21:39:34 -05:00
parent a49872c35d
commit c1cc482b94
8 changed files with 112 additions and 122 deletions

View File

@ -71,7 +71,8 @@ object Act
def applyTask(s: State, structure: Load.BuildStructure, p: Parser[Task[_]]): Parser[() => State] =
Command.applyEffect(p) { t =>
import EvaluateTask._
processResult(runTask(t)(nodeView(structure, s)), logger(s))
val result = withStreams(structure){ str => runTask(t)(nodeView(s, str)) }
processResult(result, logger(s))
s
}
def actParser(s: State): Parser[() => State] =

View File

@ -106,11 +106,12 @@ object EvaluateTask
val PluginDefinition = TaskKey[(Seq[File], Analysis)]("plugin-definition")
val (state, dummyState) = dummy[State]("state")
val (streams, dummyStreams) = dummy[TaskStreams]("streams")
val (streamsManager, dummyStreamsManager) = dummy[Streams]("streams-manager")
val streams = TaskKey[TaskStreams]("streams", true)
def injectSettings: Seq[Project.Setting[_]] = Seq(
(state in Scope.GlobalScope) :== dummyState,
(streams in Scope.GlobalScope) :== dummyStreams
(streamsManager in Scope.GlobalScope) :== dummyStreamsManager
)
def dummy[T](name: String): (TaskKey[T], Task[T]) = (TaskKey[T](name), dummyTask(name))
@ -124,18 +125,26 @@ object EvaluateTask
processResult(result, log)
}
def evaluateTask[T](structure: BuildStructure, taskKey: ScopedKey[Task[T]], state: State, thisProject: ProjectRef, checkCycles: Boolean = false, maxWorkers: Int = SystemProcessors): Option[Result[T]] =
for( (task, toNode) <- getTask(structure, taskKey, state, thisProject) ) yield
runTask(task, checkCycles, maxWorkers)(toNode)
withStreams(structure) { str =>
for( (task, toNode) <- getTask(structure, taskKey, state, str, thisProject) ) yield
runTask(task, checkCycles, maxWorkers)(toNode)
}
def withStreams[T](structure: BuildStructure)(f: Streams => T): T =
{
val str = std.Streams.closeable(structure.streams)
try { f(str) } finally { str.close() }
}
def getTask[T](structure: BuildStructure, taskKey: ScopedKey[Task[T]], state: State, thisProject: ProjectRef): Option[(Task[T], Execute.NodeView[Task])] =
def getTask[T](structure: BuildStructure, taskKey: ScopedKey[Task[T]], state: State, streams: Streams, thisProject: ProjectRef): Option[(Task[T], Execute.NodeView[Task])] =
{
val thisScope = Scope(Select(thisProject), Global, Global, Global)
val resolvedScope = Scope.replaceThis(thisScope)( taskKey.scope )
for( t <- structure.data.get(resolvedScope, taskKey.key)) yield
(t, nodeView(structure, state))
(t, nodeView(state, streams))
}
def nodeView(structure: BuildStructure, state: State): Execute.NodeView[Task] =
transform(structure, dummyStreams, dummyState, state)
def nodeView(state: State, streams: Streams): Execute.NodeView[Task] =
Transform(dummyStreamsManager :^: dummyState :^: KNil, streams :+: state :+: HNil)
def runTask[Task[_] <: AnyRef, T](root: Task[T], checkCycles: Boolean = false, maxWorkers: Int = SystemProcessors)(implicit taskToNode: Execute.NodeView[Task]): Result[T] =
{
@ -145,13 +154,6 @@ object EvaluateTask
try { x.run(root)(service) } finally { shutdown() }
}
def transform(structure: BuildStructure, streamsDummy: Task[TaskStreams], stateDummy: Task[State], state: State) =
{
val dummies = new Transform.Dummies(stateDummy :^: KNil, streamsDummy)
val inject = new Transform.Injected(state :+: HNil, structure.streams)
Transform(dummies, inject)(structure.index.taskToKey)
}
def processResult[T](result: Result[T], log: Logger): T =
result match
{
@ -160,6 +162,18 @@ object EvaluateTask
log.error(Incomplete.show(inc, true))
error("Task did not complete successfully")
}
val injectStreams = (scoped: ScopedKey[_]) =>
if(scoped.key == streams.key)
{
val scope = Scope.fillTaskAxis(scoped.scope, scoped.key)
Seq(streams in scope <<= streamsManager map { mgr =>
val stream = mgr(ScopedKey(scope, scoped.key))
stream.open()
stream
})
}
else Nil
}
object Index
{
@ -198,7 +212,7 @@ object Load
val evalPluginDef = EvaluateTask.evalPluginDef(log) _
val delegates = memo(defaultDelegates)
val inject: Seq[Project.Setting[_]] = ((AppConfig in Scope.GlobalScope) :== state.configuration) +: EvaluateTask.injectSettings
val config = new LoadBuildConfiguration(stagingDirectory, classpath, loader, compilers, evalPluginDef, delegates, inject, log)
val config = new LoadBuildConfiguration(stagingDirectory, classpath, loader, compilers, evalPluginDef, delegates, EvaluateTask.injectStreams, inject, log)
apply(base, state, config)
}
def defaultDelegates: LoadedBuild => Scope => Seq[Scope] = (lb: LoadedBuild) => {
@ -234,10 +248,10 @@ object Load
lazy val rootEval = lazyEval(loaded.units(loaded.root).unit)
val settings = config.injectSettings ++ buildConfigurations(loaded, getRootProject(projects), rootEval)
val delegates = config.delegates(loaded)
val data = Project.makeSettings(settings, delegates)
val data = Project.makeSettings(settings, delegates, config.scopeLocal)
val index = structureIndex(data)
val streams = mkStreams(projects, loaded.root, data)
(rootEval, new BuildStructure(projects, loaded.root, settings, data, index, streams, delegates))
(rootEval, new BuildStructure(projects, loaded.root, settings, data, index, streams, delegates, config.scopeLocal))
}
def structureIndex(settings: Settings[Scope]): StructureIndex =
@ -246,10 +260,10 @@ object Load
// Reevaluates settings after modifying them. Does not recompile or reload any build components.
def reapply(newSettings: Seq[Setting[_]], structure: BuildStructure): BuildStructure =
{
val newData = Project.makeSettings(newSettings, structure.delegates)
val newData = Project.makeSettings(newSettings, structure.delegates, structure.scopeLocal)
val newIndex = structureIndex(newData)
val newStreams = mkStreams(structure.units, structure.root, newData)
new BuildStructure(units = structure.units, root = structure.root, settings = newSettings, data = newData, index = newIndex, streams = newStreams, delegates = structure.delegates)
new BuildStructure(units = structure.units, root = structure.root, settings = newSettings, data = newData, index = newIndex, streams = newStreams, delegates = structure.delegates, scopeLocal = structure.scopeLocal)
}
def isProjectThis(s: Setting[_]) = s.key.scope.project == This
@ -525,8 +539,8 @@ object Load
def referenced(definition: Project): Seq[ProjectRef] = definition.inherits ++ definition.aggregate ++ definition.dependencies.map(_.project)
final class BuildStructure(val units: Map[URI, LoadedBuildUnit], val root: URI, val settings: Seq[Setting[_]], val data: Settings[Scope], val index: StructureIndex, val streams: Streams, val delegates: Scope => Seq[Scope])
final class LoadBuildConfiguration(val stagingDirectory: File, val classpath: Seq[File], val loader: ClassLoader, val compilers: Compilers, val evalPluginDef: (BuildStructure, State) => (Seq[File], Analysis), val delegates: LoadedBuild => Scope => Seq[Scope], val injectSettings: Seq[Setting[_]], val log: Logger)
final class BuildStructure(val units: Map[URI, LoadedBuildUnit], val root: URI, val settings: Seq[Setting[_]], val data: Settings[Scope], val index: StructureIndex, val streams: Streams, val delegates: Scope => Seq[Scope], val scopeLocal: ScopedKey[_] => Seq[Setting[_]])
final class LoadBuildConfiguration(val stagingDirectory: File, val classpath: Seq[File], val loader: ClassLoader, val compilers: Compilers, val evalPluginDef: (BuildStructure, State) => (Seq[File], Analysis), val delegates: LoadedBuild => Scope => Seq[Scope], val scopeLocal: ScopedKey[_] => Seq[Setting[_]], val injectSettings: Seq[Setting[_]], val log: Logger)
// information that is not original, but can be reconstructed from the rest of BuildStructure
final class StructureIndex(val keyMap: Map[String, AttributeKey[_]], val taskToKey: Map[Task[_], ScopedKey[Task[_]]], val keyIndex: KeyIndex)
@ -542,8 +556,8 @@ object BuildStreams
import Project.display
import std.{TaskExtra,Transform}
type Streams = std.Streams[ScopedKey[Task[_]]]
type TaskStreams = std.TaskStreams[ScopedKey[Task[_]]]
type Streams = std.Streams[ScopedKey[_]]
type TaskStreams = std.TaskStreams[ScopedKey[_]]
val GlobalPath = "$global"
def mkStreams(units: Map[URI, LoadedBuildUnit], root: URI, data: Settings[Scope], logRelativePath: Seq[String] = defaultLogPath): Streams =

View File

@ -10,9 +10,9 @@ package sbt
object LogManager
{
def construct(data: Settings[Scope]) = (task: ScopedKey[Task[_]], to: PrintWriter) =>
def construct(data: Settings[Scope]) = (task: ScopedKey[_], to: PrintWriter) =>
{
val scope = Scope.fillTaskAxis(task.scope, task.key)
val scope = task.scope
def level(key: AttributeKey[Level.Value], default: Level.Value): Level.Value = data.get(scope, key) getOrElse default
val screenLevel = level(ScreenLogLevel, Level.Info)
val backingLevel = level(PersistLogLevel, Level.Debug)

View File

@ -75,7 +75,8 @@ object Project extends Init[Scope]
val newAttrs = s.attributes.put(Watch.key, makeWatched(data, ref, project)).put(HistoryPath.key, historyPath)
s.copy(attributes = newAttrs)
}
def makeSettings(settings: Seq[Setting[_]], delegates: Scope => Seq[Scope]) = translateUninitialized( make(settings)(delegates) )
def makeSettings(settings: Seq[Setting[_]], delegates: Scope => Seq[Scope], scopeLocal: ScopedKey[_] => Seq[Setting[_]]) =
translateUninitialized( make(settings)(delegates, scopeLocal) )
def makeWatched(data: Settings[Scope], ref: ProjectRef, project: Project): Watched =
{

View File

@ -21,29 +21,29 @@ object InputTask {
def apply[I,T](p: Parser[I])(c: I => Task[T]): InputTask[T] = apply(p map c)
}
sealed trait Scoped { def scope: Scope }
sealed trait Scoped { def scope: Scope; def fillThis: Boolean }
sealed trait ScopedTaskable[T] extends Scoped
sealed trait ScopedSetting[T] extends ScopedTaskable[T] { def key: AttributeKey[T] }
sealed trait ScopedTask[T] extends ScopedTaskable[T] { def key: AttributeKey[Task[T]] }
sealed trait ScopedInput[T] extends Scoped { def key: AttributeKey[InputTask[T]] }
sealed trait ScopedInput[T] extends Scoped { def key: AttributeKey[InputTask[T]]; def fillThis = false }
sealed trait Key[T] extends Scoped { final def scope: Scope = Scope(This,This,This,This) }
final class SettingKey[T] private(val key: AttributeKey[T]) extends Key[T] with ScopedSetting[T]
final class TaskKey[T] private(val key: AttributeKey[Task[T]]) extends Key[T] with ScopedTask[T]
final class SettingKey[T] private(val key: AttributeKey[T], val fillThis: Boolean) extends Key[T] with ScopedSetting[T]
final class TaskKey[T] private(val key: AttributeKey[Task[T]], val fillThis: Boolean) extends Key[T] with ScopedTask[T]
final class InputKey[T] private(val key: AttributeKey[InputTask[T]]) extends Key[InputTask[T]] with ScopedInput[T]
object Scoped
{
implicit def richSettingScoped[T](s: ScopedSetting[T]): RichSettingScoped[T] = new RichSettingScoped[T](s.scope, s.key)
implicit def richTaskScoped[T](s: ScopedTask[T]): RichTaskScoped[T] = new RichTaskScoped[T](s.scope, s.key)
implicit def richSettingScoped[T](s: ScopedSetting[T]): RichSettingScoped[T] = new RichSettingScoped[T](s.scope, s.key, s.fillThis)
implicit def richTaskScoped[T](s: ScopedTask[T]): RichTaskScoped[T] = new RichTaskScoped[T](s.scope, s.key, s.fillThis)
implicit def richInputScoped[T](s: ScopedInput[T]): RichInputScoped[T] = new RichInputScoped[T](s.scope, s.key)
implicit def richSettingListScoped[T](s: ScopedSetting[Seq[T]]): RichSettingList[T] = new RichSettingList[T](s.scope, s.key)
implicit def richListTaskScoped[T](s: ScopedTask[Seq[T]]): RichListTask[T] = new RichListTask[T](s.scope, s.key)
implicit def richSettingListScoped[T](s: ScopedSetting[Seq[T]]): RichSettingList[T] = new RichSettingList[T](s.scope, s.key, s.fillThis)
implicit def richListTaskScoped[T](s: ScopedTask[Seq[T]]): RichListTask[T] = new RichListTask[T](s.scope, s.key, s.fillThis)
implicit def taskScoping[T](s: TaskKey[T]): ScopingTask[T] = new ScopingTask[T](s.key)
implicit def taskScoping[T](s: TaskKey[T]): ScopingTask[T] = new ScopingTask[T](s.key, s.fillThis)
implicit def settingScoping[T](s: SettingKey[T]): ScopingSetting[T, ScopedSetting[T]] =
new ScopingSetting(s.key, scope => scopedSetting(scope, s.key))
new ScopingSetting(s.key, scope => scopedSetting(scope, s.key, s.fillThis))
implicit def inputScoping[T](s: InputKey[T]): ScopingSetting[InputTask[T], ScopedInput[T]] =
new ScopingSetting(s.key, scope => scopedInput(scope, s.key))
@ -68,35 +68,35 @@ object Scoped
}
}
private[this] def scopedSetting[T](s: Scope, k: AttributeKey[T]): ScopedSetting[T] = new ScopedSetting[T] { val scope = s; val key = k }
private[this] def scopedSetting[T](s: Scope, k: AttributeKey[T], fb: Boolean): ScopedSetting[T] = new ScopedSetting[T] { val scope = s; val key = k; val fillThis = fb }
private[this] def scopedInput[T](s: Scope, k: AttributeKey[InputTask[T]]): ScopedInput[T] = new ScopedInput[T] { val scope = s; val key = k }
final class ScopingTask[T](taskKey: AttributeKey[Task[T]])
final class ScopingTask[T](taskKey: AttributeKey[Task[T]], val fillThis: Boolean)
{
def in(p: ProjectRef): ScopedTask[T] = in(Select(p), This)
def in(c: ConfigKey): ScopedTask[T] = in(This, Select(c))
def in(p: ProjectRef, c: ConfigKey): ScopedTask[T] = in(Select(p), Select(c))
def in(p: ScopeAxis[ProjectRef], c: ScopeAxis[ConfigKey]): ScopedTask[T] = in(Scope(p, c, This, This))
def in(s: Scope): ScopedTask[T] = scopedTask(s, taskKey)
def in(s: Scope): ScopedTask[T] = scopedTask(s, taskKey, fillThis)
}
private[this] def scopedTask[T](s: Scope, k: AttributeKey[Task[T]]): ScopedTask[T] = new ScopedTask[T] { val scope = s; val key = k }
private[this] def scopedTask[T](s: Scope, k: AttributeKey[Task[T]], fb: Boolean): ScopedTask[T] = new ScopedTask[T] { val scope = s; val key = k; val fillThis = fb }
final class RichSettingList[S](scope: Scope, key: AttributeKey[Seq[S]])
final class RichSettingList[S](scope: Scope, key: AttributeKey[Seq[S]], val fillThis: Boolean)
{
def += (value: => S): Setting[Seq[S]] = ++=(value :: Nil)
def ++=(values: => Seq[S]): Setting[Seq[S]] = (new RichSettingScoped(scope, key)) ~= (_ ++ values )
def ++=(values: => Seq[S]): Setting[Seq[S]] = (new RichSettingScoped(scope, key, fillThis)) ~= (_ ++ values )
}
final class RichListTask[S](scope: Scope, key: AttributeKey[Task[Seq[S]]])
final class RichListTask[S](scope: Scope, key: AttributeKey[Task[Seq[S]]], val fillThis: Boolean)
{
def += (value: => S): Setting[Task[Seq[S]]] = ++=(value :: Nil)
def ++=(values: => Seq[S]): Setting[Task[Seq[S]]] = (new RichTaskScoped(scope, key)) ~= (_ ++ values )
def ++=(values: => Seq[S]): Setting[Task[Seq[S]]] = (new RichTaskScoped(scope, key, fillThis)) ~= (_ ++ values )
}
sealed abstract class RichBaseScoped[S]
{
def scope: Scope
def key: AttributeKey[S]
def fillThis: Boolean
protected final val scoped = ScopedKey(scope, key)
protected final val scopedList = scoped :^: KNil
final def :==(value: S): Setting[S] = :=(value)
final def :==(value: SettingKey[S]): Setting[S] = <<=(value(identity))
@ -104,17 +104,17 @@ object Scoped
final def ~= (f: S => S): Setting[S] = Project.update(scoped)(f)
final def <<= (app: Apply[S]): Setting[S] = app toSetting scoped
final def apply[T](f: S => T): Apply[T] = Apply.mk(scopedList)(hl => f(hl.head))
final def apply[T](f: S => T): Apply[T] = Apply.single(scoped, fillThis)(f)
final def get(settings: Settings[Scope]): Option[S] = settings.get(scope, key)
}
final class RichInputScoped[T](val scope: Scope, val key: AttributeKey[InputTask[T]]) extends RichBaseScoped[InputTask[T]]
final class RichSettingScoped[S](val scope: Scope, val key: AttributeKey[S]) extends RichBaseScoped[S]
final class RichInputScoped[T](val scope: Scope, val key: AttributeKey[InputTask[T]]) extends RichBaseScoped[InputTask[T]] { def fillThis = false }
final class RichSettingScoped[S](val scope: Scope, val key: AttributeKey[S], val fillThis: Boolean) extends RichBaseScoped[S]
{
def map[T](f: S => T): Apply[Task[T]] = flatMap(s => task(f(s)) )
def flatMap[T](f: S => Task[T]): Apply[Task[T]] = Apply.mk(scopedList)(hl => f(hl.head))
def flatMap[T](f: S => Task[T]): Apply[Task[T]] = Apply.single(scoped, fillThis)(f)
}
final class RichTaskScoped[S](scope: Scope, key: AttributeKey[Task[S]])
final class RichTaskScoped[S](scope: Scope, key: AttributeKey[Task[S]], fillThis: Boolean)
{
type ScS = Setting[Task[S]]
def :==(value: S): ScS = :=(value)
@ -126,12 +126,12 @@ object Scoped
def <<= (app: App[S]): ScS = app toSetting scoped
def setting: ScopedSetting[Task[S]] = scopedSetting(scope, key)
def setting: ScopedSetting[Task[S]] = scopedSetting(scope, key, fillThis)
def get(settings: Settings[Scope]): Option[Task[S]] = settings.get(scope, key)
type App[T] = Apply[Task[T]]
private[this] def scoped = ScopedKey(scope, key)
private[this] def mk[T](onTask: Task[S] => Task[T]): App[T] = Apply.mk(scoped :^: KNil)(hl => onTask(hl.head))
private[this] def mk[T](onTask: Task[S] => Task[T]): App[T] = Apply.single(scoped, fillThis)(onTask)
def flatMapR[T](f: Result[S] => Task[T]): App[T] = mk(_ flatMapR f)
def flatMap[T](f: S => Task[T]): App[T] = flatMapR(f compose successM)
@ -147,7 +147,7 @@ object Scoped
def dependsOn(tasks: ScopedTask[_]*): App[S] =
{
val in = KCons(scopedTask(scope, key), KList.fromList(tasks))
val in = KCons(scopedTask(scope, key, fillThis), KList.fromList(tasks))
Apply.tasks(in) { kl =>
val KCons(h,t) = KList.kcons(kl)
h.dependsOn(t.toList :_*)
@ -187,7 +187,7 @@ object Scoped
key match
{
case ss: ScopedSetting[H] => prependSetting(ss)
case st: ScopedTask[H] => prependTask(scopedSetting(st.scope, st.key))
case st: ScopedTask[H] => prependTask(scopedSetting(st.scope, st.key, st.fillThis))
}
def combine[D[_],S](c: Combine[D], f: Results[HLv] => D[S]): Apply[Task[S]] =
@ -317,18 +317,22 @@ object Scoped
object Apply
{
def mk[H <: HList, T](in: KList[ScopedKey, H])(f: H => T): Apply[T] =
new Apply[T](scoped => Project.app(scoped, in)(f) )
def single[I,T](in: ScopedKey[I], fillThis: Boolean)(f: I => T): Apply[T] =
mk(scoped => fill(scoped, in, fillThis) :^: KNil)(hl => f(hl.head))
private[this] def mk[H <: HList, T](in: ScopedKey[_] => KList[ScopedKey, H])(f: H => T): Apply[T] =
new Apply[T](scoped => Project.app(scoped, in(scoped))(f) )
def apply[HL <: HList, T](in: KList[ScopedSetting, HL])(f: HL => T): Apply[T] = mk(in transform ssToSK)(f)
def apply[HL <: HList, T](in: KList[ScopedSetting, HL])(f: HL => T): Apply[T] = mk(scoped => in transform ssToSK(scoped))(f)
def tasks[HL <: HList, T](in: KList[ScopedTask, HL])(f: KList[Task, HL] => T): Apply[T] =
{
val kapp = new Project.KApp[HL, Task, T]
new Apply[T](scoped => kapp(scoped, in transform stToSK)(f) )
new Apply[T](scoped => kapp(scoped, in transform stToSK(scoped))(f) )
}
private val ssToSK = new (ScopedSetting ~> ScopedKey) { def apply[T](sk: ScopedSetting[T]) = new ScopedKey(sk.scope, sk.key) }
private val stToSK = new (ScopedTask ~> ScopedTaskKey) { def apply[T](st: ScopedTask[T]) = new ScopedKey(st.scope, st.key) }
private def fill[I](scoped: ScopedKey[_], in: ScopedKey[I], fillThis: Boolean): ScopedKey[I] =
if(fillThis) Project.resolveThis(Scope.fillTaskAxis(scoped.scope,scoped.key))(in) else in
private def ssToSK(scoped: ScopedKey[_]) = new (ScopedSetting ~> ScopedKey) { def apply[T](sk: ScopedSetting[T]) = fill(scoped, new ScopedKey(sk.scope, sk.key), sk.fillThis) }
private def stToSK(scoped: ScopedKey[_]) = new (ScopedTask ~> ScopedTaskKey) { def apply[T](st: ScopedTask[T]) = fill(scoped, new ScopedKey(st.scope, st.key), st.fillThis) }
private type ScopedTaskKey[T] = ScopedKey[Task[T]]
}
@ -370,17 +374,17 @@ object InputKey
}
object TaskKey
{
def apply[T](label: String): TaskKey[T] =
apply( AttributeKey[Task[T]](label) )
def apply[T](label: String, fillThis: Boolean = false): TaskKey[T] =
apply( AttributeKey[Task[T]](label), fillThis )
def apply[T](akey: AttributeKey[Task[T]]): TaskKey[T] =
new TaskKey[T](akey)
def apply[T](akey: AttributeKey[Task[T]], fillThis: Boolean): TaskKey[T] =
new TaskKey[T](akey, fillThis)
}
object SettingKey
{
def apply[T](label: String): SettingKey[T] =
apply( AttributeKey[T](label) )
def apply[T](label: String, fillThis: Boolean = false): SettingKey[T] =
apply( AttributeKey[T](label), fillThis )
def apply[T](akey: AttributeKey[T]): SettingKey[T] =
new SettingKey[T](akey)
def apply[T](akey: AttributeKey[T], fillThis: Boolean): SettingKey[T] =
new SettingKey[T](akey, fillThis)
}

View File

@ -38,9 +38,9 @@ private[sbt] sealed trait ManagedStreams[Key] extends TaskStreams[Key]
def close()
}
sealed trait Streams[Key]
trait Streams[Key]
{
private[sbt] def apply(a: Key): ManagedStreams[Key]
def apply(a: Key): ManagedStreams[Key]
def use[T](key: Key)(f: TaskStreams[Key] => T): T =
{
val s = apply(key)
@ -48,9 +48,20 @@ sealed trait Streams[Key]
try { f(s) } finally { s.close() }
}
}
trait CloseableStreams[Key] extends Streams[Key] with java.io.Closeable
object Streams
{
private[this] val closeQuietly = (c: Closeable) => try { c.close() } catch { case _: IOException => () }
def closeable[Key](delegate: Streams[Key]): CloseableStreams[Key] = new CloseableStreams[Key] {
private[this] val streams = new collection.mutable.HashMap[Key,ManagedStreams[Key]]
def apply(key: Key): ManagedStreams[Key] =
synchronized { streams.getOrElseUpdate(key, delegate(key)) }
def close(): Unit =
synchronized { streams.values.foreach(_.close() ); streams.clear() }
}
def apply[Key](taskDirectory: Key => File, name: Key => String, mkLogger: (Key, PrintWriter) => Logger): Streams[Key] = new Streams[Key] {

View File

@ -34,60 +34,15 @@ object System
new (Task ~> Task) {
def apply[T](in: Task[T]): Task[T] = map(in).getOrElse(in)
}
/** Creates a natural transformation that replaces occurrences of 'a' with 'b'.
* Only valid for M invariant in its type parameter. */
def replace[M[_] <: AnyRef, A](a: M[A], b: M[A]) = new (M ~> M) {
def apply[T](t: M[T]): M[T] =
if(t eq a) b.asInstanceOf[M[T]] else t
}
/** Returns the inputs to the Task that do not have their value discarded.
* For example, this does not include the inputs of MapFailure or the dependencies of DependsOn. */
def usedInputs(t: Action[_]): Seq[Task[_]] = t match {
case m: Mapped[_,_] => m.in.toList
case m: FlatMapped[_,_] => m.in.toList
case j: Join[_,_] => j.in
case _ => Nil
}
def streamed[Key](streams: Streams[Key], dummy: Task[TaskStreams[Key]], key: Task[_] => Key): Task ~> Task =
new (Task ~> Task) {
def apply[T](t: Task[T]): Task[T] = if(usedInputs(t.work) contains dummy) substitute(t) else t
def substitute[T](t: Task[T]): Task[T] =
{
val inStreams = streams(key(t))
val streamsTask = fromDummy(dummy){ inStreams.open(); inStreams }
val depMap = replace( dummy, streamsTask )
def wrap0[A,B](f: A => B) = (a: A) => try { f(a) } finally { inStreams.close() }
def fin(k: Task[T]): Task[T] = {
import TaskExtra._
k andFinally { inStreams.close() }
}
def newWork(a: Action[T]): Task[T] = t.copy(work = a)
t.work match {
case m: Mapped[_,_] => newWork( m.copy(in = m.in transform depMap, f = wrap0(m.f) ) ) // the Streams instance is valid only within the mapping function
case fm: FlatMapped[_,_] => fin(newWork( fm.copy(in = fm.in transform depMap) )) // the Streams instance is valid until a result is produced for the task
case j: Join[_,u] => newWork( j.copy(j.in map depMap.fn[u], f = wrap0(j.f)) )
case _ => t // can't get a TaskStreams value from the other types, so no need to transform here (shouldn't get here anyway because of usedInputs check)
}
}
}
}
object Transform
{
final class Dummies[HL <: HList, Key](val direct: KList[Task, HL], val streams: Task[TaskStreams[Key]])
final class Injected[HL <: HList, Key](val direct: HL, val streams: Streams[Key])
final class Dummies[HL <: HList](val direct: KList[Task, HL])
def apply[HL <: HList, Key](dummies: Dummies[HL, Key], injected: Injected[HL, Key])(implicit getKey: Task[_] => Key) =
def apply[HL <: HList, Key](dummies: KList[Task, HL], injected: HL) =
{
import System._
val inputs = dummyMap(dummies.direct)(injected.direct)
Convert.taskToNode streamed(injected.streams, dummies.streams, getKey) getOrId(inputs)
Convert.taskToNode getOrId(dummyMap(dummies)(injected))
}
}
object Convert

View File

@ -64,10 +64,11 @@ trait Init[Scope]
def getValue[T](s: Settings[Scope], k: ScopedKey[T]) = s.get(k.scope, k.key).get
def asFunction[T](s: Settings[Scope]): ScopedKey[T] => T = k => getValue(s, k)
def make(init: Seq[Setting[_]])(implicit delegates: Scope => Seq[Scope]): Settings[Scope] =
def make(init: Seq[Setting[_]])(implicit delegates: Scope => Seq[Scope], scopeLocal: ScopedKey[_] => Seq[Setting[_]]): Settings[Scope] =
{
val withLocal = addLocal(init)(scopeLocal)
// group by Scope/Key, dropping dead initializations
val sMap: ScopedMap = grouped(init)
val sMap: ScopedMap = grouped(withLocal)
// delegate references to undefined values according to 'delegates'
val dMap: ScopedMap = delegate(sMap)(delegates)
// merge Seq[Setting[_]] into Compiled
@ -95,6 +96,9 @@ trait Init[Scope]
def append[T](ss: Seq[Setting[T]], s: Setting[T]): Seq[Setting[T]] =
if(s.definitive) s :: Nil else ss :+ s
def addLocal(init: Seq[Setting[_]])(implicit scopeLocal: ScopedKey[_] => Seq[Setting[_]]): Seq[Setting[_]] =
init.flatMap( _.dependsOn flatMap scopeLocal ) ++ init
def delegate(sMap: ScopedMap)(implicit delegates: Scope => Seq[Scope]): ScopedMap =
{