From 7ea05067365dee1d35562b83b5883ec5c2b37a28 Mon Sep 17 00:00:00 2001 From: Adrien Piquerez Date: Tue, 28 Nov 2023 14:20:29 +0100 Subject: [PATCH] Introduce TaskId to reduce abstraction around Execute --- .../sbt/internal/EvaluateConfigurations.scala | 14 +- main-settings/src/main/scala/sbt/Def.scala | 5 +- .../src/main/scala/sbt/Previous.scala | 4 +- .../src/main/scala/sbt/Structure.scala | 2 +- main/src/main/scala/sbt/EvaluateTask.scala | 76 +++--- main/src/main/scala/sbt/Keys.scala | 4 +- .../sbt/internal/AbstractTaskProgress.scala | 57 ++-- .../scala/sbt/internal/BuildStructure.scala | 2 +- .../main/scala/sbt/internal/TaskName.scala | 8 +- .../scala/sbt/internal/TaskProgress.scala | 29 ++- .../main/scala/sbt/internal/TaskTimings.scala | 8 +- .../scala/sbt/internal/TaskTraceEvent.scala | 10 +- .../sbt/internal/server/BspCompileTask.scala | 2 +- tasks-standard/src/main/scala/sbt/Task.scala | 7 +- .../src/main/scala/sbt/internal/Action.scala | 11 +- .../src/main/scala/sbt/std/TaskExtra.scala | 2 +- .../src/main/scala/sbt/std/Transform.scala | 28 +- tasks-standard/src/test/scala/TaskGen.scala | 8 +- .../src/test/scala/TaskRunnerCircular.scala | 2 +- .../src/test/scala/TaskSerial.scala | 21 +- .../main/scala/sbt/CompletionService.scala | 49 ++-- .../scala/sbt/ConcurrentRestrictions.scala | 106 ++++---- tasks/src/main/scala/sbt/Execute.scala | 244 +++++++++--------- .../src/main/scala/sbt/ExecuteProgress.scala | 54 ++-- tasks/src/main/scala/sbt/Incomplete.scala | 2 +- tasks/src/main/scala/sbt/Node.scala | 6 +- tasks/src/main/scala/sbt/TaskId.scala | 6 + 27 files changed, 388 insertions(+), 379 deletions(-) create mode 100644 tasks/src/main/scala/sbt/TaskId.scala diff --git a/buildfile/src/main/scala/sbt/internal/EvaluateConfigurations.scala b/buildfile/src/main/scala/sbt/internal/EvaluateConfigurations.scala index 7efd8d908..8df2ef103 100644 --- a/buildfile/src/main/scala/sbt/internal/EvaluateConfigurations.scala +++ b/buildfile/src/main/scala/sbt/internal/EvaluateConfigurations.scala @@ -25,7 +25,7 @@ import Scope.GlobalScope import sbt.SlashSyntax0.* import sbt.internal.parser.SbtParser import sbt.io.IO -import scala.collection.JavaConverters.* +import scala.jdk.CollectionConverters.* import xsbti.VirtualFile import xsbti.VirtualFileRef @@ -403,14 +403,14 @@ object Index { ) } - private[this] type TriggerMap = collection.mutable.HashMap[Task[Any], Seq[Task[Any]]] + private[this] type TriggerMap = collection.mutable.HashMap[TaskId[?], Seq[TaskId[?]]] - def triggers(ss: Settings[Scope]): Triggers[Task] = { + def triggers(ss: Settings[Scope]): Triggers = { val runBefore = new TriggerMap val triggeredBy = new TriggerMap ss.data.values foreach ( _.entries foreach { - case AttributeEntry(_, value: Task[Any]) => + case AttributeEntry(_, value: Task[?]) => val as = value.info.attributes update(runBefore, value, as.get(Def.runBefore.asInstanceOf)) update(triggeredBy, value, as.get(Def.triggeredBy.asInstanceOf)) @@ -418,13 +418,13 @@ object Index { } ) val onComplete = (GlobalScope / Def.onComplete) get ss getOrElse (() => ()) - new Triggers[Task](runBefore, triggeredBy, map => { onComplete(); map }) + new Triggers(runBefore, triggeredBy, map => { onComplete(); map }) } private[this] def update( map: TriggerMap, - base: Task[Any], - tasksOpt: Option[Seq[Task[Any]]] + base: Task[?], + tasksOpt: Option[Seq[Task[?]]] ): Unit = for { tasks <- tasksOpt diff --git a/main-settings/src/main/scala/sbt/Def.scala b/main-settings/src/main/scala/sbt/Def.scala index 6ce01cff7..00644099c 100644 --- a/main-settings/src/main/scala/sbt/Def.scala +++ b/main-settings/src/main/scala/sbt/Def.scala @@ -382,10 +382,11 @@ object Def extends Init[Scope] with TaskMacroExtra with InitializeImplicits: (TaskKey[A](name, description, DTask), dummyTask(name)) private[sbt] def dummyTask[T](name: String): Task[T] = { - import std.TaskExtra.{ task => newTask, _ } + import std.TaskExtra.{ task => newTask, toTaskInfo } val base: Task[T] = newTask( sys.error("Dummy task '" + name + "' did not get converted to a full task.") - ) named name + ) + .named(name) base.copy(info = base.info.set(isDummyTask, true)) } diff --git a/main-settings/src/main/scala/sbt/Previous.scala b/main-settings/src/main/scala/sbt/Previous.scala index b26e67dfa..4db97b7f3 100644 --- a/main-settings/src/main/scala/sbt/Previous.scala +++ b/main-settings/src/main/scala/sbt/Previous.scala @@ -115,7 +115,7 @@ object Previous { /** Persists values of tasks t where there is some task referencing it via t.previous. */ private[sbt] def complete( referenced: References, - results: RMap[Task, Result], + results: RMap[TaskId, Result], streams: Streams ): Unit = { val map = referenced.getReferences @@ -124,7 +124,7 @@ object Previous { // We first collect all of the successful tasks and write their scoped key into a map // along with their values. val successfulTaskResults = (for - case results.TPair(task, Result.Value(v)) <- results.toTypedSeq + results.TPair(task: Task[?], Result.Value(v)) <- results.toTypedSeq key <- task.info.attributes.get(Def.taskDefinitionKey).asInstanceOf[Option[AnyTaskKey]] yield key -> v).toMap // We then traverse the successful results and look up all of the referenced values for diff --git a/main-settings/src/main/scala/sbt/Structure.scala b/main-settings/src/main/scala/sbt/Structure.scala index 19ebd8b72..ded747f25 100644 --- a/main-settings/src/main/scala/sbt/Structure.scala +++ b/main-settings/src/main/scala/sbt/Structure.scala @@ -534,7 +534,7 @@ object Scoped: .apply(deps => nop.dependsOn(deps: _*)) } - sealed abstract class RichTaskables[K[L[x]]](final val keys: K[Taskable])(using + sealed abstract class RichTaskables[K[+L[x]]](final val keys: K[Taskable])(using a: AList[K] ): diff --git a/main/src/main/scala/sbt/EvaluateTask.scala b/main/src/main/scala/sbt/EvaluateTask.scala index 8e663f8b4..e28ae2321 100644 --- a/main/src/main/scala/sbt/EvaluateTask.scala +++ b/main/src/main/scala/sbt/EvaluateTask.scala @@ -12,7 +12,6 @@ import java.util.concurrent.atomic.AtomicReference import sbt.Def.{ ScopedKey, Setting, dummyState } import sbt.Keys.{ TaskProgress => _, name => _, _ } -// import sbt.Project.richInitializeTask import sbt.ProjectExtra.* import sbt.Scope.Global import sbt.SlashSyntax0._ @@ -102,7 +101,7 @@ object TaskCancellationStrategy { sealed trait EvaluateTaskConfig { def restrictions: Seq[Tags.Rule] def checkCycles: Boolean - def progressReporter: ExecuteProgress[Task] + def progressReporter: ExecuteProgress def cancelStrategy: TaskCancellationStrategy /** If true, we force a finalizer/gc run (or two) after task execution completes when needed. */ @@ -118,7 +117,7 @@ object EvaluateTaskConfig { def apply( restrictions: Seq[Tags.Rule], checkCycles: Boolean, - progressReporter: ExecuteProgress[Task], + progressReporter: ExecuteProgress, cancelStrategy: TaskCancellationStrategy, forceGarbageCollection: Boolean, minForcegcInterval: Duration @@ -135,7 +134,7 @@ object EvaluateTaskConfig { private[this] case class DefaultEvaluateTaskConfig( restrictions: Seq[Tags.Rule], checkCycles: Boolean, - progressReporter: ExecuteProgress[Task], + progressReporter: ExecuteProgress, cancelStrategy: TaskCancellationStrategy, forceGarbageCollection: Boolean, minForcegcInterval: Duration @@ -168,7 +167,7 @@ object EvaluateTask { @nowarn lazy private val sharedProgress = new TaskTimings(reportOnShutdown = true) - def taskTimingProgress: Option[ExecuteProgress[Task]] = + def taskTimingProgress: Option[ExecuteProgress] = if (SysProp.taskTimingsOnShutdown) Some(sharedProgress) else None @@ -194,14 +193,14 @@ object EvaluateTask { } lazy private val sharedTraceEvent = new TaskTraceEvent() - def taskTraceEvent: Option[ExecuteProgress[Task]] = + def taskTraceEvent: Option[ExecuteProgress] = if (SysProp.traces) { Some(sharedTraceEvent) } else None // sbt-pgp calls this @deprecated("No longer used", "1.3.0") - private[sbt] def defaultProgress(): ExecuteProgress[Task] = ExecuteProgress.empty[Task] + private[sbt] def defaultProgress(): ExecuteProgress = ExecuteProgress.empty val SystemProcessors = Runtime.getRuntime.availableProcessors @@ -262,26 +261,26 @@ object EvaluateTask { extracted: Extracted, structure: BuildStructure, state: State - ): ExecuteProgress[Task] = { + ): ExecuteProgress = { state .get(currentTaskProgress) .map { tp => - new ExecuteProgress[Task] { + new ExecuteProgress { val progress = tp.progress override def initial(): Unit = progress.initial() override def afterRegistered( - task: Task[Any], - allDeps: Iterable[Task[Any]], - pendingDeps: Iterable[Task[Any]] + task: TaskId[?], + allDeps: Iterable[TaskId[?]], + pendingDeps: Iterable[TaskId[?]] ): Unit = progress.afterRegistered(task, allDeps, pendingDeps) - override def afterReady(task: Task[Any]): Unit = progress.afterReady(task) - override def beforeWork(task: Task[Any]): Unit = progress.beforeWork(task) - override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit = + override def afterReady(task: TaskId[?]): Unit = progress.afterReady(task) + override def beforeWork(task: TaskId[?]): Unit = progress.beforeWork(task) + override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = progress.afterWork(task, result) - override def afterCompleted[A](task: Task[A], result: Result[A]): Unit = + override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit = progress.afterCompleted(task, result) - override def afterAllCompleted(results: RMap[Task, Result]): Unit = + override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = progress.afterAllCompleted(results) override def stop(): Unit = {} } @@ -298,9 +297,9 @@ object EvaluateTask { new TaskTimings(reportOnShutdown = false, state.globalLogging.full) :: Nil else Nil) reporters match { - case xs if xs.isEmpty => ExecuteProgress.empty[Task] + case xs if xs.isEmpty => ExecuteProgress.empty case xs if xs.size == 1 => xs.head - case xs => ExecuteProgress.aggregate[Task](xs) + case xs => ExecuteProgress.aggregate(xs) } } } @@ -439,7 +438,7 @@ object EvaluateTask { state: State, streams: Streams, ref: ProjectRef - ): Option[(Task[T], NodeView[Task])] = { + ): Option[(Task[T], NodeView)] = { val thisScope = Load.projectScope(ref) val resolvedScope = Scope.replaceThis(thisScope)(taskKey.scope) for (t <- structure.data.get(resolvedScope, taskKey.key)) @@ -450,7 +449,7 @@ object EvaluateTask { streams: Streams, roots: Seq[ScopedKey[_]], dummies: DummyTaskMap = DummyTaskMap(Nil) - ): NodeView[Task] = + ): NodeView = Transform( (dummyRoots, roots) :: (Def.dummyStreamsManager, streams) :: (dummyState, state) :: dummies ) @@ -471,24 +470,21 @@ object EvaluateTask { root: Task[T], state: State, streams: Streams, - triggers: Triggers[Task], + triggers: Triggers, config: EvaluateTaskConfig - )(using taskToNode: NodeView[Task]): (State, Result[T]) = { + )(using taskToNode: NodeView): (State, Result[T]) = { import ConcurrentRestrictions.{ cancellableCompletionService, tagged, tagsKey } val log = state.log log.debug( s"Running task... Cancel: ${config.cancelStrategy}, check cycles: ${config.checkCycles}, forcegc: ${config.forceGarbageCollection}" ) - def tagMap(t: Task[_]): Tags.TagMap = - t.info.get(tagsKey).getOrElse(Map.empty) - val tags = - tagged[Task[Any]](tagMap, Tags.predicate(config.restrictions)) + val tags = tagged(Tags.predicate(config.restrictions)) val (service, shutdownThreads) = - cancellableCompletionService[Task[Any], Completed]( + cancellableCompletionService( tags, (s: String) => log.warn(s), - (t: Task[_]) => tagMap(t).contains(Tags.Sentinel) + (t: TaskId[?]) => t.tags.contains(Tags.Sentinel) ) def shutdownImpl(force: Boolean): Unit = { @@ -504,18 +500,18 @@ object EvaluateTask { def shutdown(): Unit = shutdownImpl(false) // propagate the defining key for reporting the origin def overwriteNode(i: Incomplete): Boolean = i.node match { - case Some(t: Task[_]) => transformNode(t).isEmpty + case Some(t: Task[?]) => transformNode(t).isEmpty case _ => true } def run() = { - val x = new Execute[Task]( + val x = new Execute( Execute.config(config.checkCycles, overwriteNode), triggers, config.progressReporter ) val (newState, result) = try { - given strategy: x.Strategy = service + given strategy: CompletionService = service val results = x.runKeep(root) storeValuesForPrevious(results, state, streams) applyResults(results, state, root) @@ -548,7 +544,7 @@ object EvaluateTask { } private[this] def storeValuesForPrevious( - results: RMap[Task, Result], + results: RMap[TaskId, Result], state: State, streams: Streams ): Unit = @@ -556,13 +552,13 @@ object EvaluateTask { Previous.complete(referenced, results, streams) def applyResults[T]( - results: RMap[Task, Result], + results: RMap[TaskId, Result], state: State, root: Task[T] ): (State, Result[T]) = { (stateTransform(results)(state), results(root)) } - def stateTransform(results: RMap[Task, Result]): State => State = + def stateTransform(results: RMap[TaskId, Result]): State => State = Function.chain( results.toTypedSeq flatMap { case results.TPair(_, Result.Value(KeyValue(_, st: StateTransform))) => Some(st.transform) @@ -577,27 +573,25 @@ object EvaluateTask { Incomplete.transformBU(i)(convertCyclicInc andThen taskToKey andThen liftAnonymous) } def taskToKey: Incomplete => Incomplete = { - case in @ Incomplete(Some(node: Task[_]), _, _, _, _) => in.copy(node = transformNode(node)) + case in @ Incomplete(Some(node: Task[?]), _, _, _, _) => in.copy(node = transformNode(node)) case i => i } - type AnyCyclic = Execute[({ type A[_] <: AnyRef })#A]#CyclicException[_] - def convertCyclicInc: Incomplete => Incomplete = { case in @ Incomplete( _, _, _, _, - Some(c: Execute[({ type A[_] <: AnyRef })#A @unchecked]#CyclicException[_]) + Some(c: Execute#CyclicException) ) => in.copy(directCause = Some(new RuntimeException(convertCyclic(c)))) case i => i } - def convertCyclic(c: AnyCyclic): String = + def convertCyclic(c: Execute#CyclicException): String = (c.caller, c.target) match { - case (caller: Task[_], target: Task[_]) => + case (caller: Task[?], target: Task[?]) => c.toString + (if (caller eq target) "(task: " + name(caller) + ")" else "(caller: " + name(caller) + ", target: " + name(target) + ")") case _ => c.toString diff --git a/main/src/main/scala/sbt/Keys.scala b/main/src/main/scala/sbt/Keys.scala index c73687946..cb7658737 100644 --- a/main/src/main/scala/sbt/Keys.scala +++ b/main/src/main/scala/sbt/Keys.scala @@ -590,9 +590,9 @@ object Keys { val state = Def.stateKey val streamsManager = Def.streamsManagerKey // wrapper to work around SI-2915 - final class TaskProgress(val progress: ExecuteProgress[Task]) + final class TaskProgress(val progress: ExecuteProgress) object TaskProgress { - def apply(progress: ExecuteProgress[Task]): TaskProgress = new TaskProgress(progress) + def apply(progress: ExecuteProgress): TaskProgress = new TaskProgress(progress) } private[sbt] val currentTaskProgress = AttributeKey[TaskProgress]("current-task-progress") private[sbt] val taskProgress = AttributeKey[sbt.internal.TaskProgress]("active-task-progress") diff --git a/main/src/main/scala/sbt/internal/AbstractTaskProgress.scala b/main/src/main/scala/sbt/internal/AbstractTaskProgress.scala index a8d42fe50..15a4bdb4e 100644 --- a/main/src/main/scala/sbt/internal/AbstractTaskProgress.scala +++ b/main/src/main/scala/sbt/internal/AbstractTaskProgress.scala @@ -10,18 +10,18 @@ package internal import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters.* import scala.collection.mutable import scala.collection.immutable.VectorBuilder import scala.concurrent.duration._ -private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[Task] { +private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress { import AbstractTaskExecuteProgress.Timer private[this] val showScopedKey = Def.showShortKey(None) - private[this] val anonOwners = new ConcurrentHashMap[Task[_], Task[_]] - private[this] val calledBy = new ConcurrentHashMap[Task[_], Task[_]] - private[this] val timings = new ConcurrentHashMap[Task[_], Timer] + private[this] val anonOwners = new ConcurrentHashMap[TaskId[_], TaskId[_]] + private[this] val calledBy = new ConcurrentHashMap[TaskId[_], TaskId[_]] + private[this] val timings = new ConcurrentHashMap[TaskId[_], Timer] private[sbt] def timingsByName: mutable.Map[String, AtomicLong] = { val result = new ConcurrentHashMap[String, AtomicLong] timings.forEach { (task, timing) => @@ -34,18 +34,18 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[ result.asScala } private[sbt] def anyTimings = !timings.isEmpty - def currentTimings: Iterator[(Task[_], Timer)] = timings.asScala.iterator + def currentTimings: Iterator[(TaskId[_], Timer)] = timings.asScala.iterator - private[internal] def exceededThreshold(task: Task[_], threshold: FiniteDuration): Boolean = + private[internal] def exceededThreshold(task: TaskId[_], threshold: FiniteDuration): Boolean = timings.get(task) match { case null => false case t => t.durationMicros > threshold.toMicros } private[internal] def timings( - tasks: java.util.Set[Task[_]], + tasks: java.util.Set[TaskId[_]], thresholdMicros: Long - ): Vector[(Task[_], Long)] = { - val result = new VectorBuilder[(Task[_], Long)] + ): Vector[(TaskId[_], Long)] = { + val result = new VectorBuilder[(TaskId[_], Long)] val now = System.nanoTime tasks.forEach { t => timings.get(t) match { @@ -60,7 +60,7 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[ result.result() } def activeTasks(now: Long) = { - val result = new VectorBuilder[(Task[_], FiniteDuration)] + val result = new VectorBuilder[(TaskId[_], FiniteDuration)] timings.forEach { (task, timing) => if (timing.isActive) result += task -> (now - timing.startNanos).nanos } @@ -68,25 +68,26 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[ } override def afterRegistered( - task: Task[Any], - allDeps: Iterable[Task[Any]], - pendingDeps: Iterable[Task[Any]] + task: TaskId[?], + allDeps: Iterable[TaskId[?]], + pendingDeps: Iterable[TaskId[?]] ): Unit = { // we need this to infer anonymous task names - pendingDeps foreach { t => - if (TaskName.transformNode(t).isEmpty) { - anonOwners.put(t, task) + pendingDeps + .filter { + case t: Task[?] => TaskName.transformNode(t).isEmpty + case _ => true } - } + .foreach(anonOwners.put(_, task)) } - override def beforeWork(task: Task[Any]): Unit = { + override def beforeWork(task: TaskId[?]): Unit = { timings.put(task, new Timer) () } protected def clearTimings: Boolean = false - override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit = { + override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = { if (clearTimings) timings.remove(task) else timings.get(task) match { @@ -100,21 +101,23 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[ } } - private[this] val taskNameCache = new ConcurrentHashMap[Task[_], String] - protected def taskName(t: Task[_]): String = taskNameCache.get(t) match { + private[this] val taskNameCache = new ConcurrentHashMap[TaskId[_], String] + protected def taskName(t: TaskId[_]): String = taskNameCache.get(t) match { case null => val name = taskName0(t) taskNameCache.putIfAbsent(t, name) name case name => name } - private[this] def taskName0(t: Task[_]): String = { + private[this] def taskName0(t: TaskId[_]): String = { def definedName(node: Task[_]): Option[String] = - node.info.name orElse TaskName.transformNode(node).map(showScopedKey.show) + node.info.name.orElse(TaskName.transformNode(node).map(showScopedKey.show)) def inferredName(t: Task[_]): Option[String] = nameDelegate(t) map taskName - def nameDelegate(t: Task[_]): Option[Task[_]] = - Option(anonOwners.get(t)) orElse Option(calledBy.get(t)) - definedName(t) orElse inferredName(t) getOrElse TaskName.anonymousName(t) + def nameDelegate(t: Task[_]): Option[TaskId[_]] = + Option(anonOwners.get(t)).orElse(Option(calledBy.get(t))) + t match + case t: Task[?] => definedName(t).orElse(inferredName(t)).getOrElse(TaskName.anonymousName(t)) + case _ => TaskName.anonymousName(t) } } diff --git a/main/src/main/scala/sbt/internal/BuildStructure.scala b/main/src/main/scala/sbt/internal/BuildStructure.scala index 3641830fe..6aa878cb6 100644 --- a/main/src/main/scala/sbt/internal/BuildStructure.scala +++ b/main/src/main/scala/sbt/internal/BuildStructure.scala @@ -71,7 +71,7 @@ final class BuildStructure( final class StructureIndex( val keyMap: Map[String, AttributeKey[_]], val taskToKey: Map[Task[_], ScopedKey[Task[_]]], - val triggers: Triggers[Task], + val triggers: Triggers, val keyIndex: KeyIndex, val aggregateKeyIndex: KeyIndex, ) diff --git a/main/src/main/scala/sbt/internal/TaskName.scala b/main/src/main/scala/sbt/internal/TaskName.scala index 35b126e3c..ce86c80d3 100644 --- a/main/src/main/scala/sbt/internal/TaskName.scala +++ b/main/src/main/scala/sbt/internal/TaskName.scala @@ -12,11 +12,11 @@ import Def.{ displayFull, ScopedKey } import Keys.taskDefinitionKey private[sbt] object TaskName { - def name(node: Task[_]): String = definedName(node) getOrElse anonymousName(node) + def name(node: Task[_]): String = definedName(node).getOrElse(anonymousName(node)) def definedName(node: Task[_]): Option[String] = - node.info.name orElse transformNode(node).map(displayFull) - def anonymousName(node: Task[_]): String = + node.info.name.orElse(transformNode(node).map(displayFull)) + def anonymousName(node: TaskId[_]): String = "" def transformNode(node: Task[_]): Option[ScopedKey[_]] = - node.info.attributes get taskDefinitionKey + node.info.attributes.get(taskDefinitionKey) } diff --git a/main/src/main/scala/sbt/internal/TaskProgress.scala b/main/src/main/scala/sbt/internal/TaskProgress.scala index 38368c2c7..c446f28f2 100644 --- a/main/src/main/scala/sbt/internal/TaskProgress.scala +++ b/main/src/main/scala/sbt/internal/TaskProgress.scala @@ -26,11 +26,11 @@ private[sbt] class TaskProgress( threshold: FiniteDuration, logger: Logger ) extends AbstractTaskExecuteProgress - with ExecuteProgress[Task] + with ExecuteProgress with AutoCloseable { private[this] val lastTaskCount = new AtomicInteger(0) private[this] val reportLoop = new AtomicReference[AutoCloseable] - private[this] val active = new ConcurrentHashMap[Task[_], AutoCloseable] + private[this] val active = new ConcurrentHashMap[TaskId[_], AutoCloseable] private[this] val nextReport = new AtomicReference(Deadline.now) private[this] val scheduler = Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler")) @@ -90,7 +90,7 @@ private[sbt] class TaskProgress( } Util.ignoreResult(pending.add(executor.submit(runnable))) } - override def beforeWork(task: Task[Any]): Unit = + override def beforeWork(task: TaskId[?]): Unit = if (!closed.get) { super.beforeWork(task) reportLoop.get match { @@ -108,7 +108,7 @@ private[sbt] class TaskProgress( logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed") } - override def afterReady(task: Task[Any]): Unit = + override def afterReady(task: TaskId[?]): Unit = if (!closed.get) { try { Util.ignoreResult(executor.submit((() => { @@ -124,7 +124,7 @@ private[sbt] class TaskProgress( } override def stop(): Unit = {} - override def afterCompleted[A](task: Task[A], result: Result[A]): Unit = + override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit = active.remove(task) match { case null => case a => @@ -132,7 +132,7 @@ private[sbt] class TaskProgress( if (exceededThreshold(task, threshold)) report() } - override def afterAllCompleted(results: RMap[Task, Result]): Unit = { + override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = { reportLoop.getAndSet(null) match { case null => case l => l.close() @@ -164,7 +164,7 @@ private[sbt] class TaskProgress( val ltc = lastTaskCount.get if (currentTasks.nonEmpty || ltc != 0) { val currentTasksCount = currentTasks.size - def event(tasks: Vector[(Task[_], Long)]): ProgressEvent = { + def event(tasks: Vector[(TaskId[_], Long)]): ProgressEvent = { if (tasks.nonEmpty) nextReport.set(Deadline.now + sleepDuration) val toWrite = tasks.sortBy(_._2) val distinct = new java.util.LinkedHashMap[String, ProgressItem] @@ -187,7 +187,7 @@ private[sbt] class TaskProgress( } } - private[this] def getShortName(task: Task[_]): String = { + private[this] def getShortName(task: TaskId[_]): String = { val name = taskName(task) name.lastIndexOf('/') match { case -1 => name @@ -199,12 +199,13 @@ private[sbt] class TaskProgress( } private[this] def filter( - tasks: Vector[(Task[_], Long)] - ): (Vector[(Task[_], Long)], Boolean) = { - tasks.foldLeft((Vector.empty[(Task[_], Long)], false)) { case ((tasks, skip), pair @ (t, _)) => - val shortName = getShortName(t) - val newSkip = skip || skipReportTasks.contains(shortName) - if (hiddenTasks.contains(shortName)) (tasks, newSkip) else (tasks :+ pair, newSkip) + tasks: Vector[(TaskId[_], Long)] + ): (Vector[(TaskId[_], Long)], Boolean) = { + tasks.foldLeft((Vector.empty[(TaskId[_], Long)], false)) { + case ((tasks, skip), pair @ (t, _)) => + val shortName = getShortName(t) + val newSkip = skip || skipReportTasks.contains(shortName) + if (hiddenTasks.contains(shortName)) (tasks, newSkip) else (tasks :+ pair, newSkip) } } } diff --git a/main/src/main/scala/sbt/internal/TaskTimings.scala b/main/src/main/scala/sbt/internal/TaskTimings.scala index 8f90807e5..9f39f0410 100644 --- a/main/src/main/scala/sbt/internal/TaskTimings.scala +++ b/main/src/main/scala/sbt/internal/TaskTimings.scala @@ -23,7 +23,7 @@ import sbt.util.{ Level, Logger } */ private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger) extends AbstractTaskExecuteProgress - with ExecuteProgress[Task] { + with ExecuteProgress { @deprecated("Use the constructor that takes an sbt.util.Logger parameter.", "1.3.3") def this(reportOnShutdown: Boolean) = this( @@ -50,9 +50,9 @@ private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger) start = System.nanoTime } - override def afterReady(task: Task[Any]): Unit = () - override def afterCompleted[T](task: Task[T], result: Result[T]): Unit = () - override def afterAllCompleted(results: RMap[Task, Result]): Unit = + override def afterReady(task: TaskId[?]): Unit = () + override def afterCompleted[T](task: TaskId[T], result: Result[T]): Unit = () + override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = if (!reportOnShutdown) { report() } diff --git a/main/src/main/scala/sbt/internal/TaskTraceEvent.scala b/main/src/main/scala/sbt/internal/TaskTraceEvent.scala index a7448681e..ddb27ab70 100644 --- a/main/src/main/scala/sbt/internal/TaskTraceEvent.scala +++ b/main/src/main/scala/sbt/internal/TaskTraceEvent.scala @@ -21,17 +21,15 @@ import sjsonnew.support.scalajson.unsafe.CompactPrinter * as Chrome Trace Event Format. * This class is activated by adding -Dsbt.traces=true to the JVM options. */ -private[sbt] final class TaskTraceEvent - extends AbstractTaskExecuteProgress - with ExecuteProgress[Task] { +private[sbt] final class TaskTraceEvent extends AbstractTaskExecuteProgress with ExecuteProgress { import AbstractTaskExecuteProgress.Timer private[this] var start = 0L private[this] val console = ConsoleOut.systemOut override def initial(): Unit = () - override def afterReady(task: Task[Any]): Unit = () - override def afterCompleted[T](task: Task[T], result: Result[T]): Unit = () - override def afterAllCompleted(results: RMap[Task, Result]): Unit = () + override def afterReady(task: TaskId[?]): Unit = () + override def afterCompleted[T](task: TaskId[T], result: Result[T]): Unit = () + override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = () override def stop(): Unit = () start = System.nanoTime diff --git a/main/src/main/scala/sbt/internal/server/BspCompileTask.scala b/main/src/main/scala/sbt/internal/server/BspCompileTask.scala index 57abb770a..0e2e751d4 100644 --- a/main/src/main/scala/sbt/internal/server/BspCompileTask.scala +++ b/main/src/main/scala/sbt/internal/server/BspCompileTask.scala @@ -66,7 +66,7 @@ object BspCompileTask { case class BspCompileTask private ( targetId: BuildTargetIdentifier, targetName: String, - id: TaskId, + id: sbt.internal.bsp.TaskId, startTimeMillis: Long ) { import sbt.internal.bsp.codec.JsonProtocol._ diff --git a/tasks-standard/src/main/scala/sbt/Task.scala b/tasks-standard/src/main/scala/sbt/Task.scala index 0903f807d..f56c6965e 100644 --- a/tasks-standard/src/main/scala/sbt/Task.scala +++ b/tasks-standard/src/main/scala/sbt/Task.scala @@ -16,7 +16,7 @@ import sbt.util.Monad /** * Combines metadata `info` and a computation `work` to define a task. */ -final case class Task[A](info: Info[A], work: Action[A]): +final case class Task[A](info: Info[A], work: Action[A]) extends TaskId[A]: override def toString = info.name getOrElse ("Task(" + info + ")") override def hashCode = info.hashCode @@ -28,7 +28,10 @@ final case class Task[A](info: Info[A], work: Action[A]): withInfo(info = nextInfo) } - def tags: TagMap = info get tagsKey getOrElse TagMap.empty + def tags: TagMap = info.get(tagsKey).getOrElse(TagMap.empty) + def name: Option[String] = info.name + + def attributes: AttributeMap = info.attributes private[sbt] def withInfo(info: Info[A]): Task[A] = Task(info = info, work = this.work) diff --git a/tasks-standard/src/main/scala/sbt/internal/Action.scala b/tasks-standard/src/main/scala/sbt/internal/Action.scala index 59220889f..bd041aa38 100644 --- a/tasks-standard/src/main/scala/sbt/internal/Action.scala +++ b/tasks-standard/src/main/scala/sbt/internal/Action.scala @@ -27,11 +27,11 @@ enum Action[A]: // private[sbt] def mapTask(f: [A1] => Task[A1] => Task[A1]) = this /** Applies a function to the result of evaluating a heterogeneous list of other tasks. */ - case Mapped[A, K[F[_]]](in: K[Task], f: K[Result] => A, alist: AList[K]) extends Action[A] + case Mapped[A, K[+F[_]]](in: K[Task], f: K[Result] => A, alist: AList[K]) extends Action[A] // private[sbt] def mapTask(g: Task ~> Task) = Mapped[A, K](alist.transform(in, g), f, alist) /** Computes another task to evaluate based on results from evaluating other tasks. */ - case FlatMapped[A, K[F[_]]]( + case FlatMapped[A, K[+F[_]]]( in: K[Task], f: K[Result] => Task[A], alist: AList[K], @@ -63,6 +63,7 @@ enum Action[A]: end Action object Action: + import sbt.std.TaskExtra.* /** * Encode this computation as a flatMap. @@ -72,12 +73,12 @@ object Action: ): Action.FlatMapped[A2, [F[_]] =>> Tuple1[F[Either[A1, A2]]]] = val alist = AList.tuple[Tuple1[Either[A1, A2]]] val f: Either[A1, A2] => Task[A2] = { - case Right(b) => std.TaskExtra.task(b) - case Left(a) => std.TaskExtra.singleInputTask(s.fin).map(_(a)) + case Right(b) => task(b) + case Left(a) => singleInputTask(s.fin).map(_(a)) } Action.FlatMapped[A2, [F[_]] =>> Tuple1[F[Either[A1, A2]]]]( Tuple1(s.fab), - { case Tuple1(r) => (f compose std.TaskExtra.successM)(r) }, + { case Tuple1(r) => f.compose(successM)(r) }, alist, ) end Action diff --git a/tasks-standard/src/main/scala/sbt/std/TaskExtra.scala b/tasks-standard/src/main/scala/sbt/std/TaskExtra.scala index 6567d5a38..b42da208b 100644 --- a/tasks-standard/src/main/scala/sbt/std/TaskExtra.scala +++ b/tasks-standard/src/main/scala/sbt/std/TaskExtra.scala @@ -144,7 +144,7 @@ trait TaskExtra extends TaskExtra0 { : Conversion[(Task[A1], Task[A2]), MultiInTask[[F[_]] =>> Tuple.Map[(A1, A2), F]]] = multT2Task(_) - final implicit def multInputTask[K[F[_]]: AList](tasks: K[Task]): MultiInTask[K] = + final implicit def multInputTask[K[+F[_]]: AList](tasks: K[Task]): MultiInTask[K] = new MultiInTask[K]: override def flatMapN[A](f: K[Id] => Task[A]): Task[A] = Task(Info(), Action.FlatMapped[A, K](tasks, f compose allM, AList[K])) diff --git a/tasks-standard/src/main/scala/sbt/std/Transform.scala b/tasks-standard/src/main/scala/sbt/std/Transform.scala index 77dfd2f9b..238613e57 100644 --- a/tasks-standard/src/main/scala/sbt/std/Transform.scala +++ b/tasks-standard/src/main/scala/sbt/std/Transform.scala @@ -30,8 +30,8 @@ object Transform: final class TaskAndValue[T](val task: Task[T], val value: T) - def dummyMap(dummyMap: DummyTaskMap): Task ~>| Task = { - val pmap = new DelegatingPMap[Task, Task](new collection.mutable.ListMap) + def dummyMap(dummyMap: DummyTaskMap): TaskId ~>| Task = { + val pmap = new DelegatingPMap[TaskId, Task](new collection.mutable.ListMap) def add[T](dummy: TaskAndValue[T]): Unit = { pmap(dummy.task) = fromDummyStrict(dummy.task, dummy.value) } @@ -40,15 +40,15 @@ object Transform: } /** Applies `map`, returning the result if defined or returning the input unchanged otherwise. */ - implicit def getOrId(map: Task ~>| Task): [A] => Task[A] => Task[A] = - [A] => (in: Task[A]) => map(in).getOrElse(in) + implicit def getOrId(map: TaskId ~>| Task): [A] => TaskId[A] => Task[A] = + [A] => (in: TaskId[A]) => map(in).getOrElse(in.asInstanceOf) def apply(dummies: DummyTaskMap) = taskToNode(getOrId(dummyMap(dummies))) - def taskToNode(pre: [A] => Task[A] => Task[A]): NodeView[Task] = - new NodeView[Task]: + def taskToNode(pre: [A] => TaskId[A] => Task[A]): NodeView = + new NodeView: import Action.* - def apply[T](t: Task[T]): Node[Task, T] = pre(t).work match + def apply[T](t: TaskId[T]): Node[T] = pre(t).work match case Pure(eval, _) => uniform(Nil)(_ => Right(eval())) case m: Mapped[a, k] => toNode[a, k](m.in)(right[a] compose m.f)(m.alist) case m: FlatMapped[a, k] => @@ -61,19 +61,19 @@ object Transform: case DependsOn(in, deps) => uniform(existToAny(deps))(const(Left(in)) compose all) case Join(in, f) => uniform(in)(f) - def inline1[T](t: Task[T]): Option[() => T] = t.work match - case Action.Pure(eval, true) => Some(eval) - case _ => None + def inline1[T](t: TaskId[T]): Option[() => T] = t match + case Task(_, Action.Pure(eval, true)) => Some(eval) + case _ => None def uniform[A1, D](tasks: Seq[Task[D]])( f: Seq[Result[D]] => Either[Task[A1], A1] - ): Node[Task, A1] = + ): Node[A1] = toNode[A1, [F[_]] =>> List[F[D]]](tasks.toList)(f)(AList.list[D]) def toNode[A1, K1[F[_]]: AList]( - inputs: K1[Task] - )(f: K1[Result] => Either[Task[A1], A1]): Node[Task, A1] = - new Node[Task, A1]: + inputs: K1[TaskId] + )(f: K1[Result] => Either[Task[A1], A1]): Node[A1] = + new Node[A1]: type K[F[_]] = K1[F] val in = inputs lazy val alist: AList[K] = AList[K] diff --git a/tasks-standard/src/test/scala/TaskGen.scala b/tasks-standard/src/test/scala/TaskGen.scala index 1e0f40d54..ad4cdcbdf 100644 --- a/tasks-standard/src/test/scala/TaskGen.scala +++ b/tasks-standard/src/test/scala/TaskGen.scala @@ -22,15 +22,15 @@ object TaskGen extends std.TaskExtra { val TaskListGen = MaxTasksGen.flatMap(size => Gen.listOfN(size, Arbitrary.arbInt.arbitrary)) def run[T](root: Task[T], checkCycles: Boolean, maxWorkers: Int): Result[T] = { - val (service, shutdown) = CompletionService[Task[_], Completed](maxWorkers) + val (service, shutdown) = CompletionService(maxWorkers) val dummies = std.Transform.DummyTaskMap(Nil) - val x = new Execute[Task]( + val x = new Execute( Execute.config(checkCycles), Execute.noTriggers, - ExecuteProgress.empty[Task] + ExecuteProgress.empty )(using std.Transform(dummies)) try { - x.run(root)(using service.asInstanceOf) + x.run(root)(using service) } finally { shutdown() } diff --git a/tasks-standard/src/test/scala/TaskRunnerCircular.scala b/tasks-standard/src/test/scala/TaskRunnerCircular.scala index 532d45173..a08c91b1d 100644 --- a/tasks-standard/src/test/scala/TaskRunnerCircular.scala +++ b/tasks-standard/src/test/scala/TaskRunnerCircular.scala @@ -50,5 +50,5 @@ object TaskRunnerCircularTest extends Properties("TaskRunner Circular") { def cyclic(i: Incomplete) = Incomplete .allExceptions(i) - .exists(_.isInstanceOf[Execute[({ type A[_] <: AnyRef })#A @unchecked]#CyclicException[_]]) + .exists(_.isInstanceOf[Execute#CyclicException]) } diff --git a/tasks-standard/src/test/scala/TaskSerial.scala b/tasks-standard/src/test/scala/TaskSerial.scala index f0ad85af4..7e6208b09 100644 --- a/tasks-standard/src/test/scala/TaskSerial.scala +++ b/tasks-standard/src/test/scala/TaskSerial.scala @@ -16,7 +16,7 @@ import TaskGen.MaxWorkers import org.scalacheck._ import Prop.forAll import Transform.taskToNode -import ConcurrentRestrictions.{ completionService, limitTotal, tagged => tagged0, TagMap } +import ConcurrentRestrictions.{ completionService, limitTotal, tagged } import java.util.concurrent.{ CountDownLatch, TimeUnit } @@ -52,7 +52,7 @@ object TaskSerial extends Properties("task serial") { def checkArbitrary( size: Int, - restrictions: ConcurrentRestrictions[Task[_]], + restrictions: ConcurrentRestrictions, shouldSucceed: Boolean ) = { val latch = task { new CountDownLatch(size) } @@ -70,8 +70,7 @@ object TaskSerial extends Properties("task serial") { "Some tasks were unschedulable: verify this is an actual failure by extending the timeout to several seconds." def scheduledMsg = "All tasks were unexpectedly scheduled." - def tagged(f: TagMap => Boolean) = tagged0[Task[_]](_.tags, f) - def evalRestricted[T](t: Task[T])(restrictions: ConcurrentRestrictions[Task[_]]): T = + def evalRestricted[T](t: Task[T])(restrictions: ConcurrentRestrictions): T = tryRun[T](t, checkCycles, restrictions) } @@ -79,18 +78,18 @@ object TaskTest { def run[T]( root: Task[T], checkCycles: Boolean, - restrictions: ConcurrentRestrictions[Task[_]] + restrictions: ConcurrentRestrictions ): Result[T] = { val (service, shutdown) = - completionService[Task[_], Completed](restrictions, (x: String) => System.err.println(x)) + completionService(restrictions, (x: String) => System.err.println(x)) - val x = new Execute[Task]( + val x = new Execute( Execute.config(checkCycles), Execute.noTriggers, - ExecuteProgress.empty[Task] - )(using taskToNode(idK[Task])) + ExecuteProgress.empty + )(using taskToNode([A] => (id: TaskId[A]) => id.asInstanceOf)) try { - x.run(root)(using service.asInstanceOf) + x.run(root)(using service) } finally { shutdown() } @@ -98,7 +97,7 @@ object TaskTest { def tryRun[T]( root: Task[T], checkCycles: Boolean, - restrictions: ConcurrentRestrictions[Task[_]] + restrictions: ConcurrentRestrictions ): T = run(root, checkCycles, restrictions) match { case Result.Value(v) => v diff --git a/tasks/src/main/scala/sbt/CompletionService.scala b/tasks/src/main/scala/sbt/CompletionService.scala index 672edd725..64012b02b 100644 --- a/tasks/src/main/scala/sbt/CompletionService.scala +++ b/tasks/src/main/scala/sbt/CompletionService.scala @@ -7,19 +7,19 @@ package sbt -trait CompletionService[A, R]: +trait CompletionService: /** * Submits a work node A with work that returns R. In Execute this is used for tasks returning * sbt.Completed. */ - def submit(node: A, work: () => R): Unit + def submit(node: TaskId[?], work: () => Completed): Unit /** * Retrieves and removes the result from the next completed task, waiting if none are yet present. * In Execute this is used for tasks returning sbt.Completed. */ - def take(): R + def take(): Completed end CompletionService import java.util.concurrent.atomic.AtomicInteger @@ -35,35 +35,40 @@ import java.util.concurrent.{ object CompletionService { val poolID = new AtomicInteger(1) - def apply[A, T](poolSize: Int): (CompletionService[A, T], () => Unit) = { + def apply(poolSize: Int): (CompletionService, () => Unit) = { val i = new AtomicInteger(1) val id = poolID.getAndIncrement() val pool = Executors.newFixedThreadPool( poolSize, (r: Runnable) => new Thread(r, s"sbt-completion-thread-$id-${i.getAndIncrement}") ) - (apply[A, T](pool), () => { pool.shutdownNow(); () }) + (apply(pool), () => { pool.shutdownNow(); () }) } - def apply[A, T](x: Executor): CompletionService[A, T] = - apply(new ExecutorCompletionService[T](x)) + def apply(x: Executor): CompletionService = + apply(new ExecutorCompletionService[Completed](x)) - def apply[A, T](completion: JCompletionService[T]): CompletionService[A, T] = - new CompletionService[A, T] { - def submit(node: A, work: () => T) = { CompletionService.submit(work, completion); () } + def apply(completion: JCompletionService[Completed]): CompletionService = + new CompletionService { + def submit(node: TaskId[?], work: () => Completed) = { + CompletionService.submit(work, completion); () + } def take() = completion.take().get() } - def submit[T](work: () => T, completion: JCompletionService[T]): () => T = { - val future = submitFuture[T](work, completion) + def submit(work: () => Completed, completion: JCompletionService[Completed]): () => Completed = { + val future = submitFuture(work, completion) () => future.get } - private[sbt] def submitFuture[A](work: () => A, completion: JCompletionService[A]): JFuture[A] = { + private[sbt] def submitFuture( + work: () => Completed, + completion: JCompletionService[Completed] + ): JFuture[Completed] = { val future = try completion.submit { - new Callable[A] { + new Callable[Completed] { def call = try { work() @@ -79,9 +84,9 @@ object CompletionService { } future } - def manage[A, T]( - service: CompletionService[A, T] - )(setup: A => Unit, cleanup: A => Unit): CompletionService[A, T] = + def manage( + service: CompletionService + )(setup: TaskId[?] => Unit, cleanup: TaskId[?] => Unit): CompletionService = wrap(service) { (node, work) => () => setup(node) try { @@ -90,11 +95,11 @@ object CompletionService { cleanup(node) } } - def wrap[A, T]( - service: CompletionService[A, T] - )(w: (A, () => T) => (() => T)): CompletionService[A, T] = - new CompletionService[A, T] { - def submit(node: A, work: () => T) = service.submit(node, w(node, work)) + def wrap( + service: CompletionService + )(w: (TaskId[?], () => Completed) => (() => Completed)): CompletionService = + new CompletionService { + def submit(node: TaskId[?], work: () => Completed) = service.submit(node, w(node, work)) def take() = service.take() } } diff --git a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala index 0b6e6fb83..2776a44a3 100644 --- a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala +++ b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala @@ -13,14 +13,12 @@ import sbt.internal.util.AttributeKey import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.{ Future => JFuture, RejectedExecutionException } import scala.collection.mutable +import scala.jdk.CollectionConverters.* /** * Describes restrictions on concurrent execution for a set of tasks. - * - * @tparam A - * the type of a task */ -trait ConcurrentRestrictions[A] { +trait ConcurrentRestrictions { /** Internal state type used to describe a set of tasks. */ type G @@ -29,10 +27,10 @@ trait ConcurrentRestrictions[A] { def empty: G /** Updates the description `g` to include a new task `a`. */ - def add(g: G, a: A): G + def add(g: G, a: TaskId[?]): G /** Updates the description `g` to remove a previously added task `a`. */ - def remove(g: G, a: A): G + def remove(g: G, a: TaskId[?]): G /** * Returns true if the tasks described by `g` are allowed to execute concurrently. The methods in @@ -54,8 +52,7 @@ import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService } import annotation.tailrec object ConcurrentRestrictions { - private[this] val completionServices = new java.util.WeakHashMap[CompletionService[_, _], Boolean] - import scala.collection.JavaConverters._ + private[this] val completionServices = new java.util.WeakHashMap[CompletionService, Boolean] def cancelAll() = completionServices.keySet.asScala.toVector.foreach { case a: AutoCloseable => a.close() case _ => @@ -71,22 +68,22 @@ object ConcurrentRestrictions { * @param zero * the constant placeholder used for t */ - def unrestricted[A]: ConcurrentRestrictions[A] = - new ConcurrentRestrictions[A] { + def unrestricted: ConcurrentRestrictions = + new ConcurrentRestrictions { type G = Unit def empty = () - def add(g: G, a: A) = () - def remove(g: G, a: A) = () + def add(g: G, a: TaskId[?]) = () + def remove(g: G, a: TaskId[?]) = () def valid(g: G) = true } - def limitTotal[A](i: Int): ConcurrentRestrictions[A] = { + def limitTotal(i: Int): ConcurrentRestrictions = { assert(i >= 1, "Maximum must be at least 1 (was " + i + ")") - new ConcurrentRestrictions[A] { + new ConcurrentRestrictions { type G = Int def empty = 0 - def add(g: Int, a: A) = g + 1 - def remove(g: Int, a: A) = g - 1 + def add(g: Int, a: TaskId[?]) = g + 1 + def remove(g: Int, a: TaskId[?]) = g - 1 def valid(g: Int) = g <= i } } @@ -108,26 +105,25 @@ object ConcurrentRestrictions { /** * Implements concurrency restrictions on tasks based on Tags. - * @tparam A - * type of a task * @param get * extracts tags from a task * @param validF * defines whether a set of tasks are allowed to execute concurrently based on their merged tags */ - def tagged[A](get: A => TagMap, validF: TagMap => Boolean): ConcurrentRestrictions[A] = - new ConcurrentRestrictions[A] { + def tagged(validF: TagMap => Boolean): ConcurrentRestrictions = + new ConcurrentRestrictions { type G = TagMap def empty = Map.empty - def add(g: TagMap, a: A) = merge(g, a, get)(_ + _) - def remove(g: TagMap, a: A) = merge(g, a, get)(_ - _) + def add(g: TagMap, a: TaskId[?]) = merge(g, a)(_ + _) + def remove(g: TagMap, a: TaskId[?]) = merge(g, a)(_ - _) def valid(g: TagMap) = validF(g) } - private[this] def merge[A](m: TagMap, a: A, get: A => TagMap)(f: (Int, Int) => Int): TagMap = { - val aTags = get(a) - val base = merge(m, aTags)(f) - val un = if (aTags.isEmpty) update(base, Untagged, 1)(f) else base + private[this] def merge(m: TagMap, a: TaskId[?])( + f: (Int, Int) => Int + ): TagMap = { + val base = merge(m, a.tags)(f) + val un = if (a.tags.isEmpty) update(base, Untagged, 1)(f) else base update(un, All, 1)(f) } @@ -154,26 +150,26 @@ object ConcurrentRestrictions { * @tparam R * the type of data that will be computed by the CompletionService. */ - def completionService[A, R]( - tags: ConcurrentRestrictions[A], + def completionService( + tags: ConcurrentRestrictions, warn: String => Unit - ): (CompletionService[A, R], () => Unit) = { + ): (CompletionService, () => Unit) = { val id = poolID.getAndIncrement val i = new AtomicInteger(1) val pool = Executors.newCachedThreadPool { r => new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}") } - val service = completionService[A, R](pool, tags, warn) + val service = completionService(pool, tags, warn) (service, () => { pool.shutdownNow(); () }) } - def completionService[A, R]( - tags: ConcurrentRestrictions[A], + def completionService( + tags: ConcurrentRestrictions, warn: String => Unit, - isSentinel: A => Boolean - ): (CompletionService[A, R], () => Unit) = { + isSentinel: TaskId[?] => Boolean + ): (CompletionService, () => Unit) = { val pool = Executors.newCachedThreadPool() - val service = completionService[A, R](pool, tags, warn, isSentinel) + val service = completionService(pool, tags, warn, isSentinel) ( service, () => { @@ -183,13 +179,13 @@ object ConcurrentRestrictions { ) } - def cancellableCompletionService[A, R]( - tags: ConcurrentRestrictions[A], + def cancellableCompletionService( + tags: ConcurrentRestrictions, warn: String => Unit, - isSentinel: A => Boolean - ): (CompletionService[A, R], Boolean => Unit) = { + isSentinel: TaskId[?] => Boolean + ): (CompletionService, Boolean => Unit) = { val pool = Executors.newCachedThreadPool() - val service = completionService[A, R](pool, tags, warn, isSentinel) + val service = completionService(pool, tags, warn, isSentinel) ( service, force => { @@ -200,12 +196,12 @@ object ConcurrentRestrictions { ) } - def completionService[A, R]( + def completionService( backing: Executor, - tags: ConcurrentRestrictions[A], + tags: ConcurrentRestrictions, warn: String => Unit - ): CompletionService[A, R] with AutoCloseable = { - completionService[A, R](backing, tags, warn, (_: A) => false) + ): CompletionService with AutoCloseable = { + completionService(backing, tags, warn, _ => false) } /** @@ -213,17 +209,17 @@ object ConcurrentRestrictions { * restrictions on concurrent task execution and using the provided Executor to manage execution * on threads. */ - def completionService[A, R]( + def completionService( backing: Executor, - tags: ConcurrentRestrictions[A], + tags: ConcurrentRestrictions, warn: String => Unit, - isSentinel: A => Boolean, - ): CompletionService[A, R] with CancelSentiels with AutoCloseable = { + isSentinel: TaskId[?] => Boolean, + ): CompletionService with CancelSentiels with AutoCloseable = { // Represents submitted work for a task. - final class Enqueue(val node: A, val work: () => R) + final class Enqueue(val node: TaskId[?], val work: () => Completed) - new CompletionService[A, R] with CancelSentiels with AutoCloseable { + new CompletionService with CancelSentiels with AutoCloseable { completionServices.put(this, true) private[this] val closed = new AtomicBoolean(false) override def close(): Unit = if (closed.compareAndSet(false, true)) { @@ -232,7 +228,7 @@ object ConcurrentRestrictions { } /** Backing service used to manage execution on threads once all constraints are satisfied. */ - private[this] val jservice = new ExecutorCompletionService[R](backing) + private[this] val jservice = new ExecutorCompletionService[Completed](backing) /** The description of the currently running tasks, used by `tags` to manage restrictions. */ private[this] var tagState = tags.empty @@ -255,7 +251,7 @@ object ConcurrentRestrictions { sentinels.clear() } - def submit(node: A, work: () => R): Unit = synchronized { + def submit(node: TaskId[?], work: () => Completed): Unit = synchronized { if (closed.get) throw new RejectedExecutionException else if (isSentinel(node)) { // skip all checks for sentinels @@ -276,7 +272,7 @@ object ConcurrentRestrictions { } () } - private[this] def submitValid(node: A, work: () => R): Unit = { + private[this] def submitValid(node: TaskId[?], work: () => Completed): Unit = { running += 1 val wrappedWork = () => try work() @@ -284,7 +280,7 @@ object ConcurrentRestrictions { CompletionService.submitFuture(wrappedWork, jservice) () } - private[this] def cleanup(node: A): Unit = synchronized { + private[this] def cleanup(node: TaskId[?]): Unit = synchronized { running -= 1 tagState = tags.remove(tagState, node) if (!tags.valid(tagState)) { @@ -320,7 +316,7 @@ object ConcurrentRestrictions { submitValid(tried) } - def take(): R = { + def take(): Completed = { if (closed.get) throw new RejectedExecutionException( "Tried to get values for a closed completion service" diff --git a/tasks/src/main/scala/sbt/Execute.scala b/tasks/src/main/scala/sbt/Execute.scala index 8bec9ba3f..4c9a39c14 100644 --- a/tasks/src/main/scala/sbt/Execute.scala +++ b/tasks/src/main/scala/sbt/Execute.scala @@ -11,23 +11,24 @@ import java.util.concurrent.ExecutionException import sbt.internal.util.ErrorHandling.wideConvert import sbt.internal.util.{ DelegatingPMap, IDSet, PMap, RMap, ~> } -import sbt.internal.util.Types._ +import sbt.internal.util.Types.* import sbt.internal.util.Util.nilSeq -import Execute._ import scala.annotation.tailrec import scala.collection.mutable -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters.* import mutable.Map import sbt.internal.util.AList private[sbt] object Execute { - def idMap[A1, A2]: Map[A1, A2] = (new java.util.IdentityHashMap[A1, A2]).asScala - def pMap[F1[_], F2[_]]: PMap[F1, F2] = new DelegatingPMap[F1, F2](idMap) + def taskMap[A]: Map[TaskId[?], A] = (new java.util.IdentityHashMap[TaskId[?], A]).asScala + def taskPMap[F[_]]: PMap[TaskId, F] = new DelegatingPMap( + (new java.util.IdentityHashMap[TaskId[Any], F[Any]]).asScala + ) private[sbt] def completed(p: => Unit): Completed = new Completed { def process(): Unit = p } - def noTriggers[F[_]] = new Triggers[F](Map.empty, Map.empty, idFun) + def noTriggers[TaskId[_]] = new Triggers(Map.empty, Map.empty, idFun) def config(checkCycles: Boolean, overwriteNode: Incomplete => Boolean = const(false)): Config = new Config(checkCycles, overwriteNode) @@ -44,33 +45,33 @@ sealed trait Completed { def process(): Unit } -private[sbt] trait NodeView[F[_]] { - def apply[A](a: F[A]): Node[F, A] - def inline1[A](a: F[A]): Option[() => A] +private[sbt] trait NodeView { + def apply[A](a: TaskId[A]): Node[A] + def inline1[A](a: TaskId[A]): Option[() => A] } -final class Triggers[F[_]]( - val runBefore: collection.Map[F[Any], Seq[F[Any]]], - val injectFor: collection.Map[F[Any], Seq[F[Any]]], - val onComplete: RMap[F, Result] => RMap[F, Result], +final class Triggers( + val runBefore: collection.Map[TaskId[?], Seq[TaskId[?]]], + val injectFor: collection.Map[TaskId[?], Seq[TaskId[?]]], + val onComplete: RMap[TaskId, Result] => RMap[TaskId, Result], ) -private[sbt] final class Execute[F[_] <: AnyRef]( - config: Config, - triggers: Triggers[F], - progress: ExecuteProgress[F] -)(using view: NodeView[F]) { - type Strategy = CompletionService[F[Any], Completed] +private[sbt] final class Execute( + config: Execute.Config, + triggers: Triggers, + progress: ExecuteProgress +)(using view: NodeView) { + import Execute.* - private[this] val forward = idMap[F[Any], IDSet[F[Any]]] - private[this] val reverse = idMap[F[Any], Iterable[F[Any]]] - private[this] val callers = pMap[F, Compose[IDSet, F]] - private[this] val state = idMap[F[Any], State] - private[this] val viewCache = pMap[F, Node[F, *]] - private[this] val results = pMap[F, Result] + private[this] val forward = taskMap[IDSet[TaskId[?]]] + private[this] val reverse = taskMap[Iterable[TaskId[?]]] + private[this] val callers = taskPMap[[X] =>> IDSet[TaskId[X]]] + private[this] val state = taskMap[State] + private[this] val viewCache = taskPMap[Node] + private[this] val results = taskPMap[Result] - private[this] val getResult: [A] => F[A] => Result[A] = [A] => - (a: F[A]) => + private[this] val getResult: [A] => TaskId[A] => Result[A] = [A] => + (a: TaskId[A]) => view.inline1(a) match case Some(v) => Result.Value(v()) case None => results(a) @@ -85,12 +86,12 @@ private[sbt] final class Execute[F[_] <: AnyRef]( def dump: String = "State: " + state.toString + "\n\nResults: " + results + "\n\nCalls: " + callers + "\n\n" - def run[A](root: F[A])(using strategy: Strategy): Result[A] = + def run[A](root: TaskId[A])(using strategy: CompletionService): Result[A] = try { runKeep(root)(root) } catch { case i: Incomplete => Result.Inc(i) } - def runKeep[A](root: F[A])(using strategy: Strategy): RMap[F, Result] = { + def runKeep[A](root: TaskId[A])(using strategy: CompletionService): RMap[TaskId, Result] = { assert(state.isEmpty, "Execute already running/ran.") addNew(root) @@ -102,7 +103,7 @@ private[sbt] final class Execute[F[_] <: AnyRef]( finalResults } - def processAll()(using strategy: Strategy): Unit = { + def processAll()(using strategy: CompletionService): Unit = { @tailrec def next(): Unit = { pre { assert(reverse.nonEmpty, "Nothing to process.") @@ -135,7 +136,7 @@ private[sbt] final class Execute[F[_] <: AnyRef]( } def dumpCalling: String = state.filter(_._2 == Calling).mkString("\n\t") - def call[A](node: F[A], target: F[A])(using strategy: Strategy): Unit = { + def call[A](node: TaskId[A], target: TaskId[A])(using strategy: CompletionService): Unit = { if (config.checkCycles) cycleCheck(node, target) pre { assert(running(node)) @@ -145,7 +146,7 @@ private[sbt] final class Execute[F[_] <: AnyRef]( results.get(target) match { case Some(result) => retire(node, result) case None => - state(node.asInstanceOf) = Calling + state(node) = Calling addChecked(target) addCaller(node, target) } @@ -160,18 +161,16 @@ private[sbt] final class Execute[F[_] <: AnyRef]( } } - def retire[A](node: F[A], result: Result[A])(using strategy: Strategy): Unit = { + def retire[A](node: TaskId[A], result: Result[A])(using strategy: CompletionService): Unit = { pre { assert(running(node) | calling(node)) readyInv(node) } results(node) = result - state(node.asInstanceOf) = Done + state(node) = Done progress.afterCompleted(node, result) - remove(reverse.asInstanceOf[Map[F[A], Iterable[F[Any]]]], node) foreach { dep => - notifyDone(node, dep.asInstanceOf) - } + remove(reverse, node).foreach(dep => notifyDone(node, dep)) callers.remove(node).toList.flatten.foreach { c => retire(c, callerResult(c, result)) } @@ -183,23 +182,23 @@ private[sbt] final class Execute[F[_] <: AnyRef]( assert(done(node)) assert(results(node) == result) readyInv(node) - assert(!(reverse.contains(node.asInstanceOf))) + assert(!(reverse.contains(node))) assert(!(callers.contains(node))) assert(triggeredBy(node) forall added) } } - def callerResult[A](node: F[A], result: Result[A]): Result[A] = + def callerResult[A](node: TaskId[A], result: Result[A]): Result[A] = result match { case _: Result.Value[A] => result case Result.Inc(i) => Result.Inc(Incomplete(Some(node), tpe = i.tpe, causes = i :: Nil)) } - def notifyDone[A](node: F[A], dependent: F[Any])(using strategy: Strategy): Unit = { + def notifyDone(node: TaskId[?], dependent: TaskId[?])(using strategy: CompletionService): Unit = { val f = forward(dependent) - f -= node.asInstanceOf + f -= node if (f.isEmpty) { - remove[F[Any], IDSet[F[Any]]](forward.asInstanceOf, dependent) - ready[Any](dependent) + remove(forward, dependent) + ready(dependent) } } @@ -208,7 +207,7 @@ private[sbt] final class Execute[F[_] <: AnyRef]( * inputs and dependencies have completed. Its computation is then evaluated and made available * for nodes that have it as an input. */ - def addChecked[A](node: F[A])(using strategy: Strategy): Unit = { + def addChecked[A](node: TaskId[A])(using strategy: CompletionService): Unit = { if (!added(node)) addNew(node) post { addedInv(node) } @@ -219,14 +218,14 @@ private[sbt] final class Execute[F[_] <: AnyRef]( * have finished, the node's computation is scheduled to run. The node's dependencies will be * added (transitively) if they are not already registered. */ - def addNew[A](node: F[A])(using strategy: Strategy): Unit = { + def addNew(node: TaskId[?])(using strategy: CompletionService): Unit = { pre { newPre(node) } val v = register(node) - val deps: Iterable[F[Any]] = dependencies(v) ++ runBefore(node.asInstanceOf) - val active = IDSet[F[Any]](deps filter notDone.asInstanceOf) + val deps = dependencies(v) ++ runBefore(node) + val active = IDSet[TaskId[?]](deps filter notDone) progress.afterRegistered( - node.asInstanceOf, + node, deps, active.toList /* active is mutable, so take a snapshot */ @@ -234,10 +233,10 @@ private[sbt] final class Execute[F[_] <: AnyRef]( if (active.isEmpty) ready(node) else { - forward(node.asInstanceOf) = active.asInstanceOf + forward(node) = active for (a <- active) { - addChecked[Any](a.asInstanceOf) - addReverse[Any](a.asInstanceOf, node.asInstanceOf) + addChecked(a) + addReverse(a, node) } } @@ -253,45 +252,47 @@ private[sbt] final class Execute[F[_] <: AnyRef]( * Called when a pending 'node' becomes runnable. All of its dependencies must be done. This * schedules the node's computation with 'strategy'. */ - def ready[A](node: F[A])(using strategy: Strategy): Unit = { + def ready(node: TaskId[?])(using strategy: CompletionService): Unit = { pre { assert(pending(node)) readyInv(node) - assert(reverse.contains(node.asInstanceOf)) + assert(reverse.contains(node)) } - state(node.asInstanceOf) = Running - progress.afterReady(node.asInstanceOf) + state(node) = Running + progress.afterReady(node) submit(node) post { readyInv(node) - assert(reverse.contains(node.asInstanceOf)) + assert(reverse.contains(node)) assert(running(node)) } } /** Enters the given node into the system. */ - def register[A](node: F[A]): Node[F, A] = { - state(node.asInstanceOf) = Pending - reverse(node.asInstanceOf) = Seq() + def register[A](node: TaskId[A]): Node[A] = { + state(node) = Pending + reverse(node) = Seq() viewCache.getOrUpdate(node, view(node)) } /** Send the work for this node to the provided Strategy. */ - def submit[A](node: F[A])(using strategy: Strategy): Unit = { + def submit(node: TaskId[?])(using strategy: CompletionService): Unit = { val v = viewCache(node) - val rs = v.alist.transform[F, Result](v.in)(getResult) + val rs = v.alist.transform(v.in)(getResult) // v.alist.transform(v.in)(getResult) - strategy.submit(node.asInstanceOf, () => work(node, v.work(rs))) + strategy.submit(node, () => work(node, v.work(rs))) } /** * Evaluates the computation 'f' for 'node'. This returns a Completed instance, which contains the * post-processing to perform after the result is retrieved from the Strategy. */ - def work[A](node: F[A], f: => Either[F[A], A])(using strategy: Strategy): Completed = { - progress.beforeWork(node.asInstanceOf) + def work[A](node: TaskId[A], f: => Either[TaskId[A], A])(using + strategy: CompletionService + ): Completed = { + progress.beforeWork(node) val rawResult = wideConvert(f).left.map { case i: Incomplete => if (config.overwriteNode(i)) i.copy(node = Some(node)) else i case e => Incomplete(Some(node), Incomplete.Error, directCause = Some(e)) @@ -306,8 +307,8 @@ private[sbt] final class Execute[F[_] <: AnyRef]( } } private[this] def rewrap[A]( - rawResult: Either[Incomplete, Either[F[A], A]] - ): Either[F[A], Result[A]] = + rawResult: Either[Incomplete, Either[TaskId[A], A]] + ): Either[TaskId[A], Result[A]] = rawResult match { case Left(i) => Right(Result.Inc(i)) case Right(Right(v)) => Right(Result.Value(v)) @@ -317,108 +318,105 @@ private[sbt] final class Execute[F[_] <: AnyRef]( def remove[K, V](map: Map[K, V], k: K): V = map.remove(k).getOrElse(sys.error("Key '" + k + "' not in map :\n" + map)) - def addReverse[A](node: F[A], dependent: F[Any]): Unit = - reverse(node.asInstanceOf) ++= Seq(dependent) - def addCaller[A](caller: F[A], target: F[A]): Unit = - callers.getOrUpdate(target, IDSet.create[F[A]]) += caller + def addReverse(node: TaskId[?], dependent: TaskId[?]): Unit = + reverse(node) ++= Seq(dependent) + def addCaller[A](caller: TaskId[A], target: TaskId[A]): Unit = + callers.getOrUpdate(target, IDSet.create) += caller - def dependencies[A](node: F[A]): Iterable[F[Any]] = dependencies(viewCache(node.asInstanceOf)) - def dependencies[A](v: Node[F, A]): Iterable[F[Any]] = - v.alist.toList[F](v.in).filter(dep => view.inline1(dep).isEmpty) + def dependencies(node: TaskId[?]): Iterable[TaskId[?]] = dependencies(viewCache(node)) + def dependencies(v: Node[?]): Iterable[TaskId[?]] = + v.alist.toList(v.in).filter(dep => view.inline1(dep).isEmpty) - def runBefore[A](node: F[A]): Seq[F[A]] = - getSeq[A](triggers.runBefore, node) - def triggeredBy[A](node: F[A]): Seq[F[A]] = getSeq(triggers.injectFor, node) - def getSeq[A](map: collection.Map[F[Any], Seq[F[Any]]], node: F[A]): Seq[F[A]] = - map.getOrElse(node.asInstanceOf, nilSeq[F[Any]]).asInstanceOf + def runBefore(node: TaskId[?]): Seq[TaskId[?]] = triggers.runBefore.getOrElse(node, nilSeq) + def triggeredBy(node: TaskId[?]): Seq[TaskId[?]] = triggers.injectFor.getOrElse(node, nilSeq) // Contracts - def addedInv[A](node: F[A]): Unit = topologicalSort(node) foreach addedCheck - def addedCheck[A](node: F[A]): Unit = { + def addedInv(node: TaskId[?]): Unit = topologicalSort(node).foreach(addedCheck) + def addedCheck(node: TaskId[?]): Unit = { assert(added(node), "Not added: " + node) - assert(viewCache.contains[Any](node.asInstanceOf), "Not in view cache: " + node) - dependencyCheck(node.asInstanceOf) + assert(viewCache.contains(node), "Not in view cache: " + node) + dependencyCheck(node) } - def dependencyCheck(node: F[Any]): Unit = { + def dependencyCheck(node: TaskId[?]): Unit = { dependencies(node) foreach { dep => def onOpt[A](o: Option[A])(f: A => Boolean) = o match { case None => false; case Some(x) => f(x) } - def checkForward = onOpt(forward.get(node.asInstanceOf)) { _ contains dep.asInstanceOf } - def checkReverse = onOpt(reverse.get(dep.asInstanceOf)) { _.exists(_ == node) } - assert(done(dep.asInstanceOf) ^ (checkForward && checkReverse)) + def checkForward = onOpt(forward.get(node))(_.contains(dep)) + def checkReverse = onOpt(reverse.get(dep))(_.exists(_ == node)) + assert(done(dep) ^ (checkForward && checkReverse)) } } - def pendingInv[A](node: F[A]): Unit = { + def pendingInv(node: TaskId[?]): Unit = { assert(atState(node, Pending)) - assert((dependencies(node) ++ runBefore(node)) exists notDone.asInstanceOf) + assert((dependencies(node) ++ runBefore(node)).exists(notDone)) } - def runningInv[A](node: F[A]): Unit = { - assert(dependencies(node) forall done.asInstanceOf) - assert(!(forward.contains(node.asInstanceOf))) + def runningInv(node: TaskId[?]): Unit = { + assert(dependencies(node).forall(done)) + assert(!(forward.contains(node))) } - def newPre[A](node: F[A]): Unit = { + def newPre(node: TaskId[?]): Unit = { isNew(node) - assert(!(reverse.contains(node.asInstanceOf))) - assert(!(forward.contains(node.asInstanceOf))) - assert(!(callers.contains[Any](node.asInstanceOf))) - assert(!(viewCache.contains[Any](node.asInstanceOf))) - assert(!(results.contains[Any](node.asInstanceOf))) + assert(!(reverse.contains(node))) + assert(!(forward.contains(node))) + assert(!(callers.contains(node))) + assert(!(viewCache.contains(node))) + assert(!(results.contains(node))) } - def topologicalSort[A](node: F[A]): Seq[F[Any]] = { - val seen = IDSet.create[F[Any]] - def visit(n: F[Any]): List[F[Any]] = - (seen process n)(List[F[Any]]()) { - node.asInstanceOf :: dependencies(n).foldLeft(List[F[Any]]()) { (ss, dep) => - visit(dep.asInstanceOf) ::: ss - } + def topologicalSort(node: TaskId[?]): Seq[TaskId[?]] = { + val seen = IDSet.create[TaskId[?]] + def visit(n: TaskId[?]): List[TaskId[?]] = + seen.process(n)(List.empty) { + val deps: List[TaskId[?]] = + dependencies(n).foldLeft(List.empty)((ss, dep) => visit(dep) ::: ss) + node :: deps } - visit(node.asInstanceOf).reverse + visit(node).reverse } - def readyInv[A](node: F[A]): Unit = { - assert(dependencies(node) forall done.asInstanceOf) - assert(!(forward.contains(node.asInstanceOf))) + def readyInv(node: TaskId[?]): Unit = { + assert(dependencies(node).forall(done)) + assert(!(forward.contains(node))) } // cyclic reference checking def snapshotCycleCheck(): Unit = - callers.toSeq foreach { case (called: F[c], callers) => - for (caller <- callers) cycleCheck(caller.asInstanceOf[F[c]], called) + callers.toSeq foreach { case (called, callers) => + for (caller <- callers) cycleCheck(caller, called) } - def cycleCheck[A](node: F[A], target: F[A]): Unit = { + def cycleCheck(node: TaskId[?], target: TaskId[?]): Unit = { if (node eq target) cyclic(node, target, "Cannot call self") - val all = IDSet.create[F[A]] - def allCallers(n: F[A]): Unit = (all process n)(()) { + val all = IDSet.create[TaskId[?]] + def allCallers(n: TaskId[?]): Unit = (all process n)(()) { callers.get(n).toList.flatten.foreach(allCallers) } allCallers(node) if (all contains target) cyclic(node, target, "Cyclic reference") } - def cyclic[A](caller: F[A], target: F[A], msg: String) = + def cyclic(caller: TaskId[?], target: TaskId[?], msg: String) = throw new Incomplete( Some(caller), message = Some(msg), directCause = Some(new CyclicException(caller, target, msg)) ) - final class CyclicException[A](val caller: F[A], val target: F[A], msg: String) + final class CyclicException(val caller: TaskId[?], val target: TaskId[?], msg: String) extends Exception(msg) // state testing - def pending[A](d: F[A]) = atState(d, Pending) - def running[A](d: F[A]) = atState(d, Running) - def calling[A](d: F[A]) = atState(d, Calling) - def done[A](d: F[A]) = atState(d, Done) - def notDone[A](d: F[A]) = !done(d) - private def atState[A](d: F[A], s: State) = state.get(d.asInstanceOf) == Some(s) - def isNew[A](d: F[A]) = !added(d) - def added[A](d: F[A]) = state.contains(d.asInstanceOf) + def pending(d: TaskId[?]) = atState(d, Pending) + def running(d: TaskId[?]) = atState(d, Running) + def calling(d: TaskId[?]) = atState(d, Calling) + def done(d: TaskId[?]) = atState(d, Done) + def notDone(d: TaskId[?]) = !done(d) + private def atState(d: TaskId[?], s: State) = state.get(d) == Some(s) + def isNew(d: TaskId[?]) = !added(d) + def added(d: TaskId[?]) = state.contains(d) def complete = state.values.forall(_ == Done) def pre(f: => Unit) = if (checkPreAndPostConditions) f diff --git a/tasks/src/main/scala/sbt/ExecuteProgress.scala b/tasks/src/main/scala/sbt/ExecuteProgress.scala index e84603e5d..e58405f2f 100644 --- a/tasks/src/main/scala/sbt/ExecuteProgress.scala +++ b/tasks/src/main/scala/sbt/ExecuteProgress.scala @@ -14,7 +14,7 @@ import sbt.internal.util.RMap * except `started` and `finished`, which is called from the executing task's thread. All methods * should return quickly to avoid task execution overhead. */ -trait ExecuteProgress[F[_]] { +trait ExecuteProgress { def initial(): Unit /** @@ -22,20 +22,24 @@ trait ExecuteProgress[F[_]] { * `task` are `allDeps` and the subset of those dependencies that have not completed are * `pendingDeps`. */ - def afterRegistered(task: F[Any], allDeps: Iterable[F[Any]], pendingDeps: Iterable[F[Any]]): Unit + def afterRegistered( + task: TaskId[?], + allDeps: Iterable[TaskId[?]], + pendingDeps: Iterable[TaskId[?]] + ): Unit /** * Notifies that all of the dependencies of `task` have completed and `task` is therefore ready to * run. The task has not been scheduled on a thread yet. */ - def afterReady(task: F[Any]): Unit + def afterReady(task: TaskId[?]): Unit /** * Notifies that the work for `task` is starting after this call returns. This is called from the * thread the task executes on, unlike most other methods in this callback. It is called * immediately before the task's work starts with minimal intervening executor overhead. */ - def beforeWork(task: F[Any]): Unit + def beforeWork(task: TaskId[?]): Unit /** * Notifies that the work for `task` work has finished. The task may have computed the next task @@ -45,16 +49,16 @@ trait ExecuteProgress[F[_]] { * executes on, unlike most other methods in this callback. It is immediately called after the * task's work is complete with minimal intervening executor overhead. */ - def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit + def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit /** * Notifies that `task` has completed. The task's work is done with a final `result`. Any tasks * called by `task` have completed. */ - def afterCompleted[A](task: F[A], result: Result[A]): Unit + def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit /** All tasks have completed with the final `results` provided. */ - def afterAllCompleted(results: RMap[F, Result]): Unit + def afterAllCompleted(results: RMap[TaskId, Result]): Unit /** Notifies that either all tasks have finished or cancelled. */ def stop(): Unit @@ -64,46 +68,46 @@ trait ExecuteProgress[F[_]] { * This module is experimental and subject to binary and source incompatible changes at any time. */ object ExecuteProgress { - def empty[F[_]]: ExecuteProgress[F] = new ExecuteProgress[F] { + def empty: ExecuteProgress = new ExecuteProgress { override def initial(): Unit = () override def afterRegistered( - task: F[Any], - allDeps: Iterable[F[Any]], - pendingDeps: Iterable[F[Any]] + task: TaskId[?], + allDeps: Iterable[TaskId[?]], + pendingDeps: Iterable[TaskId[?]] ): Unit = () - override def afterReady(task: F[Any]): Unit = () - override def beforeWork(task: F[Any]): Unit = () - override def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit = () - override def afterCompleted[A](task: F[A], result: Result[A]): Unit = () - override def afterAllCompleted(results: RMap[F, Result]): Unit = () + override def afterReady(task: TaskId[?]): Unit = () + override def beforeWork(task: TaskId[?]): Unit = () + override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = () + override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit = () + override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = () override def stop(): Unit = () } - def aggregate[F[_]](reporters: Seq[ExecuteProgress[F]]) = new ExecuteProgress[F] { + def aggregate(reporters: Seq[ExecuteProgress]) = new ExecuteProgress { override def initial(): Unit = { reporters foreach { _.initial() } } override def afterRegistered( - task: F[Any], - allDeps: Iterable[F[Any]], - pendingDeps: Iterable[F[Any]] + task: TaskId[?], + allDeps: Iterable[TaskId[?]], + pendingDeps: Iterable[TaskId[?]] ): Unit = { reporters foreach { _.afterRegistered(task, allDeps, pendingDeps) } } - override def afterReady(task: F[Any]): Unit = { + override def afterReady(task: TaskId[?]): Unit = { reporters foreach { _.afterReady(task) } } - override def beforeWork(task: F[Any]): Unit = { + override def beforeWork(task: TaskId[?]): Unit = { reporters foreach { _.beforeWork(task) } } - override def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit = { + override def afterWork[A](task: TaskId[A], result: Either[TaskId[A], Result[A]]): Unit = { reporters foreach { _.afterWork(task, result) } } - override def afterCompleted[A](task: F[A], result: Result[A]): Unit = { + override def afterCompleted[A](task: TaskId[A], result: Result[A]): Unit = { reporters foreach { _.afterCompleted(task, result) } } - override def afterAllCompleted(results: RMap[F, Result]): Unit = { + override def afterAllCompleted(results: RMap[TaskId, Result]): Unit = { reporters foreach { _.afterAllCompleted(results) } } override def stop(): Unit = { diff --git a/tasks/src/main/scala/sbt/Incomplete.scala b/tasks/src/main/scala/sbt/Incomplete.scala index 1cbbec4a2..95deb8c40 100644 --- a/tasks/src/main/scala/sbt/Incomplete.scala +++ b/tasks/src/main/scala/sbt/Incomplete.scala @@ -11,6 +11,7 @@ import scala.collection.mutable.ListBuffer import sbt.internal.util.IDSet import Incomplete.{ Error, Value => IValue } +import scala.jdk.CollectionConverters.* /** * Describes why a task did not complete. @@ -45,7 +46,6 @@ object Incomplete extends Enumeration { def transformTD(i: Incomplete)(f: Incomplete => Incomplete): Incomplete = transform(i, true)(f) def transformBU(i: Incomplete)(f: Incomplete => Incomplete): Incomplete = transform(i, false)(f) def transform(i: Incomplete, topDown: Boolean)(f: Incomplete => Incomplete): Incomplete = { - import collection.JavaConverters._ val visited: collection.mutable.Map[Incomplete, Incomplete] = (new java.util.IdentityHashMap[Incomplete, Incomplete]).asScala def visit(inc: Incomplete): Incomplete = diff --git a/tasks/src/main/scala/sbt/Node.scala b/tasks/src/main/scala/sbt/Node.scala index 09e17de70..eb211dba7 100644 --- a/tasks/src/main/scala/sbt/Node.scala +++ b/tasks/src/main/scala/sbt/Node.scala @@ -17,11 +17,11 @@ import sbt.internal.util.AList * @tparam A * the type computed by this node */ -private[sbt] trait Node[Effect[_], A]: +private[sbt] trait Node[A]: type K[L[x]] - def in: K[Effect] + def in: K[TaskId] def alist: AList[K] /** Computes the result of this task given the results from the inputs. */ - def work(inputs: K[Result]): Either[Effect[A], A] + def work(inputs: K[Result]): Either[TaskId[A], A] end Node diff --git a/tasks/src/main/scala/sbt/TaskId.scala b/tasks/src/main/scala/sbt/TaskId.scala new file mode 100644 index 000000000..1ac1c9b3e --- /dev/null +++ b/tasks/src/main/scala/sbt/TaskId.scala @@ -0,0 +1,6 @@ +package sbt + +import sbt.internal.util.AttributeMap + +trait TaskId[A]: + def tags: ConcurrentRestrictions.TagMap