Use interval throttling

JMX doesn't seem to work in reporting

    memoryMxBean.getObjectPendingFinalizationCount

At least for the test build that I used it always reports zero.
This commit is contained in:
Eugene Yokota 2015-05-04 23:09:23 -04:00
parent d82fe46052
commit 2bbe872969
4 changed files with 53 additions and 32 deletions

View File

@ -157,7 +157,7 @@ object Defaults extends BuildCommon {
fork :== false,
initialize :== {},
forcegc :== sys.props.get("sbt.task.forcegc").map(java.lang.Boolean.parseBoolean).getOrElse(EvaluateTaskConfig.defaultForceGarbageCollection),
maxObjectPendingFinalization :== sys.props.get("sbt.task.maxObjectPendingFinalization").map(java.lang.Integer.parseInt).getOrElse(EvaluateTaskConfig.defaultMaxObjectPendingFinalization)
minForcegcInterval :== sys.props.get("sbt.task.minForcegcInterval").map(java.lang.Integer.parseInt).getOrElse(EvaluateTaskConfig.defaultMinForcegcInterval)
))
def defaultTestTasks(key: Scoped): Seq[Setting[_]] = inTask(key)(Seq(
tags := Seq(Tags.Test -> 1),

View File

@ -4,7 +4,6 @@
package sbt
import java.io.File
import java.lang.management.{ ManagementFactory, MemoryMXBean }
import Def.{ displayFull, dummyState, ScopedKey, Setting }
import Keys.{ streams, Streams, TaskStreams }
import Keys.{ dummyRoots, dummyStreamsManager, executionRoots, pluginData, streamsManager, taskDefinitionKey, transformState }
@ -57,6 +56,7 @@ object TaskCancellationStrategy {
type State = Unit
def onTaskEngineStart(canceller: RunningTaskEngine): Unit = ()
def onTaskEngineFinish(state: Unit): Unit = ()
override def toString: String = "Null"
}
/** Cancel handler which registers for SIGINT and cancels tasks when it is received. */
object Signal extends TaskCancellationStrategy {
@ -66,6 +66,7 @@ object TaskCancellationStrategy {
}
def onTaskEngineFinish(registration: Signals.Registration): Unit =
registration.remove()
override def toString: String = "Signal"
}
}
@ -87,15 +88,15 @@ sealed trait EvaluateTaskConfig {
def forceGarbageCollection: Boolean
/**
* Threshold to forcing garbage collection.
* Interval in seconds.
*/
def maxObjectPendingFinalization: Int
def minForcegcInterval: Int
}
final object EvaluateTaskConfig {
// Returns the default force garbage collection flag,
// as specified by system properties.
private[sbt] def defaultForceGarbageCollection: Boolean = true
private[sbt] def defaultMaxObjectPendingFinalization: Int = 1024
private[sbt] def defaultMinForcegcInterval: Int = 60
/** Pulls in the old configuration format. */
def apply(old: EvaluateConfig): EvaluateTaskConfig = {
@ -107,39 +108,40 @@ final object EvaluateTaskConfig {
if (old.cancelable) TaskCancellationStrategy.Signal
else TaskCancellationStrategy.Null
def forceGarbageCollection = defaultForceGarbageCollection
def maxObjectPendingFinalization = defaultMaxObjectPendingFinalization
def minForcegcInterval = defaultMinForcegcInterval
}
AdaptedTaskConfig
}
@deprecated("Use the alternative that specifies maxObjectPendingFinalization", "0.13.7")
@deprecated("Use the alternative that specifies minForcegcInterval", "0.13.9")
def apply(restrictions: Seq[Tags.Rule],
checkCycles: Boolean,
progressReporter: ExecuteProgress[Task],
cancelStrategy: TaskCancellationStrategy,
forceGarbageCollection: Boolean): EvaluateTaskConfig =
apply(restrictions, checkCycles, progressReporter, cancelStrategy, forceGarbageCollection,
defaultMaxObjectPendingFinalization)
defaultMinForcegcInterval)
/** Raw constructor for EvaluateTaskConfig. */
def apply(restrictions: Seq[Tags.Rule],
checkCycles: Boolean,
progressReporter: ExecuteProgress[Task],
cancelStrategy: TaskCancellationStrategy,
forceGarbageCollection: Boolean,
maxObjectPendingFinalization: Int): EvaluateTaskConfig = {
minForcegcInterval: Int): EvaluateTaskConfig = {
val r = restrictions
val check = checkCycles
val cs = cancelStrategy
val pr = progressReporter
val fgc = forceGarbageCollection
val mopf = maxObjectPendingFinalization
val mfi = minForcegcInterval
object SimpleEvaluateTaskConfig extends EvaluateTaskConfig {
def restrictions = r
def checkCycles = check
def progressReporter = pr
def cancelStrategy = cs
def forceGarbageCollection = fgc
def maxObjectPendingFinalization = mopf
def minForcegcInterval = mfi
}
SimpleEvaluateTaskConfig
}
@ -166,7 +168,6 @@ object EvaluateTask {
if (java.lang.Boolean.getBoolean("sbt.task.timings")) new TaskTimings else ExecuteProgress.empty[Task]
val SystemProcessors = Runtime.getRuntime.availableProcessors
lazy val memoryMxBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
@deprecated("Use extractedTaskConfig.", "0.13.0")
def defaultConfig(state: State): EvaluateConfig =
@ -200,8 +201,8 @@ object EvaluateTask {
val canceller = cancelStrategy(extracted, structure, state)
val progress = executeProgress(extracted, structure, state)
val fgc = forcegc(extracted, structure)
val mopf = maxObjectPendingFinalization(extracted, structure)
EvaluateTaskConfig(rs, false, progress, canceller, fgc, mopf)
val mfi = minForcegcInterval(extracted, structure)
EvaluateTaskConfig(rs, false, progress, canceller, fgc, mfi)
}
def defaultRestrictions(maxWorkers: Int) = Tags.limitAll(maxWorkers) :: Nil
@ -234,8 +235,8 @@ object EvaluateTask {
private[sbt] def forcegc(extracted: Extracted, structure: BuildStructure): Boolean =
getSetting(Keys.forcegc in Global, EvaluateTaskConfig.defaultForceGarbageCollection, extracted, structure)
// TODO - Should this pull from Global or from the project itself?
private[sbt] def maxObjectPendingFinalization(extracted: Extracted, structure: BuildStructure): Int =
getSetting(Keys.maxObjectPendingFinalization in Global, EvaluateTaskConfig.defaultMaxObjectPendingFinalization, extracted, structure)
private[sbt] def minForcegcInterval(extracted: Extracted, structure: BuildStructure): Int =
getSetting(Keys.minForcegcInterval in Global, EvaluateTaskConfig.defaultMinForcegcInterval, extracted, structure)
def getSetting[T](key: SettingKey[T], default: T, extracted: Extracted, structure: BuildStructure): T =
key in extracted.currentRef get structure.data getOrElse default
@ -337,29 +338,17 @@ object EvaluateTask {
import ConcurrentRestrictions.{ completionService, TagMap, Tag, tagged, tagsKey }
val log = state.log
log.debug("Running task... Cancel: " + config.cancelStrategy + ", check cycles: " + config.checkCycles)
log.debug(s"Running task... Cancel: ${config.cancelStrategy}, check cycles: ${config.checkCycles}, forcegc: ${config.forceGarbageCollection}")
val tags = tagged[Task[_]](_.info get tagsKey getOrElse Map.empty, Tags.predicate(config.restrictions))
val (service, shutdownThreads) = completionService[Task[_], Completed](tags, (s: String) => log.warn(s))
def shutdown(): Unit = {
// First ensure that all threads are stopped for task execution.
shutdownThreads()
// Now we run the gc cleanup to force finalizers to clear out file handles (yay GC!)
if (config.forceGarbageCollection) {
try {
val opfc = memoryMxBean.getObjectPendingFinalizationCount
if (opfc > config.maxObjectPendingFinalization) {
log.info(s"Forcing garbage collection... Objects pending finalization: $opfc")
// Force the detection of finalizers for scala.reflect weakhashsets
System.gc()
// Force finalizers to run.
System.runFinalization()
// Force actually cleaning the weak hash maps.
System.gc()
}
} catch {
case _: Throwable => // gotta catch em all
}
GCUtil.forceGcWithInterval(config.minForcegcInterval, log)
}
}
// propagate the defining key for reporting the origin

View File

@ -0,0 +1,32 @@
package sbt
import java.util.concurrent.atomic.AtomicLong
import scala.util.control.NonFatal
private[sbt] object GCUtil {
val lastGcCheck: AtomicLong = new AtomicLong(0L)
def forceGcWithInterval(minForcegcInterval: Int, log: Logger): Unit =
{
val now = System.currentTimeMillis
val last = lastGcCheck.get
// This throttles System.gc calls to interval
if (now - last > minForcegcInterval * 1000) {
lastGcCheck.set(now)
forceGc(log)
}
}
def forceGc(log: Logger): Unit =
try {
log.debug(s"Forcing garbage collection...")
// Force the detection of finalizers for scala.reflect weakhashsets
System.gc()
// Force finalizers to run.
System.runFinalization()
// Force actually cleaning the weak hash maps.
System.gc()
} catch {
case NonFatal(_) => // gotta catch em all
}
}

View File

@ -342,7 +342,7 @@ object Keys {
val concurrentRestrictions = SettingKey[Seq[Tags.Rule]]("concurrent-restrictions", "Rules describing restrictions on concurrent task execution.", BSetting)
val cancelable = SettingKey[Boolean]("cancelable", "Enables (true) or disables (false) the ability to interrupt task execution with CTRL+C.", BMinusSetting)
val forcegc = SettingKey[Boolean]("forcegc", "Enables (true) or disables (false) forcing garbage collection after task run when needed.", BMinusSetting)
val maxObjectPendingFinalization = SettingKey[Int]("max-object-pending-finalization", "Threshold to forcing garbage collection.")
val minForcegcInterval = SettingKey[Int]("min-forcegc-interval", "Minimal interval (in seconds) to check for forcing garbage collection.")
val settingsData = std.FullInstance.settingsData
val streams = TaskKey[TaskStreams]("streams", "Provides streams for logging and persisting data.", DTask)
val taskDefinitionKey = Def.taskDefinitionKey