Merge pull request #1242 from sbt/wip/cancel-task-hooks

Add task cancellation hooks into the build definition.
This commit is contained in:
eugene yokota 2014-04-08 23:32:46 -07:00
commit 78d2aabda2
10 changed files with 208 additions and 20 deletions

View File

@ -59,7 +59,7 @@ final object Aggregation
import extracted.structure
val toRun = ts map { case KeyValue(k,t) => t.map(v => KeyValue(k,v)) } join;
val roots = ts map { case KeyValue(k,_) => k }
val config = extractedConfig(extracted, structure, s)
val config = extractedTaskConfig(extracted, structure, s)
val start = System.currentTimeMillis
val (newS, result) = withStreams(structure, s){ str =>

View File

@ -120,6 +120,10 @@ object Defaults extends BuildCommon
trapExit :== true,
connectInput :== false,
cancelable :== false,
taskCancelStrategy := { state: State =>
if(cancelable.value) TaskCancellationStrategy.Signal
else TaskCancellationStrategy.Null
},
envVars :== Map.empty,
sbtVersion := appConfiguration.value.provider.id.version,
sbtBinaryVersion := binarySbtVersion(sbtVersion.value),

View File

@ -14,7 +14,101 @@ package sbt
import std.Transform.{DummyTaskMap,TaskAndValue}
import TaskName._
@deprecated("Use EvaluateTaskConfig instead.", "0.13.5")
final case class EvaluateConfig(cancelable: Boolean, restrictions: Seq[Tags.Rule], checkCycles: Boolean = false, progress: ExecuteProgress[Task] = EvaluateTask.defaultProgress)
/** An API that allows you to cancel executing tasks upon some signal.
*
* For example, this is implemented by the TaskEngine; invoking `cancel()` allows you
* to cancel the current task exeuction. A `TaskCancel` is passed to the
* [[TaskEvalautionCancelHandler]] which is responsible for calling `cancel()` when
* appropriate.
*/
trait RunningTaskEngine {
/** Attempts to kill and shutdown the running task engine.*/
def cancelAndShutdown(): Unit
}
/**
* A startegy for being able to cancle tasks.
*
* Implementations of this trait determine what will trigger `cancel()` for
* the task engine, providing in the `start` method.
*
* All methods on this API are expected to be called from the same thread.
*/
trait TaskCancellationStrategy {
/** The state used by this task. */
type State
/** Called when task evaluation starts.
*
* @param canceller An object that can cancel the current task evaluation session.
* @return Whatever state you need to cleanup in your finish method.
*/
def onTaskEngineStart(canceller: RunningTaskEngine): State
/** Called when task evaluation completes, either in success or failure. */
def onTaskEngineFinish(state: State): Unit
}
object TaskCancellationStrategy {
/** An empty handler that does not cancel tasks. */
object Null extends TaskCancellationStrategy {
type State = Unit
def onTaskEngineStart(canceller: RunningTaskEngine): Unit = ()
def onTaskEngineFinish(state: Unit): Unit = ()
}
/** Cancel handler which registers for SIGINT and cancels tasks when it is received. */
object Signal extends TaskCancellationStrategy {
type State = Signals.Registration
def onTaskEngineStart(canceller: RunningTaskEngine): Signals.Registration = {
Signals.register(() => canceller.cancelAndShutdown())
}
def onTaskEngineFinish(registration: Signals.Registration): Unit =
registration.remove()
}
}
/** The new API for running tasks.
*
* This represents all the hooks possible when running the task engine.
* We expose this trait so that we can, in a binary compatible way, modify what is used
* inside this configuration and how to construct it.
*/
sealed trait EvaluateTaskConfig {
def restrictions: Seq[Tags.Rule]
def checkCycles: Boolean
def progressReporter: ExecuteProgress[Task]
def cancelStrategy: TaskCancellationStrategy
}
final object EvaluateTaskConfig {
/** Pulls in the old configuration format. */
def apply(old: EvaluateConfig): EvaluateTaskConfig = {
object AdaptedTaskConfig extends EvaluateTaskConfig {
def restrictions: Seq[Tags.Rule] = old.restrictions
def checkCycles: Boolean = old.checkCycles
def progressReporter: ExecuteProgress[Task] = old.progress
def cancelStrategy: TaskCancellationStrategy =
if(old.cancelable) TaskCancellationStrategy.Signal
else TaskCancellationStrategy.Null
}
AdaptedTaskConfig
}
/** Raw constructor for EvaluateTaskConfig. */
def apply(restrictions: Seq[Tags.Rule],
checkCycles: Boolean,
progressReporter: ExecuteProgress[Task],
cancelStrategy: TaskCancellationStrategy): EvaluateTaskConfig = {
object SimpleEvaluateTaskConfig extends EvaluateTaskConfig {
def restrictions = restrictions
def checkCycles = checkCycles
def progressReporter = progressReporter
def cancelStrategy = cancelStrategy
}
SimpleEvaluateTaskConfig
}
}
final case class PluginData(dependencyClasspath: Seq[Attributed[File]], definitionClasspath: Seq[Attributed[File]], resolvers: Option[Seq[Resolver]], report: Option[UpdateReport], scalacOptions: Seq[String])
{
val classpath: Seq[Attributed[File]] = definitionClasspath ++ dependencyClasspath
@ -40,24 +134,25 @@ object EvaluateTask
val SystemProcessors = Runtime.getRuntime.availableProcessors
@deprecated("Use extractedConfig.", "0.13.0")
@deprecated("Use extractedTaskConfig.", "0.13.0")
def defaultConfig(state: State): EvaluateConfig =
{
val extracted = Project.extract(state)
extractedConfig(extracted, extracted.structure, state)
}
@deprecated("Use extractedConfig.", "0.13.0")
@deprecated("Use extractedTaskConfig.", "0.13.0")
def defaultConfig(extracted: Extracted, structure: BuildStructure) =
EvaluateConfig(false, restrictions(extracted, structure), progress = defaultProgress)
@deprecated("Use other extractedConfig", "0.13.2")
@deprecated("Use other extractedTaskConfig", "0.13.2")
def extractedConfig(extracted: Extracted, structure: BuildStructure): EvaluateConfig =
{
val workers = restrictions(extracted, structure)
val canCancel = cancelable(extracted, structure)
EvaluateConfig(cancelable = canCancel, restrictions = workers, progress = defaultProgress)
}
@deprecated("Use other extractedTaskConfig", "0.13.5")
def extractedConfig(extracted: Extracted, structure: BuildStructure, state: State): EvaluateConfig =
{
val workers = restrictions(extracted, structure)
@ -65,6 +160,13 @@ object EvaluateTask
val progress = executeProgress(extracted, structure, state)
EvaluateConfig(cancelable = canCancel, restrictions = workers, progress = progress)
}
def extractedTaskConfig(extracted: Extracted, structure: BuildStructure, state: State): EvaluateTaskConfig =
{
val rs = restrictions(extracted, structure)
val canceller = cancelStrategy(extracted, structure, state)
val progress = executeProgress(extracted, structure, state)
EvaluateTaskConfig(rs, false, progress, canceller)
}
def defaultRestrictions(maxWorkers: Int) = Tags.limitAll(maxWorkers) :: Nil
def defaultRestrictions(extracted: Extracted, structure: BuildStructure): Seq[Tags.Rule] =
@ -84,11 +186,13 @@ object EvaluateTask
1
def cancelable(extracted: Extracted, structure: BuildStructure): Boolean =
getSetting(Keys.cancelable, false, extracted, structure)
def cancelStrategy(extracted: Extracted, structure: BuildStructure, state: State): TaskCancellationStrategy =
getSetting(Keys.taskCancelStrategy, {(_: State) => TaskCancellationStrategy.Null}, extracted, structure)(state)
private[sbt] def executeProgress(extracted: Extracted, structure: BuildStructure, state: State): ExecuteProgress[Task] = {
import Types.const
val maker: State => Keys.TaskProgress = getSetting(Keys.executeProgress, const(new Keys.TaskProgress(defaultProgress)), extracted, structure)
maker(state).progress
val maker: State => Keys.TaskProgress = getSetting(Keys.executeProgress, const(new Keys.TaskProgress(defaultProgress)), extracted, structure)
maker(state).progress
}
def getSetting[T](key: SettingKey[T], default: T, extracted: Extracted, structure: BuildStructure): T =
@ -119,16 +223,20 @@ object EvaluateTask
* If the task is not defined, None is returned. The provided task key is resolved against the current project `ref`.
* Task execution is configured according to settings defined in the loaded project.*/
def apply[T](structure: BuildStructure, taskKey: ScopedKey[Task[T]], state: State, ref: ProjectRef): Option[(State, Result[T])] =
apply[T](structure, taskKey, state, ref, extractedConfig(Project.extract(state), structure))
apply[T](structure, taskKey, state, ref, extractedTaskConfig(Project.extract(state), structure, state))
/** Evaluates `taskKey` and returns the new State and the result of the task wrapped in Some.
* If the task is not defined, None is returned. The provided task key is resolved against the current project `ref`.
* `config` configures concurrency and canceling of task execution. */
@deprecated("Use EvalauteTaskConfig option instead.", "0.13.5")
def apply[T](structure: BuildStructure, taskKey: ScopedKey[Task[T]], state: State, ref: ProjectRef, config: EvaluateConfig): Option[(State, Result[T])] =
apply(structure, taskKey, state, ref, EvaluateTaskConfig(config))
def apply[T](structure: BuildStructure, taskKey: ScopedKey[Task[T]], state: State, ref: ProjectRef, config: EvaluateTaskConfig): Option[(State, Result[T])] = {
withStreams(structure, state) { str =>
for( (task, toNode) <- getTask(structure, taskKey, state, str, ref) ) yield
runTask(task, state, str, structure.index.triggers, config)(toNode)
}
}
def logIncResult(result: Result[_], state: State, streams: Streams) = result match { case Inc(i) => logIncomplete(i, state, streams); case _ => () }
def logIncomplete(result: Incomplete, state: State, streams: Streams)
{
@ -176,12 +284,18 @@ object EvaluateTask
def nodeView[HL <: HList](state: State, streams: Streams, roots: Seq[ScopedKey[_]], dummies: DummyTaskMap = DummyTaskMap(Nil)): NodeView[Task] =
Transform((dummyRoots, roots) :: (dummyStreamsManager, streams) :: (dummyState, state) :: dummies )
def runTask[T](root: Task[T], state: State, streams: Streams, triggers: Triggers[Task], config: EvaluateConfig)(implicit taskToNode: NodeView[Task]): (State, Result[T]) =
@deprecated("Use new EvalauteTaskConfig option to runTask", "0.13.5")
def runTask[T](root: Task[T], state: State, streams: Streams, triggers: Triggers[Task], config: EvaluateConfig)(implicit taskToNode: NodeView[Task]): (State, Result[T]) =
{
val newConfig = EvaluateTaskConfig(config)
runTask(root, state, streams, triggers, config)(taskToNode)
}
def runTask[T](root: Task[T], state: State, streams: Streams, triggers: Triggers[Task], config: EvaluateTaskConfig)(implicit taskToNode: NodeView[Task]): (State, Result[T]) =
{
import ConcurrentRestrictions.{completionService, TagMap, Tag, tagged, tagsKey}
import ConcurrentRestrictions.{completionService, TagMap, Tag, tagged, tagsKey}
val log = state.log
log.debug("Running task... Cancelable: " + config.cancelable + ", check cycles: " + config.checkCycles)
log.debug("Running task... Cancel: " + config.cancelStrategy + ", check cycles: " + config.checkCycles)
val tags = tagged[Task[_]](_.info get tagsKey getOrElse Map.empty, Tags.predicate(config.restrictions))
val (service, shutdown) = completionService[Task[_], Completed](tags, (s: String) => log.warn(s))
@ -191,7 +305,7 @@ object EvaluateTask
case _ => true
}
def run() = {
val x = new Execute[Task]( Execute.config(config.checkCycles, overwriteNode), triggers, config.progress)(taskToNode)
val x = new Execute[Task]( Execute.config(config.checkCycles, overwriteNode), triggers, config.progressReporter)(taskToNode)
val (newState, result) =
try {
val results = x.runKeep(root)(service)
@ -203,15 +317,18 @@ object EvaluateTask
logIncResult(replaced, state, streams)
(newState, replaced)
}
val cancel = () => {
println("")
log.warn("Canceling execution...")
shutdown()
object runningEngine extends RunningTaskEngine {
def cancelAndShutdown(): Unit = {
println("")
log.warn("Canceling execution...")
shutdown()
}
}
if(config.cancelable)
Signals.withHandler(cancel) { run }
else
run()
// Register with our cancel handler we're about to start.
val strat = config.cancelStrategy
val cancelState = strat.onTaskEngineStart(runningEngine)
try run()
finally strat.onTaskEngineFinish(cancelState)
}
private[this] def storeValuesForPrevious(results: RMap[Task, Result], state: State, streams: Streams): Unit =

View File

@ -346,6 +346,7 @@ object Keys
// wrapper to work around SI-2915
private[sbt] final class TaskProgress(val progress: ExecuteProgress[Task])
private[sbt] val executeProgress = SettingKey[State => TaskProgress]("executeProgress", "Experimental task execution listener.", DTask)
private[sbt] val taskCancelStrategy = SettingKey[State => TaskCancellationStrategy]("taskCancelStrategy", "Experimental task cancellation handler.", DTask)
// Experimental in sbt 0.13.2 to enable grabing semantic compile failures.
private[sbt] val compilerReporter = TaskKey[Option[xsbti.Reporter]]("compilerReporter", "Experimental hook to listen (or send) compilation failure messages.", DTask)

View File

@ -498,13 +498,25 @@ object Project extends ProjectExtra
@deprecated("This method does not apply state changes requested during task execution. Use 'runTask' instead, which does.", "0.11.1")
def evaluateTask[T](taskKey: ScopedKey[Task[T]], state: State, checkCycles: Boolean = false, maxWorkers: Int = EvaluateTask.SystemProcessors): Option[Result[T]] =
runTask(taskKey, state, EvaluateConfig(true, EvaluateTask.defaultRestrictions(maxWorkers), checkCycles)).map(_._2)
def runTask[T](taskKey: ScopedKey[Task[T]], state: State, checkCycles: Boolean = false): Option[(State, Result[T])] =
runTask(taskKey, state, EvaluateConfig(true, EvaluateTask.restrictions(state), checkCycles))
{
val extracted = Project.extract(state)
val ch = EvaluateTask.cancelStrategy(extracted, extracted.structure, state)
val p = EvaluateTask.executeProgress(extracted, extracted.structure, state)
val r = EvaluateTask.restrictions(state)
runTask(taskKey, state, EvaluateTaskConfig(r, checkCycles, p, ch))
}
@deprecated("Use EvalauteTaskConfig option instead.", "0.13.5")
def runTask[T](taskKey: ScopedKey[Task[T]], state: State, config: EvaluateConfig): Option[(State, Result[T])] =
{
val extracted = Project.extract(state)
EvaluateTask(extracted.structure, taskKey, state, extracted.currentRef, config)
}
def runTask[T](taskKey: ScopedKey[Task[T]], state: State, config: EvaluateTaskConfig): Option[(State, Result[T])] = {
val extracted = Project.extract(state)
EvaluateTask(extracted.structure, taskKey, state, extracted.currentRef, config)
}
implicit def projectToRef(p: Project): ProjectReference = LocalProject(p.id)

View File

@ -0,0 +1,9 @@
import sbt.ExposeYourself._
taskCancelHandler := { (state: State) =>
new TaskEvaluationCancelHandler {
type State = Unit
override def onTaskEngineStart(canceller: TaskCancel): Unit = canceller.cancel()
override def finish(result: Unit): Unit = ()
}
}

View File

@ -0,0 +1,5 @@
package sbt // this API is private[sbt], so only exposed for trusted clients and folks who like breaking.
object ExposeYourself {
val taskCancelHandler = sbt.Keys.taskCancelHandler
}

View File

@ -0,0 +1,5 @@
import scala
object Foo {
val x = "this should be long to compile or the test may fail."
}

View File

@ -0,0 +1,3 @@
# All tasks should fail.
-> compile
-> test

View File

@ -19,6 +19,38 @@ object Signals
case Right(v) => v
}
}
/** Helper interface so we can expose internals of signal-isms to others. */
sealed trait Registration {
def remove(): Unit
}
/** Register a signal handler that can be removed later.
* NOTE: Does not stack with other signal handlers!!!!
*/
def register(handler: () => Unit, signal: String = INT): Registration =
// TODO - Maybe we can just ignore things if not is-supported.
if(supported(signal)) {
import sun.misc.{Signal,SignalHandler}
val intSignal = new Signal(signal)
val newHandler = new SignalHandler {
def handle(sig: Signal) { handler() }
}
val oldHandler = Signal.handle(intSignal, newHandler)
object unregisterNewHandler extends Registration {
override def remove(): Unit = {
Signal.handle(intSignal, oldHandler)
}
}
unregisterNewHandler
} else {
// TODO - Maybe we should just throw an exception if we don't support signals...
object NullUnregisterNewHandler extends Registration {
override def remove(): Unit = ()
}
NullUnregisterNewHandler
}
def supported(signal: String): Boolean =
try
{