diff --git a/main/src/main/scala/sbt/Defaults.scala b/main/src/main/scala/sbt/Defaults.scala index 0498650b8..8e379b6b5 100644 --- a/main/src/main/scala/sbt/Defaults.scala +++ b/main/src/main/scala/sbt/Defaults.scala @@ -2039,6 +2039,7 @@ object Defaults extends BuildCommon { } catch { case e: Throwable if !promise.isCompleted => promise.failure(e) + ConcurrentRestrictions.cancelAllSentinels() throw e } finally { i.setup.reporter match { diff --git a/main/src/main/scala/sbt/Project.scala b/main/src/main/scala/sbt/Project.scala index a86dd155a..232410149 100755 --- a/main/src/main/scala/sbt/Project.scala +++ b/main/src/main/scala/sbt/Project.scala @@ -53,6 +53,7 @@ import sbt.util.{ Show, Level } import sjsonnew.JsonFormat import language.experimental.macros +import scala.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration sealed trait ProjectDefinition[PR <: ProjectReference] { @@ -887,8 +888,20 @@ object Project extends ProjectExtra { (Def .task { val p = i.value - val result: A = Await.result(p.underlying.future, atMost) - result + var result: Option[A] = None + if (atMost == Duration.Inf) { + while (result.isEmpty) { + try { + result = Some(Await.result(p.underlying.future, Duration("1s"))) + Thread.sleep(10) + } catch { + case _: TimeoutException => () + } + } + } else { + result = Some(Await.result(p.underlying.future, atMost)) + } + result.get }) .tag(Tags.Sentinel) } diff --git a/tasks/src/main/scala/sbt/CompletionService.scala b/tasks/src/main/scala/sbt/CompletionService.scala index 347c514a6..f9586a5da 100644 --- a/tasks/src/main/scala/sbt/CompletionService.scala +++ b/tasks/src/main/scala/sbt/CompletionService.scala @@ -29,6 +29,7 @@ import java.util.concurrent.{ Executor, ExecutorCompletionService, Executors, + Future => JFuture, RejectedExecutionException, CompletionService => JCompletionService } @@ -52,8 +53,12 @@ object CompletionService { def take() = completion.take().get() } def submit[T](work: () => T, completion: JCompletionService[T]): () => T = { + val future = submitFuture[T](work, completion) + () => future.get + } + private[sbt] def submitFuture[A](work: () => A, completion: JCompletionService[A]): JFuture[A] = { val future = try completion.submit { - new Callable[T] { + new Callable[A] { def call = try { work() @@ -66,7 +71,7 @@ object CompletionService { case _: RejectedExecutionException => throw Incomplete(None, message = Some("cancelled")) } - () => future.get() + future } def manage[A, T]( service: CompletionService[A, T] diff --git a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala index e3ed01c60..b5172ed57 100644 --- a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala +++ b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala @@ -11,7 +11,8 @@ import java.util.concurrent.atomic.AtomicInteger import sbt.internal.util.AttributeKey import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.{ Future => JFuture, RejectedExecutionException } +import scala.collection.mutable /** * Describes restrictions on concurrent execution for a set of tasks. @@ -45,6 +46,10 @@ trait ConcurrentRestrictions[A] { def valid(g: G): Boolean } +private[sbt] sealed trait CancelSentiels { + def cancelSentinels(): Unit +} + import java.util.{ LinkedList, Queue } import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService } import annotation.tailrec @@ -57,6 +62,11 @@ object ConcurrentRestrictions { case _ => } + private[sbt] def cancelAllSentinels() = completionServices.keySet.asScala.toVector.foreach { + case a: CancelSentiels => a.cancelSentinels() + case _ => + } + /** * A ConcurrentRestrictions instance that places no restrictions on concurrently executing tasks. * @param zero the constant placeholder used for t @@ -181,12 +191,12 @@ object ConcurrentRestrictions { tags: ConcurrentRestrictions[A], warn: String => Unit, isSentinel: A => Boolean, - ): CompletionService[A, R] with AutoCloseable = { + ): CompletionService[A, R] with CancelSentiels with AutoCloseable = { // Represents submitted work for a task. final class Enqueue(val node: A, val work: () => R) - new CompletionService[A, R] with AutoCloseable { + new CompletionService[A, R] with CancelSentiels with AutoCloseable { completionServices.put(this, true) private[this] val closed = new AtomicBoolean(false) override def close(): Unit = if (closed.compareAndSet(false, true)) { @@ -206,11 +216,20 @@ object ConcurrentRestrictions { /** Tasks that cannot be run yet because they cannot execute concurrently with the currently running tasks.*/ private[this] val pending = new LinkedList[Enqueue] + private[this] val sentinels: mutable.ListBuffer[JFuture[_]] = mutable.ListBuffer.empty + + def cancelSentinels(): Unit = { + sentinels.toList foreach { s => + s.cancel(true) + } + sentinels.clear + } + def submit(node: A, work: () => R): Unit = synchronized { if (closed.get) throw new RejectedExecutionException else if (isSentinel(node)) { // skip all checks for sentinels - CompletionService.submit(work, jservice) + sentinels += CompletionService.submitFuture(work, jservice) } else { val newState = tags.add(tagState, node) // if the new task is allowed to run concurrently with the currently running tasks, @@ -232,7 +251,7 @@ object ConcurrentRestrictions { val wrappedWork = () => try work() finally cleanup(node) - CompletionService.submit(wrappedWork, jservice) + CompletionService.submitFuture(wrappedWork, jservice) () } private[this] def cleanup(node: A): Unit = synchronized {