Merge pull request #5687 from eatkins/rejected-execution

Handle closed executor in network channel flush
This commit is contained in:
eugene yokota 2020-07-19 13:47:15 -04:00 committed by GitHub
commit b678d2115f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 27 additions and 13 deletions

View File

@ -12,7 +12,13 @@ package server
import java.io.{ IOException, InputStream, OutputStream }
import java.net.{ Socket, SocketTimeoutException }
import java.nio.channels.ClosedChannelException
import java.util.concurrent.{ ConcurrentHashMap, Executors, LinkedBlockingQueue, TimeUnit }
import java.util.concurrent.{
ConcurrentHashMap,
Executors,
LinkedBlockingQueue,
RejectedExecutionException,
TimeUnit
}
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import sbt.BasicCommandStrings.{ Shutdown, TerminateAction }
@ -666,25 +672,33 @@ final class NetworkChannel(
)
private[this] val buffer = new LinkedBlockingQueue[Byte]
private[this] val future = new AtomicReference[java.util.concurrent.Future[_]]
override def close(): Unit = Util.ignoreResult(executor.shutdownNow())
private[this] def doFlush()() = {
val list = new java.util.ArrayList[Byte]
buffer.synchronized(buffer.drainTo(list))
if (!list.isEmpty) jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq)
}
override def close(): Unit = {
Util.ignoreResult(executor.shutdownNow())
doFlush()
}
override def write(b: Int): Unit = buffer.synchronized {
buffer.put(b.toByte)
}
override def flush(): Unit = {
future.get match {
case null =>
future.set(
executor.schedule(
(() => {
future.set(null)
val list = new java.util.ArrayList[Byte]
buffer.synchronized(buffer.drainTo(list))
jsonRpcNotify(Serialization.systemOut, list.asScala.toSeq)
}): Runnable,
20,
TimeUnit.MILLISECONDS
try {
future.set(
executor.schedule(
(() => {
future.set(null)
doFlush()
}): Runnable,
20,
TimeUnit.MILLISECONDS
)
)
)
} catch { case _: RejectedExecutionException => doFlush() }
case f =>
}
}