mirror of https://github.com/sbt/sbt.git
[2.x] feat: Forward test events to listeners as they are emitted (#9087)
Test report listeners now receive testEvent callbacks as the underlying test framework emits events, instead of only after a whole group finishes. startGroup / endGroup timing is aligned with that streaming model for both in-process and forked test runs. No public API changes - only listener callback timing and the internal worker <-> sbt JSON-RPC protocol.
This commit is contained in:
parent
9d620f3b1f
commit
ef90ca0540
|
|
@ -194,6 +194,9 @@ private class React(
|
|||
) extends WorkerResponseListener:
|
||||
val g = WorkerMain.mkGson()
|
||||
val promise: Promise[Int] = Promise()
|
||||
|
||||
/** Events per test group, accumulated for [[SuiteResult]] (listeners get each event immediately). */
|
||||
private val progressEvents = mutable.Map.empty[String, mutable.ArrayBuffer[testing.Event]]
|
||||
override def apply(line: String): Unit =
|
||||
try
|
||||
val o = JsonParser.parseString(line).getAsJsonObject()
|
||||
|
|
@ -235,15 +238,30 @@ private class React(
|
|||
case ForkTags.Debug => log.debug(info.message)
|
||||
case _ => ()
|
||||
else ()
|
||||
case "testEvents" =>
|
||||
case "startTestGroup" =>
|
||||
val params = o.getAsJsonObject("params")
|
||||
val info =
|
||||
g.fromJson[ForkTestMain.ForkGroupStart](params, classOf[ForkTestMain.ForkGroupStart])
|
||||
if info.id == id then
|
||||
progressEvents(info.group) = mutable.ArrayBuffer.empty
|
||||
listeners.foreach(_.startGroup(info.group))
|
||||
else ()
|
||||
case "testProgress" =>
|
||||
val params = o.getAsJsonObject("params")
|
||||
val info =
|
||||
g.fromJson[ForkTestMain.ForkEventsInfo](params, classOf[ForkTestMain.ForkEventsInfo])
|
||||
if info.id == id then
|
||||
val events = info.events.asScala.toSeq
|
||||
listeners.foreach(_.startGroup(info.group))
|
||||
val event = TestEvent(events)
|
||||
listeners.foreach(_.testEvent(event))
|
||||
val buf = progressEvents.getOrElseUpdate(info.group, mutable.ArrayBuffer.empty)
|
||||
for e <- info.events.asScala do
|
||||
buf += e
|
||||
listeners.foreach(_.testEvent(TestEvent(Seq(e))))
|
||||
else ()
|
||||
case "endTestGroup" =>
|
||||
val params = o.getAsJsonObject("params")
|
||||
val info =
|
||||
g.fromJson[ForkTestMain.ForkGroupEnd](params, classOf[ForkTestMain.ForkGroupEnd])
|
||||
if info.id == id then
|
||||
val events = progressEvents.remove(info.group).getOrElse(mutable.ArrayBuffer.empty).toSeq
|
||||
val suiteResult = SuiteResult(events)
|
||||
results += info.group -> suiteResult
|
||||
listeners.foreach(_.endGroup(info.group, suiteResult.result))
|
||||
|
|
|
|||
|
|
@ -50,6 +50,8 @@ object WorkerExchange:
|
|||
val scanner = Scanner(socket.getInputStream(), "UTF-8")
|
||||
while scanner.hasNextLine() do notifyListeners(scanner.nextLine())
|
||||
})
|
||||
accepter.setName("sbt-fork-test-response-reader")
|
||||
accepter.setPriority(Thread.NORM_PRIORITY + 1)
|
||||
accepter.start()
|
||||
Some(serverSocket)
|
||||
case _ => None
|
||||
|
|
|
|||
|
|
@ -0,0 +1,80 @@
|
|||
import java.util.concurrent.atomic.AtomicLong
|
||||
import sbt.protocol.testing.TestResult
|
||||
|
||||
val startAtNs = collection.concurrent.TrieMap.empty[String, AtomicLong]
|
||||
val eventCallbackCount = collection.concurrent.TrieMap.empty[String, AtomicLong]
|
||||
val maxDetailSize = collection.concurrent.TrieMap.empty[String, AtomicLong]
|
||||
val endSeen = collection.concurrent.TrieMap.empty[String, AtomicLong]
|
||||
|
||||
def counter(map: collection.concurrent.TrieMap[String, AtomicLong], key: String): AtomicLong =
|
||||
map.getOrElseUpdate(key, new AtomicLong(0L))
|
||||
|
||||
def resetCounters(key: String): Unit = {
|
||||
counter(startAtNs, key).set(0L)
|
||||
counter(eventCallbackCount, key).set(0L)
|
||||
counter(maxDetailSize, key).set(0L)
|
||||
counter(endSeen, key).set(0L)
|
||||
}
|
||||
|
||||
def streamingListener(label: String): TestReportListener = new TestReportListener {
|
||||
def startGroup(name: String): Unit =
|
||||
counter(startAtNs, label).compareAndSet(0L, System.nanoTime())
|
||||
|
||||
def testEvent(event: TestEvent): Unit = {
|
||||
counter(eventCallbackCount, label).incrementAndGet()
|
||||
val detailSize = event.detail.size.toLong
|
||||
val maxSeen = counter(maxDetailSize, label)
|
||||
var done = false
|
||||
while (!done) {
|
||||
val current = maxSeen.get()
|
||||
if (detailSize <= current) done = true
|
||||
else done = maxSeen.compareAndSet(current, detailSize)
|
||||
}
|
||||
}
|
||||
|
||||
def endGroup(name: String, t: Throwable): Unit =
|
||||
counter(endSeen, label).set(1L)
|
||||
|
||||
def endGroup(name: String, result: TestResult): Unit =
|
||||
counter(endSeen, label).set(1L)
|
||||
}
|
||||
|
||||
lazy val resetListener = taskKey[Unit]("Reset listener state for timing checks")
|
||||
lazy val checkStreaming = taskKey[Unit]("Assert test events are received before endGroup")
|
||||
|
||||
ThisBuild / scalaVersion := "2.12.21"
|
||||
|
||||
def commonSettings(label: String): Seq[Def.Setting[_]] =
|
||||
Seq(
|
||||
libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0" % Test,
|
||||
Test / testFrameworks := Seq(new TestFramework("custom.StreamingFramework")),
|
||||
Test / parallelExecution := false,
|
||||
testListeners += streamingListener(label),
|
||||
resetListener := resetCounters(label),
|
||||
checkStreaming := {
|
||||
val startNs = counter(startAtNs, label).get()
|
||||
val callbacks = counter(eventCallbackCount, label).get()
|
||||
val largestDetail = counter(maxDetailSize, label).get()
|
||||
val endWasSeen = counter(endSeen, label).get()
|
||||
if (startNs == 0L) sys.error("startGroup was never called")
|
||||
if (endWasSeen == 0L) sys.error("endGroup was never called")
|
||||
if (callbacks < 2L)
|
||||
sys.error("Expected at least two testEvent callbacks, saw " + callbacks)
|
||||
if (largestDetail > 1L)
|
||||
sys.error(
|
||||
"Expected streamed test events with detail size 1, largest detail size was " + largestDetail
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
lazy val inproc = (project in file("inproc"))
|
||||
.settings(commonSettings("inproc"): _*)
|
||||
.settings(Test / fork := false)
|
||||
|
||||
lazy val forked = (project in file("forked"))
|
||||
.settings(commonSettings("forked"): _*)
|
||||
.settings(Test / fork := true)
|
||||
|
||||
lazy val root = (project in file("."))
|
||||
.aggregate(inproc, forked)
|
||||
.settings(publish / skip := true)
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package custom
|
||||
|
||||
import sbt.testing._
|
||||
|
||||
trait StreamTest
|
||||
|
||||
final class SampleTest extends StreamTest
|
||||
|
||||
final class StreamingFramework extends Framework {
|
||||
def name(): String = "StreamingFramework"
|
||||
|
||||
def fingerprints(): Array[Fingerprint] =
|
||||
Array(
|
||||
new SubclassFingerprint {
|
||||
def isModule(): Boolean = false
|
||||
def superclassName(): String = "custom.StreamTest"
|
||||
def requireNoArgConstructor(): Boolean = true
|
||||
}
|
||||
)
|
||||
|
||||
def runner(
|
||||
args: Array[String],
|
||||
remoteArgs: Array[String],
|
||||
testClassLoader: ClassLoader
|
||||
): Runner =
|
||||
new StreamingRunner
|
||||
}
|
||||
|
||||
final class StreamingRunner extends Runner {
|
||||
def tasks(taskDefs: Array[TaskDef]): Array[Task] =
|
||||
taskDefs.map(new StreamingTask(_))
|
||||
|
||||
def done(): String = ""
|
||||
|
||||
def args(): Array[String] = Array.empty
|
||||
|
||||
def remoteArgs(): Array[String] = Array.empty
|
||||
|
||||
def receiveMessage(msg: String): Option[String] = None
|
||||
|
||||
def serializeTask(task: Task, serializer: TaskDef => String): String =
|
||||
serializer(task.taskDef())
|
||||
|
||||
def deserializeTask(task: String, deserializer: String => TaskDef): Task =
|
||||
new StreamingTask(deserializer(task))
|
||||
}
|
||||
|
||||
final class StreamingTask(td: TaskDef) extends Task {
|
||||
def taskDef(): TaskDef = td
|
||||
|
||||
def tags(): Array[String] = Array.empty
|
||||
|
||||
def execute(handler: EventHandler, loggers: Array[Logger]): Array[Task] = {
|
||||
handler.handle(new StreamingEvent(td, "first"))
|
||||
Thread.sleep(1200L)
|
||||
handler.handle(new StreamingEvent(td, "second"))
|
||||
Array.empty
|
||||
}
|
||||
}
|
||||
|
||||
final class StreamingEvent(td: TaskDef, testName: String) extends Event {
|
||||
def fullyQualifiedName(): String = td.fullyQualifiedName()
|
||||
def fingerprint(): Fingerprint = td.fingerprint()
|
||||
def selector(): Selector = new TestSelector(testName)
|
||||
def status(): Status = Status.Success
|
||||
def throwable(): OptionalThrowable = new OptionalThrowable()
|
||||
def duration(): Long = 0L
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package custom
|
||||
|
||||
import sbt.testing._
|
||||
|
||||
trait StreamTest
|
||||
|
||||
final class SampleTest extends StreamTest
|
||||
|
||||
final class StreamingFramework extends Framework {
|
||||
def name(): String = "StreamingFramework"
|
||||
|
||||
def fingerprints(): Array[Fingerprint] =
|
||||
Array(
|
||||
new SubclassFingerprint {
|
||||
def isModule(): Boolean = false
|
||||
def superclassName(): String = "custom.StreamTest"
|
||||
def requireNoArgConstructor(): Boolean = true
|
||||
}
|
||||
)
|
||||
|
||||
def runner(
|
||||
args: Array[String],
|
||||
remoteArgs: Array[String],
|
||||
testClassLoader: ClassLoader
|
||||
): Runner =
|
||||
new StreamingRunner
|
||||
}
|
||||
|
||||
final class StreamingRunner extends Runner {
|
||||
def tasks(taskDefs: Array[TaskDef]): Array[Task] =
|
||||
taskDefs.map(new StreamingTask(_))
|
||||
|
||||
def done(): String = ""
|
||||
|
||||
def args(): Array[String] = Array.empty
|
||||
|
||||
def remoteArgs(): Array[String] = Array.empty
|
||||
|
||||
def receiveMessage(msg: String): Option[String] = None
|
||||
|
||||
def serializeTask(task: Task, serializer: TaskDef => String): String =
|
||||
serializer(task.taskDef())
|
||||
|
||||
def deserializeTask(task: String, deserializer: String => TaskDef): Task =
|
||||
new StreamingTask(deserializer(task))
|
||||
}
|
||||
|
||||
final class StreamingTask(td: TaskDef) extends Task {
|
||||
def taskDef(): TaskDef = td
|
||||
|
||||
def tags(): Array[String] = Array.empty
|
||||
|
||||
def execute(handler: EventHandler, loggers: Array[Logger]): Array[Task] = {
|
||||
handler.handle(new StreamingEvent(td, "first"))
|
||||
Thread.sleep(1200L)
|
||||
handler.handle(new StreamingEvent(td, "second"))
|
||||
Array.empty
|
||||
}
|
||||
}
|
||||
|
||||
final class StreamingEvent(td: TaskDef, testName: String) extends Event {
|
||||
def fullyQualifiedName(): String = td.fullyQualifiedName()
|
||||
def fingerprint(): Fingerprint = td.fingerprint()
|
||||
def selector(): Selector = new TestSelector(testName)
|
||||
def status(): Status = Status.Success
|
||||
def throwable(): OptionalThrowable = new OptionalThrowable()
|
||||
def duration(): Long = 0L
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
> inproc/resetListener
|
||||
> inproc/test
|
||||
> inproc/checkStreaming
|
||||
> forked/resetListener
|
||||
> forked/test
|
||||
> forked/checkStreaming
|
||||
|
|
@ -135,7 +135,12 @@ private[sbt] final class TestRunner(
|
|||
// Thread-safe collection so AsyncFunSuite (and other async frameworks) can call
|
||||
// handle() from multiple threads without corrupting results (fixes #5245).
|
||||
val results = new CopyOnWriteArrayList[Event]
|
||||
val handler = new EventHandler { def handle(e: Event): Unit = { results.add(e) } }
|
||||
val handler = new EventHandler {
|
||||
def handle(e: Event): Unit = {
|
||||
results.add(e)
|
||||
safeListenersCall(_.testEvent(TestEvent(Seq(e))))
|
||||
}
|
||||
}
|
||||
val loggers: Vector[ContentLogger] = listeners.flatMap(_.contentLogger(testDefinition))
|
||||
def errorEvents(e: Throwable): Array[sbt.testing.Task] = {
|
||||
val taskDef = testTask.taskDef
|
||||
|
|
@ -148,6 +153,7 @@ private[sbt] final class TestRunner(
|
|||
val duration = -1L
|
||||
}
|
||||
results.add(event)
|
||||
safeListenersCall(_.testEvent(TestEvent(Seq(event))))
|
||||
Array.empty
|
||||
}
|
||||
val nestedTasks =
|
||||
|
|
@ -160,8 +166,6 @@ private[sbt] final class TestRunner(
|
|||
loggers.foreach(_.flush())
|
||||
}
|
||||
val resultsList = results.asScala.toList
|
||||
val event = TestEvent(resultsList)
|
||||
safeListenersCall(_.testEvent(event))
|
||||
(SuiteResult(resultsList), nestedTasks.toSeq)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import java.io.PrintStream;
|
|||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
|
@ -140,6 +140,26 @@ public class ForkTestMain {
|
|||
}
|
||||
}
|
||||
|
||||
public static class ForkGroupStart implements Serializable {
|
||||
public long id;
|
||||
public String group;
|
||||
|
||||
public ForkGroupStart(long id, String group) {
|
||||
this.id = id;
|
||||
this.group = group;
|
||||
}
|
||||
}
|
||||
|
||||
public static class ForkGroupEnd implements Serializable {
|
||||
public long id;
|
||||
public String group;
|
||||
|
||||
public ForkGroupEnd(long id, String group) {
|
||||
this.id = id;
|
||||
this.group = group;
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
public static final class ForkError extends Exception {
|
||||
|
|
@ -296,16 +316,38 @@ public class ForkTestMain {
|
|||
};
|
||||
}
|
||||
|
||||
private void writeEvents(final TaskDef taskDef, final ForkEvent[] events) {
|
||||
private void writeGroupStart(final TaskDef taskDef) {
|
||||
ForkGroupStart info = new ForkGroupStart(this.id, taskDef.fullyQualifiedName());
|
||||
String params = this.gson.toJson(info, ForkGroupStart.class);
|
||||
String notification =
|
||||
String.format(
|
||||
"{ \"jsonrpc\": \"2.0\", \"method\": \"startTestGroup\", \"params\": %s, \"re\": %d }",
|
||||
params, this.id);
|
||||
this.originalOut.println(notification);
|
||||
this.originalOut.flush();
|
||||
}
|
||||
|
||||
private void writeTestProgress(final TaskDef taskDef, final ForkEvent event) {
|
||||
ForkEventsInfo info =
|
||||
new ForkEventsInfo(
|
||||
this.id,
|
||||
taskDef.fullyQualifiedName(),
|
||||
new ArrayList<ForkEvent>(Arrays.asList(events)));
|
||||
new ArrayList<ForkEvent>(Collections.singletonList(event)));
|
||||
String params = this.gson.toJson(info, ForkEventsInfo.class);
|
||||
String notification =
|
||||
String.format(
|
||||
"{ \"jsonrpc\": \"2.0\", \"method\": \"testEvents\", \"params\": %s, \"re\": %d }",
|
||||
"{ \"jsonrpc\": \"2.0\", \"method\": \"testProgress\", \"params\": %s, \"re\": %d }",
|
||||
params, this.id);
|
||||
this.originalOut.println(notification);
|
||||
this.originalOut.flush();
|
||||
}
|
||||
|
||||
private void writeGroupEnd(final TaskDef taskDef) {
|
||||
ForkGroupEnd info = new ForkGroupEnd(this.id, taskDef.fullyQualifiedName());
|
||||
String params = this.gson.toJson(info, ForkGroupEnd.class);
|
||||
String notification =
|
||||
String.format(
|
||||
"{ \"jsonrpc\": \"2.0\", \"method\": \"endTestGroup\", \"params\": %s, \"re\": %d }",
|
||||
params, this.id);
|
||||
this.originalOut.println(notification);
|
||||
this.originalOut.flush();
|
||||
|
|
@ -414,41 +456,32 @@ public class ForkTestMain {
|
|||
final ExecutorService executor, final Task task, final Logger[] loggers) {
|
||||
return executor.submit(
|
||||
() -> {
|
||||
ForkEvent[] events;
|
||||
Task[] nestedTasks;
|
||||
final TaskDef taskDef = task.taskDef();
|
||||
writeGroupStart(taskDef);
|
||||
try {
|
||||
final Collection<ForkEvent> eventList = new ConcurrentLinkedDeque<>();
|
||||
final EventHandler handler =
|
||||
new EventHandler() {
|
||||
public void handle(final Event e) {
|
||||
eventList.add(new ForkEvent(e));
|
||||
writeTestProgress(taskDef, new ForkEvent(e));
|
||||
}
|
||||
};
|
||||
logDebug(" Running " + taskDef);
|
||||
nestedTasks = task.execute(handler, loggers);
|
||||
if (nestedTasks.length > 0 || eventList.size() > 0)
|
||||
logDebug(
|
||||
" Produced "
|
||||
+ nestedTasks.length
|
||||
+ " nested tasks and "
|
||||
+ eventList.size()
|
||||
+ " events.");
|
||||
events = eventList.toArray(new ForkEvent[eventList.size()]);
|
||||
logDebug(" Produced " + nestedTasks.length + " nested tasks (events streamed).");
|
||||
} catch (final Throwable t) {
|
||||
nestedTasks = new Task[0];
|
||||
events =
|
||||
new ForkEvent[] {
|
||||
testError(
|
||||
taskDef,
|
||||
"Uncaught exception when running "
|
||||
+ taskDef.fullyQualifiedName()
|
||||
+ ": "
|
||||
+ t.toString(),
|
||||
t)
|
||||
};
|
||||
writeTestProgress(
|
||||
taskDef,
|
||||
testError(
|
||||
taskDef,
|
||||
"Uncaught exception when running "
|
||||
+ taskDef.fullyQualifiedName()
|
||||
+ ": "
|
||||
+ t.toString(),
|
||||
t));
|
||||
}
|
||||
writeEvents(taskDef, events);
|
||||
writeGroupEnd(taskDef);
|
||||
return nestedTasks;
|
||||
});
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue