mirror of https://github.com/sbt/sbt.git
Improve supershell performance
It turns out that task progress actually introduces a fair bit of overhead. The biggest issue is that the task progress callbacks block the Execute main thread. This means that time in those callbacks delays task evaluation, slowing down sbt. This was not negligible, I was seeing a lot of the total time of a no-op compile in https://github.com/jtjeferreira/sbt-multi-module-sample was spent in TaskProgress callbacks. Prior to these changes, I ran 30 no-op compiles in that project and the average time was about 570ms. This number got worse and worse because there were memory leaks in the TaskProgress object. After these changes, it dropped to 250ms and after jit-ing, it would drop to about 200ms. I also successfully ran 5000 consecutive no-op compiles without leaking any memory. A lot of the overhead of task progress was in adding tasks to the timings map in AbstractTaskProgress. Tasks were never removed and ConcurrentHashMap insertion time is proportional to the size of the map (not sure if it's linear, quadratic or other) which was why sbt actually got slower and slower the longer it ran. Much of the time was spent adding tasks to the progress timings. To fix this, I did something similar to what I did to manage logger state in https://github.com/jtjeferreira/sbt-multi-module-sample. In MainLoop, we create a new TaskProgress instance before command evaluation and clean it up after. Earlier I made TaskProgress an object to try to ensure there was only one progress thread at a time, and that introduced the memory leak. In addition to removing the leak, I was able to improve performance by removing tasks from the timings map when they completed. Unlike TaskTimings and TaskTraceEvent, we don't care about tasks that have completed for TaskProgress so it is safe to remove them. In addition to the memory leaks, I also reworked how the background threads work. Instead of having one thread that sleeps and prints progress reports, we now use two single threaded executors. One is a scheduled executor that is used to schedule progress reports and the other is the actual thread on which the report is generated. When progress starts, we schedule a recurring report that is generated every sleep interval until task evaluation completes. Whenever we add a new task, if we have haven't previously generated a progress report, we schedule a report in threshold milliseconds. If the task completes before the threshold period has elapsed, we just cancel the schedule report. By doing things this way, we reduce the total number of reports that are generated. Because reports need to effectively lock System.out, the less we generate them, the better. I also modified the internal data structures of AbstractTaskProgress so that there is a single task map of timings instead of one map for timings and one for active tasks.
This commit is contained in:
parent
d569abe70a
commit
102e3d1969
|
|
@ -1019,6 +1019,7 @@ lazy val mainProj = (project in file("main"))
|
|||
// internal logging apis,
|
||||
exclude[IncompatibleSignatureProblem]("sbt.internal.LogManager*"),
|
||||
exclude[MissingTypesProblem]("sbt.internal.RelayAppender"),
|
||||
exclude[MissingClassProblem]("sbt.internal.TaskProgress$ProgressThread")
|
||||
)
|
||||
)
|
||||
.configure(
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ private[sbt] final class ProgressState(
|
|||
}
|
||||
|
||||
private[util] def getPrompt(terminal: Terminal): Array[Byte] = {
|
||||
if (terminal.prompt != Prompt.Running && terminal.prompt != Prompt.Batch) {
|
||||
if (terminal.prompt.isInstanceOf[Prompt.AskUser]) {
|
||||
val prefix = if (terminal.isAnsiSupported) s"$DeleteLine$CursorLeft1000" else ""
|
||||
prefix.getBytes ++ terminal.prompt.render().getBytes("UTF-8")
|
||||
} else Array.empty
|
||||
|
|
@ -108,8 +108,8 @@ private[sbt] final class ProgressState(
|
|||
val lines = printProgress(terminal, lastLine)
|
||||
toWrite ++= (ClearScreenAfterCursor + lines).getBytes("UTF-8")
|
||||
}
|
||||
toWrite ++= getPrompt(terminal)
|
||||
}
|
||||
toWrite ++= getPrompt(terminal)
|
||||
printStream.write(toWrite.toArray)
|
||||
printStream.flush()
|
||||
} else printStream.write(bytes)
|
||||
|
|
@ -136,6 +136,9 @@ private[sbt] final class ProgressState(
|
|||
}
|
||||
|
||||
private[sbt] object ProgressState {
|
||||
private val MIN_COMMAND_WIDTH = 10
|
||||
private val SERVER_IS_RUNNING = "sbt server is running "
|
||||
private val SERVER_IS_RUNNING_LENGTH = SERVER_IS_RUNNING.length + 2
|
||||
|
||||
/**
|
||||
* Receives a new task report and replaces the old one. In the event that the new
|
||||
|
|
@ -165,8 +168,13 @@ private[sbt] object ProgressState {
|
|||
}
|
||||
} else {
|
||||
pe.command.toSeq.flatMap { cmd =>
|
||||
val width = terminal.getWidth
|
||||
val sanitized = if ((cmd.length + SERVER_IS_RUNNING_LENGTH) < width) {
|
||||
if (SERVER_IS_RUNNING_LENGTH + cmd.length < width) cmd
|
||||
else cmd.take(MIN_COMMAND_WIDTH) + "..."
|
||||
} else cmd
|
||||
val tail = if (isWatch) Nil else "enter 'cancel' to stop evaluation" :: Nil
|
||||
s"sbt server is running '$cmd'" :: tail
|
||||
s"$SERVER_IS_RUNNING '$sanitized'" :: tail
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -467,7 +467,7 @@ object Terminal {
|
|||
try {
|
||||
System.setOut(proxyPrintStream)
|
||||
System.setErr(proxyErrorStream)
|
||||
scala.Console.withErr(proxyErrorStream)(scala.Console.withOut(proxyOutputStream)(f))
|
||||
scala.Console.withErr(proxyErrorStream)(scala.Console.withOut(proxyPrintStream)(f))
|
||||
} finally {
|
||||
System.setOut(originalOut)
|
||||
System.setErr(originalErr)
|
||||
|
|
|
|||
|
|
@ -255,7 +255,7 @@ object EvaluateTask {
|
|||
extracted,
|
||||
structure
|
||||
)
|
||||
val reporters = maker.map(_.progress) ++ Some(TaskProgress) ++
|
||||
val reporters = maker.map(_.progress) ++ state.get(Keys.taskProgress) ++
|
||||
(if (SysProp.taskTimings)
|
||||
new TaskTimings(reportOnShutdown = false, state.globalLogging.full) :: Nil
|
||||
else Nil)
|
||||
|
|
|
|||
|
|
@ -555,6 +555,7 @@ object Keys {
|
|||
def apply(progress: ExecuteProgress[Task]): TaskProgress = new TaskProgress(progress)
|
||||
}
|
||||
private[sbt] val currentTaskProgress = AttributeKey[TaskProgress]("current-task-progress")
|
||||
private[sbt] val taskProgress = AttributeKey[sbt.internal.TaskProgress]("active-task-progress")
|
||||
val useSuperShell = settingKey[Boolean]("Enables (true) or disables the super shell.")
|
||||
val turbo = settingKey[Boolean]("Enables (true) or disables optional performance features.")
|
||||
// This key can be used to add custom ExecuteProgress instances
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import java.util.Properties
|
|||
import java.util.concurrent.ForkJoinPool
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
import sbt.BasicCommandStrings.{ Shell, Shutdown, TemplateCommand, networkExecPrefix }
|
||||
import sbt.BasicCommandStrings.{ Shell, Shutdown, TemplateCommand }
|
||||
import sbt.Project.LoadAction
|
||||
import sbt.compiler.EvalImports
|
||||
import sbt.internal.Aggregation.AnyKeys
|
||||
|
|
@ -999,13 +999,7 @@ object BuiltinCommands {
|
|||
}
|
||||
|
||||
private def getExec(state: State, interval: Duration): Exec = {
|
||||
val exec: Exec =
|
||||
StandardMain.exchange.blockUntilNextExec(interval, Some(state), state.globalLogging.full)
|
||||
if (exec.source.fold(true)(_.channelName != ConsoleChannel.defaultName) &&
|
||||
!exec.commandLine.startsWith(networkExecPrefix)) {
|
||||
Terminal.consoleLog(s"received remote command: ${exec.commandLine}")
|
||||
}
|
||||
exec
|
||||
StandardMain.exchange.blockUntilNextExec(interval, Some(state), state.globalLogging.full)
|
||||
}
|
||||
|
||||
def shell: Command = Command.command(Shell, Help.more(Shell, ShellDetailed)) { s0 =>
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import sbt.internal.langserver.ErrorCodes
|
|||
import sbt.internal.protocol.JsonRpcResponseError
|
||||
import sbt.internal.nio.CheckBuildSources.CheckBuildSourcesKey
|
||||
import sbt.internal.util.{ ErrorHandling, GlobalLogBacking, Prompt, Terminal }
|
||||
import sbt.internal.ShutdownHooks
|
||||
import sbt.internal.{ ShutdownHooks, TaskProgress }
|
||||
import sbt.io.{ IO, Using }
|
||||
import sbt.protocol._
|
||||
import sbt.util.{ Logger, LoggerContext }
|
||||
|
|
@ -150,9 +150,13 @@ object MainLoop {
|
|||
|
||||
def next(state: State): State = {
|
||||
val context = LoggerContext(useLog4J = state.get(Keys.useLog4J.key).getOrElse(false))
|
||||
val taskProgress = new TaskProgress
|
||||
try {
|
||||
ErrorHandling.wideConvert {
|
||||
state.put(Keys.loggerContext, context).process(processCommand)
|
||||
state
|
||||
.put(Keys.loggerContext, context)
|
||||
.put(Keys.taskProgress, taskProgress)
|
||||
.process(processCommand)
|
||||
} match {
|
||||
case Right(s) => s.remove(Keys.loggerContext)
|
||||
case Left(t: xsbti.FullReload) => throw t
|
||||
|
|
@ -186,7 +190,10 @@ object MainLoop {
|
|||
state.log.error(msg)
|
||||
state.log.error("\n")
|
||||
state.handleError(oom)
|
||||
} finally context.close()
|
||||
} finally {
|
||||
context.close()
|
||||
taskProgress.close()
|
||||
}
|
||||
}
|
||||
|
||||
/** This is the main function State transfer function of the sbt command processing. */
|
||||
|
|
@ -217,6 +224,7 @@ object MainLoop {
|
|||
() => {
|
||||
c.terminal.setPrompt(prevPrompt)
|
||||
Terminal.set(prevTerminal)
|
||||
c.terminal.setPrompt(prevPrompt)
|
||||
c.terminal.flush()
|
||||
}
|
||||
case _ => () => ()
|
||||
|
|
|
|||
|
|
@ -9,8 +9,11 @@ package sbt
|
|||
package internal
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.collection.concurrent.TrieMap
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable
|
||||
import scala.collection.immutable.VectorBuilder
|
||||
import scala.concurrent.duration._
|
||||
|
||||
private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[Task] {
|
||||
import AbstractTaskExecuteProgress.Timer
|
||||
|
|
@ -18,10 +21,51 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
|||
private[this] val showScopedKey = Def.showShortKey(None)
|
||||
private[this] val anonOwners = new ConcurrentHashMap[Task[_], Task[_]]
|
||||
private[this] val calledBy = new ConcurrentHashMap[Task[_], Task[_]]
|
||||
private[this] val activeTasksMap = new ConcurrentHashMap[Task[_], Unit]
|
||||
protected val timings = new ConcurrentHashMap[Task[_], Timer]
|
||||
private[this] val timings = new ConcurrentHashMap[Task[_], Timer]
|
||||
private[sbt] def timingsByName: mutable.Map[String, AtomicLong] = {
|
||||
val result = new ConcurrentHashMap[String, AtomicLong]
|
||||
timings.forEach { (task, timing) =>
|
||||
val duration = timing.durationNanos
|
||||
result.putIfAbsent(taskName(task), new AtomicLong(duration)) match {
|
||||
case null =>
|
||||
case t => t.getAndAdd(duration); ()
|
||||
}
|
||||
}
|
||||
result.asScala
|
||||
}
|
||||
private[sbt] def anyTimings = !timings.isEmpty
|
||||
def currentTimings: Iterator[(Task[_], Timer)] = timings.asScala.iterator
|
||||
|
||||
def activeTasks: Set[Task[_]] = activeTasksMap.keySet.asScala.toSet
|
||||
private[internal] def exceededThreshold(task: Task[_], threshold: FiniteDuration): Boolean =
|
||||
timings.get(task) match {
|
||||
case null => false
|
||||
case t => t.durationMicros > threshold.toMicros
|
||||
}
|
||||
private[internal] def timings(
|
||||
tasks: java.util.Set[Task[_]],
|
||||
thresholdMicros: Long
|
||||
): Vector[(Task[_], Long)] = {
|
||||
val result = new VectorBuilder[(Task[_], Long)]
|
||||
val now = System.nanoTime
|
||||
tasks.forEach { t =>
|
||||
timings.get(t) match {
|
||||
case null =>
|
||||
case timing =>
|
||||
if (timing.isActive) {
|
||||
val elapsed = (now - timing.startNanos) / 1000
|
||||
if (elapsed > thresholdMicros) result += t -> elapsed
|
||||
}
|
||||
}
|
||||
}
|
||||
result.result()
|
||||
}
|
||||
def activeTasks(now: Long) = {
|
||||
val result = new VectorBuilder[(Task[_], FiniteDuration)]
|
||||
timings.forEach { (task, timing) =>
|
||||
if (timing.isActive) result += task -> (now - timing.startNanos).nanos
|
||||
}
|
||||
result.result
|
||||
}
|
||||
|
||||
override def afterRegistered(
|
||||
task: Task[_],
|
||||
|
|
@ -38,15 +82,17 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
|||
|
||||
override def beforeWork(task: Task[_]): Unit = {
|
||||
timings.put(task, new Timer)
|
||||
activeTasksMap.put(task, ())
|
||||
()
|
||||
}
|
||||
|
||||
protected def clearTimings: Boolean = false
|
||||
override def afterWork[A](task: Task[A], result: Either[Task[A], Result[A]]): Unit = {
|
||||
timings.get(task) match {
|
||||
case null =>
|
||||
case t => t.stop()
|
||||
}
|
||||
activeTasksMap.remove(task)
|
||||
if (clearTimings) timings.remove(task)
|
||||
else
|
||||
timings.get(task) match {
|
||||
case null =>
|
||||
case t => t.stop()
|
||||
}
|
||||
|
||||
// we need this to infer anonymous task names
|
||||
result.left.foreach { t =>
|
||||
|
|
@ -54,14 +100,14 @@ private[sbt] abstract class AbstractTaskExecuteProgress extends ExecuteProgress[
|
|||
}
|
||||
}
|
||||
|
||||
protected def reset(): Unit = {
|
||||
activeTasksMap.clear()
|
||||
timings.clear()
|
||||
private[this] val taskNameCache = new ConcurrentHashMap[Task[_], String]
|
||||
protected def taskName(t: Task[_]): String = taskNameCache.get(t) match {
|
||||
case null =>
|
||||
val name = taskName0(t)
|
||||
taskNameCache.putIfAbsent(t, name)
|
||||
name
|
||||
case name => name
|
||||
}
|
||||
|
||||
private[this] val taskNameCache = TrieMap.empty[Task[_], String]
|
||||
protected def taskName(t: Task[_]): String =
|
||||
taskNameCache.getOrElseUpdate(t, taskName0(t))
|
||||
private[this] def taskName0(t: Task[_]): String = {
|
||||
def definedName(node: Task[_]): Option[String] =
|
||||
node.info.name orElse TaskName.transformNode(node).map(showScopedKey.show)
|
||||
|
|
@ -80,6 +126,7 @@ object AbstractTaskExecuteProgress {
|
|||
def stop(): Unit = {
|
||||
endNanos = System.nanoTime()
|
||||
}
|
||||
def isActive = endNanos == 0L
|
||||
def durationNanos: Long = endNanos - startNanos
|
||||
def startMicros: Long = (startNanos.toDouble / 1000).toLong
|
||||
def durationMicros: Long = (durationNanos.toDouble / 1000).toLong
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ package internal
|
|||
import java.util.Locale
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import scala.concurrent.duration._
|
||||
import sbt.internal.util.ConsoleAppender
|
||||
import sbt.internal.util.complete.SizeParser
|
||||
|
||||
|
|
|
|||
|
|
@ -9,99 +9,102 @@ package sbt
|
|||
package internal
|
||||
|
||||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference }
|
||||
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import sbt.internal.util._
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object TaskProgress extends TaskProgress
|
||||
import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeoutException }
|
||||
|
||||
/**
|
||||
* implements task progress display on the shell.
|
||||
*/
|
||||
private[sbt] class TaskProgress private ()
|
||||
private[sbt] class TaskProgress
|
||||
extends AbstractTaskExecuteProgress
|
||||
with ExecuteProgress[Task] {
|
||||
with ExecuteProgress[Task]
|
||||
with AutoCloseable {
|
||||
private[this] val lastTaskCount = new AtomicInteger(0)
|
||||
private[this] val currentProgressThread = new AtomicReference[Option[ProgressThread]](None)
|
||||
private[this] val sleepDuration = SysProp.supershellSleep.millis
|
||||
private[this] val threshold = 10.millis
|
||||
private[this] val tasks = new LinkedBlockingQueue[Task[_]]
|
||||
private[this] final class ProgressThread
|
||||
extends Thread("task-progress-report-thread")
|
||||
with AutoCloseable {
|
||||
private[this] val isClosed = new AtomicBoolean(false)
|
||||
private[this] val firstTime = new AtomicBoolean(true)
|
||||
private[this] val hasReported = new AtomicBoolean(false)
|
||||
private[this] def doReport(): Unit = { hasReported.set(true); report() }
|
||||
setDaemon(true)
|
||||
start()
|
||||
private def resetThread(): Unit =
|
||||
currentProgressThread.synchronized {
|
||||
currentProgressThread.getAndSet(None) match {
|
||||
case Some(t) if t != this => currentProgressThread.set(Some(t))
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
@tailrec override def run(): Unit = {
|
||||
if (!isClosed.get() && (!hasReported.get || active.nonEmpty)) {
|
||||
try {
|
||||
if (activeExceedingThreshold.nonEmpty) doReport()
|
||||
val duration =
|
||||
if (firstTime.compareAndSet(true, activeExceedingThreshold.isEmpty)) threshold
|
||||
else sleepDuration
|
||||
val limit = duration.fromNow
|
||||
while (Deadline.now < limit && !isClosed.get && active.nonEmpty) {
|
||||
var task = tasks.poll((limit - Deadline.now).toMillis, TimeUnit.MILLISECONDS)
|
||||
while (task != null) {
|
||||
if (containsSkipTasks(Vector(task)) || lastTaskCount.get == 0) doReport()
|
||||
task = tasks.poll
|
||||
tasks.clear()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case _: InterruptedException =>
|
||||
isClosed.set(true)
|
||||
// One last report after close in case the last one hadn't gone through yet.
|
||||
doReport()
|
||||
|
||||
}
|
||||
run()
|
||||
} else {
|
||||
resetThread()
|
||||
private[this] val reportLoop = new AtomicReference[AutoCloseable]
|
||||
private[this] val active = new ConcurrentHashMap[Task[_], AutoCloseable]
|
||||
private[this] val nextReport = new AtomicReference(Deadline.now)
|
||||
private[this] val scheduler =
|
||||
Executors.newSingleThreadScheduledExecutor(r => new Thread(r, "sbt-progress-report-scheduler"))
|
||||
private[this] val pending = new java.util.Vector[java.util.concurrent.Future[_]]
|
||||
private def schedule[R](duration: FiniteDuration, recurring: Boolean)(f: => R): AutoCloseable = {
|
||||
val cancelled = new AtomicBoolean(false)
|
||||
val runnable: Runnable = () => {
|
||||
if (!cancelled.get) {
|
||||
try Util.ignoreResult(f)
|
||||
catch { case _: InterruptedException => }
|
||||
}
|
||||
}
|
||||
|
||||
def addTask(task: Task[_]): Unit = tasks.put(task)
|
||||
|
||||
override def close(): Unit = {
|
||||
isClosed.set(true)
|
||||
interrupt()
|
||||
report()
|
||||
appendProgress(ProgressEvent("Info", Vector(), None, None, None))
|
||||
resetThread()
|
||||
val delay = duration.toMillis
|
||||
val future =
|
||||
if (recurring) scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS)
|
||||
else scheduler.scheduleAtFixedRate(runnable, delay, delay, TimeUnit.MILLISECONDS)
|
||||
pending.add(future)
|
||||
() => Util.ignoreResult(future.cancel(true))
|
||||
}
|
||||
private[this] val executor =
|
||||
Executors.newSingleThreadExecutor(r => new Thread(r, "sbt-task-progress-report-thread"))
|
||||
override def close(): Unit = {
|
||||
Option(reportLoop.get).foreach(_.close())
|
||||
pending.forEach(f => Util.ignoreResult(f.cancel(true)))
|
||||
pending.clear()
|
||||
scheduler.shutdownNow()
|
||||
executor.shutdownNow()
|
||||
if (!executor.awaitTermination(1, TimeUnit.SECONDS) ||
|
||||
!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
|
||||
throw new TimeoutException
|
||||
}
|
||||
}
|
||||
|
||||
override protected def clearTimings: Boolean = true
|
||||
override def initial(): Unit = ()
|
||||
|
||||
private[this] def doReport(): Unit = {
|
||||
val runnable: Runnable = () => {
|
||||
if (nextReport.get.isOverdue) {
|
||||
report()
|
||||
}
|
||||
}
|
||||
Util.ignoreResult(pending.add(executor.submit(runnable)))
|
||||
}
|
||||
override def beforeWork(task: Task[_]): Unit = {
|
||||
maybeStartThread()
|
||||
super.beforeWork(task)
|
||||
tasks.put(task)
|
||||
reportLoop.get match {
|
||||
case null =>
|
||||
val loop = schedule(sleepDuration, recurring = true)(doReport())
|
||||
reportLoop.getAndSet(loop) match {
|
||||
case null =>
|
||||
case l =>
|
||||
reportLoop.set(l)
|
||||
loop.close()
|
||||
}
|
||||
case s =>
|
||||
}
|
||||
}
|
||||
override def afterReady(task: Task[_]): Unit = maybeStartThread()
|
||||
|
||||
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit = maybeStartThread()
|
||||
override def afterReady(task: Task[_]): Unit =
|
||||
Util.ignoreResult(active.put(task, schedule(threshold, recurring = false)(doReport())))
|
||||
override def stop(): Unit = {}
|
||||
|
||||
override def stop(): Unit = currentProgressThread.synchronized {
|
||||
currentProgressThread.getAndSet(None).foreach(_.close())
|
||||
}
|
||||
override def afterCompleted[A](task: Task[A], result: Result[A]): Unit =
|
||||
active.remove(task) match {
|
||||
case null =>
|
||||
case a =>
|
||||
a.close()
|
||||
if (exceededThreshold(task, threshold)) report()
|
||||
}
|
||||
|
||||
override def afterAllCompleted(results: RMap[Task, Result]): Unit = {
|
||||
reset()
|
||||
reportLoop.getAndSet(null) match {
|
||||
case null =>
|
||||
case l => l.close()
|
||||
}
|
||||
// send an empty progress report to clear out the previous report
|
||||
appendProgress(ProgressEvent("Info", Vector(), Some(lastTaskCount.get), None, None))
|
||||
}
|
||||
|
|
@ -117,51 +120,39 @@ private[sbt] class TaskProgress private ()
|
|||
"consoleQuick",
|
||||
"state"
|
||||
)
|
||||
private[this] def maybeStartThread(): Unit = {
|
||||
currentProgressThread.get() match {
|
||||
case None =>
|
||||
currentProgressThread.synchronized {
|
||||
currentProgressThread.get() match {
|
||||
case None => currentProgressThread.set(Some(new ProgressThread))
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
private[this] def appendProgress(event: ProgressEvent): Unit =
|
||||
StandardMain.exchange.updateProgress(event)
|
||||
private[this] def active: Vector[Task[_]] = activeTasks.toVector.filterNot(Def.isDummy)
|
||||
private[this] def activeExceedingThreshold: Vector[(Task[_], Long)] = active.flatMap { task =>
|
||||
timings.get(task) match {
|
||||
case null => None
|
||||
case t =>
|
||||
val elapsed = t.currentElapsedMicros
|
||||
if (elapsed.micros > threshold) Some[(Task[_], Long)](task -> elapsed) else None
|
||||
private[this] def report(): Unit = {
|
||||
val currentTasks = timings(active.keySet, threshold.toMicros)
|
||||
val ltc = lastTaskCount.get
|
||||
if (currentTasks.nonEmpty || ltc != 0) {
|
||||
val currentTasksCount = currentTasks.size
|
||||
def event(tasks: Vector[(Task[_], Long)]): ProgressEvent = {
|
||||
if (tasks.nonEmpty) nextReport.set(Deadline.now + sleepDuration)
|
||||
val toWrite = tasks.sortBy(_._2)
|
||||
val distinct = new java.util.LinkedHashMap[String, ProgressItem]
|
||||
toWrite.foreach {
|
||||
case (task, elapsed) =>
|
||||
val name = taskName(task)
|
||||
distinct.put(name, ProgressItem(name, elapsed))
|
||||
}
|
||||
ProgressEvent(
|
||||
"Info",
|
||||
distinct.values.asScala.toVector,
|
||||
Some(ltc),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(containsSkipTasks(active.keySet))
|
||||
)
|
||||
}
|
||||
lastTaskCount.set(currentTasksCount)
|
||||
appendProgress(event(currentTasks))
|
||||
}
|
||||
}
|
||||
private[this] def report(): Unit = {
|
||||
val currentTasks = activeExceedingThreshold
|
||||
val ltc = lastTaskCount.get
|
||||
val currentTasksCount = currentTasks.size
|
||||
def event(tasks: Vector[(Task[_], Long)]): ProgressEvent = ProgressEvent(
|
||||
"Info",
|
||||
tasks
|
||||
.map { case (task, elapsed) => ProgressItem(taskName(task), elapsed) }
|
||||
.sortBy(_.elapsedMicros),
|
||||
Some(ltc),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(containsSkipTasks(active))
|
||||
)
|
||||
if (active.nonEmpty) maybeStartThread()
|
||||
lastTaskCount.set(currentTasksCount)
|
||||
appendProgress(event(currentTasks))
|
||||
}
|
||||
|
||||
private[this] def containsSkipTasks(tasks: Vector[Task[_]]): Boolean = {
|
||||
tasks.map(taskName).exists { n =>
|
||||
private[this] def containsSkipTasks(tasks: java.util.Set[Task[_]]): Boolean = {
|
||||
tasks.iterator.asScala.map(taskName).exists { n =>
|
||||
val shortName = n.lastIndexOf('/') match {
|
||||
case -1 => n
|
||||
case i =>
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger)
|
|||
override def log(level: Level.Value, message: => String): Unit =
|
||||
ConsoleOut.systemOut.println(message)
|
||||
})
|
||||
import AbstractTaskExecuteProgress.Timer
|
||||
private[this] var start = 0L
|
||||
private[this] val threshold = SysProp.taskTimingsThreshold
|
||||
private[this] val omitPaths = SysProp.taskTimingsOmitPaths
|
||||
|
|
@ -61,15 +60,12 @@ private[sbt] final class TaskTimings(reportOnShutdown: Boolean, logger: Logger)
|
|||
private[this] def report() = {
|
||||
val total = divide(System.nanoTime - start)
|
||||
logger.info(s"Total time: $total $unit")
|
||||
import collection.JavaConverters._
|
||||
def sumTimes(in: Seq[(Task[_], Timer)]) = in.map(_._2.durationNanos).sum
|
||||
val timingsByName = timings.asScala.toSeq.groupBy { case (t, _) => taskName(t) } mapValues (sumTimes)
|
||||
val times = timingsByName.toSeq
|
||||
.sortBy(_._2)
|
||||
.sortBy(_._2.get)
|
||||
.reverse
|
||||
.map {
|
||||
case (name, time) =>
|
||||
(if (omitPaths) reFilePath.replaceFirstIn(name, "") else name, divide(time))
|
||||
(if (omitPaths) reFilePath.replaceFirstIn(name, "") else name, divide(time.get))
|
||||
}
|
||||
.filter { _._2 > threshold }
|
||||
if (times.size > 0) {
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import java.nio.file.Files
|
|||
import sbt.internal.util.{ RMap, ConsoleOut }
|
||||
import sbt.io.IO
|
||||
import sbt.io.syntax._
|
||||
import scala.collection.JavaConverters._
|
||||
import sjsonnew.shaded.scalajson.ast.unsafe.JString
|
||||
import sjsonnew.support.scalajson.unsafe.CompactPrinter
|
||||
|
||||
|
|
@ -39,7 +38,7 @@ private[sbt] final class TaskTraceEvent
|
|||
ShutdownHooks.add(() => report())
|
||||
|
||||
private[this] def report() = {
|
||||
if (timings.asScala.nonEmpty) {
|
||||
if (anyTimings) {
|
||||
writeTraceEvent()
|
||||
}
|
||||
}
|
||||
|
|
@ -63,10 +62,10 @@ private[sbt] final class TaskTraceEvent
|
|||
CompactPrinter.print(new JString(name), sb)
|
||||
s"""{"name": ${sb.toString}, "cat": "$cat", "ph": "X", "ts": ${(t.startMicros)}, "dur": ${(t.durationMicros)}, "pid": 0, "tid": ${t.threadId}}"""
|
||||
}
|
||||
val entryIterator = timings.entrySet().iterator()
|
||||
val entryIterator = currentTimings
|
||||
while (entryIterator.hasNext) {
|
||||
val entry = entryIterator.next()
|
||||
trace.append(durationEvent(taskName(entry.getKey), "task", entry.getValue))
|
||||
val (key, value) = entryIterator.next()
|
||||
trace.append(durationEvent(taskName(key), "task", value))
|
||||
if (entryIterator.hasNext) trace.append(",")
|
||||
}
|
||||
trace.append("]}")
|
||||
|
|
|
|||
Loading…
Reference in New Issue