diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index c97cbfe16..7aacae75f 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -10,7 +10,13 @@ package internal import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ } import java.nio.file.Path -import java.util.concurrent.{ ConcurrentHashMap, LinkedBlockingQueue } +import java.util.concurrent.{ + ConcurrentHashMap, + CountDownLatch, + LinkedBlockingQueue, + TimeUnit, + TimeoutException +} import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import sbt.BasicCommandStrings._ @@ -326,7 +332,15 @@ private[sbt] object Continuous extends DeprecatedContinuous { ) = getFileEvents(configs, logger, state, commands, fileStampCache, channel.name) val executor = new WatchExecutor(channel.name) val nextEvent: Int => Watch.Action = - combineInputAndFileEvents(nextInputEvent, nextFileEvent, message, logger, logger, executor) + combineInputAndFileEvents( + nextInputEvent, + nextFileEvent, + message, + logger, + logger, + executor, + channel + ) val onExit = () => { cleanupFileMonitor() Util.ignoreResult(executor.close()) @@ -654,15 +668,18 @@ private[sbt] object Continuous extends DeprecatedContinuous { if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor")) else { val queue = new LinkedBlockingQueue[Either[Unit, R]] + val latch = new CountDownLatch(1) val runnable: Runnable = () => { - try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) - catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + try { + latch.countDown() + Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) + } catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } } val thread = new Thread(runnable, s"sbt-watch-$name-$tag") thread.setDaemon(true) thread.start() threads.add(thread) - new WatchExecutor.FutureImpl(thread, queue, this) + new WatchExecutor.FutureImpl(thread, queue, this, latch) } } def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread)) @@ -678,18 +695,23 @@ private[sbt] object Continuous extends DeprecatedContinuous { } private object WatchExecutor { sealed trait Future[R] extends Any { + def waitUntilStart(duration: FiniteDuration): Boolean def cancel(): Unit def result: Try[R] } final class FutureFailed[R](t: Throwable) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = false def cancel(): Unit = {} def result: Try[R] = Failure(t) } final class FutureImpl[R]( thread: Thread, queue: LinkedBlockingQueue[Either[Unit, R]], - executor: WatchExecutor + executor: WatchExecutor, + latch: CountDownLatch, ) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = + latch.await(duration.toMillis, TimeUnit.MILLISECONDS) def cancel(): Unit = { executor.removeThread(thread) thread.joinFor(1.second) @@ -767,6 +789,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { interrupted.set(false) terminal.inputStream.read match { case -1 => Watch.Ignore + case 3 => Watch.CancelWatch // ctrl+c on windows case byte => inputHandler(byte.toChar.toString) } } catch { @@ -794,38 +817,52 @@ private[sbt] object Continuous extends DeprecatedContinuous { logger: Logger, rawLogger: Logger, executor: WatchExecutor, + channel: CommandChannel ): Int => Watch.Action = count => { val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]] val inputJob = - executor.submit("handle-input", nextInputAction(executor).foreach(a => events.put(Left(a)))) + executor + .submit(s"handle-input-$count", nextInputAction(executor).foreach(a => events.put(Left(a)))) val fileJob = - executor.submit("get-file-events", nextFileEvent(count).foreach(e => events.put(Right(e)))) + executor + .submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e)))) + val signalRegistration = channel match { + case _: ConsoleChannel => Some(Signals.register(() => events.put(Left(Watch.CancelWatch)))) + case _ => None + } try { - val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = - events.take() match { - case Left(a) => (a, None) - case Right(e) => (Watch.Ignore, Some(e)) + if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) { + inputJob.cancel() + fileJob.cancel() + new Watch.HandleError(new TimeoutException) + } else { + val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = + events.take() match { + case Left(a) => (a, None) + case Right(e) => (Watch.Ignore, Some(e)) + } + val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min + lazy val inputMessage = + s"Received input event: $inputAction." + + (if (inputAction != min) s" Dropping in favor of file event: $min" else "") + if (inputAction != Watch.Ignore) logger.info(inputMessage) + fileEvent + .collect { + case (event, action) if action != Watch.Ignore => + s"Received file event $action for $event." + + (if (action != min) s" Dropping in favor of input event: $min" else "") + } + .foreach(logger.debug(_)) + min match { + case ShowOptions => + rawLogger.info(options) + Watch.Ignore + case m => m } - val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min - lazy val inputMessage = - s"Received input event: $inputAction." + - (if (inputAction != min) s" Dropping in favor of file event: $min" else "") - if (inputAction != Watch.Ignore) logger.info(inputMessage) - fileEvent - .collect { - case (event, action) if action != Watch.Ignore => - s"Received file event $action for $event." + - (if (action != min) s" Dropping in favor of input event: $min" else "") - } - .foreach(logger.debug(_)) - min match { - case ShowOptions => - rawLogger.info(options) - Watch.Ignore - case m => m } } finally { + signalRegistration.foreach(_.remove()) inputJob.cancel() fileJob.cancel() ()