Implement on after sync event

When the disk cache syncs dirzip file, it compares the
item hashes against the existing files, and synchronizes them
using the disk cache.
This commit is contained in:
Eugene Yokota 2024-08-11 02:49:46 -04:00
parent eb9a507419
commit c9e5924b09
14 changed files with 264 additions and 84 deletions

View File

@ -429,7 +429,7 @@ lazy val utilCache = project
.enablePlugins(
ContrabandPlugin,
// we generate JsonCodec only for actionresult.conta
// JsonCodecPlugin,
JsonCodecPlugin,
)
.dependsOn(utilLogging)
.settings(

View File

@ -5,6 +5,7 @@
## test scoped task
## this should not force any Scala version changes to other subprojects
> debug
> + baz/check
## test input task

View File

@ -28,7 +28,7 @@ import com.eed3si9n.remoteapis.shaded.io.grpc.{
TlsChannelCredentials,
}
import java.net.URI
import java.nio.file.{ Files, Path }
import java.nio.file.Path
import sbt.util.{
AbstractActionCacheStore,
ActionResult,
@ -197,14 +197,7 @@ class GrpcActionCacheStore(
val digest = Digest(r)
val blob = lookupResponse(digest)
val casFile = disk.putBlob(blob.getData().newInput(), digest)
val shortPath =
if r.id.startsWith("${OUT}/") then r.id.drop(7)
else r.id
val outPath = outputDirectory.resolve(shortPath)
Files.createDirectories(outPath.getParent())
if outPath.toFile().exists() then IO.delete(outPath.toFile())
Files.createSymbolicLink(outPath, casFile)
outPath
disk.syncFile(r, casFile, outputDirectory)
else Nil
/**

View File

@ -0,0 +1,11 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.internal.util.codec
trait ActionResultCodec extends sbt.internal.util.codec.HashedVirtualFileRefFormats
with sbt.internal.util.codec.ByteBufferFormats
with sjsonnew.BasicJsonProtocol
with sbt.internal.util.codec.ActionResultFormats
object ActionResultCodec extends ActionResultCodec

View File

@ -0,0 +1,35 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.internal.util.codec
import _root_.sjsonnew.{ Unbuilder, Builder, JsonFormat, deserializationError }
trait ActionResultFormats { self: sbt.internal.util.codec.HashedVirtualFileRefFormats with sbt.internal.util.codec.ByteBufferFormats with sjsonnew.BasicJsonProtocol =>
implicit lazy val ActionResultFormat: JsonFormat[sbt.util.ActionResult] = new JsonFormat[sbt.util.ActionResult] {
override def read[J](__jsOpt: Option[J], unbuilder: Unbuilder[J]): sbt.util.ActionResult = {
__jsOpt match {
case Some(__js) =>
unbuilder.beginObject(__js)
val outputFiles = unbuilder.readField[Vector[xsbti.HashedVirtualFileRef]]("outputFiles")
val origin = unbuilder.readField[Option[String]]("origin")
val exitCode = unbuilder.readField[Option[Int]]("exitCode")
val contents = unbuilder.readField[Vector[java.nio.ByteBuffer]]("contents")
val isExecutable = unbuilder.readField[Vector[Boolean]]("isExecutable")
unbuilder.endObject()
sbt.util.ActionResult(outputFiles, origin, exitCode, contents, isExecutable)
case None =>
deserializationError("Expected JsObject but found None")
}
}
override def write[J](obj: sbt.util.ActionResult, builder: Builder[J]): Unit = {
builder.beginObject()
builder.addField("outputFiles", obj.outputFiles)
builder.addField("origin", obj.origin)
builder.addField("exitCode", obj.exitCode)
builder.addField("contents", obj.contents)
builder.addField("isExecutable", obj.isExecutable)
builder.endObject()
}
}
}

View File

@ -0,0 +1,10 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.internal.util.codec
trait ManifestCodec extends sbt.internal.util.codec.HashedVirtualFileRefFormats
with sjsonnew.BasicJsonProtocol
with sbt.internal.util.codec.ManifestFormats
object ManifestCodec extends ManifestCodec

View File

@ -0,0 +1,29 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.internal.util.codec
import _root_.sjsonnew.{ Unbuilder, Builder, JsonFormat, deserializationError }
trait ManifestFormats { self: sbt.internal.util.codec.HashedVirtualFileRefFormats with sjsonnew.BasicJsonProtocol =>
implicit lazy val ManifestFormat: JsonFormat[sbt.util.Manifest] = new JsonFormat[sbt.util.Manifest] {
override def read[J](__jsOpt: Option[J], unbuilder: Unbuilder[J]): sbt.util.Manifest = {
__jsOpt match {
case Some(__js) =>
unbuilder.beginObject(__js)
val version = unbuilder.readField[String]("version")
val outputFiles = unbuilder.readField[Vector[xsbti.HashedVirtualFileRef]]("outputFiles")
unbuilder.endObject()
sbt.util.Manifest(version, outputFiles)
case None =>
deserializationError("Expected JsObject but found None")
}
}
override def write[J](obj: sbt.util.Manifest, builder: Builder[J]): Unit = {
builder.beginObject()
builder.addField("version", obj.version)
builder.addField("outputFiles", obj.outputFiles)
builder.endObject()
}
}
}

View File

@ -0,0 +1,38 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.util
/** A manifest of cached directory etc. */
final class Manifest private (
val version: String,
val outputFiles: Vector[xsbti.HashedVirtualFileRef]) extends Serializable {
private def this(version: String) = this(version, Vector())
override def equals(o: Any): Boolean = this.eq(o.asInstanceOf[AnyRef]) || (o match {
case x: Manifest => (this.version == x.version) && (this.outputFiles == x.outputFiles)
case _ => false
})
override def hashCode: Int = {
37 * (37 * (37 * (17 + "sbt.util.Manifest".##) + version.##) + outputFiles.##)
}
override def toString: String = {
"Manifest(" + version + ", " + outputFiles + ")"
}
private[this] def copy(version: String = version, outputFiles: Vector[xsbti.HashedVirtualFileRef] = outputFiles): Manifest = {
new Manifest(version, outputFiles)
}
def withVersion(version: String): Manifest = {
copy(version = version)
}
def withOutputFiles(outputFiles: Vector[xsbti.HashedVirtualFileRef]): Manifest = {
copy(outputFiles = outputFiles)
}
}
object Manifest {
def apply(version: String): Manifest = new Manifest(version)
def apply(version: String, outputFiles: Vector[xsbti.HashedVirtualFileRef]): Manifest = new Manifest(version, outputFiles)
}

View File

@ -0,0 +1,10 @@
package sbt.util
@target(Scala)
@codecPackage("sbt.internal.util.codec")
@fullCodec("ManifestCodec")
## A manifest of cached directory etc.
type Manifest {
version: String!
outputFiles: [xsbti.HashedVirtualFileRef] @since("0.1.0")
}

View File

@ -1,14 +1,14 @@
package sbt.util
@target(Scala)
type UpdateActionResultRequest {
type UpdateActionResultRequest @generateCodec(false) {
actionDigest: sbt.util.Digest!
outputFiles: [xsbti.VirtualFile] @since("0.1.0")
exitCode: Int @since("0.2.0")
isExecutable: [Boolean] @since("0.3.0")
}
type GetActionResultRequest {
type GetActionResultRequest @generateCodec(false) {
actionDigest: sbt.util.Digest!
inlineStdout: Boolean @since("0.1.0")
inlineStderr: Boolean @since("0.1.0")

View File

@ -1,12 +0,0 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.internal.util.codec
trait ActionResultCodec
extends sbt.internal.util.codec.HashedVirtualFileRefFormats
with sbt.internal.util.codec.ByteBufferFormats
with sjsonnew.BasicJsonProtocol
with sbt.internal.util.codec.ActionResultFormats
object ActionResultCodec extends ActionResultCodec

View File

@ -1,39 +0,0 @@
/**
* This code is generated using [[https://www.scala-sbt.org/contraband/ sbt-contraband]].
*/
// DO NOT EDIT MANUALLY
package sbt.internal.util.codec
import _root_.sjsonnew.{ Unbuilder, Builder, JsonFormat, deserializationError }
trait ActionResultFormats {
self: sbt.internal.util.codec.HashedVirtualFileRefFormats
with sbt.internal.util.codec.ByteBufferFormats
with sjsonnew.BasicJsonProtocol =>
implicit lazy val ActionResultFormat: JsonFormat[sbt.util.ActionResult] =
new JsonFormat[sbt.util.ActionResult] {
override def read[J](__jsOpt: Option[J], unbuilder: Unbuilder[J]): sbt.util.ActionResult = {
__jsOpt match {
case Some(__js) =>
unbuilder.beginObject(__js)
val outputFiles = unbuilder.readField[Vector[xsbti.HashedVirtualFileRef]]("outputFiles")
val origin = unbuilder.readField[Option[String]]("origin")
val exitCode = unbuilder.readField[Option[Int]]("exitCode")
val contents = unbuilder.readField[Vector[java.nio.ByteBuffer]]("contents")
val isExecutable = unbuilder.readField[Vector[Boolean]]("isExecutable")
unbuilder.endObject()
sbt.util.ActionResult(outputFiles, origin, exitCode, contents, isExecutable)
case None =>
deserializationError("Expected JsObject but found None")
}
}
override def write[J](obj: sbt.util.ActionResult, builder: Builder[J]): Unit = {
builder.beginObject()
builder.addField("outputFiles", obj.outputFiles)
builder.addField("origin", obj.origin)
builder.addField("exitCode", obj.exitCode)
builder.addField("contents", obj.contents)
builder.addField("isExecutable", obj.isExecutable)
builder.endObject()
}
}
}

View File

@ -1,20 +1,25 @@
package sbt.util
import java.io.File
import java.nio.file.Paths
import java.nio.charset.StandardCharsets
import java.nio.file.{ Path, Paths }
import sbt.internal.util.{ ActionCacheEvent, CacheEventLog, StringVirtualFile1 }
import sbt.io.syntax.*
import sbt.io.IO
import sbt.nio.file.{ **, FileTreeView }
import sbt.nio.file.syntax.*
import scala.reflect.ClassTag
import scala.annotation.{ meta, StaticAnnotation }
import sjsonnew.{ HashWriter, JsonFormat }
import sjsonnew.support.murmurhash.Hasher
import sjsonnew.support.scalajson.unsafe.{ CompactPrinter, Converter, Parser }
import xsbti.{ FileConverter, VirtualFile, VirtualFileRef }
import java.nio.charset.StandardCharsets
import java.nio.file.Path
import scala.quoted.{ Expr, FromExpr, ToExpr, Quotes }
import xsbti.{ FileConverter, HashedVirtualFileRef, VirtualFile, VirtualFileRef }
object ActionCache:
private[sbt] val dirZipExt = ".sbtdir.zip"
private[sbt] val manifestFileName = "sbtdir_manifest.json"
/**
* This is a key function that drives remote caching.
* This is intended to be called from the cached task macro for the most part.
@ -68,7 +73,7 @@ object ActionCache:
val newOutputs = Vector(valueFile) ++ outputs.toVector
store.put(UpdateActionResultRequest(input, newOutputs, exitCode = 0)) match
case Right(cachedResult) =>
store.syncBlobs(cachedResult.outputFiles, config.outputDirectory)
syncBlobs(cachedResult.outputFiles)
result
case Left(e) => throw e
@ -77,6 +82,9 @@ object ActionCache:
val json = Parser.parseUnsafe(str)
Converter.fromJsonUnsafe[O](json)
def syncBlobs(refs: Seq[HashedVirtualFileRef]): Seq[Path] =
store.syncBlobs(refs, config.outputDirectory)
val getRequest =
GetActionResultRequest(input, inlineStdout = false, inlineStderr = false, Vector(valuePath))
store.get(getRequest) match
@ -84,24 +92,54 @@ object ActionCache:
// some protocol can embed values into the result
result.contents.headOption match
case Some(head) =>
store.syncBlobs(result.outputFiles, config.outputDirectory)
syncBlobs(result.outputFiles)
val str = String(head.array(), StandardCharsets.UTF_8)
valueFromStr(str, result.origin)
case _ =>
val paths = store.syncBlobs(result.outputFiles, config.outputDirectory)
val paths = syncBlobs(result.outputFiles)
if paths.isEmpty then organicTask
else valueFromStr(IO.read(paths.head.toFile()), result.origin)
case Left(_) => organicTask
end cache
def packageDirectory(dir: VirtualFileRef, conv: FileConverter): VirtualFile =
import sbt.io.syntax.*
def manifestFromFile(manifest: Path): Manifest =
import sbt.internal.util.codec.ManifestCodec.given
val json = Parser.parseFromFile(manifest.toFile()).get
Converter.fromJsonUnsafe[Manifest](json)
def packageDirectory(
dir: VirtualFileRef,
conv: FileConverter,
outputDirectory: Path,
): VirtualFile =
import sbt.internal.util.codec.ManifestCodec.given
val dirPath = conv.toPath(dir)
val dirFile = dirPath.toFile()
val zipPath = Paths.get(dirPath.toString + ".dirzip")
val rebase: File => Seq[(File, String)] =
f => if f != dirFile then (f -> dirPath.relativize(f.toPath).toString) :: Nil else Nil
IO.zip(dirFile.allPaths.get().flatMap(rebase), zipPath.toFile(), None)
conv.toVirtualFile(zipPath)
val allPaths = FileTreeView.default.list(dirPath.toGlob / ** / "*")
// create a manifest of files and their hashes here
def makeManifest(manifestFile: Path): Unit =
val vfs = (allPaths.flatMap {
case (p, attr) if !attr.isDirectory =>
Some(conv.toVirtualFile(p): HashedVirtualFileRef)
case _ => None
}).toVector
val manifest = Manifest(
version = "0.1.0",
outputFiles = vfs,
)
val str = CompactPrinter(Converter.toJsonUnsafe(manifest))
IO.write(manifestFile.toFile(), str)
IO.withTemporaryDirectory: tempDir =>
val mPath = (tempDir / manifestFileName).toPath()
makeManifest(mPath)
val zipPath = Paths.get(dirPath.toString + dirZipExt)
val rebase: Path => Seq[(File, String)] =
(p: Path) =>
p match
case p if p == dirPath => Nil
case p if p == mPath => (mPath.toFile() -> manifestFileName) :: Nil
case f => (f.toFile() -> outputDirectory.relativize(f).toString) :: Nil
IO.zip((allPaths.map(_._1) ++ Seq(mPath)).flatMap(rebase), zipPath.toFile(), None)
conv.toVirtualFile(zipPath)
/**
* Represents a value and output files, used internally by the macro.

View File

@ -2,15 +2,18 @@ package sbt.util
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.file.{ Files, Path }
import java.nio.file.{ Files, Path, Paths }
import sjsonnew.*
import sjsonnew.support.scalajson.unsafe.{ CompactPrinter, Converter, Parser }
import sjsonnew.shaded.scalajson.ast.unsafe.JValue
import scala.collection.mutable
import scala.util.control.NonFatal
import sbt.internal.io.Retry
import sbt.io.IO
import sbt.io.syntax.*
import sbt.nio.file.{ **, FileTreeView }
import sbt.nio.file.syntax.*
import sbt.internal.util.StringVirtualFile1
import sbt.internal.util.codec.ActionResultCodec.given
import xsbti.{ HashedVirtualFileRef, PathBasedFile, VirtualFile }
@ -215,6 +218,13 @@ class DiskActionCacheStore(base: Path) extends AbstractActionCacheStore:
def toCasFile(digest: Digest): Path =
(casBase.toFile / digest.toString.replace("/", "-")).toPath()
def putBlob(blob: Path, digest: Digest): Path =
val in = Files.newInputStream(blob)
try
putBlob(in, digest)
finally
in.close()
def putBlob(input: InputStream, digest: Digest): Path =
val casFile = toCasFile(digest)
IO.transfer(input, casFile.toFile())
@ -243,7 +253,9 @@ class DiskActionCacheStore(base: Path) extends AbstractActionCacheStore:
override def syncBlobs(refs: Seq[HashedVirtualFileRef], outputDirectory: Path): Seq[Path] =
refs.flatMap: r =>
val casFile = toCasFile(Digest(r))
if casFile.toFile().exists then Some(syncFile(r, casFile, outputDirectory))
if casFile.toFile().exists then
// println(s"syncBlobs: $casFile exists for $r")
Some(syncFile(r, casFile, outputDirectory))
else None
def syncFile(ref: HashedVirtualFileRef, casFile: Path, outputDirectory: Path): Path =
@ -253,16 +265,70 @@ class DiskActionCacheStore(base: Path) extends AbstractActionCacheStore:
val d = Digest(ref)
def symlinkAndNotify(outPath: Path): Path =
Files.createDirectories(outPath.getParent())
val result = Files.createSymbolicLink(outPath, casFile)
// after(result)
val result = Retry:
if Files.exists(outPath) then IO.delete(outPath.toFile())
Files.createSymbolicLink(outPath, casFile)
afterFileWrite(ref, result, outputDirectory)
result
outputDirectory.resolve(shortPath) match
case p if !p.toFile().exists() => symlinkAndNotify(p)
case p if Digest.sameDigest(p, d) => p
case p if !Files.exists(p) =>
// println(s"- syncFile: $p does not exist")
symlinkAndNotify(p)
case p if Digest.sameDigest(p, d) =>
// println(s"- syncFile: $p has same digest")
p
case p =>
// println(s"- syncFile: $p has different digest")
IO.delete(p.toFile())
symlinkAndNotify(p)
/**
* Emulate virtual side effects.
*/
def afterFileWrite(ref: HashedVirtualFileRef, path: Path, outputDirectory: Path): Unit =
if path.toString().endsWith(ActionCache.dirZipExt) then unpackageDirZip(path, outputDirectory)
else ()
/**
* Given a dirzip, unzip it in a temp directory, and sync each items to the outputDirectory.
*/
private def unpackageDirZip(dirzip: Path, outputDirectory: Path): Path =
val dirPath = Paths.get(dirzip.toString.dropRight(ActionCache.dirZipExt.size))
Files.createDirectories(dirPath)
val allPaths = mutable.Set(
FileTreeView.default
.list(dirPath.toGlob / ** / "*")
.filter(!_._2.isDirectory)
.map(_._1): _*
)
def doSync(ref: HashedVirtualFileRef, in: Path): Unit =
val d = Digest(ref)
val casFile = putBlob(in, d)
syncFile(ref, casFile, outputDirectory)
IO.withTemporaryDirectory: tempDir =>
IO.unzip(dirzip.toFile(), tempDir)
val mPath = (tempDir / ActionCache.manifestFileName).toPath()
if !Files.exists(mPath) then sys.error(s"manifest is missing from $dirzip")
// manifest contains the list of files in the dirzip, and their hashes
val m = ActionCache.manifestFromFile(mPath)
m.outputFiles.foreach: ref =>
val shortPath =
if ref.id.startsWith("${OUT}/") then ref.id.drop(7)
else ref.id
val currentItem = outputDirectory.resolve(shortPath)
allPaths.remove(currentItem)
val d = Digest(ref)
currentItem match
case p if !Files.exists(p) => doSync(ref, tempDir.toPath().resolve(shortPath))
case p if Digest.sameDigest(p, d) => ()
case p =>
IO.delete(p.toFile())
doSync(ref, tempDir.toPath().resolve(shortPath))
// sync deleted files
allPaths.foreach: path =>
IO.delete(path.toFile())
dirPath
override def findBlobs(refs: Seq[HashedVirtualFileRef]): Seq[HashedVirtualFileRef] =
refs.flatMap: r =>
val casFile = toCasFile(Digest(r))