From 75a784d5ec87c170eb1d24a486400c8888a5797d Mon Sep 17 00:00:00 2001 From: Mark Harrah Date: Sun, 30 May 2010 21:14:18 -0400 Subject: [PATCH] MList covariant, initial Node --- tasks/Incomplete.scala | 3 + tasks/NOTICE | 2 +- tasks/Node.scala | 44 +++++ tasks/ParallelRunner.scala | 190 ---------------------- tasks/TList.scala | 37 ----- tasks/Task.scala | 119 -------------- tasks/TaskListener.scala | 33 ---- tasks/TaskRunner.scala | 21 --- tasks/TaskScheduler.scala | 239 ---------------------------- util/collection/TypeFunctions.scala | 1 + 10 files changed, 49 insertions(+), 640 deletions(-) create mode 100644 tasks/Incomplete.scala create mode 100644 tasks/Node.scala delete mode 100644 tasks/ParallelRunner.scala delete mode 100644 tasks/TList.scala delete mode 100644 tasks/Task.scala delete mode 100644 tasks/TaskListener.scala delete mode 100644 tasks/TaskRunner.scala delete mode 100644 tasks/TaskScheduler.scala diff --git a/tasks/Incomplete.scala b/tasks/Incomplete.scala new file mode 100644 index 000000000..c3bef01ea --- /dev/null +++ b/tasks/Incomplete.scala @@ -0,0 +1,3 @@ +package sbt + +trait Incomplete \ No newline at end of file diff --git a/tasks/NOTICE b/tasks/NOTICE index 88dc12ea6..640aae060 100644 --- a/tasks/NOTICE +++ b/tasks/NOTICE @@ -1,3 +1,3 @@ Simple Build Tool: Task Engine Component -Copyright 2009 Mark Harrah +Copyright 2009, 2010 Mark Harrah Licensed under BSD-style license (see LICENSE) \ No newline at end of file diff --git a/tasks/Node.scala b/tasks/Node.scala new file mode 100644 index 000000000..23f47ab85 --- /dev/null +++ b/tasks/Node.scala @@ -0,0 +1,44 @@ +package sbt + +import Node._ +import Types._ + +trait Node[A[_], T] +{ + type Inputs <: MList[Id] + type Results <: Inputs#Map[Result] + + def inputs: Inputs#Map[A] + def unitDependencies: Iterable[A[_]] + + def work(results: Results, units: Iterable[Incomplete]): Either[A[T], T] +} +object Node +{ + type Result[T] = Either[Throwable, T] + + def pure[T](f: => T): PureNode[T]= map[Id, T, MNil](MNil, Nil)((_,_) => f) + + def map[A[_], T, Inputs0 <: MList[Id]](inputs0: Inputs0#Map[A], deps0: Iterable[A[_]])(work0: (Inputs0#Map[Result], Iterable[Incomplete]) => T): + Node[A,T] { type Inputs = Inputs0; type Results = Inputs0#Map[Result] } = + new Node[A,T] { + type Inputs = Inputs0 + type Results = Inputs0#Map[Result] + def inputs = inputs0 + def unitDependencies = deps0 + def work(results: Results, units: Iterable[Incomplete]) = Right(work0(results, units)) + } + + type PureNode[T] = Node[Id, T] { type Inputs = MNil; type Results = MNil } + type WorkResult[T] = Either[Id[T], T] + val pureResults = new ~>[PureNode, WorkResult] { def apply[T](t: PureNode[T] ) = t.work( MNil, Nil ) } +} +object Test +{ + val a = pure(3) + val b = pure(true) + val c = pure("asdf") + val i3 = a :^: b :^: c :^: MNil + + val d = map[PureNode, String, i3.Map[Id]](i3, Nil) { case (a :^: b :^: c :^: MNil, Seq()) => a + " " + b + " " + c } +} \ No newline at end of file diff --git a/tasks/ParallelRunner.scala b/tasks/ParallelRunner.scala deleted file mode 100644 index beeec76fe..000000000 --- a/tasks/ParallelRunner.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* sbt -- Simple Build Tool - * Copyright 2009 Mark Harrah - */ -package xsbt - -/** This file provides the parallel execution engine of sbt. It is a fairly general module, with pluggable Schedulers and Strategies. -* -* There are three main componenets to the engine: Distributors, Schedulers, and Strategies. -* -* A Scheduler provides work that is ready to execute. -* -* A Strategy is used by a Scheduler to select the work to process from the work that is ready. It is notified as work -* becomes ready. It is requested to select work to process from the work that is ready. -* -* A Distributor uses a Scheduler to obtain work up to the maximum work allowed to run at once. It runs each -* unit of work in its own Thread. It then returns the work and either the computed value or the error that occured. -* -* The Scheduler and Strategy are called from the main thread and therefore do not need to be thread-safe. -**/ - -import java.util.concurrent.LinkedBlockingQueue -import scala.collection.{immutable, mutable} -import immutable.TreeSet - -/** Processes work. */ -trait Compute[Work[_],Result[_]] { def apply[A](w: Work[A]): Result[A] } -/** Requests work from `scheduler` and processes it using `compute`. This class limits the amount of work processing at any given time -* to `workers`.*/ -final class Distributor[O,Work[_],Result[_]](val scheduler: Scheduler[O,Work,Result], compute: Compute[Work,Result], workers: Int) extends NotNull -{ - require(workers > 0) - final def run() = (new Run).run() - - private final class Run extends NotNull - { - import java.util.concurrent.LinkedBlockingQueue - private[this] val schedule = scheduler.run - /** The number of threads currently running. */ - private[this] var running = 0 - /** Pending notifications of completed work. */ - private[this] val complete = new LinkedBlockingQueue[Done[_]] - - private[Distributor] def run(): O = - { - def runImpl(): O = - { - next() - if(isIdle && !schedule.hasPending) // test if all work is complete - { - assume(schedule.isComplete, "Distributor idle and the scheduler indicated no work pending, but scheduler indicates it is not complete.") - schedule.result - } - else - { - waitForCompletedWork() // wait for some work to complete - runImpl() // continue - } - } - try { runImpl() } - finally { shutdown() } - } - private def shutdown(): Unit = all.foreach(_.work.put(None)) - // true if the maximum number of worker threads are currently running - private def atMaximum = running == workers - private def availableWorkers = workers - running - // true if no worker threads are currently running - private def isIdle = running == 0 - // process more work - private def next() - { - // if the maximum threads are being used, do nothing - // if all work is complete or the scheduler is waiting for current work to complete, do nothing - if(!atMaximum && schedule.hasPending) - { - val nextWork = schedule.next(availableWorkers) - val nextSize = nextWork.size - assume(nextSize <= availableWorkers, "Scheduler provided more work (" + nextSize + ") than allowed (" + availableWorkers + ")") - assume(nextSize > 0 || !isIdle, "Distributor idle and the scheduler indicated work pending, but provided no work.") - nextWork.foreach(work => process(work)) - } - } - // wait on the blocking queue `complete` until some work finishes and notify the scheduler - private def waitForCompletedWork() - { - assume(running > 0) - val done = complete.take() - running -= 1 - notifyScheduler(done) - } - private def notifyScheduler[T](done: Done[T]): Unit = schedule.complete(done.work, done.result) - private def process[T](work: Work[T]) - { - assume(running + 1 <= workers) - running += 1 - available.take().work.put(Some(work)) - } - private[this] val all = List.tabulate(workers, i => new Worker) - private[this] val available = - { - val q = new LinkedBlockingQueue[Worker] - all.foreach(q.put) - q - } - private final class Worker extends Thread with NotNull - { - lazy val work = - { - start() - new LinkedBlockingQueue[Option[Work[_]]] - } - override def run() - { - def processData[T](data: Work[T]) - { - val result = ErrorHandling.wideConvert(compute(data)) - complete.put( new Done(result, data) ) - } - def runImpl() - { - work.take() match - { - case Some(data) => - processData(data) - available.put(this) - runImpl() - case None => () - } - } - try { runImpl() } - catch { case e: InterruptedException => () } - } - } - } - private final class Done[T](val result: Either[Throwable, Result[T]], val work: Work[T]) extends NotNull -} -/** Schedules work of type Work that produces results of type Result. A Scheduler determines what work is ready to be processed. -* A Scheduler is itself immutable. It creates a mutable object for each scheduler run.*/ -trait Scheduler[O,Work[_],Result[_]] extends NotNull -{ - /** Starts a new run. The returned object is a new Run, representing a single scheduler run. All state for the run - * is encapsulated in this object.*/ - def run: Run - trait Run extends NotNull - { - /** Notifies this scheduler that work has completed with the given result.*/ - def complete[A](d: Work[A], result: Either[Throwable,Result[A]]): Unit - /** Returns true if there is any more work to be done, although remaining work can be blocked - * waiting for currently running work to complete.*/ - def hasPending: Boolean - /**Returns true if this scheduler has no more work to be done, ever.*/ - def isComplete: Boolean - /** Returns up to 'max' units of work. `max` is always positive. The returned sequence cannot be empty if there is - * no work currently being processed.*/ - def next(max: Int): Seq[Work[_]] - /** The final result after all work has completed. */ - def result: O - } -} -/** A Strategy selects the work to process from work that is ready to be processed.*/ -trait ScheduleStrategy[D] extends NotNull -{ - /** Starts a new run. The returned object is a new Run, representing a single strategy run. All state for the run - * is handled through this object and is encapsulated in this object.*/ - def run: Run - trait Run extends NotNull - { - /** Adds the given work to the list of work that is ready to run.*/ - def workReady(dep: D): Unit - /** Returns true if there is work ready to be run. */ - def hasReady: Boolean - /** Provides up to `max` units of work. `max` is always positive and this method is not called - * if hasReady is false. The returned list cannot be empty is there is work ready to be run.*/ - def next(max: Int): List[D] - } -} -final class SimpleStrategy[D] extends ScheduleStrategy[D] -{ - def run = new Run - { - private var ready = List[D]() - def workReady(dep: D) { ready ::= dep } - def hasReady = !ready.isEmpty - def next(max: Int): List[D] = - { - val ret = ready.take(max) - ready = ready.drop(max) - ret - } - } -} \ No newline at end of file diff --git a/tasks/TList.scala b/tasks/TList.scala deleted file mode 100644 index 58cecfeb2..000000000 --- a/tasks/TList.scala +++ /dev/null @@ -1,37 +0,0 @@ -package xsbt - -import Task.{bindTask, mapTask} - -sealed trait TList -{ - type Head - type Tail <: TList - type HListType <: HList - private[xsbt] def tasks: List[Task[_]] - private[xsbt] def get(results: Results): HListType -} -sealed class TNil extends TList -{ - type Head = Nothing - type Tail = TNil - type HListType = HNil - def ::[A](t: Task[A]) = TCons[A,HNil,TNil](t, this) - private[xsbt] def tasks = Nil - private[xsbt] def get(results: Results) = HNil -} -final case class TCons[H, HL <: HList, T <: TList { type HListType = HL}](head: Task[H], tail: T) extends TList -{ - type Head = H - type Tail = T - type HListType = HCons[H,HL] - type This = TCons[H, HL, T] - def ::[A](t: Task[A]) = TCons[A,HListType,This](t, this) - private[xsbt] def tasks = head :: tail.tasks - private[xsbt] def get(results: Results) = HCons(results(head), tail.get(results)) - private def getF = get _ - - def map[X](f: HListType => X): Task[X] = mapTask(tasks: _*)(f compose getF) - def bind[X](f: HListType => Result[X]): Task[X] = bindTask(tasks: _*)(f compose getF) - def join: Task[HListType] = map(identity[HListType]) -} -object TNil extends TNil \ No newline at end of file diff --git a/tasks/Task.scala b/tasks/Task.scala deleted file mode 100644 index 1b9c2a344..000000000 --- a/tasks/Task.scala +++ /dev/null @@ -1,119 +0,0 @@ -package xsbt - -import Task.{mapTask,bindTask} -import scala.collection.{mutable,immutable} - -sealed trait Result[O] extends NotNull -final case class Value[O](t: O) extends Result[O] -sealed abstract class Task[O] extends Identity with Result[O] -{ - def dependencies: TreeHashSet[Task[_]] // IMPORTANT!! immutable.HashSet is NOT suitable. It has issues with multi-threaded access - def map[N](f: O => N): Task[N] - def bind[N](f: O => Result[N]): Task[N] - def dependsOn(addDependencies: Task[_]*): Task[O] - def named(s: String): Task[O] -} -private final class M[O,R <: Result[O]](name: Option[String])(val dependencies: TreeHashSet[Task[_]])(val compute: Results => R) extends Task[O] -{ - def this(dependencies: Task[_]*)(compute: Results => R) = - this(None)(TreeHashSet(dependencies: _*))(compute) - - final def dependsOn(addDependencies: Task[_]*) = new M(name)(dependencies ++ addDependencies)(compute) - final def map[N](f: O => N) = mapTask(this)(f compose get) - final def bind[N](f: O => Result[N]) = bindTask(this)(f compose get) - final def named(s: String) = - name match - { - case Some(n) => error("Cannot rename task already named '" + n + "'. (Tried to rename it to '" + s + "')") - case None => new M(Some(s))(dependencies)(compute) - } - final override def toString = "Task " + name.getOrElse("") - - private def get: (Results => O) = _(this) -} -abstract class Identity extends NotNull -{ - final override def equals(o: Any) = o match { case a: AnyRef => this eq a; case _ => false } - final override def hashCode = System.identityHashCode(this) -} - -private trait Results extends NotNull -{ - def apply[O](task: Task[O]): O - def contains(task: Task[_]): Boolean -} - - -object Task -{ - val empty = Task(()) - - import Function.tupled - def apply[O](o: => O): Task[O] = mapTask()( _ => o ) - def mapTask[O](dependencies: Task[_]*)(compute: Results => O): Task[O] = - bindTask(dependencies : _*)(in => Value(compute(in))) - def bindTask[O](dependencies: Task[_]*)(compute: Results => Result[O]): Task[O] = - new M[O,Result[O]](dependencies : _*)(compute) - - private[xsbt] def compute[O](t: Task[O], results: Results): Result[O] = t match { case m: M[O,_] => m.compute(results) } - - implicit def iterableToForkBuilder[A](t: Iterable[A]): ForkBuilderIterable[A] = new ForkBuilderIterable(t) - final class ForkBuilderIterable[A] private[Task](a: Iterable[A]) extends NotNull - { - def fork[X](f: A => X): Iterable[Task[X]] = forkTasks(x => Task(f(x)) ) - def forkTasks[X](f: A => Task[X]): Iterable[Task[X]] = a.map(x => f(x)) - def reduce(f: (A,A) => A): Task[A] = fork(x => x) reduce(f) - } - - implicit def iterableToBuilder[O](t: Iterable[Task[O]]): BuilderIterable[O] = new BuilderIterable(t) - final class BuilderIterable[O] private[Task](a: Iterable[Task[O]]) extends NotNull - { - //def mapBind[X](f: O => Task[X]): Iterable[Task[XO]] = a.map(_.bind(f)) - def join: Task[Iterable[O]] = join(identity[O]) - def joinIgnore: Task[Unit] = join.map(_ => ()) - def join[X](f: O => X): Task[Iterable[X]] = mapTask(a.toSeq: _*)( r => a map (f compose r.apply[O]) ) - //def bindJoin[X](f: O => Task[X]): Task[Iterable[X]] = mapBind(f).join - def reduce(f: (O,O) => O): Task[O] = - { - def reduce2(list: List[Task[O]], accumulate: List[Task[O]]): List[Task[O]] = - list match - { - case Nil => accumulate - case x :: Nil => x :: accumulate - case xa :: xb :: tail => reduce2(tail, ( (xa, xb) map f ) :: accumulate ) - } - def reduce1(list: List[Task[O]]): Task[O] = - list match - { - case Nil => error("Empty list") - case x :: Nil => x - case _ => reduce1(reduce2(list, Nil)) - } - reduce1(a.toList) - } - } - - implicit def twoToBuilder[A,B](t: (Task[A], Task[B]) ): Builder2[A,B] = new Builder2(t._1,t._2) - final class Builder2[A,B] private[Task](a: Task[A], b: Task[B]) extends NotNull - { - private def compute[T](f: (A,B) => T) = (r: Results) => f(r(a), r(b)) - def map[X](f: (A,B) => X): Task[X] = mapTask(a,b)(compute(f)) - def bind[X](f: (A,B) => Result[X]): Task[X] = bindTask(a,b)(compute(f)) - } - - implicit def threeToBuilder[A,B,C](t: (Task[A], Task[B], Task[C])): Builder3[A,B,C] = new Builder3(t._1,t._2,t._3) - final class Builder3[A,B,C] private[Task](a: Task[A], b: Task[B], c: Task[C]) extends NotNull - { - private def compute[T](f: (A,B,C) => T) = (r: Results) => f(r(a), r(b), r(c)) - def map[X](f: (A,B,C) => X): Task[X] = mapTask(a,b,c)(compute(f)) - def bind[X](f: (A,B,C) => Result[X]): Task[X] = bindTask(a,b,c)(compute(f)) - } - - implicit def fourToBuilder[A,B,C,D](t: (Task[A], Task[B], Task[C], Task[D])): Builder4[A,B,C,D] = new Builder4(t._1,t._2,t._3,t._4) - final class Builder4[A,B,C,D] private[Task](a: Task[A], b: Task[B], c: Task[C], d: Task[D]) extends NotNull - { - private def compute[T](f: (A,B,C,D) => T) = (r: Results) => f(r(a), r(b), r(c), r(d)) - def map[X](f: (A,B,C,D) => X): Task[X] = mapTask(a,b,c,d)( compute(f) ) - def bind[X](f: (A,B,C,D) => Result[X]): Task[X] = bindTask(a,b,c,d)( compute(f) ) - } -} diff --git a/tasks/TaskListener.scala b/tasks/TaskListener.scala deleted file mode 100644 index 0575fa508..000000000 --- a/tasks/TaskListener.scala +++ /dev/null @@ -1,33 +0,0 @@ -package xsbt - -trait TaskListener extends NotNull -{ - def added(t: Task[_]): Unit - def runnable(t: Task[_]): Unit - def running(t: Task[_]): Unit - def calling(caller: Task[_], t: Task[_]): Unit - def called(caller: Task[_], t: Task[_]): Unit - def completed[T](t: Task[T], value: Option[T]): Unit - def failed[T](t: Task[T], exception: Throwable): Unit -} -class BasicTaskListener extends TaskListener -{ - def added(t: Task[_]) {} - def runnable(t: Task[_]) {} - def running(t: Task[_]) {} - def calling(caller: Task[_], t: Task[_]) {} - def called(caller: Task[_], t: Task[_]) {} - def completed[T](t: Task[T], value: Option[T]) {} - def failed[T](t: Task[T], exception: Throwable) {} -} -class DebugTaskListener(debug: String => Unit) extends TaskListener -{ - def this() = this(println) - def added(t: Task[_]) { debug("Added " + t) } - def runnable(t: Task[_]) { debug("Runnable " + t)} - def running(t: Task[_]) { debug("Running " + t) } - def calling(caller: Task[_], t: Task[_]) { debug(caller + " calling " + t)} - def called(caller: Task[_], t: Task[_]) { debug(caller + " called " + t)} - def completed[T](t: Task[T], value: Option[T]) { debug("Completed " + t + " with " + value)} - def failed[T](t: Task[T], exception: Throwable) { debug("Failed " + t + " with " + exception.toString); exception.printStackTrace } -} \ No newline at end of file diff --git a/tasks/TaskRunner.scala b/tasks/TaskRunner.scala deleted file mode 100644 index 552e7fda1..000000000 --- a/tasks/TaskRunner.scala +++ /dev/null @@ -1,21 +0,0 @@ -package xsbt - -object TaskRunner -{ - def apply[T](node: Task[T]): T = apply(node, Runtime.getRuntime.availableProcessors) - /** Executes work for nodes in a directed acyclic graph with root node `node`. - * The maximum number of tasks to execute simultaneously is `maximumTasks`. */ - def apply[T](node: Task[T], maximumTasks: Int): T = - { - require(maximumTasks > 0) - val compute = new Compute[Work, Result] { def apply[A](w: Work[A]) = w.apply } - val strategy = new SimpleStrategy[Work[_]] - val scheduler = new TaskScheduler(node, strategy, new BasicTaskListener) - val distributor = new Distributor[ Either[ List[WorkFailure[Task[_]]], T ] , Work, Result](scheduler, compute, maximumTasks) - distributor.run().fold(failures => throw new TasksFailed(failures), identity[T]) - } -} -final case class TasksFailed(failures: List[WorkFailure[Task[_]]]) extends RuntimeException(failures.length + " tasks failed") -{ - override def toString = failures.mkString(getMessage + "\n", "\n\t", "\n") -} \ No newline at end of file diff --git a/tasks/TaskScheduler.scala b/tasks/TaskScheduler.scala deleted file mode 100644 index 1bef32616..000000000 --- a/tasks/TaskScheduler.scala +++ /dev/null @@ -1,239 +0,0 @@ -package xsbt - -import scala.collection.{immutable,mutable} - -final case class WorkFailure[D](work: D, exception: Throwable) extends NotNull -{ - def map[C](f: D => C) = WorkFailure(f(work), exception) -} -private final class TaskScheduler[O](root: Task[O], strategy: ScheduleStrategy[Work[_]], newListener: => TaskListener) - extends Scheduler[ Either[ List[WorkFailure[Task[_]]], O ], Work, Result] -{ - def run = new Run - { - val listener = newListener - def result = - { - assume(reverseDeps.isEmpty) - assume(forwardDeps.isEmpty) - assume(calls.isEmpty) - assume(!strategyRun.hasReady) - if(failureReports.isEmpty) - Right(completed(root)) - else - Left(failureReports.toList) - } - def next(max: Int) = - { - val running = strategyRun.next(max) - running.foreach(r => listener.running(r.source)) - running - } - def isComplete = reverseDeps.isEmpty - def hasPending = strategyRun.hasReady || !forwardDeps.isEmpty - def complete[A](work: Work[A], result: Either[Throwable,Result[A]]): Unit = - { - val task = work.source - result match - { - case Left(err) => - failureReports += WorkFailure(task, err) - listener.failed(task, err) - retire(task, None) - assert(failed.contains(task)) - case Right(value) => - success(task, value) - assert(completed.contains(task) || (calls.isCalling(task) && !reverseDeps.isEmpty) || failed.contains(task)) - } - assert(calls.isCalling(task) || !reverseDeps.contains(task)) - assert(!forwardDeps.contains(task)) - } - - private def newDepMap = new mutable.HashMap[Task[_], mutable.Set[Task[_]]] - private val reverseDeps = newDepMap - private val forwardDeps = newDepMap - private val calls = new CalledByMap - private val completed = new ResultMap - private val strategyRun = strategy.run - private val failed = new mutable.HashSet[Task[_]] - private val failureReports = new mutable.ArrayBuffer[WorkFailure[Task[_]]] - - { - val initialized = addGraph(root, root) // TODO: replace second root with something better? (it is ignored here anyway) - assert(initialized) - } - - private def addReady[O](m: Task[O]) - { - assert(!forwardDeps.contains(m), m) - assert(reverseDeps.contains(m), m) - assert(!completed.contains(m), m) - assert(!calls.isCalling(m), m) - assert(m.dependencies.forall(completed.contains), "Could not find result for dependency of ready task " + m) - - strategyRun.workReady(new Work(m, completed)) - listener.runnable(m) - } - // context called node - private def addGraph(node: Task[_], context: Task[_]): Boolean = - { - if(failed(node)) // node already failed - false - else if(calls.isCalling(node)) // node is waiting for a called task to complete, so we need to check for circular dependencies - { - if(calls.isCallerOf(node, context)) // if node called context, this is a circular dependency and is invalid - { - failureReports += WorkFailure(node, CircularDependency(node, context)) - false - } - else - true - } - else if(reverseDeps.contains(node) || completed.contains(node)) // node is either already added and is waiting for dependencies to complete or it has completed - true - else // node has never been added - newAdd(node, context) - } - private def newAdd(node: Task[_], context: Task[_]): Boolean = - { - val deps = node.dependencies.filter(dep => !completed.contains(dep)) - def finishAdding() = - { - listener.added(node) - true - } - if(deps.isEmpty) // node is ready to be run - { - reverseDeps(node) = new mutable.HashSet[Task[_]] - addReady(node) - finishAdding() - } - else if(deps.forall(dep => addGraph(dep,context))) // node requires dependencies to be added successfully and will then wait for them to complete before running - { - for(dep <- node.dependencies if !(completed.contains(dep) || reverseDeps.contains(dep) || calls.isCalling(dep))) - error("Invalid dependency state: (completed=" + completed.contains(dep) + ", reverse=" + reverseDeps.contains(dep) + ", calling=" + calls.isCalling(dep) + ") for " + dep) - reverseDeps(node) = new mutable.HashSet[Task[_]] - deps.foreach(dep => reverseDeps(dep) += node) // mark this node as depending on its dependencies - forwardDeps(node) = mutable.HashSet(deps.toSeq : _*) - finishAdding() - } - else // a dependency could not be added, so this node will fail as well. - { - failed += node - false - } - } - private def retire[O](m: Task[O], value: Option[O]) - { - value match - { - case Some(v) => completed(m) = v // map the task to its value - case None => failed += m // mark the task as failed. complete has already recorded the error message for the original cause - } - updateCurrentGraph(m, value.isDefined) // update forward and reverse dependency maps and propagate the change to depending tasks - listener.completed(m, value) - calls.remove(m) match // unwind the call stack - { - case Some(c) => - listener.called(c, m) - retire(c, value) - case None => () - } - } - private def updateCurrentGraph[O](m: Task[O], success: Boolean) - { - if(!success) - { - // clear m from the forward dependency map - // for each dependency d of m, remove m from the set of tasks that depend on d - for(depSet <- forwardDeps.removeKey(m); dep <- depSet; reverseSet <- reverseDeps.get(dep)) - reverseSet -= m - } - // m is complete, so remove its entry from reverseDeps and update all tasks that depend on m - for(mReverseDeps <- reverseDeps.removeKey(m); dependsOnM <- mReverseDeps) - { - if(success) - { - val on = forwardDeps(dependsOnM) - on -= m // m has completed, so remove it from the set of tasks that must complete before 'on' can run - if(on.isEmpty) // m was the last dependency of on, so make it runnable - { - forwardDeps.removeKey(dependsOnM) - addReady(dependsOnM) - } - } - else // cancel dependsOnM because dependency (m) failed - retire(dependsOnM, None) - } - } - - private def success[O](task: Task[O], value: Result[O]): Unit = - value match - { - case t: Task[O] => - if(t eq task) - { - failureReports += WorkFailure(t, CircularDependency(t, task)) - retire(task, None) - } - else if(addGraph(t, task)) - { - if(completed.contains(t)) - retire(task, Some(completed(t))) - else - { - calls(t) = task - listener.calling(task, t) - } - } - else - retire(task, None) - case Value(v) => retire(task, Some(v)) - } - } -} -final case class CircularDependency(node: Task[_], context: Task[_]) - extends RuntimeException("Task " + context + " provided task " + node + " already in calling stack") - -private final class CalledByMap extends NotNull -{ - private[this] val calling = new mutable.HashSet[Task[_]] - private[this] val callMap = new mutable.HashMap[Task[_], Task[_]] - def update[O](called: Task[O], by: Task[O]) - { - calling += by - callMap(called) = by - } - final def isCallerOf(check: Task[_], frame: Task[_]): Boolean = - { - if(check eq frame) true - else - callMap.get(frame) match - { - case Some(nextFrame) => isCallerOf(check, nextFrame) - case None => false - } - } - def isEmpty = calling.isEmpty && callMap.isEmpty - def isCalled(task: Task[_]): Boolean = callMap.contains(task) - def isCalling(caller: Task[_]): Boolean = calling(caller) - def remove[O](called: Task[O]): Option[Task[O]] = - for(caller <- callMap.removeKey(called)) yield - { - calling -= caller - caller.asInstanceOf[Task[O]] - } -} -import java.util.concurrent.{ConcurrentHashMap => HashMap} -private final class ResultMap(private val map: HashMap[Task[_], Any]) extends Results -{ - def this() = this(new HashMap) - def update[O](task: Task[O], value: O) { map.put(task, value) } - def apply[O](task: Task[O]): O = map.get(task).asInstanceOf[O] - def contains(task: Task[_]) = map.containsKey(task) -} - -private final class Work[O](val source: Task[O], results: Results) extends Identity with NotNull -{ - final def apply = Task.compute(source, results) -} \ No newline at end of file diff --git a/util/collection/TypeFunctions.scala b/util/collection/TypeFunctions.scala index c07ecce77..9a6d96b14 100644 --- a/util/collection/TypeFunctions.scala +++ b/util/collection/TypeFunctions.scala @@ -4,6 +4,7 @@ trait TypeFunctions { type Id[X] = X trait Const[A] { type Apply[B] = A } + trait P1of2[M[_,_], A] { type Apply[B] = M[A,B] } trait Down[M[_]] { type Apply[B] = Id[M[B]] } trait ~>[A[_], B[_]]