diff --git a/main/src/main/scala/sbt/internal/server/NetworkChannel.scala b/main/src/main/scala/sbt/internal/server/NetworkChannel.scala index 8f647f98d..0acab0974 100644 --- a/main/src/main/scala/sbt/internal/server/NetworkChannel.scala +++ b/main/src/main/scala/sbt/internal/server/NetworkChannel.scala @@ -602,6 +602,7 @@ final class NetworkChannel( execId: String, err: JsonRpcResponseError ): Unit = { + forceFlush() val m = JsonRpcResponseMessage("2.0", execId, None, Option(err)) val bytes = Serialization.serializeResponseMessage(m) publishBytes(bytes) @@ -611,13 +612,17 @@ final class NetworkChannel( private[sbt] def jsonRpcNotify[A: JsonFormat](method: String, params: A): Unit = { val m = JsonRpcNotificationMessage("2.0", method, Option(Converter.toJson[A](params).get)) - if (method != Serialization.systemOut) log.debug(s"jsonRpcNotify: $m") + if (method != Serialization.systemOut) { + forceFlush() + log.debug(s"jsonRpcNotify: $m") + } val bytes = Serialization.serializeNotificationMessage(m) publishBytes(bytes) } /** Notify to Language Server's client. */ private[sbt] def jsonRpcRequest[A: JsonFormat](id: String, method: String, params: A): Unit = { + forceFlush() val m = JsonRpcRequestMessage("2.0", id, method, Option(Converter.toJson[A](params).get)) log.debug(s"jsonRpcRequest: $m") @@ -654,7 +659,13 @@ final class NetworkChannel( import scala.collection.JavaConverters._ private[this] val outputBuffer = new LinkedBlockingQueue[Byte] - private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]] + private[this] val flushExecutor = Executors.newSingleThreadScheduledExecutor( + r => new Thread(r, s"$name-output-buffer-timer-thread") + ) + private[this] def forceFlush() = { + Util.ignoreResult(flushExecutor.shutdownNow()) + doFlush() + } private[this] def doFlush()() = { val list = new java.util.ArrayList[Byte] outputBuffer.synchronized(outputBuffer.drainTo(list)) @@ -673,13 +684,9 @@ final class NetworkChannel( * probably long enough to catch each burst but short enough to not introduce * noticeable latency. */ - private[this] val executor = - Executors.newSingleThreadScheduledExecutor( - r => new Thread(r, s"$name-output-buffer-timer-thread") - ) + private[this] val flushFuture = new AtomicReference[java.util.concurrent.Future[_]] override def close(): Unit = { - Util.ignoreResult(executor.shutdownNow()) - doFlush() + forceFlush() } override def write(b: Int): Unit = outputBuffer.synchronized { outputBuffer.put(b.toByte) @@ -689,7 +696,7 @@ final class NetworkChannel( case null => try { flushFuture.set( - executor.schedule( + flushExecutor.schedule( (() => { flushFuture.set(null) doFlush()