From b656d599e17f4b681fb5440422a311d70e414f60 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 3 Aug 2020 15:46:03 -0700 Subject: [PATCH] Allow sbt to force flush of remote output In eb688c9ecdb942dbf0fa985c0103f6f95f6341c1, we started buffering output to the remote client to reduce flickering. This was causing problems with the output for the thin client in batch mode. With the delay, it was possible for the client to exit before all of its output had been displayed. Bonus: only display aggregation error message if terminal has success enabled (the thin client displays its own timing message so the message in aggregation ended up being a duplicate). --- .../scala/sbt/internal/util/Terminal.scala | 2 +- main/src/main/scala/sbt/MainLoop.scala | 4 +++ .../main/scala/sbt/internal/Aggregation.scala | 2 +- .../sbt/internal/server/NetworkChannel.scala | 29 ++++++++++--------- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/internal/util-logging/src/main/scala/sbt/internal/util/Terminal.scala b/internal/util-logging/src/main/scala/sbt/internal/util/Terminal.scala index 019e0bb46..52cad90de 100644 --- a/internal/util-logging/src/main/scala/sbt/internal/util/Terminal.scala +++ b/internal/util-logging/src/main/scala/sbt/internal/util/Terminal.scala @@ -163,7 +163,7 @@ trait Terminal extends AutoCloseable { if (lines.nonEmpty) lines.tail.foldLeft(lines.headOption.fold(0)(count))(_ + count(_)) else 0 } - + private[sbt] def flush(): Unit = printStream.flush() } object Terminal { diff --git a/main/src/main/scala/sbt/MainLoop.scala b/main/src/main/scala/sbt/MainLoop.scala index 09f26e96e..a2c4efd7d 100644 --- a/main/src/main/scala/sbt/MainLoop.scala +++ b/main/src/main/scala/sbt/MainLoop.scala @@ -202,6 +202,10 @@ object MainLoop { StandardMain.exchange.setExec(Some(exec)) StandardMain.exchange.unprompt(ConsoleUnpromptEvent(exec.source)) val newState = Command.process(exec.commandLine, progressState) + // Flush the terminal output after command evaluation to ensure that all output + // is displayed in the thin client before we report the command status. + val terminal = channelName.flatMap(exchange.channelForName(_).map(_.terminal)) + terminal.foreach(_.flush()) if (exec.execId.fold(true)(!_.startsWith(networkExecPrefix)) && !exec.commandLine.startsWith(networkExecPrefix)) { val doneEvent = ExecStatusEvent( diff --git a/main/src/main/scala/sbt/internal/Aggregation.scala b/main/src/main/scala/sbt/internal/Aggregation.scala index df7e43ee3..62b6fa7d9 100644 --- a/main/src/main/scala/sbt/internal/Aggregation.scala +++ b/main/src/main/scala/sbt/internal/Aggregation.scala @@ -131,7 +131,7 @@ object Aggregation { if (get(showSuccess)) { if (get(showTiming)) { val msg = timingString(start, stop, structure.data, currentRef) - if (success) log.success(msg) else log.error(msg) + if (success) log.success(msg) else if (Terminal.get.isSuccessEnabled) log.error(msg) } else if (success) log.success("") } diff --git a/main/src/main/scala/sbt/internal/server/NetworkChannel.scala b/main/src/main/scala/sbt/internal/server/NetworkChannel.scala index 4aa3c1f81..ddb42df1c 100644 --- a/main/src/main/scala/sbt/internal/server/NetworkChannel.scala +++ b/main/src/main/scala/sbt/internal/server/NetworkChannel.scala @@ -653,6 +653,13 @@ final class NetworkChannel( import sjsonnew.BasicJsonProtocol._ import scala.collection.JavaConverters._ + private[this] val outputBuffer = new LinkedBlockingQueue[Byte] + private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]] + private[this] def doFlush()() = { + val list = new java.util.ArrayList[Byte] + outputBuffer.synchronized(outputBuffer.drainTo(list)) + if (!list.isEmpty) jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq) + } private[this] lazy val outputStream: OutputStream with AutoCloseable = new OutputStream with AutoCloseable { /* @@ -670,28 +677,21 @@ final class NetworkChannel( Executors.newSingleThreadScheduledExecutor( r => new Thread(r, s"$name-output-buffer-timer-thread") ) - private[this] val buffer = new LinkedBlockingQueue[Byte] - private[this] val future = new AtomicReference[java.util.concurrent.Future[_]] - private[this] def doFlush()() = { - val list = new java.util.ArrayList[Byte] - buffer.synchronized(buffer.drainTo(list)) - if (!list.isEmpty) jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq) - } override def close(): Unit = { Util.ignoreResult(executor.shutdownNow()) doFlush() } - override def write(b: Int): Unit = buffer.synchronized { - buffer.put(b.toByte) + override def write(b: Int): Unit = outputBuffer.synchronized { + outputBuffer.put(b.toByte) } override def flush(): Unit = { - future.get match { + flushFuture.get match { case null => try { - future.set( + flushFuture.set( executor.schedule( (() => { - future.set(null) + flushFuture.set(null) doFlush() }): Runnable, 20, @@ -702,8 +702,8 @@ final class NetworkChannel( case f => } } - override def write(b: Array[Byte]): Unit = buffer.synchronized { - b.foreach(buffer.put) + override def write(b: Array[Byte]): Unit = outputBuffer.synchronized { + b.foreach(outputBuffer.put) } override def write(b: Array[Byte], off: Int, len: Int): Unit = { write(java.util.Arrays.copyOfRange(b, off, off + len)) @@ -880,6 +880,7 @@ final class NetworkChannel( catch { case _: InterruptedException => } } + override def flush(): Unit = doFlush() override def toString: String = s"NetworkTerminal($name)" override def close(): Unit = if (closed.compareAndSet(false, true)) { val threads = blockedThreads.synchronized {