From 061145e67b4cc731f87df4d5ada7c94d3f672722 Mon Sep 17 00:00:00 2001 From: MkDev11 Date: Fri, 9 Jan 2026 09:43:50 -0800 Subject: [PATCH] [2.x] Add testForkedParallelism setting for forked test thread count (#8453) **Problems** When running forked tests, sbt uses `Runtime.getRuntime().availableProcessors()` to determine the thread pool size, ignoring `concurrentRestrictions`. This is inconsistent with non-forked parallel tests. **Expectations** Users should be able to control the number of parallel test threads in forked mode, similar to how `concurrentRestrictions` works for non-forked tests. **Notes** Added a new setting `testForkedParallelism` that allows explicit control: ```scala testForkedParallelism := Some(2) // Use 2 threads testForkedParallelism := None // Use availableProcessors() (default) ``` --- build.sbt | 1 + .../src/main/scala/sbt/ForkTests.scala | 18 ++++-- main/src/main/scala/sbt/Defaults.scala | 59 ++++++++++++++----- main/src/main/scala/sbt/Keys.scala | 1 + .../sbt/internal/worker1/ForkTestMain.java | 11 ++-- .../java/sbt/internal/worker1/TestInfo.java | 3 + 6 files changed, 66 insertions(+), 27 deletions(-) diff --git a/build.sbt b/build.sbt index 8cd469207..762b05261 100644 --- a/build.sbt +++ b/build.sbt @@ -439,6 +439,7 @@ lazy val workerProj = (project in file("worker")) mimaBinaryIssueFilters ++= Vector( exclude[MissingClassProblem]("com.google.gson.typeadapters.RuntimeTypeAdapterFactory"), exclude[IncompatibleResultTypeProblem]("sbt.internal.worker1.WorkerMain.mkGson"), + exclude[DirectMissingMethodProblem]("sbt.internal.worker1.TestInfo.this"), ), ) .configure(addSbtIOForTest) diff --git a/main-actions/src/main/scala/sbt/ForkTests.scala b/main-actions/src/main/scala/sbt/ForkTests.scala index 81d5ac606..25eb00c33 100755 --- a/main-actions/src/main/scala/sbt/ForkTests.scala +++ b/main-actions/src/main/scala/sbt/ForkTests.scala @@ -45,6 +45,7 @@ private[sbt] object ForkTests: converter: FileConverter, fork: ForkOptions, log: Logger, + parallelism: Option[Int], tags: (Tag, Int)* ): Task[TestOutput] = { import std.TaskExtra.* @@ -56,9 +57,10 @@ private[sbt] object ForkTests: if opts.tests.isEmpty then constant(TestOutput(TestResult.Passed, Map.empty[String, SuiteResult], Iterable.empty)) else - mainTestTask(runners, opts, classpath, converter, fork, log, config.parallel).tagw( - config.tags* - ) + mainTestTask(runners, opts, classpath, converter, fork, log, config.parallel, parallelism) + .tagw( + config.tags* + ) main.tagw(tags*).dependsOn(all(opts.setup)*) flatMap { results => all(opts.cleanup).join.map(_ => results) } @@ -72,10 +74,11 @@ private[sbt] object ForkTests: converter: FileConverter, fork: ForkOptions, log: Logger, + parallelism: Option[Int], tags: (Tag, Int)* ): Task[TestOutput] = { val opts = processOptions(config, tests, log) - apply(runners, opts, config, classpath, converter, fork, log, tags*) + apply(runners, opts, config, classpath, converter, fork, log, parallelism, tags*) } def apply( @@ -86,9 +89,10 @@ private[sbt] object ForkTests: converter: FileConverter, fork: ForkOptions, log: Logger, + parallelism: Option[Int], tag: Tag ): Task[TestOutput] = { - apply(runners, tests, config, classpath, converter, fork, log, tag -> 1) + apply(runners, tests, config, classpath, converter, fork, log, parallelism, tag -> 1) } private def mainTestTask( @@ -98,7 +102,8 @@ private[sbt] object ForkTests: converter: FileConverter, fork: ForkOptions, log: Logger, - parallel: Boolean + parallel: Boolean, + parallelism: Option[Int] ): Task[TestOutput] = std.TaskExtra.task { val testListeners = opts.testListeners.flatMap: @@ -148,6 +153,7 @@ private[sbt] object ForkTests: null, UTerminal.isAnsiSupported, parallel, + parallelism.map(Integer.valueOf).orNull, ArrayList(taskdefs.asJava), ArrayList(testRunners.asJava), ) diff --git a/main/src/main/scala/sbt/Defaults.scala b/main/src/main/scala/sbt/Defaults.scala index dd9f5a177..96747f085 100644 --- a/main/src/main/scala/sbt/Defaults.scala +++ b/main/src/main/scala/sbt/Defaults.scala @@ -201,6 +201,7 @@ object Defaults extends BuildCommon { javaHomes :== ListMap.empty, fullJavaHomes := CrossJava.expandJavaHomes(discoveredJavaHomes.value ++ javaHomes.value), testForkedParallel :== true, + testForkedParallelism :== None, javaOptions :== Nil, sbtPlugin :== false, isMetaBuild :== false, @@ -1123,8 +1124,9 @@ object Defaults extends BuildCommon { .value, testQuick / testFilter := Def.uncached(IncrementalTest.filterTask.value), extraTestDigests ++= IncrementalTest.extraTestDigestsTask.value, - executeTests := Def.uncached({ + executeTests := Def.uncached(Def.taskDyn { import sbt.TupleSyntax.* + val fpm = testForkedParallelism.value ( test / streams, loadedTestFrameworks, @@ -1132,25 +1134,13 @@ object Defaults extends BuildCommon { (test / testGrouping), (test / testExecution), (test / fullClasspath), - testForkedParallel, + testForkedParallel.toTaskable, (test / javaOptions), (classLoaderLayeringStrategy), thisProject, fileConverter, ).flatMapN { (s, lt, tl, gp, ex, cp, fp, jo, clls, thisProj, c) => - allTestGroupsTask( - s, - lt, - tl, - gp, - ex, - cp, - fp, - jo, - clls, - projectId = s"${thisProj.id} / ", - c, - ) + allTestGroupsTask(s, lt, tl, gp, ex, cp, fp, fpm, jo, clls, s"${thisProj.id} / ", c) } }.value), // ((streams in test, loadedTestFrameworks, testLoader, testGrouping in test, testExecution in test, fullClasspath in test, javaHome in test, testForkedParallel, javaOptions in test) flatMap allTestGroupsTask).value, @@ -1318,6 +1308,7 @@ object Defaults extends BuildCommon { newConfig, fullClasspath.value, testForkedParallel.value, + testForkedParallelism.value, javaOptions.value, classLoaderLayeringStrategy.value, projectId = s"${thisProject.value.id} / ", @@ -1367,6 +1358,7 @@ object Defaults extends BuildCommon { config, cp, forkedParallelExecution = false, + forkedParallelism = None, javaOptions = Nil, strategy = ClassLoaderLayeringStrategy.ScalaLibrary, projectId = "", @@ -1392,6 +1384,7 @@ object Defaults extends BuildCommon { config, cp, forkedParallelExecution, + forkedParallelism = None, javaOptions = Nil, strategy = ClassLoaderLayeringStrategy.ScalaLibrary, projectId = "", @@ -1399,6 +1392,36 @@ object Defaults extends BuildCommon { ) } + // Binary compatibility overload for sbt 2.0.0-RC7 + private[sbt] def allTestGroupsTask( + s: TaskStreams, + frameworks: Map[TestFramework, Framework], + loader: ClassLoader, + groups: Seq[Tests.Group], + config: Tests.Execution, + cp: Classpath, + forkedParallelExecution: Boolean, + javaOptions: Seq[String], + strategy: ClassLoaderLayeringStrategy, + projectId: String, + converter: FileConverter, + ): Task[Tests.Output] = { + allTestGroupsTask( + s, + frameworks, + loader, + groups, + config, + cp, + forkedParallelExecution, + forkedParallelism = None, + javaOptions, + strategy, + projectId, + converter, + ) + } + private[sbt] def allTestGroupsTask( s: TaskStreams, frameworks: Map[TestFramework, Framework], @@ -1407,6 +1430,7 @@ object Defaults extends BuildCommon { config: Tests.Execution, cp: Classpath, forkedParallelExecution: Boolean, + forkedParallelism: Option[Int], javaOptions: Seq[String], strategy: ClassLoaderLayeringStrategy, projectId: String, @@ -1435,7 +1459,9 @@ object Defaults extends BuildCommon { case Tests.SubProcess(opts) => s.log.debug(s"javaOptions: ${opts.runJVMOptions}") val forkedConfig = config.copy(parallel = config.parallel && forkedParallelExecution) - s.log.debug(s"Forking tests - parallelism = ${forkedConfig.parallel}") + s.log.debug( + s"Forking tests - parallelism = ${forkedConfig.parallel}, threads = ${forkedParallelism.getOrElse("auto")}" + ) ForkTests( runners, processedOptions(group), @@ -1444,6 +1470,7 @@ object Defaults extends BuildCommon { converter, opts, s.log, + forkedParallelism, (Tags.ForkedTestGroup, 1) +: group.tags* ) case Tests.InProcess => diff --git a/main/src/main/scala/sbt/Keys.scala b/main/src/main/scala/sbt/Keys.scala index 81ae104b8..6582cf51e 100644 --- a/main/src/main/scala/sbt/Keys.scala +++ b/main/src/main/scala/sbt/Keys.scala @@ -370,6 +370,7 @@ object Keys { @transient val testListeners = taskKey[Seq[TestReportListener]]("Defines test listeners.").withRank(DTask) val testForkedParallel = settingKey[Boolean]("Whether forked tests should be executed in parallel").withRank(CTask) + val testForkedParallelism = settingKey[Option[Int]]("Maximum number of parallel test threads when using testForkedParallel. Defaults to the number of available processors.").withRank(CTask) val testExecution = taskKey[Tests.Execution]("Settings controlling test execution").withRank(DTask) val testFilter = taskKey[Seq[String] => Seq[String => Boolean]]("Filter controlling whether the test is executed").withRank(DTask) val testResultLogger = settingKey[TestResultLogger]("Logs results after a test task completes.").withRank(DTask) diff --git a/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java b/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java index 22d824171..6620f32b1 100644 --- a/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java +++ b/worker/src/main/java/sbt/internal/worker1/ForkTestMain.java @@ -311,12 +311,13 @@ public class ForkTestMain { this.originalOut.flush(); } - private ExecutorService executorService(final boolean parallel) { + private ExecutorService executorService(final boolean parallel, final Integer parallelism) { if (parallel) { - final int nbThreads = Runtime.getRuntime().availableProcessors(); + final int nbThreads = + (parallelism != null && parallelism > 0) + ? parallelism + : Runtime.getRuntime().availableProcessors(); logDebug("Create a test executor with a thread pool of " + nbThreads + " threads."); - // more options later... - // TODO we might want to configure the blocking queue with size #proc return Executors.newFixedThreadPool(nbThreads); } else { logDebug("Create a single-thread test executor"); @@ -326,7 +327,7 @@ public class ForkTestMain { private void runTests(TestInfo info, ClassLoader classLoader) throws Exception { Thread.currentThread().setContextClassLoader(classLoader); - final ExecutorService executor = executorService(info.parallel); + final ExecutorService executor = executorService(info.parallel, info.parallelism); final TaskDef[] tests = info.taskDefs.toArray(new TaskDef[] {}); final int nFrameworks = info.testRunners.size(); final Logger[] loggers = {remoteLogger(info.ansiCodesSupported)}; diff --git a/worker/src/main/java/sbt/internal/worker1/TestInfo.java b/worker/src/main/java/sbt/internal/worker1/TestInfo.java index a1990e0d7..e3c9adba9 100644 --- a/worker/src/main/java/sbt/internal/worker1/TestInfo.java +++ b/worker/src/main/java/sbt/internal/worker1/TestInfo.java @@ -33,6 +33,7 @@ public class TestInfo implements Serializable { public final RunInfo.NativeRunInfo nativeRunInfo; public final boolean ansiCodesSupported; public final boolean parallel; + public final Integer parallelism; public final ArrayList taskDefs; public final ArrayList testRunners; @@ -42,6 +43,7 @@ public class TestInfo implements Serializable { RunInfo.NativeRunInfo nativeRunInfo, boolean ansiCodesSupported, boolean parallel, + Integer parallelism, ArrayList taskDefs, ArrayList testRunners) { this.jvm = jvm; @@ -49,6 +51,7 @@ public class TestInfo implements Serializable { this.nativeRunInfo = nativeRunInfo; this.ansiCodesSupported = ansiCodesSupported; this.parallel = parallel; + this.parallelism = parallelism; this.taskDefs = taskDefs; this.testRunners = testRunners; }