From 754385125acb1debb5bb95e6b1b690a8dd73eb9e Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Thu, 12 Apr 2018 13:06:54 -0700 Subject: [PATCH] Use new EventMonitor in executeContinuously In https://github.com/sbt/io/pull/142, I add a new api for watching for source file events. This commit updates sbt to use the new EventMonitor based api. The EventMonitor has an anti-entropy parameter, so that multiple events on the same file in a short window of time do not trigger a build. I add a key to tune it. The implementation of executeContinuously is pretty similar. The main changes are that shouldTerminate now blocks (EventMonitor spins up a thread to check the termination condition) and that the EventMonitor.watch method only returns a Boolean. This is because the event monitor contains mutable state. It does, however, have a state() method that returns an immutable snapshot of the state. --- main-command/src/main/scala/sbt/Watched.scala | 81 ++++++++++++------- main/src/main/scala/sbt/Defaults.scala | 5 +- main/src/main/scala/sbt/Keys.scala | 1 + 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/main-command/src/main/scala/sbt/Watched.scala b/main-command/src/main/scala/sbt/Watched.scala index cff9c1597..029702841 100644 --- a/main-command/src/main/scala/sbt/Watched.scala +++ b/main-command/src/main/scala/sbt/Watched.scala @@ -12,7 +12,7 @@ import java.nio.file.FileSystems import sbt.BasicCommandStrings.ClearOnFailure import sbt.State.FailureWall -import sbt.internal.io.{ Source, SourceModificationWatch, WatchState } +import sbt.internal.io.{ EventMonitor, Source, SourceModificationWatch, WatchState } import sbt.internal.util.AttributeKey import sbt.internal.util.Types.const import sbt.io._ @@ -33,6 +33,12 @@ trait Watched { */ def pollInterval: FiniteDuration = Watched.PollDelay + /** + * The duration for which the EventMonitor while ignore file events after a file triggers + * a new build. + */ + def antiEntropy: FiniteDuration = Watched.AntiEntropy + /** The message to show when triggered execution waits for sources to change.*/ private[sbt] def watchingMessage(s: WatchState): String = Watched.defaultWatchingMessage(s) @@ -81,52 +87,65 @@ object Watched { override def watchSources(s: State) = (base.watchSources(s) /: paths)(_ ++ _.watchSources(s)) override def terminateWatch(key: Int): Boolean = base.terminateWatch(key) override val pollInterval = (base +: paths).map(_.pollInterval).min + override val antiEntropy = (base +: paths).map(_.antiEntropy).min override def watchingMessage(s: WatchState) = base.watchingMessage(s) override def triggeredMessage(s: WatchState) = base.triggeredMessage(s) } def empty: Watched = new AWatched val PollDelay: FiniteDuration = 500.milliseconds + val AntiEntropy: FiniteDuration = 40.milliseconds def isEnter(key: Int): Boolean = key == 10 || key == 13 def printIfDefined(msg: String) = if (!msg.isEmpty) System.out.println(msg) def executeContinuously(watched: Watched, s: State, next: String, repeat: String): State = { @tailrec def shouldTerminate: Boolean = - (System.in.available > 0) && (watched.terminateWatch(System.in.read()) || shouldTerminate) - val sources = watched.watchSources(s) - val service = s get ContinuousWatchService getOrElse watched.watchService() - val watchState = s get ContinuousState getOrElse WatchState.empty(service, sources) - - if (watchState.count > 0) - printIfDefined(watched watchingMessage watchState) - - val (triggered, newWatchState) = - try { - val (triggered, newWatchState) = - SourceModificationWatch.watch(watched.pollInterval, watchState)(shouldTerminate) - (triggered, newWatchState) - } catch { - case e: Exception => - val log = s.log - log.error("Error occurred obtaining files to watch. Terminating continuous execution...") - State.handleException(e, s, log) - (false, watchState) - } - - if (triggered) { - printIfDefined(watched triggeredMessage newWatchState) - (ClearOnFailure :: next :: FailureWall :: repeat :: s) - .put(ContinuousState, newWatchState) - .put(ContinuousWatchService, service) - } else { - while (System.in.available() > 0) System.in.read() - service.close() - s.remove(ContinuousState).remove(ContinuousWatchService) + watched.terminateWatch(System.in.read()) || shouldTerminate + val log = s.log + val logger = new EventMonitor.Logger { + override def debug(msg: => Any): Unit = log.debug(msg.toString) + } + s get ContinuousEventMonitor match { + case None => + // This is the first iteration, so run the task and create a new EventMonitor + (ClearOnFailure :: next :: FailureWall :: repeat :: s) + .put( + ContinuousEventMonitor, + EventMonitor(WatchState.empty(watched.watchService(), watched.watchSources(s)), + watched.pollInterval, + watched.antiEntropy, + shouldTerminate, + logger) + ) + case Some(eventMonitor) => + printIfDefined(watched watchingMessage eventMonitor.state) + val triggered = try eventMonitor.watch() + catch { + case e: Exception => + log.error( + "Error occurred obtaining files to watch. Terminating continuous execution...") + State.handleException(e, s, log) + false + } + if (triggered) { + printIfDefined(watched triggeredMessage eventMonitor.state) + ClearOnFailure :: next :: FailureWall :: repeat :: s + } else { + while (System.in.available() > 0) System.in.read() + eventMonitor.close() + s.remove(ContinuousEventMonitor) + } } } + + val ContinuousEventMonitor = + AttributeKey[EventMonitor]("watch event monitor", + "Internal: maintains watch state and monitor threads.") + @deprecated("Superseded by ContinuousEventMonitor", "1.1.5") val ContinuousState = AttributeKey[WatchState]("watch state", "Internal: tracks state for continuous execution.") + @deprecated("Superseded by ContinuousEventMonitor", "1.1.5") val ContinuousWatchService = AttributeKey[WatchService]("watch service", "Internal: tracks watch service for continuous execution.") diff --git a/main/src/main/scala/sbt/Defaults.scala b/main/src/main/scala/sbt/Defaults.scala index 82af2a9e4..e34ccfc04 100755 --- a/main/src/main/scala/sbt/Defaults.scala +++ b/main/src/main/scala/sbt/Defaults.scala @@ -248,6 +248,7 @@ object Defaults extends BuildCommon { concurrentRestrictions := defaultRestrictions.value, parallelExecution :== true, pollInterval :== new FiniteDuration(500, TimeUnit.MILLISECONDS), + watchAntiEntropy :== new FiniteDuration(40, TimeUnit.MILLISECONDS), watchService :== { () => Watched.createWatchService() }, @@ -555,13 +556,15 @@ object Defaults extends BuildCommon { Def.setting { val getService = watchService.value val interval = pollInterval.value + val _antiEntropy = watchAntiEntropy.value val base = thisProjectRef.value val msg = watchingMessage.value val trigMsg = triggeredMessage.value new Watched { val scoped = watchTransitiveSources in base val key = scoped.scopedKey - override def pollInterval = interval + override def antiEntropy: FiniteDuration = _antiEntropy + override def pollInterval: FiniteDuration = interval override def watchingMessage(s: WatchState) = msg(s) override def triggeredMessage(s: WatchState) = trigMsg(s) override def watchService() = getService() diff --git a/main/src/main/scala/sbt/Keys.scala b/main/src/main/scala/sbt/Keys.scala index 8e26b54fb..473bfffba 100644 --- a/main/src/main/scala/sbt/Keys.scala +++ b/main/src/main/scala/sbt/Keys.scala @@ -140,6 +140,7 @@ object Keys { val analysis = AttributeKey[CompileAnalysis]("analysis", "Analysis of compilation, including dependencies and generated outputs.", DSetting) val watch = SettingKey(BasicKeys.watch) val suppressSbtShellNotification = settingKey[Boolean]("""True to suppress the "Executing in batch mode.." message.""").withRank(CSetting) + val watchAntiEntropy = settingKey[FiniteDuration]("Duration for which the watch EventMonitor will ignore events for a file after that file has triggered a build.").withRank(BMinusSetting) val pollInterval = settingKey[FiniteDuration]("Interval between checks for modified sources by the continuous execution command.").withRank(BMinusSetting) val watchService = settingKey[() => WatchService]("Service to use to monitor file system changes.").withRank(BMinusSetting) val watchSources = taskKey[Seq[Watched.WatchSource]]("Defines the sources in this project for continuous execution to watch for changes.").withRank(BMinusSetting)