From 4a2bf67eb81abc717b65c83183c777a493e35871 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Fri, 11 Sep 2020 10:52:22 -0700 Subject: [PATCH 1/2] Skip task progress work if already shutdown The play plugin seems to do out of band task evaluation on a stale State object in the `run` task. As a result, when sbt tries to schedule tasks to run, they tried to register the work with a closed TaskProgress instance. There was no guard against this and it ended up causing a RejectedExecutionException. --- main/src/main/scala/sbt/MainLoop.scala | 2 +- .../scala/sbt/internal/TaskProgress.scala | 78 +++++++++++-------- 2 files changed, 48 insertions(+), 32 deletions(-) diff --git a/main/src/main/scala/sbt/MainLoop.scala b/main/src/main/scala/sbt/MainLoop.scala index 1adf3e0d2..1fa2f686d 100644 --- a/main/src/main/scala/sbt/MainLoop.scala +++ b/main/src/main/scala/sbt/MainLoop.scala @@ -152,7 +152,7 @@ object MainLoop { state.get(Keys.superShellSleep.key).getOrElse(SysProp.supershellSleep.millis) val superShellThreshold = state.get(Keys.superShellThreshold.key).getOrElse(SysProp.supershellThreshold) - val taskProgress = new TaskProgress(superShellSleep, superShellThreshold) + val taskProgress = new TaskProgress(superShellSleep, superShellThreshold, state.log) val gcMonitor = if (SysProp.gcMonitor) Some(new sbt.internal.GCMonitor(state.log)) else None try { ErrorHandling.wideConvert { diff --git a/main/src/main/scala/sbt/internal/TaskProgress.scala b/main/src/main/scala/sbt/internal/TaskProgress.scala index 90ee91eba..f05e15ce4 100644 --- a/main/src/main/scala/sbt/internal/TaskProgress.scala +++ b/main/src/main/scala/sbt/internal/TaskProgress.scala @@ -16,12 +16,16 @@ import sbt.internal.util._ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeoutException } +import sbt.util.Logger /** * implements task progress display on the shell. */ -private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: FiniteDuration) - extends AbstractTaskExecuteProgress +private[sbt] class TaskProgress( + sleepDuration: FiniteDuration, + threshold: FiniteDuration, + logger: Logger +) extends AbstractTaskExecuteProgress with ExecuteProgress[Task] with AutoCloseable { private[this] val lastTaskCount = new AtomicInteger(0) @@ -31,25 +35,30 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite private[this] val scheduler = Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler")) private[this] val pending = new java.util.Vector[java.util.concurrent.Future[_]] - private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = { - val cancelled = new AtomicBoolean(false) - val runnable: Runnable = () => { - if (!cancelled.get) { - try Util.ignoreResult(f) - catch { case _: InterruptedException => } + private[this] val closed = new AtomicBoolean(false) + private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = + if (!closed.get) { + val cancelled = new AtomicBoolean(false) + val runnable: Runnable = () => { + if (!cancelled.get) { + try Util.ignoreResult(f) + catch { case _: InterruptedException => } + } } + val delay = duration.toMillis + val future = + if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS) + else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS) + pending.add(future) + () => Util.ignoreResult(future.cancel(true)) + } else { + logger.debug("tried to call schedule on closed TaskProgress") + () => () } - val delay = duration.toMillis - val future = - if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS) - else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS) - pending.add(future) - () => Util.ignoreResult(future.cancel(true)) - } private[this] val executor = Executors.newSingleThreadExecutor(r => new Thread(r, "sbt-task-progress-report-thread")) - override def close(): Unit = { - Option(reportLoop.get).foreach(_.close()) + override def close(): Unit = if (closed.compareAndSet(false, true)) { + Option(reportLoop.getAndSet(null)).foreach(_.close()) pending.forEach(f => Util.ignoreResult(f.cancel(true))) pending.clear() scheduler.shutdownNow() @@ -71,23 +80,30 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite } Util.ignoreResult(pending.add(executor.submit(runnable))) } - override def beforeWork(task: Task[_]): Unit = { - super.beforeWork(task) - reportLoop.get match { - case null => - val loop = schedule(sleepDuration, recurring = true)(doReport()) - reportLoop.getAndSet(loop) match { - case null => - case l => - reportLoop.set(l) - loop.close() - } - case s => + override def beforeWork(task: Task[_]): Unit = + if (!closed.get) { + super.beforeWork(task) + reportLoop.get match { + case null => + val loop = schedule(sleepDuration, recurring = true)(doReport()) + reportLoop.getAndSet(loop) match { + case null => + case l => + reportLoop.set(l) + loop.close() + } + case s => + } + } else { + logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed") } - } override def afterReady(task: Task[_]): Unit = - Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport()))) + if (!closed.get) { + Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport()))) + } else { + logger.debug(s"called afterReady for ${taskName(task)} after task progress was closed") + } override def stop(): Unit = {} override def afterCompleted[A](task: Task[A], result: Result[A]): Unit = From 5e88d4b2336c7bf1d0aca34c1d15dc94a224d098 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Fri, 11 Sep 2020 11:39:52 -0700 Subject: [PATCH 2/2] Warn on progress RejectedExecutionException It shouldn't be the case that a RejectedExecutionException is thrown by TaskProgress. If that assumption is violated, log the exception but don't crash sbt. --- .../main/scala/sbt/internal/TaskProgress.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/main/src/main/scala/sbt/internal/TaskProgress.scala b/main/src/main/scala/sbt/internal/TaskProgress.scala index f05e15ce4..cc741bb5a 100644 --- a/main/src/main/scala/sbt/internal/TaskProgress.scala +++ b/main/src/main/scala/sbt/internal/TaskProgress.scala @@ -9,7 +9,7 @@ package sbt package internal import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference } -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ RejectedExecutionException, TimeUnit } import sbt.internal.util._ @@ -46,11 +46,17 @@ private[sbt] class TaskProgress( } } val delay = duration.toMillis - val future = - if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS) - else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS) - pending.add(future) - () => Util.ignoreResult(future.cancel(true)) + try { + val future = + if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS) + else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS) + pending.add(future) + () => Util.ignoreResult(future.cancel(true)) + } catch { + case e: RejectedExecutionException => + logger.trace(e) + () => () + } } else { logger.debug("tried to call schedule on closed TaskProgress") () => ()