From 64c0f0acdd71efde20deaf32d85a2a4b8ed62a15 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Fri, 14 Aug 2020 14:45:34 -0700 Subject: [PATCH] Cancel all running tasks with ctrl+c Frequently ctrl+c does not work to cancel the running tasks. This seems to be because the signal handler is bound to a specific instance of evaluate task but there may be multiple instances of evaluate task running at any given time. Shutting down just one of the running engines does not ensure that task evaluation stops. To work around this, we can globally store all of the completion services in a weak hash map and cancel all of them whenever a signal is received. Closing the service, which happens at the end of task evaluation will remove the service from the map so hopefully this shouldn't introduce a memory leak. --- main/src/main/scala/sbt/EvaluateTask.scala | 1 + .../scala/sbt/ConcurrentRestrictions.scala | 36 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/main/src/main/scala/sbt/EvaluateTask.scala b/main/src/main/scala/sbt/EvaluateTask.scala index 7292ff694..59bf062f3 100644 --- a/main/src/main/scala/sbt/EvaluateTask.scala +++ b/main/src/main/scala/sbt/EvaluateTask.scala @@ -480,6 +480,7 @@ object EvaluateTask { def cancelAndShutdown(): Unit = { println("") log.warn("Canceling execution...") + ConcurrentRestrictions.cancelAll() shutdown() } } diff --git a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala index d289c453c..e3ed01c60 100644 --- a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala +++ b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala @@ -10,6 +10,8 @@ package sbt import java.util.concurrent.atomic.AtomicInteger import sbt.internal.util.AttributeKey +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.RejectedExecutionException /** * Describes restrictions on concurrent execution for a set of tasks. @@ -48,6 +50,12 @@ import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService } import annotation.tailrec object ConcurrentRestrictions { + private[this] val completionServices = new java.util.WeakHashMap[CompletionService[_, _], Boolean] + import scala.collection.JavaConverters._ + def cancelAll() = completionServices.keySet.asScala.toVector.foreach { + case a: AutoCloseable => a.close() + case _ => + } /** * A ConcurrentRestrictions instance that places no restrictions on concurrently executing tasks. @@ -138,7 +146,8 @@ object ConcurrentRestrictions { val pool = Executors.newCachedThreadPool { r => new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}") } - (completionService[A, R](pool, tags, warn), () => { pool.shutdownNow(); () }) + val service = completionService[A, R](pool, tags, warn) + (service, () => { service.close(); pool.shutdownNow(); () }) } def completionService[A, R]( @@ -147,7 +156,9 @@ object ConcurrentRestrictions { isSentinel: A => Boolean ): (CompletionService[A, R], () => Unit) = { val pool = Executors.newCachedThreadPool() - (completionService[A, R](pool, tags, warn, isSentinel), () => { + val service = completionService[A, R](pool, tags, warn, isSentinel) + (service, () => { + service.close() pool.shutdownNow() () }) @@ -157,7 +168,7 @@ object ConcurrentRestrictions { backing: Executor, tags: ConcurrentRestrictions[A], warn: String => Unit - ): CompletionService[A, R] = { + ): CompletionService[A, R] with AutoCloseable = { completionService[A, R](backing, tags, warn, (_: A) => false) } @@ -170,12 +181,18 @@ object ConcurrentRestrictions { tags: ConcurrentRestrictions[A], warn: String => Unit, isSentinel: A => Boolean, - ): CompletionService[A, R] = { + ): CompletionService[A, R] with AutoCloseable = { // Represents submitted work for a task. final class Enqueue(val node: A, val work: () => R) - new CompletionService[A, R] { + new CompletionService[A, R] with AutoCloseable { + completionServices.put(this, true) + private[this] val closed = new AtomicBoolean(false) + override def close(): Unit = if (closed.compareAndSet(false, true)) { + completionServices.remove(this) + () + } /** Backing service used to manage execution on threads once all constraints are satisfied. */ private[this] val jservice = new ExecutorCompletionService[R](backing) @@ -190,7 +207,8 @@ object ConcurrentRestrictions { private[this] val pending = new LinkedList[Enqueue] def submit(node: A, work: () => R): Unit = synchronized { - if (isSentinel(node)) { + if (closed.get) throw new RejectedExecutionException + else if (isSentinel(node)) { // skip all checks for sentinels CompletionService.submit(work, jservice) } else { @@ -253,7 +271,11 @@ object ConcurrentRestrictions { submitValid(tried) } - def take(): R = jservice.take().get() + def take(): R = { + if (closed.get) + throw new IllegalStateException("Tried to get values for a closed completion service") + jservice.take().get() + } } } }