Merge pull request #4561 from eed3si9n/wip/progress

open up ExecuteProgress, and adds a few keys
This commit is contained in:
eugene yokota 2019-03-20 11:06:03 -04:00 committed by GitHub
commit e8a011bf5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 144 additions and 87 deletions

View File

@ -291,7 +291,10 @@ lazy val taskProj = (project in file("tasks"))
name := "Tasks",
mimaSettings,
mimaBinaryIssueFilters ++= Seq(
exclude[ReversedMissingMethodProblem]("sbt.ExecuteProgress.stop")
// ok because sbt.ExecuteProgress has been under private[sbt]
exclude[IncompatibleResultTypeProblem]("sbt.ExecuteProgress.initial"),
exclude[DirectMissingMethodProblem]("sbt.ExecuteProgress.*"),
exclude[ReversedMissingMethodProblem]("sbt.ExecuteProgress.*"),
)
)
.configure(addSbtUtilControl)

View File

@ -259,6 +259,15 @@ object Defaults extends BuildCommon {
() =>
{ IO.delete(dir); IO.createDirectory(dir) }
},
useSuperShell :== sbt.internal.TaskProgress.isEnabled,
progressReports := { (s: State) =>
val progress = useSuperShell.value
val rs = EvaluateTask.taskTimingProgress.toVector ++ {
if (progress) Vector(EvaluateTask.taskProgress(s))
else Vector()
}
rs map { Keys.TaskProgress(_) }
},
Previous.cache := new Previous(
Def.streamsManagerKey.value,
Previous.references.value.getReferences

View File

@ -162,19 +162,21 @@ object EvaluateTask {
import Keys.state
lazy private val sharedProgress = new TaskTimings(reportOnShutdown = true)
// sbt-pgp calls this
private[sbt] def defaultProgress(): ExecuteProgress[Task] = ExecuteProgress.empty[Task]
private[sbt] def defaultProgress(currentRef: ProjectRef): ExecuteProgress[Task] =
def taskTimingProgress: Option[ExecuteProgress[Task]] =
if (java.lang.Boolean.getBoolean("sbt.task.timings")) {
if (java.lang.Boolean.getBoolean("sbt.task.timings.on.shutdown"))
sharedProgress
Some(sharedProgress)
else
new TaskTimings(reportOnShutdown = false)
} else {
if (ConsoleAppender.showProgress) new TaskProgress(currentRef)
else ExecuteProgress.empty[Task]
}
Some(new TaskTimings(reportOnShutdown = false))
} else None
def taskProgress(state: State): ExecuteProgress[Task] = {
new TaskProgress(Project.extract(state).currentRef)
}
// sbt-pgp calls this
@deprecated("No longer used", "1.3.0")
private[sbt] def defaultProgress(): ExecuteProgress[Task] = ExecuteProgress.empty[Task]
val SystemProcessors = Runtime.getRuntime.availableProcessors
@ -232,13 +234,22 @@ object EvaluateTask {
state: State
): ExecuteProgress[Task] = {
import Types.const
val maker: State => Keys.TaskProgress = getSetting(
Keys.executeProgress,
const(new Keys.TaskProgress(defaultProgress(extracted.currentRef))),
val maker: State => Seq[Keys.TaskProgress] = getSetting(
Keys.progressReports,
const(Seq()),
extracted,
structure
)
maker(state).progress
val reporters = maker(state) map { _.progress }
// configure the logger for super shell
ConsoleAppender.setShowProgress((reporters collect {
case p: TaskProgress => ()
}).nonEmpty)
reporters match {
case xs if xs.isEmpty => ExecuteProgress.empty[Task]
case xs if xs.size == 1 => xs.head
case xs => ExecuteProgress.aggregate[Task](xs)
}
}
// TODO - Should this pull from Global or from the project itself?
private[sbt] def forcegc(extracted: Extracted, structure: BuildStructure): Boolean =
@ -258,7 +269,7 @@ object EvaluateTask {
extracted: Extracted,
structure: BuildStructure
): T =
key in extracted.currentRef get structure.data getOrElse default
(key in extracted.currentRef).get(structure.data).getOrElse(default)
def injectSettings: Seq[Setting[_]] = Seq(
(state in Global) ::= dummyState,

View File

@ -456,6 +456,18 @@ object Keys {
val (executionRoots, dummyRoots) = Def.dummy[Seq[ScopedKey[_]]]("executionRoots", "The list of root tasks for this task execution. Roots are the top-level tasks that were directly requested to be run.")
val state = Def.stateKey
val streamsManager = Def.streamsManagerKey
// wrapper to work around SI-2915
final class TaskProgress(val progress: ExecuteProgress[Task])
object TaskProgress {
def apply(progress: ExecuteProgress[Task]): TaskProgress = new TaskProgress(progress)
}
val useSuperShell = settingKey[Boolean]("Enables (true) or disables the super shell.")
// This key can be used to add custom ExecuteProgress instances
val progressReports = settingKey[State => Seq[TaskProgress]]("A function that returns a list of progress reporters.").withRank(DTask)
private[sbt] val postProgressReports = settingKey[Unit]("Internally used to modify logger.").withRank(DTask)
@deprecated("No longer used", "1.3.0")
private[sbt] val executeProgress = settingKey[State => TaskProgress]("Experimental task execution listener.").withRank(DTask)
private[sbt] val globalFileTreeView = AttributeKey[FileTreeDataView[FileCacheEntry]](
"globalFileTreeView",
"Provides a view into the file system that may or may not cache the tree in memory",
@ -468,10 +480,6 @@ object Keys {
val globalPluginUpdate = taskKey[UpdateReport]("A hook to get the UpdateReport of the global plugin.").withRank(DTask)
val classLoaderCache = taskKey[internal.ClassLoaderCache]("The cache of ClassLoaders to be used for layering in tasks that invoke other java code").withRank(DTask)
private[sbt] val taskRepository = AttributeKey[TaskRepository.Repr]("task-repository", "A repository that can be used to cache arbitrary values for a given task key that can be read or filled during task evaluation.", 10000)
// wrapper to work around SI-2915
private[sbt] final class TaskProgress(val progress: ExecuteProgress[Task])
private[sbt] val executeProgress = settingKey[State => TaskProgress]("Experimental task execution listener.").withRank(DTask)
private[sbt] val taskCancelStrategy = settingKey[State => TaskCancellationStrategy]("Experimental task cancellation handler.").withRank(DTask)
// Experimental in sbt 0.13.2 to enable grabbing semantic compile failures.

View File

@ -138,20 +138,6 @@ object StandardMain {
// This is to workaround https://github.com/sbt/io/issues/110
sys.props.put("jna.nosys", "true")
ConsoleAppender.setTerminalWidth(JLine.usingTerminal(_.getWidth))
ConsoleAppender.setShowProgress(
ConsoleAppender.formatEnabledInEnv && sys.props
.get("sbt.progress")
.flatMap({ s =>
ConsoleAppender.parseLogOption(s) match {
case LogOption.Always => Some(true)
case LogOption.Never => Some(false)
case _ => None
}
})
.getOrElse(true)
)
import BasicCommandStrings.isEarlyCommand
val userCommands = configuration.arguments.map(_.trim)
val (earlyCommands, normalCommands) = (preCommands ++ userCommands).partition(isEarlyCommand)

View File

@ -8,7 +8,7 @@
package sbt
package internal
import sbt.internal.util.{ RMap, ConsoleOut }
import sbt.internal.util.{ RMap, ConsoleOut, ConsoleAppender, LogOption, JLine }
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ blocking, Future, ExecutionContext }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
@ -20,8 +20,6 @@ import TaskProgress._
* implements task progress display on the shell.
*/
private[sbt] final class TaskProgress(currentRef: ProjectRef) extends ExecuteProgress[Task] {
type S = Unit
private[this] val showScopedKey = Def.showRelativeKey2(currentRef)
// private[this] var start = 0L
private[this] val activeTasks = new ConcurrentHashMap[Task[_], Long]
@ -33,9 +31,11 @@ private[sbt] final class TaskProgress(currentRef: ProjectRef) extends ExecutePro
private[this] val isAllCompleted = new AtomicBoolean(false)
private[this] val isStopped = new AtomicBoolean(false)
override def initial: Unit = ()
override def registered(
state: Unit,
override def initial(): Unit = {
ConsoleAppender.setTerminalWidth(JLine.usingTerminal(_.getWidth))
}
override def afterRegistered(
task: Task[_],
allDeps: Iterable[Task[_]],
pendingDeps: Iterable[Task[_]]
@ -47,16 +47,16 @@ private[sbt] final class TaskProgress(currentRef: ProjectRef) extends ExecutePro
}
}
}
override def ready(state: Unit, task: Task[_]): Unit = {
override def afterReady(task: Task[_]): Unit = {
isReady.set(true)
}
override def workStarting(task: Task[_]): Unit = {
override def beforeWork(task: Task[_]): Unit = {
activeTasks.put(task, System.nanoTime)
()
}
override def workFinished[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit = {
override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit = {
val start = activeTasks.get(task)
timings.put(task, System.nanoTime - start)
activeTasks.remove(task)
@ -66,7 +66,7 @@ private[sbt] final class TaskProgress(currentRef: ProjectRef) extends ExecutePro
}
}
override def completed[A](state: Unit, task: Task[A], result: Result[A]): Unit = ()
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit = ()
override def stop(): Unit = {
isStopped.set(true)
@ -88,7 +88,7 @@ private[sbt] final class TaskProgress(currentRef: ProjectRef) extends ExecutePro
}
private[this] val console = ConsoleOut.systemOut
override def allCompleted(state: Unit, results: RMap[Task, Result]): Unit = {
override def afterAllCompleted(results: RMap[Task, Result]): Unit = {
isAllCompleted.set(true)
// completionReport()
}
@ -167,4 +167,17 @@ private[sbt] object TaskProgress {
def cursorUp(n: Int): String = s"\u001B[${n}A"
def cursorDown(n: Int): String = s"\u001B[${n}B"
final val CursorDown1 = cursorDown(1)
def isEnabled: Boolean =
ConsoleAppender.formatEnabledInEnv && sys.props
.get("sbt.supershell")
.flatMap(
str =>
ConsoleAppender.parseLogOption(str) match {
case LogOption.Always => Some(true)
case LogOption.Never => Some(false)
case _ => None
}
)
.getOrElse(true)
}

View File

@ -39,8 +39,6 @@ private[sbt] final class TaskTimings(reportOnShutdown: Boolean) extends ExecuteP
("ms", 6)
}
type S = Unit
if (reportOnShutdown) {
start = System.nanoTime
Runtime.getRuntime.addShutdownHook(new Thread {
@ -48,35 +46,34 @@ private[sbt] final class TaskTimings(reportOnShutdown: Boolean) extends ExecuteP
})
}
def initial = {
override def initial(): Unit = {
if (!reportOnShutdown)
start = System.nanoTime
}
def registered(
state: Unit,
override def afterRegistered(
task: Task[_],
allDeps: Iterable[Task[_]],
pendingDeps: Iterable[Task[_]]
) = {
): Unit = {
pendingDeps foreach { t =>
if (transformNode(t).isEmpty) anonOwners.put(t, task)
}
}
def ready(state: Unit, task: Task[_]) = ()
def workStarting(task: Task[_]) = { timings.put(task, System.nanoTime); () }
def workFinished[T](task: Task[T], result: Either[Task[T], Result[T]]) = {
override def afterReady(task: Task[_]): Unit = ()
override def beforeWork(task: Task[_]): Unit = { timings.put(task, System.nanoTime); () }
override def afterWork[T](task: Task[T], result: Either[Task[T], Result[T]]) = {
timings.put(task, System.nanoTime - timings.get(task))
result.left.foreach { t =>
calledBy.put(t, task)
}
}
def completed[T](state: Unit, task: Task[T], result: Result[T]) = ()
def allCompleted(state: Unit, results: RMap[Task, Result]) =
override def afterCompleted[T](task: Task[T], result: Result[T]): Unit = ()
override def afterAllCompleted(results: RMap[Task, Result]): Unit =
if (!reportOnShutdown) {
report()
}
def stop(): Unit = ()
override def stop(): Unit = ()
private val reFilePath = raw"\{[^}]+\}".r

View File

@ -69,14 +69,14 @@ private[sbt] final class Execute[F[_] <: AnyRef](
case None => results(a)
}
)
private[this] var progressState: progress.S = progress.initial
private[this] type State = State.Value
private[this] object State extends Enumeration {
val Pending, Running, Calling, Done = Value
}
import State.{ Pending, Running, Calling, Done }
val init = progress.initial()
def dump: String =
"State: " + state.toString + "\n\nResults: " + results + "\n\nCalls: " + callers + "\n\n"
@ -90,7 +90,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
processAll()
assert(results contains root, "No result for root node.")
val finalResults = triggers.onComplete(results)
progressState = progress.allCompleted(progressState, finalResults)
progress.afterAllCompleted(finalResults)
finalResults
}
@ -153,7 +153,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
results(node) = result
state(node) = Done
progressState = progress.completed(progressState, node, result)
progress.afterCompleted(node, result)
remove(reverse, node) foreach { dep =>
notifyDone(node, dep)
}
@ -210,8 +210,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
val v = register(node)
val deps = dependencies(v) ++ runBefore(node)
val active = IDSet[F[_]](deps filter notDone)
progressState = progress.registered(
progressState,
progress.afterRegistered(
node,
deps,
active.toList
@ -245,7 +244,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
}
state(node) = Running
progressState = progress.ready(progressState, node)
progress.afterReady(node)
submit(node)
post {
@ -274,13 +273,13 @@ private[sbt] final class Execute[F[_] <: AnyRef](
* This returns a Completed instance, which contains the post-processing to perform after the result is retrieved from the Strategy.
*/
def work[A](node: F[A], f: => Either[F[A], A])(implicit strategy: Strategy): Completed = {
progress.workStarting(node)
progress.beforeWork(node)
val rawResult = wideConvert(f).left.map {
case i: Incomplete => if (config.overwriteNode(i)) i.copy(node = Some(node)) else i
case e => Incomplete(Some(node), Incomplete.Error, directCause = Some(e))
}
val result = rewrap(rawResult)
progress.workFinished(node, result)
progress.afterWork(node, result)
completed {
result match {
case Right(v) => retire(node, v)

View File

@ -14,32 +14,29 @@ import sbt.internal.util.RMap
* All methods are called from the same thread except `started` and `finished`,
* which is called from the executing task's thread.
* All methods should return quickly to avoid task execution overhead.
*
* This class is experimental and subject to binary and source incompatible changes at any time.
*/
private[sbt] trait ExecuteProgress[F[_]] {
type S
def initial: S
trait ExecuteProgress[F[_]] {
def initial(): Unit
/**
* Notifies that a `task` has been registered in the system for execution.
* The dependencies of `task` are `allDeps` and the subset of those dependencies that
* have not completed are `pendingDeps`.
*/
def registered(state: S, task: F[_], allDeps: Iterable[F[_]], pendingDeps: Iterable[F[_]]): S
def afterRegistered(task: F[_], allDeps: Iterable[F[_]], pendingDeps: Iterable[F[_]]): Unit
/**
* Notifies that all of the dependencies of `task` have completed and `task` is therefore
* ready to run. The task has not been scheduled on a thread yet.
*/
def ready(state: S, task: F[_]): S
def afterReady(task: F[_]): Unit
/**
* Notifies that the work for `task` is starting after this call returns.
* This is called from the thread the task executes on, unlike most other methods in this callback.
* It is called immediately before the task's work starts with minimal intervening executor overhead.
*/
def workStarting(task: F[_]): Unit
def beforeWork(task: F[_]): Unit
/**
* Notifies that the work for `task` work has finished. The task may have computed the next task to
@ -49,34 +46,68 @@ private[sbt] trait ExecuteProgress[F[_]] {
* This is called from the thread the task executes on, unlike most other methods in this callback.
* It is immediately called after the task's work is complete with minimal intervening executor overhead.
*/
def workFinished[A](task: F[A], result: Either[F[A], Result[A]]): Unit
def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit
/**
* Notifies that `task` has completed.
* The task's work is done with a final `result`.
* Any tasks called by `task` have completed.
*/
def completed[A](state: S, task: F[A], result: Result[A]): S
def afterCompleted[A](task: F[A], result: Result[A]): Unit
/** All tasks have completed with the final `results` provided. */
def allCompleted(state: S, results: RMap[F, Result]): S
def afterAllCompleted(results: RMap[F, Result]): Unit
/** Notifies that either all tasks have finished or cancelled. */
def stop(): Unit
}
/** This module is experimental and subject to binary and source incompatible changes at any time. */
private[sbt] object ExecuteProgress {
object ExecuteProgress {
def empty[F[_]]: ExecuteProgress[F] = new ExecuteProgress[F] {
type S = Unit
def initial = ()
def registered(state: Unit, task: F[_], allDeps: Iterable[F[_]], pendingDeps: Iterable[F[_]]) =
override def initial(): Unit = ()
override def afterRegistered(
task: F[_],
allDeps: Iterable[F[_]],
pendingDeps: Iterable[F[_]]
): Unit =
()
def ready(state: Unit, task: F[_]) = ()
def workStarting(task: F[_]) = ()
def workFinished[A](task: F[A], result: Either[F[A], Result[A]]) = ()
def completed[A](state: Unit, task: F[A], result: Result[A]) = ()
def allCompleted(state: Unit, results: RMap[F, Result]) = ()
def stop(): Unit = ()
override def afterReady(task: F[_]): Unit = ()
override def beforeWork(task: F[_]): Unit = ()
override def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit = ()
override def afterCompleted[A](task: F[A], result: Result[A]): Unit = ()
override def afterAllCompleted(results: RMap[F, Result]): Unit = ()
override def stop(): Unit = ()
}
def aggregate[F[_]](reporters: Seq[ExecuteProgress[F]]) = new ExecuteProgress[F] {
override def initial(): Unit = {
reporters foreach { _.initial() }
}
override def afterRegistered(
task: F[_],
allDeps: Iterable[F[_]],
pendingDeps: Iterable[F[_]]
): Unit = {
reporters foreach { _.afterRegistered(task, allDeps, pendingDeps) }
}
override def afterReady(task: F[_]): Unit = {
reporters foreach { _.afterReady(task) }
}
override def beforeWork(task: F[_]): Unit = {
reporters foreach { _.beforeWork(task) }
}
override def afterWork[A](task: F[A], result: Either[F[A], Result[A]]): Unit = {
reporters foreach { _.afterWork(task, result) }
}
override def afterCompleted[A](task: F[A], result: Result[A]): Unit = {
reporters foreach { _.afterCompleted(task, result) }
}
override def afterAllCompleted(results: RMap[F, Result]): Unit = {
reporters foreach { _.afterAllCompleted(results) }
}
override def stop(): Unit = {
reporters foreach { _.stop() }
}
}
}