Merge pull request #5752 from eatkins/cancel-all

Cancel all running tasks with ctrl+c
This commit is contained in:
eugene yokota 2020-08-14 21:57:13 -04:00 committed by GitHub
commit 8ce423b088
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 7 deletions

View File

@ -480,6 +480,7 @@ object EvaluateTask {
def cancelAndShutdown(): Unit = {
println("")
log.warn("Canceling execution...")
ConcurrentRestrictions.cancelAll()
shutdown()
}
}

View File

@ -10,6 +10,8 @@ package sbt
import java.util.concurrent.atomic.AtomicInteger
import sbt.internal.util.AttributeKey
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.RejectedExecutionException
/**
* Describes restrictions on concurrent execution for a set of tasks.
@ -48,6 +50,12 @@ import java.util.concurrent.{ Executor, Executors, ExecutorCompletionService }
import annotation.tailrec
object ConcurrentRestrictions {
private[this] val completionServices = new java.util.WeakHashMap[CompletionService[_, _], Boolean]
import scala.collection.JavaConverters._
def cancelAll() = completionServices.keySet.asScala.toVector.foreach {
case a: AutoCloseable => a.close()
case _ =>
}
/**
* A ConcurrentRestrictions instance that places no restrictions on concurrently executing tasks.
@ -138,7 +146,8 @@ object ConcurrentRestrictions {
val pool = Executors.newCachedThreadPool { r =>
new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}")
}
(completionService[A, R](pool, tags, warn), () => { pool.shutdownNow(); () })
val service = completionService[A, R](pool, tags, warn)
(service, () => { service.close(); pool.shutdownNow(); () })
}
def completionService[A, R](
@ -147,7 +156,9 @@ object ConcurrentRestrictions {
isSentinel: A => Boolean
): (CompletionService[A, R], () => Unit) = {
val pool = Executors.newCachedThreadPool()
(completionService[A, R](pool, tags, warn, isSentinel), () => {
val service = completionService[A, R](pool, tags, warn, isSentinel)
(service, () => {
service.close()
pool.shutdownNow()
()
})
@ -157,7 +168,7 @@ object ConcurrentRestrictions {
backing: Executor,
tags: ConcurrentRestrictions[A],
warn: String => Unit
): CompletionService[A, R] = {
): CompletionService[A, R] with AutoCloseable = {
completionService[A, R](backing, tags, warn, (_: A) => false)
}
@ -170,12 +181,18 @@ object ConcurrentRestrictions {
tags: ConcurrentRestrictions[A],
warn: String => Unit,
isSentinel: A => Boolean,
): CompletionService[A, R] = {
): CompletionService[A, R] with AutoCloseable = {
// Represents submitted work for a task.
final class Enqueue(val node: A, val work: () => R)
new CompletionService[A, R] {
new CompletionService[A, R] with AutoCloseable {
completionServices.put(this, true)
private[this] val closed = new AtomicBoolean(false)
override def close(): Unit = if (closed.compareAndSet(false, true)) {
completionServices.remove(this)
()
}
/** Backing service used to manage execution on threads once all constraints are satisfied. */
private[this] val jservice = new ExecutorCompletionService[R](backing)
@ -190,7 +207,8 @@ object ConcurrentRestrictions {
private[this] val pending = new LinkedList[Enqueue]
def submit(node: A, work: () => R): Unit = synchronized {
if (isSentinel(node)) {
if (closed.get) throw new RejectedExecutionException
else if (isSentinel(node)) {
// skip all checks for sentinels
CompletionService.submit(work, jservice)
} else {
@ -253,7 +271,11 @@ object ConcurrentRestrictions {
submitValid(tried)
}
def take(): R = jservice.take().get()
def take(): R = {
if (closed.get)
throw new IllegalStateException("Tried to get values for a closed completion service")
jservice.take().get()
}
}
}
}