Reduce idle cpu usage

I noticed that sbt 1.3.0 was using more cpu when idling (either at the
shell or while waiting for file events) than 1.2.8. This was because I'd
reduced a number of timeouts to 2 milliseconds which was causing a
thread to keep waking up every 2 milliseconds to poll a queue. I thought
that this was cheaper than it actually is and drove the cpu utilization
to O(10%) of a cpu on my mac.

To address this, I consolidated a number of queues into a single queue
in CommandExchange and Continuous. In the CommandExchange case, I
reworked CommandChannel to have a register method that passes in a Queue
of CommandChannels. Whenever it appends an exec, it adds itself to the
queue. CommandExchange can then poll that queue directly and poll the
returned CommandChannel for the actual exec. Since the main thread is
blocking on this queue, it does not need to frequently wake up and can
just poll more or less indefinitely until a message is received. This
also reduces average latency compared to older versions of sbt since
messages will be processed almost as soon as they are received.

The continuous case is slightly more complicated because we are polling
from two sources, stdin and FileEventMonitor. In my ideal world, I'd
have a reactive api for both of those sources and they would just write
events to a shared queue that we could block on. That is nontrivial to
implement, so instead I consolidated the FileEventMonitor instances into
a single FileEventMonitor. Since there is now only one FileEventMonitor
queue, we can block on that queue for 30 milliseconds and then poll
stdin. This reduces cpu utilization to O(2%) on my machine while still
having reasonably low latency for key input events (the latency of file
events should be close to zero since we are usually polling the
FileEventMonitor queue when waiting).

I actually had a TODO about the FileEventMonitor change that this
resolves.
This commit is contained in:
Ethan Atkins 2019-05-30 11:50:03 -07:00
parent 73ebe34f71
commit 6f7a824478
3 changed files with 59 additions and 45 deletions

View File

@ -9,6 +9,7 @@ package sbt
package internal
import java.util.concurrent.ConcurrentLinkedQueue
import sbt.protocol.EventMessage
import sjsonnew.JsonFormat
@ -19,7 +20,19 @@ import sjsonnew.JsonFormat
*/
abstract class CommandChannel {
private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
def append(exec: Exec): Boolean = commandQueue.add(exec)
private val registered: java.util.Set[java.util.Queue[CommandChannel]] = new java.util.HashSet
private[sbt] final def register(queue: java.util.Queue[CommandChannel]): Unit = {
registered.add(queue)
()
}
private[sbt] final def unregister(queue: java.util.Queue[CommandChannel]): Unit = {
registered.remove(queue)
()
}
def append(exec: Exec): Boolean = {
registered.forEach(q => q.synchronized { if (!q.contains(this)) q.add(this); () })
commandQueue.add(exec)
}
def poll: Option[Exec] = Option(commandQueue.poll)
def publishEvent[A: JsonFormat](event: A, execId: Option[String]): Unit

View File

@ -10,7 +10,7 @@ package internal
import java.io.IOException
import java.net.Socket
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit }
import java.util.concurrent.atomic._
import sbt.BasicKeys._
@ -47,11 +47,15 @@ private[sbt] final class CommandExchange {
private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer()
private val channelBufferLock = new AnyRef {}
private val commandChannelQueue = new LinkedBlockingQueue[CommandChannel]
private val nextChannelId: AtomicInteger = new AtomicInteger(0)
private lazy val jsonFormat = new sjsonnew.BasicJsonProtocol with JValueFormats {}
def channels: List[CommandChannel] = channelBuffer.toList
def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized(channelBuffer.append(c))
def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized {
channelBuffer.append(c)
c.register(commandChannelQueue)
}
def blockUntilNextExec: Exec = blockUntilNextExec(Duration.Inf, NullLogger)
// periodically move all messages from all the channels
@ -64,11 +68,11 @@ private[sbt] final class CommandExchange {
commandQueue.add(x)
slurpMessages()
}
commandChannelQueue.poll(1, TimeUnit.SECONDS)
slurpMessages()
Option(commandQueue.poll) match {
case Some(x) => x
case None =>
Thread.sleep(2)
val newDeadline = if (deadline.fold(false)(_.isOverdue())) {
GCUtil.forceGcWithInterval(interval, logger)
None

View File

@ -608,51 +608,48 @@ private[sbt] object Continuous extends DeprecatedContinuous {
override def debug(msg: Any): Unit = l.debug(msg.toString)
}
// TODO make this a normal monitor
private[this] val monitors: Seq[FileEventMonitor[Event]] =
private[this] val observers: Observers[Event] = new Observers
private[this] val repo = getRepository(state)
private[this] val handle = repo.addObserver(observers)
private[this] val eventMonitorObservers = new Observers[Event]
private[this] val delegateHandles: Seq[AutoCloseable] =
configs.map { config =>
// Create a logger with a scoped key prefix so that we can tell from which
// monitor events occurred.
FileEventMonitor.antiEntropy(
new Observable[Event] {
private[this] val repo = getRepository(state)
private[this] val observers = new Observers[Event] {
override def onNext(t: Event): Unit =
if (config.inputs().exists(_.glob.matches(t.path))) super.onNext(t)
}
private[this] val handle = repo.addObserver(observers)
override def addObserver(observer: Observer[Event]): AutoCloseable =
observers.addObserver(observer)
override def close(): Unit = {
handle.close()
observers.close()
}
},
config.watchSettings.antiEntropy,
logger.withPrefix(config.key.show),
config.watchSettings.deletionQuarantinePeriod,
config.watchSettings.antiEntropyRetentionPeriod
)
} ++ (if (trackMetaBuild) {
val antiEntropy = configs.map(_.watchSettings.antiEntropy).max
val repo = getRepository(state)
buildGlobs.foreach(repo.register)
FileEventMonitor.antiEntropy(
repo,
antiEntropy,
logger.withPrefix("meta-build"),
quarantinePeriod,
retentionPeriod
) :: Nil
} else Nil)
// Create a logger with a scoped key prefix so that we can tell from which task there
// were inputs that matched the event path.
val configLogger = logger.withPrefix(config.key.show)
observers.addObserver { e =>
if (config.inputs().exists(_.glob.matches(e.path))) {
configLogger.debug(s"Accepted event for ${e.path}")
eventMonitorObservers.onNext(e)
}
}
}
if (trackMetaBuild) {
buildGlobs.foreach(repo.register)
val metaLogger = logger.withPrefix("meta-build")
observers.addObserver { e =>
if (buildGlobs.exists(_.matches(e.path))) {
metaLogger.debug(s"Accepted event for ${e.path}")
eventMonitorObservers.onNext(e)
}
}
}
private[this] val monitor = FileEventMonitor.antiEntropy(
eventMonitorObservers,
configs.map(_.watchSettings.antiEntropy).max,
logger,
quarantinePeriod,
retentionPeriod
)
override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = {
val res = monitors.flatMap(_.poll(0.millis, filter)).toSet.toVector
if (res.isEmpty) Thread.sleep(duration.toMillis)
res
monitor.poll(duration, filter)
}
override def close(): Unit = monitors.foreach(_.close())
override def close(): Unit = {
delegateHandles.foreach(_.close())
handle.close()
}
}
val watchLogger: WatchLogger = msg => logger.debug(msg.toString)
val antiEntropy = configs.map(_.watchSettings.antiEntropy).max
@ -681,7 +678,7 @@ private[sbt] object Continuous extends DeprecatedContinuous {
}
(() => {
val actions = antiEntropyMonitor.poll(2.milliseconds).flatMap(onEvent)
val actions = antiEntropyMonitor.poll(30.milliseconds).flatMap(onEvent)
if (actions.exists(_._2 != Watch.Ignore)) {
val builder = new StringBuilder
val min = actions.minBy {