From e3625ca8b0781288f379eff6d6e6534998d64200 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Wed, 6 Feb 2019 14:23:54 -0800 Subject: [PATCH] Fix repository setup on windows Windows io really doesn't handle concurrent readers and writers all that well. Using the LegacyFileTreeRepository was problematic in windows scripted tests because even though the repository implementation did not use the cache in its list methods, it did persistently monitor the directories that were registered. The monitor has to do a lot of io on a background thread to maintain the cache. This caused io contention that would cause IO.createDirectory to fail with an obscure AccessDeniedException. The way to avoid this is to prevent the background io from occurring at all. I don't necessarily think this will impact most users running sbt interactively with a cache, but it did cause scripted tests to fail. For that reason I made the default in non-interactive/shell use cases on windows to be a PollingFileRepository which never monitors the file system except when we are in a watch. The LegacyFileTreeRepository works fine on mac and linux which have a more forgiving file system. To make this work, I had to add FileManagement.toMonitoringRepository. There are now two kinds of repositories that cannot monitor on their own: HybridPollingFileTreeRepository and PollingFileRepository. The FileManagement.toMonitoringRepository makes a new repository that turns on monitoring for those two repository types and disables the close method on all other repositories so that closing the FileEventMonitor does not actually close the global file repository. --- main/src/main/scala/sbt/Defaults.scala | 2 +- .../scala/sbt/internal/FileManagement.scala | 118 ++++++++++++++++-- 2 files changed, 107 insertions(+), 13 deletions(-) diff --git a/main/src/main/scala/sbt/Defaults.scala b/main/src/main/scala/sbt/Defaults.scala index f2646f50a..46c43c360 100755 --- a/main/src/main/scala/sbt/Defaults.scala +++ b/main/src/main/scala/sbt/Defaults.scala @@ -646,7 +646,7 @@ object Defaults extends BuildCommon { ) .getOrElse(watchTriggeredMessage.value) val logger = watchLogger.value - val repo = FileManagement.repo.value + val repo = FileManagement.toMonitoringRepository(FileManagement.repo.value) globs.foreach(repo.register) val monitor = FileManagement.monitor(repo, watchAntiEntropy.value, logger) WatchConfig.default( diff --git a/main/src/main/scala/sbt/internal/FileManagement.scala b/main/src/main/scala/sbt/internal/FileManagement.scala index a2c327386..348d933df 100644 --- a/main/src/main/scala/sbt/internal/FileManagement.scala +++ b/main/src/main/scala/sbt/internal/FileManagement.scala @@ -8,13 +8,19 @@ package sbt package internal +import java.io.IOException +import java.util.concurrent.ConcurrentHashMap + import sbt.BasicCommandStrings.ContinuousExecutePrefix import sbt.Keys._ import sbt.internal.io.HybridPollingFileTreeRepository -import sbt.io.FileTreeDataView.{ Observable, Observer, Observers } +import sbt.internal.util.Util +import sbt.io.FileTreeDataView.{ Entry, Observable, Observer, Observers } import sbt.io.{ FileTreeRepository, _ } -import sbt.util.Logger +import sbt.util.{ Level, Logger } +import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ private[sbt] object FileManagement { @@ -34,15 +40,26 @@ private[sbt] object FileManagement { val enableCache = extracted .getOpt(Keys.enableGlobalCachingFileTreeRepository) .getOrElse(!scripted && (interactive || continuous)) + val pollInterval = extracted.getOpt(Keys.pollInterval).getOrElse(500.milliseconds) + val watchLogger: WatchLogger = extracted.getOpt(Keys.logLevel) match { + case Level.Debug => + new WatchLogger { override def debug(msg: => Any): Unit = println(s"[watch-debug] $msg") } + case _ => new WatchLogger { override def debug(msg: => Any): Unit = {} } + } if (enableCache) { if (pollingGlobs.isEmpty) FileTreeRepository.default(FileCacheEntry.default) - else FileTreeRepository.hybrid(FileCacheEntry.default, pollingGlobs: _*) + else + new HybridMonitoringRepository[FileCacheEntry]( + FileTreeRepository.hybrid(FileCacheEntry.default, pollingGlobs: _*), + pollInterval, + watchLogger + ) } else { - FileTreeRepository.legacy( - FileCacheEntry.default, - (_: Any) => {}, - Watched.createWatchService(extracted.getOpt(Keys.pollInterval).getOrElse(500.milliseconds)) - ) + if (Util.isWindows) new PollingFileRepository(FileCacheEntry.default) + else { + val service = Watched.createWatchService(pollInterval) + FileTreeRepository.legacy(FileCacheEntry.default _, (_: Any) => {}, service) + } } } @@ -55,10 +72,10 @@ private[sbt] object FileManagement { // callbacks. val copied: Observable[FileCacheEntry] = new Observable[FileCacheEntry] { private[this] val observers = new Observers[FileCacheEntry] - val (underlying, needClose) = repository match { + val underlying = repository match { case h: HybridPollingFileTreeRepository[FileCacheEntry] => - (h.toPollingRepository(antiEntropy, (msg: Any) => logger.debug(msg.toString)), true) - case r => (r, false) + h.toPollingRepository(antiEntropy, (msg: Any) => logger.debug(msg.toString)) + case r => r } private[this] val handle = underlying.addObserver(observers) override def addObserver(observer: Observer[FileCacheEntry]): Int = @@ -66,7 +83,7 @@ private[sbt] object FileManagement { override def removeObserver(handle: Int): Unit = observers.removeObserver(handle) override def close(): Unit = { underlying.removeObserver(handle) - if (needClose) underlying.close() + underlying.close() } } new FileEventMonitor[FileCacheEntry] { @@ -88,4 +105,81 @@ private[sbt] object FileManagement { lazy val msg = s"Tried to get FileTreeRepository for uninitialized state." state.value.get(Keys.globalFileTreeRepository).getOrElse(throw new IllegalStateException(msg)) } + + private[sbt] class HybridMonitoringRepository[T]( + underlying: HybridPollingFileTreeRepository[T], + delay: FiniteDuration, + logger: WatchLogger + ) extends FileTreeRepository[T] { + private val registered: mutable.Set[Glob] = ConcurrentHashMap.newKeySet[Glob].asScala + override def listEntries(glob: Glob): Seq[Entry[T]] = underlying.listEntries(glob) + override def list(glob: Glob): Seq[TypedPath] = underlying.list(glob) + override def addObserver(observer: Observer[T]): Int = underlying.addObserver(observer) + override def removeObserver(handle: Int): Unit = underlying.removeObserver(handle) + override def close(): Unit = underlying.close() + override def register(glob: Glob): Either[IOException, Boolean] = { + registered.add(glob) + underlying.register(glob) + } + override def unregister(glob: Glob): Unit = underlying.unregister(glob) + private[sbt] def toMonitoringRepository: FileTreeRepository[T] = { + val polling = underlying.toPollingRepository(delay, logger) + registered.foreach(polling.register) + polling + } + } + private[sbt] def toMonitoringRepository[T]( + repository: FileTreeRepository[T] + ): FileTreeRepository[T] = repository match { + case p: PollingFileRepository[T] => p.toMonitoringRepository + case h: HybridMonitoringRepository[T] => h.toMonitoringRepository + case r: FileTreeRepository[T] => new CopiedFileRepository(r) + } + private class CopiedFileRepository[T](underlying: FileTreeRepository[T]) + extends FileTreeRepository[T] { + def addObserver(observer: Observer[T]) = underlying.addObserver(observer) + def close(): Unit = {} // Don't close the underlying observable + def list(glob: Glob): Seq[TypedPath] = underlying.list(glob) + def listEntries(glob: Glob): Seq[Entry[T]] = underlying.listEntries(glob) + def removeObserver(handle: Int): Unit = underlying.removeObserver(handle) + def register(glob: Glob): Either[IOException, Boolean] = underlying.register(glob) + def unregister(glob: Glob): Unit = underlying.unregister(glob) + } + private[sbt] class PollingFileRepository[T](converter: TypedPath => T) + extends FileTreeRepository[T] { self => + private val registered: mutable.Set[Glob] = ConcurrentHashMap.newKeySet[Glob].asScala + private[this] val view = FileTreeView.DEFAULT + private[this] val dataView = view.asDataView(converter) + private[this] val handles: mutable.Map[FileTreeRepository[T], Int] = + new ConcurrentHashMap[FileTreeRepository[T], Int].asScala + private val observers: Observers[T] = new Observers + override def addObserver(observer: Observer[T]): Int = observers.addObserver(observer) + override def close(): Unit = { + handles.foreach { case (repo, handle) => repo.removeObserver(handle) } + observers.close() + } + override def list(glob: Glob): Seq[TypedPath] = view.list(glob) + override def listEntries(glob: Glob): Seq[Entry[T]] = dataView.listEntries(glob) + override def removeObserver(handle: Int): Unit = observers.removeObserver(handle) + override def register(glob: Glob): Either[IOException, Boolean] = Right(registered.add(glob)) + override def unregister(glob: Glob): Unit = registered -= glob + + private[sbt] def toMonitoringRepository: FileTreeRepository[T] = { + val legacy = FileTreeRepository.legacy(converter) + registered.foreach(legacy.register) + handles += legacy -> legacy.addObserver(observers) + new FileTreeRepository[T] { + override def listEntries(glob: Glob): Seq[Entry[T]] = legacy.listEntries(glob) + override def list(glob: Glob): Seq[TypedPath] = legacy.list(glob) + def addObserver(observer: Observer[T]): Int = legacy.addObserver(observer) + override def removeObserver(handle: Int): Unit = legacy.removeObserver(handle) + override def close(): Unit = legacy.close() + override def register(glob: Glob): Either[IOException, Boolean] = { + self.register(glob) + legacy.register(glob) + } + override def unregister(glob: Glob): Unit = legacy.unregister(glob) + } + } + } }