fix: forward test events to listeners as they are emitted

This commit is contained in:
Clayton 2026-04-13 15:55:48 -05:00
parent 8bcc6ae420
commit 8b80903905
4 changed files with 90 additions and 34 deletions

View File

@ -194,6 +194,8 @@ private class React(
) extends WorkerResponseListener:
val g = WorkerMain.mkGson()
val promise: Promise[Int] = Promise()
/** Events per test group, accumulated for [[SuiteResult]] (listeners get each event immediately). */
private val progressEvents = mutable.Map.empty[String, mutable.ArrayBuffer[testing.Event]]
override def apply(line: String): Unit =
try
val o = JsonParser.parseString(line).getAsJsonObject()
@ -235,15 +237,30 @@ private class React(
case ForkTags.Debug => log.debug(info.message)
case _ => ()
else ()
case "testEvents" =>
case "startTestGroup" =>
val params = o.getAsJsonObject("params")
val info =
g.fromJson[ForkTestMain.ForkGroupStart](params, classOf[ForkTestMain.ForkGroupStart])
if info.id == id then
progressEvents(info.group) = mutable.ArrayBuffer.empty
listeners.foreach(_.startGroup(info.group))
else ()
case "testProgress" =>
val params = o.getAsJsonObject("params")
val info =
g.fromJson[ForkTestMain.ForkEventsInfo](params, classOf[ForkTestMain.ForkEventsInfo])
if info.id == id then
val events = info.events.asScala.toSeq
listeners.foreach(_.startGroup(info.group))
val event = TestEvent(events)
listeners.foreach(_.testEvent(event))
val buf = progressEvents.getOrElseUpdate(info.group, mutable.ArrayBuffer.empty)
for e <- info.events.asScala do
buf += e
listeners.foreach(_.testEvent(TestEvent(Seq(e))))
else ()
case "endTestGroup" =>
val params = o.getAsJsonObject("params")
val info =
g.fromJson[ForkTestMain.ForkGroupEnd](params, classOf[ForkTestMain.ForkGroupEnd])
if info.id == id then
val events = progressEvents.remove(info.group).getOrElse(mutable.ArrayBuffer.empty).toSeq
val suiteResult = SuiteResult(events)
results += info.group -> suiteResult
listeners.foreach(_.endGroup(info.group, suiteResult.result))

View File

@ -50,6 +50,8 @@ object WorkerExchange:
val scanner = Scanner(socket.getInputStream(), "UTF-8")
while scanner.hasNextLine() do notifyListeners(scanner.nextLine())
})
accepter.setName("sbt-fork-test-response-reader")
accepter.setPriority(Thread.NORM_PRIORITY + 1)
accepter.start()
Some(serverSocket)
case _ => None

View File

@ -135,7 +135,12 @@ private[sbt] final class TestRunner(
// Thread-safe collection so AsyncFunSuite (and other async frameworks) can call
// handle() from multiple threads without corrupting results (fixes #5245).
val results = new CopyOnWriteArrayList[Event]
val handler = new EventHandler { def handle(e: Event): Unit = { results.add(e) } }
val handler = new EventHandler {
def handle(e: Event): Unit = {
results.add(e)
safeListenersCall(_.testEvent(TestEvent(Seq(e))))
}
}
val loggers: Vector[ContentLogger] = listeners.flatMap(_.contentLogger(testDefinition))
def errorEvents(e: Throwable): Array[sbt.testing.Task] = {
val taskDef = testTask.taskDef
@ -148,6 +153,7 @@ private[sbt] final class TestRunner(
val duration = -1L
}
results.add(event)
safeListenersCall(_.testEvent(TestEvent(Seq(event))))
Array.empty
}
val nestedTasks =
@ -160,8 +166,6 @@ private[sbt] final class TestRunner(
loggers.foreach(_.flush())
}
val resultsList = results.asScala.toList
val event = TestEvent(resultsList)
safeListenersCall(_.testEvent(event))
(SuiteResult(resultsList), nestedTasks.toSeq)
}

View File

@ -12,7 +12,7 @@ import java.io.PrintStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.*;
@ -140,6 +140,26 @@ public class ForkTestMain {
}
}
public static class ForkGroupStart implements Serializable {
public long id;
public String group;
public ForkGroupStart(long id, String group) {
this.id = id;
this.group = group;
}
}
public static class ForkGroupEnd implements Serializable {
public long id;
public String group;
public ForkGroupEnd(long id, String group) {
this.id = id;
this.group = group;
}
}
// -----------------------------------------------------------------------------
public static final class ForkError extends Exception {
@ -296,16 +316,38 @@ public class ForkTestMain {
};
}
private void writeEvents(final TaskDef taskDef, final ForkEvent[] events) {
private void writeGroupStart(final TaskDef taskDef) {
ForkGroupStart info = new ForkGroupStart(this.id, taskDef.fullyQualifiedName());
String params = this.gson.toJson(info, ForkGroupStart.class);
String notification =
String.format(
"{ \"jsonrpc\": \"2.0\", \"method\": \"startTestGroup\", \"params\": %s, \"re\": %d }",
params, this.id);
this.originalOut.println(notification);
this.originalOut.flush();
}
private void writeTestProgress(final TaskDef taskDef, final ForkEvent event) {
ForkEventsInfo info =
new ForkEventsInfo(
this.id,
taskDef.fullyQualifiedName(),
new ArrayList<ForkEvent>(Arrays.asList(events)));
new ArrayList<ForkEvent>(Collections.singletonList(event)));
String params = this.gson.toJson(info, ForkEventsInfo.class);
String notification =
String.format(
"{ \"jsonrpc\": \"2.0\", \"method\": \"testEvents\", \"params\": %s, \"re\": %d }",
"{ \"jsonrpc\": \"2.0\", \"method\": \"testProgress\", \"params\": %s, \"re\": %d }",
params, this.id);
this.originalOut.println(notification);
this.originalOut.flush();
}
private void writeGroupEnd(final TaskDef taskDef) {
ForkGroupEnd info = new ForkGroupEnd(this.id, taskDef.fullyQualifiedName());
String params = this.gson.toJson(info, ForkGroupEnd.class);
String notification =
String.format(
"{ \"jsonrpc\": \"2.0\", \"method\": \"endTestGroup\", \"params\": %s, \"re\": %d }",
params, this.id);
this.originalOut.println(notification);
this.originalOut.flush();
@ -414,41 +456,32 @@ public class ForkTestMain {
final ExecutorService executor, final Task task, final Logger[] loggers) {
return executor.submit(
() -> {
ForkEvent[] events;
Task[] nestedTasks;
final TaskDef taskDef = task.taskDef();
writeGroupStart(taskDef);
try {
final Collection<ForkEvent> eventList = new ConcurrentLinkedDeque<>();
final EventHandler handler =
new EventHandler() {
public void handle(final Event e) {
eventList.add(new ForkEvent(e));
writeTestProgress(taskDef, new ForkEvent(e));
}
};
logDebug(" Running " + taskDef);
nestedTasks = task.execute(handler, loggers);
if (nestedTasks.length > 0 || eventList.size() > 0)
logDebug(
" Produced "
+ nestedTasks.length
+ " nested tasks and "
+ eventList.size()
+ " events.");
events = eventList.toArray(new ForkEvent[eventList.size()]);
logDebug(" Produced " + nestedTasks.length + " nested tasks (events streamed).");
} catch (final Throwable t) {
nestedTasks = new Task[0];
events =
new ForkEvent[] {
testError(
taskDef,
"Uncaught exception when running "
+ taskDef.fullyQualifiedName()
+ ": "
+ t.toString(),
t)
};
writeTestProgress(
taskDef,
testError(
taskDef,
"Uncaught exception when running "
+ taskDef.fullyQualifiedName()
+ ": "
+ t.toString(),
t));
}
writeEvents(taskDef, events);
writeGroupEnd(taskDef);
return nestedTasks;
});
}