Add support for in-memory cache store

It can be quite slow to read and parse a large JSON file. Often we are
reading and writing the same file over and over even though it isn't
actually changing. This is particularly noticeable with the
UpdateReport*. To speed this up, I introduce a global cache that can be
used to read values from a CacheStore. With the cache in place, I've
seen the time for the update task drop from about 200ms to about 1ms.
That works out to roughly a 400ms savings for test, because update is
called for both Compile / compile and Test / compile.

The way this works is that I add a new abstraction,
CacheStoreFactoryFactory, which is the most enterprise Java thing I've
ever written. We store a CacheStoreFactoryFactory in the sbt State.
When we make the Streams for a task, we build its cacheStoreFactory
field using the CacheStoreFactoryFactory. The generated
CacheStoreFactory may or may not refer to a global cache.
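
In sketch form, the abstraction looks like this (the trait is taken
from InMemoryCacheStore.scala in the diff below; the usage comment is
an approximation of how Streams wires it in):

import java.nio.file.Path
import sbt.util.CacheStoreFactory
import sjsonnew.{ IsoString, SupportConverter }

// A CacheStoreFactoryFactory lives in the sbt State and is asked for a
// CacheStoreFactory whenever Streams are constructed for a task.
private[sbt] trait CacheStoreFactoryFactory extends AutoCloseable {
  def apply[J: IsoString](path: Path, supportConverter: SupportConverter[J]): CacheStoreFactory
}

// Roughly how a task's streams obtain their CacheStoreFactory:
//   val factory = state.get(Keys.cacheStoreFactory).getOrElse(InMemoryCacheStore.factory(0))
//   lazy val cacheStoreFactory = factory(cacheDirectory.toPath, converter)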

The CacheStoreFactoryFactory may produce CacheStoreFactory instances
that delegate to a Caffeine cache whose maximum size, in bytes, is
specified by the fileCacheSize setting (which can also be set with
-Dsbt.file.cache.size). The size of each cache entry is estimated from
the size of its contents on disk. Since we are generally storing things
in the cache that are serialized as JSON, I figure this should be a
reasonable estimate. I set the default maximum cache size to 128MB,
which is plenty of space for the previous cache entries of most
projects. If the size is set to 0, the CacheStoreFactoryFactory
generates a regular DirectoryStoreFactory.
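
For example, something like the following in build.sbt would cap the
in-memory cache at 64MB (a sketch: the Global scoping is my assumption;
fileCacheSize itself is the Long-valued setting added in Keys.scala
below):

// Hypothetical build.sbt snippet: cap the in-memory cache at 64MB.
// Setting the value to 0 falls back to a plain DirectoryStoreFactory.
Global / fileCacheSize := 64L * 1024 * 1024

// Roughly equivalent JVM property form (64MB = 67108864 bytes):
//   sbt -Dsbt.file.cache.size=67108864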

To ensure that the cache accurately reflects the on-disk state of the
previous cache (or of other caches using a CacheStore), the Caffeine
cache stores the last modified time of the file whose contents it
represents. If there is a discrepancy in the last modified times (which
would happen if, say, clean has been run), then the value is read from
disk even if it hasn't actually changed.
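
Concretely, the read path only trusts the in-memory value when its
recorded last modified time matches the file on disk (simplified from
CacheStoreImpl in InMemoryCacheStore.scala below):

// The in-memory value is used only if its recorded last-modified time
// matches the file on disk; otherwise we fall back to reading the file.
override def read[T]()(implicit reader: JsonReader[T]): T = {
  val lastModified = IO.getModifiedTimeOrZero(path.toFile)
  store.get[T](path) match {
    case Some((value, `lastModified`)) => value        // cached copy is current
    case _                             => cacheStore.read[T]() // stale or missing: read from disk
  }
}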

* With the following build.sbt file, it takes roughly 200ms to read and
parse the update report on my computer:

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1"

This is because spark-sql has an enormous number of dependencies and the
update report ends up being 3MB.
Ethan Atkins 2019-07-08 20:42:04 -07:00
parent d9af39c90c
commit cad89d17a9
8 changed files with 197 additions and 49 deletions

View File

@@ -7,12 +7,13 @@
package sbt
import Def.{ Initialize, ScopedKey }
import Previous._
import sbt.internal.util.{ ~>, IMap, RMap }
import sbt.util.{ Input, Output, StampedFormat }
import sbt.Def.{ Initialize, ScopedKey }
import sbt.Previous._
import sbt.Scope.Global
import sbt.internal.util.{ IMap, RMap, ~> }
import sbt.util.StampedFormat
import sjsonnew.JsonFormat
import Scope.Global
import scala.util.control.NonFatal
/**
@@ -26,9 +27,8 @@ private[sbt] final class Previous(streams: Streams, referenced: IMap[ScopedTaskK
private[this] final class ReferencedValue[T](referenced: Referenced[T]) {
import referenced.{ stamped, task }
lazy val previousValue: Option[T] = {
val in = streams(task).getInput(task, StreamName)
try read(in, stamped)
finally in.close()
try Option(streams(task).cacheStoreFactory.make(StreamName).read[T]()(stamped))
catch { case NonFatal(_) => None }
}
}
@@ -82,9 +82,9 @@ object Previous {
val map = referenced.getReferences
def impl[T](key: ScopedKey[_], result: T): Unit =
for (i <- map.get(key.asInstanceOf[ScopedTaskKey[T]])) {
val out = streams.apply(i.task).getOutput(StreamName)
try write(out, i.stamped, result)
finally out.close()
val out = streams.apply(i.task).cacheStoreFactory.make(StreamName)
try out.write(result)(i.stamped)
catch { case NonFatal(_) => }
}
for {
@@ -93,14 +93,6 @@ object Previous {
} impl(key, result)
}
private def read[T](input: Input, format: JsonFormat[T]): Option[T] =
try Some(input.read()(format))
catch { case NonFatal(_) => None }
private def write[T](output: Output, format: JsonFormat[T], value: T): Unit =
try output.write(value)(format)
catch { case NonFatal(_) => () }
/** Public as a macro implementation detail. Do not call directly. */
def runtime[T](skey: TaskKey[T])(implicit format: JsonFormat[T]): Initialize[Task[Option[T]]] = {
val inputs = (cache in Global) zip Def.validated(skey, selfRefOk = true) zip (references in Global)

View File

@@ -79,6 +79,7 @@ import sbt.util.CacheImplicits._
import sbt.util.InterfaceUtil.{ toJavaFunction => f1 }
import sbt.util._
import sjsonnew._
import sjsonnew.support.scalajson.unsafe.Converter
import xsbti.CrossValue
import xsbti.compile.{ AnalysisContents, IncOptions, IncToolOptionsUtil }
@@ -2753,7 +2754,12 @@ object Classpaths {
): Initialize[Task[UpdateReport]] = Def.task {
val s = streams.value
val cacheDirectory = crossTarget.value / cacheLabel / updateCacheName.value
val cacheStoreFactory: CacheStoreFactory = CacheStoreFactory.directory(cacheDirectory)
import CacheStoreFactory.jvalueIsoString
val cacheStoreFactory: CacheStoreFactory = {
val factory = state.value.get(Keys.cacheStoreFactory).getOrElse(InMemoryCacheStore.factory(0))
factory(cacheDirectory.toPath, Converter)
}
val isRoot = executionRoots.value contains resolvedScoped.value
val shouldForce = isRoot || {

View File

@@ -10,12 +10,15 @@ package sbt
import java.io.File
import java.net.URL
import lmcoursier.definitions.CacheLogger
import lmcoursier.{ CoursierConfiguration, FallbackDependency }
import org.apache.ivy.core.module.descriptor.ModuleDescriptor
import org.apache.ivy.core.module.id.ModuleRevisionId
import org.apache.logging.log4j.core.Appender
import sbt.BuildSyntax._
import sbt.Def.ScopedKey
import sbt.KeyRanks._
import sbt.internal.InMemoryCacheStore.CacheStoreFactoryFactory
import sbt.internal._
import sbt.internal.inc.ScalaInstance
import sbt.internal.io.WatchState
@@ -31,8 +34,6 @@ import sbt.nio.file.Glob
import sbt.testing.Framework
import sbt.util.{ Level, Logger }
import xsbti.compile._
import lmcoursier.definitions.CacheLogger
import lmcoursier.{ CoursierConfiguration, FallbackDependency }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.xml.{ NodeSeq, Node => XNode }
@@ -487,6 +488,8 @@ object Keys {
val pluginData = taskKey[PluginData]("Information from the plugin build needed in the main build definition.").withRank(DTask)
val globalPluginUpdate = taskKey[UpdateReport]("A hook to get the UpdateReport of the global plugin.").withRank(DTask)
private[sbt] val taskCancelStrategy = settingKey[State => TaskCancellationStrategy]("Experimental task cancellation handler.").withRank(DTask)
private[sbt] val cacheStoreFactory = AttributeKey[CacheStoreFactoryFactory]("cache-store-factory")
val fileCacheSize = settingKey[Long]("The approximate maximum size in bytes of the cache used to store previous task results.")
// Experimental in sbt 0.13.2 to enable grabbing semantic compile failures.
private[sbt] val compilerReporter = taskKey[xsbti.Reporter]("Experimental hook to listen (or send) compilation failure messages.").withRank(DTask)

View File

@@ -840,9 +840,17 @@ object BuiltinCommands {
val session = Load.initialSession(structure, eval, s0)
SessionSettings.checkSession(session, s)
Project
.setProject(session, structure, s)
.put(sbt.nio.Keys.hasCheckedMetaBuild, new AtomicBoolean(false))
addCacheStoreFactoryFactory(
Project
.setProject(session, structure, s)
.put(sbt.nio.Keys.hasCheckedMetaBuild, new AtomicBoolean(false))
)
}
private val addCacheStoreFactoryFactory: State => State = (s: State) => {
val size = Project.extract(s).getOpt(Keys.fileCacheSize).getOrElse(SysProp.fileCacheSize)
s.get(Keys.cacheStoreFactory).foreach(_.close())
s.put(Keys.cacheStoreFactory, InMemoryCacheStore.factory(size))
}
def registerCompilerCache(s: State): State = {
@@ -857,7 +865,7 @@
def clearCaches: Command = {
val help = Help.more(ClearCaches, ClearCachesDetailed)
val f: State => State = registerCompilerCache _ andThen (_.initializeClassLoaderCache)
val f: State => State = registerCompilerCache _ andThen (_.initializeClassLoaderCache) andThen addCacheStoreFactoryFactory
Command.command(ClearCaches, help)(f)
}

View File

@@ -10,14 +10,16 @@ package internal
import java.io.File
import java.net.URI
import Def.{ displayFull, ScopedKey, ScopeLocal, Setting }
import Def.{ ScopeLocal, ScopedKey, Setting, displayFull }
import BuildPaths.outputDirectory
import Scope.GlobalScope
import BuildStreams.Streams
import sbt.io.syntax._
import sbt.internal.util.{ Attributed, AttributeEntry, AttributeKey, AttributeMap, Settings }
import sbt.internal.util.{ AttributeEntry, AttributeKey, AttributeMap, Attributed, Settings }
import sbt.internal.util.Attributed.data
import sbt.util.Logger
import sjsonnew.SupportConverter
import sjsonnew.shaded.scalajson.ast.unsafe.JValue
final class BuildStructure(
@@ -305,7 +307,10 @@ object BuildStreams {
path(units, root, data),
displayFull,
LogManager.construct(data, s),
sjsonnew.support.scalajson.unsafe.Converter
sjsonnew.support.scalajson.unsafe.Converter, {
val factory = s.get(Keys.cacheStoreFactory).getOrElse(InMemoryCacheStore.factory(0))
(file, converter: SupportConverter[JValue]) => factory(file.toPath, converter)
}
)
}
}

View File

@@ -0,0 +1,135 @@
/*
* sbt
* Copyright 2011 - 2018, Lightbend, Inc.
* Copyright 2008 - 2010, Mark Harrah
* Licensed under Apache License 2.0 (see LICENSE)
*/
package sbt.internal
import java.io.IOException
import java.lang.Math.toIntExact
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{ Files, Path }
import java.util.concurrent.atomic.AtomicReference
import com.github.benmanes.caffeine.cache.{ Cache, Caffeine, Weigher }
import sbt.io.IO
import sbt.util.{ CacheStore, CacheStoreFactory, DirectoryStoreFactory }
import sjsonnew.{ IsoString, JsonReader, JsonWriter, SupportConverter }
private[sbt] object InMemoryCacheStore {
private[this] class InMemoryCacheStore(maxSize: Long) extends AutoCloseable {
private[this] val weigher: Weigher[Path, (Any, Long, Int)] = { case (_, (_, _, size)) => size }
private[this] val files: Cache[Path, (Any, Long, Int)] = Caffeine
.newBuilder()
.maximumWeight(maxSize)
.weigher(weigher)
.build()
def get[T](path: Path): Option[(T, Long)] = {
files.getIfPresent(path) match {
case null => None
case (value: T @unchecked, lastModified, _) => Some((value, lastModified))
}
}
def put(path: Path, value: Any, lastModified: Long): Unit = {
try {
if (lastModified > 0) {
val attributes = Files.readAttributes(path, classOf[BasicFileAttributes])
files.put(path, (value, lastModified, toIntExact(attributes.size)))
}
} catch {
case _: IOException | _: ArithmeticException => files.invalidate(path)
}
}
def remove(path: Path): Unit = files.invalidate(path)
override def close(): Unit = {
files.invalidateAll()
files.cleanUp()
}
}
private[this] class CacheStoreImpl(path: Path, store: InMemoryCacheStore, cacheStore: CacheStore)
extends CacheStore {
override def delete(): Unit = cacheStore.delete()
override def read[T]()(implicit reader: JsonReader[T]): T = {
val lastModified = IO.getModifiedTimeOrZero(path.toFile)
store.get[T](path) match {
case Some((value: T, `lastModified`)) => value
case _ => cacheStore.read[T]()
}
}
override def write[T](value: T)(implicit writer: JsonWriter[T]): Unit = {
/*
* This may be inefficient if multiple threads are concurrently modifying the file.
* There is an assumption that there will be little to no concurrency at the file level
* of this cache. If this assumption is invalidated, we may need to do something more
* complicated.
*/
val lastModified = IO.getModifiedTimeOrZero(path.toFile)
store.get[T](path) match {
case Some((v, `lastModified`)) if v == value => // nothing has changed
case _ =>
store.remove(path)
cacheStore.write(value)
val newLastModified = System.currentTimeMillis
IO.setModifiedTimeOrFalse(path.toFile, newLastModified)
store.put(path, value, newLastModified)
}
}
override def close(): Unit = {
store.remove(path)
cacheStore.close()
}
}
private[this] def factory[J: IsoString](
store: InMemoryCacheStore,
path: Path,
converter: SupportConverter[J]
): CacheStoreFactory = {
val delegate = new DirectoryStoreFactory(path.toFile, converter)
new CacheStoreFactory {
override def make(identifier: String): CacheStore =
new CacheStoreImpl(path.resolve(identifier), store, delegate.make(identifier))
override def sub(identifier: String): CacheStoreFactory =
factory(store, path.resolve(identifier), converter)
}
}
private[sbt] trait CacheStoreFactoryFactory extends AutoCloseable {
def apply[J: IsoString](path: Path, supportConverter: SupportConverter[J]): CacheStoreFactory
}
private[this] class CacheStoreFactoryFactoryImpl(size: Long) extends CacheStoreFactoryFactory {
private[this] val storeRef = new AtomicReference[InMemoryCacheStore]
override def close(): Unit = Option(storeRef.get).foreach(_.close())
def apply[J: IsoString](
path: Path,
supportConverter: SupportConverter[J]
): CacheStoreFactory = {
val store = storeRef.get match {
case null =>
storeRef.synchronized {
storeRef.get match {
case null =>
val s = new InMemoryCacheStore(size)
storeRef.set(s)
s
case s => s
}
}
case s => s
}
factory(store, path, supportConverter)
}
}
private[this] object DirectoryFactory extends CacheStoreFactoryFactory {
override def apply[J: IsoString](
path: Path,
supportConverter: SupportConverter[J]
): CacheStoreFactory = new DirectoryStoreFactory(path.toFile, supportConverter)
override def close(): Unit = {}
}
def factory(size: Long): CacheStoreFactoryFactory =
if (size > 0) new CacheStoreFactoryFactoryImpl(size)
else DirectoryFactory
}

View File

@@ -84,6 +84,7 @@ object SysProp {
def closeClassLoaders: Boolean = getOrTrue("sbt.classloader.close")
def fileCacheSize: Long = long("sbt.file.cache.size", 128 * 1024 * 1024)
def supershell: Boolean = color && getOrTrue("sbt.supershell")
def supershellSleep: Long = long("sbt.supershell.sleep", 100L)

View File

@@ -8,29 +8,14 @@
package sbt
package std
import java.io.{
BufferedInputStream,
BufferedOutputStream,
BufferedReader,
BufferedWriter,
Closeable,
File,
FileInputStream,
FileOutputStream,
IOException,
InputStreamReader,
OutputStreamWriter,
PrintWriter
}
import java.io.{ File => _, _ }
import sbt.internal.io.DeferredWriter
import sbt.internal.util.ManagedLogger
import sbt.io.IO
import sbt.io.syntax._
import sbt.internal.util.ManagedLogger
import sbt.util._
import sjsonnew.{ IsoString, SupportConverter }
import sbt.util.{ CacheStoreFactory, DirectoryStoreFactory, Input, Output, PlainInput, PlainOutput }
// no longer specific to Tasks, so 'TaskStreams' should be renamed
/**
@@ -137,6 +122,20 @@ object Streams {
name: Key => String,
mkLogger: (Key, PrintWriter) => ManagedLogger,
converter: SupportConverter[J]
): Streams[Key] =
apply(
taskDirectory,
name,
mkLogger,
converter,
(file, s: SupportConverter[J]) => new DirectoryStoreFactory[J](file, s)
)
private[sbt] def apply[Key, J: IsoString](
taskDirectory: Key => File,
name: Key => String,
mkLogger: (Key, PrintWriter) => ManagedLogger,
converter: SupportConverter[J],
mkFactory: (File, SupportConverter[J]) => CacheStoreFactory
): Streams[Key] = new Streams[Key] {
def apply(a: Key): ManagedStreams[Key] = new ManagedStreams[Key] {
@@ -178,8 +177,7 @@
dir
}
lazy val cacheStoreFactory: CacheStoreFactory =
new DirectoryStoreFactory(cacheDirectory, converter)
lazy val cacheStoreFactory: CacheStoreFactory = mkFactory(cacheDirectory, converter)
def log(sid: String): ManagedLogger = mkLogger(a, text(sid))