From afac5e50677db0da9f0f305f443186ae7ac7f3c7 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 24 Aug 2020 11:04:14 -0700 Subject: [PATCH 1/2] Block watch until threads spin up I occassionally end up in a state where watch input does not seem to be read. To rule out the possibility that the background thread that reads input has not successfully started, this commit makes it so that we block until the thread signals that it has started via a CountDownLatch. The diff is superficially big because of an indentation change at the bottom. --- .../main/scala/sbt/internal/Continuous.scala | 78 ++++++++++++------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index c97cbfe16..ed79d5b10 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -10,7 +10,13 @@ package internal import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ } import java.nio.file.Path -import java.util.concurrent.{ ConcurrentHashMap, LinkedBlockingQueue } +import java.util.concurrent.{ + ConcurrentHashMap, + CountDownLatch, + LinkedBlockingQueue, + TimeUnit, + TimeoutException +} import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import sbt.BasicCommandStrings._ @@ -654,15 +660,18 @@ private[sbt] object Continuous extends DeprecatedContinuous { if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor")) else { val queue = new LinkedBlockingQueue[Either[Unit, R]] + val latch = new CountDownLatch(1) val runnable: Runnable = () => { - try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) - catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + try { + latch.countDown() + Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) + } catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } } val thread = new Thread(runnable, s"sbt-watch-$name-$tag") thread.setDaemon(true) thread.start() threads.add(thread) - new WatchExecutor.FutureImpl(thread, queue, this) + new WatchExecutor.FutureImpl(thread, queue, this, latch) } } def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread)) @@ -678,18 +687,23 @@ private[sbt] object Continuous extends DeprecatedContinuous { } private object WatchExecutor { sealed trait Future[R] extends Any { + def waitUntilStart(duration: FiniteDuration): Boolean def cancel(): Unit def result: Try[R] } final class FutureFailed[R](t: Throwable) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = false def cancel(): Unit = {} def result: Try[R] = Failure(t) } final class FutureImpl[R]( thread: Thread, queue: LinkedBlockingQueue[Either[Unit, R]], - executor: WatchExecutor + executor: WatchExecutor, + latch: CountDownLatch, ) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = + latch.await(duration.toMillis, TimeUnit.MILLISECONDS) def cancel(): Unit = { executor.removeThread(thread) thread.joinFor(1.second) @@ -798,32 +812,40 @@ private[sbt] object Continuous extends DeprecatedContinuous { val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]] val inputJob = - executor.submit("handle-input", nextInputAction(executor).foreach(a => events.put(Left(a)))) + executor + .submit(s"handle-input-$count", nextInputAction(executor).foreach(a => events.put(Left(a)))) val fileJob = - executor.submit("get-file-events", nextFileEvent(count).foreach(e => events.put(Right(e)))) + executor + .submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e)))) try { - val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = - events.take() match { - case Left(a) => (a, None) - case Right(e) => (Watch.Ignore, Some(e)) + if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) { + inputJob.cancel() + fileJob.cancel() + new Watch.HandleError(new TimeoutException) + } else { + val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = + events.take() match { + case Left(a) => (a, None) + case Right(e) => (Watch.Ignore, Some(e)) + } + val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min + lazy val inputMessage = + s"Received input event: $inputAction." + + (if (inputAction != min) s" Dropping in favor of file event: $min" else "") + if (inputAction != Watch.Ignore) logger.info(inputMessage) + fileEvent + .collect { + case (event, action) if action != Watch.Ignore => + s"Received file event $action for $event." + + (if (action != min) s" Dropping in favor of input event: $min" else "") + } + .foreach(logger.debug(_)) + min match { + case ShowOptions => + rawLogger.info(options) + Watch.Ignore + case m => m } - val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min - lazy val inputMessage = - s"Received input event: $inputAction." + - (if (inputAction != min) s" Dropping in favor of file event: $min" else "") - if (inputAction != Watch.Ignore) logger.info(inputMessage) - fileEvent - .collect { - case (event, action) if action != Watch.Ignore => - s"Received file event $action for $event." + - (if (action != min) s" Dropping in favor of input event: $min" else "") - } - .foreach(logger.debug(_)) - min match { - case ShowOptions => - rawLogger.info(options) - Watch.Ignore - case m => m } } finally { inputJob.cancel() From c931b005ab405870d45847b6189cfbc36db83479 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 24 Aug 2020 11:17:59 -0700 Subject: [PATCH 2/2] Change ctrl+c behavior in ~ While in a continuous build, when the user enters ctrl+c into the sbt server console (not a thin client connection) when sbt has been launched in interactive mode, the server exits. This commit makes it so that instead we just cancel the watch. As a result, if sbt was started in batch mode, e.g. `sbt ~compile`, ctrl+c will still exit sbt but in interactive mode ctrl+c will take the user back to the shell. --- .../main/scala/sbt/internal/Continuous.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index ed79d5b10..7aacae75f 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -332,7 +332,15 @@ private[sbt] object Continuous extends DeprecatedContinuous { ) = getFileEvents(configs, logger, state, commands, fileStampCache, channel.name) val executor = new WatchExecutor(channel.name) val nextEvent: Int => Watch.Action = - combineInputAndFileEvents(nextInputEvent, nextFileEvent, message, logger, logger, executor) + combineInputAndFileEvents( + nextInputEvent, + nextFileEvent, + message, + logger, + logger, + executor, + channel + ) val onExit = () => { cleanupFileMonitor() Util.ignoreResult(executor.close()) @@ -781,6 +789,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { interrupted.set(false) terminal.inputStream.read match { case -1 => Watch.Ignore + case 3 => Watch.CancelWatch // ctrl+c on windows case byte => inputHandler(byte.toChar.toString) } } catch { @@ -808,6 +817,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { logger: Logger, rawLogger: Logger, executor: WatchExecutor, + channel: CommandChannel ): Int => Watch.Action = count => { val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]] @@ -817,6 +827,10 @@ private[sbt] object Continuous extends DeprecatedContinuous { val fileJob = executor .submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e)))) + val signalRegistration = channel match { + case _: ConsoleChannel => Some(Signals.register(() => events.put(Left(Watch.CancelWatch)))) + case _ => None + } try { if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) { inputJob.cancel() @@ -848,6 +862,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { } } } finally { + signalRegistration.foreach(_.remove()) inputJob.cancel() fileJob.cancel() ()