[2.x] Add testForkedParallelism setting for forked test thread count (#8453)

**Problems**

When running forked tests, sbt uses `Runtime.getRuntime().availableProcessors()` to determine the thread pool size, ignoring `concurrentRestrictions`. This is inconsistent with non-forked parallel tests.

**Expectations**

Users should be able to control the number of parallel test threads in forked mode, similar to how `concurrentRestrictions` works for non-forked tests.

**Notes**

Added a new setting `testForkedParallelism` that allows explicit control:

```scala
testForkedParallelism := Some(2)  // Use 2 threads
testForkedParallelism := None     // Use availableProcessors() (default)
```
This commit is contained in:
MkDev11 2026-01-09 09:43:50 -08:00 committed by GitHub
parent 7320b7176a
commit 061145e67b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 66 additions and 27 deletions

View File

@ -439,6 +439,7 @@ lazy val workerProj = (project in file("worker"))
mimaBinaryIssueFilters ++= Vector(
exclude[MissingClassProblem]("com.google.gson.typeadapters.RuntimeTypeAdapterFactory"),
exclude[IncompatibleResultTypeProblem]("sbt.internal.worker1.WorkerMain.mkGson"),
exclude[DirectMissingMethodProblem]("sbt.internal.worker1.TestInfo.this"),
),
)
.configure(addSbtIOForTest)

View File

@ -45,6 +45,7 @@ private[sbt] object ForkTests:
converter: FileConverter,
fork: ForkOptions,
log: Logger,
parallelism: Option[Int],
tags: (Tag, Int)*
): Task[TestOutput] = {
import std.TaskExtra.*
@ -56,9 +57,10 @@ private[sbt] object ForkTests:
if opts.tests.isEmpty then
constant(TestOutput(TestResult.Passed, Map.empty[String, SuiteResult], Iterable.empty))
else
mainTestTask(runners, opts, classpath, converter, fork, log, config.parallel).tagw(
config.tags*
)
mainTestTask(runners, opts, classpath, converter, fork, log, config.parallel, parallelism)
.tagw(
config.tags*
)
main.tagw(tags*).dependsOn(all(opts.setup)*) flatMap { results =>
all(opts.cleanup).join.map(_ => results)
}
@ -72,10 +74,11 @@ private[sbt] object ForkTests:
converter: FileConverter,
fork: ForkOptions,
log: Logger,
parallelism: Option[Int],
tags: (Tag, Int)*
): Task[TestOutput] = {
val opts = processOptions(config, tests, log)
apply(runners, opts, config, classpath, converter, fork, log, tags*)
apply(runners, opts, config, classpath, converter, fork, log, parallelism, tags*)
}
def apply(
@ -86,9 +89,10 @@ private[sbt] object ForkTests:
converter: FileConverter,
fork: ForkOptions,
log: Logger,
parallelism: Option[Int],
tag: Tag
): Task[TestOutput] = {
apply(runners, tests, config, classpath, converter, fork, log, tag -> 1)
apply(runners, tests, config, classpath, converter, fork, log, parallelism, tag -> 1)
}
private def mainTestTask(
@ -98,7 +102,8 @@ private[sbt] object ForkTests:
converter: FileConverter,
fork: ForkOptions,
log: Logger,
parallel: Boolean
parallel: Boolean,
parallelism: Option[Int]
): Task[TestOutput] =
std.TaskExtra.task {
val testListeners = opts.testListeners.flatMap:
@ -148,6 +153,7 @@ private[sbt] object ForkTests:
null,
UTerminal.isAnsiSupported,
parallel,
parallelism.map(Integer.valueOf).orNull,
ArrayList(taskdefs.asJava),
ArrayList(testRunners.asJava),
)

View File

@ -201,6 +201,7 @@ object Defaults extends BuildCommon {
javaHomes :== ListMap.empty,
fullJavaHomes := CrossJava.expandJavaHomes(discoveredJavaHomes.value ++ javaHomes.value),
testForkedParallel :== true,
testForkedParallelism :== None,
javaOptions :== Nil,
sbtPlugin :== false,
isMetaBuild :== false,
@ -1123,8 +1124,9 @@ object Defaults extends BuildCommon {
.value,
testQuick / testFilter := Def.uncached(IncrementalTest.filterTask.value),
extraTestDigests ++= IncrementalTest.extraTestDigestsTask.value,
executeTests := Def.uncached({
executeTests := Def.uncached(Def.taskDyn {
import sbt.TupleSyntax.*
val fpm = testForkedParallelism.value
(
test / streams,
loadedTestFrameworks,
@ -1132,25 +1134,13 @@ object Defaults extends BuildCommon {
(test / testGrouping),
(test / testExecution),
(test / fullClasspath),
testForkedParallel,
testForkedParallel.toTaskable,
(test / javaOptions),
(classLoaderLayeringStrategy),
thisProject,
fileConverter,
).flatMapN { (s, lt, tl, gp, ex, cp, fp, jo, clls, thisProj, c) =>
allTestGroupsTask(
s,
lt,
tl,
gp,
ex,
cp,
fp,
jo,
clls,
projectId = s"${thisProj.id} / ",
c,
)
allTestGroupsTask(s, lt, tl, gp, ex, cp, fp, fpm, jo, clls, s"${thisProj.id} / ", c)
}
}.value),
// ((streams in test, loadedTestFrameworks, testLoader, testGrouping in test, testExecution in test, fullClasspath in test, javaHome in test, testForkedParallel, javaOptions in test) flatMap allTestGroupsTask).value,
@ -1318,6 +1308,7 @@ object Defaults extends BuildCommon {
newConfig,
fullClasspath.value,
testForkedParallel.value,
testForkedParallelism.value,
javaOptions.value,
classLoaderLayeringStrategy.value,
projectId = s"${thisProject.value.id} / ",
@ -1367,6 +1358,7 @@ object Defaults extends BuildCommon {
config,
cp,
forkedParallelExecution = false,
forkedParallelism = None,
javaOptions = Nil,
strategy = ClassLoaderLayeringStrategy.ScalaLibrary,
projectId = "",
@ -1392,6 +1384,7 @@ object Defaults extends BuildCommon {
config,
cp,
forkedParallelExecution,
forkedParallelism = None,
javaOptions = Nil,
strategy = ClassLoaderLayeringStrategy.ScalaLibrary,
projectId = "",
@ -1399,6 +1392,36 @@ object Defaults extends BuildCommon {
)
}
// Binary compatibility overload for sbt 2.0.0-RC7
private[sbt] def allTestGroupsTask(
s: TaskStreams,
frameworks: Map[TestFramework, Framework],
loader: ClassLoader,
groups: Seq[Tests.Group],
config: Tests.Execution,
cp: Classpath,
forkedParallelExecution: Boolean,
javaOptions: Seq[String],
strategy: ClassLoaderLayeringStrategy,
projectId: String,
converter: FileConverter,
): Task[Tests.Output] = {
allTestGroupsTask(
s,
frameworks,
loader,
groups,
config,
cp,
forkedParallelExecution,
forkedParallelism = None,
javaOptions,
strategy,
projectId,
converter,
)
}
private[sbt] def allTestGroupsTask(
s: TaskStreams,
frameworks: Map[TestFramework, Framework],
@ -1407,6 +1430,7 @@ object Defaults extends BuildCommon {
config: Tests.Execution,
cp: Classpath,
forkedParallelExecution: Boolean,
forkedParallelism: Option[Int],
javaOptions: Seq[String],
strategy: ClassLoaderLayeringStrategy,
projectId: String,
@ -1435,7 +1459,9 @@ object Defaults extends BuildCommon {
case Tests.SubProcess(opts) =>
s.log.debug(s"javaOptions: ${opts.runJVMOptions}")
val forkedConfig = config.copy(parallel = config.parallel && forkedParallelExecution)
s.log.debug(s"Forking tests - parallelism = ${forkedConfig.parallel}")
s.log.debug(
s"Forking tests - parallelism = ${forkedConfig.parallel}, threads = ${forkedParallelism.getOrElse("auto")}"
)
ForkTests(
runners,
processedOptions(group),
@ -1444,6 +1470,7 @@ object Defaults extends BuildCommon {
converter,
opts,
s.log,
forkedParallelism,
(Tags.ForkedTestGroup, 1) +: group.tags*
)
case Tests.InProcess =>

View File

@ -370,6 +370,7 @@ object Keys {
@transient
val testListeners = taskKey[Seq[TestReportListener]]("Defines test listeners.").withRank(DTask)
val testForkedParallel = settingKey[Boolean]("Whether forked tests should be executed in parallel").withRank(CTask)
val testForkedParallelism = settingKey[Option[Int]]("Maximum number of parallel test threads when using testForkedParallel. Defaults to the number of available processors.").withRank(CTask)
val testExecution = taskKey[Tests.Execution]("Settings controlling test execution").withRank(DTask)
val testFilter = taskKey[Seq[String] => Seq[String => Boolean]]("Filter controlling whether the test is executed").withRank(DTask)
val testResultLogger = settingKey[TestResultLogger]("Logs results after a test task completes.").withRank(DTask)

View File

@ -311,12 +311,13 @@ public class ForkTestMain {
this.originalOut.flush();
}
private ExecutorService executorService(final boolean parallel) {
private ExecutorService executorService(final boolean parallel, final Integer parallelism) {
if (parallel) {
final int nbThreads = Runtime.getRuntime().availableProcessors();
final int nbThreads =
(parallelism != null && parallelism > 0)
? parallelism
: Runtime.getRuntime().availableProcessors();
logDebug("Create a test executor with a thread pool of " + nbThreads + " threads.");
// more options later...
// TODO we might want to configure the blocking queue with size #proc
return Executors.newFixedThreadPool(nbThreads);
} else {
logDebug("Create a single-thread test executor");
@ -326,7 +327,7 @@ public class ForkTestMain {
private void runTests(TestInfo info, ClassLoader classLoader) throws Exception {
Thread.currentThread().setContextClassLoader(classLoader);
final ExecutorService executor = executorService(info.parallel);
final ExecutorService executor = executorService(info.parallel, info.parallelism);
final TaskDef[] tests = info.taskDefs.toArray(new TaskDef[] {});
final int nFrameworks = info.testRunners.size();
final Logger[] loggers = {remoteLogger(info.ansiCodesSupported)};

View File

@ -33,6 +33,7 @@ public class TestInfo implements Serializable {
public final RunInfo.NativeRunInfo nativeRunInfo;
public final boolean ansiCodesSupported;
public final boolean parallel;
public final Integer parallelism;
public final ArrayList<TaskDef> taskDefs;
public final ArrayList<TestRunner> testRunners;
@ -42,6 +43,7 @@ public class TestInfo implements Serializable {
RunInfo.NativeRunInfo nativeRunInfo,
boolean ansiCodesSupported,
boolean parallel,
Integer parallelism,
ArrayList<TaskDef> taskDefs,
ArrayList<TestRunner> testRunners) {
this.jvm = jvm;
@ -49,6 +51,7 @@ public class TestInfo implements Serializable {
this.nativeRunInfo = nativeRunInfo;
this.ansiCodesSupported = ansiCodesSupported;
this.parallel = parallel;
this.parallelism = parallelism;
this.taskDefs = taskDefs;
this.testRunners = testRunners;
}