Merge pull request #5747 from eatkins/watch-threads

Ensure watch threads are joined
This commit is contained in:
eugene yokota 2020-08-14 05:09:37 -04:00 committed by GitHub
commit ca660f25da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 13 deletions

View File

@ -648,30 +648,49 @@ private[sbt] object Continuous extends DeprecatedContinuous {
private[this] class WatchExecutor(name: String) extends AutoCloseable {
val id = new AtomicInteger(0)
val threads = new java.util.Vector[Thread]
val closed = new AtomicBoolean(false)
def submit[R](tag: String, f: => R): WatchExecutor.Future[R] = {
val queue = new LinkedBlockingQueue[Either[Unit, R]]
val runnable: Runnable = () => {
try Util.ignoreResult(queue.offer(Right(f)))
catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor"))
else {
val queue = new LinkedBlockingQueue[Either[Unit, R]]
val runnable: Runnable = () => {
try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f)))
catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
}
val thread = new Thread(runnable, s"sbt-watch-$name-$tag")
thread.setDaemon(true)
thread.start()
threads.add(thread)
new WatchExecutor.FutureImpl(thread, queue, this)
}
val thread = new Thread(runnable, s"sbt-watch-$name-$tag")
thread.setDaemon(true)
thread.start()
threads.add(thread)
new WatchExecutor.Future(thread, queue, this)
}
def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread))
def close(): Unit = {
threads.forEach(_.interrupt)
def close(): Unit = if (closed.compareAndSet(false, true)) {
threads.forEach { t =>
val deadline = 1.second.fromNow
while (t.isAlive && !deadline.isOverdue) {
t.interrupt()
t.join(10)
}
if (t.isAlive) System.err.println(s"Couldn't join watch thread $t")
}
threads.clear()
}
}
private object WatchExecutor {
final class Future[R](
sealed trait Future[R] extends Any {
def cancel(): Unit
def result: Try[R]
}
final class FutureFailed[R](t: Throwable) extends Future[R] {
def cancel(): Unit = {}
def result: Try[R] = Failure(t)
}
final class FutureImpl[R](
thread: Thread,
queue: LinkedBlockingQueue[Either[Unit, R]],
executor: WatchExecutor
) {
) extends Future[R] {
def cancel(): Unit = {
thread.interrupt()
executor.removeThread(thread)