Cancel all running tasks with ctrl+c

Frequently ctrl+c does not work to cancel the running tasks. This seems
to be because the signal handler is bound to a specific instance of
evaluate task but there may be multiple instances of evaluate task
running at any given time. Shutting down just one of the running engines
does not ensure that task evaluation stops. To work around this, we can
globally store all of the completion services in a weak hash map and
cancel all of them whenever a signal is received. Closing the service,
which happens at the end of task evaluation will remove the service from
the map so hopefully this shouldn't introduce a memory leak.
This commit is contained in:
Ethan Atkins 2020-08-14 14:45:34 -07:00
parent 91367bb34a
commit 64c0f0acdd
2 changed files with 30 additions and 7 deletions

View File

@ -480,6 +480,7 @@ object EvaluateTask {
def cancelAndShutdown(): Unit = {
println("")
log.warn("Canceling execution...")
ConcurrentRestrictions.cancelAll()
shutdown()
}
}

View File

@ -10,6 +10,8 @@ package sbt
import java.util.concurrent.atomic.AtomicInteger
import sbt.internal.util.AttributeKey
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.RejectedExecutionException
/**
* Describes restrictions on concurrent execution for a set of tasks.
@ -48,6 +50,12 @@ import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService }
import annotation.tailrec
object ConcurrentRestrictions {
private[this] val completionServices = new java.util.WeakHashMap[CompletionService[_, _], Boolean]
import scala.collection.JavaConverters._
def cancelAll() = completionServices.keySet.asScala.toVector.foreach {
case a: AutoCloseable => a.close()
case _ =>
}
/**
* A ConcurrentRestrictions instance that places no restrictions on concurrently executing tasks.
@ -138,7 +146,8 @@ object ConcurrentRestrictions {
val pool = Executors.newCachedThreadPool { r =>
new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}")
}
(completionService[A, R](pool, tags, warn), () => { pool.shutdownNow(); () })
val service = completionService[A, R](pool, tags, warn)
(service, () => { service.close(); pool.shutdownNow(); () })
}
def completionService[A, R](
@ -147,7 +156,9 @@ object ConcurrentRestrictions {
isSentinel: A => Boolean
): (CompletionService[A, R], () => Unit) = {
val pool = Executors.newCachedThreadPool()
(completionService[A, R](pool, tags, warn, isSentinel), () => {
val service = completionService[A, R](pool, tags, warn, isSentinel)
(service, () => {
service.close()
pool.shutdownNow()
()
})
@ -157,7 +168,7 @@ object ConcurrentRestrictions {
backing: Executor,
tags: ConcurrentRestrictions[A],
warn: String => Unit
): CompletionService[A, R] = {
): CompletionService[A, R] with AutoCloseable = {
completionService[A, R](backing, tags, warn, (_: A) => false)
}
@ -170,12 +181,18 @@ object ConcurrentRestrictions {
tags: ConcurrentRestrictions[A],
warn: String => Unit,
isSentinel: A => Boolean,
): CompletionService[A, R] = {
): CompletionService[A, R] with AutoCloseable = {
// Represents submitted work for a task.
final class Enqueue(val node: A, val work: () => R)
new CompletionService[A, R] {
new CompletionService[A, R] with AutoCloseable {
completionServices.put(this, true)
private[this] val closed = new AtomicBoolean(false)
override def close(): Unit = if (closed.compareAndSet(false, true)) {
completionServices.remove(this)
()
}
/** Backing service used to manage execution on threads once all constraints are satisfied. */
private[this] val jservice = new ExecutorCompletionService[R](backing)
@ -190,7 +207,8 @@ object ConcurrentRestrictions {
private[this] val pending = new LinkedList[Enqueue]
def submit(node: A, work: () => R): Unit = synchronized {
if (isSentinel(node)) {
if (closed.get) throw new RejectedExecutionException
else if (isSentinel(node)) {
// skip all checks for sentinels
CompletionService.submit(work, jservice)
} else {
@ -253,7 +271,11 @@ object ConcurrentRestrictions {
submitValid(tried)
}
def take(): R = jservice.take().get()
def take(): R = {
if (closed.get)
throw new IllegalStateException("Tried to get values for a closed completion service")
jservice.take().get()
}
}
}
}