Handle closed executor in network channel flush

I noticed some RejectedExecutionExceptions in travis failures of
ClientTest. This could happen if we try to write output to the
network channel after it has been closed. To avoid this problem, we can
catch RejectedExecutionExceptions and do an immediate flush if the
executor has been shutdown.
This commit is contained in:
Ethan Atkins 2020-07-19 10:05:50 -07:00
parent 8b73d2bc2a
commit 4acf3d14c2
1 changed files with 27 additions and 13 deletions

View File

@ -12,7 +12,13 @@ package server
import java.io.{ IOException, InputStream, OutputStream }
import java.net.{ Socket, SocketTimeoutException }
import java.nio.channels.ClosedChannelException
import java.util.concurrent.{ ConcurrentHashMap, Executors, LinkedBlockingQueue, TimeUnit }
import java.util.concurrent.{
ConcurrentHashMap,
Executors,
LinkedBlockingQueue,
RejectedExecutionException,
TimeUnit
}
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import sbt.BasicCommandStrings.{ Shutdown, TerminateAction }
@ -666,25 +672,33 @@ final class NetworkChannel(
)
private[this] val buffer = new LinkedBlockingQueue[Byte]
private[this] val future = new AtomicReference[java.util.concurrent.Future[_]]
override def close(): Unit = Util.ignoreResult(executor.shutdownNow())
private[this] def doFlush()() = {
val list = new java.util.ArrayList[Byte]
buffer.synchronized(buffer.drainTo(list))
if (!list.isEmpty) jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq)
}
override def close(): Unit = {
Util.ignoreResult(executor.shutdownNow())
doFlush()
}
override def write(b: Int): Unit = buffer.synchronized {
buffer.put(b.toByte)
}
override def flush(): Unit = {
future.get match {
case null =>
future.set(
executor.schedule(
(() => {
future.set(null)
val list = new java.util.ArrayList[Byte]
buffer.synchronized(buffer.drainTo(list))
jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq)
}): Runnable,
20,
TimeUnit.MILLISECONDS
try {
future.set(
executor.schedule(
(() => {
future.set(null)
doFlush()
}): Runnable,
20,
TimeUnit.MILLISECONDS
)
)
)
} catch { case _: RejectedExecutionException => doFlush() }
case f =>
}
}