Merge pull request #5804 from eatkins/watch-cancel

Change ctrl+c behavior in ~
This commit is contained in:
eugene yokota 2020-09-01 09:10:14 -04:00 committed by GitHub
commit debc9a28a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 66 additions and 29 deletions

View File

@ -10,7 +10,13 @@ package internal
import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ }
import java.nio.file.Path
import java.util.concurrent.{ ConcurrentHashMap, LinkedBlockingQueue }
import java.util.concurrent.{
ConcurrentHashMap,
CountDownLatch,
LinkedBlockingQueue,
TimeUnit,
TimeoutException
}
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import sbt.BasicCommandStrings._
@ -326,7 +332,15 @@ private[sbt] object Continuous extends DeprecatedContinuous {
) = getFileEvents(configs, logger, state, commands, fileStampCache, channel.name)
val executor = new WatchExecutor(channel.name)
val nextEvent: Int => Watch.Action =
combineInputAndFileEvents(nextInputEvent, nextFileEvent, message, logger, logger, executor)
combineInputAndFileEvents(
nextInputEvent,
nextFileEvent,
message,
logger,
logger,
executor,
channel
)
val onExit = () => {
cleanupFileMonitor()
Util.ignoreResult(executor.close())
@ -654,15 +668,18 @@ private[sbt] object Continuous extends DeprecatedContinuous {
if (closed.get) new WatchExecutor.FutureFailed(new IllegalStateException("closed executor"))
else {
val queue = new LinkedBlockingQueue[Either[Unit, R]]
val latch = new CountDownLatch(1)
val runnable: Runnable = () => {
try Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f)))
catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
try {
latch.countDown()
Util.ignoreResult(queue.offer(if (closed.get) Left(()) else Right(f)))
} catch { case _: Exception => Util.ignoreResult(queue.offer(Left(()))) }
}
val thread = new Thread(runnable, s"sbt-watch-$name-$tag")
thread.setDaemon(true)
thread.start()
threads.add(thread)
new WatchExecutor.FutureImpl(thread, queue, this)
new WatchExecutor.FutureImpl(thread, queue, this, latch)
}
}
def removeThread(thread: Thread): Unit = Util.ignoreResult(threads.remove(thread))
@ -678,18 +695,23 @@ private[sbt] object Continuous extends DeprecatedContinuous {
}
private object WatchExecutor {
sealed trait Future[R] extends Any {
def waitUntilStart(duration: FiniteDuration): Boolean
def cancel(): Unit
def result: Try[R]
}
final class FutureFailed[R](t: Throwable) extends Future[R] {
override def waitUntilStart(duration: FiniteDuration): Boolean = false
def cancel(): Unit = {}
def result: Try[R] = Failure(t)
}
final class FutureImpl[R](
thread: Thread,
queue: LinkedBlockingQueue[Either[Unit, R]],
executor: WatchExecutor
executor: WatchExecutor,
latch: CountDownLatch,
) extends Future[R] {
override def waitUntilStart(duration: FiniteDuration): Boolean =
latch.await(duration.toMillis, TimeUnit.MILLISECONDS)
def cancel(): Unit = {
executor.removeThread(thread)
thread.joinFor(1.second)
@ -767,6 +789,7 @@ private[sbt] object Continuous extends DeprecatedContinuous {
interrupted.set(false)
terminal.inputStream.read match {
case -1 => Watch.Ignore
case 3 => Watch.CancelWatch // ctrl+c on windows
case byte => inputHandler(byte.toChar.toString)
}
} catch {
@ -794,38 +817,52 @@ private[sbt] object Continuous extends DeprecatedContinuous {
logger: Logger,
rawLogger: Logger,
executor: WatchExecutor,
channel: CommandChannel
): Int => Watch.Action = count => {
val events = new LinkedBlockingQueue[Either[Watch.Action, (Watch.Event, Watch.Action)]]
val inputJob =
executor.submit("handle-input", nextInputAction(executor).foreach(a => events.put(Left(a))))
executor
.submit(s"handle-input-$count", nextInputAction(executor).foreach(a => events.put(Left(a))))
val fileJob =
executor.submit("get-file-events", nextFileEvent(count).foreach(e => events.put(Right(e))))
executor
.submit(s"get-file-events-$count", nextFileEvent(count).foreach(e => events.put(Right(e))))
val signalRegistration = channel match {
case _: ConsoleChannel => Some(Signals.register(() => events.put(Left(Watch.CancelWatch))))
case _ => None
}
try {
val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) =
events.take() match {
case Left(a) => (a, None)
case Right(e) => (Watch.Ignore, Some(e))
if (!inputJob.waitUntilStart(1.second) || !fileJob.waitUntilStart(1.second)) {
inputJob.cancel()
fileJob.cancel()
new Watch.HandleError(new TimeoutException)
} else {
val (inputAction: Watch.Action, fileEvent: Option[(Watch.Event, Watch.Action)]) =
events.take() match {
case Left(a) => (a, None)
case Right(e) => (Watch.Ignore, Some(e))
}
val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min
lazy val inputMessage =
s"Received input event: $inputAction." +
(if (inputAction != min) s" Dropping in favor of file event: $min" else "")
if (inputAction != Watch.Ignore) logger.info(inputMessage)
fileEvent
.collect {
case (event, action) if action != Watch.Ignore =>
s"Received file event $action for $event." +
(if (action != min) s" Dropping in favor of input event: $min" else "")
}
.foreach(logger.debug(_))
min match {
case ShowOptions =>
rawLogger.info(options)
Watch.Ignore
case m => m
}
val min: Watch.Action = (fileEvent.map(_._2).toSeq :+ inputAction).min
lazy val inputMessage =
s"Received input event: $inputAction." +
(if (inputAction != min) s" Dropping in favor of file event: $min" else "")
if (inputAction != Watch.Ignore) logger.info(inputMessage)
fileEvent
.collect {
case (event, action) if action != Watch.Ignore =>
s"Received file event $action for $event." +
(if (action != min) s" Dropping in favor of input event: $min" else "")
}
.foreach(logger.debug(_))
min match {
case ShowOptions =>
rawLogger.info(options)
Watch.Ignore
case m => m
}
} finally {
signalRegistration.foreach(_.remove())
inputJob.cancel()
fileJob.cancel()
()