diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index c97cbfe16..ed79d5b10 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -10,7 +10,13 @@ package internal import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ } import java.nio.file.Path -import java.util.concurrent.{ ConcurrentHashMap, LinkedBlockingQueue } +import java.util.concurrent.{ + ConcurrentHashMap, + CountDownLatch, + LinkedBlockingQueue, + TimeUnit, + TimeoutException +} import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import sbt.BasicCommandStrings._ @@ -654,15 +660,18 @@ private[sbt] object Continuous extends DeprecatedContinuous { if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor")) else { val queue = new LinkedBlockingQueue[Either[Unit, R]] + val latch = new CountDownLatch(1) val runnable: Runnable = () => { - try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) - catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + try { + latch.countDown() + Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) + } catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } } val thread = new Thread(runnable, s"sbt-watch-$name-$tag") thread.setDaemon(true) thread.start() threads.add(thread) - new WatchExecutor.FutureImpl(thread, queue, this) + new WatchExecutor.FutureImpl(thread, queue, this, latch) } } def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread)) @@ -678,18 +687,23 @@ private[sbt] object Continuous extends DeprecatedContinuous { } private object WatchExecutor { sealed trait Future[R] extends Any { + def waitUntilStart(duration: FiniteDuration): Boolean def cancel(): Unit def result: Try[R] } final class FutureFailed[R](t: Throwable) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = false def cancel(): Unit = {} def result: Try[R] = Failure(t) } final class FutureImpl[R]( thread: Thread, queue: LinkedBlockingQueue[Either[Unit, R]], - executor: WatchExecutor + executor: WatchExecutor, + latch: CountDownLatch, ) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = + latch.await(duration.toMillis, TimeUnit.MILLISECONDS) def cancel(): Unit = { executor.removeThread(thread) thread.joinFor(1.second) @@ -798,32 +812,40 @@ private[sbt] object Continuous extends DeprecatedContinuous { val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]] val inputJob = - executor.submit("handle-input", nextInputAction(executor).foreach(a => events.put(Left(a)))) + executor + .submit(s"handle-input-$count", nextInputAction(executor).foreach(a => events.put(Left(a)))) val fileJob = - executor.submit("get-file-events", nextFileEvent(count).foreach(e => events.put(Right(e)))) + executor + .submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e)))) try { - val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = - events.take() match { - case Left(a) => (a, None) - case Right(e) => (Watch.Ignore, Some(e)) + if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) { + inputJob.cancel() + fileJob.cancel() + new Watch.HandleError(new TimeoutException) + } else { + val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = + events.take() match { + case Left(a) => (a, None) + case Right(e) => (Watch.Ignore, Some(e)) + } + val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min + lazy val inputMessage = + s"Received input event: $inputAction." + + (if (inputAction != min) s" Dropping in favor of file event: $min" else "") + if (inputAction != Watch.Ignore) logger.info(inputMessage) + fileEvent + .collect { + case (event, action) if action != Watch.Ignore => + s"Received file event $action for $event." + + (if (action != min) s" Dropping in favor of input event: $min" else "") + } + .foreach(logger.debug(_)) + min match { + case ShowOptions => + rawLogger.info(options) + Watch.Ignore + case m => m } - val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min - lazy val inputMessage = - s"Received input event: $inputAction." + - (if (inputAction != min) s" Dropping in favor of file event: $min" else "") - if (inputAction != Watch.Ignore) logger.info(inputMessage) - fileEvent - .collect { - case (event, action) if action != Watch.Ignore => - s"Received file event $action for $event." + - (if (action != min) s" Dropping in favor of input event: $min" else "") - } - .foreach(logger.debug(_)) - min match { - case ShowOptions => - rawLogger.info(options) - Watch.Ignore - case m => m } } finally { inputJob.cancel()