diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 351490f80..be0c1cea1 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -648,30 +648,49 @@ private[sbt] object Continuous extends DeprecatedContinuous { private[this] class WatchExecutor(name: String) extends AutoCloseable { val id = new AtomicInteger(0) val threads = new java.util.Vector[Thread] + val closed = new AtomicBoolean(false) def submit[R](tag: String, f: => R): WatchExecutor.Future[R] = { - val queue = new LinkedBlockingQueue[Either[Unit, R]] - val runnable: Runnable = () => { - try Util.ignoreResult(queue.offer(Right(f))) - catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor")) + else { + val queue = new LinkedBlockingQueue[Either[Unit, R]] + val runnable: Runnable = () => { + try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) + catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + } + val thread = new Thread(runnable, s"sbt-watch-$name-$tag") + thread.setDaemon(true) + thread.start() + threads.add(thread) + new WatchExecutor.FutureImpl(thread, queue, this) } - val thread = new Thread(runnable, s"sbt-watch-$name-$tag") - thread.setDaemon(true) - thread.start() - threads.add(thread) - new WatchExecutor.Future(thread, queue, this) } def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread)) - def close(): Unit = { - threads.forEach(_.interrupt) + def close(): Unit = if (closed.compareAndSet(false, true)) { + threads.forEach { t => + val deadline = 1.second.fromNow + while (t.isAlive && !deadline.isOverdue) { + t.interrupt() + t.join(10) + } + if (t.isAlive) System.err.println(s"Couldn't join watch thread $t") + } threads.clear() } } private object WatchExecutor { - final class Future[R]( + sealed trait Future[R] extends Any { + def cancel(): Unit + def result: Try[R] + } + final class FutureFailed[R](t: Throwable) extends Future[R] { + def cancel(): Unit = {} + def result: Try[R] = Failure(t) + } + final class FutureImpl[R]( thread: Thread, queue: LinkedBlockingQueue[Either[Unit, R]], executor: WatchExecutor - ) { + ) extends Future[R] { def cancel(): Unit = { thread.interrupt() executor.removeThread(thread)