diff --git a/main/src/main/scala/sbt/EvaluateTask.scala b/main/src/main/scala/sbt/EvaluateTask.scala index 7292ff694..59bf062f3 100644 --- a/main/src/main/scala/sbt/EvaluateTask.scala +++ b/main/src/main/scala/sbt/EvaluateTask.scala @@ -480,6 +480,7 @@ object EvaluateTask { def cancelAndShutdown(): Unit = { println("") log.warn("Canceling execution...") + ConcurrentRestrictions.cancelAll() shutdown() } } diff --git a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala index d289c453c..e3ed01c60 100644 --- a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala +++ b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala @@ -10,6 +10,8 @@ package sbt import java.util.concurrent.atomic.AtomicInteger import sbt.internal.util.AttributeKey +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.RejectedExecutionException /** * Describes restrictions on concurrent execution for a set of tasks. @@ -48,6 +50,12 @@ import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService } import annotation.tailrec object ConcurrentRestrictions { + private[this] val completionServices = new java.util.WeakHashMap[CompletionService[_, _], Boolean] + import scala.collection.JavaConverters._ + def cancelAll() = completionServices.keySet.asScala.toVector.foreach { + case a: AutoCloseable => a.close() + case _ => + } /** * A ConcurrentRestrictions instance that places no restrictions on concurrently executing tasks. @@ -138,7 +146,8 @@ object ConcurrentRestrictions { val pool = Executors.newCachedThreadPool { r => new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}") } - (completionService[A, R](pool, tags, warn), () => { pool.shutdownNow(); () }) + val service = completionService[A, R](pool, tags, warn) + (service, () => { service.close(); pool.shutdownNow(); () }) } def completionService[A, R]( @@ -147,7 +156,9 @@ object ConcurrentRestrictions { isSentinel: A => Boolean ): (CompletionService[A, R], () => Unit) = { val pool = Executors.newCachedThreadPool() - (completionService[A, R](pool, tags, warn, isSentinel), () => { + val service = completionService[A, R](pool, tags, warn, isSentinel) + (service, () => { + service.close() pool.shutdownNow() () }) @@ -157,7 +168,7 @@ object ConcurrentRestrictions { backing: Executor, tags: ConcurrentRestrictions[A], warn: String => Unit - ): CompletionService[A, R] = { + ): CompletionService[A, R] with AutoCloseable = { completionService[A, R](backing, tags, warn, (_: A) => false) } @@ -170,12 +181,18 @@ object ConcurrentRestrictions { tags: ConcurrentRestrictions[A], warn: String => Unit, isSentinel: A => Boolean, - ): CompletionService[A, R] = { + ): CompletionService[A, R] with AutoCloseable = { // Represents submitted work for a task. final class Enqueue(val node: A, val work: () => R) - new CompletionService[A, R] { + new CompletionService[A, R] with AutoCloseable { + completionServices.put(this, true) + private[this] val closed = new AtomicBoolean(false) + override def close(): Unit = if (closed.compareAndSet(false, true)) { + completionServices.remove(this) + () + } /** Backing service used to manage execution on threads once all constraints are satisfied. */ private[this] val jservice = new ExecutorCompletionService[R](backing) @@ -190,7 +207,8 @@ object ConcurrentRestrictions { private[this] val pending = new LinkedList[Enqueue] def submit(node: A, work: () => R): Unit = synchronized { - if (isSentinel(node)) { + if (closed.get) throw new RejectedExecutionException + else if (isSentinel(node)) { // skip all checks for sentinels CompletionService.submit(work, jservice) } else { @@ -253,7 +271,11 @@ object ConcurrentRestrictions { submitValid(tried) } - def take(): R = jservice.take().get() + def take(): R = { + if (closed.get) + throw new IllegalStateException("Tried to get values for a closed completion service") + jservice.take().get() + } } } }