Validate the cache by default

This commit change the default FileTree.Repository to always use a polling file
repository but one that validates the current file system results
against the cache results. On windows, we do not validate the cache
because the cache can cause io contention in scripted tests. The
cache does seem to work ok on my VM, but not on appveyor for whatever
reason. Validating the cache by default was suggested by @smarter in a
comment in https://github.com/sbt/sbt/issues/4543.
This commit is contained in:
Ethan Atkins 2019-03-29 21:07:18 -07:00
parent 247d242008
commit eb2926b004
6 changed files with 116 additions and 129 deletions

View File

@ -288,11 +288,10 @@ object Defaults extends BuildCommon {
Previous.references :== new Previous.References,
concurrentRestrictions := defaultRestrictions.value,
parallelExecution :== true,
fileTreeRepository :=
FileTree.repository(state.value.get(Keys.globalFileTreeRepository) match {
case Some(r) => r
case None => FileTreeView.DEFAULT.asDataView(FileAttributes.default)
}),
fileTreeRepository := state.value
.get(globalFileTreeRepository)
.map(FileTree.repository)
.getOrElse(FileTree.Repository.polling),
Continuous.dynamicInputs := Continuous.dynamicInputsImpl.value,
externalHooks := {
val repository = fileTreeRepository.value

View File

@ -92,7 +92,6 @@ object Keys {
val analysis = AttributeKey[CompileAnalysis]("analysis", "Analysis of compilation, including dependencies and generated outputs.", DSetting)
val suppressSbtShellNotification = settingKey[Boolean]("""True to suppress the "Executing in batch mode.." message.""").withRank(CSetting)
val enableGlobalCachingFileTreeRepository = settingKey[Boolean]("Toggles whether or not to create a global cache of the file system that can be used by tasks to quickly list a path").withRank(DSetting)
val fileTreeRepository = taskKey[FileTree.Repository]("A repository of the file system.").withRank(DSetting)
val pollInterval = settingKey[FiniteDuration]("Interval between checks for modified sources by the continuous execution command.").withRank(BMinusSetting)
val pollingGlobs = settingKey[Seq[Glob]]("Directories that cannot be cached and must always be rescanned. Typically these will be NFS mounted or something similar.").withRank(DSetting)

View File

@ -848,14 +848,13 @@ object BuiltinCommands {
}
s.put(Keys.stateCompilerCache, cache)
}
private[this] val rawGlobalFileTreeRepository = AttributeKey[FileTreeRepository[FileAttributes]](
private[sbt] val rawGlobalFileTreeRepository = AttributeKey[FileTreeRepository[FileAttributes]](
"raw-global-file-tree-repository",
"Provides a view into the file system that may or may not cache the tree in memory",
1000
)
private[sbt] def registerGlobalCaches(s: State): State =
try {
val extracted = Project.extract(s)
val cleanedUp = new AtomicBoolean(false)
def cleanup(): Unit = {
s.get(rawGlobalFileTreeRepository).foreach(_.close())
@ -863,12 +862,17 @@ object BuiltinCommands {
()
}
cleanup()
val fileTreeRepository = FileManagement.defaultFileTreeRepository(s, extracted)
val newState = s.addExitHook(if (cleanedUp.compareAndSet(false, true)) cleanup())
newState
val fileTreeRepository = FileTreeRepository.default(FileAttributes.default)
val fileCache = System.getProperty("sbt.io.filecache", "validate")
val newState = s
.addExitHook(if (cleanedUp.compareAndSet(false, true)) cleanup())
.put(Keys.taskRepository, new TaskRepository.Repr)
.put(rawGlobalFileTreeRepository, fileTreeRepository)
.put(Keys.globalFileTreeRepository, new CopiedFileTreeRepository(fileTreeRepository))
if (fileCache == "false" || (fileCache != "true" && Util.isWindows)) newState
else {
val copied = new CopiedFileTreeRepository(fileTreeRepository)
newState.put(Keys.globalFileTreeRepository, copied)
}
} catch {
case NonFatal(_) => s
}

View File

@ -20,6 +20,7 @@ import sbt.BasicCommandStrings.{
import sbt.BasicCommands.otherCommandParser
import sbt.Def._
import sbt.Scope.Global
import sbt.internal.FileManagement.CopiedFileTreeRepository
import sbt.internal.LabeledFunctions._
import sbt.internal.io.WatchState
import sbt.internal.util.complete.Parser._
@ -204,7 +205,6 @@ object Continuous extends DeprecatedContinuous {
new IllegalStateException("Tried to access FileTreeRepository for uninitialized state")
state
.get(Keys.globalFileTreeRepository)
.map(FileManagement.toMonitoringRepository)
.getOrElse(throw exception)
}
@ -283,41 +283,58 @@ object Continuous extends DeprecatedContinuous {
isCommand: Boolean
): State = Watch.withCharBufferedStdIn { in =>
val duped = new DupedInputStream(in)
setup(state.put(DupedSystemIn, duped), command) { (commands, s, valid, invalid) =>
implicit val extracted: Extracted = Project.extract(s)
EvaluateTask.withStreams(extracted.structure, s)(_.use(Keys.streams in Global) { streams =>
implicit val logger: Logger = streams.log
if (invalid.isEmpty) {
val currentCount = new AtomicInteger(count)
val configs = getAllConfigs(valid.map(v => v._1 -> v._2))
val callbacks = aggregate(configs, logger, in, s, currentCount, isCommand, commands)
val task = () => {
currentCount.getAndIncrement()
// abort as soon as one of the tasks fails
valid.takeWhile(_._3.apply())
()
}
callbacks.onEnter()
// Here we enter the Watched.watch state machine. We will not return until one of the
// state machine callbacks returns Watched.CancelWatch, Watched.Custom, Watched.HandleError
// or Watched.Reload. The task defined above will be run at least once. It will be run
// additional times whenever the state transition callbacks return Watched.Trigger.
try {
val terminationAction = Watch(task, callbacks.onStart, callbacks.nextEvent)
callbacks.onTermination(terminationAction, command, currentCount.get(), state)
} finally {
configs.foreach(_.repository.close())
callbacks.onExit()
}
implicit val extracted: Extracted = Project.extract(state)
val (stateWithRepo, repo) = state.get(Keys.globalFileTreeRepository) match {
case Some(r) => (state, r)
case _ =>
val repo = if ("polling" == System.getProperty("sbt.watch.mode")) {
val service =
new PollingWatchService(extracted.getOpt(Keys.pollInterval).getOrElse(500.millis))
FileTreeRepository.legacy(FileAttributes.default _, (_: Any) => {}, service)
} else {
// At least one of the commands in the multi command string could not be parsed, so we
// log an error and exit.
val invalidCommands = invalid.mkString("'", "', '", "'")
logger.error(s"Terminating watch due to invalid command(s): $invalidCommands")
state.fail
state
.get(BuiltinCommands.rawGlobalFileTreeRepository)
.map(new CopiedFileTreeRepository(_))
.getOrElse(FileTreeRepository.default(FileAttributes.default))
}
})
(state.put(Keys.globalFileTreeRepository, repo), repo)
}
try {
setup(stateWithRepo.put(DupedSystemIn, duped), command) { (commands, s, valid, invalid) =>
EvaluateTask.withStreams(extracted.structure, s)(_.use(Keys.streams in Global) { streams =>
implicit val logger: Logger = streams.log
if (invalid.isEmpty) {
val currentCount = new AtomicInteger(count)
val configs = getAllConfigs(valid.map(v => v._1 -> v._2))
val callbacks = aggregate(configs, logger, in, s, currentCount, isCommand, commands)
val task = () => {
currentCount.getAndIncrement()
// abort as soon as one of the tasks fails
valid.takeWhile(_._3.apply())
()
}
callbacks.onEnter()
// Here we enter the Watched.watch state machine. We will not return until one of the
// state machine callbacks returns Watched.CancelWatch, Watched.Custom, Watched.HandleError
// or Watched.Reload. The task defined above will be run at least once. It will be run
// additional times whenever the state transition callbacks return Watched.Trigger.
try {
val terminationAction = Watch(task, callbacks.onStart, callbacks.nextEvent)
callbacks.onTermination(terminationAction, command, currentCount.get(), state)
} finally {
configs.foreach(_.repository.close())
callbacks.onExit()
}
} else {
// At least one of the commands in the multi command string could not be parsed, so we
// log an error and exit.
val invalidCommands = invalid.mkString("'", "', '", "'")
logger.error(s"Terminating watch due to invalid command(s): $invalidCommands")
state.fail
}
})
}
} finally repo.close()
}
private def parseCommand(command: String, state: State): Seq[ScopedKey[_]] = {

View File

@ -9,55 +9,15 @@ package sbt
package internal
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import sbt.BasicCommandStrings.ContinuousExecutePrefix
import sbt.internal.io.HybridPollingFileTreeRepository
import sbt.io.FileTreeDataView.{ Entry, Observable, Observer, Observers }
import sbt.io.{ FileTreeRepository, _ }
import sbt.util.{ Level, Logger }
import sbt.util.Logger
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._
private[sbt] object FileManagement {
private[sbt] def defaultFileTreeRepository(
state: State,
extracted: Extracted
): FileTreeRepository[FileAttributes] = {
val pollingGlobs = extracted.getOpt(Keys.pollingGlobs).getOrElse(Nil)
val remaining = state.remainingCommands.map(_.commandLine)
// If the session is interactive or if the commands include a continuous build, then use
// the default configuration. Otherwise, use the sbt1_2_compat config, which does not cache
// anything, which makes it less likely to cause issues with CI.
val interactive =
remaining.contains("shell") || remaining.lastOption.contains("iflast shell")
val scripted = remaining.contains("setUpScripted")
val continuous = remaining.lastOption.exists(_.startsWith(ContinuousExecutePrefix))
val enableCache = extracted
.getOpt(Keys.enableGlobalCachingFileTreeRepository)
.getOrElse(!scripted && (interactive || continuous))
val pollInterval = extracted.getOpt(Keys.pollInterval).getOrElse(500.milliseconds)
val watchLogger: WatchLogger = extracted.getOpt(Keys.logLevel) match {
case Level.Debug =>
new WatchLogger { override def debug(msg: => Any): Unit = println(s"[watch-debug] $msg") }
case _ => new WatchLogger { override def debug(msg: => Any): Unit = {} }
}
if (enableCache) {
if (pollingGlobs.isEmpty) FileTreeRepository.default(FileAttributes.default)
else
new HybridMonitoringRepository[FileAttributes](
FileTreeRepository.hybrid(FileAttributes.default, pollingGlobs: _*),
pollInterval,
watchLogger
)
} else {
val service = Watched.createWatchService(pollInterval)
FileTreeRepository.legacy(FileAttributes.default _, (_: Any) => {}, service)
}
}
private[sbt] def monitor(
repository: FileTreeRepository[FileAttributes],
antiEntropy: FiniteDuration,
@ -96,34 +56,6 @@ private[sbt] object FileManagement {
}
}
private[sbt] class HybridMonitoringRepository[T](
underlying: HybridPollingFileTreeRepository[T],
delay: FiniteDuration,
logger: WatchLogger
) extends FileTreeRepository[T] {
private val registered: mutable.Set[Glob] = ConcurrentHashMap.newKeySet[Glob].asScala
override def listEntries(glob: Glob): Seq[Entry[T]] = underlying.listEntries(glob)
override def list(glob: Glob): Seq[TypedPath] = underlying.list(glob)
override def addObserver(observer: Observer[T]): Int = underlying.addObserver(observer)
override def removeObserver(handle: Int): Unit = underlying.removeObserver(handle)
override def close(): Unit = underlying.close()
override def register(glob: Glob): Either[IOException, Boolean] = {
registered.add(glob)
underlying.register(glob)
}
override def unregister(glob: Glob): Unit = underlying.unregister(glob)
private[sbt] def toMonitoringRepository: FileTreeRepository[T] = {
val polling = underlying.toPollingRepository(delay, logger)
registered.foreach(polling.register)
polling
}
}
private[sbt] def toMonitoringRepository[T](
repository: FileTreeRepository[T]
): FileTreeRepository[T] = repository match {
case h: HybridMonitoringRepository[T] => h.toMonitoringRepository
case r: FileTreeRepository[T] => r
}
private[sbt] class CopiedFileTreeRepository[T](underlying: FileTreeRepository[T])
extends FileTreeRepository[T] {
def addObserver(observer: Observer[T]) = underlying.addObserver(observer)

View File

@ -18,8 +18,14 @@ import scala.collection.mutable
import scala.language.experimental.macros
object FileTree {
private def toPair(e: Entry[FileAttributes]): Option[(Path, FileAttributes)] =
e.value.toOption.map(a => e.typedPath.toPath -> a)
private sealed trait CacheOptions
private case object NoCache extends CacheOptions
private case object UseCache extends CacheOptions
private case object LogDifferences extends CacheOptions
private def toPair(
filter: Entry[FileAttributes] => Boolean
)(e: Entry[FileAttributes]): Option[(Path, FileAttributes)] =
e.value.toOption.flatMap(a => if (filter(e)) Some(e.typedPath.toPath -> a) else None)
trait Repository extends sbt.internal.Repository[Seq, Glob, (Path, FileAttributes)]
private[sbt] trait DynamicInputs {
def value: Option[mutable.Set[Glob]]
@ -42,27 +48,57 @@ object FileTree {
private[sbt] object polling extends Repository {
val view = FileTreeView.DEFAULT.asDataView(FileAttributes.default)
override def get(key: Glob): Seq[(Path, FileAttributes)] =
view.listEntries(key).flatMap(toPair)
view.listEntries(key).flatMap(toPair(key.toEntryFilter))
override def close(): Unit = {}
}
}
private class ViewRepository(underlying: FileTreeDataView[FileAttributes]) extends Repository {
override def get(key: Glob): Seq[(Path, FileAttributes)] =
underlying.listEntries(key).flatMap(toPair)
override def close(): Unit = {}
}
private class CachingRepository(underlying: FileTreeRepository[FileAttributes])
extends Repository {
lazy val cacheOptions = System.getProperty("sbt.io.filecache") match {
case "false" => NoCache
case "true" => UseCache
case _ => LogDifferences
}
override def get(key: Glob): Seq[(Path, FileAttributes)] = {
underlying.register(key)
//underlying.listEntries(key).flatMap(toPair).distinct
Repository.polling.get(key)
cacheOptions match {
case LogDifferences =>
val res = Repository.polling.get(key)
val filter = key.toEntryFilter
val cacheRes = underlying
.listEntries(key)
.flatMap(e => if (filter(e)) Some(e.typedPath.toPath) else None)
.toSet
val resSet = res.map(_._1).toSet
if (cacheRes != resSet) {
val msg = "Warning: got different files when using the internal file cache compared " +
s"to polling the file system for key: $key.\n"
val fileDiff = cacheRes diff resSet match {
case d if d.nonEmpty =>
new Exception("hmm").printStackTrace()
s"Cache had files not found in the file system:\n${d.mkString("\n")}.\n"
case _ => ""
}
val cacheDiff = resSet diff cacheRes match {
case d if d.nonEmpty =>
(if (fileDiff.isEmpty) "" else " ") +
s"File system had files not in the cache:\n${d.mkString("\n")}.\n"
case _ => ""
}
val diff = fileDiff + cacheDiff
val instructions = "Please open an issue at https://github.com/sbt/sbt. To disable " +
"this warning, run sbt with -Dsbt.io.filecache=false"
System.err.println(msg + diff + instructions)
}
res
case UseCache =>
underlying.listEntries(key).flatMap(toPair(key.toEntryFilter))
case NoCache =>
Repository.polling.get(key)
}
}
override def close(): Unit = underlying.close()
}
private[sbt] def repository(underlying: FileTreeDataView[FileAttributes]): Repository =
underlying match {
case r: FileTreeRepository[FileAttributes] => new CachingRepository(r)
case v => new ViewRepository(v)
}
private[sbt] def repository(underlying: FileTreeRepository[FileAttributes]): Repository =
new CachingRepository(underlying)
}