diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 235c5fad1..b51dddc4a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -96,7 +96,7 @@ object Dependencies { val scalaVerify = "com.eed3si9n.verify" %% "verify" % "1.0.0" val templateResolverApi = "org.scala-sbt" % "template-resolver" % "0.1" val remoteapis = - "com.eed3si9n.remoteapis.shaded" % "shaded-remoteapis-java" % "2.3.0-M1-4ee33449fff18243c019f799b636cdb8c8a18f6c" + "com.eed3si9n.remoteapis.shaded" % "shaded-remoteapis-java" % "2.3.0-M1-9fe80facbdb38084ca18c5a75e1905f77ed0fa82" val gson = "org.scala-sbt.gson" % "shaded-gson" % "2.13.1" val scalaCompiler = "org.scala-lang" %% "scala3-compiler" % scala3 diff --git a/sbt-remote-cache/src/main/scala/sbt/internal/GrpcActionCacheStore.scala b/sbt-remote-cache/src/main/scala/sbt/internal/GrpcActionCacheStore.scala index e5338abe7..27812671f 100644 --- a/sbt-remote-cache/src/main/scala/sbt/internal/GrpcActionCacheStore.scala +++ b/sbt-remote-cache/src/main/scala/sbt/internal/GrpcActionCacheStore.scala @@ -17,6 +17,7 @@ import build.bazel.remote.execution.v2.{ OutputFile, UpdateActionResultRequest as XUpdateActionResultRequest, } +import com.eed3si9n.remoteapis.shaded.com.google.bytestream.{ ByteStreamGrpc, ByteStreamProto } import com.eed3si9n.remoteapis.shaded.com.google.protobuf.ByteString import com.eed3si9n.remoteapis.shaded.io.grpc.{ CallCredentials, @@ -28,10 +29,8 @@ import com.eed3si9n.remoteapis.shaded.io.grpc.{ TlsChannelCredentials, } import com.eed3si9n.remoteapis.shaded.io.grpc.stub.StreamObserver -import com.eed3si9n.remoteapis.shaded.com.google.bytestream.ByteStreamGrpc -import com.eed3si9n.remoteapis.shaded.com.google.bytestream.ByteStreamProto import ByteStreamProto.{ ReadRequest, WriteRequest } -import java.io.InputStream +import java.io.{ BufferedOutputStream, FileOutputStream, InputStream, OutputStream } import java.net.URI import java.nio.file.Path import java.util.UUID @@ -218,7 +217,13 @@ class GrpcActionCacheStore( disk.syncFile(r, casFile, outputDirectory) else Nil else - val paths = Await.result(downloadBlobs(digests, outputDirectory), remoteTimeout) + val paths = Await.result( + downloadBlobs(digests, outputDirectory).recover: e => + e.printStackTrace() + throw e + , + remoteTimeout + ) refs .zip(digests) .zip(paths) @@ -235,7 +240,15 @@ class GrpcActionCacheStore( if blobs.isEmpty then Nil else if totalBytes <= chunkSizeBytes then batchUpdateBlobs(blobs) else - try Await.result(uploadBlobs(blobs).recover(_ => Nil), remoteTimeout) + try + Await + .result( + uploadBlobs(blobs).recover: e => + e.printStackTrace() + throw e + , + remoteTimeout + ) catch case _: TimeoutException => Nil def batchUpdateBlobs(blobs: Seq[VirtualFile]): Seq[HashedVirtualFileRef] = @@ -299,20 +312,23 @@ class GrpcActionCacheStore( val p = Promise[Path]() val uuid = UUID.randomUUID() val tempFile = outputDirectory.toFile() / s"$uuid.part" - sbt.io.Using.fileOutputStream(false)(tempFile): out => - val resObs = new StreamObserver[ByteStreamProto.ReadResponse]: - override def onCompleted(): Unit = - p.success(tempFile.toPath()) - override def onError(e: Throwable): Unit = p.failure(e) - override def onNext(res: ByteStreamProto.ReadResponse): Unit = - IO.transfer(res.getData().newInput(), out) - val b = ReadRequest.newBuilder() - val dn = downloadName(digest) - b.setResourceName(dn) - b.setReadOffset(0L) - val req = b.build() - byteStreamStub.read(req, resObs) - p.future + val resObs = new StreamObserver[ByteStreamProto.ReadResponse]: + val out = new BufferedOutputStream(new FileOutputStream(tempFile)) + override def onCompleted(): Unit = + out.close() + p.success(tempFile.toPath()) + override def onError(e: Throwable): Unit = + out.close() + p.failure(e) + override def onNext(res: ByteStreamProto.ReadResponse): Unit = + transfer(res.getData(), out) + val b = ReadRequest.newBuilder() + val dn = downloadName(digest) + b.setResourceName(dn) + b.setReadOffset(0L) + val req = b.build() + byteStreamStub.read(req, resObs) + p.future // helper function for many-to-one gRPC streaming // https://grpc.io/docs/languages/java/basics/#client-side-streaming-rpc-1 @@ -335,14 +351,14 @@ class GrpcActionCacheStore( * https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L219-L220 */ private def uploadName(d: Digest, uuid: UUID): String = - s"$instanceName/uploads/$uuid/blobs/${d.algo}/${d.hashHexString}/${d.sizeBytes}" + s"$instanceName/uploads/$uuid/blobs/${d.hashHexString}/${d.sizeBytes}" /** * resource name is load-bearing. * https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L294-L295 */ private def downloadName(d: Digest): String = - s"$instanceName/blobs/${d.algo}/${d.hashHexString}/${d.sizeBytes}" + s"$instanceName/blobs/${d.hashHexString}/${d.sizeBytes}" /** * https://github.com/bazelbuild/remote-apis/blob/96942a2107c702ed3ca4a664f7eeb7c85ba8dc77/build/bazel/remote/execution/v2/remote_execution.proto#L1629 @@ -432,4 +448,14 @@ class GrpcActionCacheStore( readBytes > 0 do () out.toByteString() + + private def transfer(data: ByteString, out: OutputStream): Unit = + val bufferSize = 8192 + Using.resource(data.newInput()): input => + val buf = new Array[Byte](bufferSize) + while + val readBytes = input.read(buf) + if readBytes >= 0 then out.write(buf, 0, readBytes) + readBytes >= 0 + do () end GrpcActionCacheStore