Merge pull request #4750 from eatkins/idle

Consolidate CommandChannel queues  (Reduce idle cpu usage)
This commit is contained in:
eugene yokota 2019-05-31 17:34:10 -04:00 committed by GitHub
commit b7fb0ca2a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 102 deletions

View File

@ -9,6 +9,7 @@ package sbt
package internal
import java.util.concurrent.ConcurrentLinkedQueue
import sbt.protocol.EventMessage
import sjsonnew.JsonFormat
@ -19,7 +20,19 @@ import sjsonnew.JsonFormat
*/
abstract class CommandChannel {
private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
def append(exec: Exec): Boolean = commandQueue.add(exec)
private val registered: java.util.Set[java.util.Queue[CommandChannel]] = new java.util.HashSet
private[sbt] final def register(queue: java.util.Queue[CommandChannel]): Unit = {
registered.add(queue)
()
}
private[sbt] final def unregister(queue: java.util.Queue[CommandChannel]): Unit = {
registered.remove(queue)
()
}
def append(exec: Exec): Boolean = {
registered.forEach(q => q.synchronized { if (!q.contains(this)) q.add(this); () })
commandQueue.add(exec)
}
def poll: Option[Exec] = Option(commandQueue.poll)
def publishEvent[A: JsonFormat](event: A, execId: Option[String]): Unit

View File

@ -10,7 +10,7 @@ package internal
import java.io.IOException
import java.net.Socket
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit }
import java.util.concurrent.atomic._
import sbt.BasicKeys._
@ -47,11 +47,15 @@ private[sbt] final class CommandExchange {
private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer()
private val channelBufferLock = new AnyRef {}
private val commandChannelQueue = new LinkedBlockingQueue[CommandChannel]
private val nextChannelId: AtomicInteger = new AtomicInteger(0)
private lazy val jsonFormat = new sjsonnew.BasicJsonProtocol with JValueFormats {}
def channels: List[CommandChannel] = channelBuffer.toList
def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized(channelBuffer.append(c))
def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized {
channelBuffer.append(c)
c.register(commandChannelQueue)
}
def blockUntilNextExec: Exec = blockUntilNextExec(Duration.Inf, NullLogger)
// periodically move all messages from all the channels
@ -64,11 +68,11 @@ private[sbt] final class CommandExchange {
commandQueue.add(x)
slurpMessages()
}
commandChannelQueue.poll(1, TimeUnit.SECONDS)
slurpMessages()
Option(commandQueue.poll) match {
case Some(x) => x
case None =>
Thread.sleep(2)
val newDeadline = if (deadline.fold(false)(_.isOverdue())) {
GCUtil.forceGcWithInterval(interval, logger)
None

View File

@ -608,51 +608,48 @@ private[sbt] object Continuous extends DeprecatedContinuous {
override def debug(msg: Any): Unit = l.debug(msg.toString)
}
// TODO make this a normal monitor
private[this] val monitors: Seq[FileEventMonitor[Event]] =
private[this] val observers: Observers[Event] = new Observers
private[this] val repo = getRepository(state)
private[this] val handle = repo.addObserver(observers)
private[this] val eventMonitorObservers = new Observers[Event]
private[this] val delegateHandles: Seq[AutoCloseable] =
configs.map { config =>
// Create a logger with a scoped key prefix so that we can tell from which
// monitor events occurred.
FileEventMonitor.antiEntropy(
new Observable[Event] {
private[this] val repo = getRepository(state)
private[this] val observers = new Observers[Event] {
override def onNext(t: Event): Unit =
if (config.inputs().exists(_.glob.matches(t.path))) super.onNext(t)
}
private[this] val handle = repo.addObserver(observers)
override def addObserver(observer: Observer[Event]): AutoCloseable =
observers.addObserver(observer)
override def close(): Unit = {
handle.close()
observers.close()
}
},
config.watchSettings.antiEntropy,
logger.withPrefix(config.key.show),
config.watchSettings.deletionQuarantinePeriod,
config.watchSettings.antiEntropyRetentionPeriod
)
} ++ (if (trackMetaBuild) {
val antiEntropy = configs.map(_.watchSettings.antiEntropy).max
val repo = getRepository(state)
buildGlobs.foreach(repo.register)
FileEventMonitor.antiEntropy(
repo,
antiEntropy,
logger.withPrefix("meta-build"),
quarantinePeriod,
retentionPeriod
) :: Nil
} else Nil)
// Create a logger with a scoped key prefix so that we can tell from which task there
// were inputs that matched the event path.
val configLogger = logger.withPrefix(config.key.show)
observers.addObserver { e =>
if (config.inputs().exists(_.glob.matches(e.path))) {
configLogger.debug(s"Accepted event for ${e.path}")
eventMonitorObservers.onNext(e)
}
}
}
if (trackMetaBuild) {
buildGlobs.foreach(repo.register)
val metaLogger = logger.withPrefix("meta-build")
observers.addObserver { e =>
if (buildGlobs.exists(_.matches(e.path))) {
metaLogger.debug(s"Accepted event for ${e.path}")
eventMonitorObservers.onNext(e)
}
}
}
private[this] val monitor = FileEventMonitor.antiEntropy(
eventMonitorObservers,
configs.map(_.watchSettings.antiEntropy).max,
logger,
quarantinePeriod,
retentionPeriod
)
override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = {
val res = monitors.flatMap(_.poll(0.millis, filter)).toSet.toVector
if (res.isEmpty) Thread.sleep(duration.toMillis)
res
monitor.poll(duration, filter)
}
override def close(): Unit = monitors.foreach(_.close())
override def close(): Unit = {
delegateHandles.foreach(_.close())
handle.close()
}
}
val watchLogger: WatchLogger = msg => logger.debug(msg.toString)
val antiEntropy = configs.map(_.watchSettings.antiEntropy).max
@ -681,7 +678,7 @@ private[sbt] object Continuous extends DeprecatedContinuous {
}
(() => {
val actions = antiEntropyMonitor.poll(2.milliseconds).flatMap(onEvent)
val actions = antiEntropyMonitor.poll(30.milliseconds).flatMap(onEvent)
if (actions.exists(_._2 != Watch.Ignore)) {
val builder = new StringBuilder
val min = actions.minBy {

View File

@ -7,18 +7,19 @@
package testpkg
import java.io.{ File, IOException }
import org.scalatest._
import scala.concurrent._
import sbt.protocol.ClientSocket
import scala.util.Try
import TestServer.withTestServer
import java.io.File
import sbt.io.syntax._
import sbt.io.IO
import sbt.RunFromSourceMain
import scala.util.Try
import scala.concurrent.ExecutionContext
import java.util.concurrent.ForkJoinPool
import sbt.io.IO
import sbt.io.syntax._
import sbt.protocol.ClientSocket
import testpkg.TestServer.withTestServer
import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Try }
class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture with Matchers {
"server" - {
@ -27,7 +28,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
p.sendJsonRpc(
"""{ "jsonrpc": "2.0", "id": "3", "method": "sbt/setting", "params": { "setting": "root/name" } }"""
)
assert(p.waitForString(10) { s =>
assert(p.waitForString(10.seconds) { s =>
s contains """"id":"3""""
})
}
@ -38,7 +39,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
p.sendJsonRpc(
"""{ "jsonrpc": "2.0", "id": 3, "method": "sbt/setting", "params": { "setting": "root/name" } }"""
)
assert(p.waitForString(10) { s =>
assert(p.waitForString(10.seconds) { s =>
s contains """"id":3"""
})
}
@ -49,7 +50,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
p.sendJsonRpc(
"""{ "jsonrpc": "2.0", "id": 11, "method": "sbt/exec", "params": { "commandLine": "hello" } }"""
)
assert(p.waitForString(10) { s =>
assert(p.waitForString(10.seconds) { s =>
(s contains """"id":11""") && (s contains """"error":""")
})
}
@ -64,7 +65,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
"""{ "jsonrpc": "2.0", "id":13, "method": "sbt/cancelRequest", "params": { "id": "55" } }"""
)
assert(p.waitForString(20) { s =>
assert(p.waitForString(20.seconds) { s =>
(s contains """"error":{"code":-32800""")
})
}
@ -76,7 +77,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
"""{ "jsonrpc": "2.0", "id":12, "method": "sbt/exec", "params": { "commandLine": "run" } }"""
)
assert(p.waitForString(60) { s =>
assert(p.waitForString(1.minute) { s =>
p.sendJsonRpc(
"""{ "jsonrpc": "2.0", "id":13, "method": "sbt/cancelRequest", "params": { "id": "12" } }"""
)
@ -91,7 +92,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
"""{ "jsonrpc": "2.0", "id": "foo", "method": "sbt/exec", "params": { "commandLine": "run" } }"""
)
assert(p.waitForString(60) { s =>
assert(p.waitForString(1.minute) { s =>
p.sendJsonRpc(
"""{ "jsonrpc": "2.0", "id": "bar", "method": "sbt/cancelRequest", "params": { "id": "foo" } }"""
)
@ -107,7 +108,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
s"""{ "jsonrpc": "2.0", "id": 15, "method": "sbt/completion", "params": $completionStr }"""
)
assert(p.waitForString(10) { s =>
assert(p.waitForString(10.seconds) { s =>
s contains """"result":{"items":["""
})
}
@ -120,7 +121,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
s"""{ "jsonrpc": "2.0", "id": 15, "method": "sbt/completion", "params": $completionStr }"""
)
assert(p.waitForString(10) { s =>
assert(p.waitForString(10.seconds) { s =>
s contains """"result":{"items":["hello"]}"""
})
}
@ -133,7 +134,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture
s"""{ "jsonrpc": "2.0", "id": 15, "method": "sbt/completion", "params": $completionStr }"""
)
assert(p.waitForString(10) { s =>
assert(p.waitForString(10.seconds) { s =>
s contains """"result":{"items":["testOnly org.sbt.ExampleSpec"]}"""
})
}
@ -159,20 +160,19 @@ object TestServer {
f: TestServer => Future[Assertion]
)(implicit td: TestData): Future[Assertion] = {
// Each test server instance will be executed in a Thread pool separated from the tests
val testServer = TestServer(baseDirectory)(
ExecutionContext.fromExecutor(new ForkJoinPool())
)
val testServer = TestServer(baseDirectory)
// checking last log message after initialization
// if something goes wrong here the communication streams are corrupted, restarting
val init =
Try {
testServer.waitForString(30) { s =>
testServer.waitForString(30.seconds) { s =>
println(s)
s contains """"message":"Done""""
}
}.toOption
}
init match {
case Some(_) =>
case Success(_) =>
try {
f(testServer)
} finally {
@ -194,7 +194,7 @@ object TestServer {
}
}
case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) {
case class TestServer(baseDirectory: File) {
import TestServer.hostLog
val readBuffer = new Array[Byte](40960)
@ -204,27 +204,28 @@ case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) {
private val RetByte = '\r'.toByte
hostLog("fork to a new sbt instance")
val process =
Future {
RunFromSourceMain.fork(baseDirectory)
}
val process = RunFromSourceMain.fork(baseDirectory)
lazy val portfile = baseDirectory / "project" / "target" / "active.json"
hostLog("wait 30s until the server is ready to respond")
def waitForPortfile(n: Int): Unit =
if (portfile.exists) ()
else {
if (n <= 0) sys.error(s"Timeout. $portfile is not found.")
else {
Thread.sleep(1000)
if ((n - 1) % 10 == 0) {
hostLog("waiting for the server...")
}
waitForPortfile(n - 1)
def portfileIsEmpty(): Boolean =
try IO.read(portfile).isEmpty
catch { case _: IOException => true }
def waitForPortfile(duration: FiniteDuration): Unit = {
val deadline = duration.fromNow
var nextLog = 10.seconds.fromNow
while (portfileIsEmpty && !deadline.isOverdue && process.isAlive) {
if (nextLog.isOverdue) {
hostLog("waiting for the server...")
nextLog = 10.seconds.fromNow
}
}
waitForPortfile(90)
if (deadline.isOverdue) sys.error(s"Timeout. $portfile is not found.")
if (!process.isAlive) sys.error(s"Server unexpectedly terminated.")
}
private val waitDuration: FiniteDuration = 120.seconds
hostLog(s"wait $waitDuration until the server is ready to respond")
waitForPortfile(90.seconds)
// make connection to the socket described in the portfile
val (sk, tkn) = ClientSocket.socket(portfile)
@ -245,11 +246,12 @@ case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) {
sendJsonRpc(
"""{ "jsonrpc": "2.0", "id": 9, "method": "sbt/exec", "params": { "commandLine": "exit" } }"""
)
for {
p <- process
} {
p.destroy()
val deadline = 10.seconds.fromNow
while (!deadline.isOverdue && process.isAlive) {
Thread.sleep(10)
}
// We gave the server a chance to exit but it didn't within a reasonable time frame.
if (deadline.isOverdue) process.destroy()
}
def sendJsonRpc(message: String): Unit = {
@ -286,17 +288,14 @@ case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) {
readContentLength(l)
}
final def waitForString(num: Int)(f: String => Boolean): Boolean = {
val res = Future {
var done = false
while (!done) {
done = readFrame.fold(false)(f)
}
true
}(ec)
import scala.concurrent.duration._
Await.result(res, num.seconds)
final def waitForString(duration: FiniteDuration)(f: String => Boolean): Boolean = {
val deadline = duration.fromNow
@tailrec
def impl(): Boolean = {
if (deadline.isOverdue) false
else readFrame.fold(false)(f) || impl
}
impl()
}
def readLine: Option[String] = {