From 28aa1de32a1108e8b42cf7606c962df6e5dc6f52 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Sat, 18 Aug 2018 10:56:43 -0700 Subject: [PATCH] Refactor continuous execution This commit makes watch event logging work in the '~' command. The previous design of the command made this difficult, so there is a significant re-design of the implementation of '~'. I believe that this redesign will allow the feature to be maintained and improved more easily moving forward. With the redesign, it is now possible to test the business logic of the watch command (and I add a rudimentary test that I will build upon in subsequent commits). A bonus of this redesign is that now if the user tries to watch an invalid command, the watch will immediately terminate with an error rather than get stuck waiting for events when the task can never possibly succeed. The previous implementation of the '~' command makes it difficult to dynamically control the implementation arguments because it is implemented in the command project which makes it unable to depend on any task keys that are defined in the build. It works around this by putting all of it's configuration in the Watched attribute which is stored globally. This would not have been necessary if the function had been defined in the main project where it could just extract the value of the watched task rather than relying on the global attribute value. Moreover, because it cannot depend on tasks, it makes it nigh impossible to use the logging framework within the '~' command. Another issue with the previous implementation is that it's somewhat difficult to reason about. The executeContinuously has effectively two entry points: one for the first time the command is run and one for each subsequent invocation when a new build is triggered. The successive invocations are triggered by prepending commands to run to the previous state. This is made recursive by prepending the initial command (that was prefixed with '~'. Which branch we're in is determined by checking for the existence of a temporary attribute, that we must ensure that we remove when the build is stopped. This makes a lot of behavior non-local and difficult for an outsider who is less familiar with sbt to understand. Broadly, this refactor does two things: 1) Move the definition of continuous from BasicCommands to BuiltInCommands 2) Re-work the implementation to be executed in code rather than using the sbt dsl. The first part is simple. We just add an implementation of continuous to BuiltInCommands and remove it from the list of BasicCommands. We need to leave in the legacy implementation for binary compatibility. I also moved all of the actual implementation logic into Watched, which makes maintenance easier since most of the logic is in one place. The second part is more complicated. Rather than rely on the sbt dsl (e.g. `(ClearOnFailure :: next :: FailureWall :: repeat :: s)`) to parse and run the command. We manually parse the command and generate a task of type `() => State`. We don't actually need to do anything with the generated state because we're going to return the original state at the end of the command no matter what. With this task, we can then create a tail recursive function that repeatedly executes the task until the watch is terminated. The parsing is handled in the Watch.command method (which is where I moved the refactored BasicCommands.continuous implementation). The actual task running and monitoring is handled in Watched.watch. This method has no reference to the sbt state, which makes it testable. It sets up an event monitor and then delegates the recursive monitoring to a small nested function, Watched.watch.impl. One nice thing about this approach is that it is very easy to reason about the life cycle of the EventMonitor. The recursive call is within a try { } finally { } where the monitor and stdin are guaranteed to be cleared at the end. Adding support for a custom (and default) watch logger is trivial with the new infrastructure and is done via the watchLogger TaskKey. There was a small reporting race condition that was introduced by the change to (2). Because the new implementation is able to bypass command parsing for triggered builds, the watch message would usually end up being printed before the task outcome was fully logged. To work around this, I made the watch and triggered messages be logged rather than printed directly to stdout. As a result, the only user visible result of this change should be that instead of seeing: "1. Waiting for source changes in project foo... (press enter to interrupt)", users will now see: "[info] 1. Waiting for source changes in project foo... (press enter to interrupt)". --- .../src/main/scala/sbt/BasicCommands.scala | 2 +- main-command/src/main/scala/sbt/Watched.scala | 236 +++++++++++++++++- .../src/test/scala/sbt/WatchedSpec.scala | 42 ++++ main/src/main/scala/sbt/Defaults.scala | 1 + main/src/main/scala/sbt/Keys.scala | 3 +- main/src/main/scala/sbt/Main.scala | 11 +- main/src/main/scala/sbt/MainLoop.scala | 11 +- 7 files changed, 298 insertions(+), 8 deletions(-) create mode 100644 main-command/src/test/scala/sbt/WatchedSpec.scala diff --git a/main-command/src/main/scala/sbt/BasicCommands.scala b/main-command/src/main/scala/sbt/BasicCommands.scala index 2351b185b..0e8c51a41 100644 --- a/main-command/src/main/scala/sbt/BasicCommands.scala +++ b/main-command/src/main/scala/sbt/BasicCommands.scala @@ -50,7 +50,6 @@ object BasicCommands { call, early, exit, - continuous, history, oldshell, client, @@ -254,6 +253,7 @@ object BasicCommands { def exit: Command = Command.command(TerminateAction, exitBrief, exitBrief)(_ exit true) + @deprecated("Replaced by BuiltInCommands.continuous", "1.3.0") def continuous: Command = Command(ContinuousExecutePrefix, continuousBriefHelp, continuousDetail)(otherCommandParser) { (s, arg) => diff --git a/main-command/src/main/scala/sbt/Watched.scala b/main-command/src/main/scala/sbt/Watched.scala index 00566febc..dcddda9d6 100644 --- a/main-command/src/main/scala/sbt/Watched.scala +++ b/main-command/src/main/scala/sbt/Watched.scala @@ -10,12 +10,15 @@ package sbt import java.io.File import java.nio.file.FileSystems -import sbt.BasicCommandStrings.ClearOnFailure -import sbt.State.FailureWall +import sbt.BasicCommandStrings._ +import sbt.BasicCommands.otherCommandParser +import sbt.CommandUtil.withAttribute import sbt.internal.io.{ EventMonitor, Source, WatchState } import sbt.internal.util.AttributeKey import sbt.internal.util.Types.const +import sbt.internal.util.complete.DefaultParsers import sbt.io._ +import sbt.util.{ Level, Logger } import scala.annotation.tailrec import scala.concurrent.duration._ @@ -103,8 +106,127 @@ object Watched { val PollDelay: FiniteDuration = 500.milliseconds val AntiEntropy: FiniteDuration = 40.milliseconds def isEnter(key: Int): Boolean = key == 10 || key == 13 - def printIfDefined(msg: String) = if (!msg.isEmpty) System.out.println(msg) + def printIfDefined(msg: String): Unit = if (!msg.isEmpty) System.out.println(msg) + type Task = () => State + type Setup = (State, Watched, String) => (State, Logger, Task => State) + + /** + * Provides the '~' continuous execution command. + * @param setup a function that provides a logger and a function from (() => State) => State. + * @return the '~' command. + */ + def continuous(setup: Setup): Command = + Command(ContinuousExecutePrefix, continuousBriefHelp, continuousDetail)(otherCommandParser) { + (state, command) => + Watched.command(state, command, setup) + } + + /** + * Implements continuous execution. It works by first parsing the command and generating a task to + * run with each build. It can run multiple commands that are separated by ";" in the command + * input. If any of these commands are invalid, the watch will immmediately exit. + * @param state the initial state + * @param command the command(s) to repeatedly apply + * @param setup function to generate a logger and a transformation of the resultant state. The + * purpose of the transformation is to preserve the logging semantics that existed + * in the legacy version of this function in which the task would be run through + * MainLoop.processCommand, which is unavailable in the main-command project + * @return the initial state if all of the input commands are valid. Otherwise, returns the + * initial state with the failure transformation. + */ + private[sbt] def command( + state: State, + command: String, + setup: Setup, + ): State = + withAttribute(state, Watched.Configuration, "Continuous execution not configured.") { w => + val (s0, logger, process) = setup(state, w, command) + val s = FailureWall :: s0 + val parser = Command.combine(s.definedCommands)(s) + val commands = command.split(";") match { + case Array("", rest @ _*) => rest + case Array(cmd) => Seq(cmd) + } + val tasks = commands.foldLeft(Nil: Seq[Either[String, () => Either[Exception, Boolean]]]) { + case (t, cmd) => + t :+ (DefaultParsers.parse(cmd, parser) match { + case Right(task) => + Right { () => + try { + process(task) + Right(true) + } catch { case e: Exception => Left(e) } + } + case Left(_) => Left(cmd) + }) + } + val (valid, invalid) = tasks.partition(_.isRight) + if (invalid.isEmpty) { + val task = () => + valid.foldLeft(Right(true): Either[Exception, Boolean]) { + case (status, Right(t)) => if (status.getOrElse(true)) t() else status + case _ => throw new IllegalStateException("Should be unreachable") + } + @tailrec def shouldTerminate: Boolean = + (System.in.available > 0) && (w.terminateWatch(System.in.read()) || shouldTerminate) + val watchState = WatchState.empty(w.watchService(), w.watchSources(s)) + val config = WatchConfig.default( + logger, + () => shouldTerminate, + count => Some(w.triggeredMessage(watchState.withCount(count))).filter(_.nonEmpty), + count => Some(w.watchingMessage(watchState.withCount(count))).filter(_.nonEmpty), + watchState, + w.pollInterval, + w.antiEntropy + ) + watch(task, config) + state + } else { + logger.error( + s"Terminating watch due to invalid command(s): ${invalid.mkString("'", "', '", "'")}" + ) + state.fail + } + } + + private[sbt] def watch( + task: () => Either[Exception, _], + config: WatchConfig, + ): Unit = { + val eventLogger = new EventMonitor.Logger { + override def debug(msg: => Any): Unit = config.logger.debug(msg.toString) + } + def debug(msg: String): Unit = if (msg.nonEmpty) config.logger.debug(msg) + val monitor = EventMonitor( + config.watchState, + config.pollInterval, + config.antiEntropy, + config.shouldTerminate(), + eventLogger + ) + + @tailrec + def impl(count: Int): Unit = { + task() match { + case _: Right[Exception, _] => + config.watchingMessage(count).foreach(debug) + if (monitor.awaitEvent()) { + config.triggeredMessage(count).foreach(debug) + impl(count + 1) + } + case Left(e) => config.logger.error(s"Terminating watch due to Unexpected error: $e") + } + } + try { + impl(count = 1) + } finally { + monitor.close() + while (System.in.available() > 0) System.in.read() + } + } + + @deprecated("Replaced by Watched.command", "1.3.0") def executeContinuously(watched: Watched, s: State, next: String, repeat: String): State = { @tailrec def shouldTerminate: Boolean = (System.in.available > 0) && (watched.terminateWatch(System.in.read()) || shouldTerminate) @@ -148,6 +270,13 @@ object Watched { } } + private[sbt] object NullLogger extends Logger { + override def trace(t: => Throwable): Unit = {} + override def success(message: => String): Unit = {} + override def log(level: Level.Value, message: => String): Unit = {} + } + + @deprecated("ContinuousEventMonitor attribute is not used by Watched.command", "1.3.0") val ContinuousEventMonitor = AttributeKey[EventMonitor]( "watch event monitor", @@ -180,3 +309,104 @@ object Watched { } } } + +/** + * Provides a number of configuration options for continuous execution. + */ +trait WatchConfig { + + /** + * A logger. + * @return a logger + */ + def logger: Logger + + /** + * Returns true if the continuous execution should stop. + * @return true if the contiuous execution should stop. + */ + def shouldTerminate(): Boolean + + /** + * The message to print when a build is triggered. + * @param count the current continous iteration count + * @return an optional string to log + */ + def triggeredMessage(count: Int): Option[String] + + /** + * The message to print at the beginning of each watch iteration. + * @param count the current watch iteration + * @return an optional string to log before each watch iteration. + */ + def watchingMessage(count: Int): Option[String] + + /** + * The WatchState that provides the WatchService that will be used to monitor events. + * @return the WatchState. + */ + def watchState: WatchState + + /** + * The maximum duration that the EventMonitor background thread will poll the underlying + * [[sbt.io.WatchService]] for events. + * @return + */ + def pollInterval: FiniteDuration + + /** + * The period for which files that trigger a build are quarantined from triggering a new build + * if they are modified. + * @return the anti-entropy period. + */ + def antiEntropy: FiniteDuration +} + +/** + * Provides a default implementation of [[WatchConfig]]. + */ +object WatchConfig { + + /** + * Generate an instance of [[WatchConfig]]. + * + * @param logger an [[sbt.util.Logger]] instance + * @param shouldStop returns true if the watch should stop + * @param triggeredMessage function to generate an optional message to print when a build is + + * @param watchingMessage function to generate an optional message to print before each watch + * iteration + * @param watchState the [[WatchState]] which provides an [[sbt.io.WatchService]] to monitor + * file system vents + * @param pollInterval the maximum polling time of the [[sbt.io.WatchService]] + * @param antiEntropy the period for which a file that triggered a build is quarantined so that + * any events detected during this period do not trigger a build. + * @return an instance of [[WatchConfig]]. + */ + def default( + logger: Logger, + shouldStop: () => Boolean, + triggeredMessage: Int => Option[String], + watchingMessage: Int => Option[String], + watchState: WatchState, + pollInterval: FiniteDuration, + antiEntropy: FiniteDuration, + ): WatchConfig = { + val l = logger + val ss = shouldStop + val tm = triggeredMessage + val wm = watchingMessage + val ws = watchState + val pi = pollInterval + val ae = antiEntropy + new WatchConfig { + override def logger: Logger = l + override def shouldTerminate(): Boolean = ss() + override def triggeredMessage(count: Int): Option[String] = tm(count) + override def watchingMessage(count: Int): Option[String] = wm(count) + override def watchState: WatchState = ws + override def pollInterval: FiniteDuration = pi + override def antiEntropy: FiniteDuration = ae + } + } +} diff --git a/main-command/src/test/scala/sbt/WatchedSpec.scala b/main-command/src/test/scala/sbt/WatchedSpec.scala new file mode 100644 index 000000000..6f47f8880 --- /dev/null +++ b/main-command/src/test/scala/sbt/WatchedSpec.scala @@ -0,0 +1,42 @@ +/* + * sbt + * Copyright 2011 - 2018, Lightbend, Inc. + * Copyright 2008 - 2010, Mark Harrah + * Licensed under Apache License 2.0 (see LICENSE) + */ + +package sbt + +import java.io.File +import java.util.concurrent.CountDownLatch + +import org.scalatest.{ FlatSpec, Matchers } +import sbt.Watched.{ WatchConfig, NullLogger, WatchSource } +import sbt.internal.io.WatchState +import sbt.io.IO + +import scala.concurrent.duration._ +import WatchedSpec._ + +class WatchedSpec extends FlatSpec with Matchers { + "Watched" should "stop" in IO.withTemporaryDirectory { dir => + val latch = new CountDownLatch(1) + val config = WatchConfig.default( + NullLogger, + () => latch.getCount == 0, + triggeredMessage = _ => { latch.countDown(); None }, + watchingMessage = _ => { new File(dir, "foo").createNewFile(); None }, + watchState = WatchState.empty(Watched.createWatchService, WatchSource(dir.toRealPath) :: Nil), + pollInterval = 5.millis, + antiEntropy = 5.millis + ) + Watched.watch(() => Right(true), config) + assert(latch.getCount == 0) + } +} + +object WatchedSpec { + implicit class FileOps(val f: File) { + def toRealPath: File = f.toPath.toRealPath().toFile + } +} diff --git a/main/src/main/scala/sbt/Defaults.scala b/main/src/main/scala/sbt/Defaults.scala index 9bdbfd755..469886edd 100755 --- a/main/src/main/scala/sbt/Defaults.scala +++ b/main/src/main/scala/sbt/Defaults.scala @@ -265,6 +265,7 @@ object Defaults extends BuildCommon { parallelExecution :== true, pollInterval :== new FiniteDuration(500, TimeUnit.MILLISECONDS), watchAntiEntropy :== new FiniteDuration(500, TimeUnit.MILLISECONDS), + watchLogger := streams.value.log, watchService :== { () => Watched.createWatchService() }, diff --git a/main/src/main/scala/sbt/Keys.scala b/main/src/main/scala/sbt/Keys.scala index b8e5ff33e..e47d19bca 100644 --- a/main/src/main/scala/sbt/Keys.scala +++ b/main/src/main/scala/sbt/Keys.scala @@ -143,8 +143,9 @@ object Keys { val analysis = AttributeKey[CompileAnalysis]("analysis", "Analysis of compilation, including dependencies and generated outputs.", DSetting) val watch = SettingKey(BasicKeys.watch) val suppressSbtShellNotification = settingKey[Boolean]("""True to suppress the "Executing in batch mode.." message.""").withRank(CSetting) - val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting) val pollInterval = settingKey[FiniteDuration]("Interval between checks for modified sources by the continuous execution command.").withRank(BMinusSetting) + val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting) + val watchLogger = taskKey[Logger]("A logger that reports watch events.").withRank(DSetting) val watchService = settingKey[() => WatchService]("Service to use to monitor file system changes.").withRank(BMinusSetting) val watchSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in this project for continuous execution to watch for changes.").withRank(BMinusSetting) val watchTransitiveSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in all projects for continuous execution to watch.").withRank(CSetting) diff --git a/main/src/main/scala/sbt/Main.scala b/main/src/main/scala/sbt/Main.scala index ed669cb75..7c3002d5e 100644 --- a/main/src/main/scala/sbt/Main.scala +++ b/main/src/main/scala/sbt/Main.scala @@ -241,7 +241,8 @@ object BuiltinCommands { export, boot, initialize, - act + act, + continuous ) ++ allBasicCommands def DefaultBootCommands: Seq[String] = @@ -446,6 +447,14 @@ object BuiltinCommands { s } + def continuous: Command = Watched.continuous { (state: State, _: Watched, command: String) => + val extracted = Project.extract(state) + val (s, logger) = extracted.runTask(Keys.watchLogger, state) + val process: (() => State) => State = + (f: () => State) => MainLoop.processCommand(Exec(command, None), s, f) + (s, logger, process) + } + private[this] def loadedEval(s: State, arg: String): Unit = { val extracted = Project extract s import extracted._ diff --git a/main/src/main/scala/sbt/MainLoop.scala b/main/src/main/scala/sbt/MainLoop.scala index c9f4d171c..eb32d333f 100644 --- a/main/src/main/scala/sbt/MainLoop.scala +++ b/main/src/main/scala/sbt/MainLoop.scala @@ -144,13 +144,20 @@ object MainLoop { } /** This is the main function State transfer function of the sbt command processing. */ - def processCommand(exec: Exec, state: State): State = { + def processCommand(exec: Exec, state: State): State = + processCommand(exec, state, () => Command.process(exec.commandLine, state)) + + private[sbt] def processCommand( + exec: Exec, + state: State, + runCommand: () => State + ): State = { val channelName = exec.source map (_.channelName) StandardMain.exchange publishEventMessage ExecStatusEvent("Processing", channelName, exec.execId, Vector()) try { - val newState = Command.process(exec.commandLine, state) + val newState = runCommand() val doneEvent = ExecStatusEvent( "Done", channelName,