From 272508596ac8dc7f763cc24c76e1bdfc9aa4eaea Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 15 Jul 2019 12:52:49 -0700 Subject: [PATCH 1/3] Use one observer for all aggregated watch tasks There was a bug where sometimes a source file change would not trigger a new build if the change occurred during a build. Based on the logs, it seemed to be because a number of redundant events were generated for the same path and they triggered the anti-entropy constraint of the file event monitor. To fix this, I consolidated a number of observers of the global file tree repository into a single observer. This way, I am able to ensure that only one event is generated per file event. I also reworked the onEvent callback to only stamp the file once. It was previously stamping the modified source file for all of the aggregated tasks. In the sbt project running `~compile` meant that we were stamping a source file 22 times whenever the file changed. This actually uncovered a zinc issue though as well. Zinc computes and writes the hash of the source file to the analysis file after compilation has completed. If a source file is modified during compilation, then the new hash is written to the analysis file even when the compilation may have been made against the previous version of the file. Zinc will then refuse to re-compile that file until another change is made. I manually verified that in the sbt project if I ran `~compile` before this change and modified a file during compilation, then no event was triggered (there was a log message about the event being dropped due to the anti-entropy constraint though). After this change, if I changed a file during compilation, it seemed to always trigger, but due to the zinc bug, it didn't always re-compile. --- .../main/scala/sbt/internal/Continuous.scala | 98 ++++++++----------- 1 file changed, 43 insertions(+), 55 deletions(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 202e94fae..1f730394e 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -26,7 +26,6 @@ import sbt.internal.nio._ import sbt.internal.util.complete.Parser._ import sbt.internal.util.complete.{ Parser, Parsers } import sbt.internal.util.{ AttributeKey, JLine, Util } -import sbt.nio.FileStamper.LastModified import sbt.nio.Keys.{ fileInputs, _ } import sbt.nio.Watch.{ Creation, Deletion, ShowOptions, Update } import sbt.nio.file.FileAttributes @@ -582,7 +581,7 @@ private[sbt] object Continuous extends DeprecatedContinuous { val onEvent: Event => Seq[(Watch.Event, Watch.Action)] = event => { val path = event.path - def watchEvent(forceTrigger: Boolean): Option[Watch.Event] = { + def getWatchEvent(forceTrigger: Boolean): Option[Watch.Event] = { if (!event.exists) { Some(Deletion(event)) fileStampCache.remove(event.path) match { @@ -614,42 +613,33 @@ private[sbt] object Continuous extends DeprecatedContinuous { } if (buildGlobs.exists(_.matches(path))) { - watchEvent(forceTrigger = false).map(e => e -> Watch.Reload).toSeq + getWatchEvent(forceTrigger = false).map(e => e -> Watch.Reload).toSeq } else { - configs - .flatMap { config => - config - .inputs() - .filter(_.glob.matches(path)) - .sortBy(_.fileStamper match { - case FileStamper.Hash => -1 - case FileStamper.LastModified => 0 - }) - .headOption - .flatMap { d => - val forceTrigger = d.forceTrigger - d.fileStamper match { - // We do not update the file stamp cache because we only want hashes in that - // cache or else it can mess up the external hooks that use that cache. - case LastModified => - logger.debug(s"Trigger path detected $path") - val watchEvent = - if (!event.exists) Deletion(event) - else if (fileStampCache.get(path).isDefined) Creation(event) - else Update(event) - val action = config.watchSettings.onFileInputEvent(count.get(), watchEvent) - Some(watchEvent -> action) - case _ => - watchEvent(forceTrigger).flatMap { e => - val action = config.watchSettings.onFileInputEvent(count.get(), e) - if (action != Watch.Ignore) Some(e -> action) else None - } - } - } - } match { - case events if events.isEmpty => Nil - case events => events.minBy(_._2) :: Nil + val acceptedConfigParameters = configs.flatMap { config => + config.inputs().flatMap { + case i if i.glob.matches(path) => + Some((i.forceTrigger, i.fileStamper, config.watchSettings.onFileInputEvent)) + case _ => None + } } + if (acceptedConfigParameters.nonEmpty) { + val useHash = acceptedConfigParameters.exists(_._2 == FileStamper.Hash) + val forceTrigger = acceptedConfigParameters.exists(_._1) + val watchEvent = + if (useHash) getWatchEvent(forceTrigger) + else { + logger.debug(s"Trigger path detected $path") + Some( + if (!event.exists) Deletion(event) + else if (fileStampCache.get(path).isDefined) Creation(event) + else Update(event) + ) + } + acceptedConfigParameters.flatMap { + case (_, _, callback) => + watchEvent.map(e => e -> callback(count.get(), e)) + } + } else Nil } } val monitor: FileEventMonitor[Event] = new FileEventMonitor[Event] { @@ -662,28 +652,26 @@ private[sbt] object Continuous extends DeprecatedContinuous { private[this] val repo = getRepository(state) private[this] val handle = repo.addObserver(observers) private[this] val eventMonitorObservers = new Observers[Event] - private[this] val delegateHandles: Seq[AutoCloseable] = - configs.map { config => - // Create a logger with a scoped key prefix so that we can tell from which task there - // were inputs that matched the event path. - val configLogger = logger.withPrefix(config.command) - observers.addObserver { e => - if (config.inputs().exists(_.glob.matches(e.path))) { + private[this] val configHandle: AutoCloseable = + observers.addObserver { e => + // We only want to create one event per actual source file event. It doesn't matter + // which of the config inputs triggers the event because they all will be used in + // the onEvent callback above. + configs.find(_.inputs().exists(_.glob.matches(e.path))) match { + case Some(config) => + val configLogger = logger.withPrefix(config.command) configLogger.debug(s"Accepted event for ${e.path}") eventMonitorObservers.onNext(e) - } + case None => } - } - if (trackMetaBuild) { - buildGlobs.foreach(repo.register) - val metaLogger = logger.withPrefix("meta-build") - observers.addObserver { e => - if (buildGlobs.exists(_.matches(e.path))) { + if (trackMetaBuild && buildGlobs.exists(_.matches(e.path))) { + val metaLogger = logger.withPrefix("build") metaLogger.debug(s"Accepted event for ${e.path}") eventMonitorObservers.onNext(e) } } - } + if (trackMetaBuild) buildGlobs.foreach(repo.register) + private[this] val monitor = FileEventMonitor.antiEntropy( eventMonitorObservers, configs.map(_.watchSettings.antiEntropy).max, @@ -692,12 +680,11 @@ private[sbt] object Continuous extends DeprecatedContinuous { retentionPeriod ) - override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = { + override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = monitor.poll(duration, filter) - } override def close(): Unit = { - delegateHandles.foreach(_.close()) + configHandle.close() handle.close() } } @@ -728,7 +715,8 @@ private[sbt] object Continuous extends DeprecatedContinuous { } (() => { - val actions = antiEntropyMonitor.poll(30.milliseconds).flatMap(onEvent) + val events = antiEntropyMonitor.poll(30.milliseconds) + val actions = events.flatMap(onEvent) if (actions.exists(_._2 != Watch.Ignore)) { val builder = new StringBuilder val min = actions.minBy { From 5e374a8e7d15808ac006d16c0ca101a356e9d44b Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 15 Jul 2019 14:15:58 -0700 Subject: [PATCH 2/3] Move onEvent callback definition It makes the file more readable to me to have this definition below the definition of the FileEventMonitor. --- .../main/scala/sbt/internal/Continuous.scala | 111 +++++++++--------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 1f730394e..4032b061c 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -578,6 +578,62 @@ private[sbt] object Continuous extends DeprecatedContinuous { val retentionPeriod = configs.map(_.watchSettings.antiEntropyRetentionPeriod).max val quarantinePeriod = configs.map(_.watchSettings.deletionQuarantinePeriod).max + val monitor: FileEventMonitor[Event] = new FileEventMonitor[Event] { + + private implicit class WatchLogger(val l: Logger) extends sbt.internal.nio.WatchLogger { + override def debug(msg: Any): Unit = l.debug(msg.toString) + } + + private[this] val observers: Observers[Event] = new Observers + private[this] val repo = getRepository(state) + private[this] val handle = repo.addObserver(observers) + private[this] val eventMonitorObservers = new Observers[Event] + private[this] val configHandle: AutoCloseable = + observers.addObserver { e => + // We only want to create one event per actual source file event. It doesn't matter + // which of the config inputs triggers the event because they all will be used in + // the onEvent callback above. + configs.find(_.inputs().exists(_.glob.matches(e.path))) match { + case Some(config) => + val configLogger = logger.withPrefix(config.command) + configLogger.debug(s"Accepted event for ${e.path}") + eventMonitorObservers.onNext(e) + case None => + } + if (trackMetaBuild && buildGlobs.exists(_.matches(e.path))) { + val metaLogger = logger.withPrefix("build") + metaLogger.debug(s"Accepted event for ${e.path}") + eventMonitorObservers.onNext(e) + } + } + if (trackMetaBuild) buildGlobs.foreach(repo.register) + + private[this] val monitor = FileEventMonitor.antiEntropy( + eventMonitorObservers, + configs.map(_.watchSettings.antiEntropy).max, + logger, + quarantinePeriod, + retentionPeriod + ) + + override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = + monitor.poll(duration, filter) + + override def close(): Unit = { + configHandle.close() + handle.close() + } + } + val watchLogger: WatchLogger = msg => logger.debug(msg.toString) + val antiEntropy = configs.map(_.watchSettings.antiEntropy).max + val antiEntropyMonitor = FileEventMonitor.antiEntropy( + monitor, + antiEntropy, + watchLogger, + quarantinePeriod, + retentionPeriod + ) + val onEvent: Event => Seq[(Watch.Event, Watch.Action)] = event => { val path = event.path @@ -642,61 +698,6 @@ private[sbt] object Continuous extends DeprecatedContinuous { } else Nil } } - val monitor: FileEventMonitor[Event] = new FileEventMonitor[Event] { - - private implicit class WatchLogger(val l: Logger) extends sbt.internal.nio.WatchLogger { - override def debug(msg: Any): Unit = l.debug(msg.toString) - } - - private[this] val observers: Observers[Event] = new Observers - private[this] val repo = getRepository(state) - private[this] val handle = repo.addObserver(observers) - private[this] val eventMonitorObservers = new Observers[Event] - private[this] val configHandle: AutoCloseable = - observers.addObserver { e => - // We only want to create one event per actual source file event. It doesn't matter - // which of the config inputs triggers the event because they all will be used in - // the onEvent callback above. - configs.find(_.inputs().exists(_.glob.matches(e.path))) match { - case Some(config) => - val configLogger = logger.withPrefix(config.command) - configLogger.debug(s"Accepted event for ${e.path}") - eventMonitorObservers.onNext(e) - case None => - } - if (trackMetaBuild && buildGlobs.exists(_.matches(e.path))) { - val metaLogger = logger.withPrefix("build") - metaLogger.debug(s"Accepted event for ${e.path}") - eventMonitorObservers.onNext(e) - } - } - if (trackMetaBuild) buildGlobs.foreach(repo.register) - - private[this] val monitor = FileEventMonitor.antiEntropy( - eventMonitorObservers, - configs.map(_.watchSettings.antiEntropy).max, - logger, - quarantinePeriod, - retentionPeriod - ) - - override def poll(duration: Duration, filter: Event => Boolean): Seq[Event] = - monitor.poll(duration, filter) - - override def close(): Unit = { - configHandle.close() - handle.close() - } - } - val watchLogger: WatchLogger = msg => logger.debug(msg.toString) - val antiEntropy = configs.map(_.watchSettings.antiEntropy).max - val antiEntropyMonitor = FileEventMonitor.antiEntropy( - monitor, - antiEntropy, - watchLogger, - quarantinePeriod, - retentionPeriod - ) /* * This is a callback that will be invoked whenever onEvent returns a Trigger action. The * motivation is to allow the user to specify this callback via setting so that, for example, From 6c4e23f77c0d1497c4e55360d332dc885bb82323 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 15 Jul 2019 14:15:01 -0700 Subject: [PATCH 3/3] Only persist file stamps in turbo mode The use of the persistent file stamp cache between watch runs didn't seem to cause any issues, but there was some chance for inconsistency between the file stamp cache and the file system so it makes sense to put it behind the turbo flag. After changing the default, the watch/on-change scripted test started failing. It turns out that the reason is that the file stamp cache managed by the watch process was not pre-filled by task evaluation. For this reason, the first time a source file was modified, it was treated as a creation regardless of whether or not it actually was. To fix this, I add logic to pre-fill the watch file stamp cache if we are _not_ persisting the file stamps between runs. I ran a before and after with the scala build performance benchmark tool and setting the watchPersistFileStamps key to true reduced the median run time by about 200ms in the non-turbo case. --- .../main/scala/sbt/internal/Continuous.scala | 26 +++++++++++++++---- main/src/main/scala/sbt/nio/FileStamp.scala | 8 +++--- main/src/main/scala/sbt/nio/Settings.scala | 19 +++++++++++--- main/src/main/scala/sbt/nio/Watch.scala | 2 +- sbt/src/sbt-test/watch/on-change/build.sbt | 2 +- 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/main/src/main/scala/sbt/internal/Continuous.scala b/main/src/main/scala/sbt/internal/Continuous.scala index 4032b061c..89e5fe00e 100644 --- a/main/src/main/scala/sbt/internal/Continuous.scala +++ b/main/src/main/scala/sbt/internal/Continuous.scala @@ -8,7 +8,8 @@ package sbt package internal -import java.io.{ ByteArrayInputStream, InputStream, File => _ } +import java.io.{ ByteArrayInputStream, IOException, InputStream, File => _ } +import java.nio.file.Path import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import sbt.BasicCommandStrings.{ @@ -28,7 +29,7 @@ import sbt.internal.util.complete.{ Parser, Parsers } import sbt.internal.util.{ AttributeKey, JLine, Util } import sbt.nio.Keys.{ fileInputs, _ } import sbt.nio.Watch.{ Creation, Deletion, ShowOptions, Update } -import sbt.nio.file.FileAttributes +import sbt.nio.file.{ FileAttributes, Glob } import sbt.nio.{ FileStamp, FileStamper, Watch } import sbt.util.{ Level, _ } @@ -328,12 +329,14 @@ private[sbt] object Continuous extends DeprecatedContinuous { } val fileStampCache = new FileStamp.Cache repo.addObserver(t => fileStampCache.invalidate(t.path)) + val persistFileStamps = extracted.get(watchPersistFileStamps) + val cachingRepo: FileTreeRepository[FileAttributes] = + if (persistFileStamps) repo else new FileStampRepository(fileStampCache, repo) try { - val stateWithRepo = state.put(globalFileTreeRepository, repo) + val stateWithRepo = state.put(globalFileTreeRepository, cachingRepo) val fullState = addLegacyWatchSetting( - if (extracted.get(watchPersistFileStamps)) - stateWithRepo.put(persistentFileStampCache, fileStampCache) + if (persistFileStamps) stateWithRepo.put(persistentFileStampCache, fileStampCache) else stateWithRepo ) setup(fullState, commands) { (s, valid, invalid) => @@ -1091,4 +1094,17 @@ private[sbt] object Continuous extends DeprecatedContinuous { } } + private[sbt] class FileStampRepository( + fileStampCache: FileStamp.Cache, + underlying: FileTreeRepository[FileAttributes] + ) extends FileTreeRepository[FileAttributes] { + def putIfAbsent(path: Path, stamper: FileStamper): (Option[FileStamp], Option[FileStamp]) = + fileStampCache.putIfAbsent(path, stamper) + override def list(path: Path): Seq[(Path, FileAttributes)] = underlying.list(path) + override def addObserver(observer: Observer[FileEvent[FileAttributes]]): AutoCloseable = + underlying.addObserver(observer) + override def register(glob: Glob): Either[IOException, Observable[FileEvent[FileAttributes]]] = + underlying.register(glob) + override def close(): Unit = underlying.close() + } } diff --git a/main/src/main/scala/sbt/nio/FileStamp.scala b/main/src/main/scala/sbt/nio/FileStamp.scala index 0cd081686..fb6a3fcfb 100644 --- a/main/src/main/scala/sbt/nio/FileStamp.scala +++ b/main/src/main/scala/sbt/nio/FileStamp.scala @@ -253,12 +253,12 @@ private[sbt] object FileStamp { case e => e.value } - def putIfAbsent(key: Path, stamper: FileStamper): Unit = { + def putIfAbsent(key: Path, stamper: FileStamper): (Option[FileStamp], Option[FileStamp]) = { underlying.get(key) match { - case null => updateImpl(key, stamper) - case _ => + case null => (None, updateImpl(key, stamper)) + case Right(s) => (Some(s), None) + case Left(_) => (None, None) } - () } def update(key: Path, stamper: FileStamper): (Option[FileStamp], Option[FileStamp]) = { underlying.get(key) match { diff --git a/main/src/main/scala/sbt/nio/Settings.scala b/main/src/main/scala/sbt/nio/Settings.scala index 532ed90ae..8600a50e8 100644 --- a/main/src/main/scala/sbt/nio/Settings.scala +++ b/main/src/main/scala/sbt/nio/Settings.scala @@ -13,6 +13,7 @@ import java.nio.file.{ Files, Path } import sbt.Project._ import sbt.internal.Clean.ToSeqPath +import sbt.internal.Continuous.FileStampRepository import sbt.internal.util.{ AttributeKey, SourcePosition } import sbt.internal.{ Clean, Continuous, DynamicInput, SettingsGraph } import sbt.nio.FileStamp.{ fileStampJsonFormatter, pathJsonFormatter, _ } @@ -319,10 +320,22 @@ private[sbt] object Settings { addTaskDefinition(Keys.inputFileStamps in scopedKey.scope := { val cache = (unmanagedFileStampCache in scopedKey.scope).value val stamper = (Keys.inputFileStamper in scopedKey.scope).value + val stampFile: Path => Option[(Path, FileStamp)] = + sbt.Keys.state.value.get(globalFileTreeRepository) match { + case Some(repo: FileStampRepository) => + (path: Path) => + repo.putIfAbsent(path, stamper) match { + case (None, Some(s)) => + cache.put(path, s) + Some(path -> s) + case _ => cache.getOrElseUpdate(path, stamper).map(path -> _) + } + case _ => + (path: Path) => cache.getOrElseUpdate(path, stamper).map(path -> _) + } (Keys.allInputPathsAndAttributes in scopedKey.scope).value.flatMap { - case (p, a) if a.isRegularFile && !Files.isHidden(p) => - cache.getOrElseUpdate(p, stamper).map(p -> _) - case _ => None + case (path, a) if a.isRegularFile && !Files.isHidden(path) => stampFile(path) + case _ => None } }) private[this] def outputsAndStamps[T: JsonFormat: ToSeqPath]( diff --git a/main/src/main/scala/sbt/nio/Watch.scala b/main/src/main/scala/sbt/nio/Watch.scala index efd7d552a..fcb497b65 100644 --- a/main/src/main/scala/sbt/nio/Watch.scala +++ b/main/src/main/scala/sbt/nio/Watch.scala @@ -576,7 +576,7 @@ object Watch { sbt.Keys.aggregate in watchTasks :== false, watchTriggeredMessage :== Watch.defaultOnTriggerMessage, watchForceTriggerOnAnyChange :== false, - watchPersistFileStamps :== true, + watchPersistFileStamps := (sbt.Keys.turbo in ThisBuild).value, watchTriggers :== Nil, ) } diff --git a/sbt/src/sbt-test/watch/on-change/build.sbt b/sbt/src/sbt-test/watch/on-change/build.sbt index 260e5d1c2..542ca3643 100644 --- a/sbt/src/sbt-test/watch/on-change/build.sbt +++ b/sbt/src/sbt-test/watch/on-change/build.sbt @@ -33,4 +33,4 @@ watchOnFileInputEvent := { (_, event: Watch.Event) => else Watch.Trigger } -watchAntiEntropy := 0.millis \ No newline at end of file +watchAntiEntropy := 0.millis