diff --git a/main/src/main/scala/sbt/MainLoop.scala b/main/src/main/scala/sbt/MainLoop.scala index 1adf3e0d2..1fa2f686d 100644 --- a/main/src/main/scala/sbt/MainLoop.scala +++ b/main/src/main/scala/sbt/MainLoop.scala @@ -152,7 +152,7 @@ object MainLoop { state.get(Keys.superShellSleep.key).getOrElse(SysProp.supershellSleep.millis) val superShellThreshold = state.get(Keys.superShellThreshold.key).getOrElse(SysProp.supershellThreshold) - val taskProgress = new TaskProgress(superShellSleep, superShellThreshold) + val taskProgress = new TaskProgress(superShellSleep, superShellThreshold, state.log) val gcMonitor = if (SysProp.gcMonitor) Some(new sbt.internal.GCMonitor(state.log)) else None try { ErrorHandling.wideConvert { diff --git a/main/src/main/scala/sbt/internal/TaskProgress.scala b/main/src/main/scala/sbt/internal/TaskProgress.scala index 90ee91eba..cc741bb5a 100644 --- a/main/src/main/scala/sbt/internal/TaskProgress.scala +++ b/main/src/main/scala/sbt/internal/TaskProgress.scala @@ -9,19 +9,23 @@ package sbt package internal import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference } -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ RejectedExecutionException, TimeUnit } import sbt.internal.util._ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeoutException } +import sbt.util.Logger /** * implements task progress display on the shell. */ -private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: FiniteDuration) - extends AbstractTaskExecuteProgress +private[sbt] class TaskProgress( + sleepDuration: FiniteDuration, + threshold: FiniteDuration, + logger: Logger +) extends AbstractTaskExecuteProgress with ExecuteProgress[Task] with AutoCloseable { private[this] val lastTaskCount = new AtomicInteger(0) @@ -31,25 +35,36 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite private[this] val scheduler = Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler")) private[this] val pending = new java.util.Vector[java.util.concurrent.Future[_]] - private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = { - val cancelled = new AtomicBoolean(false) - val runnable: Runnable = () => { - if (!cancelled.get) { - try Util.ignoreResult(f) - catch { case _: InterruptedException => } + private[this] val closed = new AtomicBoolean(false) + private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = + if (!closed.get) { + val cancelled = new AtomicBoolean(false) + val runnable: Runnable = () => { + if (!cancelled.get) { + try Util.ignoreResult(f) + catch { case _: InterruptedException => } + } } + val delay = duration.toMillis + try { + val future = + if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS) + else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS) + pending.add(future) + () => Util.ignoreResult(future.cancel(true)) + } catch { + case e: RejectedExecutionException => + logger.trace(e) + () => () + } + } else { + logger.debug("tried to call schedule on closed TaskProgress") + () => () } - val delay = duration.toMillis - val future = - if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS) - else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS) - pending.add(future) - () => Util.ignoreResult(future.cancel(true)) - } private[this] val executor = Executors.newSingleThreadExecutor(r => new Thread(r, "sbt-task-progress-report-thread")) - override def close(): Unit = { - Option(reportLoop.get).foreach(_.close()) + override def close(): Unit = if (closed.compareAndSet(false, true)) { + Option(reportLoop.getAndSet(null)).foreach(_.close()) pending.forEach(f => Util.ignoreResult(f.cancel(true))) pending.clear() scheduler.shutdownNow() @@ -71,23 +86,30 @@ private[sbt] class TaskProgress(sleepDuration: FiniteDuration, threshold: Finite } Util.ignoreResult(pending.add(executor.submit(runnable))) } - override def beforeWork(task: Task[_]): Unit = { - super.beforeWork(task) - reportLoop.get match { - case null => - val loop = schedule(sleepDuration, recurring = true)(doReport()) - reportLoop.getAndSet(loop) match { - case null => - case l => - reportLoop.set(l) - loop.close() - } - case s => + override def beforeWork(task: Task[_]): Unit = + if (!closed.get) { + super.beforeWork(task) + reportLoop.get match { + case null => + val loop = schedule(sleepDuration, recurring = true)(doReport()) + reportLoop.getAndSet(loop) match { + case null => + case l => + reportLoop.set(l) + loop.close() + } + case s => + } + } else { + logger.debug(s"called beforeWork for ${taskName(task)} after task progress was closed") } - } override def afterReady(task: Task[_]): Unit = - Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport()))) + if (!closed.get) { + Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport()))) + } else { + logger.debug(s"called afterReady for ${taskName(task)} after task progress was closed") + } override def stop(): Unit = {} override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =