diff --git a/main-command/src/main/scala/sbt/internal/CommandChannel.scala b/main-command/src/main/scala/sbt/internal/CommandChannel.scala index 2857c2d58..2d61c995c 100644 --- a/main-command/src/main/scala/sbt/internal/CommandChannel.scala +++ b/main-command/src/main/scala/sbt/internal/CommandChannel.scala @@ -9,6 +9,7 @@ package sbt package internal import java.util.concurrent.ConcurrentLinkedQueue + import sbt.protocol.EventMessage import sjsonnew.JsonFormat @@ -19,7 +20,19 @@ import sjsonnew.JsonFormat */ abstract class CommandChannel { private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue() - def append(exec: Exec): Boolean = commandQueue.add(exec) + private val registered: java.util.Set[java.util.Queue[CommandChannel]] = new java.util.HashSet + private[sbt] final def register(queue: java.util.Queue[CommandChannel]): Unit = { + registered.add(queue) + () + } + private[sbt] final def unregister(queue: java.util.Queue[CommandChannel]): Unit = { + registered.remove(queue) + () + } + def append(exec: Exec): Boolean = { + registered.forEach(q => q.synchronized { if (!q.contains(this)) q.add(this); () }) + commandQueue.add(exec) + } def poll: Option[Exec] = Option(commandQueue.poll) def publishEvent[A: JsonFormat](event: A, execId: Option[String]): Unit diff --git a/main/src/main/scala/sbt/internal/CommandExchange.scala b/main/src/main/scala/sbt/internal/CommandExchange.scala index 96b4ae4a1..25a11c153 100644 --- a/main/src/main/scala/sbt/internal/CommandExchange.scala +++ b/main/src/main/scala/sbt/internal/CommandExchange.scala @@ -10,7 +10,7 @@ package internal import java.io.IOException import java.net.Socket -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit } import java.util.concurrent.atomic._ import sbt.BasicKeys._ @@ -47,11 +47,15 @@ private[sbt] final class CommandExchange { private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue() private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer() private val channelBufferLock = new AnyRef {} + private val commandChannelQueue = new LinkedBlockingQueue[CommandChannel] private val nextChannelId: AtomicInteger = new AtomicInteger(0) private lazy val jsonFormat = new sjsonnew.BasicJsonProtocol with JValueFormats {} def channels: List[CommandChannel] = channelBuffer.toList - def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized(channelBuffer.append(c)) + def subscribe(c: CommandChannel): Unit = channelBufferLock.synchronized { + channelBuffer.append(c) + c.register(commandChannelQueue) + } def blockUntilNextExec: Exec = blockUntilNextExec(Duration.Inf, NullLogger) // periodically move all messages from all the channels @@ -64,11 +68,11 @@ private[sbt] final class CommandExchange { commandQueue.add(x) slurpMessages() } + commandChannelQueue.poll(1, TimeUnit.SECONDS) slurpMessages() Option(commandQueue.poll) match { case Some(x) => x case None => - Thread.sleep(2) val newDeadline = if (deadline.fold(false)(_.isOverdue())) { GCUtil.forceGcWithInterval(interval, logger) None diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 5c3d17e19..d87b7d710 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -611,51 +611,48 @@ private[sbt] object Continuous extends DeprecatedContinuous { override def debug(msg: Any): Unit = l.debug(msg.toString) } - // TODO make this a normal monitor - private[this] val monitors: Seq[FileEventMonitor[Event]] = + private[this] val observers: Observers[Event] = new Observers + private[this] val repo = getRepository(state) + private[this] val handle = repo.addObserver(observers) + private[this] val eventMonitorObservers = new Observers[Event] + private[this] val delegateHandles: Seq[AutoCloseable] = configs.map { config => - // Create a logger with a scoped key prefix so that we can tell from which - // monitor events occurred. - FileEventMonitor.antiEntropy( - new Observable[Event] { - private[this] val repo = getRepository(state) - private[this] val observers = new Observers[Event] { - override def onNext(t: Event): Unit = - if (config.inputs().exists(_.glob.matches(t.path))) super.onNext(t) - } - private[this] val handle = repo.addObserver(observers) - override def addObserver(observer: Observer[Event]): AutoCloseable = - observers.addObserver(observer) - override def close(): Unit = { - handle.close() - observers.close() - } - }, - config.watchSettings.antiEntropy, - logger.withPrefix(config.key.show), - config.watchSettings.deletionQuarantinePeriod, - config.watchSettings.antiEntropyRetentionPeriod - ) - } ++ (if (trackMetaBuild) { - val antiEntropy = configs.map(_.watchSettings.antiEntropy).max - val repo = getRepository(state) - buildGlobs.foreach(repo.register) - FileEventMonitor.antiEntropy( - repo, - antiEntropy, - logger.withPrefix("meta-build"), - quarantinePeriod, - retentionPeriod - ) :: Nil - } else Nil) + // Create a logger with a scoped key prefix so that we can tell from which task there + // were inputs that matched the event path. + val configLogger = logger.withPrefix(config.key.show) + observers.addObserver { e => + if (config.inputs().exists(_.glob.matches(e.path))) { + configLogger.debug(s"Accepted event for ${e.path}") + eventMonitorObservers.onNext(e) + } + } + } + if (trackMetaBuild) { + buildGlobs.foreach(repo.register) + val metaLogger = logger.withPrefix("meta-build") + observers.addObserver { e => + if (buildGlobs.exists(_.matches(e.path))) { + metaLogger.debug(s"Accepted event for ${e.path}") + eventMonitorObservers.onNext(e) + } + } + } + private[this] val monitor = FileEventMonitor.antiEntropy( + eventMonitorObservers, + configs.map(_.watchSettings.antiEntropy).max, + logger, + quarantinePeriod, + retentionPeriod + ) override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = { - val res = monitors.flatMap(_.poll(0.millis, filter)).toSet.toVector - if (res.isEmpty) Thread.sleep(duration.toMillis) - res + monitor.poll(duration, filter) } - override def close(): Unit = monitors.foreach(_.close()) + override def close(): Unit = { + delegateHandles.foreach(_.close()) + handle.close() + } } val watchLogger: WatchLogger = msg => logger.debug(msg.toString) val antiEntropy = configs.map(_.watchSettings.antiEntropy).max @@ -684,7 +681,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { } (() => { - val actions = antiEntropyMonitor.poll(2.milliseconds).flatMap(onEvent) + val actions = antiEntropyMonitor.poll(30.milliseconds).flatMap(onEvent) if (actions.exists(_._2 != Watch.Ignore)) { val builder = new StringBuilder val min = actions.minBy { diff --git a/sbt/src/test/scala/testpkg/ServerSpec.scala b/sbt/src/test/scala/testpkg/ServerSpec.scala index f7e87331d..7b5abacbe 100644 --- a/sbt/src/test/scala/testpkg/ServerSpec.scala +++ b/sbt/src/test/scala/testpkg/ServerSpec.scala @@ -7,18 +7,19 @@ package testpkg +import java.io.{ File, IOException } + import org.scalatest._ -import scala.concurrent._ -import sbt.protocol.ClientSocket -import scala.util.Try -import TestServer.withTestServer -import java.io.File -import sbt.io.syntax._ -import sbt.io.IO import sbt.RunFromSourceMain -import scala.util.Try -import scala.concurrent.ExecutionContext -import java.util.concurrent.ForkJoinPool +import sbt.io.IO +import sbt.io.syntax._ +import sbt.protocol.ClientSocket +import testpkg.TestServer.withTestServer + +import scala.annotation.tailrec +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.util.{ Success, Try } class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture with Matchers { "server" - { @@ -27,7 +28,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture p.sendJsonRpc( """{ "jsonrpc": "2.0", "id": "3", "method": "sbt/setting", "params": { "setting": "root/name" } }""" ) - assert(p.waitForString(10) { s => + assert(p.waitForString(10.seconds) { s => s contains """"id":"3"""" }) } @@ -38,7 +39,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture p.sendJsonRpc( """{ "jsonrpc": "2.0", "id": 3, "method": "sbt/setting", "params": { "setting": "root/name" } }""" ) - assert(p.waitForString(10) { s => + assert(p.waitForString(10.seconds) { s => s contains """"id":3""" }) } @@ -49,7 +50,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture p.sendJsonRpc( """{ "jsonrpc": "2.0", "id": 11, "method": "sbt/exec", "params": { "commandLine": "hello" } }""" ) - assert(p.waitForString(10) { s => + assert(p.waitForString(10.seconds) { s => (s contains """"id":11""") && (s contains """"error":""") }) } @@ -64,7 +65,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture """{ "jsonrpc": "2.0", "id":13, "method": "sbt/cancelRequest", "params": { "id": "55" } }""" ) - assert(p.waitForString(20) { s => + assert(p.waitForString(20.seconds) { s => (s contains """"error":{"code":-32800""") }) } @@ -76,7 +77,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture """{ "jsonrpc": "2.0", "id":12, "method": "sbt/exec", "params": { "commandLine": "run" } }""" ) - assert(p.waitForString(60) { s => + assert(p.waitForString(1.minute) { s => p.sendJsonRpc( """{ "jsonrpc": "2.0", "id":13, "method": "sbt/cancelRequest", "params": { "id": "12" } }""" ) @@ -91,7 +92,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture """{ "jsonrpc": "2.0", "id": "foo", "method": "sbt/exec", "params": { "commandLine": "run" } }""" ) - assert(p.waitForString(60) { s => + assert(p.waitForString(1.minute) { s => p.sendJsonRpc( """{ "jsonrpc": "2.0", "id": "bar", "method": "sbt/cancelRequest", "params": { "id": "foo" } }""" ) @@ -107,7 +108,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture s"""{ "jsonrpc": "2.0", "id": 15, "method": "sbt/completion", "params": $completionStr }""" ) - assert(p.waitForString(10) { s => + assert(p.waitForString(10.seconds) { s => s contains """"result":{"items":[""" }) } @@ -120,7 +121,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture s"""{ "jsonrpc": "2.0", "id": 15, "method": "sbt/completion", "params": $completionStr }""" ) - assert(p.waitForString(10) { s => + assert(p.waitForString(10.seconds) { s => s contains """"result":{"items":["hello"]}""" }) } @@ -133,7 +134,7 @@ class ServerSpec extends fixture.AsyncFreeSpec with fixture.AsyncTestDataFixture s"""{ "jsonrpc": "2.0", "id": 15, "method": "sbt/completion", "params": $completionStr }""" ) - assert(p.waitForString(10) { s => + assert(p.waitForString(10.seconds) { s => s contains """"result":{"items":["testOnly org.sbt.ExampleSpec"]}""" }) } @@ -159,20 +160,19 @@ object TestServer { f: TestServer => Future[Assertion] )(implicit td: TestData): Future[Assertion] = { // Each test server instance will be executed in a Thread pool separated from the tests - val testServer = TestServer(baseDirectory)( - ExecutionContext.fromExecutor(new ForkJoinPool()) - ) + val testServer = TestServer(baseDirectory) // checking last log message after initialization // if something goes wrong here the communication streams are corrupted, restarting val init = Try { - testServer.waitForString(30) { s => + testServer.waitForString(30.seconds) { s => + println(s) s contains """"message":"Done"""" } - }.toOption + } init match { - case Some(_) => + case Success(_) => try { f(testServer) } finally { @@ -194,7 +194,7 @@ object TestServer { } } -case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) { +case class TestServer(baseDirectory: File) { import TestServer.hostLog val readBuffer = new Array[Byte](40960) @@ -204,27 +204,28 @@ case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) { private val RetByte = '\r'.toByte hostLog("fork to a new sbt instance") - val process = - Future { - RunFromSourceMain.fork(baseDirectory) - } + val process = RunFromSourceMain.fork(baseDirectory) lazy val portfile = baseDirectory / "project" / "target" / "active.json" - hostLog("wait 30s until the server is ready to respond") - def waitForPortfile(n: Int): Unit = - if (portfile.exists) () - else { - if (n <= 0) sys.error(s"Timeout. $portfile is not found.") - else { - Thread.sleep(1000) - if ((n - 1) % 10 == 0) { - hostLog("waiting for the server...") - } - waitForPortfile(n - 1) + def portfileIsEmpty(): Boolean = + try IO.read(portfile).isEmpty + catch { case _: IOException => true } + def waitForPortfile(duration: FiniteDuration): Unit = { + val deadline = duration.fromNow + var nextLog = 10.seconds.fromNow + while (portfileIsEmpty && !deadline.isOverdue && process.isAlive) { + if (nextLog.isOverdue) { + hostLog("waiting for the server...") + nextLog = 10.seconds.fromNow } } - waitForPortfile(90) + if (deadline.isOverdue) sys.error(s"Timeout. $portfile is not found.") + if (!process.isAlive) sys.error(s"Server unexpectedly terminated.") + } + private val waitDuration: FiniteDuration = 120.seconds + hostLog(s"wait $waitDuration until the server is ready to respond") + waitForPortfile(90.seconds) // make connection to the socket described in the portfile val (sk, tkn) = ClientSocket.socket(portfile) @@ -245,11 +246,12 @@ case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) { sendJsonRpc( """{ "jsonrpc": "2.0", "id": 9, "method": "sbt/exec", "params": { "commandLine": "exit" } }""" ) - for { - p <- process - } { - p.destroy() + val deadline = 10.seconds.fromNow + while (!deadline.isOverdue && process.isAlive) { + Thread.sleep(10) } + // We gave the server a chance to exit but it didn't within a reasonable time frame. + if (deadline.isOverdue) process.destroy() } def sendJsonRpc(message: String): Unit = { @@ -286,17 +288,14 @@ case class TestServer(baseDirectory: File)(implicit ec: ExecutionContext) { readContentLength(l) } - final def waitForString(num: Int)(f: String => Boolean): Boolean = { - val res = Future { - var done = false - while (!done) { - done = readFrame.fold(false)(f) - } - true - }(ec) - - import scala.concurrent.duration._ - Await.result(res, num.seconds) + final def waitForString(duration: FiniteDuration)(f: String => Boolean): Boolean = { + val deadline = duration.fromNow + @tailrec + def impl(): Boolean = { + if (deadline.isOverdue) false + else readFrame.fold(false)(f) || impl + } + impl() } def readLine: Option[String] = {