From 7d3d3c71d6aaad4d102582b101efb5f97622b35c Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Tue, 25 Sep 2018 19:39:19 -0700 Subject: [PATCH] Refactor Watched This commit reworks Watched to be more testable and extensible. It also adds some small features. The previous implementation presented a number of challenges: 1) It relied on external side effects to terminate the watch, which was difficult to test 2) It exposed irrelevant implementation details to the user in the methods that exposed the WatchState as a parameter. 3) It spun up two worker threads. One was to monitor System.in for user input. The other was to poll the watch service for events and write them to a queue. The user input thread actually broke '~console' because nearly every console session will hit the key, which would eventually cause the watch to stop when the user exited the console. To address (1), I add the shouldTerminate method to WatchConfig. This takes the current watch iteration is input and if the function returns true, the watch will stop. To address (2), I replace the triggeredMessage and watchingMessage keys with watchTriggeredMessage and watchStartMessage. The latter two keys are functions that do not take the WatchState as parameters. Both functions take the current iteration count as a parameter and the watchTriggeredMessage also has a parameter for the path that triggered the build. To address (3), I stop using the sbt.internal.io.EventMonitor and instead use the sbt.io.FileEventMonitor. The latter class is similar to the former except that it's polling method accepts a duration, which may be finite or infinite) and returns all of the events that occurred since it was last polled. By adding the ability to poll for a finite amount of time, we can interleave polling for events with polling System.in for user input, all on the main thread. This eliminates the two extraneous threads and fixes the '~console' use case I described before. I also let the user configure the function that reads from System.in via the watchHandleInput method. In fact, this method need not read from System.in at all since it's just () => Watched.Action. The reason that it isn't () => Boolean is that I'd like to leave open the option for the ability to trigger a build via user input, not just terminating the watch. My initial idea was to add the ability to type 'r' to re-build in addition to to exit. This doesn't work without integrating jline though because the input is buffered. Regardless, for testing purposes, it gives us the ability to add a timeout to the watch by making handleInput return true when a deadline expires. The tests are a bit wonky because I still need to rely on side effects in the logging methods to orchestrate the sequence of file events that I'd like to test. While I could move some of this logic into a background thread, there still needs to be coordination between the state of the watch and the background thread. I think it's easier to reason about when all of the work occurs on the same thread, even if it makes these user provided functions impure. I deprecated all of the previous watch related keys that are no longer used with the new infrastructure. To avoid breaking existing builds, I make the watchConfig task use the deprecated logging methods if they are defined in the user's builds, but sbt will not longer set the default values. For the vast majority of users, it should be straightforward to migrate their builds to use the new keys. My hunch is that the of the deprecated keys, only triggeredMessage is widely used (in conjunction with the clear screen method) and it is dead simple to replace it with watchTriggeredMessage. Note: The FileTreeViewConfig class is not really necessary for this commit. It will become more important in a subsequent commit which introduces an optional global file system cache. --- .travis.yml | 2 +- .../main/scala/sbt/FileTreeViewConfig.scala | 80 ++++ main-command/src/main/scala/sbt/Watched.scala | 359 +++++++++++------- .../src/test/scala/sbt/WatchedSpec.scala | 104 ++++- main/src/main/scala/sbt/Defaults.scala | 50 ++- main/src/main/scala/sbt/Keys.scala | 14 +- main/src/main/scala/sbt/Main.scala | 10 +- .../sbt-test/watch/on-start-watch/build.sbt | 13 + .../watch/on-start-watch/project/Count.scala | 6 + .../on-start-watch/src/main/scala/A.scala | 3 + sbt/src/sbt-test/watch/on-start-watch/test | 4 + 11 files changed, 470 insertions(+), 175 deletions(-) create mode 100644 main-command/src/main/scala/sbt/FileTreeViewConfig.scala create mode 100644 sbt/src/sbt-test/watch/on-start-watch/build.sbt create mode 100644 sbt/src/sbt-test/watch/on-start-watch/project/Count.scala create mode 100644 sbt/src/sbt-test/watch/on-start-watch/src/main/scala/A.scala create mode 100644 sbt/src/sbt-test/watch/on-start-watch/test diff --git a/.travis.yml b/.travis.yml index d17c5e546..1db7df73b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,7 +41,7 @@ env: - SBT_CMD="scripted source-dependencies/*1of3" - SBT_CMD="scripted source-dependencies/*2of3" - SBT_CMD="scripted source-dependencies/*3of3" - - SBT_CMD="scripted tests/*" + - SBT_CMD="scripted tests/* watch/*" - SBT_CMD="repoOverrideTest:scripted dependency-management/*" notifications: diff --git a/main-command/src/main/scala/sbt/FileTreeViewConfig.scala b/main-command/src/main/scala/sbt/FileTreeViewConfig.scala new file mode 100644 index 000000000..10414348b --- /dev/null +++ b/main-command/src/main/scala/sbt/FileTreeViewConfig.scala @@ -0,0 +1,80 @@ +/* + * sbt + * Copyright 2011 - 2018, Lightbend, Inc. + * Copyright 2008 - 2010, Mark Harrah + * Licensed under Apache License 2.0 (see LICENSE) + */ + +package sbt +import java.nio.file.Path + +import sbt.Watched.WatchSource +import sbt.internal.io.{ WatchServiceBackedObservable, WatchState } +import sbt.io.{ FileEventMonitor, FileTreeDataView, FileTreeView, TypedPath } +import sbt.util.Logger + +import scala.concurrent.duration.FiniteDuration + +/** + * Configuration for viewing and monitoring the file system. + */ +final class FileTreeViewConfig private ( + val newDataView: () => FileTreeDataView[Path], + val newMonitor: ( + FileTreeDataView[Path], + Seq[WatchSource], + Logger + ) => FileEventMonitor[Path] +) +object FileTreeViewConfig { + + /** + * Create a new FileTreeViewConfig. This factory takes a generic parameter, T, that is bounded + * by {{{sbt.io.FileTreeDataView[Path]}}}. The reason for this is to ensure that a + * sbt.io.FileTreeDataView that is instantiated by [[FileTreeViewConfig.newDataView]] can be + * passed into [[FileTreeViewConfig.newMonitor]] without constraining the type of view to be + * {{{sbt.io.FileTreeDataView[Path]}}}. + * @param newDataView create a new sbt.io.FileTreeDataView. This value may be cached in a global + * attribute + * @param newMonitor create a new sbt.io.FileEventMonitor using the sbt.io.FileTreeDataView + * created by newDataView + * @tparam T the subtype of sbt.io.FileTreeDataView that is returned by [[FileTreeViewConfig.newDataView]] + * @return a [[FileTreeViewConfig]] instance. + */ + def apply[T <: FileTreeDataView[Path]]( + newDataView: () => T, + newMonitor: (T, Seq[WatchSource], Logger) => FileEventMonitor[Path] + ): FileTreeViewConfig = + new FileTreeViewConfig( + newDataView, + (view: FileTreeDataView[Path], sources: Seq[WatchSource], logger: Logger) => + newMonitor(view.asInstanceOf[T], sources, logger) + ) + + /** + * Provides a default [[FileTreeViewConfig]]. This view does not cache entries. + * @param pollingInterval the maximum duration that the sbt.internal.io.EventMonitor will poll + * the underlying sbt.io.WatchService when monitoring for file events + * @param antiEntropy the duration of the period after a path triggers a build for which it is + * quarantined from triggering another build + * @return a [[FileTreeViewConfig]] instance. + */ + def default(pollingInterval: FiniteDuration, antiEntropy: FiniteDuration): FileTreeViewConfig = + FileTreeViewConfig( + () => FileTreeView.DEFAULT.asDataView(_.getPath), + (_: FileTreeDataView[Path], sources, logger) => { + val ioLogger: sbt.io.WatchLogger = msg => logger.debug(msg.toString) + FileEventMonitor.antiEntropy( + new WatchServiceBackedObservable( + WatchState.empty(Watched.createWatchService(), sources), + pollingInterval, + (_: TypedPath).getPath, + closeService = true, + ioLogger + ), + antiEntropy, + ioLogger + ) + } + ) +} diff --git a/main-command/src/main/scala/sbt/Watched.scala b/main-command/src/main/scala/sbt/Watched.scala index 3d80f64ca..bd31f11e5 100644 --- a/main-command/src/main/scala/sbt/Watched.scala +++ b/main-command/src/main/scala/sbt/Watched.scala @@ -8,21 +8,16 @@ package sbt import java.io.File -import java.nio.file.FileSystems +import java.nio.file.{ FileSystems, Path } -import sbt.BasicCommandStrings.{ - ContinuousExecutePrefix, - FailureWall, - continuousBriefHelp, - continuousDetail -} +import sbt.BasicCommandStrings.{ ContinuousExecutePrefix, continuousBriefHelp, continuousDetail } import sbt.BasicCommands.otherCommandParser -import sbt.CommandUtil.withAttribute import sbt.internal.LegacyWatched import sbt.internal.io.{ EventMonitor, Source, WatchState } import sbt.internal.util.AttributeKey import sbt.internal.util.Types.const import sbt.internal.util.complete.DefaultParsers +import sbt.io.FileEventMonitor.Event import sbt.io._ import sbt.util.{ Level, Logger } @@ -30,6 +25,7 @@ import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.Properties +@deprecated("Watched is no longer used to implement continuous execution", "1.3.0") trait Watched { /** The files watched when an action is run with a proceeding ~ */ @@ -59,18 +55,64 @@ trait Watched { } object Watched { - val defaultWatchingMessage: WatchState => String = ws => - s"${ws.count}. Waiting for source changes... (press enter to interrupt)" + /** + * This trait is used to communicate what the watch should do next at various points in time. It + * is heavily linked to a number of callbacks in [[WatchConfig]]. For example, when the + * sbt.io.FileEventMonitor created by [[FileTreeViewConfig.newMonitor]] detects a changed source + * file, then we expect [[WatchConfig.onWatchEvent]] to return [[Trigger]]. + */ + sealed trait Action + + /** + * Action that indicates that the watch should stop. + */ + case object CancelWatch extends Action + + /** + * Action that indicates that the watch should continue as though nothing happened. This may be + * because, for example, no user input was yet available in [[WatchConfig.handleInput]]. + */ + case object Ignore extends Action + + /** + * Action that indicates that the watch process should re-run the command. + */ + case object Trigger extends Action + + type WatchSource = Source + def terminateWatch(key: Int): Boolean = Watched.isEnter(key) + /* + * Without jline, checking for enter is nearly pointless because System.in.available will not + * return a non-zero value until the user presses enter. + */ + @tailrec + final def shouldTerminate: Boolean = + (System.in.available > 0) && (terminateWatch(System.in.read()) || shouldTerminate) + final val handleInput: () => Action = () => if (shouldTerminate) CancelWatch else Ignore + val defaultStartWatch: Int => Option[String] = count => + Some(s"$count. Waiting for source changes... (press enter to interrupt)") + @deprecated("Use defaultStartWatch in conjunction with the watchStartMessage key", "1.3.0") + val defaultWatchingMessage: WatchState => String = ws => defaultStartWatch(ws.count).get def projectWatchingMessage(projectId: String): WatchState => String = - ws => - s"${ws.count}. Waiting for source changes in project $projectId... (press enter to interrupt)" + ws => projectOnWatchMessage(projectId)(ws.count).get + def projectOnWatchMessage(project: String): Int => Option[String] = + count => + Some( + s"$count. Waiting for source changes in project $project... (press enter to interrupt)" + ) + val defaultOnTriggerMessage: Int => Option[String] = _ => None + @deprecated( + "Use defaultOnTriggerMessage in conjunction with the watchTriggeredMessage key", + "1.3.0" + ) val defaultTriggeredMessage: WatchState => String = const("") + val clearOnTrigger: Int => Option[String] = _ => Some(clearScreen) + @deprecated("Use clearOnTrigger in conjunction with the watchTriggeredMessage key", "1.3.0") val clearWhenTriggered: WatchState => String = const(clearScreen) def clearScreen: String = "\u001b[2J\u001b[0;0H" - type WatchSource = Source object WatchSource { /** @@ -94,6 +136,7 @@ object Watched { } + @deprecated("This method is not used and may be removed in a future version of sbt", "1.3.0") private[this] class AWatched extends Watched @deprecated("This method is not used and may be removed in a future version of sbt", "1.3.0") @@ -107,6 +150,7 @@ object Watched { override def watchingMessage(s: WatchState): String = base.watchingMessage(s) override def triggeredMessage(s: WatchState): String = base.triggeredMessage(s) } + @deprecated("This method is not used and may be removed in a future version of sbt", "1.3.0") def empty: Watched = new AWatched val PollDelay: FiniteDuration = 500.milliseconds @@ -114,24 +158,24 @@ object Watched { def isEnter(key: Int): Boolean = key == 10 || key == 13 def printIfDefined(msg: String): Unit = if (!msg.isEmpty) System.out.println(msg) - type Task = () => State - type Setup = (State, Watched, String) => (State, Logger, Task => State) + private type RunCommand = () => State + private type WatchSetup = (State, String) => (State, WatchConfig, RunCommand => State) /** * Provides the '~' continuous execution command. * @param setup a function that provides a logger and a function from (() => State) => State. * @return the '~' command. */ - def continuous(setup: Setup): Command = + def continuous(setup: WatchSetup): Command = Command(ContinuousExecutePrefix, continuousBriefHelp, continuousDetail)(otherCommandParser) { (state, command) => - Watched.command(state, command, setup) + Watched.executeContinuously(state, command, setup) } /** * Implements continuous execution. It works by first parsing the command and generating a task to * run with each build. It can run multiple commands that are separated by ";" in the command - * input. If any of these commands are invalid, the watch will immmediately exit. + * input. If any of these commands are invalid, the watch will immediately exit. * @param state the initial state * @param command the command(s) to repeatedly apply * @param setup function to generate a logger and a transformation of the resultant state. The @@ -141,93 +185,111 @@ object Watched { * @return the initial state if all of the input commands are valid. Otherwise, returns the * initial state with the failure transformation. */ - private[sbt] def command( + private[sbt] def executeContinuously( state: State, command: String, - setup: Setup, - ): State = - withAttribute(state, Watched.Configuration, "Continuous execution not configured.") { w => - val (s0, logger, process) = setup(state, w, command) - val s = FailureWall :: s0 - val parser = Command.combine(s.definedCommands)(s) - val commands = command.split(";") match { - case Array("", rest @ _*) => rest - case Array(cmd) => Seq(cmd) - } - val tasks = commands.foldLeft(Nil: Seq[Either[String, () => Either[Exception, Boolean]]]) { - case (t, cmd) => - t :+ (DefaultParsers.parse(cmd, parser) match { - case Right(task) => - Right { () => - try { - process(task) - Right(true) - } catch { case e: Exception => Left(e) } - } - case Left(_) => Left(cmd) - }) - } - val (valid, invalid) = tasks.partition(_.isRight) - if (invalid.isEmpty) { - val task = () => - valid.foldLeft(Right(true): Either[Exception, Boolean]) { - case (status, Right(t)) => if (status.getOrElse(true)) t() else status - case _ => throw new IllegalStateException("Should be unreachable") - } - @tailrec def shouldTerminate: Boolean = - (System.in.available > 0) && (w.terminateWatch(System.in.read()) || shouldTerminate) - val watchState = WatchState.empty(w.watchService(), w.watchSources(s)) - val config = WatchConfig.default( - logger, - () => shouldTerminate, - count => Some(w.triggeredMessage(watchState.withCount(count))).filter(_.nonEmpty), - count => Some(w.watchingMessage(watchState.withCount(count))).filter(_.nonEmpty), - watchState, - w.pollInterval, - w.antiEntropy - ) - watch(task, config) - state - } else { - logger.error( - s"Terminating watch due to invalid command(s): ${invalid.mkString("'", "', '", "'")}" - ) - state.fail - } + setup: WatchSetup, + ): State = { + val (s, config, newState) = setup(state, command) + val commands = command.split(";") match { + case Array("", rest @ _*) => rest + case Array(cmd) => Seq(cmd) } + val parser = Command.combine(s.definedCommands)(s) + val tasks = commands.foldLeft(Nil: Seq[Either[String, () => Either[Exception, Boolean]]]) { + (t, cmd) => + t :+ (DefaultParsers.parse(cmd, parser) match { + case Right(task) => + Right { () => + try { + newState(task) + Right(true) + } catch { case e: Exception => Left(e) } + } + case Left(_) => Left(cmd) + }) + } + val (valid, invalid) = tasks.partition(_.isRight) + if (invalid.isEmpty) { + val task = () => + valid.foldLeft(Right(true): Either[Exception, Boolean]) { + case (status, Right(t)) => if (status.getOrElse(true)) t() else status + case _ => throw new IllegalStateException("Should be unreachable") + } + watch(task, config) + state + } else { + config.logger.error( + s"Terminating watch due to invalid command(s): ${invalid.mkString("'", "', '", "'")}" + ) + state.fail + } + } private[sbt] def watch( task: () => Either[Exception, _], config: WatchConfig, ): Unit = { - val eventLogger = new EventMonitor.Logger { - override def debug(msg: => Any): Unit = config.logger.debug(msg.toString) - } - def debug(msg: String): Unit = if (msg.nonEmpty) config.logger.debug(msg) - val monitor = EventMonitor( - config.watchState, - config.pollInterval, - config.antiEntropy, - config.shouldTerminate(), - eventLogger - ) + val logger = config.logger + def info(msg: String): Unit = if (msg.nonEmpty) logger.info(msg) @tailrec def impl(count: Int): Unit = { + @tailrec + def nextAction(): Action = { + config.handleInput() match { + case CancelWatch => CancelWatch + case Trigger => Trigger + case _ => + val events = config.fileEventMonitor.poll(10.millis) + val next = events match { + case Seq() => (Ignore, None) + case Seq(head, tail @ _*) => + /* + * We traverse all of the events and find the one for which we give the highest + * weight. + * CancelWatch > Trigger > Ignore + */ + tail.foldLeft((config.onWatchEvent(head), Some(head))) { + case (r @ (CancelWatch, _), _) => r + // If we've found a trigger, only change the accumulator if we find a CancelWatch. + case ((action, event), e) => + config.onWatchEvent(e) match { + case Trigger if action == Ignore => (Trigger, Some(e)) + case _ => (action, event) + } + } + } + next match { + case (CancelWatch, Some(event)) => + logger.debug(s"Stopping watch due to event from ${event.entry.typedPath.getPath}.") + CancelWatch + case (Trigger, Some(event)) => + logger.debug(s"Triggered by ${event.entry.typedPath.getPath}") + config.triggeredMessage(event.entry.typedPath, count).foreach(info) + Trigger + case _ => + nextAction() + } + } + } task() match { - case _: Right[Exception, _] => - config.watchingMessage(count).foreach(debug) - if (monitor.awaitEvent()) { - config.triggeredMessage(count).foreach(debug) - impl(count + 1) + case Right(status) if !config.shouldTerminate(count) => + config.watchingMessage(count).foreach(info) + nextAction() match { + case CancelWatch => () + case _ => impl(count + 1) } - case Left(e) => config.logger.error(s"Terminating watch due to Unexpected error: $e") + case Left(e) => + logger.error(s"Terminating watch due to Unexpected error: $e") + case _ => + logger.debug("Terminating watch due to WatchConfig.shouldTerminate") } } try { impl(count = 1) } finally { - monitor.close() + config.fileEventMonitor.close() while (System.in.available() > 0) System.in.read() } } @@ -288,44 +350,48 @@ trait WatchConfig { def logger: Logger /** - * Returns true if the continuous execution should stop. - * @return true if the contiuous execution should stop. + * The sbt.io.FileEventMonitor that is used to monitor the file system. + * + * @return an sbt.io.FileEventMonitor instance. */ - def shouldTerminate(): Boolean + def fileEventMonitor: FileEventMonitor[Path] /** - * The message to print when a build is triggered. - * @param count the current continous iteration count - * @return an optional string to log + * A function that is periodically invoked to determine whether the watch should stop or + * trigger. Usually this will read from System.in to react to user input. + * @return an [[Watched.Action Action]] that will determine the next step in the watch. */ - def triggeredMessage(count: Int): Option[String] + def handleInput(): Watched.Action /** - * The message to print at the beginning of each watch iteration. - * @param count the current watch iteration - * @return an optional string to log before each watch iteration. + * This is run before each watch iteration and if it returns true, the watch is terminated. + * @param count The current number of watch iterstaions. + * @return true if the watch should stop. + */ + def shouldTerminate(count: Int): Boolean + + /** + * Callback that is invoked whenever a file system vent is detected. The next step of the watch + * is determined by the [[Watched.Action Action]] returned by the callback. + * @param event the detected sbt.io.FileEventMonitor.Event. + * @return the next [[Watched.Action Action]] to run. + */ + def onWatchEvent(event: Event[Path]): Watched.Action + + /** + * The optional message to log when a build is triggered. + * @param typedPath the path that triggered the build + * @param count the current iteration + * @return an optional log message. + */ + def triggeredMessage(typedPath: TypedPath, count: Int): Option[String] + + /** + * The optional message to log before each watch iteration. + * @param count the current iteration + * @return an optional log message. */ def watchingMessage(count: Int): Option[String] - - /** - * The WatchState that provides the WatchService that will be used to monitor events. - * @return the WatchState. - */ - def watchState: WatchState - - /** - * The maximum duration that the EventMonitor background thread will poll the underlying - * [[sbt.io.WatchService]] for events. - * @return - */ - def pollInterval: FiniteDuration - - /** - * The period for which files that trigger a build are quarantined from triggering a new build - * if they are modified. - * @return the anti-entropy period. - */ - def antiEntropy: FiniteDuration } /** @@ -334,45 +400,46 @@ trait WatchConfig { object WatchConfig { /** - * Generate an instance of [[WatchConfig]]. - * - * @param logger an [[sbt.util.Logger]] instance - * @param shouldStop returns true if the watch should stop - * @param triggeredMessage function to generate an optional message to print when a build is - - * @param watchingMessage function to generate an optional message to print before each watch - * iteration - * @param watchState the [[WatchState]] which provides an [[sbt.io.WatchService]] to monitor - * file system vents - * @param pollInterval the maximum polling time of the [[sbt.io.WatchService]] - * @param antiEntropy the period for which a file that triggered a build is quarantined so that - * any events detected during this period do not trigger a build. - * @return an instance of [[WatchConfig]]. + * Create an instance of [[WatchConfig]]. + * @param logger logger for watch events + * @param fileEventMonitor the monitor for file system events. + * @param handleInput callback that is periodically invoked to check whether to continue or + * terminate the watch based on user input. It is also possible to, for + * example time out the watch using this callback. + * @param onWatchEvent callback that is invoked when + * @param triggeredMessage optional message that will be logged when a new build is triggered. + * The input parameters are the sbt.io.TypedPath that triggered the new + * build and the current iteration count. + * @param watchingMessage optional message that is printed before each watch iteration begins. + * The input parameter is the current iteration count. + * @return a [[WatchConfig]] instance. */ def default( logger: Logger, - shouldStop: () => Boolean, - triggeredMessage: Int => Option[String], - watchingMessage: Int => Option[String], - watchState: WatchState, - pollInterval: FiniteDuration, - antiEntropy: FiniteDuration, + fileEventMonitor: FileEventMonitor[Path], + handleInput: () => Watched.Action, + shouldTerminate: Int => Boolean, + onWatchEvent: Event[Path] => Watched.Action, + triggeredMessage: (TypedPath, Int) => Option[String], + watchingMessage: Int => Option[String] ): WatchConfig = { val l = logger - val ss = shouldStop + val fem = fileEventMonitor + val hi = handleInput + val st = shouldTerminate + val owe = onWatchEvent val tm = triggeredMessage val wm = watchingMessage - val ws = watchState - val pi = pollInterval - val ae = antiEntropy new WatchConfig { override def logger: Logger = l - override def shouldTerminate(): Boolean = ss() - override def triggeredMessage(count: Int): Option[String] = tm(count) + override def fileEventMonitor: FileEventMonitor[Path] = fem + override def handleInput(): Watched.Action = hi() + override def shouldTerminate(count: Int): Boolean = + st(count) + override def onWatchEvent(event: Event[Path]): Watched.Action = owe(event) + override def triggeredMessage(typedPath: TypedPath, count: Int): Option[String] = + tm(typedPath, count) override def watchingMessage(count: Int): Option[String] = wm(count) - override def watchState: WatchState = ws - override def pollInterval: FiniteDuration = pi - override def antiEntropy: FiniteDuration = ae } } } diff --git a/main-command/src/test/scala/sbt/WatchedSpec.scala b/main-command/src/test/scala/sbt/WatchedSpec.scala index 5add4ba86..1ea7f36f7 100644 --- a/main-command/src/test/scala/sbt/WatchedSpec.scala +++ b/main-command/src/test/scala/sbt/WatchedSpec.scala @@ -8,31 +8,99 @@ package sbt import java.io.File -import java.util.concurrent.CountDownLatch +import java.nio.file.{ Files, Path } +import java.util.concurrent.atomic.AtomicBoolean import org.scalatest.{ FlatSpec, Matchers } -import sbt.Watched.{ NullLogger, WatchSource } -import sbt.internal.io.WatchState -import sbt.io.IO +import sbt.Watched._ +import sbt.WatchedSpec._ +import sbt.io.FileEventMonitor.Event +import sbt.io.{ FileEventMonitor, IO, TypedPath } +import sbt.util.Logger +import scala.collection.mutable import scala.concurrent.duration._ -import WatchedSpec._ class WatchedSpec extends FlatSpec with Matchers { - "Watched" should "stop" in IO.withTemporaryDirectory { dir => - val latch = new CountDownLatch(1) - val config = WatchConfig.default( - NullLogger, - () => latch.getCount == 0, - triggeredMessage = _ => { latch.countDown(); None }, - watchingMessage = _ => { new File(dir, "foo").createNewFile(); None }, - watchState = - WatchState.empty(Watched.createWatchService(), WatchSource(dir.toRealPath) :: Nil), - pollInterval = 5.millis, - antiEntropy = 5.millis + object Defaults { + private val fileTreeViewConfig = FileTreeViewConfig.default(50.millis, 50.millis) + def config( + sources: Seq[WatchSource], + fileEventMonitor: Option[FileEventMonitor[Path]] = None, + logger: Logger = NullLogger, + handleInput: () => Action = () => Ignore, + shouldTerminate: Int => Boolean = _ => true, + onWatchEvent: Event[Path] => Action = _ => Ignore, + triggeredMessage: (TypedPath, Int) => Option[String] = (_, _) => None, + watchingMessage: Int => Option[String] = _ => None + ): WatchConfig = { + val monitor = fileEventMonitor.getOrElse( + fileTreeViewConfig.newMonitor(fileTreeViewConfig.newDataView(), sources, logger) + ) + WatchConfig.default( + logger = logger, + monitor, + handleInput, + shouldTerminate, + onWatchEvent, + triggeredMessage, + watchingMessage + ) + } + } + "Watched.watch" should "stop" in IO.withTemporaryDirectory { dir => + val config = Defaults.config(sources = Seq(WatchSource(dir.toRealPath))) + Watched.watch(() => Right(true), config) should be(()) + } + it should "trigger" in IO.withTemporaryDirectory { dir => + val triggered = new AtomicBoolean(false) + val config = Defaults.config( + sources = Seq(WatchSource(dir.toRealPath)), + shouldTerminate = count => count == 2, + onWatchEvent = _ => { triggered.set(true); Trigger }, + watchingMessage = _ => { + new File(dir, "file").createNewFile; None + } ) - Watched.watch(() => Right(true), config) - assert(latch.getCount == 0) + Watched.watch(() => Right(true), config) should be(()) + assert(triggered.get()) + } + it should "filter events" in IO.withTemporaryDirectory { dir => + val realDir = dir.toRealPath + val queue = new mutable.Queue[TypedPath] + val foo = realDir.toPath.resolve("foo") + val bar = realDir.toPath.resolve("bar") + val config = Defaults.config( + sources = Seq(WatchSource(realDir)), + shouldTerminate = count => count == 2, + onWatchEvent = e => if (e.entry.typedPath.getPath == foo) Trigger else Ignore, + triggeredMessage = (tp, _) => { queue += tp; None }, + watchingMessage = _ => { Files.createFile(bar); Thread.sleep(5); Files.createFile(foo); None } + ) + Watched.watch(() => Right(true), config) should be(()) + queue.toIndexedSeq.map(_.getPath) shouldBe Seq(foo) + } + it should "enforce anti-entropy" in IO.withTemporaryDirectory { dir => + val realDir = dir.toRealPath + val queue = new mutable.Queue[TypedPath] + val foo = realDir.toPath.resolve("foo") + val bar = realDir.toPath.resolve("bar") + val config = Defaults.config( + sources = Seq(WatchSource(realDir)), + shouldTerminate = count => count == 3, + onWatchEvent = _ => Trigger, + triggeredMessage = (tp, _) => { queue += tp; None }, + watchingMessage = count => { + if (count == 1) Files.createFile(bar) + else if (count == 2) { + bar.toFile.setLastModified(5000) + Files.createFile(foo) + } + None + } + ) + Watched.watch(() => Right(true), config) should be(()) + queue.toIndexedSeq.map(_.getPath) shouldBe Seq(bar, foo) } } diff --git a/main/src/main/scala/sbt/Defaults.scala b/main/src/main/scala/sbt/Defaults.scala index 469886edd..8e41ada1d 100755 --- a/main/src/main/scala/sbt/Defaults.scala +++ b/main/src/main/scala/sbt/Defaults.scala @@ -40,16 +40,17 @@ import sbt.internal.util.Types._ import sbt.io.syntax._ import sbt.io.{ AllPassFilter, + DirectoryFilter, FileFilter, GlobFilter, + Hash, HiddenFileFilter, IO, NameFilter, NothingFilter, Path, PathFinder, - DirectoryFilter, - Hash + TypedPath }, Path._ import sbt.librarymanagement.Artifact.{ DocClassifier, SourceClassifier } import sbt.librarymanagement.Configurations.{ @@ -264,6 +265,11 @@ object Defaults extends BuildCommon { concurrentRestrictions := defaultRestrictions.value, parallelExecution :== true, pollInterval :== new FiniteDuration(500, TimeUnit.MILLISECONDS), + watchTriggeredMessage := { (_, _) => + None + }, + watchStartMessage := Watched.defaultStartWatch, + fileTreeViewConfig := FileTreeViewConfig.default(pollInterval.value, watchAntiEntropy.value), watchAntiEntropy :== new FiniteDuration(500, TimeUnit.MILLISECONDS), watchLogger := streams.value.log, watchService :== { () => @@ -600,8 +606,43 @@ object Defaults extends BuildCommon { clean := (Def.task { IO.delete(cleanFiles.value) } tag (Tags.Clean)).value, consoleProject := consoleProjectTask.value, watchTransitiveSources := watchTransitiveSourcesTask.value, - watchingMessage := Watched.projectWatchingMessage(thisProjectRef.value.project), - watch := watchSetting.value + watchOnEvent := { + val sources = watchTransitiveSources.value + e => + if (sources.exists(_.accept(e.entry.typedPath.getPath))) Watched.Trigger else Watched.Ignore + }, + watchHandleInput := Watched.handleInput, + watchShouldTerminate := { _ => + false + }, + watchConfig := { + val sources = watchTransitiveSources.value + val extracted = Project.extract(state.value) + val wm = extracted + .getOpt(watchingMessage) + .map(w => (count: Int) => Some(w(WatchState.empty(sources).withCount(count)))) + .getOrElse(watchStartMessage.value) + val tm = extracted + .getOpt(triggeredMessage) + .map( + tm => (_: TypedPath, count: Int) => Some(tm(WatchState.empty(sources).withCount(count))) + ) + .getOrElse(watchTriggeredMessage.value) + val logger = watchLogger.value + val viewConfig = fileTreeViewConfig.value + WatchConfig.default( + logger, + viewConfig.newMonitor(viewConfig.newDataView(), sources, logger), + watchHandleInput.value, + watchShouldTerminate.value, + watchOnEvent.value, + tm, + wm + ) + }, + watchStartMessage := Watched.projectOnWatchMessage(thisProjectRef.value.project), + watch := watchSetting.value, + fileTreeViewConfig := FileTreeViewConfig.default(pollInterval.value, watchAntiEntropy.value), ) def generate(generators: SettingKey[Seq[Task[Seq[File]]]]): Initialize[Task[Seq[File]]] = @@ -622,6 +663,7 @@ object Defaults extends BuildCommon { Def.task { allUpdates.value.flatten ++ globalPluginUpdate.?.value } } + @deprecated("This is no longer used to implement continuous execution", "1.3.0") def watchSetting: Initialize[Watched] = Def.setting { val getService = watchService.value diff --git a/main/src/main/scala/sbt/Keys.scala b/main/src/main/scala/sbt/Keys.scala index e47d19bca..66ffb8808 100644 --- a/main/src/main/scala/sbt/Keys.scala +++ b/main/src/main/scala/sbt/Keys.scala @@ -8,6 +8,7 @@ package sbt import java.io.File +import java.nio.file.{ Path => JPath } import java.net.URL import scala.concurrent.duration.{ FiniteDuration, Duration } import Def.ScopedKey @@ -40,7 +41,8 @@ import sbt.internal.{ SessionSettings, LogManager } -import sbt.io.{ FileFilter, WatchService } +import sbt.io.{ FileFilter, TypedPath, WatchService } +import sbt.io.FileEventMonitor.Event import sbt.internal.io.WatchState import sbt.internal.server.ServerHandler import sbt.internal.util.{ AttributeKey, SourcePosition } @@ -141,16 +143,26 @@ object Keys { val serverHandlers = settingKey[Seq[ServerHandler]]("User-defined server handlers.") val analysis = AttributeKey[CompileAnalysis]("analysis", "Analysis of compilation, including dependencies and generated outputs.", DSetting) + @deprecated("This is no longer used for continuous execution", "1.3.0") val watch = SettingKey(BasicKeys.watch) val suppressSbtShellNotification = settingKey[Boolean]("""True to suppress the "Executing in batch mode.." message.""").withRank(CSetting) val pollInterval = settingKey[FiniteDuration]("Interval between checks for modified sources by the continuous execution command.").withRank(BMinusSetting) val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting) + val watchConfig = taskKey[WatchConfig]("The configuration for continuous execution.").withRank(BMinusSetting) val watchLogger = taskKey[Logger]("A logger that reports watch events.").withRank(DSetting) + val watchHandleInput = settingKey[() => Watched.Action]("Function that is periodically invoked to determine if the continous build should be stopped or if a build should be triggered. It will usually read from stdin to respond to user commands.").withRank(BMinusSetting) + val watchOnEvent = taskKey[Event[JPath] => Watched.Action]("Determines how to handle a file event").withRank(BMinusSetting) val watchService = settingKey[() => WatchService]("Service to use to monitor file system changes.").withRank(BMinusSetting) + val watchShouldTerminate = settingKey[Int => Boolean]("Function that may terminate a continuous build based on the number of iterations.").withRank(BMinusSetting) val watchSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in this project for continuous execution to watch for changes.").withRank(BMinusSetting) + val watchStartMessage = settingKey[Int => Option[String]]("The message to show when triggered execution waits for sources to change. The parameter is the current watch iteration count.").withRank(DSetting) val watchTransitiveSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in all projects for continuous execution to watch.").withRank(CSetting) + val watchTriggeredMessage = settingKey[(TypedPath, Int) => Option[String]]("The message to show before triggered execution executes an action after sources change. The parameters are the path that triggered the build and the current watch iteration count.").withRank(DSetting) + @deprecated("Use watchStartMessage instead", "1.3.0") val watchingMessage = settingKey[WatchState => String]("The message to show when triggered execution waits for sources to change.").withRank(DSetting) + @deprecated("Use watchTriggeredMessage instead", "1.3.0") val triggeredMessage = settingKey[WatchState => String]("The message to show before triggered execution executes an action after sources change.").withRank(DSetting) + val fileTreeViewConfig = taskKey[FileTreeViewConfig]("Configures how sbt will traverse and monitor the file system.").withRank(BMinusSetting) // Path Keys val baseDirectory = settingKey[File]("The base directory. Depending on the scope, this is the base directory for the build, project, configuration, or task.").withRank(AMinusSetting) diff --git a/main/src/main/scala/sbt/Main.scala b/main/src/main/scala/sbt/Main.scala index 7c3002d5e..045e36ac7 100644 --- a/main/src/main/scala/sbt/Main.scala +++ b/main/src/main/scala/sbt/Main.scala @@ -447,12 +447,12 @@ object BuiltinCommands { s } - def continuous: Command = Watched.continuous { (state: State, _: Watched, command: String) => + def continuous: Command = Watched.continuous { (state: State, command: String) => val extracted = Project.extract(state) - val (s, logger) = extracted.runTask(Keys.watchLogger, state) - val process: (() => State) => State = - (f: () => State) => MainLoop.processCommand(Exec(command, None), s, f) - (s, logger, process) + val (s, watchConfig) = extracted.runTask(Keys.watchConfig, state) + val updateState = + (runCommand: () => State) => MainLoop.processCommand(Exec(command, None), s, runCommand) + (s, watchConfig, updateState) } private[this] def loadedEval(s: State, arg: String): Unit = { diff --git a/sbt/src/sbt-test/watch/on-start-watch/build.sbt b/sbt/src/sbt-test/watch/on-start-watch/build.sbt new file mode 100644 index 000000000..d992f3473 --- /dev/null +++ b/sbt/src/sbt-test/watch/on-start-watch/build.sbt @@ -0,0 +1,13 @@ +val checkCount = inputKey[Unit]("check that compile has run a specified number of times") + +checkCount := { + val expected = Def.spaceDelimited().parsed.head.toInt + assert(Count.get == expected) +} + +Compile / compile := { + Count.increment() + // Trigger a new build by updating the last modified time + ((Compile / scalaSource).value / "A.scala").setLastModified(5000) + (Compile / compile).value +} diff --git a/sbt/src/sbt-test/watch/on-start-watch/project/Count.scala b/sbt/src/sbt-test/watch/on-start-watch/project/Count.scala new file mode 100644 index 000000000..0698b75ff --- /dev/null +++ b/sbt/src/sbt-test/watch/on-start-watch/project/Count.scala @@ -0,0 +1,6 @@ +object Count { + private var count = 0 + def get: Int = count + def increment(): Unit = count += 1 + def reset(): Unit = count = 0 +} \ No newline at end of file diff --git a/sbt/src/sbt-test/watch/on-start-watch/src/main/scala/A.scala b/sbt/src/sbt-test/watch/on-start-watch/src/main/scala/A.scala new file mode 100644 index 000000000..df9e4d3d5 --- /dev/null +++ b/sbt/src/sbt-test/watch/on-start-watch/src/main/scala/A.scala @@ -0,0 +1,3 @@ +package a + +class A \ No newline at end of file diff --git a/sbt/src/sbt-test/watch/on-start-watch/test b/sbt/src/sbt-test/watch/on-start-watch/test new file mode 100644 index 000000000..f5fa900e7 --- /dev/null +++ b/sbt/src/sbt-test/watch/on-start-watch/test @@ -0,0 +1,4 @@ +# verify that the watch terminates when we reach the specified count +> set watchShouldTerminate := { count => count == 2 } +> ~compile +> checkCount 2