From afac5e50677db0da9f0f305f443186ae7ac7f3c7 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 24 Aug 2020 11:04:14 -0700 Subject: [PATCH] Block watch until threads spin up I occassionally end up in a state where watch input does not seem to be read. To rule out the possibility that the background thread that reads input has not successfully started, this commit makes it so that we block until the thread signals that it has started via a CountDownLatch. The diff is superficially big because of an indentation change at the bottom. --- .../main/scala/sbt/internal/Continuous.scala | 78 ++++++++++++------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index c97cbfe16..ed79d5b10 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -10,7 +10,13 @@ package internal import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ } import java.nio.file.Path -import java.util.concurrent.{ ConcurrentHashMap, LinkedBlockingQueue } +import java.util.concurrent.{ + ConcurrentHashMap, + CountDownLatch, + LinkedBlockingQueue, + TimeUnit, + TimeoutException +} import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import sbt.BasicCommandStrings._ @@ -654,15 +660,18 @@ private[sbt] object Continuous extends DeprecatedContinuous { if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor")) else { val queue = new LinkedBlockingQueue[Either[Unit, R]] + val latch = new CountDownLatch(1) val runnable: Runnable = () => { - try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) - catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } + try { + latch.countDown() + Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f))) + } catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) } } val thread = new Thread(runnable, s"sbt-watch-$name-$tag") thread.setDaemon(true) thread.start() threads.add(thread) - new WatchExecutor.FutureImpl(thread, queue, this) + new WatchExecutor.FutureImpl(thread, queue, this, latch) } } def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread)) @@ -678,18 +687,23 @@ private[sbt] object Continuous extends DeprecatedContinuous { } private object WatchExecutor { sealed trait Future[R] extends Any { + def waitUntilStart(duration: FiniteDuration): Boolean def cancel(): Unit def result: Try[R] } final class FutureFailed[R](t: Throwable) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = false def cancel(): Unit = {} def result: Try[R] = Failure(t) } final class FutureImpl[R]( thread: Thread, queue: LinkedBlockingQueue[Either[Unit, R]], - executor: WatchExecutor + executor: WatchExecutor, + latch: CountDownLatch, ) extends Future[R] { + override def waitUntilStart(duration: FiniteDuration): Boolean = + latch.await(duration.toMillis, TimeUnit.MILLISECONDS) def cancel(): Unit = { executor.removeThread(thread) thread.joinFor(1.second) @@ -798,32 +812,40 @@ private[sbt] object Continuous extends DeprecatedContinuous { val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]] val inputJob = - executor.submit("handle-input", nextInputAction(executor).foreach(a => events.put(Left(a)))) + executor + .submit(s"handle-input-$count", nextInputAction(executor).foreach(a => events.put(Left(a)))) val fileJob = - executor.submit("get-file-events", nextFileEvent(count).foreach(e => events.put(Right(e)))) + executor + .submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e)))) try { - val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = - events.take() match { - case Left(a) => (a, None) - case Right(e) => (Watch.Ignore, Some(e)) + if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) { + inputJob.cancel() + fileJob.cancel() + new Watch.HandleError(new TimeoutException) + } else { + val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) = + events.take() match { + case Left(a) => (a, None) + case Right(e) => (Watch.Ignore, Some(e)) + } + val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min + lazy val inputMessage = + s"Received input event: $inputAction." + + (if (inputAction != min) s" Dropping in favor of file event: $min" else "") + if (inputAction != Watch.Ignore) logger.info(inputMessage) + fileEvent + .collect { + case (event, action) if action != Watch.Ignore => + s"Received file event $action for $event." + + (if (action != min) s" Dropping in favor of input event: $min" else "") + } + .foreach(logger.debug(_)) + min match { + case ShowOptions => + rawLogger.info(options) + Watch.Ignore + case m => m } - val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min - lazy val inputMessage = - s"Received input event: $inputAction." + - (if (inputAction != min) s" Dropping in favor of file event: $min" else "") - if (inputAction != Watch.Ignore) logger.info(inputMessage) - fileEvent - .collect { - case (event, action) if action != Watch.Ignore => - s"Received file event $action for $event." + - (if (action != min) s" Dropping in favor of input event: $min" else "") - } - .foreach(logger.debug(_)) - min match { - case ShowOptions => - rawLogger.info(options) - Watch.Ignore - case m => m } } finally { inputJob.cancel()