Skip task progress work if already shutdown

The play plugin seems to do out of band task evaluation on a stale State
object in the `run` task. As a result, when sbt tries to schedule tasks
to run, they tried to register the work with a closed TaskProgress
instance. There was no guard against this and it ended up causing a
RejectedExecutionException.
This commit is contained in:
Ethan Atkins 2020-09-11 10:52:22 -07:00
parent 96b4f7b8e6
commit 4a2bf67eb8
2 changed files with 48 additions and 32 deletions

View File

@ -152,7 +152,7 @@ object MainLoop {
state.get(Keys.superShellSleep.key).getOrElse(SysProp.supershellSleep.millis)
val superShellThreshold =
state.get(Keys.superShellThreshold.key).getOrElse(SysProp.supershellThreshold)
val taskProgress = new TaskProgress(superShellSleep, superShellThreshold)
val taskProgress = new TaskProgress(superShellSleep, superShellThreshold, state.log)
val gcMonitor = if (SysProp.gcMonitor) Some(new sbt.internal.GCMonitor(state.log)) else None
try {
ErrorHandling.wideConvert {

View File

@ -16,12 +16,16 @@ import sbt.internal.util._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeoutException }
import sbt.util.Logger
/**
* implements task progress display on the shell.
*/
private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: FiniteDuration)
extends AbstractTaskExecuteProgress
private[sbt] class TaskProgress(
sleepDuration: FiniteDuration,
threshold: FiniteDuration,
logger: Logger
) extends AbstractTaskExecuteProgress
with ExecuteProgress[Task]
with AutoCloseable {
private[this] val lastTaskCount = new AtomicInteger(0)
@ -31,25 +35,30 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite
private[this] val scheduler =
Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler"))
private[this] val pending = new java.util.Vector[java.util.concurrent.Future[_]]
private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = {
val cancelled = new AtomicBoolean(false)
val runnable: Runnable = () => {
if (!cancelled.get) {
try Util.ignoreResult(f)
catch { case _: InterruptedException => }
private[this] val closed = new AtomicBoolean(false)
private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable =
if (!closed.get) {
val cancelled = new AtomicBoolean(false)
val runnable: Runnable = () => {
if (!cancelled.get) {
try Util.ignoreResult(f)
catch { case _: InterruptedException => }
}
}
val delay = duration.toMillis
val future =
if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS)
else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
pending.add(future)
() => Util.ignoreResult(future.cancel(true))
} else {
logger.debug("tried to call schedule on closed TaskProgress")
() => ()
}
val delay = duration.toMillis
val future =
if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS)
else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
pending.add(future)
() => Util.ignoreResult(future.cancel(true))
}
private[this] val executor =
Executors.newSingleThreadExecutor(r => new Thread(r, "sbt-task-progress-report-thread"))
override def close(): Unit = {
Option(reportLoop.get).foreach(_.close())
override def close(): Unit = if (closed.compareAndSet(false, true)) {
Option(reportLoop.getAndSet(null)).foreach(_.close())
pending.forEach(f => Util.ignoreResult(f.cancel(true)))
pending.clear()
scheduler.shutdownNow()
@ -71,23 +80,30 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite
}
Util.ignoreResult(pending.add(executor.submit(runnable)))
}
override def beforeWork(task: Task[_]): Unit = {
super.beforeWork(task)
reportLoop.get match {
case null =>
val loop = schedule(sleepDuration, recurring = true)(doReport())
reportLoop.getAndSet(loop) match {
case null =>
case l =>
reportLoop.set(l)
loop.close()
}
case s =>
override def beforeWork(task: Task[_]): Unit =
if (!closed.get) {
super.beforeWork(task)
reportLoop.get match {
case null =>
val loop = schedule(sleepDuration, recurring = true)(doReport())
reportLoop.getAndSet(loop) match {
case null =>
case l =>
reportLoop.set(l)
loop.close()
}
case s =>
}
} else {
logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed")
}
}
override def afterReady(task: Task[_]): Unit =
Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport())))
if (!closed.get) {
Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport())))
} else {
logger.debug(s"called afterReady for ${taskName(task)} after task progress was closed")
}
override def stop(): Unit = {}
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =