From c77d6dfb384fecdff15e290d03d608524e43c690 Mon Sep 17 00:00:00 2001 From: Adrien Piquerez Date: Wed, 3 Nov 2021 10:21:02 +0100 Subject: [PATCH] Fix #6698: Synchronize all RPC messages with systemOut notifications `systemOut` notifications are buffered so that they are sent at most once every 20 millisecond. Other RPC messages are not buffered. As a consequence, some RPC messages can pass in front of some systemOut notifications. That's why `sbt --client run` can exit before it receives all the logs. In general I think it is safer to maintain the order of all messages. To do so we can force the flush of systemOut before each RPC message. --- .../sbt/internal/server/NetworkChannel.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/main/src/main/scala/sbt/internal/server/NetworkChannel.scala b/main/src/main/scala/sbt/internal/server/NetworkChannel.scala index 8f647f98d..0acab0974 100644 --- a/main/src/main/scala/sbt/internal/server/NetworkChannel.scala +++ b/main/src/main/scala/sbt/internal/server/NetworkChannel.scala @@ -602,6 +602,7 @@ final class NetworkChannel( execId: String, err: JsonRpcResponseError ): Unit = { + forceFlush() val m = JsonRpcResponseMessage("2.0", execId, None, Option(err)) val bytes = Serialization.serializeResponseMessage(m) publishBytes(bytes) @@ -611,13 +612,17 @@ final class NetworkChannel( private[sbt] def jsonRpcNotify[A: JsonFormat](method: String, params: A): Unit = { val m = JsonRpcNotificationMessage("2.0", method, Option(Converter.toJson[A](params).get)) - if (method != Serialization.systemOut) log.debug(s"jsonRpcNotify: $m") + if (method != Serialization.systemOut) { + forceFlush() + log.debug(s"jsonRpcNotify: $m") + } val bytes = Serialization.serializeNotificationMessage(m) publishBytes(bytes) } /** Notify to Language Server's client. */ private[sbt] def jsonRpcRequest[A: JsonFormat](id: String, method: String, params: A): Unit = { + forceFlush() val m = JsonRpcRequestMessage("2.0", id, method, Option(Converter.toJson[A](params).get)) log.debug(s"jsonRpcRequest: $m") @@ -654,7 +659,13 @@ final class NetworkChannel( import scala.collection.JavaConverters._ private[this] val outputBuffer = new LinkedBlockingQueue[Byte] - private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]] + private[this] val flushExecutor = Executors.newSingleThreadScheduledExecutor( + r => new Thread(r, s"$name-output-buffer-timer-thread") + ) + private[this] def forceFlush() = { + Util.ignoreResult(flushExecutor.shutdownNow()) + doFlush() + } private[this] def doFlush()() = { val list = new java.util.ArrayList[Byte] outputBuffer.synchronized(outputBuffer.drainTo(list)) @@ -673,13 +684,9 @@ final class NetworkChannel( * probably long enough to catch each burst but short enough to not introduce * noticeable latency. */ - private[this] val executor = - Executors.newSingleThreadScheduledExecutor( - r => new Thread(r, s"$name-output-buffer-timer-thread") - ) + private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]] override def close(): Unit = { - Util.ignoreResult(executor.shutdownNow()) - doFlush() + forceFlush() } override def write(b: Int): Unit = outputBuffer.synchronized { outputBuffer.put(b.toByte) @@ -689,7 +696,7 @@ final class NetworkChannel( case null => try { flushFuture.set( - executor.schedule( + flushExecutor.schedule( (() => { flushFuture.set(null) doFlush()