Merge pull request #5259 from eatkins/background-jobs

Background jobs
This commit is contained in:
eugene yokota 2019-12-02 15:55:15 -05:00 committed by GitHub
commit 3fc9513ec5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 73 additions and 50 deletions

View File

@ -8,11 +8,14 @@
package sbt
import java.io.Closeable
import sbt.util.Logger
import Def.{ ScopedKey, Classpath }
import Def.{ Classpath, ScopedKey }
import sbt.internal.util.complete._
import java.io.File
import scala.util.Try
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
abstract class BackgroundJobService extends Closeable {
@ -49,9 +52,19 @@ abstract class BackgroundJobService extends Closeable {
def jobs: Vector[JobHandle]
def stop(job: JobHandle): Unit
/**
* Delegate to waitFor but catches any exceptions and returns the result in an instance of `Try`.
* @param job the job to wait for
* @return the result of waiting for the job to complete.
*/
def waitForTry(job: JobHandle): Try[Unit] = {
// This implementation is provided only for backward compatibility.
Try(waitFor(job))
try Success(waitFor(job))
catch {
case NonFatal(e) =>
try stop(job)
catch { case NonFatal(_) => }
Failure(e)
}
}
def waitFor(job: JobHandle): Unit

View File

@ -351,7 +351,7 @@ object Defaults extends BuildCommon {
sys.env.contains("CI") || SysProp.ci,
// watch related settings
pollInterval :== Watch.defaultPollInterval,
) ++ LintUnused.lintSettings
) ++ LintUnused.lintSettings ++ DefaultBackgroundJobService.backgroundJobServiceSettings
)
def defaultTestTasks(key: Scoped): Seq[Setting[_]] =
@ -1599,7 +1599,8 @@ object Defaults extends BuildCommon {
}
def bgWaitForTask: Initialize[InputTask[Unit]] = foreachJobTask { (manager, handle) =>
manager.waitFor(handle)
manager.waitForTry(handle)
()
}
def docTaskSettings(key: TaskKey[File] = doc): Seq[Setting[_]] =

View File

@ -251,6 +251,7 @@ object Keys {
val javaOptions = taskKey[Seq[String]]("Options passed to a new JVM when forking.").withRank(BPlusTask)
val envVars = taskKey[Map[String, String]]("Environment variables used when forking a new JVM").withRank(BTask)
val bgJobServiceDirectory = settingKey[File]("The directory for temporary files used by background jobs.")
val bgJobService = settingKey[BackgroundJobService]("Job manager used to run background jobs.")
val bgList = taskKey[Seq[JobHandle]]("List running background jobs.")
val ps = taskKey[Seq[JobHandle]]("bgList variant that displays on the log.")

View File

@ -126,15 +126,14 @@ object StandardMain {
def runManaged(s: State): xsbti.MainResult = {
val previous = TrapExit.installManager()
try {
val hook = ShutdownHooks.add(closeRunnable)
try {
val hook = ShutdownHooks.add(closeRunnable)
try {
MainLoop.runLogged(s)
} finally {
hook.close()
()
}
} finally DefaultBackgroundJobService.backgroundJobService.shutdown()
MainLoop.runLogged(s)
} finally {
try DefaultBackgroundJobService.shutdown()
finally hook.close()
()
}
} finally TrapExit.uninstallManager(previous)
}

View File

@ -12,7 +12,8 @@ import java.io.{ Closeable, File, FileInputStream, IOException }
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{ FileVisitResult, Files, Path, SimpleFileVisitor }
import java.security.{ DigestInputStream, MessageDigest }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
import sbt.Def.{ Classpath, ScopedKey, Setting }
import sbt.Scope.GlobalScope
@ -61,13 +62,16 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
private val nextId = new AtomicLong(1)
private val pool = new BackgroundThreadPool()
private var serviceTempDirOpt: Option[File] = None
private def serviceTempDir = serviceTempDirOpt match {
case Some(dir) => dir
case _ =>
val dir = IO.createTemporaryDirectory
serviceTempDirOpt = Some(dir)
dir
private[sbt] def serviceTempDirBase: File
private val serviceTempDirRef = new AtomicReference[File]
private def serviceTempDir: File = serviceTempDirRef.synchronized {
serviceTempDirRef.get match {
case null =>
val dir = IO.createUniqueDirectory(serviceTempDirBase)
serviceTempDirRef.set(dir)
dir
case s => s
}
}
// hooks for sending start/stop events
protected def onAddJob(@deprecated("unused", "") job: JobHandle): Unit = ()
@ -161,12 +165,12 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
jobSet.headOption.foreach {
case handle: ThreadJobHandle @unchecked =>
handle.job.shutdown()
handle.job.awaitTermination()
handle.job.awaitTerminationTry()
case _ => //
}
}
pool.close()
serviceTempDirOpt foreach IO.delete
Option(serviceTempDirRef.get).foreach(IO.delete)
}
private def withHandle(job: JobHandle)(f: ThreadJobHandle => Unit): Unit = job match {
@ -178,24 +182,9 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
)
}
private def withHandleTry(job: JobHandle)(f: ThreadJobHandle => Try[Unit]): Try[Unit] =
job match {
case handle: ThreadJobHandle @unchecked => f(handle)
case _: DeadHandle @unchecked => Try(()) // nothing to stop or wait for
case other =>
Try(
sys.error(
s"BackgroundJobHandle does not originate with the current BackgroundJobService: $other"
)
)
}
override def stop(job: JobHandle): Unit =
withHandle(job)(_.job.shutdown())
override def waitForTry(job: JobHandle): Try[Unit] =
withHandleTry(job)(_.job.awaitTerminationTry())
override def waitFor(job: JobHandle): Unit =
withHandle(job)(_.job.awaitTermination())
@ -387,7 +376,7 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
list
}
listeners.foreach { l =>
l.executionContext.execute(new Runnable { override def run = l.callback() })
l.executionContext.execute(() => l.callback())
}
}
}
@ -398,11 +387,9 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
stopListeners += result
result
}
override def awaitTermination(): Unit = finishedLatch.await()
override def awaitTerminationTry(): Try[Unit] = {
awaitTermination()
exitTry.getOrElse(Try(()))
override def awaitTermination(): Unit = {
finishedLatch.await()
exitTry.foreach(_.fold(e => throw e, identity))
}
override def humanReadableName: String = taskName
@ -482,14 +469,37 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
}
}
private[sbt] class DefaultBackgroundJobService extends AbstractBackgroundJobService {
private[sbt] class DefaultBackgroundJobService(private[sbt] val serviceTempDirBase: File)
extends AbstractBackgroundJobService {
@deprecated("Use the constructor that specifies the background job temporary directory", "1.4.0")
def this() = this(IO.createTemporaryDirectory)
override def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): ManagedLogger = {
val extracted = Project.extract(state)
LogManager.constructBackgroundLog(extracted.structure.data, state)(spawningTask)
}
}
private[sbt] object DefaultBackgroundJobService {
lazy val backgroundJobService: DefaultBackgroundJobService = new DefaultBackgroundJobService
lazy val backgroundJobServiceSetting: Setting[_] =
((Keys.bgJobService in GlobalScope) :== backgroundJobService)
private[this] val backgroundJobServices = new ConcurrentHashMap[File, DefaultBackgroundJobService]
private[sbt] def shutdown(): Unit = {
backgroundJobServices.values.forEach(_.shutdown())
backgroundJobServices.clear()
}
private[sbt] lazy val backgroundJobServiceSetting: Setting[_] =
(Keys.bgJobService in GlobalScope) := {
val path = (sbt.Keys.bgJobServiceDirectory in GlobalScope).value
val newService = new DefaultBackgroundJobService(path)
backgroundJobServices.putIfAbsent(path, newService) match {
case null => newService
case s =>
newService.shutdown()
s
}
}
private[sbt] lazy val backgroundJobServiceSettings: Seq[Def.Setting[_]] = Def.settings(
Keys.bgJobServiceDirectory in GlobalScope := {
sbt.Keys.appConfiguration.value.baseDirectory / "target" / "bg-jobs"
},
backgroundJobServiceSetting
)
}

View File

@ -130,7 +130,6 @@ private[sbt] object Load {
def injectGlobal(state: State): Seq[Setting[_]] =
(appConfiguration in GlobalScope :== state.configuration) +:
LogManager.settingsLogger(state) +:
DefaultBackgroundJobService.backgroundJobServiceSetting +:
EvaluateTask.injectSettings
def defaultWithGlobal(