Use only one progress thread during task evaluation

In some circumstances, sbt would generate a number of task progress
threads that could run concurrently. The issue was that the TaskProgress
could be shared by multiple EvaluateTaskConfigs if a dynamic task was
used. This was problematic because when a dynamic task completed, it
might call afterAllCompleted which would stop the progress thread. There
also was a race condition because multiple threads calling initial could
theoretically have created a new progress thread which would cause a
resource leak.

To fix this, we modify the shared task progress so that the `stop()`
method is a no-op. This should prevent dynamic tasks from stopping the
progress thread. We also defer the creation of the task thread until
there is at least one active task. This prevents a thread from being
created in the shell.

The motivation for this change was that I found that sometimes there was
a leaked progress thread that would make the shell not really work for
me because the progress thread would overwrite the shell prompt. This
change fixes that behavior and I was able to validate with jstack that
there was consistently either one or zero task progress threads at a
time (zero in the shell, one when tasks were actually running).
This commit is contained in:
Ethan Atkins 2019-10-05 14:35:09 -07:00
parent 367461e586
commit 6559c3a06d
3 changed files with 66 additions and 37 deletions

View File

@ -224,36 +224,59 @@ object EvaluateTask {
structure: BuildStructure,
state: State
): ExecuteProgress[Task] = {
state.get(currentTaskProgress).map(_.progress).getOrElse {
val maker: Seq[Keys.TaskProgress] = getSetting(
Keys.progressReports,
Seq(),
extracted,
structure
)
val progressReporter = extracted.getOpt(progressState in ThisBuild).flatMap {
case Some(ps) =>
ps.reset()
ConsoleAppender.setShowProgress(true)
val appender = MainAppender.defaultScreen(StandardMain.console)
appender match {
case c: ConsoleAppender => c.setProgressState(ps)
case _ =>
}
val log = LogManager.progressLogger(appender)
Some(new TaskProgress(log))
case _ => None
state
.get(currentTaskProgress)
.map { tp =>
new ExecuteProgress[Task] {
val progress = tp.progress
override def initial(): Unit = progress.initial()
override def afterRegistered(
task: Task[_],
allDeps: Iterable[Task[_]],
pendingDeps: Iterable[Task[_]]
): Unit =
progress.afterRegistered(task, allDeps, pendingDeps)
override def afterReady(task: Task[_]): Unit = progress.afterReady(task)
override def beforeWork(task: Task[_]): Unit = progress.beforeWork(task)
override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit =
progress.afterWork(task, result)
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =
progress.afterCompleted(task, result)
override def afterAllCompleted(results: RMap[Task, Result]): Unit =
progress.afterAllCompleted(results)
override def stop(): Unit = {}
}
}
val reporters = maker.map(_.progress) ++ progressReporter ++
(if (SysProp.taskTimings)
new TaskTimings(reportOnShutdown = false, state.globalLogging.full) :: Nil
else Nil)
reporters match {
case xs if xs.isEmpty => ExecuteProgress.empty[Task]
case xs if xs.size == 1 => xs.head
case xs => ExecuteProgress.aggregate[Task](xs)
.getOrElse {
val maker: Seq[Keys.TaskProgress] = getSetting(
Keys.progressReports,
Seq(),
extracted,
structure
)
val progressReporter = extracted.getOpt(progressState in ThisBuild).flatMap {
case Some(ps) =>
ps.reset()
ConsoleAppender.setShowProgress(true)
val appender = MainAppender.defaultScreen(StandardMain.console)
appender match {
case c: ConsoleAppender => c.setProgressState(ps)
case _ =>
}
val log = LogManager.progressLogger(appender)
Some(new TaskProgress(log))
case _ => None
}
val reporters = maker.map(_.progress) ++ progressReporter ++
(if (SysProp.taskTimings)
new TaskTimings(reportOnShutdown = false, state.globalLogging.full) :: Nil
else Nil)
reporters match {
case xs if xs.isEmpty => ExecuteProgress.empty[Task]
case xs if xs.size == 1 => xs.head
case xs => ExecuteProgress.aggregate[Task](xs)
}
}
}
}
// TODO - Should this pull from Global or from the project itself?
private[sbt] def forcegc(extracted: Extracted, structure: BuildStructure): Boolean =

View File

@ -48,14 +48,7 @@ private[sbt] final class TaskProgress(log: ManagedLogger)
}
}
override def initial(): Unit = {
currentProgressThread.get() match {
case None =>
currentProgressThread.set(Some(new ProgressThread))
case _ =>
}
ConsoleAppender.setTerminalWidth(JLine.terminal.getWidth)
}
override def initial(): Unit = ConsoleAppender.setTerminalWidth(JLine.terminal.getWidth)
override def beforeWork(task: Task[_]): Unit = {
super.beforeWork(task)
@ -72,15 +65,27 @@ private[sbt] final class TaskProgress(log: ManagedLogger)
val event = ProgressEvent("Info", Vector(), Some(lastTaskCount.get), None, None)
import sbt.internal.util.codec.JsonProtocol._
log.logEvent(Level.Info, event)
stop()
}
private[this] val skipReportTasks =
Set("run", "bgRun", "fgRun", "scala", "console", "consoleProject", "consoleQuick", "state")
private[this] def maybeStartThread(): Unit = {
currentProgressThread.get() match {
case None =>
currentProgressThread.synchronized {
currentProgressThread.get() match {
case None => currentProgressThread.set(Some(new ProgressThread))
case _ =>
}
}
case _ =>
}
}
private[this] def report(): Unit = {
val currentTasks = activeTasks.toVector.filterNot(Def.isDummy)
val ltc = lastTaskCount.get
val currentTasksCount = currentTasks.size
def report0(tasks: Vector[Task[_]]): Unit = {
if (tasks.nonEmpty) maybeStartThread()
val event = ProgressEvent(
"Info",
tasks

View File

@ -95,6 +95,7 @@ private[sbt] final class Execute[F[_] <: AnyRef](
assert(results contains root, "No result for root node.")
val finalResults = triggers.onComplete(results)
progress.afterAllCompleted(finalResults)
progress.stop()
finalResults
}