Merge pull request #5840 from eatkins/play-progress

Skip task progress work if already shutdown
This commit is contained in:
eugene yokota 2020-09-12 08:27:42 -04:00 committed by GitHub
commit 8160035f41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 33 deletions

View File

@ -152,7 +152,7 @@ object MainLoop {
state.get(Keys.superShellSleep.key).getOrElse(SysProp.supershellSleep.millis)
val superShellThreshold =
state.get(Keys.superShellThreshold.key).getOrElse(SysProp.supershellThreshold)
val taskProgress = new TaskProgress(superShellSleep, superShellThreshold)
val taskProgress = new TaskProgress(superShellSleep, superShellThreshold, state.log)
val gcMonitor = if (SysProp.gcMonitor) Some(new sbt.internal.GCMonitor(state.log)) else None
try {
ErrorHandling.wideConvert {

View File

@ -9,19 +9,23 @@ package sbt
package internal
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference }
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ RejectedExecutionException, TimeUnit }
import sbt.internal.util._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeoutException }
import sbt.util.Logger
/**
* implements task progress display on the shell.
*/
private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: FiniteDuration)
extends AbstractTaskExecuteProgress
private[sbt] class TaskProgress(
sleepDuration: FiniteDuration,
threshold: FiniteDuration,
logger: Logger
) extends AbstractTaskExecuteProgress
with ExecuteProgress[Task]
with AutoCloseable {
private[this] val lastTaskCount = new AtomicInteger(0)
@ -31,25 +35,36 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite
private[this] val scheduler =
Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler"))
private[this] val pending = new java.util.Vector[java.util.concurrent.Future[_]]
private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = {
val cancelled = new AtomicBoolean(false)
val runnable: Runnable = () => {
if (!cancelled.get) {
try Util.ignoreResult(f)
catch { case _: InterruptedException => }
private[this] val closed = new AtomicBoolean(false)
private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable =
if (!closed.get) {
val cancelled = new AtomicBoolean(false)
val runnable: Runnable = () => {
if (!cancelled.get) {
try Util.ignoreResult(f)
catch { case _: InterruptedException => }
}
}
val delay = duration.toMillis
try {
val future =
if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS)
else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
pending.add(future)
() => Util.ignoreResult(future.cancel(true))
} catch {
case e: RejectedExecutionException =>
logger.trace(e)
() => ()
}
} else {
logger.debug("tried to call schedule on closed TaskProgress")
() => ()
}
val delay = duration.toMillis
val future =
if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS)
else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
pending.add(future)
() => Util.ignoreResult(future.cancel(true))
}
private[this] val executor =
Executors.newSingleThreadExecutor(r => new Thread(r, "sbt-task-progress-report-thread"))
override def close(): Unit = {
Option(reportLoop.get).foreach(_.close())
override def close(): Unit = if (closed.compareAndSet(false, true)) {
Option(reportLoop.getAndSet(null)).foreach(_.close())
pending.forEach(f => Util.ignoreResult(f.cancel(true)))
pending.clear()
scheduler.shutdownNow()
@ -71,23 +86,30 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite
}
Util.ignoreResult(pending.add(executor.submit(runnable)))
}
override def beforeWork(task: Task[_]): Unit = {
super.beforeWork(task)
reportLoop.get match {
case null =>
val loop = schedule(sleepDuration, recurring = true)(doReport())
reportLoop.getAndSet(loop) match {
case null =>
case l =>
reportLoop.set(l)
loop.close()
}
case s =>
override def beforeWork(task: Task[_]): Unit =
if (!closed.get) {
super.beforeWork(task)
reportLoop.get match {
case null =>
val loop = schedule(sleepDuration, recurring = true)(doReport())
reportLoop.getAndSet(loop) match {
case null =>
case l =>
reportLoop.set(l)
loop.close()
}
case s =>
}
} else {
logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed")
}
}
override def afterReady(task: Task[_]): Unit =
Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport())))
if (!closed.get) {
Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport())))
} else {
logger.debug(s"called afterReady for ${taskName(task)} after task progress was closed")
}
override def stop(): Unit = {}
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =