MList covariant, initial Node

This commit is contained in:
Mark Harrah 2010-05-30 21:14:18 -04:00
parent 9a4cfa0037
commit 75a784d5ec
10 changed files with 49 additions and 640 deletions

3
tasks/Incomplete.scala Normal file
View File

@ -0,0 +1,3 @@
package sbt
trait Incomplete

View File

@ -1,3 +1,3 @@
Simple Build Tool: Task Engine Component
Copyright 2009 Mark Harrah
Copyright 2009, 2010 Mark Harrah
Licensed under BSD-style license (see LICENSE)

44
tasks/Node.scala Normal file
View File

@ -0,0 +1,44 @@
package sbt
import Node._
import Types._
trait Node[A[_], T]
{
type Inputs <: MList[Id]
type Results <: Inputs#Map[Result]
def inputs: Inputs#Map[A]
def unitDependencies: Iterable[A[_]]
def work(results: Results, units: Iterable[Incomplete]): Either[A[T], T]
}
object Node
{
type Result[T] = Either[Throwable, T]
def pure[T](f: => T): PureNode[T]= map[Id, T, MNil](MNil, Nil)((_,_) => f)
def map[A[_], T, Inputs0 <: MList[Id]](inputs0: Inputs0#Map[A], deps0: Iterable[A[_]])(work0: (Inputs0#Map[Result], Iterable[Incomplete]) => T):
Node[A,T] { type Inputs = Inputs0; type Results = Inputs0#Map[Result] } =
new Node[A,T] {
type Inputs = Inputs0
type Results = Inputs0#Map[Result]
def inputs = inputs0
def unitDependencies = deps0
def work(results: Results, units: Iterable[Incomplete]) = Right(work0(results, units))
}
type PureNode[T] = Node[Id, T] { type Inputs = MNil; type Results = MNil }
type WorkResult[T] = Either[Id[T], T]
val pureResults = new ~>[PureNode, WorkResult] { def apply[T](t: PureNode[T] ) = t.work( MNil, Nil ) }
}
object Test
{
val a = pure(3)
val b = pure(true)
val c = pure("asdf")
val i3 = a :^: b :^: c :^: MNil
val d = map[PureNode, String, i3.Map[Id]](i3, Nil) { case (a :^: b :^: c :^: MNil, Seq()) => a + " " + b + " " + c }
}

View File

@ -1,190 +0,0 @@
/* sbt -- Simple Build Tool
* Copyright 2009 Mark Harrah
*/
package xsbt
/** This file provides the parallel execution engine of sbt. It is a fairly general module, with pluggable Schedulers and Strategies.
*
* There are three main componenets to the engine: Distributors, Schedulers, and Strategies.
*
* A Scheduler provides work that is ready to execute.
*
* A Strategy is used by a Scheduler to select the work to process from the work that is ready. It is notified as work
* becomes ready. It is requested to select work to process from the work that is ready.
*
* A Distributor uses a Scheduler to obtain work up to the maximum work allowed to run at once. It runs each
* unit of work in its own Thread. It then returns the work and either the computed value or the error that occured.
*
* The Scheduler and Strategy are called from the main thread and therefore do not need to be thread-safe.
**/
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.{immutable, mutable}
import immutable.TreeSet
/** Processes work. */
trait Compute[Work[_],Result[_]] { def apply[A](w: Work[A]): Result[A] }
/** Requests work from `scheduler` and processes it using `compute`. This class limits the amount of work processing at any given time
* to `workers`.*/
final class Distributor[O,Work[_],Result[_]](val scheduler: Scheduler[O,Work,Result], compute: Compute[Work,Result], workers: Int) extends NotNull
{
require(workers > 0)
final def run() = (new Run).run()
private final class Run extends NotNull
{
import java.util.concurrent.LinkedBlockingQueue
private[this] val schedule = scheduler.run
/** The number of threads currently running. */
private[this] var running = 0
/** Pending notifications of completed work. */
private[this] val complete = new LinkedBlockingQueue[Done[_]]
private[Distributor] def run(): O =
{
def runImpl(): O =
{
next()
if(isIdle && !schedule.hasPending) // test if all work is complete
{
assume(schedule.isComplete, "Distributor idle and the scheduler indicated no work pending, but scheduler indicates it is not complete.")
schedule.result
}
else
{
waitForCompletedWork() // wait for some work to complete
runImpl() // continue
}
}
try { runImpl() }
finally { shutdown() }
}
private def shutdown(): Unit = all.foreach(_.work.put(None))
// true if the maximum number of worker threads are currently running
private def atMaximum = running == workers
private def availableWorkers = workers - running
// true if no worker threads are currently running
private def isIdle = running == 0
// process more work
private def next()
{
// if the maximum threads are being used, do nothing
// if all work is complete or the scheduler is waiting for current work to complete, do nothing
if(!atMaximum && schedule.hasPending)
{
val nextWork = schedule.next(availableWorkers)
val nextSize = nextWork.size
assume(nextSize <= availableWorkers, "Scheduler provided more work (" + nextSize + ") than allowed (" + availableWorkers + ")")
assume(nextSize > 0 || !isIdle, "Distributor idle and the scheduler indicated work pending, but provided no work.")
nextWork.foreach(work => process(work))
}
}
// wait on the blocking queue `complete` until some work finishes and notify the scheduler
private def waitForCompletedWork()
{
assume(running > 0)
val done = complete.take()
running -= 1
notifyScheduler(done)
}
private def notifyScheduler[T](done: Done[T]): Unit = schedule.complete(done.work, done.result)
private def process[T](work: Work[T])
{
assume(running + 1 <= workers)
running += 1
available.take().work.put(Some(work))
}
private[this] val all = List.tabulate(workers, i => new Worker)
private[this] val available =
{
val q = new LinkedBlockingQueue[Worker]
all.foreach(q.put)
q
}
private final class Worker extends Thread with NotNull
{
lazy val work =
{
start()
new LinkedBlockingQueue[Option[Work[_]]]
}
override def run()
{
def processData[T](data: Work[T])
{
val result = ErrorHandling.wideConvert(compute(data))
complete.put( new Done(result, data) )
}
def runImpl()
{
work.take() match
{
case Some(data) =>
processData(data)
available.put(this)
runImpl()
case None => ()
}
}
try { runImpl() }
catch { case e: InterruptedException => () }
}
}
}
private final class Done[T](val result: Either[Throwable, Result[T]], val work: Work[T]) extends NotNull
}
/** Schedules work of type Work that produces results of type Result. A Scheduler determines what work is ready to be processed.
* A Scheduler is itself immutable. It creates a mutable object for each scheduler run.*/
trait Scheduler[O,Work[_],Result[_]] extends NotNull
{
/** Starts a new run. The returned object is a new Run, representing a single scheduler run. All state for the run
* is encapsulated in this object.*/
def run: Run
trait Run extends NotNull
{
/** Notifies this scheduler that work has completed with the given result.*/
def complete[A](d: Work[A], result: Either[Throwable,Result[A]]): Unit
/** Returns true if there is any more work to be done, although remaining work can be blocked
* waiting for currently running work to complete.*/
def hasPending: Boolean
/**Returns true if this scheduler has no more work to be done, ever.*/
def isComplete: Boolean
/** Returns up to 'max' units of work. `max` is always positive. The returned sequence cannot be empty if there is
* no work currently being processed.*/
def next(max: Int): Seq[Work[_]]
/** The final result after all work has completed. */
def result: O
}
}
/** A Strategy selects the work to process from work that is ready to be processed.*/
trait ScheduleStrategy[D] extends NotNull
{
/** Starts a new run. The returned object is a new Run, representing a single strategy run. All state for the run
* is handled through this object and is encapsulated in this object.*/
def run: Run
trait Run extends NotNull
{
/** Adds the given work to the list of work that is ready to run.*/
def workReady(dep: D): Unit
/** Returns true if there is work ready to be run. */
def hasReady: Boolean
/** Provides up to `max` units of work. `max` is always positive and this method is not called
* if hasReady is false. The returned list cannot be empty is there is work ready to be run.*/
def next(max: Int): List[D]
}
}
final class SimpleStrategy[D] extends ScheduleStrategy[D]
{
def run = new Run
{
private var ready = List[D]()
def workReady(dep: D) { ready ::= dep }
def hasReady = !ready.isEmpty
def next(max: Int): List[D] =
{
val ret = ready.take(max)
ready = ready.drop(max)
ret
}
}
}

View File

@ -1,37 +0,0 @@
package xsbt
import Task.{bindTask, mapTask}
sealed trait TList
{
type Head
type Tail <: TList
type HListType <: HList
private[xsbt] def tasks: List[Task[_]]
private[xsbt] def get(results: Results): HListType
}
sealed class TNil extends TList
{
type Head = Nothing
type Tail = TNil
type HListType = HNil
def ::[A](t: Task[A]) = TCons[A,HNil,TNil](t, this)
private[xsbt] def tasks = Nil
private[xsbt] def get(results: Results) = HNil
}
final case class TCons[H, HL <: HList, T <: TList { type HListType = HL}](head: Task[H], tail: T) extends TList
{
type Head = H
type Tail = T
type HListType = HCons[H,HL]
type This = TCons[H, HL, T]
def ::[A](t: Task[A]) = TCons[A,HListType,This](t, this)
private[xsbt] def tasks = head :: tail.tasks
private[xsbt] def get(results: Results) = HCons(results(head), tail.get(results))
private def getF = get _
def map[X](f: HListType => X): Task[X] = mapTask(tasks: _*)(f compose getF)
def bind[X](f: HListType => Result[X]): Task[X] = bindTask(tasks: _*)(f compose getF)
def join: Task[HListType] = map(identity[HListType])
}
object TNil extends TNil

View File

@ -1,119 +0,0 @@
package xsbt
import Task.{mapTask,bindTask}
import scala.collection.{mutable,immutable}
sealed trait Result[O] extends NotNull
final case class Value[O](t: O) extends Result[O]
sealed abstract class Task[O] extends Identity with Result[O]
{
def dependencies: TreeHashSet[Task[_]] // IMPORTANT!! immutable.HashSet is NOT suitable. It has issues with multi-threaded access
def map[N](f: O => N): Task[N]
def bind[N](f: O => Result[N]): Task[N]
def dependsOn(addDependencies: Task[_]*): Task[O]
def named(s: String): Task[O]
}
private final class M[O,R <: Result[O]](name: Option[String])(val dependencies: TreeHashSet[Task[_]])(val compute: Results => R) extends Task[O]
{
def this(dependencies: Task[_]*)(compute: Results => R) =
this(None)(TreeHashSet(dependencies: _*))(compute)
final def dependsOn(addDependencies: Task[_]*) = new M(name)(dependencies ++ addDependencies)(compute)
final def map[N](f: O => N) = mapTask(this)(f compose get)
final def bind[N](f: O => Result[N]) = bindTask(this)(f compose get)
final def named(s: String) =
name match
{
case Some(n) => error("Cannot rename task already named '" + n + "'. (Tried to rename it to '" + s + "')")
case None => new M(Some(s))(dependencies)(compute)
}
final override def toString = "Task " + name.getOrElse("<anon$" + hashCode.toHexString + ">")
private def get: (Results => O) = _(this)
}
abstract class Identity extends NotNull
{
final override def equals(o: Any) = o match { case a: AnyRef => this eq a; case _ => false }
final override def hashCode = System.identityHashCode(this)
}
private trait Results extends NotNull
{
def apply[O](task: Task[O]): O
def contains(task: Task[_]): Boolean
}
object Task
{
val empty = Task(())
import Function.tupled
def apply[O](o: => O): Task[O] = mapTask()( _ => o )
def mapTask[O](dependencies: Task[_]*)(compute: Results => O): Task[O] =
bindTask(dependencies : _*)(in => Value(compute(in)))
def bindTask[O](dependencies: Task[_]*)(compute: Results => Result[O]): Task[O] =
new M[O,Result[O]](dependencies : _*)(compute)
private[xsbt] def compute[O](t: Task[O], results: Results): Result[O] = t match { case m: M[O,_] => m.compute(results) }
implicit def iterableToForkBuilder[A](t: Iterable[A]): ForkBuilderIterable[A] = new ForkBuilderIterable(t)
final class ForkBuilderIterable[A] private[Task](a: Iterable[A]) extends NotNull
{
def fork[X](f: A => X): Iterable[Task[X]] = forkTasks(x => Task(f(x)) )
def forkTasks[X](f: A => Task[X]): Iterable[Task[X]] = a.map(x => f(x))
def reduce(f: (A,A) => A): Task[A] = fork(x => x) reduce(f)
}
implicit def iterableToBuilder[O](t: Iterable[Task[O]]): BuilderIterable[O] = new BuilderIterable(t)
final class BuilderIterable[O] private[Task](a: Iterable[Task[O]]) extends NotNull
{
//def mapBind[X](f: O => Task[X]): Iterable[Task[XO]] = a.map(_.bind(f))
def join: Task[Iterable[O]] = join(identity[O])
def joinIgnore: Task[Unit] = join.map(_ => ())
def join[X](f: O => X): Task[Iterable[X]] = mapTask(a.toSeq: _*)( r => a map (f compose r.apply[O]) )
//def bindJoin[X](f: O => Task[X]): Task[Iterable[X]] = mapBind(f).join
def reduce(f: (O,O) => O): Task[O] =
{
def reduce2(list: List[Task[O]], accumulate: List[Task[O]]): List[Task[O]] =
list match
{
case Nil => accumulate
case x :: Nil => x :: accumulate
case xa :: xb :: tail => reduce2(tail, ( (xa, xb) map f ) :: accumulate )
}
def reduce1(list: List[Task[O]]): Task[O] =
list match
{
case Nil => error("Empty list")
case x :: Nil => x
case _ => reduce1(reduce2(list, Nil))
}
reduce1(a.toList)
}
}
implicit def twoToBuilder[A,B](t: (Task[A], Task[B]) ): Builder2[A,B] = new Builder2(t._1,t._2)
final class Builder2[A,B] private[Task](a: Task[A], b: Task[B]) extends NotNull
{
private def compute[T](f: (A,B) => T) = (r: Results) => f(r(a), r(b))
def map[X](f: (A,B) => X): Task[X] = mapTask(a,b)(compute(f))
def bind[X](f: (A,B) => Result[X]): Task[X] = bindTask(a,b)(compute(f))
}
implicit def threeToBuilder[A,B,C](t: (Task[A], Task[B], Task[C])): Builder3[A,B,C] = new Builder3(t._1,t._2,t._3)
final class Builder3[A,B,C] private[Task](a: Task[A], b: Task[B], c: Task[C]) extends NotNull
{
private def compute[T](f: (A,B,C) => T) = (r: Results) => f(r(a), r(b), r(c))
def map[X](f: (A,B,C) => X): Task[X] = mapTask(a,b,c)(compute(f))
def bind[X](f: (A,B,C) => Result[X]): Task[X] = bindTask(a,b,c)(compute(f))
}
implicit def fourToBuilder[A,B,C,D](t: (Task[A], Task[B], Task[C], Task[D])): Builder4[A,B,C,D] = new Builder4(t._1,t._2,t._3,t._4)
final class Builder4[A,B,C,D] private[Task](a: Task[A], b: Task[B], c: Task[C], d: Task[D]) extends NotNull
{
private def compute[T](f: (A,B,C,D) => T) = (r: Results) => f(r(a), r(b), r(c), r(d))
def map[X](f: (A,B,C,D) => X): Task[X] = mapTask(a,b,c,d)( compute(f) )
def bind[X](f: (A,B,C,D) => Result[X]): Task[X] = bindTask(a,b,c,d)( compute(f) )
}
}

View File

@ -1,33 +0,0 @@
package xsbt
trait TaskListener extends NotNull
{
def added(t: Task[_]): Unit
def runnable(t: Task[_]): Unit
def running(t: Task[_]): Unit
def calling(caller: Task[_], t: Task[_]): Unit
def called(caller: Task[_], t: Task[_]): Unit
def completed[T](t: Task[T], value: Option[T]): Unit
def failed[T](t: Task[T], exception: Throwable): Unit
}
class BasicTaskListener extends TaskListener
{
def added(t: Task[_]) {}
def runnable(t: Task[_]) {}
def running(t: Task[_]) {}
def calling(caller: Task[_], t: Task[_]) {}
def called(caller: Task[_], t: Task[_]) {}
def completed[T](t: Task[T], value: Option[T]) {}
def failed[T](t: Task[T], exception: Throwable) {}
}
class DebugTaskListener(debug: String => Unit) extends TaskListener
{
def this() = this(println)
def added(t: Task[_]) { debug("Added " + t) }
def runnable(t: Task[_]) { debug("Runnable " + t)}
def running(t: Task[_]) { debug("Running " + t) }
def calling(caller: Task[_], t: Task[_]) { debug(caller + " calling " + t)}
def called(caller: Task[_], t: Task[_]) { debug(caller + " called " + t)}
def completed[T](t: Task[T], value: Option[T]) { debug("Completed " + t + " with " + value)}
def failed[T](t: Task[T], exception: Throwable) { debug("Failed " + t + " with " + exception.toString); exception.printStackTrace }
}

View File

@ -1,21 +0,0 @@
package xsbt
object TaskRunner
{
def apply[T](node: Task[T]): T = apply(node, Runtime.getRuntime.availableProcessors)
/** Executes work for nodes in a directed acyclic graph with root node `node`.
* The maximum number of tasks to execute simultaneously is `maximumTasks`. */
def apply[T](node: Task[T], maximumTasks: Int): T =
{
require(maximumTasks > 0)
val compute = new Compute[Work, Result] { def apply[A](w: Work[A]) = w.apply }
val strategy = new SimpleStrategy[Work[_]]
val scheduler = new TaskScheduler(node, strategy, new BasicTaskListener)
val distributor = new Distributor[ Either[ List[WorkFailure[Task[_]]], T ] , Work, Result](scheduler, compute, maximumTasks)
distributor.run().fold(failures => throw new TasksFailed(failures), identity[T])
}
}
final case class TasksFailed(failures: List[WorkFailure[Task[_]]]) extends RuntimeException(failures.length + " tasks failed")
{
override def toString = failures.mkString(getMessage + "\n", "\n\t", "\n")
}

View File

@ -1,239 +0,0 @@
package xsbt
import scala.collection.{immutable,mutable}
final case class WorkFailure[D](work: D, exception: Throwable) extends NotNull
{
def map[C](f: D => C) = WorkFailure(f(work), exception)
}
private final class TaskScheduler[O](root: Task[O], strategy: ScheduleStrategy[Work[_]], newListener: => TaskListener)
extends Scheduler[ Either[ List[WorkFailure[Task[_]]], O ], Work, Result]
{
def run = new Run
{
val listener = newListener
def result =
{
assume(reverseDeps.isEmpty)
assume(forwardDeps.isEmpty)
assume(calls.isEmpty)
assume(!strategyRun.hasReady)
if(failureReports.isEmpty)
Right(completed(root))
else
Left(failureReports.toList)
}
def next(max: Int) =
{
val running = strategyRun.next(max)
running.foreach(r => listener.running(r.source))
running
}
def isComplete = reverseDeps.isEmpty
def hasPending = strategyRun.hasReady || !forwardDeps.isEmpty
def complete[A](work: Work[A], result: Either[Throwable,Result[A]]): Unit =
{
val task = work.source
result match
{
case Left(err) =>
failureReports += WorkFailure(task, err)
listener.failed(task, err)
retire(task, None)
assert(failed.contains(task))
case Right(value) =>
success(task, value)
assert(completed.contains(task) || (calls.isCalling(task) && !reverseDeps.isEmpty) || failed.contains(task))
}
assert(calls.isCalling(task) || !reverseDeps.contains(task))
assert(!forwardDeps.contains(task))
}
private def newDepMap = new mutable.HashMap[Task[_], mutable.Set[Task[_]]]
private val reverseDeps = newDepMap
private val forwardDeps = newDepMap
private val calls = new CalledByMap
private val completed = new ResultMap
private val strategyRun = strategy.run
private val failed = new mutable.HashSet[Task[_]]
private val failureReports = new mutable.ArrayBuffer[WorkFailure[Task[_]]]
{
val initialized = addGraph(root, root) // TODO: replace second root with something better? (it is ignored here anyway)
assert(initialized)
}
private def addReady[O](m: Task[O])
{
assert(!forwardDeps.contains(m), m)
assert(reverseDeps.contains(m), m)
assert(!completed.contains(m), m)
assert(!calls.isCalling(m), m)
assert(m.dependencies.forall(completed.contains), "Could not find result for dependency of ready task " + m)
strategyRun.workReady(new Work(m, completed))
listener.runnable(m)
}
// context called node
private def addGraph(node: Task[_], context: Task[_]): Boolean =
{
if(failed(node)) // node already failed
false
else if(calls.isCalling(node)) // node is waiting for a called task to complete, so we need to check for circular dependencies
{
if(calls.isCallerOf(node, context)) // if node called context, this is a circular dependency and is invalid
{
failureReports += WorkFailure(node, CircularDependency(node, context))
false
}
else
true
}
else if(reverseDeps.contains(node) || completed.contains(node)) // node is either already added and is waiting for dependencies to complete or it has completed
true
else // node has never been added
newAdd(node, context)
}
private def newAdd(node: Task[_], context: Task[_]): Boolean =
{
val deps = node.dependencies.filter(dep => !completed.contains(dep))
def finishAdding() =
{
listener.added(node)
true
}
if(deps.isEmpty) // node is ready to be run
{
reverseDeps(node) = new mutable.HashSet[Task[_]]
addReady(node)
finishAdding()
}
else if(deps.forall(dep => addGraph(dep,context))) // node requires dependencies to be added successfully and will then wait for them to complete before running
{
for(dep <- node.dependencies if !(completed.contains(dep) || reverseDeps.contains(dep) || calls.isCalling(dep)))
error("Invalid dependency state: (completed=" + completed.contains(dep) + ", reverse=" + reverseDeps.contains(dep) + ", calling=" + calls.isCalling(dep) + ") for " + dep)
reverseDeps(node) = new mutable.HashSet[Task[_]]
deps.foreach(dep => reverseDeps(dep) += node) // mark this node as depending on its dependencies
forwardDeps(node) = mutable.HashSet(deps.toSeq : _*)
finishAdding()
}
else // a dependency could not be added, so this node will fail as well.
{
failed += node
false
}
}
private def retire[O](m: Task[O], value: Option[O])
{
value match
{
case Some(v) => completed(m) = v // map the task to its value
case None => failed += m // mark the task as failed. complete has already recorded the error message for the original cause
}
updateCurrentGraph(m, value.isDefined) // update forward and reverse dependency maps and propagate the change to depending tasks
listener.completed(m, value)
calls.remove(m) match // unwind the call stack
{
case Some(c) =>
listener.called(c, m)
retire(c, value)
case None => ()
}
}
private def updateCurrentGraph[O](m: Task[O], success: Boolean)
{
if(!success)
{
// clear m from the forward dependency map
// for each dependency d of m, remove m from the set of tasks that depend on d
for(depSet <- forwardDeps.removeKey(m); dep <- depSet; reverseSet <- reverseDeps.get(dep))
reverseSet -= m
}
// m is complete, so remove its entry from reverseDeps and update all tasks that depend on m
for(mReverseDeps <- reverseDeps.removeKey(m); dependsOnM <- mReverseDeps)
{
if(success)
{
val on = forwardDeps(dependsOnM)
on -= m // m has completed, so remove it from the set of tasks that must complete before 'on' can run
if(on.isEmpty) // m was the last dependency of on, so make it runnable
{
forwardDeps.removeKey(dependsOnM)
addReady(dependsOnM)
}
}
else // cancel dependsOnM because dependency (m) failed
retire(dependsOnM, None)
}
}
private def success[O](task: Task[O], value: Result[O]): Unit =
value match
{
case t: Task[O] =>
if(t eq task)
{
failureReports += WorkFailure(t, CircularDependency(t, task))
retire(task, None)
}
else if(addGraph(t, task))
{
if(completed.contains(t))
retire(task, Some(completed(t)))
else
{
calls(t) = task
listener.calling(task, t)
}
}
else
retire(task, None)
case Value(v) => retire(task, Some(v))
}
}
}
final case class CircularDependency(node: Task[_], context: Task[_])
extends RuntimeException("Task " + context + " provided task " + node + " already in calling stack")
private final class CalledByMap extends NotNull
{
private[this] val calling = new mutable.HashSet[Task[_]]
private[this] val callMap = new mutable.HashMap[Task[_], Task[_]]
def update[O](called: Task[O], by: Task[O])
{
calling += by
callMap(called) = by
}
final def isCallerOf(check: Task[_], frame: Task[_]): Boolean =
{
if(check eq frame) true
else
callMap.get(frame) match
{
case Some(nextFrame) => isCallerOf(check, nextFrame)
case None => false
}
}
def isEmpty = calling.isEmpty && callMap.isEmpty
def isCalled(task: Task[_]): Boolean = callMap.contains(task)
def isCalling(caller: Task[_]): Boolean = calling(caller)
def remove[O](called: Task[O]): Option[Task[O]] =
for(caller <- callMap.removeKey(called)) yield
{
calling -= caller
caller.asInstanceOf[Task[O]]
}
}
import java.util.concurrent.{ConcurrentHashMap => HashMap}
private final class ResultMap(private val map: HashMap[Task[_], Any]) extends Results
{
def this() = this(new HashMap)
def update[O](task: Task[O], value: O) { map.put(task, value) }
def apply[O](task: Task[O]): O = map.get(task).asInstanceOf[O]
def contains(task: Task[_]) = map.containsKey(task)
}
private final class Work[O](val source: Task[O], results: Results) extends Identity with NotNull
{
final def apply = Task.compute(source, results)
}

View File

@ -4,6 +4,7 @@ trait TypeFunctions
{
type Id[X] = X
trait Const[A] { type Apply[B] = A }
trait P1of2[M[_,_], A] { type Apply[B] = M[A,B] }
trait Down[M[_]] { type Apply[B] = Id[M[B]] }
trait ~>[A[_], B[_]]