mirror of https://github.com/sbt/sbt.git
Introduce TaskId to reduce abstraction around Execute
This commit is contained in:
parent
ff48bbfe8a
commit
7ea0506736
|
|
@ -25,7 +25,7 @@ import Scope.GlobalScope
|
||||||
import sbt.SlashSyntax0.*
|
import sbt.SlashSyntax0.*
|
||||||
import sbt.internal.parser.SbtParser
|
import sbt.internal.parser.SbtParser
|
||||||
import sbt.io.IO
|
import sbt.io.IO
|
||||||
import scala.collection.JavaConverters.*
|
import scala.jdk.CollectionConverters.*
|
||||||
import xsbti.VirtualFile
|
import xsbti.VirtualFile
|
||||||
import xsbti.VirtualFileRef
|
import xsbti.VirtualFileRef
|
||||||
|
|
||||||
|
|
@ -403,14 +403,14 @@ object Index {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] type TriggerMap = collection.mutable.HashMap[Task[Any], Seq[Task[Any]]]
|
private[this] type TriggerMap = collection.mutable.HashMap[TaskId[?], Seq[TaskId[?]]]
|
||||||
|
|
||||||
def triggers(ss: Settings[Scope]): Triggers[Task] = {
|
def triggers(ss: Settings[Scope]): Triggers = {
|
||||||
val runBefore = new TriggerMap
|
val runBefore = new TriggerMap
|
||||||
val triggeredBy = new TriggerMap
|
val triggeredBy = new TriggerMap
|
||||||
ss.data.values foreach (
|
ss.data.values foreach (
|
||||||
_.entries foreach {
|
_.entries foreach {
|
||||||
case AttributeEntry(_, value: Task[Any]) =>
|
case AttributeEntry(_, value: Task[?]) =>
|
||||||
val as = value.info.attributes
|
val as = value.info.attributes
|
||||||
update(runBefore, value, as.get(Def.runBefore.asInstanceOf))
|
update(runBefore, value, as.get(Def.runBefore.asInstanceOf))
|
||||||
update(triggeredBy, value, as.get(Def.triggeredBy.asInstanceOf))
|
update(triggeredBy, value, as.get(Def.triggeredBy.asInstanceOf))
|
||||||
|
|
@ -418,13 +418,13 @@ object Index {
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
val onComplete = (GlobalScope / Def.onComplete) get ss getOrElse (() => ())
|
val onComplete = (GlobalScope / Def.onComplete) get ss getOrElse (() => ())
|
||||||
new Triggers[Task](runBefore, triggeredBy, map => { onComplete(); map })
|
new Triggers(runBefore, triggeredBy, map => { onComplete(); map })
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def update(
|
private[this] def update(
|
||||||
map: TriggerMap,
|
map: TriggerMap,
|
||||||
base: Task[Any],
|
base: Task[?],
|
||||||
tasksOpt: Option[Seq[Task[Any]]]
|
tasksOpt: Option[Seq[Task[?]]]
|
||||||
): Unit =
|
): Unit =
|
||||||
for {
|
for {
|
||||||
tasks <- tasksOpt
|
tasks <- tasksOpt
|
||||||
|
|
|
||||||
|
|
@ -382,10 +382,11 @@ object Def extends Init[Scope] with TaskMacroExtra with InitializeImplicits:
|
||||||
(TaskKey[A](name, description, DTask), dummyTask(name))
|
(TaskKey[A](name, description, DTask), dummyTask(name))
|
||||||
|
|
||||||
private[sbt] def dummyTask[T](name: String): Task[T] = {
|
private[sbt] def dummyTask[T](name: String): Task[T] = {
|
||||||
import std.TaskExtra.{ task => newTask, _ }
|
import std.TaskExtra.{ task => newTask, toTaskInfo }
|
||||||
val base: Task[T] = newTask(
|
val base: Task[T] = newTask(
|
||||||
sys.error("Dummy task '" + name + "' did not get converted to a full task.")
|
sys.error("Dummy task '" + name + "' did not get converted to a full task.")
|
||||||
) named name
|
)
|
||||||
|
.named(name)
|
||||||
base.copy(info = base.info.set(isDummyTask, true))
|
base.copy(info = base.info.set(isDummyTask, true))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ object Previous {
|
||||||
/** Persists values of tasks t where there is some task referencing it via t.previous. */
|
/** Persists values of tasks t where there is some task referencing it via t.previous. */
|
||||||
private[sbt] def complete(
|
private[sbt] def complete(
|
||||||
referenced: References,
|
referenced: References,
|
||||||
results: RMap[Task, Result],
|
results: RMap[TaskId, Result],
|
||||||
streams: Streams
|
streams: Streams
|
||||||
): Unit = {
|
): Unit = {
|
||||||
val map = referenced.getReferences
|
val map = referenced.getReferences
|
||||||
|
|
@ -124,7 +124,7 @@ object Previous {
|
||||||
// We first collect all of the successful tasks and write their scoped key into a map
|
// We first collect all of the successful tasks and write their scoped key into a map
|
||||||
// along with their values.
|
// along with their values.
|
||||||
val successfulTaskResults = (for
|
val successfulTaskResults = (for
|
||||||
case results.TPair(task, Result.Value(v)) <- results.toTypedSeq
|
results.TPair(task: Task[?], Result.Value(v)) <- results.toTypedSeq
|
||||||
key <- task.info.attributes.get(Def.taskDefinitionKey).asInstanceOf[Option[AnyTaskKey]]
|
key <- task.info.attributes.get(Def.taskDefinitionKey).asInstanceOf[Option[AnyTaskKey]]
|
||||||
yield key -> v).toMap
|
yield key -> v).toMap
|
||||||
// We then traverse the successful results and look up all of the referenced values for
|
// We then traverse the successful results and look up all of the referenced values for
|
||||||
|
|
|
||||||
|
|
@ -534,7 +534,7 @@ object Scoped:
|
||||||
.apply(deps => nop.dependsOn(deps: _*))
|
.apply(deps => nop.dependsOn(deps: _*))
|
||||||
}
|
}
|
||||||
|
|
||||||
sealed abstract class RichTaskables[K[L[x]]](final val keys: K[Taskable])(using
|
sealed abstract class RichTaskables[K[+L[x]]](final val keys: K[Taskable])(using
|
||||||
a: AList[K]
|
a: AList[K]
|
||||||
):
|
):
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
import sbt.Def.{ ScopedKey, Setting, dummyState }
|
import sbt.Def.{ ScopedKey, Setting, dummyState }
|
||||||
import sbt.Keys.{ TaskProgress => _, name => _, _ }
|
import sbt.Keys.{ TaskProgress => _, name => _, _ }
|
||||||
// import sbt.Project.richInitializeTask
|
|
||||||
import sbt.ProjectExtra.*
|
import sbt.ProjectExtra.*
|
||||||
import sbt.Scope.Global
|
import sbt.Scope.Global
|
||||||
import sbt.SlashSyntax0._
|
import sbt.SlashSyntax0._
|
||||||
|
|
@ -102,7 +101,7 @@ object TaskCancellationStrategy {
|
||||||
sealed trait EvaluateTaskConfig {
|
sealed trait EvaluateTaskConfig {
|
||||||
def restrictions: Seq[Tags.Rule]
|
def restrictions: Seq[Tags.Rule]
|
||||||
def checkCycles: Boolean
|
def checkCycles: Boolean
|
||||||
def progressReporter: ExecuteProgress[Task]
|
def progressReporter: ExecuteProgress
|
||||||
def cancelStrategy: TaskCancellationStrategy
|
def cancelStrategy: TaskCancellationStrategy
|
||||||
|
|
||||||
/** If true, we force a finalizer/gc run (or two) after task execution completes when needed. */
|
/** If true, we force a finalizer/gc run (or two) after task execution completes when needed. */
|
||||||
|
|
@ -118,7 +117,7 @@ object EvaluateTaskConfig {
|
||||||
def apply(
|
def apply(
|
||||||
restrictions: Seq[Tags.Rule],
|
restrictions: Seq[Tags.Rule],
|
||||||
checkCycles: Boolean,
|
checkCycles: Boolean,
|
||||||
progressReporter: ExecuteProgress[Task],
|
progressReporter: ExecuteProgress,
|
||||||
cancelStrategy: TaskCancellationStrategy,
|
cancelStrategy: TaskCancellationStrategy,
|
||||||
forceGarbageCollection: Boolean,
|
forceGarbageCollection: Boolean,
|
||||||
minForcegcInterval: Duration
|
minForcegcInterval: Duration
|
||||||
|
|
@ -135,7 +134,7 @@ object EvaluateTaskConfig {
|
||||||
private[this] case class DefaultEvaluateTaskConfig(
|
private[this] case class DefaultEvaluateTaskConfig(
|
||||||
restrictions: Seq[Tags.Rule],
|
restrictions: Seq[Tags.Rule],
|
||||||
checkCycles: Boolean,
|
checkCycles: Boolean,
|
||||||
progressReporter: ExecuteProgress[Task],
|
progressReporter: ExecuteProgress,
|
||||||
cancelStrategy: TaskCancellationStrategy,
|
cancelStrategy: TaskCancellationStrategy,
|
||||||
forceGarbageCollection: Boolean,
|
forceGarbageCollection: Boolean,
|
||||||
minForcegcInterval: Duration
|
minForcegcInterval: Duration
|
||||||
|
|
@ -168,7 +167,7 @@ object EvaluateTask {
|
||||||
|
|
||||||
@nowarn
|
@nowarn
|
||||||
lazy private val sharedProgress = new TaskTimings(reportOnShutdown = true)
|
lazy private val sharedProgress = new TaskTimings(reportOnShutdown = true)
|
||||||
def taskTimingProgress: Option[ExecuteProgress[Task]] =
|
def taskTimingProgress: Option[ExecuteProgress] =
|
||||||
if (SysProp.taskTimingsOnShutdown) Some(sharedProgress)
|
if (SysProp.taskTimingsOnShutdown) Some(sharedProgress)
|
||||||
else None
|
else None
|
||||||
|
|
||||||
|
|
@ -194,14 +193,14 @@ object EvaluateTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy private val sharedTraceEvent = new TaskTraceEvent()
|
lazy private val sharedTraceEvent = new TaskTraceEvent()
|
||||||
def taskTraceEvent: Option[ExecuteProgress[Task]] =
|
def taskTraceEvent: Option[ExecuteProgress] =
|
||||||
if (SysProp.traces) {
|
if (SysProp.traces) {
|
||||||
Some(sharedTraceEvent)
|
Some(sharedTraceEvent)
|
||||||
} else None
|
} else None
|
||||||
|
|
||||||
// sbt-pgp calls this
|
// sbt-pgp calls this
|
||||||
@deprecated("No longer used", "1.3.0")
|
@deprecated("No longer used", "1.3.0")
|
||||||
private[sbt] def defaultProgress(): ExecuteProgress[Task] = ExecuteProgress.empty[Task]
|
private[sbt] def defaultProgress(): ExecuteProgress = ExecuteProgress.empty
|
||||||
|
|
||||||
val SystemProcessors = Runtime.getRuntime.availableProcessors
|
val SystemProcessors = Runtime.getRuntime.availableProcessors
|
||||||
|
|
||||||
|
|
@ -262,26 +261,26 @@ object EvaluateTask {
|
||||||
extracted: Extracted,
|
extracted: Extracted,
|
||||||
structure: BuildStructure,
|
structure: BuildStructure,
|
||||||
state: State
|
state: State
|
||||||
): ExecuteProgress[Task] = {
|
): ExecuteProgress = {
|
||||||
state
|
state
|
||||||
.get(currentTaskProgress)
|
.get(currentTaskProgress)
|
||||||
.map { tp =>
|
.map { tp =>
|
||||||
new ExecuteProgress[Task] {
|
new ExecuteProgress {
|
||||||
val progress = tp.progress
|
val progress = tp.progress
|
||||||
override def initial(): Unit = progress.initial()
|
override def initial(): Unit = progress.initial()
|
||||||
override def afterRegistered(
|
override def afterRegistered(
|
||||||
task: Task[Any],
|
task: TaskId[?],
|
||||||
allDeps: Iterable[Task[Any]],
|
allDeps: Iterable[TaskId[?]],
|
||||||
pendingDeps: Iterable[Task[Any]]
|
pendingDeps: Iterable[TaskId[?]]
|
||||||
): Unit =
|
): Unit =
|
||||||
progress.afterRegistered(task, allDeps, pendingDeps)
|
progress.afterRegistered(task, allDeps, pendingDeps)
|
||||||
override def afterReady(task: Task[Any]): Unit = progress.afterReady(task)
|
override def afterReady(task: TaskId[?]): Unit = progress.afterReady(task)
|
||||||
override def beforeWork(task: Task[Any]): Unit = progress.beforeWork(task)
|
override def beforeWork(task: TaskId[?]): Unit = progress.beforeWork(task)
|
||||||
override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit =
|
override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit =
|
||||||
progress.afterWork(task, result)
|
progress.afterWork(task, result)
|
||||||
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =
|
override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit =
|
||||||
progress.afterCompleted(task, result)
|
progress.afterCompleted(task, result)
|
||||||
override def afterAllCompleted(results: RMap[Task, Result]): Unit =
|
override def afterAllCompleted(results: RMap[TaskId, Result]): Unit =
|
||||||
progress.afterAllCompleted(results)
|
progress.afterAllCompleted(results)
|
||||||
override def stop(): Unit = {}
|
override def stop(): Unit = {}
|
||||||
}
|
}
|
||||||
|
|
@ -298,9 +297,9 @@ object EvaluateTask {
|
||||||
new TaskTimings(reportOnShutdown = false, state.globalLogging.full) :: Nil
|
new TaskTimings(reportOnShutdown = false, state.globalLogging.full) :: Nil
|
||||||
else Nil)
|
else Nil)
|
||||||
reporters match {
|
reporters match {
|
||||||
case xs if xs.isEmpty => ExecuteProgress.empty[Task]
|
case xs if xs.isEmpty => ExecuteProgress.empty
|
||||||
case xs if xs.size == 1 => xs.head
|
case xs if xs.size == 1 => xs.head
|
||||||
case xs => ExecuteProgress.aggregate[Task](xs)
|
case xs => ExecuteProgress.aggregate(xs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -439,7 +438,7 @@ object EvaluateTask {
|
||||||
state: State,
|
state: State,
|
||||||
streams: Streams,
|
streams: Streams,
|
||||||
ref: ProjectRef
|
ref: ProjectRef
|
||||||
): Option[(Task[T], NodeView[Task])] = {
|
): Option[(Task[T], NodeView)] = {
|
||||||
val thisScope = Load.projectScope(ref)
|
val thisScope = Load.projectScope(ref)
|
||||||
val resolvedScope = Scope.replaceThis(thisScope)(taskKey.scope)
|
val resolvedScope = Scope.replaceThis(thisScope)(taskKey.scope)
|
||||||
for (t <- structure.data.get(resolvedScope, taskKey.key))
|
for (t <- structure.data.get(resolvedScope, taskKey.key))
|
||||||
|
|
@ -450,7 +449,7 @@ object EvaluateTask {
|
||||||
streams: Streams,
|
streams: Streams,
|
||||||
roots: Seq[ScopedKey[_]],
|
roots: Seq[ScopedKey[_]],
|
||||||
dummies: DummyTaskMap = DummyTaskMap(Nil)
|
dummies: DummyTaskMap = DummyTaskMap(Nil)
|
||||||
): NodeView[Task] =
|
): NodeView =
|
||||||
Transform(
|
Transform(
|
||||||
(dummyRoots, roots) :: (Def.dummyStreamsManager, streams) :: (dummyState, state) :: dummies
|
(dummyRoots, roots) :: (Def.dummyStreamsManager, streams) :: (dummyState, state) :: dummies
|
||||||
)
|
)
|
||||||
|
|
@ -471,24 +470,21 @@ object EvaluateTask {
|
||||||
root: Task[T],
|
root: Task[T],
|
||||||
state: State,
|
state: State,
|
||||||
streams: Streams,
|
streams: Streams,
|
||||||
triggers: Triggers[Task],
|
triggers: Triggers,
|
||||||
config: EvaluateTaskConfig
|
config: EvaluateTaskConfig
|
||||||
)(using taskToNode: NodeView[Task]): (State, Result[T]) = {
|
)(using taskToNode: NodeView): (State, Result[T]) = {
|
||||||
import ConcurrentRestrictions.{ cancellableCompletionService, tagged, tagsKey }
|
import ConcurrentRestrictions.{ cancellableCompletionService, tagged, tagsKey }
|
||||||
|
|
||||||
val log = state.log
|
val log = state.log
|
||||||
log.debug(
|
log.debug(
|
||||||
s"Running task... Cancel: ${config.cancelStrategy}, check cycles: ${config.checkCycles}, forcegc: ${config.forceGarbageCollection}"
|
s"Running task... Cancel: ${config.cancelStrategy}, check cycles: ${config.checkCycles}, forcegc: ${config.forceGarbageCollection}"
|
||||||
)
|
)
|
||||||
def tagMap(t: Task[_]): Tags.TagMap =
|
val tags = tagged(Tags.predicate(config.restrictions))
|
||||||
t.info.get(tagsKey).getOrElse(Map.empty)
|
|
||||||
val tags =
|
|
||||||
tagged[Task[Any]](tagMap, Tags.predicate(config.restrictions))
|
|
||||||
val (service, shutdownThreads) =
|
val (service, shutdownThreads) =
|
||||||
cancellableCompletionService[Task[Any], Completed](
|
cancellableCompletionService(
|
||||||
tags,
|
tags,
|
||||||
(s: String) => log.warn(s),
|
(s: String) => log.warn(s),
|
||||||
(t: Task[_]) => tagMap(t).contains(Tags.Sentinel)
|
(t: TaskId[?]) => t.tags.contains(Tags.Sentinel)
|
||||||
)
|
)
|
||||||
|
|
||||||
def shutdownImpl(force: Boolean): Unit = {
|
def shutdownImpl(force: Boolean): Unit = {
|
||||||
|
|
@ -504,18 +500,18 @@ object EvaluateTask {
|
||||||
def shutdown(): Unit = shutdownImpl(false)
|
def shutdown(): Unit = shutdownImpl(false)
|
||||||
// propagate the defining key for reporting the origin
|
// propagate the defining key for reporting the origin
|
||||||
def overwriteNode(i: Incomplete): Boolean = i.node match {
|
def overwriteNode(i: Incomplete): Boolean = i.node match {
|
||||||
case Some(t: Task[_]) => transformNode(t).isEmpty
|
case Some(t: Task[?]) => transformNode(t).isEmpty
|
||||||
case _ => true
|
case _ => true
|
||||||
}
|
}
|
||||||
def run() = {
|
def run() = {
|
||||||
val x = new Execute[Task](
|
val x = new Execute(
|
||||||
Execute.config(config.checkCycles, overwriteNode),
|
Execute.config(config.checkCycles, overwriteNode),
|
||||||
triggers,
|
triggers,
|
||||||
config.progressReporter
|
config.progressReporter
|
||||||
)
|
)
|
||||||
val (newState, result) =
|
val (newState, result) =
|
||||||
try {
|
try {
|
||||||
given strategy: x.Strategy = service
|
given strategy: CompletionService = service
|
||||||
val results = x.runKeep(root)
|
val results = x.runKeep(root)
|
||||||
storeValuesForPrevious(results, state, streams)
|
storeValuesForPrevious(results, state, streams)
|
||||||
applyResults(results, state, root)
|
applyResults(results, state, root)
|
||||||
|
|
@ -548,7 +544,7 @@ object EvaluateTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def storeValuesForPrevious(
|
private[this] def storeValuesForPrevious(
|
||||||
results: RMap[Task, Result],
|
results: RMap[TaskId, Result],
|
||||||
state: State,
|
state: State,
|
||||||
streams: Streams
|
streams: Streams
|
||||||
): Unit =
|
): Unit =
|
||||||
|
|
@ -556,13 +552,13 @@ object EvaluateTask {
|
||||||
Previous.complete(referenced, results, streams)
|
Previous.complete(referenced, results, streams)
|
||||||
|
|
||||||
def applyResults[T](
|
def applyResults[T](
|
||||||
results: RMap[Task, Result],
|
results: RMap[TaskId, Result],
|
||||||
state: State,
|
state: State,
|
||||||
root: Task[T]
|
root: Task[T]
|
||||||
): (State, Result[T]) = {
|
): (State, Result[T]) = {
|
||||||
(stateTransform(results)(state), results(root))
|
(stateTransform(results)(state), results(root))
|
||||||
}
|
}
|
||||||
def stateTransform(results: RMap[Task, Result]): State => State =
|
def stateTransform(results: RMap[TaskId, Result]): State => State =
|
||||||
Function.chain(
|
Function.chain(
|
||||||
results.toTypedSeq flatMap {
|
results.toTypedSeq flatMap {
|
||||||
case results.TPair(_, Result.Value(KeyValue(_, st: StateTransform))) => Some(st.transform)
|
case results.TPair(_, Result.Value(KeyValue(_, st: StateTransform))) => Some(st.transform)
|
||||||
|
|
@ -577,27 +573,25 @@ object EvaluateTask {
|
||||||
Incomplete.transformBU(i)(convertCyclicInc andThen taskToKey andThen liftAnonymous)
|
Incomplete.transformBU(i)(convertCyclicInc andThen taskToKey andThen liftAnonymous)
|
||||||
}
|
}
|
||||||
def taskToKey: Incomplete => Incomplete = {
|
def taskToKey: Incomplete => Incomplete = {
|
||||||
case in @ Incomplete(Some(node: Task[_]), _, _, _, _) => in.copy(node = transformNode(node))
|
case in @ Incomplete(Some(node: Task[?]), _, _, _, _) => in.copy(node = transformNode(node))
|
||||||
case i => i
|
case i => i
|
||||||
}
|
}
|
||||||
|
|
||||||
type AnyCyclic = Execute[({ type A[_] <: AnyRef })#A]#CyclicException[_]
|
|
||||||
|
|
||||||
def convertCyclicInc: Incomplete => Incomplete = {
|
def convertCyclicInc: Incomplete => Incomplete = {
|
||||||
case in @ Incomplete(
|
case in @ Incomplete(
|
||||||
_,
|
_,
|
||||||
_,
|
_,
|
||||||
_,
|
_,
|
||||||
_,
|
_,
|
||||||
Some(c: Execute[({ type A[_] <: AnyRef })#A @unchecked]#CyclicException[_])
|
Some(c: Execute#CyclicException)
|
||||||
) =>
|
) =>
|
||||||
in.copy(directCause = Some(new RuntimeException(convertCyclic(c))))
|
in.copy(directCause = Some(new RuntimeException(convertCyclic(c))))
|
||||||
case i => i
|
case i => i
|
||||||
}
|
}
|
||||||
|
|
||||||
def convertCyclic(c: AnyCyclic): String =
|
def convertCyclic(c: Execute#CyclicException): String =
|
||||||
(c.caller, c.target) match {
|
(c.caller, c.target) match {
|
||||||
case (caller: Task[_], target: Task[_]) =>
|
case (caller: Task[?], target: Task[?]) =>
|
||||||
c.toString + (if (caller eq target) "(task: " + name(caller) + ")"
|
c.toString + (if (caller eq target) "(task: " + name(caller) + ")"
|
||||||
else "(caller: " + name(caller) + ", target: " + name(target) + ")")
|
else "(caller: " + name(caller) + ", target: " + name(target) + ")")
|
||||||
case _ => c.toString
|
case _ => c.toString
|
||||||
|
|
|
||||||
|
|
@ -590,9 +590,9 @@ object Keys {
|
||||||
val state = Def.stateKey
|
val state = Def.stateKey
|
||||||
val streamsManager = Def.streamsManagerKey
|
val streamsManager = Def.streamsManagerKey
|
||||||
// wrapper to work around SI-2915
|
// wrapper to work around SI-2915
|
||||||
final class TaskProgress(val progress: ExecuteProgress[Task])
|
final class TaskProgress(val progress: ExecuteProgress)
|
||||||
object TaskProgress {
|
object TaskProgress {
|
||||||
def apply(progress: ExecuteProgress[Task]): TaskProgress = new TaskProgress(progress)
|
def apply(progress: ExecuteProgress): TaskProgress = new TaskProgress(progress)
|
||||||
}
|
}
|
||||||
private[sbt] val currentTaskProgress = AttributeKey[TaskProgress]("current-task-progress")
|
private[sbt] val currentTaskProgress = AttributeKey[TaskProgress]("current-task-progress")
|
||||||
private[sbt] val taskProgress = AttributeKey[sbt.internal.TaskProgress]("active-task-progress")
|
private[sbt] val taskProgress = AttributeKey[sbt.internal.TaskProgress]("active-task-progress")
|
||||||
|
|
|
||||||
|
|
@ -10,18 +10,18 @@ package internal
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import scala.collection.JavaConverters._
|
import scala.jdk.CollectionConverters.*
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.collection.immutable.VectorBuilder
|
import scala.collection.immutable.VectorBuilder
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[Task] {
|
private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress {
|
||||||
import AbstractTaskExecuteProgress.Timer
|
import AbstractTaskExecuteProgress.Timer
|
||||||
|
|
||||||
private[this] val showScopedKey = Def.showShortKey(None)
|
private[this] val showScopedKey = Def.showShortKey(None)
|
||||||
private[this] val anonOwners = new ConcurrentHashMap[Task[_], Task[_]]
|
private[this] val anonOwners = new ConcurrentHashMap[TaskId[_], TaskId[_]]
|
||||||
private[this] val calledBy = new ConcurrentHashMap[Task[_], Task[_]]
|
private[this] val calledBy = new ConcurrentHashMap[TaskId[_], TaskId[_]]
|
||||||
private[this] val timings = new ConcurrentHashMap[Task[_], Timer]
|
private[this] val timings = new ConcurrentHashMap[TaskId[_], Timer]
|
||||||
private[sbt] def timingsByName: mutable.Map[String, AtomicLong] = {
|
private[sbt] def timingsByName: mutable.Map[String, AtomicLong] = {
|
||||||
val result = new ConcurrentHashMap[String, AtomicLong]
|
val result = new ConcurrentHashMap[String, AtomicLong]
|
||||||
timings.forEach { (task, timing) =>
|
timings.forEach { (task, timing) =>
|
||||||
|
|
@ -34,18 +34,18 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
||||||
result.asScala
|
result.asScala
|
||||||
}
|
}
|
||||||
private[sbt] def anyTimings = !timings.isEmpty
|
private[sbt] def anyTimings = !timings.isEmpty
|
||||||
def currentTimings: Iterator[(Task[_], Timer)] = timings.asScala.iterator
|
def currentTimings: Iterator[(TaskId[_], Timer)] = timings.asScala.iterator
|
||||||
|
|
||||||
private[internal] def exceededThreshold(task: Task[_], threshold: FiniteDuration): Boolean =
|
private[internal] def exceededThreshold(task: TaskId[_], threshold: FiniteDuration): Boolean =
|
||||||
timings.get(task) match {
|
timings.get(task) match {
|
||||||
case null => false
|
case null => false
|
||||||
case t => t.durationMicros > threshold.toMicros
|
case t => t.durationMicros > threshold.toMicros
|
||||||
}
|
}
|
||||||
private[internal] def timings(
|
private[internal] def timings(
|
||||||
tasks: java.util.Set[Task[_]],
|
tasks: java.util.Set[TaskId[_]],
|
||||||
thresholdMicros: Long
|
thresholdMicros: Long
|
||||||
): Vector[(Task[_], Long)] = {
|
): Vector[(TaskId[_], Long)] = {
|
||||||
val result = new VectorBuilder[(Task[_], Long)]
|
val result = new VectorBuilder[(TaskId[_], Long)]
|
||||||
val now = System.nanoTime
|
val now = System.nanoTime
|
||||||
tasks.forEach { t =>
|
tasks.forEach { t =>
|
||||||
timings.get(t) match {
|
timings.get(t) match {
|
||||||
|
|
@ -60,7 +60,7 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
||||||
result.result()
|
result.result()
|
||||||
}
|
}
|
||||||
def activeTasks(now: Long) = {
|
def activeTasks(now: Long) = {
|
||||||
val result = new VectorBuilder[(Task[_], FiniteDuration)]
|
val result = new VectorBuilder[(TaskId[_], FiniteDuration)]
|
||||||
timings.forEach { (task, timing) =>
|
timings.forEach { (task, timing) =>
|
||||||
if (timing.isActive) result += task -> (now - timing.startNanos).nanos
|
if (timing.isActive) result += task -> (now - timing.startNanos).nanos
|
||||||
}
|
}
|
||||||
|
|
@ -68,25 +68,26 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterRegistered(
|
override def afterRegistered(
|
||||||
task: Task[Any],
|
task: TaskId[?],
|
||||||
allDeps: Iterable[Task[Any]],
|
allDeps: Iterable[TaskId[?]],
|
||||||
pendingDeps: Iterable[Task[Any]]
|
pendingDeps: Iterable[TaskId[?]]
|
||||||
): Unit = {
|
): Unit = {
|
||||||
// we need this to infer anonymous task names
|
// we need this to infer anonymous task names
|
||||||
pendingDeps foreach { t =>
|
pendingDeps
|
||||||
if (TaskName.transformNode(t).isEmpty) {
|
.filter {
|
||||||
anonOwners.put(t, task)
|
case t: Task[?] => TaskName.transformNode(t).isEmpty
|
||||||
|
case _ => true
|
||||||
}
|
}
|
||||||
}
|
.foreach(anonOwners.put(_, task))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def beforeWork(task: Task[Any]): Unit = {
|
override def beforeWork(task: TaskId[?]): Unit = {
|
||||||
timings.put(task, new Timer)
|
timings.put(task, new Timer)
|
||||||
()
|
()
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def clearTimings: Boolean = false
|
protected def clearTimings: Boolean = false
|
||||||
override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit = {
|
override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = {
|
||||||
if (clearTimings) timings.remove(task)
|
if (clearTimings) timings.remove(task)
|
||||||
else
|
else
|
||||||
timings.get(task) match {
|
timings.get(task) match {
|
||||||
|
|
@ -100,21 +101,23 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] val taskNameCache = new ConcurrentHashMap[Task[_], String]
|
private[this] val taskNameCache = new ConcurrentHashMap[TaskId[_], String]
|
||||||
protected def taskName(t: Task[_]): String = taskNameCache.get(t) match {
|
protected def taskName(t: TaskId[_]): String = taskNameCache.get(t) match {
|
||||||
case null =>
|
case null =>
|
||||||
val name = taskName0(t)
|
val name = taskName0(t)
|
||||||
taskNameCache.putIfAbsent(t, name)
|
taskNameCache.putIfAbsent(t, name)
|
||||||
name
|
name
|
||||||
case name => name
|
case name => name
|
||||||
}
|
}
|
||||||
private[this] def taskName0(t: Task[_]): String = {
|
private[this] def taskName0(t: TaskId[_]): String = {
|
||||||
def definedName(node: Task[_]): Option[String] =
|
def definedName(node: Task[_]): Option[String] =
|
||||||
node.info.name orElse TaskName.transformNode(node).map(showScopedKey.show)
|
node.info.name.orElse(TaskName.transformNode(node).map(showScopedKey.show))
|
||||||
def inferredName(t: Task[_]): Option[String] = nameDelegate(t) map taskName
|
def inferredName(t: Task[_]): Option[String] = nameDelegate(t) map taskName
|
||||||
def nameDelegate(t: Task[_]): Option[Task[_]] =
|
def nameDelegate(t: Task[_]): Option[TaskId[_]] =
|
||||||
Option(anonOwners.get(t)) orElse Option(calledBy.get(t))
|
Option(anonOwners.get(t)).orElse(Option(calledBy.get(t)))
|
||||||
definedName(t) orElse inferredName(t) getOrElse TaskName.anonymousName(t)
|
t match
|
||||||
|
case t: Task[?] => definedName(t).orElse(inferredName(t)).getOrElse(TaskName.anonymousName(t))
|
||||||
|
case _ => TaskName.anonymousName(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ final class BuildStructure(
|
||||||
final class StructureIndex(
|
final class StructureIndex(
|
||||||
val keyMap: Map[String, AttributeKey[_]],
|
val keyMap: Map[String, AttributeKey[_]],
|
||||||
val taskToKey: Map[Task[_], ScopedKey[Task[_]]],
|
val taskToKey: Map[Task[_], ScopedKey[Task[_]]],
|
||||||
val triggers: Triggers[Task],
|
val triggers: Triggers,
|
||||||
val keyIndex: KeyIndex,
|
val keyIndex: KeyIndex,
|
||||||
val aggregateKeyIndex: KeyIndex,
|
val aggregateKeyIndex: KeyIndex,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -12,11 +12,11 @@ import Def.{ displayFull, ScopedKey }
|
||||||
import Keys.taskDefinitionKey
|
import Keys.taskDefinitionKey
|
||||||
|
|
||||||
private[sbt] object TaskName {
|
private[sbt] object TaskName {
|
||||||
def name(node: Task[_]): String = definedName(node) getOrElse anonymousName(node)
|
def name(node: Task[_]): String = definedName(node).getOrElse(anonymousName(node))
|
||||||
def definedName(node: Task[_]): Option[String] =
|
def definedName(node: Task[_]): Option[String] =
|
||||||
node.info.name orElse transformNode(node).map(displayFull)
|
node.info.name.orElse(transformNode(node).map(displayFull))
|
||||||
def anonymousName(node: Task[_]): String =
|
def anonymousName(node: TaskId[_]): String =
|
||||||
"<anon-" + System.identityHashCode(node).toHexString + ">"
|
"<anon-" + System.identityHashCode(node).toHexString + ">"
|
||||||
def transformNode(node: Task[_]): Option[ScopedKey[_]] =
|
def transformNode(node: Task[_]): Option[ScopedKey[_]] =
|
||||||
node.info.attributes get taskDefinitionKey
|
node.info.attributes.get(taskDefinitionKey)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,11 +26,11 @@ private[sbt] class TaskProgress(
|
||||||
threshold: FiniteDuration,
|
threshold: FiniteDuration,
|
||||||
logger: Logger
|
logger: Logger
|
||||||
) extends AbstractTaskExecuteProgress
|
) extends AbstractTaskExecuteProgress
|
||||||
with ExecuteProgress[Task]
|
with ExecuteProgress
|
||||||
with AutoCloseable {
|
with AutoCloseable {
|
||||||
private[this] val lastTaskCount = new AtomicInteger(0)
|
private[this] val lastTaskCount = new AtomicInteger(0)
|
||||||
private[this] val reportLoop = new AtomicReference[AutoCloseable]
|
private[this] val reportLoop = new AtomicReference[AutoCloseable]
|
||||||
private[this] val active = new ConcurrentHashMap[Task[_], AutoCloseable]
|
private[this] val active = new ConcurrentHashMap[TaskId[_], AutoCloseable]
|
||||||
private[this] val nextReport = new AtomicReference(Deadline.now)
|
private[this] val nextReport = new AtomicReference(Deadline.now)
|
||||||
private[this] val scheduler =
|
private[this] val scheduler =
|
||||||
Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler"))
|
Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler"))
|
||||||
|
|
@ -90,7 +90,7 @@ private[sbt] class TaskProgress(
|
||||||
}
|
}
|
||||||
Util.ignoreResult(pending.add(executor.submit(runnable)))
|
Util.ignoreResult(pending.add(executor.submit(runnable)))
|
||||||
}
|
}
|
||||||
override def beforeWork(task: Task[Any]): Unit =
|
override def beforeWork(task: TaskId[?]): Unit =
|
||||||
if (!closed.get) {
|
if (!closed.get) {
|
||||||
super.beforeWork(task)
|
super.beforeWork(task)
|
||||||
reportLoop.get match {
|
reportLoop.get match {
|
||||||
|
|
@ -108,7 +108,7 @@ private[sbt] class TaskProgress(
|
||||||
logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed")
|
logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterReady(task: Task[Any]): Unit =
|
override def afterReady(task: TaskId[?]): Unit =
|
||||||
if (!closed.get) {
|
if (!closed.get) {
|
||||||
try {
|
try {
|
||||||
Util.ignoreResult(executor.submit((() => {
|
Util.ignoreResult(executor.submit((() => {
|
||||||
|
|
@ -124,7 +124,7 @@ private[sbt] class TaskProgress(
|
||||||
}
|
}
|
||||||
override def stop(): Unit = {}
|
override def stop(): Unit = {}
|
||||||
|
|
||||||
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =
|
override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit =
|
||||||
active.remove(task) match {
|
active.remove(task) match {
|
||||||
case null =>
|
case null =>
|
||||||
case a =>
|
case a =>
|
||||||
|
|
@ -132,7 +132,7 @@ private[sbt] class TaskProgress(
|
||||||
if (exceededThreshold(task, threshold)) report()
|
if (exceededThreshold(task, threshold)) report()
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterAllCompleted(results: RMap[Task, Result]): Unit = {
|
override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = {
|
||||||
reportLoop.getAndSet(null) match {
|
reportLoop.getAndSet(null) match {
|
||||||
case null =>
|
case null =>
|
||||||
case l => l.close()
|
case l => l.close()
|
||||||
|
|
@ -164,7 +164,7 @@ private[sbt] class TaskProgress(
|
||||||
val ltc = lastTaskCount.get
|
val ltc = lastTaskCount.get
|
||||||
if (currentTasks.nonEmpty || ltc != 0) {
|
if (currentTasks.nonEmpty || ltc != 0) {
|
||||||
val currentTasksCount = currentTasks.size
|
val currentTasksCount = currentTasks.size
|
||||||
def event(tasks: Vector[(Task[_], Long)]): ProgressEvent = {
|
def event(tasks: Vector[(TaskId[_], Long)]): ProgressEvent = {
|
||||||
if (tasks.nonEmpty) nextReport.set(Deadline.now + sleepDuration)
|
if (tasks.nonEmpty) nextReport.set(Deadline.now + sleepDuration)
|
||||||
val toWrite = tasks.sortBy(_._2)
|
val toWrite = tasks.sortBy(_._2)
|
||||||
val distinct = new java.util.LinkedHashMap[String, ProgressItem]
|
val distinct = new java.util.LinkedHashMap[String, ProgressItem]
|
||||||
|
|
@ -187,7 +187,7 @@ private[sbt] class TaskProgress(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def getShortName(task: Task[_]): String = {
|
private[this] def getShortName(task: TaskId[_]): String = {
|
||||||
val name = taskName(task)
|
val name = taskName(task)
|
||||||
name.lastIndexOf('/') match {
|
name.lastIndexOf('/') match {
|
||||||
case -1 => name
|
case -1 => name
|
||||||
|
|
@ -199,12 +199,13 @@ private[sbt] class TaskProgress(
|
||||||
|
|
||||||
}
|
}
|
||||||
private[this] def filter(
|
private[this] def filter(
|
||||||
tasks: Vector[(Task[_], Long)]
|
tasks: Vector[(TaskId[_], Long)]
|
||||||
): (Vector[(Task[_], Long)], Boolean) = {
|
): (Vector[(TaskId[_], Long)], Boolean) = {
|
||||||
tasks.foldLeft((Vector.empty[(Task[_], Long)], false)) { case ((tasks, skip), pair @ (t, _)) =>
|
tasks.foldLeft((Vector.empty[(TaskId[_], Long)], false)) {
|
||||||
val shortName = getShortName(t)
|
case ((tasks, skip), pair @ (t, _)) =>
|
||||||
val newSkip = skip || skipReportTasks.contains(shortName)
|
val shortName = getShortName(t)
|
||||||
if (hiddenTasks.contains(shortName)) (tasks, newSkip) else (tasks :+ pair, newSkip)
|
val newSkip = skip || skipReportTasks.contains(shortName)
|
||||||
|
if (hiddenTasks.contains(shortName)) (tasks, newSkip) else (tasks :+ pair, newSkip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ import sbt.util.{ Level, Logger }
|
||||||
*/
|
*/
|
||||||
private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger)
|
private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger)
|
||||||
extends AbstractTaskExecuteProgress
|
extends AbstractTaskExecuteProgress
|
||||||
with ExecuteProgress[Task] {
|
with ExecuteProgress {
|
||||||
@deprecated("Use the constructor that takes an sbt.util.Logger parameter.", "1.3.3")
|
@deprecated("Use the constructor that takes an sbt.util.Logger parameter.", "1.3.3")
|
||||||
def this(reportOnShutdown: Boolean) =
|
def this(reportOnShutdown: Boolean) =
|
||||||
this(
|
this(
|
||||||
|
|
@ -50,9 +50,9 @@ private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger)
|
||||||
start = System.nanoTime
|
start = System.nanoTime
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterReady(task: Task[Any]): Unit = ()
|
override def afterReady(task: TaskId[?]): Unit = ()
|
||||||
override def afterCompleted[T](task: Task[T], result: Result[T]): Unit = ()
|
override def afterCompleted[T](task: TaskId[T], result: Result[T]): Unit = ()
|
||||||
override def afterAllCompleted(results: RMap[Task, Result]): Unit =
|
override def afterAllCompleted(results: RMap[TaskId, Result]): Unit =
|
||||||
if (!reportOnShutdown) {
|
if (!reportOnShutdown) {
|
||||||
report()
|
report()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,17 +21,15 @@ import sjsonnew.support.scalajson.unsafe.CompactPrinter
|
||||||
* as Chrome Trace Event Format.
|
* as Chrome Trace Event Format.
|
||||||
* This class is activated by adding -Dsbt.traces=true to the JVM options.
|
* This class is activated by adding -Dsbt.traces=true to the JVM options.
|
||||||
*/
|
*/
|
||||||
private[sbt] final class TaskTraceEvent
|
private[sbt] final class TaskTraceEvent extends AbstractTaskExecuteProgress with ExecuteProgress {
|
||||||
extends AbstractTaskExecuteProgress
|
|
||||||
with ExecuteProgress[Task] {
|
|
||||||
import AbstractTaskExecuteProgress.Timer
|
import AbstractTaskExecuteProgress.Timer
|
||||||
private[this] var start = 0L
|
private[this] var start = 0L
|
||||||
private[this] val console = ConsoleOut.systemOut
|
private[this] val console = ConsoleOut.systemOut
|
||||||
|
|
||||||
override def initial(): Unit = ()
|
override def initial(): Unit = ()
|
||||||
override def afterReady(task: Task[Any]): Unit = ()
|
override def afterReady(task: TaskId[?]): Unit = ()
|
||||||
override def afterCompleted[T](task: Task[T], result: Result[T]): Unit = ()
|
override def afterCompleted[T](task: TaskId[T], result: Result[T]): Unit = ()
|
||||||
override def afterAllCompleted(results: RMap[Task, Result]): Unit = ()
|
override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = ()
|
||||||
override def stop(): Unit = ()
|
override def stop(): Unit = ()
|
||||||
|
|
||||||
start = System.nanoTime
|
start = System.nanoTime
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,7 @@ object BspCompileTask {
|
||||||
case class BspCompileTask private (
|
case class BspCompileTask private (
|
||||||
targetId: BuildTargetIdentifier,
|
targetId: BuildTargetIdentifier,
|
||||||
targetName: String,
|
targetName: String,
|
||||||
id: TaskId,
|
id: sbt.internal.bsp.TaskId,
|
||||||
startTimeMillis: Long
|
startTimeMillis: Long
|
||||||
) {
|
) {
|
||||||
import sbt.internal.bsp.codec.JsonProtocol._
|
import sbt.internal.bsp.codec.JsonProtocol._
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ import sbt.util.Monad
|
||||||
/**
|
/**
|
||||||
* Combines metadata `info` and a computation `work` to define a task.
|
* Combines metadata `info` and a computation `work` to define a task.
|
||||||
*/
|
*/
|
||||||
final case class Task[A](info: Info[A], work: Action[A]):
|
final case class Task[A](info: Info[A], work: Action[A]) extends TaskId[A]:
|
||||||
override def toString = info.name getOrElse ("Task(" + info + ")")
|
override def toString = info.name getOrElse ("Task(" + info + ")")
|
||||||
override def hashCode = info.hashCode
|
override def hashCode = info.hashCode
|
||||||
|
|
||||||
|
|
@ -28,7 +28,10 @@ final case class Task[A](info: Info[A], work: Action[A]):
|
||||||
withInfo(info = nextInfo)
|
withInfo(info = nextInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
def tags: TagMap = info get tagsKey getOrElse TagMap.empty
|
def tags: TagMap = info.get(tagsKey).getOrElse(TagMap.empty)
|
||||||
|
def name: Option[String] = info.name
|
||||||
|
|
||||||
|
def attributes: AttributeMap = info.attributes
|
||||||
|
|
||||||
private[sbt] def withInfo(info: Info[A]): Task[A] =
|
private[sbt] def withInfo(info: Info[A]): Task[A] =
|
||||||
Task(info = info, work = this.work)
|
Task(info = info, work = this.work)
|
||||||
|
|
|
||||||
|
|
@ -27,11 +27,11 @@ enum Action[A]:
|
||||||
// private[sbt] def mapTask(f: [A1] => Task[A1] => Task[A1]) = this
|
// private[sbt] def mapTask(f: [A1] => Task[A1] => Task[A1]) = this
|
||||||
|
|
||||||
/** Applies a function to the result of evaluating a heterogeneous list of other tasks. */
|
/** Applies a function to the result of evaluating a heterogeneous list of other tasks. */
|
||||||
case Mapped[A, K[F[_]]](in: K[Task], f: K[Result] => A, alist: AList[K]) extends Action[A]
|
case Mapped[A, K[+F[_]]](in: K[Task], f: K[Result] => A, alist: AList[K]) extends Action[A]
|
||||||
// private[sbt] def mapTask(g: Task ~> Task) = Mapped[A, K](alist.transform(in, g), f, alist)
|
// private[sbt] def mapTask(g: Task ~> Task) = Mapped[A, K](alist.transform(in, g), f, alist)
|
||||||
|
|
||||||
/** Computes another task to evaluate based on results from evaluating other tasks. */
|
/** Computes another task to evaluate based on results from evaluating other tasks. */
|
||||||
case FlatMapped[A, K[F[_]]](
|
case FlatMapped[A, K[+F[_]]](
|
||||||
in: K[Task],
|
in: K[Task],
|
||||||
f: K[Result] => Task[A],
|
f: K[Result] => Task[A],
|
||||||
alist: AList[K],
|
alist: AList[K],
|
||||||
|
|
@ -63,6 +63,7 @@ enum Action[A]:
|
||||||
end Action
|
end Action
|
||||||
|
|
||||||
object Action:
|
object Action:
|
||||||
|
import sbt.std.TaskExtra.*
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encode this computation as a flatMap.
|
* Encode this computation as a flatMap.
|
||||||
|
|
@ -72,12 +73,12 @@ object Action:
|
||||||
): Action.FlatMapped[A2, [F[_]] =>> Tuple1[F[Either[A1, A2]]]] =
|
): Action.FlatMapped[A2, [F[_]] =>> Tuple1[F[Either[A1, A2]]]] =
|
||||||
val alist = AList.tuple[Tuple1[Either[A1, A2]]]
|
val alist = AList.tuple[Tuple1[Either[A1, A2]]]
|
||||||
val f: Either[A1, A2] => Task[A2] = {
|
val f: Either[A1, A2] => Task[A2] = {
|
||||||
case Right(b) => std.TaskExtra.task(b)
|
case Right(b) => task(b)
|
||||||
case Left(a) => std.TaskExtra.singleInputTask(s.fin).map(_(a))
|
case Left(a) => singleInputTask(s.fin).map(_(a))
|
||||||
}
|
}
|
||||||
Action.FlatMapped[A2, [F[_]] =>> Tuple1[F[Either[A1, A2]]]](
|
Action.FlatMapped[A2, [F[_]] =>> Tuple1[F[Either[A1, A2]]]](
|
||||||
Tuple1(s.fab),
|
Tuple1(s.fab),
|
||||||
{ case Tuple1(r) => (f compose std.TaskExtra.successM)(r) },
|
{ case Tuple1(r) => f.compose(successM)(r) },
|
||||||
alist,
|
alist,
|
||||||
)
|
)
|
||||||
end Action
|
end Action
|
||||||
|
|
|
||||||
|
|
@ -144,7 +144,7 @@ trait TaskExtra extends TaskExtra0 {
|
||||||
: Conversion[(Task[A1], Task[A2]), MultiInTask[[F[_]] =>> Tuple.Map[(A1, A2), F]]] =
|
: Conversion[(Task[A1], Task[A2]), MultiInTask[[F[_]] =>> Tuple.Map[(A1, A2), F]]] =
|
||||||
multT2Task(_)
|
multT2Task(_)
|
||||||
|
|
||||||
final implicit def multInputTask[K[F[_]]: AList](tasks: K[Task]): MultiInTask[K] =
|
final implicit def multInputTask[K[+F[_]]: AList](tasks: K[Task]): MultiInTask[K] =
|
||||||
new MultiInTask[K]:
|
new MultiInTask[K]:
|
||||||
override def flatMapN[A](f: K[Id] => Task[A]): Task[A] =
|
override def flatMapN[A](f: K[Id] => Task[A]): Task[A] =
|
||||||
Task(Info(), Action.FlatMapped[A, K](tasks, f compose allM, AList[K]))
|
Task(Info(), Action.FlatMapped[A, K](tasks, f compose allM, AList[K]))
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@ object Transform:
|
||||||
|
|
||||||
final class TaskAndValue[T](val task: Task[T], val value: T)
|
final class TaskAndValue[T](val task: Task[T], val value: T)
|
||||||
|
|
||||||
def dummyMap(dummyMap: DummyTaskMap): Task ~>| Task = {
|
def dummyMap(dummyMap: DummyTaskMap): TaskId ~>| Task = {
|
||||||
val pmap = new DelegatingPMap[Task, Task](new collection.mutable.ListMap)
|
val pmap = new DelegatingPMap[TaskId, Task](new collection.mutable.ListMap)
|
||||||
def add[T](dummy: TaskAndValue[T]): Unit = {
|
def add[T](dummy: TaskAndValue[T]): Unit = {
|
||||||
pmap(dummy.task) = fromDummyStrict(dummy.task, dummy.value)
|
pmap(dummy.task) = fromDummyStrict(dummy.task, dummy.value)
|
||||||
}
|
}
|
||||||
|
|
@ -40,15 +40,15 @@ object Transform:
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Applies `map`, returning the result if defined or returning the input unchanged otherwise. */
|
/** Applies `map`, returning the result if defined or returning the input unchanged otherwise. */
|
||||||
implicit def getOrId(map: Task ~>| Task): [A] => Task[A] => Task[A] =
|
implicit def getOrId(map: TaskId ~>| Task): [A] => TaskId[A] => Task[A] =
|
||||||
[A] => (in: Task[A]) => map(in).getOrElse(in)
|
[A] => (in: TaskId[A]) => map(in).getOrElse(in.asInstanceOf)
|
||||||
|
|
||||||
def apply(dummies: DummyTaskMap) = taskToNode(getOrId(dummyMap(dummies)))
|
def apply(dummies: DummyTaskMap) = taskToNode(getOrId(dummyMap(dummies)))
|
||||||
|
|
||||||
def taskToNode(pre: [A] => Task[A] => Task[A]): NodeView[Task] =
|
def taskToNode(pre: [A] => TaskId[A] => Task[A]): NodeView =
|
||||||
new NodeView[Task]:
|
new NodeView:
|
||||||
import Action.*
|
import Action.*
|
||||||
def apply[T](t: Task[T]): Node[Task, T] = pre(t).work match
|
def apply[T](t: TaskId[T]): Node[T] = pre(t).work match
|
||||||
case Pure(eval, _) => uniform(Nil)(_ => Right(eval()))
|
case Pure(eval, _) => uniform(Nil)(_ => Right(eval()))
|
||||||
case m: Mapped[a, k] => toNode[a, k](m.in)(right[a] compose m.f)(m.alist)
|
case m: Mapped[a, k] => toNode[a, k](m.in)(right[a] compose m.f)(m.alist)
|
||||||
case m: FlatMapped[a, k] =>
|
case m: FlatMapped[a, k] =>
|
||||||
|
|
@ -61,19 +61,19 @@ object Transform:
|
||||||
case DependsOn(in, deps) => uniform(existToAny(deps))(const(Left(in)) compose all)
|
case DependsOn(in, deps) => uniform(existToAny(deps))(const(Left(in)) compose all)
|
||||||
case Join(in, f) => uniform(in)(f)
|
case Join(in, f) => uniform(in)(f)
|
||||||
|
|
||||||
def inline1[T](t: Task[T]): Option[() => T] = t.work match
|
def inline1[T](t: TaskId[T]): Option[() => T] = t match
|
||||||
case Action.Pure(eval, true) => Some(eval)
|
case Task(_, Action.Pure(eval, true)) => Some(eval)
|
||||||
case _ => None
|
case _ => None
|
||||||
|
|
||||||
def uniform[A1, D](tasks: Seq[Task[D]])(
|
def uniform[A1, D](tasks: Seq[Task[D]])(
|
||||||
f: Seq[Result[D]] => Either[Task[A1], A1]
|
f: Seq[Result[D]] => Either[Task[A1], A1]
|
||||||
): Node[Task, A1] =
|
): Node[A1] =
|
||||||
toNode[A1, [F[_]] =>> List[F[D]]](tasks.toList)(f)(AList.list[D])
|
toNode[A1, [F[_]] =>> List[F[D]]](tasks.toList)(f)(AList.list[D])
|
||||||
|
|
||||||
def toNode[A1, K1[F[_]]: AList](
|
def toNode[A1, K1[F[_]]: AList](
|
||||||
inputs: K1[Task]
|
inputs: K1[TaskId]
|
||||||
)(f: K1[Result] => Either[Task[A1], A1]): Node[Task, A1] =
|
)(f: K1[Result] => Either[Task[A1], A1]): Node[A1] =
|
||||||
new Node[Task, A1]:
|
new Node[A1]:
|
||||||
type K[F[_]] = K1[F]
|
type K[F[_]] = K1[F]
|
||||||
val in = inputs
|
val in = inputs
|
||||||
lazy val alist: AList[K] = AList[K]
|
lazy val alist: AList[K] = AList[K]
|
||||||
|
|
|
||||||
|
|
@ -22,15 +22,15 @@ object TaskGen extends std.TaskExtra {
|
||||||
val TaskListGen = MaxTasksGen.flatMap(size => Gen.listOfN(size, Arbitrary.arbInt.arbitrary))
|
val TaskListGen = MaxTasksGen.flatMap(size => Gen.listOfN(size, Arbitrary.arbInt.arbitrary))
|
||||||
|
|
||||||
def run[T](root: Task[T], checkCycles: Boolean, maxWorkers: Int): Result[T] = {
|
def run[T](root: Task[T], checkCycles: Boolean, maxWorkers: Int): Result[T] = {
|
||||||
val (service, shutdown) = CompletionService[Task[_], Completed](maxWorkers)
|
val (service, shutdown) = CompletionService(maxWorkers)
|
||||||
val dummies = std.Transform.DummyTaskMap(Nil)
|
val dummies = std.Transform.DummyTaskMap(Nil)
|
||||||
val x = new Execute[Task](
|
val x = new Execute(
|
||||||
Execute.config(checkCycles),
|
Execute.config(checkCycles),
|
||||||
Execute.noTriggers,
|
Execute.noTriggers,
|
||||||
ExecuteProgress.empty[Task]
|
ExecuteProgress.empty
|
||||||
)(using std.Transform(dummies))
|
)(using std.Transform(dummies))
|
||||||
try {
|
try {
|
||||||
x.run(root)(using service.asInstanceOf)
|
x.run(root)(using service)
|
||||||
} finally {
|
} finally {
|
||||||
shutdown()
|
shutdown()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,5 +50,5 @@ object TaskRunnerCircularTest extends Properties("TaskRunner Circular") {
|
||||||
def cyclic(i: Incomplete) =
|
def cyclic(i: Incomplete) =
|
||||||
Incomplete
|
Incomplete
|
||||||
.allExceptions(i)
|
.allExceptions(i)
|
||||||
.exists(_.isInstanceOf[Execute[({ type A[_] <: AnyRef })#A @unchecked]#CyclicException[_]])
|
.exists(_.isInstanceOf[Execute#CyclicException])
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ import TaskGen.MaxWorkers
|
||||||
import org.scalacheck._
|
import org.scalacheck._
|
||||||
import Prop.forAll
|
import Prop.forAll
|
||||||
import Transform.taskToNode
|
import Transform.taskToNode
|
||||||
import ConcurrentRestrictions.{ completionService, limitTotal, tagged => tagged0, TagMap }
|
import ConcurrentRestrictions.{ completionService, limitTotal, tagged }
|
||||||
|
|
||||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||||
|
|
||||||
|
|
@ -52,7 +52,7 @@ object TaskSerial extends Properties("task serial") {
|
||||||
|
|
||||||
def checkArbitrary(
|
def checkArbitrary(
|
||||||
size: Int,
|
size: Int,
|
||||||
restrictions: ConcurrentRestrictions[Task[_]],
|
restrictions: ConcurrentRestrictions,
|
||||||
shouldSucceed: Boolean
|
shouldSucceed: Boolean
|
||||||
) = {
|
) = {
|
||||||
val latch = task { new CountDownLatch(size) }
|
val latch = task { new CountDownLatch(size) }
|
||||||
|
|
@ -70,8 +70,7 @@ object TaskSerial extends Properties("task serial") {
|
||||||
"Some tasks were unschedulable: verify this is an actual failure by extending the timeout to several seconds."
|
"Some tasks were unschedulable: verify this is an actual failure by extending the timeout to several seconds."
|
||||||
def scheduledMsg = "All tasks were unexpectedly scheduled."
|
def scheduledMsg = "All tasks were unexpectedly scheduled."
|
||||||
|
|
||||||
def tagged(f: TagMap => Boolean) = tagged0[Task[_]](_.tags, f)
|
def evalRestricted[T](t: Task[T])(restrictions: ConcurrentRestrictions): T =
|
||||||
def evalRestricted[T](t: Task[T])(restrictions: ConcurrentRestrictions[Task[_]]): T =
|
|
||||||
tryRun[T](t, checkCycles, restrictions)
|
tryRun[T](t, checkCycles, restrictions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -79,18 +78,18 @@ object TaskTest {
|
||||||
def run[T](
|
def run[T](
|
||||||
root: Task[T],
|
root: Task[T],
|
||||||
checkCycles: Boolean,
|
checkCycles: Boolean,
|
||||||
restrictions: ConcurrentRestrictions[Task[_]]
|
restrictions: ConcurrentRestrictions
|
||||||
): Result[T] = {
|
): Result[T] = {
|
||||||
val (service, shutdown) =
|
val (service, shutdown) =
|
||||||
completionService[Task[_], Completed](restrictions, (x: String) => System.err.println(x))
|
completionService(restrictions, (x: String) => System.err.println(x))
|
||||||
|
|
||||||
val x = new Execute[Task](
|
val x = new Execute(
|
||||||
Execute.config(checkCycles),
|
Execute.config(checkCycles),
|
||||||
Execute.noTriggers,
|
Execute.noTriggers,
|
||||||
ExecuteProgress.empty[Task]
|
ExecuteProgress.empty
|
||||||
)(using taskToNode(idK[Task]))
|
)(using taskToNode([A] => (id: TaskId[A]) => id.asInstanceOf))
|
||||||
try {
|
try {
|
||||||
x.run(root)(using service.asInstanceOf)
|
x.run(root)(using service)
|
||||||
} finally {
|
} finally {
|
||||||
shutdown()
|
shutdown()
|
||||||
}
|
}
|
||||||
|
|
@ -98,7 +97,7 @@ object TaskTest {
|
||||||
def tryRun[T](
|
def tryRun[T](
|
||||||
root: Task[T],
|
root: Task[T],
|
||||||
checkCycles: Boolean,
|
checkCycles: Boolean,
|
||||||
restrictions: ConcurrentRestrictions[Task[_]]
|
restrictions: ConcurrentRestrictions
|
||||||
): T =
|
): T =
|
||||||
run(root, checkCycles, restrictions) match {
|
run(root, checkCycles, restrictions) match {
|
||||||
case Result.Value(v) => v
|
case Result.Value(v) => v
|
||||||
|
|
|
||||||
|
|
@ -7,19 +7,19 @@
|
||||||
|
|
||||||
package sbt
|
package sbt
|
||||||
|
|
||||||
trait CompletionService[A, R]:
|
trait CompletionService:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submits a work node A with work that returns R. In Execute this is used for tasks returning
|
* Submits a work node A with work that returns R. In Execute this is used for tasks returning
|
||||||
* sbt.Completed.
|
* sbt.Completed.
|
||||||
*/
|
*/
|
||||||
def submit(node: A, work: () => R): Unit
|
def submit(node: TaskId[?], work: () => Completed): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves and removes the result from the next completed task, waiting if none are yet present.
|
* Retrieves and removes the result from the next completed task, waiting if none are yet present.
|
||||||
* In Execute this is used for tasks returning sbt.Completed.
|
* In Execute this is used for tasks returning sbt.Completed.
|
||||||
*/
|
*/
|
||||||
def take(): R
|
def take(): Completed
|
||||||
end CompletionService
|
end CompletionService
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
@ -35,35 +35,40 @@ import java.util.concurrent.{
|
||||||
|
|
||||||
object CompletionService {
|
object CompletionService {
|
||||||
val poolID = new AtomicInteger(1)
|
val poolID = new AtomicInteger(1)
|
||||||
def apply[A, T](poolSize: Int): (CompletionService[A, T], () => Unit) = {
|
def apply(poolSize: Int): (CompletionService, () => Unit) = {
|
||||||
val i = new AtomicInteger(1)
|
val i = new AtomicInteger(1)
|
||||||
val id = poolID.getAndIncrement()
|
val id = poolID.getAndIncrement()
|
||||||
val pool = Executors.newFixedThreadPool(
|
val pool = Executors.newFixedThreadPool(
|
||||||
poolSize,
|
poolSize,
|
||||||
(r: Runnable) => new Thread(r, s"sbt-completion-thread-$id-${i.getAndIncrement}")
|
(r: Runnable) => new Thread(r, s"sbt-completion-thread-$id-${i.getAndIncrement}")
|
||||||
)
|
)
|
||||||
(apply[A, T](pool), () => { pool.shutdownNow(); () })
|
(apply(pool), () => { pool.shutdownNow(); () })
|
||||||
}
|
}
|
||||||
|
|
||||||
def apply[A, T](x: Executor): CompletionService[A, T] =
|
def apply(x: Executor): CompletionService =
|
||||||
apply(new ExecutorCompletionService[T](x))
|
apply(new ExecutorCompletionService[Completed](x))
|
||||||
|
|
||||||
def apply[A, T](completion: JCompletionService[T]): CompletionService[A, T] =
|
def apply(completion: JCompletionService[Completed]): CompletionService =
|
||||||
new CompletionService[A, T] {
|
new CompletionService {
|
||||||
def submit(node: A, work: () => T) = { CompletionService.submit(work, completion); () }
|
def submit(node: TaskId[?], work: () => Completed) = {
|
||||||
|
CompletionService.submit(work, completion); ()
|
||||||
|
}
|
||||||
def take() = completion.take().get()
|
def take() = completion.take().get()
|
||||||
}
|
}
|
||||||
|
|
||||||
def submit[T](work: () => T, completion: JCompletionService[T]): () => T = {
|
def submit(work: () => Completed, completion: JCompletionService[Completed]): () => Completed = {
|
||||||
val future = submitFuture[T](work, completion)
|
val future = submitFuture(work, completion)
|
||||||
() => future.get
|
() => future.get
|
||||||
}
|
}
|
||||||
|
|
||||||
private[sbt] def submitFuture[A](work: () => A, completion: JCompletionService[A]): JFuture[A] = {
|
private[sbt] def submitFuture(
|
||||||
|
work: () => Completed,
|
||||||
|
completion: JCompletionService[Completed]
|
||||||
|
): JFuture[Completed] = {
|
||||||
val future =
|
val future =
|
||||||
try
|
try
|
||||||
completion.submit {
|
completion.submit {
|
||||||
new Callable[A] {
|
new Callable[Completed] {
|
||||||
def call =
|
def call =
|
||||||
try {
|
try {
|
||||||
work()
|
work()
|
||||||
|
|
@ -79,9 +84,9 @@ object CompletionService {
|
||||||
}
|
}
|
||||||
future
|
future
|
||||||
}
|
}
|
||||||
def manage[A, T](
|
def manage(
|
||||||
service: CompletionService[A, T]
|
service: CompletionService
|
||||||
)(setup: A => Unit, cleanup: A => Unit): CompletionService[A, T] =
|
)(setup: TaskId[?] => Unit, cleanup: TaskId[?] => Unit): CompletionService =
|
||||||
wrap(service) { (node, work) => () =>
|
wrap(service) { (node, work) => () =>
|
||||||
setup(node)
|
setup(node)
|
||||||
try {
|
try {
|
||||||
|
|
@ -90,11 +95,11 @@ object CompletionService {
|
||||||
cleanup(node)
|
cleanup(node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def wrap[A, T](
|
def wrap(
|
||||||
service: CompletionService[A, T]
|
service: CompletionService
|
||||||
)(w: (A, () => T) => (() => T)): CompletionService[A, T] =
|
)(w: (TaskId[?], () => Completed) => (() => Completed)): CompletionService =
|
||||||
new CompletionService[A, T] {
|
new CompletionService {
|
||||||
def submit(node: A, work: () => T) = service.submit(node, w(node, work))
|
def submit(node: TaskId[?], work: () => Completed) = service.submit(node, w(node, work))
|
||||||
def take() = service.take()
|
def take() = service.take()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,14 +13,12 @@ import sbt.internal.util.AttributeKey
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import java.util.concurrent.{ Future => JFuture, RejectedExecutionException }
|
import java.util.concurrent.{ Future => JFuture, RejectedExecutionException }
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Describes restrictions on concurrent execution for a set of tasks.
|
* Describes restrictions on concurrent execution for a set of tasks.
|
||||||
*
|
|
||||||
* @tparam A
|
|
||||||
* the type of a task
|
|
||||||
*/
|
*/
|
||||||
trait ConcurrentRestrictions[A] {
|
trait ConcurrentRestrictions {
|
||||||
|
|
||||||
/** Internal state type used to describe a set of tasks. */
|
/** Internal state type used to describe a set of tasks. */
|
||||||
type G
|
type G
|
||||||
|
|
@ -29,10 +27,10 @@ trait ConcurrentRestrictions[A] {
|
||||||
def empty: G
|
def empty: G
|
||||||
|
|
||||||
/** Updates the description `g` to include a new task `a`. */
|
/** Updates the description `g` to include a new task `a`. */
|
||||||
def add(g: G, a: A): G
|
def add(g: G, a: TaskId[?]): G
|
||||||
|
|
||||||
/** Updates the description `g` to remove a previously added task `a`. */
|
/** Updates the description `g` to remove a previously added task `a`. */
|
||||||
def remove(g: G, a: A): G
|
def remove(g: G, a: TaskId[?]): G
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the tasks described by `g` are allowed to execute concurrently. The methods in
|
* Returns true if the tasks described by `g` are allowed to execute concurrently. The methods in
|
||||||
|
|
@ -54,8 +52,7 @@ import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService }
|
||||||
import annotation.tailrec
|
import annotation.tailrec
|
||||||
|
|
||||||
object ConcurrentRestrictions {
|
object ConcurrentRestrictions {
|
||||||
private[this] val completionServices = new java.util.WeakHashMap[CompletionService[_, _], Boolean]
|
private[this] val completionServices = new java.util.WeakHashMap[CompletionService, Boolean]
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
def cancelAll() = completionServices.keySet.asScala.toVector.foreach {
|
def cancelAll() = completionServices.keySet.asScala.toVector.foreach {
|
||||||
case a: AutoCloseable => a.close()
|
case a: AutoCloseable => a.close()
|
||||||
case _ =>
|
case _ =>
|
||||||
|
|
@ -71,22 +68,22 @@ object ConcurrentRestrictions {
|
||||||
* @param zero
|
* @param zero
|
||||||
* the constant placeholder used for t
|
* the constant placeholder used for t
|
||||||
*/
|
*/
|
||||||
def unrestricted[A]: ConcurrentRestrictions[A] =
|
def unrestricted: ConcurrentRestrictions =
|
||||||
new ConcurrentRestrictions[A] {
|
new ConcurrentRestrictions {
|
||||||
type G = Unit
|
type G = Unit
|
||||||
def empty = ()
|
def empty = ()
|
||||||
def add(g: G, a: A) = ()
|
def add(g: G, a: TaskId[?]) = ()
|
||||||
def remove(g: G, a: A) = ()
|
def remove(g: G, a: TaskId[?]) = ()
|
||||||
def valid(g: G) = true
|
def valid(g: G) = true
|
||||||
}
|
}
|
||||||
|
|
||||||
def limitTotal[A](i: Int): ConcurrentRestrictions[A] = {
|
def limitTotal(i: Int): ConcurrentRestrictions = {
|
||||||
assert(i >= 1, "Maximum must be at least 1 (was " + i + ")")
|
assert(i >= 1, "Maximum must be at least 1 (was " + i + ")")
|
||||||
new ConcurrentRestrictions[A] {
|
new ConcurrentRestrictions {
|
||||||
type G = Int
|
type G = Int
|
||||||
def empty = 0
|
def empty = 0
|
||||||
def add(g: Int, a: A) = g + 1
|
def add(g: Int, a: TaskId[?]) = g + 1
|
||||||
def remove(g: Int, a: A) = g - 1
|
def remove(g: Int, a: TaskId[?]) = g - 1
|
||||||
def valid(g: Int) = g <= i
|
def valid(g: Int) = g <= i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -108,26 +105,25 @@ object ConcurrentRestrictions {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements concurrency restrictions on tasks based on Tags.
|
* Implements concurrency restrictions on tasks based on Tags.
|
||||||
* @tparam A
|
|
||||||
* type of a task
|
|
||||||
* @param get
|
* @param get
|
||||||
* extracts tags from a task
|
* extracts tags from a task
|
||||||
* @param validF
|
* @param validF
|
||||||
* defines whether a set of tasks are allowed to execute concurrently based on their merged tags
|
* defines whether a set of tasks are allowed to execute concurrently based on their merged tags
|
||||||
*/
|
*/
|
||||||
def tagged[A](get: A => TagMap, validF: TagMap => Boolean): ConcurrentRestrictions[A] =
|
def tagged(validF: TagMap => Boolean): ConcurrentRestrictions =
|
||||||
new ConcurrentRestrictions[A] {
|
new ConcurrentRestrictions {
|
||||||
type G = TagMap
|
type G = TagMap
|
||||||
def empty = Map.empty
|
def empty = Map.empty
|
||||||
def add(g: TagMap, a: A) = merge(g, a, get)(_ + _)
|
def add(g: TagMap, a: TaskId[?]) = merge(g, a)(_ + _)
|
||||||
def remove(g: TagMap, a: A) = merge(g, a, get)(_ - _)
|
def remove(g: TagMap, a: TaskId[?]) = merge(g, a)(_ - _)
|
||||||
def valid(g: TagMap) = validF(g)
|
def valid(g: TagMap) = validF(g)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[this] def merge[A](m: TagMap, a: A, get: A => TagMap)(f: (Int, Int) => Int): TagMap = {
|
private[this] def merge(m: TagMap, a: TaskId[?])(
|
||||||
val aTags = get(a)
|
f: (Int, Int) => Int
|
||||||
val base = merge(m, aTags)(f)
|
): TagMap = {
|
||||||
val un = if (aTags.isEmpty) update(base, Untagged, 1)(f) else base
|
val base = merge(m, a.tags)(f)
|
||||||
|
val un = if (a.tags.isEmpty) update(base, Untagged, 1)(f) else base
|
||||||
update(un, All, 1)(f)
|
update(un, All, 1)(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -154,26 +150,26 @@ object ConcurrentRestrictions {
|
||||||
* @tparam R
|
* @tparam R
|
||||||
* the type of data that will be computed by the CompletionService.
|
* the type of data that will be computed by the CompletionService.
|
||||||
*/
|
*/
|
||||||
def completionService[A, R](
|
def completionService(
|
||||||
tags: ConcurrentRestrictions[A],
|
tags: ConcurrentRestrictions,
|
||||||
warn: String => Unit
|
warn: String => Unit
|
||||||
): (CompletionService[A, R], () => Unit) = {
|
): (CompletionService, () => Unit) = {
|
||||||
val id = poolID.getAndIncrement
|
val id = poolID.getAndIncrement
|
||||||
val i = new AtomicInteger(1)
|
val i = new AtomicInteger(1)
|
||||||
val pool = Executors.newCachedThreadPool { r =>
|
val pool = Executors.newCachedThreadPool { r =>
|
||||||
new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}")
|
new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}")
|
||||||
}
|
}
|
||||||
val service = completionService[A, R](pool, tags, warn)
|
val service = completionService(pool, tags, warn)
|
||||||
(service, () => { pool.shutdownNow(); () })
|
(service, () => { pool.shutdownNow(); () })
|
||||||
}
|
}
|
||||||
|
|
||||||
def completionService[A, R](
|
def completionService(
|
||||||
tags: ConcurrentRestrictions[A],
|
tags: ConcurrentRestrictions,
|
||||||
warn: String => Unit,
|
warn: String => Unit,
|
||||||
isSentinel: A => Boolean
|
isSentinel: TaskId[?] => Boolean
|
||||||
): (CompletionService[A, R], () => Unit) = {
|
): (CompletionService, () => Unit) = {
|
||||||
val pool = Executors.newCachedThreadPool()
|
val pool = Executors.newCachedThreadPool()
|
||||||
val service = completionService[A, R](pool, tags, warn, isSentinel)
|
val service = completionService(pool, tags, warn, isSentinel)
|
||||||
(
|
(
|
||||||
service,
|
service,
|
||||||
() => {
|
() => {
|
||||||
|
|
@ -183,13 +179,13 @@ object ConcurrentRestrictions {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
def cancellableCompletionService[A, R](
|
def cancellableCompletionService(
|
||||||
tags: ConcurrentRestrictions[A],
|
tags: ConcurrentRestrictions,
|
||||||
warn: String => Unit,
|
warn: String => Unit,
|
||||||
isSentinel: A => Boolean
|
isSentinel: TaskId[?] => Boolean
|
||||||
): (CompletionService[A, R], Boolean => Unit) = {
|
): (CompletionService, Boolean => Unit) = {
|
||||||
val pool = Executors.newCachedThreadPool()
|
val pool = Executors.newCachedThreadPool()
|
||||||
val service = completionService[A, R](pool, tags, warn, isSentinel)
|
val service = completionService(pool, tags, warn, isSentinel)
|
||||||
(
|
(
|
||||||
service,
|
service,
|
||||||
force => {
|
force => {
|
||||||
|
|
@ -200,12 +196,12 @@ object ConcurrentRestrictions {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
def completionService[A, R](
|
def completionService(
|
||||||
backing: Executor,
|
backing: Executor,
|
||||||
tags: ConcurrentRestrictions[A],
|
tags: ConcurrentRestrictions,
|
||||||
warn: String => Unit
|
warn: String => Unit
|
||||||
): CompletionService[A, R] with AutoCloseable = {
|
): CompletionService with AutoCloseable = {
|
||||||
completionService[A, R](backing, tags, warn, (_: A) => false)
|
completionService(backing, tags, warn, _ => false)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -213,17 +209,17 @@ object ConcurrentRestrictions {
|
||||||
* restrictions on concurrent task execution and using the provided Executor to manage execution
|
* restrictions on concurrent task execution and using the provided Executor to manage execution
|
||||||
* on threads.
|
* on threads.
|
||||||
*/
|
*/
|
||||||
def completionService[A, R](
|
def completionService(
|
||||||
backing: Executor,
|
backing: Executor,
|
||||||
tags: ConcurrentRestrictions[A],
|
tags: ConcurrentRestrictions,
|
||||||
warn: String => Unit,
|
warn: String => Unit,
|
||||||
isSentinel: A => Boolean,
|
isSentinel: TaskId[?] => Boolean,
|
||||||
): CompletionService[A, R] with CancelSentiels with AutoCloseable = {
|
): CompletionService with CancelSentiels with AutoCloseable = {
|
||||||
|
|
||||||
// Represents submitted work for a task.
|
// Represents submitted work for a task.
|
||||||
final class Enqueue(val node: A, val work: () => R)
|
final class Enqueue(val node: TaskId[?], val work: () => Completed)
|
||||||
|
|
||||||
new CompletionService[A, R] with CancelSentiels with AutoCloseable {
|
new CompletionService with CancelSentiels with AutoCloseable {
|
||||||
completionServices.put(this, true)
|
completionServices.put(this, true)
|
||||||
private[this] val closed = new AtomicBoolean(false)
|
private[this] val closed = new AtomicBoolean(false)
|
||||||
override def close(): Unit = if (closed.compareAndSet(false, true)) {
|
override def close(): Unit = if (closed.compareAndSet(false, true)) {
|
||||||
|
|
@ -232,7 +228,7 @@ object ConcurrentRestrictions {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Backing service used to manage execution on threads once all constraints are satisfied. */
|
/** Backing service used to manage execution on threads once all constraints are satisfied. */
|
||||||
private[this] val jservice = new ExecutorCompletionService[R](backing)
|
private[this] val jservice = new ExecutorCompletionService[Completed](backing)
|
||||||
|
|
||||||
/** The description of the currently running tasks, used by `tags` to manage restrictions. */
|
/** The description of the currently running tasks, used by `tags` to manage restrictions. */
|
||||||
private[this] var tagState = tags.empty
|
private[this] var tagState = tags.empty
|
||||||
|
|
@ -255,7 +251,7 @@ object ConcurrentRestrictions {
|
||||||
sentinels.clear()
|
sentinels.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
def submit(node: A, work: () => R): Unit = synchronized {
|
def submit(node: TaskId[?], work: () => Completed): Unit = synchronized {
|
||||||
if (closed.get) throw new RejectedExecutionException
|
if (closed.get) throw new RejectedExecutionException
|
||||||
else if (isSentinel(node)) {
|
else if (isSentinel(node)) {
|
||||||
// skip all checks for sentinels
|
// skip all checks for sentinels
|
||||||
|
|
@ -276,7 +272,7 @@ object ConcurrentRestrictions {
|
||||||
}
|
}
|
||||||
()
|
()
|
||||||
}
|
}
|
||||||
private[this] def submitValid(node: A, work: () => R): Unit = {
|
private[this] def submitValid(node: TaskId[?], work: () => Completed): Unit = {
|
||||||
running += 1
|
running += 1
|
||||||
val wrappedWork = () =>
|
val wrappedWork = () =>
|
||||||
try work()
|
try work()
|
||||||
|
|
@ -284,7 +280,7 @@ object ConcurrentRestrictions {
|
||||||
CompletionService.submitFuture(wrappedWork, jservice)
|
CompletionService.submitFuture(wrappedWork, jservice)
|
||||||
()
|
()
|
||||||
}
|
}
|
||||||
private[this] def cleanup(node: A): Unit = synchronized {
|
private[this] def cleanup(node: TaskId[?]): Unit = synchronized {
|
||||||
running -= 1
|
running -= 1
|
||||||
tagState = tags.remove(tagState, node)
|
tagState = tags.remove(tagState, node)
|
||||||
if (!tags.valid(tagState)) {
|
if (!tags.valid(tagState)) {
|
||||||
|
|
@ -320,7 +316,7 @@ object ConcurrentRestrictions {
|
||||||
submitValid(tried)
|
submitValid(tried)
|
||||||
}
|
}
|
||||||
|
|
||||||
def take(): R = {
|
def take(): Completed = {
|
||||||
if (closed.get)
|
if (closed.get)
|
||||||
throw new RejectedExecutionException(
|
throw new RejectedExecutionException(
|
||||||
"Tried to get values for a closed completion service"
|
"Tried to get values for a closed completion service"
|
||||||
|
|
|
||||||
|
|
@ -11,23 +11,24 @@ import java.util.concurrent.ExecutionException
|
||||||
|
|
||||||
import sbt.internal.util.ErrorHandling.wideConvert
|
import sbt.internal.util.ErrorHandling.wideConvert
|
||||||
import sbt.internal.util.{ DelegatingPMap, IDSet, PMap, RMap, ~> }
|
import sbt.internal.util.{ DelegatingPMap, IDSet, PMap, RMap, ~> }
|
||||||
import sbt.internal.util.Types._
|
import sbt.internal.util.Types.*
|
||||||
import sbt.internal.util.Util.nilSeq
|
import sbt.internal.util.Util.nilSeq
|
||||||
import Execute._
|
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.collection.JavaConverters._
|
import scala.jdk.CollectionConverters.*
|
||||||
import mutable.Map
|
import mutable.Map
|
||||||
import sbt.internal.util.AList
|
import sbt.internal.util.AList
|
||||||
|
|
||||||
private[sbt] object Execute {
|
private[sbt] object Execute {
|
||||||
def idMap[A1, A2]: Map[A1, A2] = (new java.util.IdentityHashMap[A1, A2]).asScala
|
def taskMap[A]: Map[TaskId[?], A] = (new java.util.IdentityHashMap[TaskId[?], A]).asScala
|
||||||
def pMap[F1[_], F2[_]]: PMap[F1, F2] = new DelegatingPMap[F1, F2](idMap)
|
def taskPMap[F[_]]: PMap[TaskId, F] = new DelegatingPMap(
|
||||||
|
(new java.util.IdentityHashMap[TaskId[Any], F[Any]]).asScala
|
||||||
|
)
|
||||||
private[sbt] def completed(p: => Unit): Completed = new Completed {
|
private[sbt] def completed(p: => Unit): Completed = new Completed {
|
||||||
def process(): Unit = p
|
def process(): Unit = p
|
||||||
}
|
}
|
||||||
def noTriggers[F[_]] = new Triggers[F](Map.empty, Map.empty, idFun)
|
def noTriggers[TaskId[_]] = new Triggers(Map.empty, Map.empty, idFun)
|
||||||
|
|
||||||
def config(checkCycles: Boolean, overwriteNode: Incomplete => Boolean = const(false)): Config =
|
def config(checkCycles: Boolean, overwriteNode: Incomplete => Boolean = const(false)): Config =
|
||||||
new Config(checkCycles, overwriteNode)
|
new Config(checkCycles, overwriteNode)
|
||||||
|
|
@ -44,33 +45,33 @@ sealed trait Completed {
|
||||||
def process(): Unit
|
def process(): Unit
|
||||||
}
|
}
|
||||||
|
|
||||||
private[sbt] trait NodeView[F[_]] {
|
private[sbt] trait NodeView {
|
||||||
def apply[A](a: F[A]): Node[F, A]
|
def apply[A](a: TaskId[A]): Node[A]
|
||||||
def inline1[A](a: F[A]): Option[() => A]
|
def inline1[A](a: TaskId[A]): Option[() => A]
|
||||||
}
|
}
|
||||||
|
|
||||||
final class Triggers[F[_]](
|
final class Triggers(
|
||||||
val runBefore: collection.Map[F[Any], Seq[F[Any]]],
|
val runBefore: collection.Map[TaskId[?], Seq[TaskId[?]]],
|
||||||
val injectFor: collection.Map[F[Any], Seq[F[Any]]],
|
val injectFor: collection.Map[TaskId[?], Seq[TaskId[?]]],
|
||||||
val onComplete: RMap[F, Result] => RMap[F, Result],
|
val onComplete: RMap[TaskId, Result] => RMap[TaskId, Result],
|
||||||
)
|
)
|
||||||
|
|
||||||
private[sbt] final class Execute[F[_] <: AnyRef](
|
private[sbt] final class Execute(
|
||||||
config: Config,
|
config: Execute.Config,
|
||||||
triggers: Triggers[F],
|
triggers: Triggers,
|
||||||
progress: ExecuteProgress[F]
|
progress: ExecuteProgress
|
||||||
)(using view: NodeView[F]) {
|
)(using view: NodeView) {
|
||||||
type Strategy = CompletionService[F[Any], Completed]
|
import Execute.*
|
||||||
|
|
||||||
private[this] val forward = idMap[F[Any], IDSet[F[Any]]]
|
private[this] val forward = taskMap[IDSet[TaskId[?]]]
|
||||||
private[this] val reverse = idMap[F[Any], Iterable[F[Any]]]
|
private[this] val reverse = taskMap[Iterable[TaskId[?]]]
|
||||||
private[this] val callers = pMap[F, Compose[IDSet, F]]
|
private[this] val callers = taskPMap[[X] =>> IDSet[TaskId[X]]]
|
||||||
private[this] val state = idMap[F[Any], State]
|
private[this] val state = taskMap[State]
|
||||||
private[this] val viewCache = pMap[F, Node[F, *]]
|
private[this] val viewCache = taskPMap[Node]
|
||||||
private[this] val results = pMap[F, Result]
|
private[this] val results = taskPMap[Result]
|
||||||
|
|
||||||
private[this] val getResult: [A] => F[A] => Result[A] = [A] =>
|
private[this] val getResult: [A] => TaskId[A] => Result[A] = [A] =>
|
||||||
(a: F[A]) =>
|
(a: TaskId[A]) =>
|
||||||
view.inline1(a) match
|
view.inline1(a) match
|
||||||
case Some(v) => Result.Value(v())
|
case Some(v) => Result.Value(v())
|
||||||
case None => results(a)
|
case None => results(a)
|
||||||
|
|
@ -85,12 +86,12 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
def dump: String =
|
def dump: String =
|
||||||
"State: " + state.toString + "\n\nResults: " + results + "\n\nCalls: " + callers + "\n\n"
|
"State: " + state.toString + "\n\nResults: " + results + "\n\nCalls: " + callers + "\n\n"
|
||||||
|
|
||||||
def run[A](root: F[A])(using strategy: Strategy): Result[A] =
|
def run[A](root: TaskId[A])(using strategy: CompletionService): Result[A] =
|
||||||
try {
|
try {
|
||||||
runKeep(root)(root)
|
runKeep(root)(root)
|
||||||
} catch { case i: Incomplete => Result.Inc(i) }
|
} catch { case i: Incomplete => Result.Inc(i) }
|
||||||
|
|
||||||
def runKeep[A](root: F[A])(using strategy: Strategy): RMap[F, Result] = {
|
def runKeep[A](root: TaskId[A])(using strategy: CompletionService): RMap[TaskId, Result] = {
|
||||||
assert(state.isEmpty, "Execute already running/ran.")
|
assert(state.isEmpty, "Execute already running/ran.")
|
||||||
|
|
||||||
addNew(root)
|
addNew(root)
|
||||||
|
|
@ -102,7 +103,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
finalResults
|
finalResults
|
||||||
}
|
}
|
||||||
|
|
||||||
def processAll()(using strategy: Strategy): Unit = {
|
def processAll()(using strategy: CompletionService): Unit = {
|
||||||
@tailrec def next(): Unit = {
|
@tailrec def next(): Unit = {
|
||||||
pre {
|
pre {
|
||||||
assert(reverse.nonEmpty, "Nothing to process.")
|
assert(reverse.nonEmpty, "Nothing to process.")
|
||||||
|
|
@ -135,7 +136,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
}
|
}
|
||||||
def dumpCalling: String = state.filter(_._2 == Calling).mkString("\n\t")
|
def dumpCalling: String = state.filter(_._2 == Calling).mkString("\n\t")
|
||||||
|
|
||||||
def call[A](node: F[A], target: F[A])(using strategy: Strategy): Unit = {
|
def call[A](node: TaskId[A], target: TaskId[A])(using strategy: CompletionService): Unit = {
|
||||||
if (config.checkCycles) cycleCheck(node, target)
|
if (config.checkCycles) cycleCheck(node, target)
|
||||||
pre {
|
pre {
|
||||||
assert(running(node))
|
assert(running(node))
|
||||||
|
|
@ -145,7 +146,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
results.get(target) match {
|
results.get(target) match {
|
||||||
case Some(result) => retire(node, result)
|
case Some(result) => retire(node, result)
|
||||||
case None =>
|
case None =>
|
||||||
state(node.asInstanceOf) = Calling
|
state(node) = Calling
|
||||||
addChecked(target)
|
addChecked(target)
|
||||||
addCaller(node, target)
|
addCaller(node, target)
|
||||||
}
|
}
|
||||||
|
|
@ -160,18 +161,16 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def retire[A](node: F[A], result: Result[A])(using strategy: Strategy): Unit = {
|
def retire[A](node: TaskId[A], result: Result[A])(using strategy: CompletionService): Unit = {
|
||||||
pre {
|
pre {
|
||||||
assert(running(node) | calling(node))
|
assert(running(node) | calling(node))
|
||||||
readyInv(node)
|
readyInv(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
results(node) = result
|
results(node) = result
|
||||||
state(node.asInstanceOf) = Done
|
state(node) = Done
|
||||||
progress.afterCompleted(node, result)
|
progress.afterCompleted(node, result)
|
||||||
remove(reverse.asInstanceOf[Map[F[A], Iterable[F[Any]]]], node) foreach { dep =>
|
remove(reverse, node).foreach(dep => notifyDone(node, dep))
|
||||||
notifyDone(node, dep.asInstanceOf)
|
|
||||||
}
|
|
||||||
callers.remove(node).toList.flatten.foreach { c =>
|
callers.remove(node).toList.flatten.foreach { c =>
|
||||||
retire(c, callerResult(c, result))
|
retire(c, callerResult(c, result))
|
||||||
}
|
}
|
||||||
|
|
@ -183,23 +182,23 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
assert(done(node))
|
assert(done(node))
|
||||||
assert(results(node) == result)
|
assert(results(node) == result)
|
||||||
readyInv(node)
|
readyInv(node)
|
||||||
assert(!(reverse.contains(node.asInstanceOf)))
|
assert(!(reverse.contains(node)))
|
||||||
assert(!(callers.contains(node)))
|
assert(!(callers.contains(node)))
|
||||||
assert(triggeredBy(node) forall added)
|
assert(triggeredBy(node) forall added)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def callerResult[A](node: F[A], result: Result[A]): Result[A] =
|
def callerResult[A](node: TaskId[A], result: Result[A]): Result[A] =
|
||||||
result match {
|
result match {
|
||||||
case _: Result.Value[A] => result
|
case _: Result.Value[A] => result
|
||||||
case Result.Inc(i) => Result.Inc(Incomplete(Some(node), tpe = i.tpe, causes = i :: Nil))
|
case Result.Inc(i) => Result.Inc(Incomplete(Some(node), tpe = i.tpe, causes = i :: Nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
def notifyDone[A](node: F[A], dependent: F[Any])(using strategy: Strategy): Unit = {
|
def notifyDone(node: TaskId[?], dependent: TaskId[?])(using strategy: CompletionService): Unit = {
|
||||||
val f = forward(dependent)
|
val f = forward(dependent)
|
||||||
f -= node.asInstanceOf
|
f -= node
|
||||||
if (f.isEmpty) {
|
if (f.isEmpty) {
|
||||||
remove[F[Any], IDSet[F[Any]]](forward.asInstanceOf, dependent)
|
remove(forward, dependent)
|
||||||
ready[Any](dependent)
|
ready(dependent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -208,7 +207,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
* inputs and dependencies have completed. Its computation is then evaluated and made available
|
* inputs and dependencies have completed. Its computation is then evaluated and made available
|
||||||
* for nodes that have it as an input.
|
* for nodes that have it as an input.
|
||||||
*/
|
*/
|
||||||
def addChecked[A](node: F[A])(using strategy: Strategy): Unit = {
|
def addChecked[A](node: TaskId[A])(using strategy: CompletionService): Unit = {
|
||||||
if (!added(node)) addNew(node)
|
if (!added(node)) addNew(node)
|
||||||
|
|
||||||
post { addedInv(node) }
|
post { addedInv(node) }
|
||||||
|
|
@ -219,14 +218,14 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
* have finished, the node's computation is scheduled to run. The node's dependencies will be
|
* have finished, the node's computation is scheduled to run. The node's dependencies will be
|
||||||
* added (transitively) if they are not already registered.
|
* added (transitively) if they are not already registered.
|
||||||
*/
|
*/
|
||||||
def addNew[A](node: F[A])(using strategy: Strategy): Unit = {
|
def addNew(node: TaskId[?])(using strategy: CompletionService): Unit = {
|
||||||
pre { newPre(node) }
|
pre { newPre(node) }
|
||||||
|
|
||||||
val v = register(node)
|
val v = register(node)
|
||||||
val deps: Iterable[F[Any]] = dependencies(v) ++ runBefore(node.asInstanceOf)
|
val deps = dependencies(v) ++ runBefore(node)
|
||||||
val active = IDSet[F[Any]](deps filter notDone.asInstanceOf)
|
val active = IDSet[TaskId[?]](deps filter notDone)
|
||||||
progress.afterRegistered(
|
progress.afterRegistered(
|
||||||
node.asInstanceOf,
|
node,
|
||||||
deps,
|
deps,
|
||||||
active.toList
|
active.toList
|
||||||
/* active is mutable, so take a snapshot */
|
/* active is mutable, so take a snapshot */
|
||||||
|
|
@ -234,10 +233,10 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
|
|
||||||
if (active.isEmpty) ready(node)
|
if (active.isEmpty) ready(node)
|
||||||
else {
|
else {
|
||||||
forward(node.asInstanceOf) = active.asInstanceOf
|
forward(node) = active
|
||||||
for (a <- active) {
|
for (a <- active) {
|
||||||
addChecked[Any](a.asInstanceOf)
|
addChecked(a)
|
||||||
addReverse[Any](a.asInstanceOf, node.asInstanceOf)
|
addReverse(a, node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -253,45 +252,47 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
* Called when a pending 'node' becomes runnable. All of its dependencies must be done. This
|
* Called when a pending 'node' becomes runnable. All of its dependencies must be done. This
|
||||||
* schedules the node's computation with 'strategy'.
|
* schedules the node's computation with 'strategy'.
|
||||||
*/
|
*/
|
||||||
def ready[A](node: F[A])(using strategy: Strategy): Unit = {
|
def ready(node: TaskId[?])(using strategy: CompletionService): Unit = {
|
||||||
pre {
|
pre {
|
||||||
assert(pending(node))
|
assert(pending(node))
|
||||||
readyInv(node)
|
readyInv(node)
|
||||||
assert(reverse.contains(node.asInstanceOf))
|
assert(reverse.contains(node))
|
||||||
}
|
}
|
||||||
|
|
||||||
state(node.asInstanceOf) = Running
|
state(node) = Running
|
||||||
progress.afterReady(node.asInstanceOf)
|
progress.afterReady(node)
|
||||||
submit(node)
|
submit(node)
|
||||||
|
|
||||||
post {
|
post {
|
||||||
readyInv(node)
|
readyInv(node)
|
||||||
assert(reverse.contains(node.asInstanceOf))
|
assert(reverse.contains(node))
|
||||||
assert(running(node))
|
assert(running(node))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Enters the given node into the system. */
|
/** Enters the given node into the system. */
|
||||||
def register[A](node: F[A]): Node[F, A] = {
|
def register[A](node: TaskId[A]): Node[A] = {
|
||||||
state(node.asInstanceOf) = Pending
|
state(node) = Pending
|
||||||
reverse(node.asInstanceOf) = Seq()
|
reverse(node) = Seq()
|
||||||
viewCache.getOrUpdate(node, view(node))
|
viewCache.getOrUpdate(node, view(node))
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Send the work for this node to the provided Strategy. */
|
/** Send the work for this node to the provided Strategy. */
|
||||||
def submit[A](node: F[A])(using strategy: Strategy): Unit = {
|
def submit(node: TaskId[?])(using strategy: CompletionService): Unit = {
|
||||||
val v = viewCache(node)
|
val v = viewCache(node)
|
||||||
val rs = v.alist.transform[F, Result](v.in)(getResult)
|
val rs = v.alist.transform(v.in)(getResult)
|
||||||
// v.alist.transform(v.in)(getResult)
|
// v.alist.transform(v.in)(getResult)
|
||||||
strategy.submit(node.asInstanceOf, () => work(node, v.work(rs)))
|
strategy.submit(node, () => work(node, v.work(rs)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Evaluates the computation 'f' for 'node'. This returns a Completed instance, which contains the
|
* Evaluates the computation 'f' for 'node'. This returns a Completed instance, which contains the
|
||||||
* post-processing to perform after the result is retrieved from the Strategy.
|
* post-processing to perform after the result is retrieved from the Strategy.
|
||||||
*/
|
*/
|
||||||
def work[A](node: F[A], f: => Either[F[A], A])(using strategy: Strategy): Completed = {
|
def work[A](node: TaskId[A], f: => Either[TaskId[A], A])(using
|
||||||
progress.beforeWork(node.asInstanceOf)
|
strategy: CompletionService
|
||||||
|
): Completed = {
|
||||||
|
progress.beforeWork(node)
|
||||||
val rawResult = wideConvert(f).left.map {
|
val rawResult = wideConvert(f).left.map {
|
||||||
case i: Incomplete => if (config.overwriteNode(i)) i.copy(node = Some(node)) else i
|
case i: Incomplete => if (config.overwriteNode(i)) i.copy(node = Some(node)) else i
|
||||||
case e => Incomplete(Some(node), Incomplete.Error, directCause = Some(e))
|
case e => Incomplete(Some(node), Incomplete.Error, directCause = Some(e))
|
||||||
|
|
@ -306,8 +307,8 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private[this] def rewrap[A](
|
private[this] def rewrap[A](
|
||||||
rawResult: Either[Incomplete, Either[F[A], A]]
|
rawResult: Either[Incomplete, Either[TaskId[A], A]]
|
||||||
): Either[F[A], Result[A]] =
|
): Either[TaskId[A], Result[A]] =
|
||||||
rawResult match {
|
rawResult match {
|
||||||
case Left(i) => Right(Result.Inc(i))
|
case Left(i) => Right(Result.Inc(i))
|
||||||
case Right(Right(v)) => Right(Result.Value(v))
|
case Right(Right(v)) => Right(Result.Value(v))
|
||||||
|
|
@ -317,108 +318,105 @@ private[sbt] final class Execute[F[_] <: AnyRef](
|
||||||
def remove[K, V](map: Map[K, V], k: K): V =
|
def remove[K, V](map: Map[K, V], k: K): V =
|
||||||
map.remove(k).getOrElse(sys.error("Key '" + k + "' not in map :\n" + map))
|
map.remove(k).getOrElse(sys.error("Key '" + k + "' not in map :\n" + map))
|
||||||
|
|
||||||
def addReverse[A](node: F[A], dependent: F[Any]): Unit =
|
def addReverse(node: TaskId[?], dependent: TaskId[?]): Unit =
|
||||||
reverse(node.asInstanceOf) ++= Seq(dependent)
|
reverse(node) ++= Seq(dependent)
|
||||||
def addCaller[A](caller: F[A], target: F[A]): Unit =
|
def addCaller[A](caller: TaskId[A], target: TaskId[A]): Unit =
|
||||||
callers.getOrUpdate(target, IDSet.create[F[A]]) += caller
|
callers.getOrUpdate(target, IDSet.create) += caller
|
||||||
|
|
||||||
def dependencies[A](node: F[A]): Iterable[F[Any]] = dependencies(viewCache(node.asInstanceOf))
|
def dependencies(node: TaskId[?]): Iterable[TaskId[?]] = dependencies(viewCache(node))
|
||||||
def dependencies[A](v: Node[F, A]): Iterable[F[Any]] =
|
def dependencies(v: Node[?]): Iterable[TaskId[?]] =
|
||||||
v.alist.toList[F](v.in).filter(dep => view.inline1(dep).isEmpty)
|
v.alist.toList(v.in).filter(dep => view.inline1(dep).isEmpty)
|
||||||
|
|
||||||
def runBefore[A](node: F[A]): Seq[F[A]] =
|
def runBefore(node: TaskId[?]): Seq[TaskId[?]] = triggers.runBefore.getOrElse(node, nilSeq)
|
||||||
getSeq[A](triggers.runBefore, node)
|
def triggeredBy(node: TaskId[?]): Seq[TaskId[?]] = triggers.injectFor.getOrElse(node, nilSeq)
|
||||||
def triggeredBy[A](node: F[A]): Seq[F[A]] = getSeq(triggers.injectFor, node)
|
|
||||||
def getSeq[A](map: collection.Map[F[Any], Seq[F[Any]]], node: F[A]): Seq[F[A]] =
|
|
||||||
map.getOrElse(node.asInstanceOf, nilSeq[F[Any]]).asInstanceOf
|
|
||||||
|
|
||||||
// Contracts
|
// Contracts
|
||||||
|
|
||||||
def addedInv[A](node: F[A]): Unit = topologicalSort(node) foreach addedCheck
|
def addedInv(node: TaskId[?]): Unit = topologicalSort(node).foreach(addedCheck)
|
||||||
def addedCheck[A](node: F[A]): Unit = {
|
def addedCheck(node: TaskId[?]): Unit = {
|
||||||
assert(added(node), "Not added: " + node)
|
assert(added(node), "Not added: " + node)
|
||||||
assert(viewCache.contains[Any](node.asInstanceOf), "Not in view cache: " + node)
|
assert(viewCache.contains(node), "Not in view cache: " + node)
|
||||||
dependencyCheck(node.asInstanceOf)
|
dependencyCheck(node)
|
||||||
}
|
}
|
||||||
def dependencyCheck(node: F[Any]): Unit = {
|
def dependencyCheck(node: TaskId[?]): Unit = {
|
||||||
dependencies(node) foreach { dep =>
|
dependencies(node) foreach { dep =>
|
||||||
def onOpt[A](o: Option[A])(f: A => Boolean) = o match {
|
def onOpt[A](o: Option[A])(f: A => Boolean) = o match {
|
||||||
case None => false; case Some(x) => f(x)
|
case None => false; case Some(x) => f(x)
|
||||||
}
|
}
|
||||||
def checkForward = onOpt(forward.get(node.asInstanceOf)) { _ contains dep.asInstanceOf }
|
def checkForward = onOpt(forward.get(node))(_.contains(dep))
|
||||||
def checkReverse = onOpt(reverse.get(dep.asInstanceOf)) { _.exists(_ == node) }
|
def checkReverse = onOpt(reverse.get(dep))(_.exists(_ == node))
|
||||||
assert(done(dep.asInstanceOf) ^ (checkForward && checkReverse))
|
assert(done(dep) ^ (checkForward && checkReverse))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def pendingInv[A](node: F[A]): Unit = {
|
def pendingInv(node: TaskId[?]): Unit = {
|
||||||
assert(atState(node, Pending))
|
assert(atState(node, Pending))
|
||||||
assert((dependencies(node) ++ runBefore(node)) exists notDone.asInstanceOf)
|
assert((dependencies(node) ++ runBefore(node)).exists(notDone))
|
||||||
}
|
}
|
||||||
def runningInv[A](node: F[A]): Unit = {
|
def runningInv(node: TaskId[?]): Unit = {
|
||||||
assert(dependencies(node) forall done.asInstanceOf)
|
assert(dependencies(node).forall(done))
|
||||||
assert(!(forward.contains(node.asInstanceOf)))
|
assert(!(forward.contains(node)))
|
||||||
}
|
}
|
||||||
def newPre[A](node: F[A]): Unit = {
|
def newPre(node: TaskId[?]): Unit = {
|
||||||
isNew(node)
|
isNew(node)
|
||||||
assert(!(reverse.contains(node.asInstanceOf)))
|
assert(!(reverse.contains(node)))
|
||||||
assert(!(forward.contains(node.asInstanceOf)))
|
assert(!(forward.contains(node)))
|
||||||
assert(!(callers.contains[Any](node.asInstanceOf)))
|
assert(!(callers.contains(node)))
|
||||||
assert(!(viewCache.contains[Any](node.asInstanceOf)))
|
assert(!(viewCache.contains(node)))
|
||||||
assert(!(results.contains[Any](node.asInstanceOf)))
|
assert(!(results.contains(node)))
|
||||||
}
|
}
|
||||||
|
|
||||||
def topologicalSort[A](node: F[A]): Seq[F[Any]] = {
|
def topologicalSort(node: TaskId[?]): Seq[TaskId[?]] = {
|
||||||
val seen = IDSet.create[F[Any]]
|
val seen = IDSet.create[TaskId[?]]
|
||||||
def visit(n: F[Any]): List[F[Any]] =
|
def visit(n: TaskId[?]): List[TaskId[?]] =
|
||||||
(seen process n)(List[F[Any]]()) {
|
seen.process(n)(List.empty) {
|
||||||
node.asInstanceOf :: dependencies(n).foldLeft(List[F[Any]]()) { (ss, dep) =>
|
val deps: List[TaskId[?]] =
|
||||||
visit(dep.asInstanceOf) ::: ss
|
dependencies(n).foldLeft(List.empty)((ss, dep) => visit(dep) ::: ss)
|
||||||
}
|
node :: deps
|
||||||
}
|
}
|
||||||
|
|
||||||
visit(node.asInstanceOf).reverse
|
visit(node).reverse
|
||||||
}
|
}
|
||||||
|
|
||||||
def readyInv[A](node: F[A]): Unit = {
|
def readyInv(node: TaskId[?]): Unit = {
|
||||||
assert(dependencies(node) forall done.asInstanceOf)
|
assert(dependencies(node).forall(done))
|
||||||
assert(!(forward.contains(node.asInstanceOf)))
|
assert(!(forward.contains(node)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// cyclic reference checking
|
// cyclic reference checking
|
||||||
|
|
||||||
def snapshotCycleCheck(): Unit =
|
def snapshotCycleCheck(): Unit =
|
||||||
callers.toSeq foreach { case (called: F[c], callers) =>
|
callers.toSeq foreach { case (called, callers) =>
|
||||||
for (caller <- callers) cycleCheck(caller.asInstanceOf[F[c]], called)
|
for (caller <- callers) cycleCheck(caller, called)
|
||||||
}
|
}
|
||||||
|
|
||||||
def cycleCheck[A](node: F[A], target: F[A]): Unit = {
|
def cycleCheck(node: TaskId[?], target: TaskId[?]): Unit = {
|
||||||
if (node eq target) cyclic(node, target, "Cannot call self")
|
if (node eq target) cyclic(node, target, "Cannot call self")
|
||||||
val all = IDSet.create[F[A]]
|
val all = IDSet.create[TaskId[?]]
|
||||||
def allCallers(n: F[A]): Unit = (all process n)(()) {
|
def allCallers(n: TaskId[?]): Unit = (all process n)(()) {
|
||||||
callers.get(n).toList.flatten.foreach(allCallers)
|
callers.get(n).toList.flatten.foreach(allCallers)
|
||||||
}
|
}
|
||||||
allCallers(node)
|
allCallers(node)
|
||||||
if (all contains target) cyclic(node, target, "Cyclic reference")
|
if (all contains target) cyclic(node, target, "Cyclic reference")
|
||||||
}
|
}
|
||||||
def cyclic[A](caller: F[A], target: F[A], msg: String) =
|
def cyclic(caller: TaskId[?], target: TaskId[?], msg: String) =
|
||||||
throw new Incomplete(
|
throw new Incomplete(
|
||||||
Some(caller),
|
Some(caller),
|
||||||
message = Some(msg),
|
message = Some(msg),
|
||||||
directCause = Some(new CyclicException(caller, target, msg))
|
directCause = Some(new CyclicException(caller, target, msg))
|
||||||
)
|
)
|
||||||
final class CyclicException[A](val caller: F[A], val target: F[A], msg: String)
|
final class CyclicException(val caller: TaskId[?], val target: TaskId[?], msg: String)
|
||||||
extends Exception(msg)
|
extends Exception(msg)
|
||||||
|
|
||||||
// state testing
|
// state testing
|
||||||
|
|
||||||
def pending[A](d: F[A]) = atState(d, Pending)
|
def pending(d: TaskId[?]) = atState(d, Pending)
|
||||||
def running[A](d: F[A]) = atState(d, Running)
|
def running(d: TaskId[?]) = atState(d, Running)
|
||||||
def calling[A](d: F[A]) = atState(d, Calling)
|
def calling(d: TaskId[?]) = atState(d, Calling)
|
||||||
def done[A](d: F[A]) = atState(d, Done)
|
def done(d: TaskId[?]) = atState(d, Done)
|
||||||
def notDone[A](d: F[A]) = !done(d)
|
def notDone(d: TaskId[?]) = !done(d)
|
||||||
private def atState[A](d: F[A], s: State) = state.get(d.asInstanceOf) == Some(s)
|
private def atState(d: TaskId[?], s: State) = state.get(d) == Some(s)
|
||||||
def isNew[A](d: F[A]) = !added(d)
|
def isNew(d: TaskId[?]) = !added(d)
|
||||||
def added[A](d: F[A]) = state.contains(d.asInstanceOf)
|
def added(d: TaskId[?]) = state.contains(d)
|
||||||
def complete = state.values.forall(_ == Done)
|
def complete = state.values.forall(_ == Done)
|
||||||
|
|
||||||
def pre(f: => Unit) = if (checkPreAndPostConditions) f
|
def pre(f: => Unit) = if (checkPreAndPostConditions) f
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import sbt.internal.util.RMap
|
||||||
* except `started` and `finished`, which is called from the executing task's thread. All methods
|
* except `started` and `finished`, which is called from the executing task's thread. All methods
|
||||||
* should return quickly to avoid task execution overhead.
|
* should return quickly to avoid task execution overhead.
|
||||||
*/
|
*/
|
||||||
trait ExecuteProgress[F[_]] {
|
trait ExecuteProgress {
|
||||||
def initial(): Unit
|
def initial(): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -22,20 +22,24 @@ trait ExecuteProgress[F[_]] {
|
||||||
* `task` are `allDeps` and the subset of those dependencies that have not completed are
|
* `task` are `allDeps` and the subset of those dependencies that have not completed are
|
||||||
* `pendingDeps`.
|
* `pendingDeps`.
|
||||||
*/
|
*/
|
||||||
def afterRegistered(task: F[Any], allDeps: Iterable[F[Any]], pendingDeps: Iterable[F[Any]]): Unit
|
def afterRegistered(
|
||||||
|
task: TaskId[?],
|
||||||
|
allDeps: Iterable[TaskId[?]],
|
||||||
|
pendingDeps: Iterable[TaskId[?]]
|
||||||
|
): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notifies that all of the dependencies of `task` have completed and `task` is therefore ready to
|
* Notifies that all of the dependencies of `task` have completed and `task` is therefore ready to
|
||||||
* run. The task has not been scheduled on a thread yet.
|
* run. The task has not been scheduled on a thread yet.
|
||||||
*/
|
*/
|
||||||
def afterReady(task: F[Any]): Unit
|
def afterReady(task: TaskId[?]): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notifies that the work for `task` is starting after this call returns. This is called from the
|
* Notifies that the work for `task` is starting after this call returns. This is called from the
|
||||||
* thread the task executes on, unlike most other methods in this callback. It is called
|
* thread the task executes on, unlike most other methods in this callback. It is called
|
||||||
* immediately before the task's work starts with minimal intervening executor overhead.
|
* immediately before the task's work starts with minimal intervening executor overhead.
|
||||||
*/
|
*/
|
||||||
def beforeWork(task: F[Any]): Unit
|
def beforeWork(task: TaskId[?]): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notifies that the work for `task` work has finished. The task may have computed the next task
|
* Notifies that the work for `task` work has finished. The task may have computed the next task
|
||||||
|
|
@ -45,16 +49,16 @@ trait ExecuteProgress[F[_]] {
|
||||||
* executes on, unlike most other methods in this callback. It is immediately called after the
|
* executes on, unlike most other methods in this callback. It is immediately called after the
|
||||||
* task's work is complete with minimal intervening executor overhead.
|
* task's work is complete with minimal intervening executor overhead.
|
||||||
*/
|
*/
|
||||||
def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit
|
def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notifies that `task` has completed. The task's work is done with a final `result`. Any tasks
|
* Notifies that `task` has completed. The task's work is done with a final `result`. Any tasks
|
||||||
* called by `task` have completed.
|
* called by `task` have completed.
|
||||||
*/
|
*/
|
||||||
def afterCompleted[A](task: F[A], result: Result[A]): Unit
|
def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit
|
||||||
|
|
||||||
/** All tasks have completed with the final `results` provided. */
|
/** All tasks have completed with the final `results` provided. */
|
||||||
def afterAllCompleted(results: RMap[F, Result]): Unit
|
def afterAllCompleted(results: RMap[TaskId, Result]): Unit
|
||||||
|
|
||||||
/** Notifies that either all tasks have finished or cancelled. */
|
/** Notifies that either all tasks have finished or cancelled. */
|
||||||
def stop(): Unit
|
def stop(): Unit
|
||||||
|
|
@ -64,46 +68,46 @@ trait ExecuteProgress[F[_]] {
|
||||||
* This module is experimental and subject to binary and source incompatible changes at any time.
|
* This module is experimental and subject to binary and source incompatible changes at any time.
|
||||||
*/
|
*/
|
||||||
object ExecuteProgress {
|
object ExecuteProgress {
|
||||||
def empty[F[_]]: ExecuteProgress[F] = new ExecuteProgress[F] {
|
def empty: ExecuteProgress = new ExecuteProgress {
|
||||||
override def initial(): Unit = ()
|
override def initial(): Unit = ()
|
||||||
override def afterRegistered(
|
override def afterRegistered(
|
||||||
task: F[Any],
|
task: TaskId[?],
|
||||||
allDeps: Iterable[F[Any]],
|
allDeps: Iterable[TaskId[?]],
|
||||||
pendingDeps: Iterable[F[Any]]
|
pendingDeps: Iterable[TaskId[?]]
|
||||||
): Unit =
|
): Unit =
|
||||||
()
|
()
|
||||||
override def afterReady(task: F[Any]): Unit = ()
|
override def afterReady(task: TaskId[?]): Unit = ()
|
||||||
override def beforeWork(task: F[Any]): Unit = ()
|
override def beforeWork(task: TaskId[?]): Unit = ()
|
||||||
override def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit = ()
|
override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = ()
|
||||||
override def afterCompleted[A](task: F[A], result: Result[A]): Unit = ()
|
override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit = ()
|
||||||
override def afterAllCompleted(results: RMap[F, Result]): Unit = ()
|
override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = ()
|
||||||
override def stop(): Unit = ()
|
override def stop(): Unit = ()
|
||||||
}
|
}
|
||||||
|
|
||||||
def aggregate[F[_]](reporters: Seq[ExecuteProgress[F]]) = new ExecuteProgress[F] {
|
def aggregate(reporters: Seq[ExecuteProgress]) = new ExecuteProgress {
|
||||||
override def initial(): Unit = {
|
override def initial(): Unit = {
|
||||||
reporters foreach { _.initial() }
|
reporters foreach { _.initial() }
|
||||||
}
|
}
|
||||||
override def afterRegistered(
|
override def afterRegistered(
|
||||||
task: F[Any],
|
task: TaskId[?],
|
||||||
allDeps: Iterable[F[Any]],
|
allDeps: Iterable[TaskId[?]],
|
||||||
pendingDeps: Iterable[F[Any]]
|
pendingDeps: Iterable[TaskId[?]]
|
||||||
): Unit = {
|
): Unit = {
|
||||||
reporters foreach { _.afterRegistered(task, allDeps, pendingDeps) }
|
reporters foreach { _.afterRegistered(task, allDeps, pendingDeps) }
|
||||||
}
|
}
|
||||||
override def afterReady(task: F[Any]): Unit = {
|
override def afterReady(task: TaskId[?]): Unit = {
|
||||||
reporters foreach { _.afterReady(task) }
|
reporters foreach { _.afterReady(task) }
|
||||||
}
|
}
|
||||||
override def beforeWork(task: F[Any]): Unit = {
|
override def beforeWork(task: TaskId[?]): Unit = {
|
||||||
reporters foreach { _.beforeWork(task) }
|
reporters foreach { _.beforeWork(task) }
|
||||||
}
|
}
|
||||||
override def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit = {
|
override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = {
|
||||||
reporters foreach { _.afterWork(task, result) }
|
reporters foreach { _.afterWork(task, result) }
|
||||||
}
|
}
|
||||||
override def afterCompleted[A](task: F[A], result: Result[A]): Unit = {
|
override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit = {
|
||||||
reporters foreach { _.afterCompleted(task, result) }
|
reporters foreach { _.afterCompleted(task, result) }
|
||||||
}
|
}
|
||||||
override def afterAllCompleted(results: RMap[F, Result]): Unit = {
|
override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = {
|
||||||
reporters foreach { _.afterAllCompleted(results) }
|
reporters foreach { _.afterAllCompleted(results) }
|
||||||
}
|
}
|
||||||
override def stop(): Unit = {
|
override def stop(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import scala.collection.mutable.ListBuffer
|
||||||
|
|
||||||
import sbt.internal.util.IDSet
|
import sbt.internal.util.IDSet
|
||||||
import Incomplete.{ Error, Value => IValue }
|
import Incomplete.{ Error, Value => IValue }
|
||||||
|
import scala.jdk.CollectionConverters.*
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Describes why a task did not complete.
|
* Describes why a task did not complete.
|
||||||
|
|
@ -45,7 +46,6 @@ object Incomplete extends Enumeration {
|
||||||
def transformTD(i: Incomplete)(f: Incomplete => Incomplete): Incomplete = transform(i, true)(f)
|
def transformTD(i: Incomplete)(f: Incomplete => Incomplete): Incomplete = transform(i, true)(f)
|
||||||
def transformBU(i: Incomplete)(f: Incomplete => Incomplete): Incomplete = transform(i, false)(f)
|
def transformBU(i: Incomplete)(f: Incomplete => Incomplete): Incomplete = transform(i, false)(f)
|
||||||
def transform(i: Incomplete, topDown: Boolean)(f: Incomplete => Incomplete): Incomplete = {
|
def transform(i: Incomplete, topDown: Boolean)(f: Incomplete => Incomplete): Incomplete = {
|
||||||
import collection.JavaConverters._
|
|
||||||
val visited: collection.mutable.Map[Incomplete, Incomplete] =
|
val visited: collection.mutable.Map[Incomplete, Incomplete] =
|
||||||
(new java.util.IdentityHashMap[Incomplete, Incomplete]).asScala
|
(new java.util.IdentityHashMap[Incomplete, Incomplete]).asScala
|
||||||
def visit(inc: Incomplete): Incomplete =
|
def visit(inc: Incomplete): Incomplete =
|
||||||
|
|
|
||||||
|
|
@ -17,11 +17,11 @@ import sbt.internal.util.AList
|
||||||
* @tparam A
|
* @tparam A
|
||||||
* the type computed by this node
|
* the type computed by this node
|
||||||
*/
|
*/
|
||||||
private[sbt] trait Node[Effect[_], A]:
|
private[sbt] trait Node[A]:
|
||||||
type K[L[x]]
|
type K[L[x]]
|
||||||
def in: K[Effect]
|
def in: K[TaskId]
|
||||||
def alist: AList[K]
|
def alist: AList[K]
|
||||||
|
|
||||||
/** Computes the result of this task given the results from the inputs. */
|
/** Computes the result of this task given the results from the inputs. */
|
||||||
def work(inputs: K[Result]): Either[Effect[A], A]
|
def work(inputs: K[Result]): Either[TaskId[A], A]
|
||||||
end Node
|
end Node
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
package sbt
|
||||||
|
|
||||||
|
import sbt.internal.util.AttributeMap
|
||||||
|
|
||||||
|
trait TaskId[A]:
|
||||||
|
def tags: ConcurrentRestrictions.TagMap
|
||||||
Loading…
Reference in New Issue