Cancel all promises if one fails

Fixes https://github.com/sbt/sbt/issues/5822

Currently the entire shell gets stuck when there's a compilation error with pipelining.
This at least returns to sbt shell.
This commit is contained in:
Eugene Yokota 2020-09-14 01:04:10 -04:00
parent 6ebba09020
commit 55d2b0a3c6
4 changed files with 47 additions and 9 deletions

View File

@ -2039,6 +2039,7 @@ object Defaults extends BuildCommon {
} catch {
case e: Throwable if !promise.isCompleted =>
promise.failure(e)
ConcurrentRestrictions.cancelAllSentinels()
throw e
} finally {
i.setup.reporter match {

View File

@ -53,6 +53,7 @@ import sbt.util.{ Show, Level }
import sjsonnew.JsonFormat
import language.experimental.macros
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration
sealed trait ProjectDefinition[PR <: ProjectReference] {
@ -887,8 +888,20 @@ object Project extends ProjectExtra {
(Def
.task {
val p = i.value
val result: A = Await.result(p.underlying.future, atMost)
result
var result: Option[A] = None
if (atMost == Duration.Inf) {
while (result.isEmpty) {
try {
result = Some(Await.result(p.underlying.future, Duration("1s")))
Thread.sleep(10)
} catch {
case _: TimeoutException => ()
}
}
} else {
result = Some(Await.result(p.underlying.future, atMost))
}
result.get
})
.tag(Tags.Sentinel)
}

View File

@ -29,6 +29,7 @@ import java.util.concurrent.{
Executor,
ExecutorCompletionService,
Executors,
Future => JFuture,
RejectedExecutionException,
CompletionService => JCompletionService
}
@ -52,8 +53,12 @@ object CompletionService {
def take() = completion.take().get()
}
def submit[T](work: () => T, completion: JCompletionService[T]): () => T = {
val future = submitFuture[T](work, completion)
() => future.get
}
private[sbt] def submitFuture[A](work: () => A, completion: JCompletionService[A]): JFuture[A] = {
val future = try completion.submit {
new Callable[T] {
new Callable[A] {
def call =
try {
work()
@ -66,7 +71,7 @@ object CompletionService {
case _: RejectedExecutionException =>
throw Incomplete(None, message = Some("cancelled"))
}
() => future.get()
future
}
def manage[A, T](
service: CompletionService[A, T]

View File

@ -11,7 +11,8 @@ import java.util.concurrent.atomic.AtomicInteger
import sbt.internal.util.AttributeKey
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.{ Future => JFuture, RejectedExecutionException }
import scala.collection.mutable
/**
* Describes restrictions on concurrent execution for a set of tasks.
@ -45,6 +46,10 @@ trait ConcurrentRestrictions[A] {
def valid(g: G): Boolean
}
private[sbt] sealed trait CancelSentiels {
def cancelSentinels(): Unit
}
import java.util.{ LinkedList, Queue }
import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService }
import annotation.tailrec
@ -57,6 +62,11 @@ object ConcurrentRestrictions {
case _ =>
}
private[sbt] def cancelAllSentinels() = completionServices.keySet.asScala.toVector.foreach {
case a: CancelSentiels => a.cancelSentinels()
case _ =>
}
/**
* A ConcurrentRestrictions instance that places no restrictions on concurrently executing tasks.
* @param zero the constant placeholder used for t
@ -181,12 +191,12 @@ object ConcurrentRestrictions {
tags: ConcurrentRestrictions[A],
warn: String => Unit,
isSentinel: A => Boolean,
): CompletionService[A, R] with AutoCloseable = {
): CompletionService[A, R] with CancelSentiels with AutoCloseable = {
// Represents submitted work for a task.
final class Enqueue(val node: A, val work: () => R)
new CompletionService[A, R] with AutoCloseable {
new CompletionService[A, R] with CancelSentiels with AutoCloseable {
completionServices.put(this, true)
private[this] val closed = new AtomicBoolean(false)
override def close(): Unit = if (closed.compareAndSet(false, true)) {
@ -206,11 +216,20 @@ object ConcurrentRestrictions {
/** Tasks that cannot be run yet because they cannot execute concurrently with the currently running tasks.*/
private[this] val pending = new LinkedList[Enqueue]
private[this] val sentinels: mutable.ListBuffer[JFuture[_]] = mutable.ListBuffer.empty
def cancelSentinels(): Unit = {
sentinels.toList foreach { s =>
s.cancel(true)
}
sentinels.clear
}
def submit(node: A, work: () => R): Unit = synchronized {
if (closed.get) throw new RejectedExecutionException
else if (isSentinel(node)) {
// skip all checks for sentinels
CompletionService.submit(work, jservice)
sentinels += CompletionService.submitFuture(work, jservice)
} else {
val newState = tags.add(tagState, node)
// if the new task is allowed to run concurrently with the currently running tasks,
@ -232,7 +251,7 @@ object ConcurrentRestrictions {
val wrappedWork = () =>
try work()
finally cleanup(node)
CompletionService.submit(wrappedWork, jservice)
CompletionService.submitFuture(wrappedWork, jservice)
()
}
private[this] def cleanup(node: A): Unit = synchronized {