Implement bgRun based on sbt-core-next

Eugene Yokota 2017-01-20 06:08:24 -05:00
parent bf12a995e4
commit 74cfbd4a9c
10 changed files with 534 additions and 89 deletions

View File

@@ -8,19 +8,20 @@ import sbt.internal.inc.AnalyzingCompiler
import sbt.util.Logger
import xsbti.compile.{ Inputs, Compilers }
import scala.util.Try
final class Console(compiler: AnalyzingCompiler) {
/** Starts an interactive scala interpreter session with the given classpath.*/
def apply(classpath: Seq[File], log: Logger): Option[String] =
def apply(classpath: Seq[File], log: Logger): Try[Unit] =
apply(classpath, Nil, "", "", log)
def apply(classpath: Seq[File], options: Seq[String], initialCommands: String, cleanupCommands: String, log: Logger): Option[String] =
def apply(classpath: Seq[File], options: Seq[String], initialCommands: String, cleanupCommands: String, log: Logger): Try[Unit] =
apply(classpath, options, initialCommands, cleanupCommands)(None, Nil)(log)
def apply(classpath: Seq[File], options: Seq[String], loader: ClassLoader, initialCommands: String, cleanupCommands: String)(bindings: (String, Any)*)(implicit log: Logger): Option[String] =
def apply(classpath: Seq[File], options: Seq[String], loader: ClassLoader, initialCommands: String, cleanupCommands: String)(bindings: (String, Any)*)(implicit log: Logger): Try[Unit] =
apply(classpath, options, initialCommands, cleanupCommands)(Some(loader), bindings)
def apply(classpath: Seq[File], options: Seq[String], initialCommands: String, cleanupCommands: String)(loader: Option[ClassLoader], bindings: Seq[(String, Any)])(implicit log: Logger): Option[String] =
def apply(classpath: Seq[File], options: Seq[String], initialCommands: String, cleanupCommands: String)(loader: Option[ClassLoader], bindings: Seq[(String, Any)])(implicit log: Logger): Try[Unit] =
{
def console0() = compiler.console(classpath, options, initialCommands, cleanupCommands, log)(loader, bindings)
// TODO: Fix JLine
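Callers of Console now get a Try[Unit] rather than an Option[String] error message. A hedged sketch of adapting a call site (compiler, classpath, and log are assumed to be in scope), alongside the .get-based call site in Defaults further below:

import scala.util.{ Success, Failure }

(new Console(compiler))(classpath, log) match {
  case Success(_)   => log.info("console session finished")
  case Failure(err) => log.error("console session failed: " + err.getMessage)
}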

View File

@@ -0,0 +1,38 @@
package sbt
import java.io.Closeable
import sbt.util.Logger
import Def.ScopedKey
import sbt.internal.util.complete._
abstract class BackgroundJobService extends Closeable {
/**
* Launches a background job: a function that runs inside another thread.
* Stopping the job will interrupt() that thread. If the job blocks on a process,
* it should receive an InterruptedException while blocking and can then call
* process.destroy(), for example.
*/
def runInBackground(spawningTask: ScopedKey[_], state: State)(start: (Logger) => Unit): JobHandle
def close: Unit = ()
def jobs: Vector[JobHandle]
def stop(job: JobHandle): Unit
def waitFor(job: JobHandle): Unit
}
object BackgroundJobService {
def jobIdParser: (State, Seq[JobHandle]) => Parser[Seq[JobHandle]] = {
import DefaultParsers._
(state, handles) => {
val stringIdParser: Parser[Seq[String]] = Space ~> token(NotSpace examples handles.map(_.id.toString).toSet, description = "<job id>").+
stringIdParser.map { strings =>
strings.map(Integer.parseInt(_)).flatMap(id => handles.find(_.id == id))
}
}
}
}
abstract class JobHandle {
def id: Long
def humanReadableName: String
def spawningTask: ScopedKey[_]
}
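The doc comment on runInBackground above describes the stop contract: stopping a job interrupt()s its thread, and a body that blocks on an external process should catch the InterruptedException and destroy that process. A minimal sketch under that contract; the command name "my-server" and the helper name startServer are placeholders:

import sbt._
import Def.ScopedKey

def startServer(service: BackgroundJobService, key: ScopedKey[_], st: State): JobHandle =
  service.runInBackground(key, st) { log =>
    val process = new ProcessBuilder("my-server").start() // placeholder external command
    try { process.waitFor(); () }                         // blocks until the process exits
    catch {
      case _: InterruptedException =>                     // thrown when the job is stopped
        log.info("background job stopped; destroying the forked process")
        process.destroy()
    }
  }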

View File

@@ -0,0 +1,20 @@
package sbt
import sbt.internal.DslEntry
import sbt.librarymanagement.Configuration
import sbt.util.Eval
private[sbt] trait BuildSyntax {
import language.experimental.macros
def settingKey[T](description: String): SettingKey[T] = macro std.KeyMacro.settingKeyImpl[T]
def taskKey[T](description: String): TaskKey[T] = macro std.KeyMacro.taskKeyImpl[T]
def inputKey[T](description: String): InputKey[T] = macro std.KeyMacro.inputKeyImpl[T]
def enablePlugins(ps: AutoPlugin*): DslEntry = DslEntry.DslEnablePlugins(ps)
def disablePlugins(ps: AutoPlugin*): DslEntry = DslEntry.DslDisablePlugins(ps)
def configs(cs: Configuration*): DslEntry = DslEntry.DslConfigs(cs)
def dependsOn(deps: Eval[ClasspathDep[ProjectReference]]*): DslEntry = DslEntry.DslDependsOn(deps)
// avoid conflict with `sbt.Keys.aggregate`
def aggregateProjects(refs: Eval[ProjectReference]*): DslEntry = DslEntry.DslAggregate(refs)
}
private[sbt] object BuildSyntax extends BuildSyntax
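For illustration, a minimal sketch of how these constructors read in a build definition; the key names greeting and greet are invented:

val greeting = settingKey[String]("a demo setting")
val greet    = taskKey[Unit]("print the greeting")

greeting := "hello"
greet := println(greeting.value)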

View File

@@ -26,7 +26,6 @@ import sbt.internal.librarymanagement._
import sbt.internal.librarymanagement.syntax._
import sbt.internal.util._
import sbt.util.{ Level, Logger }
import sys.error
import scala.xml.NodeSeq
import scala.util.control.NonFatal
@@ -134,7 +133,13 @@ object Defaults extends BuildCommon {
includeFilter in unmanagedSources :== ("*.java" | "*.scala") && new SimpleFileFilter(_.isFile),
includeFilter in unmanagedJars :== "*.jar" | "*.so" | "*.dll" | "*.jnilib" | "*.zip",
includeFilter in unmanagedResources :== AllPassFilter,
fileToStore :== DefaultFileToStore
fileToStore :== DefaultFileToStore,
bgJobService := { new DefaultBackgroundJobService() },
bgList := { bgJobService.value.jobs },
ps := psTask.value,
bgStop := bgStopTask.evaluated,
bgWaitFor := bgWaitForTask.evaluated,
onUnload := { s => try onUnload.value(s) finally bgJobService.value.close() }
)
private[sbt] lazy val globalIvyCore: Seq[Setting[_]] =
@@ -325,7 +330,7 @@ object Defaults extends BuildCommon {
}
}
lazy val configTasks = docTaskSettings(doc) ++ inTask(compile)(compileInputsSettings) ++ configGlobal ++ compileAnalysisSettings ++ Seq(
lazy val configTasks: Seq[Setting[_]] = docTaskSettings(doc) ++ inTask(compile)(compileInputsSettings) ++ configGlobal ++ compileAnalysisSettings ++ Seq(
compile := compileTask.value,
manipulateBytecode := compileIncremental.value,
compileIncremental := (compileIncrementalTask tag (Tags.Compile, Tags.CPU)).value,
@@ -343,14 +348,16 @@ object Defaults extends BuildCommon {
consoleQuick := consoleQuickTask.value,
discoveredMainClasses := (compile map discoverMainClasses storeAs discoveredMainClasses triggeredBy compile).value,
discoveredSbtPlugins := discoverSbtPluginNames.value,
inTask(run)(runnerTask :: Nil).head,
selectMainClass := mainClass.value orElse askForMainClass(discoveredMainClasses.value),
mainClass in run := (selectMainClass in run).value,
mainClass := pickMainClassOrWarn(discoveredMainClasses.value, streams.value.log),
run := runTask(fullClasspath, mainClass in run, runner in run).evaluated,
runMain := runMainTask(fullClasspath, runner in run).evaluated,
copyResources := copyResourcesTask.value
)
run := runTask(fullClasspath, mainClass in run, runner in run).evaluated,
copyResources := copyResourcesTask.value,
// note that we use the same runner and mainClass as plain run
bgRunMain := bgRunMainTask(fullClasspath, runner in run).evaluated,
bgRun := bgRunTask(fullClasspath, mainClass in run, runner in run).evaluated
) ++ inTask(run)(runnerSettings)
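Because bgRunTask and bgRunMainTask (defined later in this commit) are public on Defaults, a plugin or build could wire an additional background-run input task the same way. A hedged sketch; the key name bgRunServer is invented, and it simply reuses run's main class and runner:

val bgRunServer = inputKey[JobHandle]("run the project's main class as a named background job")

bgRunServer := Defaults.bgRunTask(fullClasspath in Runtime, mainClass in run, runner in run).evaluated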
private[this] lazy val configGlobal = globalDefaults(Seq(
initialCommands :== "",
@@ -531,11 +538,18 @@ object Defaults extends BuildCommon {
val opts = forkOptions.value
Seq(new Tests.Group("<default>", tests, if (fk) Tests.SubProcess(opts) else Tests.InProcess))
}
private[this] def forkOptions: Initialize[Task[ForkOptions]] =
(baseDirectory, javaOptions, outputStrategy, envVars, javaHome, connectInput) map {
(base, options, strategy, env, javaHomeDir, connectIn) =>
def forkOptionsTask: Initialize[Task[ForkOptions]] =
Def.task {
ForkOptions(
// bootJars is empty by default because only jars on the user's classpath should be on the boot classpath
ForkOptions(bootJars = Nil, javaHome = javaHomeDir, connectInput = connectIn, outputStrategy = strategy, runJVMOptions = options, workingDirectory = Some(base), envVars = env)
bootJars = Nil,
javaHome = javaHome.value,
connectInput = connectInput.value,
outputStrategy = outputStrategy.value,
runJVMOptions = javaOptions.value,
workingDirectory = Some(baseDirectory.value),
envVars = envVars.value
)
}
def testExecutionTask(task: Scoped): Initialize[Task[Tests.Execution]] =
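Since forkOptions is now a plain task (wired into runnerSettings further below), a build can override it. A hedged build.sbt sketch that mirrors the constructor call above, only adding a JVM option:

forkOptions in (Compile, run) := ForkOptions(
  bootJars = Nil,
  javaHome = javaHome.value,
  connectInput = connectInput.value,
  outputStrategy = outputStrategy.value,
  runJVMOptions = javaOptions.value ++ Seq("-Xmx2G"), // extra heap for forked runs, for example
  workingDirectory = Some(baseDirectory.value),
  envVars = envVars.value
)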
@@ -781,10 +795,6 @@ object Defaults extends BuildCommon {
new Package.Configuration(srcs, path, options)
}
@deprecated("use Defaults.askForMainClass", "0.13.7")
def selectRunMain(classes: Seq[String]): Option[String] = askForMainClass(classes)
@deprecated("use Defaults.pickMainClass", "0.13.7")
def selectPackageMain(classes: Seq[String]): Option[String] = pickMainClass(classes)
def askForMainClass(classes: Seq[String]): Option[String] =
sbt.SelectMainClass(Some(SimpleReader readLine _), classes)
def pickMainClass(classes: Seq[String]): Option[String] =
@@ -806,35 +816,71 @@ object Defaults extends BuildCommon {
IO.createDirectories(dirs) // recreate empty directories
IO.move(mappings.map(_.swap))
}
def bgRunMainTask(classpath: Initialize[Task[Classpath]], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[JobHandle]] =
{
val parser = Defaults.loadForParser(discoveredMainClasses)((s, names) => Defaults.runMainParser(s, names getOrElse Nil))
Def.inputTask {
val (mainClass, args) = parser.parsed
bgJobService.value.runInBackground(resolvedScoped.value, state.value) { (logger) =>
scalaRun.value.run(mainClass, data(classpath.value), args, logger).get
}
}
}
def bgRunTask(classpath: Initialize[Task[Classpath]], mainClassTask: Initialize[Task[Option[String]]], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[JobHandle]] =
{
import Def.parserToInput
val parser = Def.spaceDelimited()
Def.inputTask {
val mainClass = mainClassTask.value getOrElse sys.error("No main class detected.")
bgJobService.value.runInBackground(resolvedScoped.value, state.value) { (logger) =>
// TODO - Copy the classpath into some tmp directory so we don't immediately die if a recompile happens.
scalaRun.value.run(mainClass, data(classpath.value), parser.parsed, logger).get
}
}
}
// runMain calls bgRunMain in the background and waits for the result.
def foregroundRunMainTask: Initialize[InputTask[Unit]] =
Def.inputTask {
val handle = bgRunMain.evaluated
val service = bgJobService.value
service.waitFor(handle)
}
// run calls bgRun in the background and waits for the result.
def foregroundRunTask: Initialize[InputTask[Unit]] =
Def.inputTask {
val handle = bgRun.evaluated
val service = bgJobService.value
service.waitFor(handle)
}
def runMainTask(classpath: Initialize[Task[Classpath]], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[Unit]] =
{
val parser = loadForParser(discoveredMainClasses)((s, names) => runMainParser(s, names getOrElse Nil))
Def.inputTask {
val (mainClass, args) = parser.parsed
toError(scalaRun.value.run(mainClass, data(classpath.value), args, streams.value.log))
scalaRun.value.run(mainClass, data(classpath.value), args, streams.value.log).get
}
}
def runTask(classpath: Initialize[Task[Classpath]], mainClassTask: Initialize[Task[Option[String]]], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[Unit]] =
{
import Def.parserToInput
val parser = Def.spaceDelimited()
Def.inputTask {
val mainClass = mainClassTask.value getOrElse sys.error("No main class detected.")
toError(scalaRun.value.run(mainClass, data(classpath.value), parser.parsed, streams.value.log))
scalaRun.value.run(mainClass, data(classpath.value), parser.parsed, streams.value.log).get
}
}
def runnerTask = runner := runnerInit.value
def runnerTask: Initialize[Task[ScalaRun]] = runnerInit
def runnerInit: Initialize[Task[ScalaRun]] = Def.task {
val tmp = taskTemporaryDirectory.value
val resolvedScope = resolvedScoped.value.scope
val si = scalaInstance.value
val s = streams.value
val opts = forkOptions.value
val options = javaOptions.value
if (fork.value) {
s.log.debug(s"javaOptions: $options")
new ForkRun(forkOptions.value)
new ForkRun(opts)
} else {
if (options.nonEmpty) {
val mask = ScopeMask(project = false)
@@ -846,6 +892,31 @@ object Defaults extends BuildCommon {
}
}
private def foreachJobTask(f: (BackgroundJobService, JobHandle) => Unit): Initialize[InputTask[Unit]] = {
val parser: Initialize[State => Parser[Seq[JobHandle]]] = Def.setting { (s: State) =>
val extracted = Project.extract(s)
val service = extracted.get(bgJobService)
// you might be tempted to use the bgList task here, but the problem
// is that its result gets cached during execution and is therefore stale
BackgroundJobService.jobIdParser(s, service.jobs)
}
Def.inputTask {
val handles = parser.parsed
for (handle <- handles) {
f(bgJobService.value, handle)
}
}
}
def psTask: Initialize[Task[Vector[JobHandle]]] =
Def.task {
val xs = bgList.value
val s = streams.value
xs foreach { x => s.log.info(x.toString) }
xs
}
def bgStopTask: Initialize[InputTask[Unit]] = foreachJobTask { (manager, handle) => manager.stop(handle) }
def bgWaitForTask: Initialize[InputTask[Unit]] = foreachJobTask { (manager, handle) => manager.waitFor(handle) }
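bgStop and bgWaitFor act on job IDs parsed by the input task above. For completeness, a hedged sketch of a build-level task that stops every running job through the same service API; the key name bgStopAll is invented:

val bgStopAll = taskKey[Unit]("stop all background jobs")

bgStopAll := {
  val service = bgJobService.value
  service.jobs foreach service.stop
}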
@deprecated("Use `docTaskSettings` instead", "0.12.0")
def docSetting(key: TaskKey[File]) = docTaskSettings(key)
def docTaskSettings(key: TaskKey[File] = doc): Seq[Setting[_]] = inTask(key)(Seq(
@@ -882,8 +953,8 @@ object Defaults extends BuildCommon {
}
))
def mainRunTask = run := runTask(fullClasspath in Runtime, mainClass in run, runner in run).evaluated
def mainRunMainTask = runMain := runMainTask(fullClasspath in Runtime, runner in run).evaluated
def mainBgRunTask = bgRun := bgRunTask(fullClasspath in Runtime, mainClass in run, runner in run).evaluated
def mainBgRunMainTask = bgRunMain := bgRunMainTask(fullClasspath in Runtime, runner in run).evaluated
def discoverMainClasses(analysis: CompileAnalysis): Seq[String] =
Discovery.applications(Tests.allDefs(analysis)).collect({ case (definition, discovered) if discovered.hasMain => definition.name }).sorted
@@ -901,7 +972,7 @@ object Defaults extends BuildCommon {
cs.scalac match {
case ac: AnalyzingCompiler => ac.onArgs(exported(s, "scala"))
}
(new Console(compiler))(cpFiles, options, loader, initCommands, cleanup)()(s.log).foreach(msg => sys.error(msg))
(new Console(compiler))(cpFiles, options, loader, initCommands, cleanup)()(s.log).get
println()
}
@@ -1086,15 +1157,21 @@ object Defaults extends BuildCommon {
val CompletionsID = "completions"
def noAggregation: Seq[Scoped] = Seq(run, runMain, console, consoleQuick, consoleProject)
def noAggregation: Seq[Scoped] = Seq(run, runMain, bgRun, bgRunMain, console, consoleQuick, consoleProject)
lazy val disableAggregation = Defaults.globalDefaults(noAggregation map disableAggregate)
def disableAggregate(k: Scoped) = aggregate in k :== false
lazy val runnerSettings: Seq[Setting[_]] = Seq(runnerTask)
// 1. runnerSettings is added unscoped via JvmPlugin.
// 2. In addition it's added scoped to run task.
lazy val runnerSettings: Seq[Setting[_]] =
Seq(
runner := runnerInit.value,
forkOptions := forkOptionsTask.value
)
lazy val baseTasks: Seq[Setting[_]] = projectTasks ++ packageBase
lazy val configSettings: Seq[Setting[_]] = Classpaths.configSettings ++ configTasks ++ configPaths ++ packageConfig ++ Classpaths.compilerPluginConfig ++ deprecationSettings
lazy val compileSettings: Seq[Setting[_]] = configSettings ++ (mainRunMainTask +: mainRunTask +: addBaseSources) ++ Classpaths.addUnmanagedLibrary
lazy val compileSettings: Seq[Setting[_]] = configSettings ++ (mainBgRunMainTask +: mainBgRunTask +: addBaseSources) ++ Classpaths.addUnmanagedLibrary
lazy val testSettings: Seq[Setting[_]] = configSettings ++ testTasks
lazy val itSettings: Seq[Setting[_]] = inConfig(IntegrationTest)(testSettings)
@@ -2255,11 +2332,14 @@ trait BuildExtra extends BuildCommon with DefExtra {
val r = (runner in (config, run)).value
val cp = (fullClasspath in config).value
val args = spaceDelimited().parsed
toError(r.run(mainClass, data(cp), baseArguments ++ args, streams.value.log))
r.run(mainClass, data(cp), baseArguments ++ args, streams.value.log).get
}
def runTask(config: Configuration, mainClass: String, arguments: String*): Initialize[Task[Unit]] =
(fullClasspath in config, runner in (config, run), streams) map { (cp, r, s) =>
toError(r.run(mainClass, data(cp), arguments, s.log))
Def.task {
val cp = (fullClasspath in config).value
val r = (runner in (config, run)).value
val s = streams.value
r.run(mainClass, data(cp), arguments, s.log).get
}
def fullRunInputTask(scoped: InputKey[Unit], config: Configuration, mainClass: String, baseArguments: String*): Setting[InputTask[Unit]] =
@@ -2267,7 +2347,7 @@ trait BuildExtra extends BuildCommon with DefExtra {
(initScoped(scoped.scopedKey, runnerInit) zipWith (fullClasspath in config, streams, result).identityMap) { (rTask, t) =>
(t, rTask) map {
case ((cp, s, args), r) =>
toError(r.run(mainClass, data(cp), baseArguments ++ args, s.log))
r.run(mainClass, data(cp), baseArguments ++ args, s.log).get
}
}
}).evaluated
@@ -2276,7 +2356,7 @@ trait BuildExtra extends BuildCommon with DefExtra {
case (rTask, t) =>
(t, rTask) map {
case ((cp, s), r) =>
toError(r.run(mainClass, data(cp), arguments, s.log))
r.run(mainClass, data(cp), arguments, s.log).get
}
}).value
def initScoped[T](sk: ScopedKey[_], i: Initialize[T]): Initialize[T] = initScope(fillTaskAxis(sk.scope, sk.key), i)
@@ -2323,7 +2403,6 @@ trait BuildCommon {
/** Converts the `Seq[File]` to a Classpath, which is an alias for `Seq[Attributed[File]]`. */
def classpath: Classpath = Attributed blankSeq s
}
def toError(o: Option[String]): Unit = o foreach error
def overrideConfigs(cs: Configuration*)(configurations: Seq[Configuration]): Seq[Configuration] =
{

View File

@@ -74,6 +74,7 @@ import sbt.internal.librarymanagement.{
}
import sbt.util.{ AbstractLogger, Level, Logger }
import org.apache.logging.log4j.core.Appender
import sbt.BuildSyntax._
object Keys {
val TraceValues = "-1 to disable, 0 for up to the first sbt frame, or a positive number to set the maximum number of frames shown."
@@ -238,12 +239,21 @@ object Keys {
val trapExit = SettingKey[Boolean]("trap-exit", "If true, enables exit trapping and thread management for 'run'-like tasks. This is currently only suitable for serially-executed 'run'-like tasks.", CSetting)
val fork = SettingKey[Boolean]("fork", "If true, forks a new JVM when running. If false, runs in the same JVM as the build.", ASetting)
val forkOptions = TaskKey[ForkOptions]("fork-option", "Configures JVM forking.", DSetting)
val outputStrategy = SettingKey[Option[sbt.OutputStrategy]]("output-strategy", "Selects how to log output when running a main class.", DSetting)
val connectInput = SettingKey[Boolean]("connect-input", "If true, connects standard input when running a main class forked.", CSetting)
val javaHome = SettingKey[Option[File]]("java-home", "Selects the Java installation used for compiling and forking. If None, uses the Java installation running the build.", ASetting)
val javaOptions = TaskKey[Seq[String]]("java-options", "Options passed to a new JVM when forking.", BPlusTask)
val envVars = TaskKey[Map[String, String]]("envVars", "Environment variables used when forking a new JVM", BTask)
val bgJobService = settingKey[BackgroundJobService]("Job manager used to run background jobs.")
val bgList = taskKey[Vector[JobHandle]]("List running background jobs.")
val ps = taskKey[Vector[JobHandle]]("bgList variant that displays on the log.")
val bgStop = inputKey[Unit]("Stop a background job by providing its ID.")
val bgWaitFor = inputKey[Unit]("Wait for a background job to finish by providing its ID.")
val bgRun = inputKey[JobHandle]("Start an application's default main class as a background job")
val bgRunMain = inputKey[JobHandle]("Start a provided main class as a background job")
// Test Keys
val testLoader = TaskKey[ClassLoader]("test-loader", "Provides the class loader used for testing.", DTask)
val loadedTestFrameworks = TaskKey[Map[TestFramework, Framework]]("loaded-test-frameworks", "Loads Framework definitions from the test loader.", DTask)
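The bg* keys above return or consume JobHandle values, so a build can compose them directly. A hedged sketch that mirrors Defaults.foregroundRunTask; the key name runAndWait is invented:

val runAndWait = inputKey[Unit]("start bgRun and wait for the job to finish")

runAndWait := {
  val handle = bgRun.evaluated       // starts the job and returns its handle
  bgJobService.value.waitFor(handle) // blocks until the background thread completes
}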

View File

@@ -214,7 +214,9 @@ object BuildStreams {
def mkStreams(units: Map[URI, LoadedBuildUnit], root: URI, data: Settings[Scope]): State => Streams = s => {
implicit val isoString: sjsonnew.IsoString[scala.json.ast.unsafe.JValue] = sjsonnew.IsoString.iso(sjsonnew.support.scalajson.unsafe.CompactPrinter.apply, sjsonnew.support.scalajson.unsafe.Parser.parseUnsafe)
s get Keys.stateStreams getOrElse std.Streams(path(units, root, data), displayFull, LogManager.construct(data, s), sjsonnew.support.scalajson.unsafe.Converter)
(s get Keys.stateStreams) getOrElse {
std.Streams(path(units, root, data), displayFull, LogManager.construct(data, s), sjsonnew.support.scalajson.unsafe.Converter)
}
}
def path(units: Map[URI, LoadedBuildUnit], root: URI, data: Settings[Scope])(scoped: ScopedKey[_]): File =

View File

@@ -0,0 +1,268 @@
package sbt
package internal
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }
import java.io.Closeable
import sbt.util.{ Logger, LogExchange, Level }
import sbt.internal.util.MainAppender
import Def.ScopedKey
import scala.concurrent.ExecutionContext
/**
* Interface between sbt and a thing running in the background.
*/
private[sbt] abstract class BackgroundJob {
def humanReadableName: String
def awaitTermination(): Unit
def shutdown(): Unit
// this should be true on construction and stay true until
// the job is complete
def isRunning(): Boolean
// called after stop or on spontaneous exit, closing the result
// removes the listener
def onStop(listener: () => Unit)(implicit ex: ExecutionContext): Closeable
// do we need this or is the spawning task good enough?
// def tags: SomeType
}
private[sbt] abstract class AbstractJobHandle extends JobHandle {
override def toString = s"JobHandle(${id}, ${humanReadableName}, ${Def.showFullKey(spawningTask)})"
}
private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobService {
private val nextId = new AtomicLong(1)
private val pool = new BackgroundThreadPool()
// hooks for sending start/stop events
protected def onAddJob(job: JobHandle): Unit = {}
protected def onRemoveJob(job: JobHandle): Unit = {}
// this mutable state could conceptually go on State except
// that then every task that runs a background job would have
// to be a command, so not sure what to do here.
@volatile
private final var jobSet = Set.empty[ThreadJobHandle]
private def addJob(job: ThreadJobHandle): Unit = synchronized {
onAddJob(job)
jobSet += job
}
private def removeJob(job: ThreadJobHandle): Unit = synchronized {
onRemoveJob(job)
jobSet -= job
}
override def jobs: Vector[ThreadJobHandle] = jobSet.toVector
final class ThreadJobHandle(
override val id: Long, override val spawningTask: ScopedKey[_],
val logger: Logger, val job: BackgroundJob
) extends AbstractJobHandle {
def humanReadableName: String = job.humanReadableName
// EC for onStop handler below
import ExecutionContext.Implicits.global
job.onStop { () =>
// TODO: Fix this
// logger.close()
removeJob(this)
}
addJob(this)
override final def equals(other: Any): Boolean = other match {
case handle: JobHandle if handle.id == id => true
case _ => false
}
override final def hashCode(): Int = id.hashCode
}
private val unknownTask = TaskKey[Unit]("unknownTask", "Dummy value")
// we use this if we deserialize a handle for a job that no longer exists
private final class DeadHandle(override val id: Long, override val humanReadableName: String)
extends AbstractJobHandle {
override val spawningTask: ScopedKey[_] = unknownTask
}
protected def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): Logger
def doRunInBackground(spawningTask: ScopedKey[_], state: State, start: (Logger) => BackgroundJob): JobHandle = {
val id = nextId.getAndIncrement()
val logger = makeContext(id, spawningTask, state)
val job = try new ThreadJobHandle(id, spawningTask, logger, start(logger))
catch {
case e: Throwable =>
// TODO: Fix this
// logger.close()
throw e
}
job
}
override def runInBackground(spawningTask: ScopedKey[_], state: State)(start: (Logger) => Unit): JobHandle = {
pool.run(this, spawningTask, state)(start)
}
override def close(): Unit = {
while (jobSet.nonEmpty) {
jobSet.headOption.foreach {
case handle: ThreadJobHandle @unchecked =>
handle.job.shutdown()
handle.job.awaitTermination()
case _ => //
}
}
pool.close()
}
private def withHandle(job: JobHandle)(f: ThreadJobHandle => Unit): Unit = job match {
case handle: ThreadJobHandle @unchecked => f(handle)
case dead: DeadHandle @unchecked => () // nothing to stop or wait for
case other => sys.error(s"BackgroundJobHandle does not originate with the current BackgroundJobService: $other")
}
override def stop(job: JobHandle): Unit =
withHandle(job)(_.job.shutdown())
override def waitFor(job: JobHandle): Unit =
withHandle(job)(_.job.awaitTermination())
override def toString(): String = s"BackgroundJobService(jobs=${jobs.map(_.id).mkString})"
}
private[sbt] object BackgroundThreadPool {
sealed trait Status
case object Waiting extends Status
final case class Running(thread: Thread) extends Status
// the oldThread is None if we never ran
final case class Stopped(oldThread: Option[Thread]) extends Status
}
private[sbt] class BackgroundThreadPool extends java.io.Closeable {
private val nextThreadId = new java.util.concurrent.atomic.AtomicInteger(1)
private val threadGroup = Thread.currentThread.getThreadGroup()
private val threadFactory = new java.util.concurrent.ThreadFactory() {
override def newThread(runnable: Runnable): Thread = {
val thread = new Thread(threadGroup, runnable, s"sbt-bg-threads-${nextThreadId.getAndIncrement}")
// Do NOT setDaemon because then the code in TaskExit.scala in sbt will insta-kill
// the backgrounded process, at least for the case of the run task.
thread
}
}
private val executor = new java.util.concurrent.ThreadPoolExecutor(
0, /* corePoolSize */
32, /* maxPoolSize, max # of bg tasks */
2, java.util.concurrent.TimeUnit.SECONDS, /* keep alive unused threads this long (if corePoolSize < maxPoolSize) */
new java.util.concurrent.SynchronousQueue[Runnable](),
threadFactory
)
private class BackgroundRunnable(val taskName: String, body: () => Unit)
extends BackgroundJob with Runnable {
import BackgroundThreadPool._
private val finishedLatch = new java.util.concurrent.CountDownLatch(1)
// synchronize to read/write this, no sync to just read
@volatile
private var status: Status = Waiting
// double-finally for extra paranoia that we will finishedLatch.countDown
override def run() = try {
val go = synchronized {
status match {
case Waiting =>
status = Running(Thread.currentThread())
true
case Stopped(_) =>
false
case Running(_) =>
throw new RuntimeException("Impossible status of bg thread")
}
}
try { if (go) body() }
finally cleanup()
} finally finishedLatch.countDown()
private class StopListener(val callback: () => Unit, val executionContext: ExecutionContext) extends Closeable {
override def close(): Unit = removeListener(this)
override def hashCode: Int = System.identityHashCode(this)
override def equals(other: Any): Boolean = other match {
case r: AnyRef => this eq r
case _ => false
}
}
// access is synchronized
private var stopListeners = Set.empty[StopListener]
private def removeListener(listener: StopListener): Unit = synchronized {
stopListeners -= listener
}
def cleanup(): Unit = {
// avoid holding any lock while invoking callbacks, and
// handle callbacks being added by other callbacks, just
// to be all fancy.
while (synchronized { stopListeners.nonEmpty }) {
val listeners = synchronized {
val list = stopListeners.toList
stopListeners = Set.empty
list
}
listeners.foreach { l =>
l.executionContext.execute(new Runnable { override def run = l.callback() })
}
}
}
override def onStop(listener: () => Unit)(implicit ex: ExecutionContext): Closeable =
synchronized {
val result = new StopListener(listener, ex)
stopListeners += result
result
}
override def awaitTermination(): Unit = finishedLatch.await()
override def humanReadableName: String = taskName
override def isRunning(): Boolean =
status match {
case Waiting => true // we start as running from BackgroundJob perspective
case Running(thread) => thread.isAlive()
case Stopped(threadOption) => threadOption.map(_.isAlive()).getOrElse(false)
}
override def shutdown(): Unit =
synchronized {
status match {
case Waiting =>
status = Stopped(None) // makes run() not run the body
case Running(thread) =>
status = Stopped(Some(thread))
thread.interrupt()
case Stopped(threadOption) =>
// try to interrupt again! woot!
threadOption.foreach(_.interrupt())
}
}
}
def run(manager: AbstractBackgroundJobService, spawningTask: ScopedKey[_], state: State)(work: (Logger) => Unit): JobHandle = {
def start(logger: Logger): BackgroundJob = {
val runnable = new BackgroundRunnable(spawningTask.key.label, { () =>
work(logger)
})
executor.execute(runnable)
runnable
}
manager.doRunInBackground(spawningTask, state, start _)
}
override def close(): Unit = {
executor.shutdown()
}
}
private[sbt] class DefaultBackgroundJobService extends AbstractBackgroundJobService {
private val generateId: AtomicInteger = new AtomicInteger
override def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): Logger = {
val extracted = Project.extract(state)
LogManager.constructBackgroundLog(extracted.structure.data, state)(spawningTask)
}
}
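A hedged, test-style sketch of the lifecycle implemented above; spawningKey and state are assumed to come from the surrounding task or test harness:

val service = new DefaultBackgroundJobService()  // private[sbt], shown for illustration
val handle = service.runInBackground(spawningKey, state) { log =>
  try Thread.sleep(60 * 1000L)                   // stand-in for real background work
  catch { case _: InterruptedException => log.info("interrupted; exiting job body") }
}
assert(service.jobs.exists(_.id == handle.id))   // the handle is registered in jobSet
service.stop(handle)                             // Waiting/Running -> Stopped, interrupts the thread
service.waitFor(handle)                          // awaits the runnable's CountDownLatch
service.close()                                  // shuts down remaining jobs and the thread pool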

View File

@@ -16,6 +16,7 @@ import org.apache.logging.log4j.core.Appender
sealed abstract class LogManager {
def apply(data: Settings[Scope], state: State, task: ScopedKey[_], writer: PrintWriter): Logger
def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_]): Logger
}
object LogManager {
@@ -23,12 +24,18 @@ object LogManager {
private val generateId: AtomicInteger = new AtomicInteger
// This is called by mkStreams
def construct(data: Settings[Scope], state: State) = (task: ScopedKey[_], to: PrintWriter) =>
def construct(data: Settings[Scope], state: State): (ScopedKey[_], PrintWriter) => Logger = (task: ScopedKey[_], to: PrintWriter) =>
{
val manager = logManager in task.scope get data getOrElse defaultManager(state.globalLogging.console)
val manager: LogManager = (logManager in task.scope).get(data) getOrElse { defaultManager(state.globalLogging.console) }
manager(data, state, task, to)
}
def constructBackgroundLog(data: Settings[Scope], state: State): (ScopedKey[_]) => Logger = (task: ScopedKey[_]) =>
{
val manager: LogManager = (logManager in task.scope).get(data) getOrElse { defaultManager(state.globalLogging.console) }
manager.backgroundLog(data, state, task)
}
def defaultManager(console: ConsoleOut): LogManager = withLoggers((sk, s) => defaultScreen(console))
// This is called by Defaults.
@@ -52,6 +59,9 @@ object LogManager {
) extends LogManager {
def apply(data: Settings[Scope], state: State, task: ScopedKey[_], to: PrintWriter): Logger =
defaultLogger(data, state, task, screen(task, state), backed(to), relay(()), extra(task).toList)
def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_]): Logger =
LogManager.backgroundLog(data, state, task, screen(task, state), relay(()), extra(task).toList)
}
// This is the main function that is used to generate the logger for tasks.
@@ -71,20 +81,22 @@ object LogManager {
val screenTrace = getOr(traceLevel.key, defaultTraceLevel(state))
val backingTrace = getOr(persistTraceLevel.key, Int.MaxValue)
val extraBacked = state.globalLogging.backed :: relay :: Nil
val consoleOpt =
execOpt match {
case Some(x: Exec) =>
x.source match {
// TODO: Fix this stringliness
case Some(x: CommandSource) if x.channelName == "console0" => Option(console)
case Some(x: CommandSource) => None
case _ => Option(console)
}
case _ => Option(console)
}
val consoleOpt = consoleLocally(state, console)
multiLogger(log, MainAppender.MainAppenderConfig(consoleOpt, backed,
extraBacked ::: extra, screenLevel, backingLevel, screenTrace, backingTrace))
}
// Return None if the exec is not from console origin.
def consoleLocally(state: State, console: Appender): Option[Appender] =
state.currentCommand match {
case Some(x: Exec) =>
x.source match {
// TODO: Fix this stringliness
case Some(x: CommandSource) if x.channelName == "console0" => Option(console)
case Some(x: CommandSource) => None
case _ => Option(console)
}
case _ => Option(console)
}
def defaultTraceLevel(state: State): Int =
if (state.interactive) -1 else Int.MaxValue
def suppressedMessage(key: ScopedKey[_], state: State): SuppressedTraceContext => Option[String] =
@@ -99,6 +111,20 @@ object LogManager {
case _ => key // should never get here
}
def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_],
console: Appender, /* TODO: backed: Appender,*/ relay: Appender, extra: List[Appender]): Logger =
{
val execOpt = state.currentCommand
val loggerName: String = s"bg-${task.key.label}-${generateId.incrementAndGet}"
val channelName: Option[String] = execOpt flatMap { e => e.source map { _.channelName } }
// val execId: Option[String] = execOpt flatMap { _.execId }
val log = LogExchange.logger(loggerName, channelName, None)
LogExchange.unbindLoggerAppenders(loggerName)
val consoleOpt = consoleLocally(state, console)
LogExchange.bindLoggerAppenders(loggerName, (consoleOpt.toList map { _ -> Level.Info }) ::: (relay -> Level.Debug) :: Nil)
log
}
// TODO: Fix this
// if global logging levels are not explicitly set, set them from project settings
// private[sbt] def setGlobalLogLevels(s: State, data: Settings[Scope]): State =
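For reference, a hedged sketch of obtaining a background logger for an arbitrary task key, which is what DefaultBackgroundJobService.makeContext does; extracted, state, and someKey are assumed to be in scope:

val bgLog: Logger =
  LogManager.constructBackgroundLog(extracted.structure.data, state)(someKey)
bgLog.info("routed through the bg-<task label>-<n> logger created above")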

View File

@@ -12,19 +12,17 @@ import sbt.internal.inc.ScalaInstance
import sbt.io.Path
import sbt.util.Logger
import scala.util.{ Try, Success, Failure }
import scala.util.control.NonFatal
import scala.sys.process.Process
trait ScalaRun {
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Option[String]
sealed trait ScalaRun {
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Try[Unit]
}
class ForkRun(config: ForkOptions) extends ScalaRun {
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Option[String] =
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Try[Unit] =
{
log.info("Running " + mainClass + " " + options.mkString(" "))
val scalaOptions = classpathOption(classpath) ::: mainClass :: options.toList
val configLogged = if (config.outputStrategy.isDefined) config else config.copy(outputStrategy = Some(LoggedOutput(log)))
// fork with Java because Scala introduces an extra class loader (#702)
val process = Fork.java.fork(configLogged, scalaOptions)
val process = fork(mainClass, classpath, options, log)
def cancel() = {
log.warn("Run canceled.")
process.destroy()
@@ -33,27 +31,45 @@ class ForkRun(config: ForkOptions) extends ScalaRun {
val exitCode = try process.exitValue() catch { case e: InterruptedException => cancel() }
processExitCode(exitCode, "runner")
}
private def classpathOption(classpath: Seq[File]) = "-classpath" :: Path.makeString(classpath) :: Nil
private def processExitCode(exitCode: Int, label: String) =
def fork(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Process =
{
if (exitCode == 0)
None
else
Some("Nonzero exit code returned from " + label + ": " + exitCode)
log.info("Running " + mainClass + " " + options.mkString(" "))
val scalaOptions = classpathOption(classpath) ::: mainClass :: options.toList
val configLogged =
if (config.outputStrategy.isDefined) config
else config.copy(outputStrategy = Some(LoggedOutput(log)))
// fork with Java because Scala introduces an extra class loader (#702)
Fork.java.fork(configLogged, scalaOptions)
}
private def classpathOption(classpath: Seq[File]) = "-classpath" :: Path.makeString(classpath) :: Nil
private def processExitCode(exitCode: Int, label: String): Try[Unit] =
{
if (exitCode == 0) Success(())
else Failure(new RuntimeException("Nonzero exit code returned from " + label + ": " + exitCode))
}
}
class Run(instance: ScalaInstance, trapExit: Boolean, nativeTmp: File) extends ScalaRun {
/** Runs the class 'mainClass' using the given classpath and options using the scala runner.*/
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger) =
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Try[Unit] =
{
log.info("Running " + mainClass + " " + options.mkString(" "))
def execute() =
try { run0(mainClass, classpath, options, log) }
catch { case e: java.lang.reflect.InvocationTargetException => throw e.getCause }
def directExecute() = try { execute(); None } catch { case e: Exception => log.trace(e); Some(e.toString) }
def directExecute(): Try[Unit] =
Try(execute()) recover {
case NonFatal(e) =>
// bgStop should not print out stack trace
// log.trace(e)
throw e
}
// try { execute(); None } catch { case e: Exception => log.trace(e); Some(e.toString) }
if (trapExit) Run.executeTrapExit(execute(), log) else directExecute()
if (trapExit) Run.executeTrapExit(execute(), log)
else directExecute()
}
private def run0(mainClassName: String, classpath: Seq[File], options: Seq[String], log: Logger): Unit = {
log.debug(" Classpath:\n\t" + classpath.mkString("\n\t"))
@@ -88,13 +104,12 @@ object Run {
runner.run(mainClass, classpath, options, log)
/** Executes the given function, trapping calls to System.exit. */
def executeTrapExit(f: => Unit, log: Logger): Option[String] =
def executeTrapExit(f: => Unit, log: Logger): Try[Unit] =
{
val exitCode = TrapExit(f, log)
if (exitCode == 0) {
log.debug("Exited with code 0")
None
} else
Some("Nonzero exit code: " + exitCode)
Success(())
} else Failure(new RuntimeException("Nonzero exit code: " + exitCode))
}
}
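With ScalaRun.run now returning Try[Unit], a caller either calls .get (as the run tasks above do, so a failure fails the task) or handles the failure itself. A hedged sketch of the latter; runAndReport is an invented helper:

import java.io.File
import sbt.util.Logger
import scala.util.{ Success, Failure }

def runAndReport(runner: ScalaRun, mainClass: String, cp: Seq[File], args: Seq[String], log: Logger): Unit =
  runner.run(mainClass, cp, args, log) match {
    case Success(())  => log.info(s"$mainClass exited normally")
    case Failure(err) => log.error(err.getMessage) // e.g. "Nonzero exit code returned from runner: 1"
  }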

View File

@@ -1,14 +1,12 @@
package sbt
import sbt.internal.DslEntry
import sbt.util.Eval
object syntax extends syntax
abstract class syntax extends IOSyntax0 with sbt.std.TaskExtra with sbt.internal.util.Types with sbt.ProcessExtra
with sbt.internal.librarymanagement.impl.DependencyBuilders with sbt.ProjectExtra
with sbt.internal.librarymanagement.DependencyFilterExtra with sbt.BuildExtra with sbt.TaskMacroExtra
with sbt.ScopeFilter.Make {
with sbt.ScopeFilter.Make
with sbt.BuildSyntax {
// IO
def uri(s: String): URI = new URI(s)
@@ -43,18 +41,6 @@ abstract class syntax extends IOSyntax0 with sbt.std.TaskExtra with sbt.internal
// final val System = C.System
final val Optional = C.Optional
def config(s: String): Configuration = C.config(s)
import language.experimental.macros
def settingKey[T](description: String): SettingKey[T] = macro std.KeyMacro.settingKeyImpl[T]
def taskKey[T](description: String): TaskKey[T] = macro std.KeyMacro.taskKeyImpl[T]
def inputKey[T](description: String): InputKey[T] = macro std.KeyMacro.inputKeyImpl[T]
def enablePlugins(ps: AutoPlugin*): DslEntry = DslEntry.DslEnablePlugins(ps)
def disablePlugins(ps: AutoPlugin*): DslEntry = DslEntry.DslDisablePlugins(ps)
def configs(cs: Configuration*): DslEntry = DslEntry.DslConfigs(cs)
def dependsOn(deps: Eval[ClasspathDep[ProjectReference]]*): DslEntry = DslEntry.DslDependsOn(deps)
// avoid conflict with `sbt.Keys.aggregate`
def aggregateProjects(refs: Eval[ProjectReference]*): DslEntry = DslEntry.DslAggregate(refs)
}
// TODO: share this with io.syntax