From c2c2a26203b150196d7ddeda2f1fff5d441cb950 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Sun, 20 Sep 2020 21:42:06 -0700 Subject: [PATCH] Don't throw on closed completion service In 64c0f0acdd71efde20deaf32d85a2a4b8ed62a15, I attempted to safely close all of the completion services when the user inputs ctrl+c. I have noticed though that sometimes sbt crashes in CI with the RejectedExecutionException thrown by submit. To avoid throwing when there was no cancellation, I slightly modified the shutdown logic to not shutdown the completion service whil still shutting down the underlying thread pool. --- main/src/main/scala/sbt/EvaluateTask.scala | 11 ++++++----- .../main/scala/sbt/ConcurrentRestrictions.scala | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/main/src/main/scala/sbt/EvaluateTask.scala b/main/src/main/scala/sbt/EvaluateTask.scala index 09aa2e278..4ff043af3 100644 --- a/main/src/main/scala/sbt/EvaluateTask.scala +++ b/main/src/main/scala/sbt/EvaluateTask.scala @@ -428,7 +428,7 @@ object EvaluateTask { triggers: Triggers[Task], config: EvaluateTaskConfig )(implicit taskToNode: NodeView[Task]): (State, Result[T]) = { - import ConcurrentRestrictions.{ completionService, tagged, tagsKey } + import ConcurrentRestrictions.{ cancellableCompletionService, tagged, tagsKey } val log = state.log log.debug( @@ -439,15 +439,15 @@ object EvaluateTask { val tags = tagged[Task[_]](tagMap, Tags.predicate(config.restrictions)) val (service, shutdownThreads) = - completionService[Task[_], Completed]( + cancellableCompletionService[Task[_], Completed]( tags, (s: String) => log.warn(s), (t: Task[_]) => tagMap(t).contains(Tags.Sentinel) ) - def shutdown(): Unit = { + def shutdownImpl(force: Boolean): Unit = { // First ensure that all threads are stopped for task execution. - shutdownThreads() + shutdownThreads(force) config.progressReporter.stop() // Now we run the gc cleanup to force finalizers to clear out file handles (yay GC!) @@ -455,6 +455,7 @@ object EvaluateTask { GCUtil.forceGcWithInterval(config.minForcegcInterval, log) } } + def shutdown(): Unit = shutdownImpl(false) // propagate the defining key for reporting the origin def overwriteNode(i: Incomplete): Boolean = i.node match { case Some(t: Task[_]) => transformNode(t).isEmpty @@ -482,7 +483,7 @@ object EvaluateTask { log.warn("Canceling execution...") RunningProcesses.killAll() ConcurrentRestrictions.cancelAll() - shutdown() + shutdownImpl(true) } } currentlyRunningEngine.set((SafeState(state), runningEngine)) diff --git a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala index b5172ed57..d2407ccf4 100644 --- a/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala +++ b/tasks/src/main/scala/sbt/ConcurrentRestrictions.scala @@ -157,7 +157,7 @@ object ConcurrentRestrictions { new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}") } val service = completionService[A, R](pool, tags, warn) - (service, () => { service.close(); pool.shutdownNow(); () }) + (service, () => { pool.shutdownNow(); () }) } def completionService[A, R]( @@ -168,7 +168,20 @@ object ConcurrentRestrictions { val pool = Executors.newCachedThreadPool() val service = completionService[A, R](pool, tags, warn, isSentinel) (service, () => { - service.close() + pool.shutdownNow() + () + }) + } + + def cancellableCompletionService[A, R]( + tags: ConcurrentRestrictions[A], + warn: String => Unit, + isSentinel: A => Boolean + ): (CompletionService[A, R], Boolean => Unit) = { + val pool = Executors.newCachedThreadPool() + val service = completionService[A, R](pool, tags, warn, isSentinel) + (service, force => { + if (force) service.close() pool.shutdownNow() () })