Don't throw on closed completion service

In 64c0f0acdd, I attempted to safely close
all of the completion services when the user inputs ctrl+c. I have
noticed though that sometimes sbt crashes in CI with the
RejectedExecutionException thrown by submit. To avoid throwing when
there was no cancellation, I slightly modified the shutdown logic to not
shutdown the completion service whil still shutting down the underlying
thread pool.
This commit is contained in:
Ethan Atkins 2020-09-20 21:42:06 -07:00
parent d96ecb2c21
commit c2c2a26203
2 changed files with 21 additions and 7 deletions

View File

@ -428,7 +428,7 @@ object EvaluateTask {
triggers: Triggers[Task],
config: EvaluateTaskConfig
)(implicit taskToNode: NodeView[Task]): (State, Result[T]) = {
import ConcurrentRestrictions.{ completionService, tagged, tagsKey }
import ConcurrentRestrictions.{ cancellableCompletionService, tagged, tagsKey }
val log = state.log
log.debug(
@ -439,15 +439,15 @@ object EvaluateTask {
val tags =
tagged[Task[_]](tagMap, Tags.predicate(config.restrictions))
val (service, shutdownThreads) =
completionService[Task[_], Completed](
cancellableCompletionService[Task[_], Completed](
tags,
(s: String) => log.warn(s),
(t: Task[_]) => tagMap(t).contains(Tags.Sentinel)
)
def shutdown(): Unit = {
def shutdownImpl(force: Boolean): Unit = {
// First ensure that all threads are stopped for task execution.
shutdownThreads()
shutdownThreads(force)
config.progressReporter.stop()
// Now we run the gc cleanup to force finalizers to clear out file handles (yay GC!)
@ -455,6 +455,7 @@ object EvaluateTask {
GCUtil.forceGcWithInterval(config.minForcegcInterval, log)
}
}
def shutdown(): Unit = shutdownImpl(false)
// propagate the defining key for reporting the origin
def overwriteNode(i: Incomplete): Boolean = i.node match {
case Some(t: Task[_]) => transformNode(t).isEmpty
@ -482,7 +483,7 @@ object EvaluateTask {
log.warn("Canceling execution...")
RunningProcesses.killAll()
ConcurrentRestrictions.cancelAll()
shutdown()
shutdownImpl(true)
}
}
currentlyRunningEngine.set((SafeState(state), runningEngine))

View File

@ -157,7 +157,7 @@ object ConcurrentRestrictions {
new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}")
}
val service = completionService[A, R](pool, tags, warn)
(service, () => { service.close(); pool.shutdownNow(); () })
(service, () => { pool.shutdownNow(); () })
}
def completionService[A, R](
@ -168,7 +168,20 @@ object ConcurrentRestrictions {
val pool = Executors.newCachedThreadPool()
val service = completionService[A, R](pool, tags, warn, isSentinel)
(service, () => {
service.close()
pool.shutdownNow()
()
})
}
def cancellableCompletionService[A, R](
tags: ConcurrentRestrictions[A],
warn: String => Unit,
isSentinel: A => Boolean
): (CompletionService[A, R], Boolean => Unit) = {
val pool = Executors.newCachedThreadPool()
val service = completionService[A, R](pool, tags, warn, isSentinel)
(service, force => {
if (force) service.close()
pool.shutdownNow()
()
})