Refactor continuous execution

This commit makes watch event logging work in the '~' command. The
previous design of the command made this difficult, so there is a
significant re-design of the implementation of '~'. I believe that this
redesign will allow the feature to be maintained and improved more
easily moving forward. With the redesign, it is now possible to test the
business logic of the watch command (and I add a rudimentary test that I
will build upon in subsequent commits).

A bonus of this redesign is that now if the user tries to watch an
invalid command, the watch will immediately terminate with an error
rather than get stuck waiting for events when the task can never
possibly succeed.

The previous implementation of the '~' command makes it difficult
to dynamically control the implementation arguments because it is
implemented in the command project which makes it unable to depend on
any task keys that are defined in the build. It works around this by
putting all of it's configuration in the Watched attribute which is
stored globally. This would not have been necessary if the function had
been defined in the main project where it could just extract the value
of the watched task rather than relying on the global attribute value.
Moreover, because it cannot depend on tasks, it makes it nigh impossible
to use the logging framework within the '~' command.

Another issue with the previous implementation is that it's somewhat
difficult to reason about. The executeContinuously has effectively two
entry points: one for the first time the command is run and one for each
subsequent invocation when a new build is triggered. The successive
invocations are triggered by prepending commands to run to the previous
state. This is made recursive by prepending the initial command (that
was prefixed with '~'. Which branch we're in is determined by checking
for the existence of a temporary attribute, that we must ensure that we
remove when the build is stopped. This makes a lot of behavior non-local and
difficult for an outsider who is less familiar with sbt to understand.

Broadly, this refactor does two things:
1) Move the definition of continuous from BasicCommands to BuiltInCommands
2) Re-work the implementation to be executed in code rather than using
   the sbt dsl.

The first part is simple. We just add an implementation of continuous to
BuiltInCommands and remove it from the list of BasicCommands. We need to
leave in the legacy implementation for binary compatibility. I also
moved all of the actual implementation logic into Watched, which makes
maintenance easier since most of the logic is in one place.

The second part is more complicated. Rather than rely on the sbt dsl
(e.g. `(ClearOnFailure :: next :: FailureWall :: repeat :: s)`) to
parse and run the command. We manually parse the command and generate a
task of type `() => State`. We don't actually need to do anything with
the generated state because we're going to return the original state at
the end of the command no matter what. With this task, we can then
create a tail recursive function that repeatedly executes the task until
the watch is terminated.

The parsing is handled in the Watch.command method (which is where I
moved the refactored BasicCommands.continuous implementation). The
actual task running and monitoring is handled in Watched.watch. This
method has no reference to the sbt state, which makes it testable. It sets
up an event monitor and then delegates the recursive monitoring to a
small nested function, Watched.watch.impl. One nice thing about this
approach is that it is very easy to reason about the life cycle of the
EventMonitor. The recursive call is within a try { } finally { } where
the monitor and stdin are guaranteed to be cleared at the end.

Adding support for a custom (and default) watch logger is trivial with
the new infrastructure and is done via the watchLogger TaskKey.

There was a small reporting race condition that was introduced by the
change to (2). Because the new implementation is able to bypass command
parsing for triggered builds, the watch message would usually end up
being printed before the task outcome was fully logged. To work around
this, I made the watch and triggered messages be logged rather than
printed directly to stdout. As a result, the only user visible result of
this change should be that instead of seeing:
"1. Waiting for source changes in project foo... (press enter to interrupt)",
users will now see:
"[info] 1. Waiting for source changes in project foo... (press enter to interrupt)".
This commit is contained in:
Ethan Atkins 2018-08-18 10:56:43 -07:00
parent 5e3f72ad8a
commit 28aa1de32a
7 changed files with 298 additions and 8 deletions

View File

@ -50,7 +50,6 @@ object BasicCommands {
call,
early,
exit,
continuous,
history,
oldshell,
client,
@ -254,6 +253,7 @@ object BasicCommands {
def exit: Command = Command.command(TerminateAction, exitBrief, exitBrief)(_ exit true)
@deprecated("Replaced by BuiltInCommands.continuous", "1.3.0")
def continuous: Command =
Command(ContinuousExecutePrefix, continuousBriefHelp, continuousDetail)(otherCommandParser) {
(s, arg) =>

View File

@ -10,12 +10,15 @@ package sbt
import java.io.File
import java.nio.file.FileSystems
import sbt.BasicCommandStrings.ClearOnFailure
import sbt.State.FailureWall
import sbt.BasicCommandStrings._
import sbt.BasicCommands.otherCommandParser
import sbt.CommandUtil.withAttribute
import sbt.internal.io.{ EventMonitor, Source, WatchState }
import sbt.internal.util.AttributeKey
import sbt.internal.util.Types.const
import sbt.internal.util.complete.DefaultParsers
import sbt.io._
import sbt.util.{ Level, Logger }
import scala.annotation.tailrec
import scala.concurrent.duration._
@ -103,8 +106,127 @@ object Watched {
val PollDelay: FiniteDuration = 500.milliseconds
val AntiEntropy: FiniteDuration = 40.milliseconds
def isEnter(key: Int): Boolean = key == 10 || key == 13
def printIfDefined(msg: String) = if (!msg.isEmpty) System.out.println(msg)
def printIfDefined(msg: String): Unit = if (!msg.isEmpty) System.out.println(msg)
type Task = () => State
type Setup = (State, Watched, String) => (State, Logger, Task => State)
/**
* Provides the '~' continuous execution command.
* @param setup a function that provides a logger and a function from (() => State) => State.
* @return the '~' command.
*/
def continuous(setup: Setup): Command =
Command(ContinuousExecutePrefix, continuousBriefHelp, continuousDetail)(otherCommandParser) {
(state, command) =>
Watched.command(state, command, setup)
}
/**
* Implements continuous execution. It works by first parsing the command and generating a task to
* run with each build. It can run multiple commands that are separated by ";" in the command
* input. If any of these commands are invalid, the watch will immmediately exit.
* @param state the initial state
* @param command the command(s) to repeatedly apply
* @param setup function to generate a logger and a transformation of the resultant state. The
* purpose of the transformation is to preserve the logging semantics that existed
* in the legacy version of this function in which the task would be run through
* MainLoop.processCommand, which is unavailable in the main-command project
* @return the initial state if all of the input commands are valid. Otherwise, returns the
* initial state with the failure transformation.
*/
private[sbt] def command(
state: State,
command: String,
setup: Setup,
): State =
withAttribute(state, Watched.Configuration, "Continuous execution not configured.") { w =>
val (s0, logger, process) = setup(state, w, command)
val s = FailureWall :: s0
val parser = Command.combine(s.definedCommands)(s)
val commands = command.split(";") match {
case Array("", rest @ _*) => rest
case Array(cmd) => Seq(cmd)
}
val tasks = commands.foldLeft(Nil: Seq[Either[String, () => Either[Exception, Boolean]]]) {
case (t, cmd) =>
t :+ (DefaultParsers.parse(cmd, parser) match {
case Right(task) =>
Right { () =>
try {
process(task)
Right(true)
} catch { case e: Exception => Left(e) }
}
case Left(_) => Left(cmd)
})
}
val (valid, invalid) = tasks.partition(_.isRight)
if (invalid.isEmpty) {
val task = () =>
valid.foldLeft(Right(true): Either[Exception, Boolean]) {
case (status, Right(t)) => if (status.getOrElse(true)) t() else status
case _ => throw new IllegalStateException("Should be unreachable")
}
@tailrec def shouldTerminate: Boolean =
(System.in.available > 0) && (w.terminateWatch(System.in.read()) || shouldTerminate)
val watchState = WatchState.empty(w.watchService(), w.watchSources(s))
val config = WatchConfig.default(
logger,
() => shouldTerminate,
count => Some(w.triggeredMessage(watchState.withCount(count))).filter(_.nonEmpty),
count => Some(w.watchingMessage(watchState.withCount(count))).filter(_.nonEmpty),
watchState,
w.pollInterval,
w.antiEntropy
)
watch(task, config)
state
} else {
logger.error(
s"Terminating watch due to invalid command(s): ${invalid.mkString("'", "', '", "'")}"
)
state.fail
}
}
private[sbt] def watch(
task: () => Either[Exception, _],
config: WatchConfig,
): Unit = {
val eventLogger = new EventMonitor.Logger {
override def debug(msg: => Any): Unit = config.logger.debug(msg.toString)
}
def debug(msg: String): Unit = if (msg.nonEmpty) config.logger.debug(msg)
val monitor = EventMonitor(
config.watchState,
config.pollInterval,
config.antiEntropy,
config.shouldTerminate(),
eventLogger
)
@tailrec
def impl(count: Int): Unit = {
task() match {
case _: Right[Exception, _] =>
config.watchingMessage(count).foreach(debug)
if (monitor.awaitEvent()) {
config.triggeredMessage(count).foreach(debug)
impl(count + 1)
}
case Left(e) => config.logger.error(s"Terminating watch due to Unexpected error: $e")
}
}
try {
impl(count = 1)
} finally {
monitor.close()
while (System.in.available() > 0) System.in.read()
}
}
@deprecated("Replaced by Watched.command", "1.3.0")
def executeContinuously(watched: Watched, s: State, next: String, repeat: String): State = {
@tailrec def shouldTerminate: Boolean =
(System.in.available > 0) && (watched.terminateWatch(System.in.read()) || shouldTerminate)
@ -148,6 +270,13 @@ object Watched {
}
}
private[sbt] object NullLogger extends Logger {
override def trace(t: => Throwable): Unit = {}
override def success(message: => String): Unit = {}
override def log(level: Level.Value, message: => String): Unit = {}
}
@deprecated("ContinuousEventMonitor attribute is not used by Watched.command", "1.3.0")
val ContinuousEventMonitor =
AttributeKey[EventMonitor](
"watch event monitor",
@ -180,3 +309,104 @@ object Watched {
}
}
}
/**
* Provides a number of configuration options for continuous execution.
*/
trait WatchConfig {
/**
* A logger.
* @return a logger
*/
def logger: Logger
/**
* Returns true if the continuous execution should stop.
* @return true if the contiuous execution should stop.
*/
def shouldTerminate(): Boolean
/**
* The message to print when a build is triggered.
* @param count the current continous iteration count
* @return an optional string to log
*/
def triggeredMessage(count: Int): Option[String]
/**
* The message to print at the beginning of each watch iteration.
* @param count the current watch iteration
* @return an optional string to log before each watch iteration.
*/
def watchingMessage(count: Int): Option[String]
/**
* The WatchState that provides the WatchService that will be used to monitor events.
* @return the WatchState.
*/
def watchState: WatchState
/**
* The maximum duration that the EventMonitor background thread will poll the underlying
* [[sbt.io.WatchService]] for events.
* @return
*/
def pollInterval: FiniteDuration
/**
* The period for which files that trigger a build are quarantined from triggering a new build
* if they are modified.
* @return the anti-entropy period.
*/
def antiEntropy: FiniteDuration
}
/**
* Provides a default implementation of [[WatchConfig]].
*/
object WatchConfig {
/**
* Generate an instance of [[WatchConfig]].
*
* @param logger an [[sbt.util.Logger]] instance
* @param shouldStop returns true if the watch should stop
* @param triggeredMessage function to generate an optional message to print when a build is
* @param watchingMessage function to generate an optional message to print before each watch
* iteration
* @param watchState the [[WatchState]] which provides an [[sbt.io.WatchService]] to monitor
* file system vents
* @param pollInterval the maximum polling time of the [[sbt.io.WatchService]]
* @param antiEntropy the period for which a file that triggered a build is quarantined so that
* any events detected during this period do not trigger a build.
* @return an instance of [[WatchConfig]].
*/
def default(
logger: Logger,
shouldStop: () => Boolean,
triggeredMessage: Int => Option[String],
watchingMessage: Int => Option[String],
watchState: WatchState,
pollInterval: FiniteDuration,
antiEntropy: FiniteDuration,
): WatchConfig = {
val l = logger
val ss = shouldStop
val tm = triggeredMessage
val wm = watchingMessage
val ws = watchState
val pi = pollInterval
val ae = antiEntropy
new WatchConfig {
override def logger: Logger = l
override def shouldTerminate(): Boolean = ss()
override def triggeredMessage(count: Int): Option[String] = tm(count)
override def watchingMessage(count: Int): Option[String] = wm(count)
override def watchState: WatchState = ws
override def pollInterval: FiniteDuration = pi
override def antiEntropy: FiniteDuration = ae
}
}
}

View File

@ -0,0 +1,42 @@
/*
* sbt
* Copyright 2011 - 2018, Lightbend, Inc.
* Copyright 2008 - 2010, Mark Harrah
* Licensed under Apache License 2.0 (see LICENSE)
*/
package sbt
import java.io.File
import java.util.concurrent.CountDownLatch
import org.scalatest.{ FlatSpec, Matchers }
import sbt.Watched.{ WatchConfig, NullLogger, WatchSource }
import sbt.internal.io.WatchState
import sbt.io.IO
import scala.concurrent.duration._
import WatchedSpec._
class WatchedSpec extends FlatSpec with Matchers {
"Watched" should "stop" in IO.withTemporaryDirectory { dir =>
val latch = new CountDownLatch(1)
val config = WatchConfig.default(
NullLogger,
() => latch.getCount == 0,
triggeredMessage = _ => { latch.countDown(); None },
watchingMessage = _ => { new File(dir, "foo").createNewFile(); None },
watchState = WatchState.empty(Watched.createWatchService, WatchSource(dir.toRealPath) :: Nil),
pollInterval = 5.millis,
antiEntropy = 5.millis
)
Watched.watch(() => Right(true), config)
assert(latch.getCount == 0)
}
}
object WatchedSpec {
implicit class FileOps(val f: File) {
def toRealPath: File = f.toPath.toRealPath().toFile
}
}

View File

@ -265,6 +265,7 @@ object Defaults extends BuildCommon {
parallelExecution :== true,
pollInterval :== new FiniteDuration(500, TimeUnit.MILLISECONDS),
watchAntiEntropy :== new FiniteDuration(500, TimeUnit.MILLISECONDS),
watchLogger := streams.value.log,
watchService :== { () =>
Watched.createWatchService()
},

View File

@ -143,8 +143,9 @@ object Keys {
val analysis = AttributeKey[CompileAnalysis]("analysis", "Analysis of compilation, including dependencies and generated outputs.", DSetting)
val watch = SettingKey(BasicKeys.watch)
val suppressSbtShellNotification = settingKey[Boolean]("""True to suppress the "Executing in batch mode.." message.""").withRank(CSetting)
val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting)
val pollInterval = settingKey[FiniteDuration]("Interval between checks for modified sources by the continuous execution command.").withRank(BMinusSetting)
val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting)
val watchLogger = taskKey[Logger]("A logger that reports watch events.").withRank(DSetting)
val watchService = settingKey[() => WatchService]("Service to use to monitor file system changes.").withRank(BMinusSetting)
val watchSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in this project for continuous execution to watch for changes.").withRank(BMinusSetting)
val watchTransitiveSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in all projects for continuous execution to watch.").withRank(CSetting)

View File

@ -241,7 +241,8 @@ object BuiltinCommands {
export,
boot,
initialize,
act
act,
continuous
) ++ allBasicCommands
def DefaultBootCommands: Seq[String] =
@ -446,6 +447,14 @@ object BuiltinCommands {
s
}
def continuous: Command = Watched.continuous { (state: State, _: Watched, command: String) =>
val extracted = Project.extract(state)
val (s, logger) = extracted.runTask(Keys.watchLogger, state)
val process: (() => State) => State =
(f: () => State) => MainLoop.processCommand(Exec(command, None), s, f)
(s, logger, process)
}
private[this] def loadedEval(s: State, arg: String): Unit = {
val extracted = Project extract s
import extracted._

View File

@ -144,13 +144,20 @@ object MainLoop {
}
/** This is the main function State transfer function of the sbt command processing. */
def processCommand(exec: Exec, state: State): State = {
def processCommand(exec: Exec, state: State): State =
processCommand(exec, state, () => Command.process(exec.commandLine, state))
private[sbt] def processCommand(
exec: Exec,
state: State,
runCommand: () => State
): State = {
val channelName = exec.source map (_.channelName)
StandardMain.exchange publishEventMessage
ExecStatusEvent("Processing", channelName, exec.execId, Vector())
try {
val newState = Command.process(exec.commandLine, state)
val newState = runCommand()
val doneEvent = ExecStatusEvent(
"Done",
channelName,