Experimental task progress interface. Fixes #592.

Set sbt.task.timings=true to print timings for tasks.
This sample progress handler shows how to get names for tasks and
deal with flatMapped tasks.  There are still some tasks that make
it through as anonymous, which needs to be investigated.

A setting to provide a custom handler should come in a subsequent commit.
This commit is contained in:
Mark Harrah 2013-06-23 19:57:30 -04:00
parent ce1c8b0ebc
commit 1cc2f57e15
6 changed files with 139 additions and 21 deletions

View File

@ -12,6 +12,7 @@ package sbt
import Types.const
import scala.Console.RED
import std.Transform.{DummyTaskMap,TaskAndValue}
import TaskName._
final case class EvaluateConfig(cancelable: Boolean, restrictions: Seq[Tags.Rule], checkCycles: Boolean = false)
final case class PluginData(dependencyClasspath: Seq[Attributed[File]], definitionClasspath: Seq[Attributed[File]], resolvers: Option[Seq[Resolver]], report: Option[UpdateReport])
@ -162,7 +163,8 @@ object EvaluateTask
case _ => true
}
def run() = {
val x = Execute[Task]( Execute.config(config.checkCycles, overwriteNode), triggers)(taskToNode)
val progress = if(java.lang.Boolean.getBoolean("sbt.task.timings")) new TaskTimings else ExecuteProgress.empty[Task]
val x = new Execute[Task]( Execute.config(config.checkCycles, overwriteNode), triggers, progress)(taskToNode)
val (newState, result) =
try applyResults(x.runKeep(root)(service), state, root)
catch { case inc: Incomplete => (state, Inc(inc)) }
@ -211,8 +213,7 @@ object EvaluateTask
c.toString + (if(caller eq target) "(task: " + name(caller) + ")" else "(caller: " + name(caller) + ", target: " + name(target) + ")" )
case _ => c.toString
}
def name(node: Task[_]): String =
node.info.name orElse transformNode(node).map(displayFull) getOrElse ("<anon-" + System.identityHashCode(node).toHexString + ">")
def liftAnonymous: Incomplete => Incomplete = {
case i @ Incomplete(node, tpe, None, causes, None) =>
causes.find( inc => !inc.node.isDefined && (inc.message.isDefined || inc.directCause.isDefined)) match {
@ -221,8 +222,6 @@ object EvaluateTask
}
case i => i
}
def transformNode(node: Task[_]): Option[ScopedKey[_]] =
node.info.attributes get taskDefinitionKey
def processResult[T](result: Result[T], log: Logger, show: Boolean = false): T =
onResult(result, log) { v => if(show) println("Result: " + v); v }

View File

@ -0,0 +1,12 @@
package sbt
import Def.{displayFull, ScopedKey}
import Keys.taskDefinitionKey
private[sbt] object TaskName
{
def name(node: Task[_]): String = definedName(node) getOrElse anonymousName(node)
def definedName(node: Task[_]): Option[String] = node.info.name orElse transformNode(node).map(displayFull)
def anonymousName(node: Task[_]): String = "<anon-" + System.identityHashCode(node).toHexString + ">"
def transformNode(node: Task[_]): Option[ScopedKey[_]] = node.info.attributes get taskDefinitionKey
}

View File

@ -0,0 +1,39 @@
package sbt
import java.util.concurrent.ConcurrentHashMap
import TaskName._
private[sbt] final class TaskTimings extends ExecuteProgress[Task]
{
private[this] val calledBy = new ConcurrentHashMap[Task[_], Task[_]]
private[this] val anonOwners = new ConcurrentHashMap[Task[_], Task[_]]
private[this] val timings = new ConcurrentHashMap[Task[_], Long]
private[this] var start = 0L
type S = Unit
def initial = { start = System.nanoTime }
def registered(state: Unit, task: Task[_], allDeps: Iterable[Task[_]], pendingDeps: Iterable[Task[_]]) = {
pendingDeps foreach { t => if(transformNode(t).isEmpty) anonOwners.put(t,task) }
}
def ready(state: Unit, task: Task[_]) = ()
def workStarting(task: Task[_]) = timings.put(task, System.nanoTime)
def workFinished[T](task: Task[T], result: Either[Task[T], Result[T]]) = {
timings.put(task, System.nanoTime - timings.get(task))
result.left.foreach { t => calledBy.put(t, task) }
}
def completed[T](state: Unit, task: Task[T], result: Result[T]) = ()
def allCompleted(state: Unit, results: RMap[Task,Result]) =
{
val total = System.nanoTime - start
println("Total time: " + (total*1e-6) + " ms")
import collection.JavaConversions._
def sumTimes(in: Seq[(Task[_], Long)]) = in.map(_._2).sum
val timingsByName = timings.toSeq.groupBy { case (t, time) => mappedName(t) } mapValues(sumTimes)
for( (taskName, time) <- timingsByName.toSeq.sortBy(_._2).reverse)
println(" " + taskName + ": " + (time*1e-6) + " ms")
}
private[this] def inferredName(t: Task[_]): Option[String] = nameDelegate(t) map mappedName
private[this] def nameDelegate(t: Task[_]): Option[Task[_]] = Option(anonOwners.get(t)) orElse Option(calledBy.get(t))
private[this] def mappedName(t: Task[_]): String = definedName(t) orElse inferredName(t) getOrElse anonymousName(t)
}

View File

@ -11,7 +11,7 @@ import scala.annotation.tailrec
import scala.collection.{mutable, JavaConversions}
import mutable.Map
object Execute
private[sbt] object Execute
{
def idMap[A,B]: Map[A, B] = JavaConversions.mapAsScalaMap(new java.util.IdentityHashMap[A,B])
def pMap[A[_], B[_]]: PMap[A,B] = new DelegatingPMap[A, B](idMap)
@ -20,27 +20,21 @@ object Execute
}
def noTriggers[A[_]] = new Triggers[A](Map.empty, Map.empty, idFun)
def apply[A[_] <: AnyRef](config: Config, triggers: Triggers[A])(implicit view: NodeView[A]): Execute[A] =
new Execute(config, triggers)(view)
def config(checkCycles: Boolean, overwriteNode: Incomplete => Boolean = const(false)): Config = new Config(checkCycles, overwriteNode)
final class Config private[sbt](val checkCycles: Boolean, val overwriteNode: Incomplete => Boolean)
}
sealed trait Completed {
def process(): Unit
}
trait NodeView[A[_]]
private[sbt] trait NodeView[A[_]]
{
def apply[T](a: A[T]): Node[A, T]
def inline[T](a: A[T]): Option[() => T]
}
final class Triggers[A[_]](val runBefore: collection.Map[A[_], Seq[A[_]]], val injectFor: collection.Map[A[_], Seq[A[_]]], val onComplete: RMap[A,Result] => RMap[A,Result])
final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A])(implicit view: NodeView[A])
private[sbt] final class Execute[A[_] <: AnyRef](config: Config, triggers: Triggers[A], progress: ExecuteProgress[A])(implicit view: NodeView[A])
{
@deprecated("Use Execute.apply", "0.13.0")
def this(checkCycles: Boolean, triggers: Triggers[A])(implicit view: NodeView[A]) = this(Execute.config(checkCycles), triggers)(view)
type Strategy = CompletionService[A[_], Completed]
private[this] val forward = idMap[A[_], IDSet[A[_]] ]
@ -56,6 +50,7 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
case None => results(a)
}
}
private[this] var progressState: progress.S = progress.initial
private[this] type State = State.Value
private[this] object State extends Enumeration {
@ -73,7 +68,9 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
addNew(root)
processAll()
assert( results contains root, "No result for root node." )
triggers.onComplete(results)
val finalResults = triggers.onComplete(results)
progressState = progress.allCompleted(progressState, finalResults)
finalResults
}
def processAll()(implicit strategy: Strategy)
@ -136,6 +133,7 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
results(node) = result
state(node) = Done
progressState = progress.completed(progressState, node, result)
remove( reverse, node ) foreach { dep => notifyDone(node, dep) }
callers.remove( node ).toList.flatten.foreach { c => retire(c, callerResult(c, result)) }
triggeredBy( node ) foreach { t => addChecked(t) }
@ -174,9 +172,9 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
post { addedInv( node ) }
}
/** Adds a node that has not yet been registered with the system.
* If all of the node's dependencies have finished, the node's computation scheduled to run.
* If all of the node's dependencies have finished, the node's computation is scheduled to run.
* The node's dependencies will be added (transitively) if they are not already registered.
* */
*/
def addNew[T](node: A[T])(implicit strategy: Strategy)
{
pre { newPre(node) }
@ -184,6 +182,7 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
val v = register( node )
val deps = dependencies(v) ++ runBefore(node)
val active = IDSet[A[_]](deps filter notDone )
progressState = progress.registered(progressState, node, deps, active.toList /** active is mutable, so take a snapshot */)
if( active.isEmpty)
ready( node )
@ -214,6 +213,7 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
}
state(node) = Running
progressState = progress.ready(progressState, node)
submit(node)
post {
@ -240,18 +240,26 @@ final class Execute[A[_] <: AnyRef] private(config: Config, triggers: Triggers[A
* This returns a Completed instance, which contains the post-processing to perform after the result is retrieved from the Strategy.*/
def work[T](node: A[T], f: => Either[A[T], T])(implicit strategy: Strategy): Completed =
{
val result = wideConvert(f).left.map {
progress.workStarting(node)
val rawResult = wideConvert(f).left.map {
case i: Incomplete => if(config.overwriteNode(i)) i.copy(node = Some(node)) else i
case e => Incomplete(Some(node), Incomplete.Error, directCause = Some(e))
}
val result = rewrap(rawResult)
progress.workFinished(node, result)
completed {
result match {
case Left(i) => retire(node, Inc(i))
case Right(Right(v)) => retire(node, Value(v))
case Right(Left(target)) => call(node, target)
case Right(v) => retire(node, v)
case Left(target) => call(node, target)
}
}
}
private[this] def rewrap[T](rawResult: Either[Incomplete, Either[A[T], T]]): Either[A[T], Result[T]] =
rawResult match {
case Left(i) => Right(Inc(i))
case Right(Right(v)) => Right(Value(v))
case Right(Left(target)) => Left(target)
}
def remove[K, V](map: Map[K, V], k: K): V = map.remove(k).getOrElse(sys.error("Key '" + k + "' not in map :\n" + map))

View File

@ -0,0 +1,58 @@
package sbt
/** Processes progress events during task execution.
* All methods are called from the same thread except `started` and `finished`,
* which is called from the executing task's thread.
* All methods should return quickly to avoid task execution overhead.
*
* This class is experimental and subject to binary and source incompatible changes at any time. */
private[sbt] trait ExecuteProgress[A[_]]
{
type S
def initial: S
/** Notifies that a `task` has been registered in the system for execution.
* The dependencies of `task` are `allDeps` and the subset of those dependencies that
* have not completed are `pendingDeps`.*/
def registered(state: S, task: A[_], allDeps: Iterable[A[_]], pendingDeps: Iterable[A[_]]): S
/** Notifies that all of the dependencies of `task` have completed and `task` is therefore
* ready to run. The task has not been scheduled on a thread yet. */
def ready(state: S, task: A[_]): S
/** Notifies that the work for `task` is starting after this call returns.
* This is called from the thread the task executes on, unlike most other methods in this callback.
* It is called immediately before the task's work starts with minimal intervening executor overhead. */
def workStarting(task: A[_]): Unit
/** Notifies that the work for `task` work has finished. The task may have computed the next task to
* run, in which case `result` contains that next task wrapped in Left. If the task produced a value
* or terminated abnormally, `result` provides that outcome wrapped in Right. The ultimate result of
* a task is provided to the `completed` method.
* This is called from the thread the task executes on, unlike most other methods in this callback.
* It is immediately called after the task's work is complete with minimal intervening executor overhead. */
def workFinished[T](task: A[T], result: Either[A[T], Result[T]]): Unit
/** Notifies that `task` has completed.
* The task's work is done with a final `result`.
* Any tasks called by `task` have completed. */
def completed[T](state: S, task: A[T], result: Result[T]): S
/** All tasks have completed with the final `results` provided. */
def allCompleted(state: S, results: RMap[A,Result]): S
}
/** This module is experimental and subject to binary and source incompatible changes at any time. */
private[sbt] object ExecuteProgress
{
def empty[A[_]]: ExecuteProgress[A] = new ExecuteProgress[A] {
type S = Unit
def initial = ()
def registered(state: Unit, task: A[_], allDeps: Iterable[A[_]], pendingDeps: Iterable[A[_]]) = ()
def ready(state: Unit, task: A[_]) = ()
def workStarting(task: A[_]) = ()
def workFinished[T](task: A[T], result: Either[A[T], Result[T]]) = ()
def completed[T](state: Unit, task: A[T], result: Result[T]) = ()
def allCompleted(state: Unit, results: RMap[A,Result]) = ()
}
}

View File

@ -12,6 +12,7 @@ trait IDSet[T]
def ++=(t: Iterable[T]): Unit
def -= (t: T): Boolean
def all: collection.Iterable[T]
def toList: List[T]
def isEmpty: Boolean
def foreach(f: T => Unit): Unit
def process[S](t: T)(ifSeen: S)(ifNew: => S): S
@ -38,6 +39,7 @@ object IDSet
def ++=(t: Iterable[T]) = t foreach +=
def -= (t:T) = if(backing.remove(t) eq null) false else true
def all = collection.JavaConversions.collectionAsScalaIterable(backing.keySet)
def toList = all.toList
def isEmpty = backing.isEmpty
def process[S](t: T)(ifSeen: S)(ifNew: => S) = if(contains(t)) ifSeen else { this += t ; ifNew }
override def toString = backing.toString