Fix #6698: Synchronize all RPC messages with systemOut notifications

`systemOut` notifications are buffered so that they are sent at most
once every 20 millisecond. Other RPC messages are not buffered.
As a consequence, some RPC messages can pass in front of some
systemOut notifications.
That's why `sbt --client run` can exit before it receives all the logs.

In general I think it is safer to maintain the order of all messages.
To do so we can force the flush of systemOut before each RPC message.
This commit is contained in:
Adrien Piquerez 2021-11-03 10:21:02 +01:00
parent 530505d063
commit c77d6dfb38
1 changed files with 16 additions and 9 deletions

View File

@ -602,6 +602,7 @@ final class NetworkChannel(
execId: String,
err: JsonRpcResponseError
): Unit = {
forceFlush()
val m = JsonRpcResponseMessage("2.0", execId, None, Option(err))
val bytes = Serialization.serializeResponseMessage(m)
publishBytes(bytes)
@ -611,13 +612,17 @@ final class NetworkChannel(
private[sbt] def jsonRpcNotify[A: JsonFormat](method: String, params: A): Unit = {
val m =
JsonRpcNotificationMessage("2.0", method, Option(Converter.toJson[A](params).get))
if (method != Serialization.systemOut) log.debug(s"jsonRpcNotify: $m")
if (method != Serialization.systemOut) {
forceFlush()
log.debug(s"jsonRpcNotify: $m")
}
val bytes = Serialization.serializeNotificationMessage(m)
publishBytes(bytes)
}
/** Notify to Language Server's client. */
private[sbt] def jsonRpcRequest[A: JsonFormat](id: String, method: String, params: A): Unit = {
forceFlush()
val m =
JsonRpcRequestMessage("2.0", id, method, Option(Converter.toJson[A](params).get))
log.debug(s"jsonRpcRequest: $m")
@ -654,7 +659,13 @@ final class NetworkChannel(
import scala.collection.JavaConverters._
private[this] val outputBuffer = new LinkedBlockingQueue[Byte]
private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]]
private[this] val flushExecutor = Executors.newSingleThreadScheduledExecutor(
r => new Thread(r, s"$name-output-buffer-timer-thread")
)
private[this] def forceFlush() = {
Util.ignoreResult(flushExecutor.shutdownNow())
doFlush()
}
private[this] def doFlush()() = {
val list = new java.util.ArrayList[Byte]
outputBuffer.synchronized(outputBuffer.drainTo(list))
@ -673,13 +684,9 @@ final class NetworkChannel(
* probably long enough to catch each burst but short enough to not introduce
* noticeable latency.
*/
private[this] val executor =
Executors.newSingleThreadScheduledExecutor(
r => new Thread(r, s"$name-output-buffer-timer-thread")
)
private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]]
override def close(): Unit = {
Util.ignoreResult(executor.shutdownNow())
doFlush()
forceFlush()
}
override def write(b: Int): Unit = outputBuffer.synchronized {
outputBuffer.put(b.toByte)
@ -689,7 +696,7 @@ final class NetworkChannel(
case null =>
try {
flushFuture.set(
executor.schedule(
flushExecutor.schedule(
(() => {
flushFuture.set(null)
doFlush()