Implement copyClasspath for bgRun

Copies products to the workind directory, and the rest to the serviceTempDir of this service, both wrapped in SHA-1 hash of the file contents. This is intended to mimize the file copying and accumulation of the unused JAR file. Since working directory is wiped out when the background job ends, the product JAR is deleted too. Meanwhile, the rest of the dependencies are cached for the duration of this service.
This commit is contained in:
Eugene Yokota 2017-01-20 12:18:55 -05:00
parent 560c7a1364
commit 1e960b324c
6 changed files with 87 additions and 38 deletions

View File

@ -2,8 +2,9 @@ package sbt
import java.io.Closeable import java.io.Closeable
import sbt.util.Logger import sbt.util.Logger
import Def.ScopedKey import Def.{ ScopedKey, Classpath }
import sbt.internal.util.complete._ import sbt.internal.util.complete._
import java.io.File
abstract class BackgroundJobService extends Closeable { abstract class BackgroundJobService extends Closeable {
/** /**
@ -12,16 +13,20 @@ abstract class BackgroundJobService extends Closeable {
* then you should get an InterruptedException while blocking on the process, and * then you should get an InterruptedException while blocking on the process, and
* then you could process.destroy() for example. * then you could process.destroy() for example.
*/ */
def runInBackground(spawningTask: ScopedKey[_], state: State)(start: (Logger) => Unit): JobHandle def runInBackground(spawningTask: ScopedKey[_], state: State)(start: (Logger, File) => Unit): JobHandle
/** Same as shutown. */
def close(): Unit def close(): Unit
/** Shuts down all background jobs. */
def shutdown(): Unit def shutdown(): Unit
def jobs: Vector[JobHandle] def jobs: Vector[JobHandle]
def stop(job: JobHandle): Unit def stop(job: JobHandle): Unit
def waitFor(job: JobHandle): Unit def waitFor(job: JobHandle): Unit
/** Copies classpath to temporary directories. */
def copyClasspath(products: Classpath, full: Classpath, workingDirectory: File): Classpath
} }
object BackgroundJobService { object BackgroundJobService {
def jobIdParser: (State, Seq[JobHandle]) => Parser[Seq[JobHandle]] = { private[sbt] def jobIdParser: (State, Seq[JobHandle]) => Parser[Seq[JobHandle]] = {
import DefaultParsers._ import DefaultParsers._
(state, handles) => { (state, handles) => {
val stringIdParser: Parser[Seq[String]] = Space ~> token(NotSpace examples handles.map(_.id.toString).toSet, description = "<job id>").+ val stringIdParser: Parser[Seq[String]] = Space ~> token(NotSpace examples handles.map(_.id.toString).toSet, description = "<job id>").+

View File

@ -137,7 +137,8 @@ object Defaults extends BuildCommon {
bgList := { bgJobService.value.jobs }, bgList := { bgJobService.value.jobs },
ps := psTask.value, ps := psTask.value,
bgStop := bgStopTask.evaluated, bgStop := bgStopTask.evaluated,
bgWaitFor := bgWaitForTask.evaluated bgWaitFor := bgWaitForTask.evaluated,
bgCopyClasspath :== true
) )
private[sbt] lazy val globalIvyCore: Seq[Setting[_]] = private[sbt] lazy val globalIvyCore: Seq[Setting[_]] =
@ -353,8 +354,8 @@ object Defaults extends BuildCommon {
run := runTask(fullClasspath, mainClass in run, runner in run).evaluated, run := runTask(fullClasspath, mainClass in run, runner in run).evaluated,
copyResources := copyResourcesTask.value, copyResources := copyResourcesTask.value,
// note that we use the same runner and mainClass as plain run // note that we use the same runner and mainClass as plain run
bgRunMain := bgRunMainTask(fullClasspathAsJars, runner in run).evaluated, bgRunMain := bgRunMainTask(exportedProductJars, fullClasspathAsJars, bgCopyClasspath in bgRunMain, runner in run).evaluated,
bgRun := bgRunTask(fullClasspathAsJars, mainClass in run, runner in run).evaluated bgRun := bgRunTask(exportedProductJars, fullClasspathAsJars, mainClass in run, bgCopyClasspath in bgRun, runner in run).evaluated
) ++ inTask(run)(runnerSettings) ) ++ inTask(run)(runnerSettings)
private[this] lazy val configGlobal = globalDefaults(Seq( private[this] lazy val configGlobal = globalDefaults(Seq(
@ -815,25 +816,34 @@ object Defaults extends BuildCommon {
IO.move(mappings.map(_.swap)) IO.move(mappings.map(_.swap))
} }
def bgRunMainTask(classpath: Initialize[Task[Classpath]], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[JobHandle]] = def bgRunMainTask(products: Initialize[Task[Classpath]], classpath: Initialize[Task[Classpath]],
copyClasspath: Initialize[Boolean], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[JobHandle]] =
{ {
val parser = Defaults.loadForParser(discoveredMainClasses)((s, names) => Defaults.runMainParser(s, names getOrElse Nil)) val parser = Defaults.loadForParser(discoveredMainClasses)((s, names) => Defaults.runMainParser(s, names getOrElse Nil))
Def.inputTask { Def.inputTask {
val service = bgJobService.value
val (mainClass, args) = parser.parsed val (mainClass, args) = parser.parsed
bgJobService.value.runInBackground(resolvedScoped.value, state.value) { (logger) => service.runInBackground(resolvedScoped.value, state.value) { (logger, workingDir) =>
scalaRun.value.run(mainClass, data(classpath.value), args, logger).get val cp =
if (copyClasspath.value) service.copyClasspath(products.value, classpath.value, workingDir)
else classpath.value
scalaRun.value.run(mainClass, data(cp), args, logger).get
} }
} }
} }
def bgRunTask(classpath: Initialize[Task[Classpath]], mainClassTask: Initialize[Task[Option[String]]], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[JobHandle]] = def bgRunTask(products: Initialize[Task[Classpath]], classpath: Initialize[Task[Classpath]], mainClassTask: Initialize[Task[Option[String]]],
copyClasspath: Initialize[Boolean], scalaRun: Initialize[Task[ScalaRun]]): Initialize[InputTask[JobHandle]] =
{ {
import Def.parserToInput import Def.parserToInput
val parser = Def.spaceDelimited() val parser = Def.spaceDelimited()
Def.inputTask { Def.inputTask {
val service = bgJobService.value
val mainClass = mainClassTask.value getOrElse sys.error("No main class detected.") val mainClass = mainClassTask.value getOrElse sys.error("No main class detected.")
bgJobService.value.runInBackground(resolvedScoped.value, state.value) { (logger) => service.runInBackground(resolvedScoped.value, state.value) { (logger, workingDir) =>
// TODO - Copy the classpath into some tmp directory so we don't immediately die if a recompile happens. val cp =
scalaRun.value.run(mainClass, data(classpath.value), parser.parsed, logger).get if (copyClasspath.value) service.copyClasspath(products.value, classpath.value, workingDir)
else classpath.value
scalaRun.value.run(mainClass, data(cp), parser.parsed, logger).get
} }
} }
} }
@ -951,8 +961,8 @@ object Defaults extends BuildCommon {
} }
)) ))
def mainBgRunTask = bgRun := bgRunTask(fullClasspathAsJars in Runtime, mainClass in run, runner in run).evaluated def mainBgRunTask = bgRun := bgRunTask(exportedProductJars, fullClasspathAsJars in Runtime, mainClass in run, bgCopyClasspath in bgRun, runner in run).evaluated
def mainBgRunMainTask = bgRunMain := bgRunMainTask(fullClasspathAsJars in Runtime, runner in run).evaluated def mainBgRunMainTask = bgRunMain := bgRunMainTask(exportedProductJars, fullClasspathAsJars in Runtime, bgCopyClasspath in bgRunMain, runner in run).evaluated
def discoverMainClasses(analysis: CompileAnalysis): Seq[String] = def discoverMainClasses(analysis: CompileAnalysis): Seq[String] =
Discovery.applications(Tests.allDefs(analysis)).collect({ case (definition, discovered) if discovered.hasMain => definition.name }).sorted Discovery.applications(Tests.allDefs(analysis)).collect({ case (definition, discovered) if discovered.hasMain => definition.name }).sorted

View File

@ -253,6 +253,7 @@ object Keys {
val bgWaitFor = inputKey[Unit]("Wait for a background job to finish by providing its ID.") val bgWaitFor = inputKey[Unit]("Wait for a background job to finish by providing its ID.")
val bgRun = inputKey[JobHandle]("Start an application's default main class as a background job") val bgRun = inputKey[JobHandle]("Start an application's default main class as a background job")
val bgRunMain = inputKey[JobHandle]("Start a provided main class as a background job") val bgRunMain = inputKey[JobHandle]("Start a provided main class as a background job")
val bgCopyClasspath = settingKey[Boolean]("Copies classpath on bgRun to prevent conflict.")
// Test Keys // Test Keys
val testLoader = TaskKey[ClassLoader]("test-loader", "Provides the class loader used for testing.", DTask) val testLoader = TaskKey[ClassLoader]("test-loader", "Provides the class loader used for testing.", DTask)

View File

@ -3,10 +3,14 @@ package internal
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.io.Closeable import java.io.Closeable
import sbt.util.Logger import Def.{ ScopedKey, Setting, Classpath }
import Def.{ ScopedKey, Setting }
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import Scope.GlobalScope import Scope.GlobalScope
import java.io.File
import sbt.io.{ IO, Hash }
import sbt.io.syntax._
import sbt.util.{ Logger, LogExchange }
import sbt.internal.util.{ Attributed, ManagedLogger }
/** /**
* Interface between sbt and a thing running in the background. * Interface between sbt and a thing running in the background.
@ -32,6 +36,7 @@ private[sbt] abstract class AbstractJobHandle extends JobHandle {
private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobService { private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobService {
private val nextId = new AtomicLong(1) private val nextId = new AtomicLong(1)
private val pool = new BackgroundThreadPool() private val pool = new BackgroundThreadPool()
private val serviceTempDir = IO.createTemporaryDirectory
// hooks for sending start/stop events // hooks for sending start/stop events
protected def onAddJob(job: JobHandle): Unit = {} protected def onAddJob(job: JobHandle): Unit = {}
@ -55,7 +60,7 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
final class ThreadJobHandle( final class ThreadJobHandle(
override val id: Long, override val spawningTask: ScopedKey[_], override val id: Long, override val spawningTask: ScopedKey[_],
val logger: Logger, val job: BackgroundJob val logger: ManagedLogger, val workingDirectory: File, val job: BackgroundJob
) extends AbstractJobHandle { ) extends AbstractJobHandle {
def humanReadableName: String = job.humanReadableName def humanReadableName: String = job.humanReadableName
// EC for onStop handler below // EC for onStop handler below
@ -64,6 +69,8 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
// TODO: Fix this // TODO: Fix this
// logger.close() // logger.close()
removeJob(this) removeJob(this)
IO.delete(workingDirectory)
LogExchange.unbindLoggerAppenders(logger.name)
} }
addJob(this) addJob(this)
override final def equals(other: Any): Boolean = other match { override final def equals(other: Any): Boolean = other match {
@ -80,13 +87,16 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
override val spawningTask: ScopedKey[_] = unknownTask override val spawningTask: ScopedKey[_] = unknownTask
} }
protected def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): Logger protected def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): ManagedLogger
def doRunInBackground(spawningTask: ScopedKey[_], state: State, start: (Logger) => BackgroundJob): JobHandle = { def doRunInBackground(spawningTask: ScopedKey[_], state: State, start: (Logger, File) => BackgroundJob): JobHandle = {
val id = nextId.getAndIncrement() val id = nextId.getAndIncrement()
val logger = makeContext(id, spawningTask, state) val logger = makeContext(id, spawningTask, state)
val job = try new ThreadJobHandle(id, spawningTask, logger, start(logger)) val workingDir = serviceTempDir / s"job-$id"
catch { IO.createDirectory(workingDir)
val job = try {
new ThreadJobHandle(id, spawningTask, logger, workingDir, start(logger, workingDir))
} catch {
case e: Throwable => case e: Throwable =>
// TODO: Fix this // TODO: Fix this
// logger.close() // logger.close()
@ -95,7 +105,7 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
job job
} }
override def runInBackground(spawningTask: ScopedKey[_], state: State)(start: (Logger) => Unit): JobHandle = { override def runInBackground(spawningTask: ScopedKey[_], state: State)(start: (Logger, File) => Unit): JobHandle = {
pool.run(this, spawningTask, state)(start) pool.run(this, spawningTask, state)(start)
} }
@ -110,6 +120,7 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
} }
} }
pool.close() pool.close()
IO.delete(serviceTempDir)
} }
private def withHandle(job: JobHandle)(f: ThreadJobHandle => Unit): Unit = job match { private def withHandle(job: JobHandle)(f: ThreadJobHandle => Unit): Unit = job match {
@ -125,6 +136,29 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
withHandle(job)(_.job.awaitTermination()) withHandle(job)(_.job.awaitTermination())
override def toString(): String = s"BackgroundJobService(jobs=${jobs.map(_.id).mkString})" override def toString(): String = s"BackgroundJobService(jobs=${jobs.map(_.id).mkString})"
/**
* Copies products to the workind directory, and the rest to the serviceTempDir of this service,
* both wrapped in SHA-1 hash of the file contents.
* This is intended to mimize the file copying and accumulation of the unused JAR file.
* Since working directory is wiped out when the background job ends, the product JAR is deleted too.
* Meanwhile, the rest of the dependencies are cached for the duration of this service.
*/
override def copyClasspath(products: Classpath, full: Classpath, workingDirectory: File): Classpath =
{
def syncTo(dir: File)(source0: Attributed[File]): Attributed[File] =
{
val source = source0.data
val hash8 = Hash.toHex(Hash(source)).take(8)
val dest = dir / hash8 / source.getName
if (!dest.exists) { IO.copyFile(source, dest) }
Attributed.blank(dest)
}
val xs = (products.toVector map { syncTo(workingDirectory / "target") }) ++
((full diff products) map { syncTo(serviceTempDir / "target") })
Thread.sleep(100)
xs
}
} }
private[sbt] object BackgroundThreadPool { private[sbt] object BackgroundThreadPool {
@ -243,10 +277,10 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
} }
} }
def run(manager: AbstractBackgroundJobService, spawningTask: ScopedKey[_], state: State)(work: (Logger) => Unit): JobHandle = { def run(manager: AbstractBackgroundJobService, spawningTask: ScopedKey[_], state: State)(work: (Logger, File) => Unit): JobHandle = {
def start(logger: Logger): BackgroundJob = { def start(logger: Logger, workingDir: File): BackgroundJob = {
val runnable = new BackgroundRunnable(spawningTask.key.label, { () => val runnable = new BackgroundRunnable(spawningTask.key.label, { () =>
work(logger) work(logger, workingDir)
}) })
executor.execute(runnable) executor.execute(runnable)
runnable runnable
@ -260,7 +294,7 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
} }
private[sbt] class DefaultBackgroundJobService extends AbstractBackgroundJobService { private[sbt] class DefaultBackgroundJobService extends AbstractBackgroundJobService {
override def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): Logger = { override def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): ManagedLogger = {
val extracted = Project.extract(state) val extracted = Project.extract(state)
LogManager.constructBackgroundLog(extracted.structure.data, state)(spawningTask) LogManager.constructBackgroundLog(extracted.structure.data, state)(spawningTask)
} }

View File

@ -12,11 +12,12 @@ import scala.Console.{ BLUE, RESET }
import sbt.internal.util.{ AttributeKey, ConsoleOut, Settings, SuppressedTraceContext, MainAppender } import sbt.internal.util.{ AttributeKey, ConsoleOut, Settings, SuppressedTraceContext, MainAppender }
import MainAppender._ import MainAppender._
import sbt.util.{ AbstractLogger, Level, Logger, LogExchange } import sbt.util.{ AbstractLogger, Level, Logger, LogExchange }
import sbt.internal.util.ManagedLogger
import org.apache.logging.log4j.core.Appender import org.apache.logging.log4j.core.Appender
sealed abstract class LogManager { sealed abstract class LogManager {
def apply(data: Settings[Scope], state: State, task: ScopedKey[_], writer: PrintWriter): Logger def apply(data: Settings[Scope], state: State, task: ScopedKey[_], writer: PrintWriter): Logger
def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_]): Logger def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_]): ManagedLogger
} }
object LogManager { object LogManager {
@ -30,7 +31,7 @@ object LogManager {
manager(data, state, task, to) manager(data, state, task, to)
} }
def constructBackgroundLog(data: Settings[Scope], state: State): (ScopedKey[_]) => Logger = (task: ScopedKey[_]) => def constructBackgroundLog(data: Settings[Scope], state: State): (ScopedKey[_]) => ManagedLogger = (task: ScopedKey[_]) =>
{ {
val manager: LogManager = (logManager in task.scope).get(data) getOrElse { defaultManager(state.globalLogging.console) } val manager: LogManager = (logManager in task.scope).get(data) getOrElse { defaultManager(state.globalLogging.console) }
manager.backgroundLog(data, state, task) manager.backgroundLog(data, state, task)
@ -60,7 +61,7 @@ object LogManager {
def apply(data: Settings[Scope], state: State, task: ScopedKey[_], to: PrintWriter): Logger = def apply(data: Settings[Scope], state: State, task: ScopedKey[_], to: PrintWriter): Logger =
defaultLogger(data, state, task, screen(task, state), backed(to), relay(()), extra(task).toList) defaultLogger(data, state, task, screen(task, state), backed(to), relay(()), extra(task).toList)
def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_]): Logger = def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_]): ManagedLogger =
LogManager.backgroundLog(data, state, task, screen(task, state), relay(()), extra(task).toList) LogManager.backgroundLog(data, state, task, screen(task, state), relay(()), extra(task).toList)
} }
@ -112,7 +113,7 @@ object LogManager {
} }
def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_], def backgroundLog(data: Settings[Scope], state: State, task: ScopedKey[_],
console: Appender, /* TODO: backed: Appender,*/ relay: Appender, extra: List[Appender]): Logger = console: Appender, /* TODO: backed: Appender,*/ relay: Appender, extra: List[Appender]): ManagedLogger =
{ {
val execOpt = state.currentCommand val execOpt = state.currentCommand
val loggerName: String = s"bg-${task.key.label}-${generateId.incrementAndGet}" val loggerName: String = s"bg-${task.key.label}-${generateId.incrementAndGet}"
@ -121,7 +122,7 @@ object LogManager {
val log = LogExchange.logger(loggerName, channelName, None) val log = LogExchange.logger(loggerName, channelName, None)
LogExchange.unbindLoggerAppenders(loggerName) LogExchange.unbindLoggerAppenders(loggerName)
val consoleOpt = consoleLocally(state, console) val consoleOpt = consoleLocally(state, console)
LogExchange.bindLoggerAppenders(loggerName, (consoleOpt.toList map { _ -> Level.Info }) ::: (relay -> Level.Debug) :: Nil) LogExchange.bindLoggerAppenders(loggerName, (consoleOpt.toList map { _ -> Level.Debug }) ::: (relay -> Level.Debug) :: Nil)
log log
} }

View File

@ -22,6 +22,9 @@ sealed trait ScalaRun {
class ForkRun(config: ForkOptions) extends ScalaRun { class ForkRun(config: ForkOptions) extends ScalaRun {
def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Try[Unit] = def run(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Try[Unit] =
{ {
def processExitCode(exitCode: Int, label: String): Try[Unit] =
if (exitCode == 0) Success(())
else Failure(new RuntimeException(s"""Nonzero exit code returned from $label: $exitCode""".stripMargin))
val process = fork(mainClass, classpath, options, log) val process = fork(mainClass, classpath, options, log)
def cancel() = { def cancel() = {
log.warn("Run canceled.") log.warn("Run canceled.")
@ -34,7 +37,7 @@ class ForkRun(config: ForkOptions) extends ScalaRun {
def fork(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Process = def fork(mainClass: String, classpath: Seq[File], options: Seq[String], log: Logger): Process =
{ {
log.info("Running " + mainClass + " " + options.mkString(" ")) log.info("Running (fork) " + mainClass + " " + options.mkString(" "))
val scalaOptions = classpathOption(classpath) ::: mainClass :: options.toList val scalaOptions = classpathOption(classpath) ::: mainClass :: options.toList
val configLogged = val configLogged =
@ -44,11 +47,6 @@ class ForkRun(config: ForkOptions) extends ScalaRun {
Fork.java.fork(configLogged, scalaOptions) Fork.java.fork(configLogged, scalaOptions)
} }
private def classpathOption(classpath: Seq[File]) = "-classpath" :: Path.makeString(classpath) :: Nil private def classpathOption(classpath: Seq[File]) = "-classpath" :: Path.makeString(classpath) :: Nil
private def processExitCode(exitCode: Int, label: String): Try[Unit] =
{
if (exitCode == 0) Success(())
else Failure(new RuntimeException("Nonzero exit code returned from " + label + ": " + exitCode))
}
} }
class Run(instance: ScalaInstance, trapExit: Boolean, nativeTmp: File) extends ScalaRun { class Run(instance: ScalaInstance, trapExit: Boolean, nativeTmp: File) extends ScalaRun {
/** Runs the class 'mainClass' using the given classpath and options using the scala runner.*/ /** Runs the class 'mainClass' using the given classpath and options using the scala runner.*/