From 8b80903905df5ee0d6e1ec1cdc7af57dc09fe24f Mon Sep 17 00:00:00 2001 From: Clayton Date: Mon, 13 Apr 2026 15:55:48 -0500 Subject: [PATCH] fix: forward test events to listeners as they are emitted --- .../src/main/scala/sbt/ForkTests.scala | 27 ++++-- .../scala/sbt/internal/WorkerExchange.scala | 2 + .../src/main/scala/sbt/TestFramework.scala | 10 ++- .../sbt/internal/worker1/ForkTestMain.java | 85 +++++++++++++------ 4 files changed, 90 insertions(+), 34 deletions(-) diff --git a/main-actions/src/main/scala/sbt/ForkTests.scala b/main-actions/src/main/scala/sbt/ForkTests.scala index 08590ed08..198e995fc 100755 --- a/main-actions/src/main/scala/sbt/ForkTests.scala +++ b/main-actions/src/main/scala/sbt/ForkTests.scala @@ -194,6 +194,8 @@ private class React( ) extends WorkerResponseListener: val g = WorkerMain.mkGson() val promise: Promise[Int] = Promise() + /** Events per test group, accumulated for [[SuiteResult]] (listeners get each event immediately). */ + private val progressEvents = mutable.Map.empty[String, mutable.ArrayBuffer[testing.Event]] override def apply(line: String): Unit = try val o = JsonParser.parseString(line).getAsJsonObject() @@ -235,15 +237,30 @@ private class React( case ForkTags.Debug => log.debug(info.message) case _ => () else () - case "testEvents" => + case "startTestGroup" => + val params = o.getAsJsonObject("params") + val info = + g.fromJson[ForkTestMain.ForkGroupStart](params, classOf[ForkTestMain.ForkGroupStart]) + if info.id == id then + progressEvents(info.group) = mutable.ArrayBuffer.empty + listeners.foreach(_.startGroup(info.group)) + else () + case "testProgress" => val params = o.getAsJsonObject("params") val info = g.fromJson[ForkTestMain.ForkEventsInfo](params, classOf[ForkTestMain.ForkEventsInfo]) if info.id == id then - val events = info.events.asScala.toSeq - listeners.foreach(_.startGroup(info.group)) - val event = TestEvent(events) - listeners.foreach(_.testEvent(event)) + val buf = progressEvents.getOrElseUpdate(info.group, mutable.ArrayBuffer.empty) + for e <- info.events.asScala do + buf += e + listeners.foreach(_.testEvent(TestEvent(Seq(e)))) + else () + case "endTestGroup" => + val params = o.getAsJsonObject("params") + val info = + g.fromJson[ForkTestMain.ForkGroupEnd](params, classOf[ForkTestMain.ForkGroupEnd]) + if info.id == id then + val events = progressEvents.remove(info.group).getOrElse(mutable.ArrayBuffer.empty).toSeq val suiteResult = SuiteResult(events) results += info.group -> suiteResult listeners.foreach(_.endGroup(info.group, suiteResult.result)) diff --git a/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala b/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala index 6e91c6636..979bf8aae 100644 --- a/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala +++ b/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala @@ -50,6 +50,8 @@ object WorkerExchange: val scanner = Scanner(socket.getInputStream(), "UTF-8") while scanner.hasNextLine() do notifyListeners(scanner.nextLine()) }) + accepter.setName("sbt-fork-test-response-reader") + accepter.setPriority(Thread.NORM_PRIORITY + 1) accepter.start() Some(serverSocket) case _ => None diff --git a/testing/src/main/scala/sbt/TestFramework.scala b/testing/src/main/scala/sbt/TestFramework.scala index 9d208cfe2..883b3f96c 100644 --- a/testing/src/main/scala/sbt/TestFramework.scala +++ b/testing/src/main/scala/sbt/TestFramework.scala @@ -135,7 +135,12 @@ private[sbt] final class TestRunner( // Thread-safe collection so AsyncFunSuite (and other async frameworks) can call // handle() from multiple threads without corrupting results (fixes #5245). val results = new CopyOnWriteArrayList[Event] - val handler = new EventHandler { def handle(e: Event): Unit = { results.add(e) } } + val handler = new EventHandler { + def handle(e: Event): Unit = { + results.add(e) + safeListenersCall(_.testEvent(TestEvent(Seq(e)))) + } + } val loggers: Vector[ContentLogger] = listeners.flatMap(_.contentLogger(testDefinition)) def errorEvents(e: Throwable): Array[sbt.testing.Task] = { val taskDef = testTask.taskDef @@ -148,6 +153,7 @@ private[sbt] final class TestRunner( val duration = -1L } results.add(event) + safeListenersCall(_.testEvent(TestEvent(Seq(event)))) Array.empty } val nestedTasks = @@ -160,8 +166,6 @@ private[sbt] final class TestRunner( loggers.foreach(_.flush()) } val resultsList = results.asScala.toList - val event = TestEvent(resultsList) - safeListenersCall(_.testEvent(event)) (SuiteResult(resultsList), nestedTasks.toSeq) } diff --git a/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java b/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java index ce18a426f..27f4e9ca1 100644 --- a/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java +++ b/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java @@ -12,7 +12,7 @@ import java.io.PrintStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.*; @@ -140,6 +140,26 @@ public class ForkTestMain { } } + public static class ForkGroupStart implements Serializable { + public long id; + public String group; + + public ForkGroupStart(long id, String group) { + this.id = id; + this.group = group; + } + } + + public static class ForkGroupEnd implements Serializable { + public long id; + public String group; + + public ForkGroupEnd(long id, String group) { + this.id = id; + this.group = group; + } + } + // ----------------------------------------------------------------------------- public static final class ForkError extends Exception { @@ -296,16 +316,38 @@ public class ForkTestMain { }; } - private void writeEvents(final TaskDef taskDef, final ForkEvent[] events) { + private void writeGroupStart(final TaskDef taskDef) { + ForkGroupStart info = new ForkGroupStart(this.id, taskDef.fullyQualifiedName()); + String params = this.gson.toJson(info, ForkGroupStart.class); + String notification = + String.format( + "{ \"jsonrpc\": \"2.0\", \"method\": \"startTestGroup\", \"params\": %s, \"re\": %d }", + params, this.id); + this.originalOut.println(notification); + this.originalOut.flush(); + } + + private void writeTestProgress(final TaskDef taskDef, final ForkEvent event) { ForkEventsInfo info = new ForkEventsInfo( this.id, taskDef.fullyQualifiedName(), - new ArrayList(Arrays.asList(events))); + new ArrayList(Collections.singletonList(event))); String params = this.gson.toJson(info, ForkEventsInfo.class); String notification = String.format( - "{ \"jsonrpc\": \"2.0\", \"method\": \"testEvents\", \"params\": %s, \"re\": %d }", + "{ \"jsonrpc\": \"2.0\", \"method\": \"testProgress\", \"params\": %s, \"re\": %d }", + params, this.id); + this.originalOut.println(notification); + this.originalOut.flush(); + } + + private void writeGroupEnd(final TaskDef taskDef) { + ForkGroupEnd info = new ForkGroupEnd(this.id, taskDef.fullyQualifiedName()); + String params = this.gson.toJson(info, ForkGroupEnd.class); + String notification = + String.format( + "{ \"jsonrpc\": \"2.0\", \"method\": \"endTestGroup\", \"params\": %s, \"re\": %d }", params, this.id); this.originalOut.println(notification); this.originalOut.flush(); @@ -414,41 +456,32 @@ public class ForkTestMain { final ExecutorService executor, final Task task, final Logger[] loggers) { return executor.submit( () -> { - ForkEvent[] events; Task[] nestedTasks; final TaskDef taskDef = task.taskDef(); + writeGroupStart(taskDef); try { - final Collection eventList = new ConcurrentLinkedDeque<>(); final EventHandler handler = new EventHandler() { public void handle(final Event e) { - eventList.add(new ForkEvent(e)); + writeTestProgress(taskDef, new ForkEvent(e)); } }; logDebug(" Running " + taskDef); nestedTasks = task.execute(handler, loggers); - if (nestedTasks.length > 0 || eventList.size() > 0) - logDebug( - " Produced " - + nestedTasks.length - + " nested tasks and " - + eventList.size() - + " events."); - events = eventList.toArray(new ForkEvent[eventList.size()]); + logDebug(" Produced " + nestedTasks.length + " nested tasks (events streamed)."); } catch (final Throwable t) { nestedTasks = new Task[0]; - events = - new ForkEvent[] { - testError( - taskDef, - "Uncaught exception when running " - + taskDef.fullyQualifiedName() - + ": " - + t.toString(), - t) - }; + writeTestProgress( + taskDef, + testError( + taskDef, + "Uncaught exception when running " + + taskDef.fullyQualifiedName() + + ": " + + t.toString(), + t)); } - writeEvents(taskDef, events); + writeGroupEnd(taskDef); return nestedTasks; }); }