Ensure watch threads are joined

While dogfooding the latest sbt, I noticed that sometimes the watch
input threads leak. I suspect this happens when a build is immediately
triggered by a file that was modified during compilation. Though I
didn't fully verify this, it's likely that we interrupted the input
reading thread before it actually started reading. When it started
reading after the interrupt, it would block until the user entered
another input character. The result was that the zombie thread would
effectively steal the next character from the input stream which
manifested as the first character being ignored when the user tried to
enter a watch input option. If more than one thread leaked, then it may
take a number of keystrokes before the user regained control.

To fix this, we can ensure that all watch related threads are joined
before we exit watch. To avoid completely blocking the ui, we only try
to join the threads for a second and print an error if the join fails.
This shouldn't be the case so if users see this error, then we need to
fix the bug.
This commit is contained in:
Ethan Atkins 2020-08-13 17:25:36 -07:00
parent ef34a33ac9
commit 59c3d73f83
1 changed files with 32 additions and 13 deletions

View File

@ -648,30 +648,49 @@ private[sbt] object Continuous extends DeprecatedContinuous {
private[this] class WatchExecutor(name: String) extends AutoCloseable {
val id = new AtomicInteger(0)
val threads = new java.util.Vector[Thread]
val closed = new AtomicBoolean(false)
def submit[R](tag: String, f: => R): WatchExecutor.Future[R] = {
val queue = new LinkedBlockingQueue[Either[Unit, R]]
val runnable: Runnable = () => {
try Util.ignoreResult(queue.offer(Right(f)))
catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor"))
else {
val queue = new LinkedBlockingQueue[Either[Unit, R]]
val runnable: Runnable = () => {
try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f)))
catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
}
val thread = new Thread(runnable, s"sbt-watch-$name-$tag")
thread.setDaemon(true)
thread.start()
threads.add(thread)
new WatchExecutor.FutureImpl(thread, queue, this)
}
val thread = new Thread(runnable, s"sbt-watch-$name-$tag")
thread.setDaemon(true)
thread.start()
threads.add(thread)
new WatchExecutor.Future(thread, queue, this)
}
def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread))
def close(): Unit = {
threads.forEach(_.interrupt)
def close(): Unit = if (closed.compareAndSet(false, true)) {
threads.forEach { t =>
val deadline = 1.second.fromNow
while (t.isAlive && !deadline.isOverdue) {
t.interrupt()
t.join(10)
}
if (t.isAlive) System.err.println(s"Couldn't join watch thread $t")
}
threads.clear()
}
}
private object WatchExecutor {
final class Future[R](
sealed trait Future[R] extends Any {
def cancel(): Unit
def result: Try[R]
}
final class FutureFailed[R](t: Throwable) extends Future[R] {
def cancel(): Unit = {}
def result: Try[R] = Failure(t)
}
final class FutureImpl[R](
thread: Thread,
queue: LinkedBlockingQueue[Either[Unit, R]],
executor: WatchExecutor
) {
) extends Future[R] {
def cancel(): Unit = {
thread.interrupt()
executor.removeThread(thread)