Use new EventMonitor in executeContinuously

In https://github.com/sbt/io/pull/142, I add a new api for watching for
source file events. This commit updates sbt to use the new EventMonitor
based api. The EventMonitor has an anti-entropy parameter, so that
multiple events on the same file in a short window of time do not
trigger a build. I add a key to tune it.

The implementation of executeContinuously is pretty similar. The main
changes are that shouldTerminate now blocks (EventMonitor spins up a
thread to check the termination condition) and that the
EventMonitor.watch method only returns a Boolean. This is because
the event monitor contains mutable state. It does, however, have a
state() method that returns an immutable snapshot of the state.
This commit is contained in:
Ethan Atkins 2018-04-12 13:06:54 -07:00
parent 8c6f71a180
commit 754385125a
3 changed files with 55 additions and 32 deletions

View File

@ -12,7 +12,7 @@ import java.nio.file.FileSystems
import sbt.BasicCommandStrings.ClearOnFailure
import sbt.State.FailureWall
import sbt.internal.io.{ Source, SourceModificationWatch, WatchState }
import sbt.internal.io.{ EventMonitor, Source, SourceModificationWatch, WatchState }
import sbt.internal.util.AttributeKey
import sbt.internal.util.Types.const
import sbt.io._
@ -33,6 +33,12 @@ trait Watched {
*/
def pollInterval: FiniteDuration = Watched.PollDelay
/**
* The duration for which the EventMonitor while ignore file events after a file triggers
* a new build.
*/
def antiEntropy: FiniteDuration = Watched.AntiEntropy
/** The message to show when triggered execution waits for sources to change.*/
private[sbt] def watchingMessage(s: WatchState): String = Watched.defaultWatchingMessage(s)
@ -81,52 +87,65 @@ object Watched {
override def watchSources(s: State) = (base.watchSources(s) /: paths)(_ ++ _.watchSources(s))
override def terminateWatch(key: Int): Boolean = base.terminateWatch(key)
override val pollInterval = (base +: paths).map(_.pollInterval).min
override val antiEntropy = (base +: paths).map(_.antiEntropy).min
override def watchingMessage(s: WatchState) = base.watchingMessage(s)
override def triggeredMessage(s: WatchState) = base.triggeredMessage(s)
}
def empty: Watched = new AWatched
val PollDelay: FiniteDuration = 500.milliseconds
val AntiEntropy: FiniteDuration = 40.milliseconds
def isEnter(key: Int): Boolean = key == 10 || key == 13
def printIfDefined(msg: String) = if (!msg.isEmpty) System.out.println(msg)
def executeContinuously(watched: Watched, s: State, next: String, repeat: String): State = {
@tailrec def shouldTerminate: Boolean =
(System.in.available > 0) && (watched.terminateWatch(System.in.read()) || shouldTerminate)
val sources = watched.watchSources(s)
val service = s get ContinuousWatchService getOrElse watched.watchService()
val watchState = s get ContinuousState getOrElse WatchState.empty(service, sources)
if (watchState.count > 0)
printIfDefined(watched watchingMessage watchState)
val (triggered, newWatchState) =
try {
val (triggered, newWatchState) =
SourceModificationWatch.watch(watched.pollInterval, watchState)(shouldTerminate)
(triggered, newWatchState)
} catch {
case e: Exception =>
val log = s.log
log.error("Error occurred obtaining files to watch. Terminating continuous execution...")
State.handleException(e, s, log)
(false, watchState)
}
if (triggered) {
printIfDefined(watched triggeredMessage newWatchState)
(ClearOnFailure :: next :: FailureWall :: repeat :: s)
.put(ContinuousState, newWatchState)
.put(ContinuousWatchService, service)
} else {
while (System.in.available() > 0) System.in.read()
service.close()
s.remove(ContinuousState).remove(ContinuousWatchService)
watched.terminateWatch(System.in.read()) || shouldTerminate
val log = s.log
val logger = new EventMonitor.Logger {
override def debug(msg: => Any): Unit = log.debug(msg.toString)
}
s get ContinuousEventMonitor match {
case None =>
// This is the first iteration, so run the task and create a new EventMonitor
(ClearOnFailure :: next :: FailureWall :: repeat :: s)
.put(
ContinuousEventMonitor,
EventMonitor(WatchState.empty(watched.watchService(), watched.watchSources(s)),
watched.pollInterval,
watched.antiEntropy,
shouldTerminate,
logger)
)
case Some(eventMonitor) =>
printIfDefined(watched watchingMessage eventMonitor.state)
val triggered = try eventMonitor.watch()
catch {
case e: Exception =>
log.error(
"Error occurred obtaining files to watch. Terminating continuous execution...")
State.handleException(e, s, log)
false
}
if (triggered) {
printIfDefined(watched triggeredMessage eventMonitor.state)
ClearOnFailure :: next :: FailureWall :: repeat :: s
} else {
while (System.in.available() > 0) System.in.read()
eventMonitor.close()
s.remove(ContinuousEventMonitor)
}
}
}
val ContinuousEventMonitor =
AttributeKey[EventMonitor]("watch event monitor",
"Internal: maintains watch state and monitor threads.")
@deprecated("Superseded by ContinuousEventMonitor", "1.1.5")
val ContinuousState =
AttributeKey[WatchState]("watch state", "Internal: tracks state for continuous execution.")
@deprecated("Superseded by ContinuousEventMonitor", "1.1.5")
val ContinuousWatchService =
AttributeKey[WatchService]("watch service",
"Internal: tracks watch service for continuous execution.")

View File

@ -248,6 +248,7 @@ object Defaults extends BuildCommon {
concurrentRestrictions := defaultRestrictions.value,
parallelExecution :== true,
pollInterval :== new FiniteDuration(500, TimeUnit.MILLISECONDS),
watchAntiEntropy :== new FiniteDuration(40, TimeUnit.MILLISECONDS),
watchService :== { () =>
Watched.createWatchService()
},
@ -555,13 +556,15 @@ object Defaults extends BuildCommon {
Def.setting {
val getService = watchService.value
val interval = pollInterval.value
val _antiEntropy = watchAntiEntropy.value
val base = thisProjectRef.value
val msg = watchingMessage.value
val trigMsg = triggeredMessage.value
new Watched {
val scoped = watchTransitiveSources in base
val key = scoped.scopedKey
override def pollInterval = interval
override def antiEntropy: FiniteDuration = _antiEntropy
override def pollInterval: FiniteDuration = interval
override def watchingMessage(s: WatchState) = msg(s)
override def triggeredMessage(s: WatchState) = trigMsg(s)
override def watchService() = getService()

View File

@ -140,6 +140,7 @@ object Keys {
val analysis = AttributeKey[CompileAnalysis]("analysis", "Analysis of compilation, including dependencies and generated outputs.", DSetting)
val watch = SettingKey(BasicKeys.watch)
val suppressSbtShellNotification = settingKey[Boolean]("""True to suppress the "Executing in batch mode.." message.""").withRank(CSetting)
val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting)
val pollInterval = settingKey[FiniteDuration]("Interval between checks for modified sources by the continuous execution command.").withRank(BMinusSetting)
val watchService = settingKey[() => WatchService]("Service to use to monitor file system changes.").withRank(BMinusSetting)
val watchSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in this project for continuous execution to watch for changes.").withRank(BMinusSetting)