fix: Use promise to wait for forked test

**Problem**
I'm seeing NPE in some test-related tests like tests/junit:
[info] [error] java.lang.NullPointerException: Null output stream
[info] [error] 	at java.io.PrintStream.requireNonNull(PrintStream.java:79)
[info] [error] 	at java.io.PrintStream.<init>(PrintStream.java:151)
[info] [error] 	at java.io.PrintStream.<init>(PrintStream.java:135)
[info] [error] 	at sbt.internal.WorkerProxy.inputStream$lzyINIT1(WorkerExchange.scala:69)
[info] [error] 	at sbt.internal.WorkerProxy.inputStream(WorkerExchange.scala:69)
[info] [error] 	at sbt.internal.WorkerProxy.println(WorkerExchange.scala:77)
[info] [error] 	at sbt.ForkTests$.mainTestTask$$anonfun$1(ForkTests.scala:160)

This is likely coming from ProcessBuilder#run not immediately processing the passed in processIO.

**Solution**
This uses Promise to wait for the processIO to be processed.
This commit is contained in:
Eugene Yokota 2025-08-02 15:14:22 -04:00
parent a8cd83a4f1
commit 9b1e644be1
1 changed files with 7 additions and 4 deletions

View File

@ -11,13 +11,14 @@ package internal
import com.google.gson.Gson
import java.io.*
import java.util.concurrent.atomic.AtomicReference
import sbt.io.IO
import sbt.internal.worker1.*
import sbt.testing.Framework
import scala.sys.process.{ BasicIO, Process, ProcessIO }
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration.*
object WorkerExchange:
val listeners: mutable.ListBuffer[WorkerResponseListener] = ListBuffer.empty
@ -36,15 +37,17 @@ object WorkerExchange:
fullCp.mkString(File.pathSeparator),
classOf[WorkerMain].getCanonicalName,
)
val inputRef = AtomicReference[OutputStream]()
val inputRef = Promise[OutputStream]()
val processIo = ProcessIO(
in = (input) => inputRef.set(input),
in = (input) => inputRef.success(input),
out = BasicIO.processFully(onStdoutLine),
err = BasicIO.processFully((line) => scala.Console.err.println(line)),
)
val forkWithIo = fo.withOutputStrategy(OutputStrategy.CustomInputOutput(processIo))
val p = Fork.java.fork(forkWithIo, options)
WorkerProxy(inputRef.get(), p, options)
val forkTimeout = 30.seconds
val input = Await.result(inputRef.future, forkTimeout)
WorkerProxy(input, p, options)
def registerListener(listener: WorkerResponseListener): Unit =
synchronized: