diff --git a/main-command/src/main/scala/sbt/internal/CommandChannel.scala b/main-command/src/main/scala/sbt/internal/CommandChannel.scala index 2857c2d58..2d61c995c 100644 --- a/main-command/src/main/scala/sbt/internal/CommandChannel.scala +++ b/main-command/src/main/scala/sbt/internal/CommandChannel.scala @@ -9,6 +9,7 @@ package sbt package internal import java.util.concurrent.ConcurrentLinkedQueue + import sbt.protocol.EventMessage import sjsonnew.JsonFormat @@ -19,7 +20,19 @@ import sjsonnew.JsonFormat */ abstract class CommandChannel { private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue() - def append(exec: Exec): Boolean = commandQueue.add(exec) + private val registered: java.util.Set[java.util.Queue[CommandChannel]] = new java.util.HashSet + private[sbt] final def register(queue: java.util.Queue[CommandChannel]): Unit = { + registered.add(queue) + () + } + private[sbt] final def unregister(queue: java.util.Queue[CommandChannel]): Unit = { + registered.remove(queue) + () + } + def append(exec: Exec): Boolean = { + registered.forEach(q => q.synchronized { if (!q.contains(this)) q.add(this); () }) + commandQueue.add(exec) + } def poll: Option[Exec] = Option(commandQueue.poll) def publishEvent[A: JsonFormat](event: A, execId: Option[String]): Unit diff --git a/main/src/main/scala/sbt/internal/CommandExchange.scala b/main/src/main/scala/sbt/internal/CommandExchange.scala index 96b4ae4a1..25a11c153 100644 --- a/main/src/main/scala/sbt/internal/CommandExchange.scala +++ b/main/src/main/scala/sbt/internal/CommandExchange.scala @@ -10,7 +10,7 @@ package internal import java.io.IOException import java.net.Socket -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit } import java.util.concurrent.atomic._ import sbt.BasicKeys._ @@ -47,11 +47,15 @@ private[sbt] final class CommandExchange { private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue() private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer() private val channelBufferLock = new AnyRef {} + private val commandChannelQueue = new LinkedBlockingQueue[CommandChannel] private val nextChannelId: AtomicInteger = new AtomicInteger(0) private lazy val jsonFormat = new sjsonnew.BasicJsonProtocol with JValueFormats {} def channels: List[CommandChannel] = channelBuffer.toList - def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized(channelBuffer.append(c)) + def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized { + channelBuffer.append(c) + c.register(commandChannelQueue) + } def blockUntilNextExec: Exec = blockUntilNextExec(Duration.Inf, NullLogger) // periodically move all messages from all the channels @@ -64,11 +68,11 @@ private[sbt] final class CommandExchange { commandQueue.add(x) slurpMessages() } + commandChannelQueue.poll(1, TimeUnit.SECONDS) slurpMessages() Option(commandQueue.poll) match { case Some(x) => x case None => - Thread.sleep(2) val newDeadline = if (deadline.fold(false)(_.isOverdue())) { GCUtil.forceGcWithInterval(interval, logger) None diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 41c59ba1e..a05b56ad6 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -608,51 +608,48 @@ private[sbt] object Continuous extends DeprecatedContinuous { override def debug(msg: Any): Unit = l.debug(msg.toString) } - // TODO make this a normal monitor - private[this] val monitors: Seq[FileEventMonitor[Event]] = + private[this] val observers: Observers[Event] = new Observers + private[this] val repo = getRepository(state) + private[this] val handle = repo.addObserver(observers) + private[this] val eventMonitorObservers = new Observers[Event] + private[this] val delegateHandles: Seq[AutoCloseable] = configs.map { config => - // Create a logger with a scoped key prefix so that we can tell from which - // monitor events occurred. - FileEventMonitor.antiEntropy( - new Observable[Event] { - private[this] val repo = getRepository(state) - private[this] val observers = new Observers[Event] { - override def onNext(t: Event): Unit = - if (config.inputs().exists(_.glob.matches(t.path))) super.onNext(t) - } - private[this] val handle = repo.addObserver(observers) - override def addObserver(observer: Observer[Event]): AutoCloseable = - observers.addObserver(observer) - override def close(): Unit = { - handle.close() - observers.close() - } - }, - config.watchSettings.antiEntropy, - logger.withPrefix(config.key.show), - config.watchSettings.deletionQuarantinePeriod, - config.watchSettings.antiEntropyRetentionPeriod - ) - } ++ (if (trackMetaBuild) { - val antiEntropy = configs.map(_.watchSettings.antiEntropy).max - val repo = getRepository(state) - buildGlobs.foreach(repo.register) - FileEventMonitor.antiEntropy( - repo, - antiEntropy, - logger.withPrefix("meta-build"), - quarantinePeriod, - retentionPeriod - ) :: Nil - } else Nil) + // Create a logger with a scoped key prefix so that we can tell from which task there + // were inputs that matched the event path. + val configLogger = logger.withPrefix(config.key.show) + observers.addObserver { e => + if (config.inputs().exists(_.glob.matches(e.path))) { + configLogger.debug(s"Accepted event for ${e.path}") + eventMonitorObservers.onNext(e) + } + } + } + if (trackMetaBuild) { + buildGlobs.foreach(repo.register) + val metaLogger = logger.withPrefix("meta-build") + observers.addObserver { e => + if (buildGlobs.exists(_.matches(e.path))) { + metaLogger.debug(s"Accepted event for ${e.path}") + eventMonitorObservers.onNext(e) + } + } + } + private[this] val monitor = FileEventMonitor.antiEntropy( + eventMonitorObservers, + configs.map(_.watchSettings.antiEntropy).max, + logger, + quarantinePeriod, + retentionPeriod + ) override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = { - val res = monitors.flatMap(_.poll(0.millis, filter)).toSet.toVector - if (res.isEmpty) Thread.sleep(duration.toMillis) - res + monitor.poll(duration, filter) } - override def close(): Unit = monitors.foreach(_.close()) + override def close(): Unit = { + delegateHandles.foreach(_.close()) + handle.close() + } } val watchLogger: WatchLogger = msg => logger.debug(msg.toString) val antiEntropy = configs.map(_.watchSettings.antiEntropy).max @@ -681,7 +678,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { } (() => { - val actions = antiEntropyMonitor.poll(2.milliseconds).flatMap(onEvent) + val actions = antiEntropyMonitor.poll(30.milliseconds).flatMap(onEvent) if (actions.exists(_._2 != Watch.Ignore)) { val builder = new StringBuilder val min = actions.minBy {