From ef90ca0540f82c2f933a82851361682a41bed8df Mon Sep 17 00:00:00 2001 From: Clayton <118192227+claytonlin1110@users.noreply.github.com> Date: Wed, 15 Apr 2026 23:58:06 -0500 Subject: [PATCH] [2.x] feat: Forward test events to listeners as they are emitted (#9087) Test report listeners now receive testEvent callbacks as the underlying test framework emits events, instead of only after a whole group finishes. startGroup / endGroup timing is aligned with that streaming model for both in-process and forked test runs. No public API changes - only listener callback timing and the internal worker <-> sbt JSON-RPC protocol. --- .../src/main/scala/sbt/ForkTests.scala | 28 ++++-- .../scala/sbt/internal/WorkerExchange.scala | 2 + .../tests/stream-test-events/build.sbt | 80 +++++++++++++++++ .../scala/custom/StreamingFramework.scala | 68 +++++++++++++++ .../scala/custom/StreamingFramework.scala | 68 +++++++++++++++ .../sbt-test/tests/stream-test-events/test | 6 ++ .../src/main/scala/sbt/TestFramework.scala | 10 ++- .../sbt/internal/worker1/ForkTestMain.java | 85 +++++++++++++------ 8 files changed, 313 insertions(+), 34 deletions(-) create mode 100644 sbt-app/src/sbt-test/tests/stream-test-events/build.sbt create mode 100644 sbt-app/src/sbt-test/tests/stream-test-events/forked/src/test/scala/custom/StreamingFramework.scala create mode 100644 sbt-app/src/sbt-test/tests/stream-test-events/inproc/src/test/scala/custom/StreamingFramework.scala create mode 100644 sbt-app/src/sbt-test/tests/stream-test-events/test diff --git a/main-actions/src/main/scala/sbt/ForkTests.scala b/main-actions/src/main/scala/sbt/ForkTests.scala index 08590ed08..06180a28c 100755 --- a/main-actions/src/main/scala/sbt/ForkTests.scala +++ b/main-actions/src/main/scala/sbt/ForkTests.scala @@ -194,6 +194,9 @@ private class React( ) extends WorkerResponseListener: val g = WorkerMain.mkGson() val promise: Promise[Int] = Promise() + + /** Events per test group, accumulated for [[SuiteResult]] (listeners get each event immediately). */ + private val progressEvents = mutable.Map.empty[String, mutable.ArrayBuffer[testing.Event]] override def apply(line: String): Unit = try val o = JsonParser.parseString(line).getAsJsonObject() @@ -235,15 +238,30 @@ private class React( case ForkTags.Debug => log.debug(info.message) case _ => () else () - case "testEvents" => + case "startTestGroup" => + val params = o.getAsJsonObject("params") + val info = + g.fromJson[ForkTestMain.ForkGroupStart](params, classOf[ForkTestMain.ForkGroupStart]) + if info.id == id then + progressEvents(info.group) = mutable.ArrayBuffer.empty + listeners.foreach(_.startGroup(info.group)) + else () + case "testProgress" => val params = o.getAsJsonObject("params") val info = g.fromJson[ForkTestMain.ForkEventsInfo](params, classOf[ForkTestMain.ForkEventsInfo]) if info.id == id then - val events = info.events.asScala.toSeq - listeners.foreach(_.startGroup(info.group)) - val event = TestEvent(events) - listeners.foreach(_.testEvent(event)) + val buf = progressEvents.getOrElseUpdate(info.group, mutable.ArrayBuffer.empty) + for e <- info.events.asScala do + buf += e + listeners.foreach(_.testEvent(TestEvent(Seq(e)))) + else () + case "endTestGroup" => + val params = o.getAsJsonObject("params") + val info = + g.fromJson[ForkTestMain.ForkGroupEnd](params, classOf[ForkTestMain.ForkGroupEnd]) + if info.id == id then + val events = progressEvents.remove(info.group).getOrElse(mutable.ArrayBuffer.empty).toSeq val suiteResult = SuiteResult(events) results += info.group -> suiteResult listeners.foreach(_.endGroup(info.group, suiteResult.result)) diff --git a/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala b/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala index 6e91c6636..979bf8aae 100644 --- a/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala +++ b/main-actions/src/main/scala/sbt/internal/WorkerExchange.scala @@ -50,6 +50,8 @@ object WorkerExchange: val scanner = Scanner(socket.getInputStream(), "UTF-8") while scanner.hasNextLine() do notifyListeners(scanner.nextLine()) }) + accepter.setName("sbt-fork-test-response-reader") + accepter.setPriority(Thread.NORM_PRIORITY + 1) accepter.start() Some(serverSocket) case _ => None diff --git a/sbt-app/src/sbt-test/tests/stream-test-events/build.sbt b/sbt-app/src/sbt-test/tests/stream-test-events/build.sbt new file mode 100644 index 000000000..e3f20da9d --- /dev/null +++ b/sbt-app/src/sbt-test/tests/stream-test-events/build.sbt @@ -0,0 +1,80 @@ +import java.util.concurrent.atomic.AtomicLong +import sbt.protocol.testing.TestResult + +val startAtNs = collection.concurrent.TrieMap.empty[String, AtomicLong] +val eventCallbackCount = collection.concurrent.TrieMap.empty[String, AtomicLong] +val maxDetailSize = collection.concurrent.TrieMap.empty[String, AtomicLong] +val endSeen = collection.concurrent.TrieMap.empty[String, AtomicLong] + +def counter(map: collection.concurrent.TrieMap[String, AtomicLong], key: String): AtomicLong = + map.getOrElseUpdate(key, new AtomicLong(0L)) + +def resetCounters(key: String): Unit = { + counter(startAtNs, key).set(0L) + counter(eventCallbackCount, key).set(0L) + counter(maxDetailSize, key).set(0L) + counter(endSeen, key).set(0L) +} + +def streamingListener(label: String): TestReportListener = new TestReportListener { + def startGroup(name: String): Unit = + counter(startAtNs, label).compareAndSet(0L, System.nanoTime()) + + def testEvent(event: TestEvent): Unit = { + counter(eventCallbackCount, label).incrementAndGet() + val detailSize = event.detail.size.toLong + val maxSeen = counter(maxDetailSize, label) + var done = false + while (!done) { + val current = maxSeen.get() + if (detailSize <= current) done = true + else done = maxSeen.compareAndSet(current, detailSize) + } + } + + def endGroup(name: String, t: Throwable): Unit = + counter(endSeen, label).set(1L) + + def endGroup(name: String, result: TestResult): Unit = + counter(endSeen, label).set(1L) +} + +lazy val resetListener = taskKey[Unit]("Reset listener state for timing checks") +lazy val checkStreaming = taskKey[Unit]("Assert test events are received before endGroup") + +ThisBuild / scalaVersion := "2.12.21" + +def commonSettings(label: String): Seq[Def.Setting[_]] = + Seq( + libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0" % Test, + Test / testFrameworks := Seq(new TestFramework("custom.StreamingFramework")), + Test / parallelExecution := false, + testListeners += streamingListener(label), + resetListener := resetCounters(label), + checkStreaming := { + val startNs = counter(startAtNs, label).get() + val callbacks = counter(eventCallbackCount, label).get() + val largestDetail = counter(maxDetailSize, label).get() + val endWasSeen = counter(endSeen, label).get() + if (startNs == 0L) sys.error("startGroup was never called") + if (endWasSeen == 0L) sys.error("endGroup was never called") + if (callbacks < 2L) + sys.error("Expected at least two testEvent callbacks, saw " + callbacks) + if (largestDetail > 1L) + sys.error( + "Expected streamed test events with detail size 1, largest detail size was " + largestDetail + ) + } + ) + +lazy val inproc = (project in file("inproc")) + .settings(commonSettings("inproc"): _*) + .settings(Test / fork := false) + +lazy val forked = (project in file("forked")) + .settings(commonSettings("forked"): _*) + .settings(Test / fork := true) + +lazy val root = (project in file(".")) + .aggregate(inproc, forked) + .settings(publish / skip := true) diff --git a/sbt-app/src/sbt-test/tests/stream-test-events/forked/src/test/scala/custom/StreamingFramework.scala b/sbt-app/src/sbt-test/tests/stream-test-events/forked/src/test/scala/custom/StreamingFramework.scala new file mode 100644 index 000000000..2564d9544 --- /dev/null +++ b/sbt-app/src/sbt-test/tests/stream-test-events/forked/src/test/scala/custom/StreamingFramework.scala @@ -0,0 +1,68 @@ +package custom + +import sbt.testing._ + +trait StreamTest + +final class SampleTest extends StreamTest + +final class StreamingFramework extends Framework { + def name(): String = "StreamingFramework" + + def fingerprints(): Array[Fingerprint] = + Array( + new SubclassFingerprint { + def isModule(): Boolean = false + def superclassName(): String = "custom.StreamTest" + def requireNoArgConstructor(): Boolean = true + } + ) + + def runner( + args: Array[String], + remoteArgs: Array[String], + testClassLoader: ClassLoader + ): Runner = + new StreamingRunner +} + +final class StreamingRunner extends Runner { + def tasks(taskDefs: Array[TaskDef]): Array[Task] = + taskDefs.map(new StreamingTask(_)) + + def done(): String = "" + + def args(): Array[String] = Array.empty + + def remoteArgs(): Array[String] = Array.empty + + def receiveMessage(msg: String): Option[String] = None + + def serializeTask(task: Task, serializer: TaskDef => String): String = + serializer(task.taskDef()) + + def deserializeTask(task: String, deserializer: String => TaskDef): Task = + new StreamingTask(deserializer(task)) +} + +final class StreamingTask(td: TaskDef) extends Task { + def taskDef(): TaskDef = td + + def tags(): Array[String] = Array.empty + + def execute(handler: EventHandler, loggers: Array[Logger]): Array[Task] = { + handler.handle(new StreamingEvent(td, "first")) + Thread.sleep(1200L) + handler.handle(new StreamingEvent(td, "second")) + Array.empty + } +} + +final class StreamingEvent(td: TaskDef, testName: String) extends Event { + def fullyQualifiedName(): String = td.fullyQualifiedName() + def fingerprint(): Fingerprint = td.fingerprint() + def selector(): Selector = new TestSelector(testName) + def status(): Status = Status.Success + def throwable(): OptionalThrowable = new OptionalThrowable() + def duration(): Long = 0L +} \ No newline at end of file diff --git a/sbt-app/src/sbt-test/tests/stream-test-events/inproc/src/test/scala/custom/StreamingFramework.scala b/sbt-app/src/sbt-test/tests/stream-test-events/inproc/src/test/scala/custom/StreamingFramework.scala new file mode 100644 index 000000000..2564d9544 --- /dev/null +++ b/sbt-app/src/sbt-test/tests/stream-test-events/inproc/src/test/scala/custom/StreamingFramework.scala @@ -0,0 +1,68 @@ +package custom + +import sbt.testing._ + +trait StreamTest + +final class SampleTest extends StreamTest + +final class StreamingFramework extends Framework { + def name(): String = "StreamingFramework" + + def fingerprints(): Array[Fingerprint] = + Array( + new SubclassFingerprint { + def isModule(): Boolean = false + def superclassName(): String = "custom.StreamTest" + def requireNoArgConstructor(): Boolean = true + } + ) + + def runner( + args: Array[String], + remoteArgs: Array[String], + testClassLoader: ClassLoader + ): Runner = + new StreamingRunner +} + +final class StreamingRunner extends Runner { + def tasks(taskDefs: Array[TaskDef]): Array[Task] = + taskDefs.map(new StreamingTask(_)) + + def done(): String = "" + + def args(): Array[String] = Array.empty + + def remoteArgs(): Array[String] = Array.empty + + def receiveMessage(msg: String): Option[String] = None + + def serializeTask(task: Task, serializer: TaskDef => String): String = + serializer(task.taskDef()) + + def deserializeTask(task: String, deserializer: String => TaskDef): Task = + new StreamingTask(deserializer(task)) +} + +final class StreamingTask(td: TaskDef) extends Task { + def taskDef(): TaskDef = td + + def tags(): Array[String] = Array.empty + + def execute(handler: EventHandler, loggers: Array[Logger]): Array[Task] = { + handler.handle(new StreamingEvent(td, "first")) + Thread.sleep(1200L) + handler.handle(new StreamingEvent(td, "second")) + Array.empty + } +} + +final class StreamingEvent(td: TaskDef, testName: String) extends Event { + def fullyQualifiedName(): String = td.fullyQualifiedName() + def fingerprint(): Fingerprint = td.fingerprint() + def selector(): Selector = new TestSelector(testName) + def status(): Status = Status.Success + def throwable(): OptionalThrowable = new OptionalThrowable() + def duration(): Long = 0L +} \ No newline at end of file diff --git a/sbt-app/src/sbt-test/tests/stream-test-events/test b/sbt-app/src/sbt-test/tests/stream-test-events/test new file mode 100644 index 000000000..f655ede5f --- /dev/null +++ b/sbt-app/src/sbt-test/tests/stream-test-events/test @@ -0,0 +1,6 @@ +> inproc/resetListener +> inproc/test +> inproc/checkStreaming +> forked/resetListener +> forked/test +> forked/checkStreaming diff --git a/testing/src/main/scala/sbt/TestFramework.scala b/testing/src/main/scala/sbt/TestFramework.scala index 9d208cfe2..883b3f96c 100644 --- a/testing/src/main/scala/sbt/TestFramework.scala +++ b/testing/src/main/scala/sbt/TestFramework.scala @@ -135,7 +135,12 @@ private[sbt] final class TestRunner( // Thread-safe collection so AsyncFunSuite (and other async frameworks) can call // handle() from multiple threads without corrupting results (fixes #5245). val results = new CopyOnWriteArrayList[Event] - val handler = new EventHandler { def handle(e: Event): Unit = { results.add(e) } } + val handler = new EventHandler { + def handle(e: Event): Unit = { + results.add(e) + safeListenersCall(_.testEvent(TestEvent(Seq(e)))) + } + } val loggers: Vector[ContentLogger] = listeners.flatMap(_.contentLogger(testDefinition)) def errorEvents(e: Throwable): Array[sbt.testing.Task] = { val taskDef = testTask.taskDef @@ -148,6 +153,7 @@ private[sbt] final class TestRunner( val duration = -1L } results.add(event) + safeListenersCall(_.testEvent(TestEvent(Seq(event)))) Array.empty } val nestedTasks = @@ -160,8 +166,6 @@ private[sbt] final class TestRunner( loggers.foreach(_.flush()) } val resultsList = results.asScala.toList - val event = TestEvent(resultsList) - safeListenersCall(_.testEvent(event)) (SuiteResult(resultsList), nestedTasks.toSeq) } diff --git a/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java b/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java index ce18a426f..27f4e9ca1 100644 --- a/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java +++ b/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java @@ -12,7 +12,7 @@ import java.io.PrintStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.*; @@ -140,6 +140,26 @@ public class ForkTestMain { } } + public static class ForkGroupStart implements Serializable { + public long id; + public String group; + + public ForkGroupStart(long id, String group) { + this.id = id; + this.group = group; + } + } + + public static class ForkGroupEnd implements Serializable { + public long id; + public String group; + + public ForkGroupEnd(long id, String group) { + this.id = id; + this.group = group; + } + } + // ----------------------------------------------------------------------------- public static final class ForkError extends Exception { @@ -296,16 +316,38 @@ public class ForkTestMain { }; } - private void writeEvents(final TaskDef taskDef, final ForkEvent[] events) { + private void writeGroupStart(final TaskDef taskDef) { + ForkGroupStart info = new ForkGroupStart(this.id, taskDef.fullyQualifiedName()); + String params = this.gson.toJson(info, ForkGroupStart.class); + String notification = + String.format( + "{ \"jsonrpc\": \"2.0\", \"method\": \"startTestGroup\", \"params\": %s, \"re\": %d }", + params, this.id); + this.originalOut.println(notification); + this.originalOut.flush(); + } + + private void writeTestProgress(final TaskDef taskDef, final ForkEvent event) { ForkEventsInfo info = new ForkEventsInfo( this.id, taskDef.fullyQualifiedName(), - new ArrayList(Arrays.asList(events))); + new ArrayList(Collections.singletonList(event))); String params = this.gson.toJson(info, ForkEventsInfo.class); String notification = String.format( - "{ \"jsonrpc\": \"2.0\", \"method\": \"testEvents\", \"params\": %s, \"re\": %d }", + "{ \"jsonrpc\": \"2.0\", \"method\": \"testProgress\", \"params\": %s, \"re\": %d }", + params, this.id); + this.originalOut.println(notification); + this.originalOut.flush(); + } + + private void writeGroupEnd(final TaskDef taskDef) { + ForkGroupEnd info = new ForkGroupEnd(this.id, taskDef.fullyQualifiedName()); + String params = this.gson.toJson(info, ForkGroupEnd.class); + String notification = + String.format( + "{ \"jsonrpc\": \"2.0\", \"method\": \"endTestGroup\", \"params\": %s, \"re\": %d }", params, this.id); this.originalOut.println(notification); this.originalOut.flush(); @@ -414,41 +456,32 @@ public class ForkTestMain { final ExecutorService executor, final Task task, final Logger[] loggers) { return executor.submit( () -> { - ForkEvent[] events; Task[] nestedTasks; final TaskDef taskDef = task.taskDef(); + writeGroupStart(taskDef); try { - final Collection eventList = new ConcurrentLinkedDeque<>(); final EventHandler handler = new EventHandler() { public void handle(final Event e) { - eventList.add(new ForkEvent(e)); + writeTestProgress(taskDef, new ForkEvent(e)); } }; logDebug(" Running " + taskDef); nestedTasks = task.execute(handler, loggers); - if (nestedTasks.length > 0 || eventList.size() > 0) - logDebug( - " Produced " - + nestedTasks.length - + " nested tasks and " - + eventList.size() - + " events."); - events = eventList.toArray(new ForkEvent[eventList.size()]); + logDebug(" Produced " + nestedTasks.length + " nested tasks (events streamed)."); } catch (final Throwable t) { nestedTasks = new Task[0]; - events = - new ForkEvent[] { - testError( - taskDef, - "Uncaught exception when running " - + taskDef.fullyQualifiedName() - + ": " - + t.toString(), - t) - }; + writeTestProgress( + taskDef, + testError( + taskDef, + "Uncaught exception when running " + + taskDef.fullyQualifiedName() + + ": " + + t.toString(), + t)); } - writeEvents(taskDef, events); + writeGroupEnd(taskDef); return nestedTasks; }); }