Merge pull request #5718 from eatkins/force-flush

Allow sbt to force flush of remote output
This commit is contained in:
eugene yokota 2020-08-04 22:14:45 -04:00 committed by GitHub
commit 34b74d0a14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 16 deletions

View File

@ -163,7 +163,7 @@ trait Terminal extends AutoCloseable {
if (lines.nonEmpty) lines.tail.foldLeft(lines.headOption.fold(0)(count))(_ + count(_))
else 0
}
private[sbt] def flush(): Unit = printStream.flush()
}
object Terminal {

View File

@ -202,6 +202,10 @@ object MainLoop {
StandardMain.exchange.setExec(Some(exec))
StandardMain.exchange.unprompt(ConsoleUnpromptEvent(exec.source))
val newState = Command.process(exec.commandLine, progressState)
// Flush the terminal output after command evaluation to ensure that all output
// is displayed in the thin client before we report the command status.
val terminal = channelName.flatMap(exchange.channelForName(_).map(_.terminal))
terminal.foreach(_.flush())
if (exec.execId.fold(true)(!_.startsWith(networkExecPrefix)) &&
!exec.commandLine.startsWith(networkExecPrefix)) {
val doneEvent = ExecStatusEvent(

View File

@ -131,7 +131,7 @@ object Aggregation {
if (get(showSuccess)) {
if (get(showTiming)) {
val msg = timingString(start, stop, structure.data, currentRef)
if (success) log.success(msg) else log.error(msg)
if (success) log.success(msg) else if (Terminal.get.isSuccessEnabled) log.error(msg)
} else if (success)
log.success("")
}

View File

@ -653,6 +653,13 @@ final class NetworkChannel(
import sjsonnew.BasicJsonProtocol._
import scala.collection.JavaConverters._
private[this] val outputBuffer = new LinkedBlockingQueue[Byte]
private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]]
private[this] def doFlush()() = {
val list = new java.util.ArrayList[Byte]
outputBuffer.synchronized(outputBuffer.drainTo(list))
if (!list.isEmpty) jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq)
}
private[this] lazy val outputStream: OutputStream with AutoCloseable = new OutputStream
with AutoCloseable {
/*
@ -670,28 +677,21 @@ final class NetworkChannel(
Executors.newSingleThreadScheduledExecutor(
r => new Thread(r, s"$name-output-buffer-timer-thread")
)
private[this] val buffer = new LinkedBlockingQueue[Byte]
private[this] val future = new AtomicReference[java.util.concurrent.Future[_]]
private[this] def doFlush()() = {
val list = new java.util.ArrayList[Byte]
buffer.synchronized(buffer.drainTo(list))
if (!list.isEmpty) jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq)
}
override def close(): Unit = {
Util.ignoreResult(executor.shutdownNow())
doFlush()
}
override def write(b: Int): Unit = buffer.synchronized {
buffer.put(b.toByte)
override def write(b: Int): Unit = outputBuffer.synchronized {
outputBuffer.put(b.toByte)
}
override def flush(): Unit = {
future.get match {
flushFuture.get match {
case null =>
try {
future.set(
flushFuture.set(
executor.schedule(
(() => {
future.set(null)
flushFuture.set(null)
doFlush()
}): Runnable,
20,
@ -702,8 +702,8 @@ final class NetworkChannel(
case f =>
}
}
override def write(b: Array[Byte]): Unit = buffer.synchronized {
b.foreach(buffer.put)
override def write(b: Array[Byte]): Unit = outputBuffer.synchronized {
b.foreach(outputBuffer.put)
}
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
write(java.util.Arrays.copyOfRange(b, off, off + len))
@ -880,6 +880,7 @@ final class NetworkChannel(
catch { case _: InterruptedException => }
}
override def flush(): Unit = doFlush()
override def toString: String = s"NetworkTerminal($name)"
override def close(): Unit = if (closed.compareAndSet(false, true)) {
val threads = blockedThreads.synchronized {