Block watch until threads spin up

I occassionally end up in a state where watch input does not seem to be
read. To rule out the possibility that the background thread that reads
input has not successfully started, this commit makes it so that we
block until the thread signals that it has started via a CountDownLatch.

The diff is superficially big because of an indentation change at the
bottom.
This commit is contained in:
Ethan Atkins 2020-08-24 11:04:14 -07:00
parent 1b99049fbc
commit afac5e5067
1 changed files with 50 additions and 28 deletions

View File

@ -10,7 +10,13 @@ package internal
import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ }
import java.nio.file.Path
import java.util.concurrent.{ ConcurrentHashMap, LinkedBlockingQueue }
import java.util.concurrent.{
ConcurrentHashMap,
CountDownLatch,
LinkedBlockingQueue,
TimeUnit,
TimeoutException
}
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import sbt.BasicCommandStrings._
@ -654,15 +660,18 @@ private[sbt] object Continuous extends DeprecatedContinuous {
if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor"))
else {
val queue = new LinkedBlockingQueue[Either[Unit, R]]
val latch = new CountDownLatch(1)
val runnable: Runnable = () => {
try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f)))
catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
try {
latch.countDown()
Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f)))
} catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
}
val thread = new Thread(runnable, s"sbt-watch-$name-$tag")
thread.setDaemon(true)
thread.start()
threads.add(thread)
new WatchExecutor.FutureImpl(thread, queue, this)
new WatchExecutor.FutureImpl(thread, queue, this, latch)
}
}
def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread))
@ -678,18 +687,23 @@ private[sbt] object Continuous extends DeprecatedContinuous {
}
private object WatchExecutor {
sealed trait Future[R] extends Any {
def waitUntilStart(duration: FiniteDuration): Boolean
def cancel(): Unit
def result: Try[R]
}
final class FutureFailed[R](t: Throwable) extends Future[R] {
override def waitUntilStart(duration: FiniteDuration): Boolean = false
def cancel(): Unit = {}
def result: Try[R] = Failure(t)
}
final class FutureImpl[R](
thread: Thread,
queue: LinkedBlockingQueue[Either[Unit, R]],
executor: WatchExecutor
executor: WatchExecutor,
latch: CountDownLatch,
) extends Future[R] {
override def waitUntilStart(duration: FiniteDuration): Boolean =
latch.await(duration.toMillis, TimeUnit.MILLISECONDS)
def cancel(): Unit = {
executor.removeThread(thread)
thread.joinFor(1.second)
@ -798,32 +812,40 @@ private[sbt] object Continuous extends DeprecatedContinuous {
val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]]
val inputJob =
executor.submit("handle-input", nextInputAction(executor).foreach(a => events.put(Left(a))))
executor
.submit(s"handle-input-$count", nextInputAction(executor).foreach(a => events.put(Left(a))))
val fileJob =
executor.submit("get-file-events", nextFileEvent(count).foreach(e => events.put(Right(e))))
executor
.submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e))))
try {
val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) =
events.take() match {
case Left(a) => (a, None)
case Right(e) => (Watch.Ignore, Some(e))
if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) {
inputJob.cancel()
fileJob.cancel()
new Watch.HandleError(new TimeoutException)
} else {
val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) =
events.take() match {
case Left(a) => (a, None)
case Right(e) => (Watch.Ignore, Some(e))
}
val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min
lazy val inputMessage =
s"Received input event: $inputAction." +
(if (inputAction != min) s" Dropping in favor of file event: $min" else "")
if (inputAction != Watch.Ignore) logger.info(inputMessage)
fileEvent
.collect {
case (event, action) if action != Watch.Ignore =>
s"Received file event $action for $event." +
(if (action != min) s" Dropping in favor of input event: $min" else "")
}
.foreach(logger.debug(_))
min match {
case ShowOptions =>
rawLogger.info(options)
Watch.Ignore
case m => m
}
val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min
lazy val inputMessage =
s"Received input event: $inputAction." +
(if (inputAction != min) s" Dropping in favor of file event: $min" else "")
if (inputAction != Watch.Ignore) logger.info(inputMessage)
fileEvent
.collect {
case (event, action) if action != Watch.Ignore =>
s"Received file event $action for $event." +
(if (action != min) s" Dropping in favor of input event: $min" else "")
}
.foreach(logger.debug(_))
min match {
case ShowOptions =>
rawLogger.info(options)
Watch.Ignore
case m => m
}
} finally {
inputJob.cancel()