From 59c3d73f837ae14660379f243a558675921a3a35 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Thu, 13 Aug 2020 17:25:36 -0700 Subject: [PATCH] Ensure watch threads are joined While dogfooding the latest sbt, I noticed that sometimes the watch input threads leak. I suspect this happens when a build is immediately triggered by a file that was modified during compilation. Though I didn't fully verify this, it's likely that we interrupted the input reading thread before it actually started reading. When it started reading after the interrupt, it would block until the user entered another input character. The result was that the zombie thread would effectively steal the next character from the input stream which manifested as the first character being ignored when the user tried to enter a watch input option. If more than one thread leaked, then it may take a number of keystrokes before the user regained control. To fix this, we can ensure that all watch related threads are joined before we exit watch. To avoid completely blocking the ui, we only try to join the threads for a second and print an error if the join fails. This shouldn't be the case so if users see this error, then we need to fix the bug. --- .../main/scala/sbt/internal/Continuous.scala | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 351490f80..be0c1cea1 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -648,30 +648,49 @@ private[sbt] object Continuous extends DeprecatedContinuous { private[this] class WatchExecutor(name: String) extends AutoCloseable { val id = new AtomicInteger(0) val threads = new java.util.Vector[Thread] + val closed = new AtomicBoolean(false) def submit[R](tag: String, f: => R): WatchExecutor.Future[R] = { - val queue = new LinkedBlockingQueue[Either[Unit, R]] - val runnable: Runnable = () => { - try Util.ignoreResult(queue.offer(Right(f))) - catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor")) + else { + val queue = new LinkedBlockingQueue[Either[Unit, R]] + val runnable: Runnable = () => { + try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) + catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + } + val thread = new Thread(runnable, s"sbt-watch-$name-$tag") + thread.setDaemon(true) + thread.start() + threads.add(thread) + new WatchExecutor.FutureImpl(thread, queue, this) } - val thread = new Thread(runnable, s"sbt-watch-$name-$tag") - thread.setDaemon(true) - thread.start() - threads.add(thread) - new WatchExecutor.Future(thread, queue, this) } def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread)) - def close(): Unit = { - threads.forEach(_.interrupt) + def close(): Unit = if (closed.compareAndSet(false, true)) { + threads.forEach { t => + val deadline = 1.second.fromNow + while (t.isAlive && !deadline.isOverdue) { + t.interrupt() + t.join(10) + } + if (t.isAlive) System.err.println(s"Couldn't join watch thread $t") + } threads.clear() } } private object WatchExecutor { - final class Future[R]( + sealed trait Future[R] extends Any { + def cancel(): Unit + def result: Try[R] + } + final class FutureFailed[R](t: Throwable) extends Future[R] { + def cancel(): Unit = {} + def result: Try[R] = Failure(t) + } + final class FutureImpl[R]( thread: Thread, queue: LinkedBlockingQueue[Either[Unit, R]], executor: WatchExecutor - ) { + ) extends Future[R] { def cancel(): Unit = { thread.interrupt() executor.removeThread(thread)