[2.x] fix: Fixes chunked upload/download (#9318)

**Problem**
Chunked upload/download doesn't work.

**Solution**
1. This fixes the SERVICE_NAME shading.
2. This fixes the resource name.
3. This fixes download stream processing.
This commit is contained in:
eugene yokota 2026-06-09 08:48:07 -04:00 committed by GitHub
parent 4950462abb
commit 75f1d15a81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 22 deletions

View File

@ -96,7 +96,7 @@ object Dependencies {
val scalaVerify = "com.eed3si9n.verify" %% "verify" % "1.0.0"
val templateResolverApi = "org.scala-sbt" % "template-resolver" % "0.1"
val remoteapis =
"com.eed3si9n.remoteapis.shaded" % "shaded-remoteapis-java" % "2.3.0-M1-4ee33449fff18243c019f799b636cdb8c8a18f6c"
"com.eed3si9n.remoteapis.shaded" % "shaded-remoteapis-java" % "2.3.0-M1-9fe80facbdb38084ca18c5a75e1905f77ed0fa82"
val gson = "org.scala-sbt.gson" % "shaded-gson" % "2.13.1"
val scalaCompiler = "org.scala-lang" %% "scala3-compiler" % scala3

View File

@ -17,6 +17,7 @@ import build.bazel.remote.execution.v2.{
OutputFile,
UpdateActionResultRequest as XUpdateActionResultRequest,
}
import com.eed3si9n.remoteapis.shaded.com.google.bytestream.{ ByteStreamGrpc, ByteStreamProto }
import com.eed3si9n.remoteapis.shaded.com.google.protobuf.ByteString
import com.eed3si9n.remoteapis.shaded.io.grpc.{
CallCredentials,
@ -28,10 +29,8 @@ import com.eed3si9n.remoteapis.shaded.io.grpc.{
TlsChannelCredentials,
}
import com.eed3si9n.remoteapis.shaded.io.grpc.stub.StreamObserver
import com.eed3si9n.remoteapis.shaded.com.google.bytestream.ByteStreamGrpc
import com.eed3si9n.remoteapis.shaded.com.google.bytestream.ByteStreamProto
import ByteStreamProto.{ ReadRequest, WriteRequest }
import java.io.InputStream
import java.io.{ BufferedOutputStream, FileOutputStream, InputStream, OutputStream }
import java.net.URI
import java.nio.file.Path
import java.util.UUID
@ -218,7 +217,13 @@ class GrpcActionCacheStore(
disk.syncFile(r, casFile, outputDirectory)
else Nil
else
val paths = Await.result(downloadBlobs(digests, outputDirectory), remoteTimeout)
val paths = Await.result(
downloadBlobs(digests, outputDirectory).recover: e =>
e.printStackTrace()
throw e
,
remoteTimeout
)
refs
.zip(digests)
.zip(paths)
@ -235,7 +240,15 @@ class GrpcActionCacheStore(
if blobs.isEmpty then Nil
else if totalBytes <= chunkSizeBytes then batchUpdateBlobs(blobs)
else
try Await.result(uploadBlobs(blobs).recover(_ => Nil), remoteTimeout)
try
Await
.result(
uploadBlobs(blobs).recover: e =>
e.printStackTrace()
throw e
,
remoteTimeout
)
catch case _: TimeoutException => Nil
def batchUpdateBlobs(blobs: Seq[VirtualFile]): Seq[HashedVirtualFileRef] =
@ -299,20 +312,23 @@ class GrpcActionCacheStore(
val p = Promise[Path]()
val uuid = UUID.randomUUID()
val tempFile = outputDirectory.toFile() / s"$uuid.part"
sbt.io.Using.fileOutputStream(false)(tempFile): out =>
val resObs = new StreamObserver[ByteStreamProto.ReadResponse]:
override def onCompleted(): Unit =
p.success(tempFile.toPath())
override def onError(e: Throwable): Unit = p.failure(e)
override def onNext(res: ByteStreamProto.ReadResponse): Unit =
IO.transfer(res.getData().newInput(), out)
val b = ReadRequest.newBuilder()
val dn = downloadName(digest)
b.setResourceName(dn)
b.setReadOffset(0L)
val req = b.build()
byteStreamStub.read(req, resObs)
p.future
val resObs = new StreamObserver[ByteStreamProto.ReadResponse]:
val out = new BufferedOutputStream(new FileOutputStream(tempFile))
override def onCompleted(): Unit =
out.close()
p.success(tempFile.toPath())
override def onError(e: Throwable): Unit =
out.close()
p.failure(e)
override def onNext(res: ByteStreamProto.ReadResponse): Unit =
transfer(res.getData(), out)
val b = ReadRequest.newBuilder()
val dn = downloadName(digest)
b.setResourceName(dn)
b.setReadOffset(0L)
val req = b.build()
byteStreamStub.read(req, resObs)
p.future
// helper function for many-to-one gRPC streaming
// https://grpc.io/docs/languages/java/basics/#client-side-streaming-rpc-1
@ -335,14 +351,14 @@ class GrpcActionCacheStore(
* https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L219-L220
*/
private def uploadName(d: Digest, uuid: UUID): String =
s"$instanceName/uploads/$uuid/blobs/${d.algo}/${d.hashHexString}/${d.sizeBytes}"
s"$instanceName/uploads/$uuid/blobs/${d.hashHexString}/${d.sizeBytes}"
/**
* resource name is load-bearing.
* https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L294-L295
*/
private def downloadName(d: Digest): String =
s"$instanceName/blobs/${d.algo}/${d.hashHexString}/${d.sizeBytes}"
s"$instanceName/blobs/${d.hashHexString}/${d.sizeBytes}"
/**
* https://github.com/bazelbuild/remote-apis/blob/96942a2107c702ed3ca4a664f7eeb7c85ba8dc77/build/bazel/remote/execution/v2/remote_execution.proto#L1629
@ -432,4 +448,14 @@ class GrpcActionCacheStore(
readBytes > 0
do ()
out.toByteString()
private def transfer(data: ByteString, out: OutputStream): Unit =
val bufferSize = 8192
Using.resource(data.newInput()): input =>
val buf = new Array[Byte](bufferSize)
while
val readBytes = input.read(buf)
if readBytes >= 0 then out.write(buf, 0, readBytes)
readBytes >= 0
do ()
end GrpcActionCacheStore