Fix TaskProgress

Supershell was not reliably working and I tracked it down to
TaskProgress not actually publishing updates during task execution. This
seemed to happen because the background task was only run once when the
task started up. Once that task exited, no further task reports would be
published. The fix is to start a new thread every time we enter
EvaluateTask. I verified manually that it did not seem to leak threads
because EvaluateTask always calls shutdown, which calls
afterAllCompleted, which stops the progress thread.

I also decreased the default report period to 100ms. I can't imagine
that this will have a big effect on performance. It can be tuned with
the sbt.supershell.sleep parameter.
This commit is contained in:
Ethan Atkins 2019-05-14 16:06:49 -07:00
parent 91a0b8ebfe
commit 564aa7262b
2 changed files with 42 additions and 36 deletions

View File

@ -467,6 +467,7 @@ object EvaluateTask {
// Register with our cancel handler we're about to start.
val strat = config.cancelStrategy
val cancelState = strat.onTaskEngineStart(runningEngine)
config.progressReporter.initial()
try {
(state.get(stateBuildStructure), state.get(sessionSettings)) match {
case (Some(structure), Some(settings)) =>

View File

@ -8,18 +8,13 @@
package sbt
package internal
import sbt.internal.util.{
RMap,
ConsoleAppender,
LogOption,
JLine,
ManagedLogger,
ProgressEvent,
ProgressItem
}
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference }
import sbt.internal.util._
import sbt.util.Level
import scala.concurrent.{ blocking, Future, ExecutionContext }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import scala.annotation.tailrec
import scala.util.control.NonFatal
/**
* implements task progress display on the shell.
@ -27,46 +22,56 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
private[sbt] final class TaskProgress(log: ManagedLogger)
extends AbstractTaskExecuteProgress
with ExecuteProgress[Task] {
private[this] val isReady = new AtomicBoolean(false)
private[this] val lastTaskCount = new AtomicInteger(0)
private[this] val isAllCompleted = new AtomicBoolean(false)
private[this] val isStopped = new AtomicBoolean(false)
private[this] val currentProgressThread = new AtomicReference[Option[ProgressThread]](None)
private[this] val sleepDuration =
try System.getProperty("sbt.supershell.sleep", "100").toLong
catch { case NonFatal(_) => 100L }
private[this] final class ProgressThread
extends Thread("task-progress-report-thread")
with AutoCloseable {
private[this] val isClosed = new AtomicBoolean(false)
setDaemon(true)
start()
@tailrec override def run(): Unit = {
if (!isClosed.get()) {
try {
report()
Thread.sleep(sleepDuration)
} catch {
case _: InterruptedException =>
}
run()
}
}
override def close(): Unit = {
isClosed.set(true)
interrupt()
}
}
override def initial(): Unit = {
currentProgressThread.get() match {
case None =>
currentProgressThread.set(Some(new ProgressThread))
case _ =>
}
ConsoleAppender.setTerminalWidth(JLine.terminal.getWidth)
}
override def afterReady(task: Task[_]): Unit = {
isReady.set(true)
}
override def afterReady(task: Task[_]): Unit = ()
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit = ()
override def stop(): Unit = {
isStopped.set(true)
}
import ExecutionContext.Implicits._
Future {
while (!isReady.get && !isStopped.get) {
blocking {
Thread.sleep(500)
}
}
while (!isAllCompleted.get && !isStopped.get) {
blocking {
report()
Thread.sleep(500)
}
}
}
override def stop(): Unit = currentProgressThread.getAndSet(None).foreach(_.close())
override def afterAllCompleted(results: RMap[Task, Result]): Unit = {
// send an empty progress report to clear out the previous report
val event = ProgressEvent("Info", Vector(), Some(lastTaskCount.get), None, None)
import sbt.internal.util.codec.JsonProtocol._
log.logEvent(Level.Info, event)
isAllCompleted.set(true)
stop()
}
private[this] val skipReportTasks =
Set("run", "bgRun", "fgRun", "scala", "console", "consoleProject")