From 0a06bfe2d5e87b697908452754468e362c183c2b Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 4 May 2020 17:46:26 -0700 Subject: [PATCH 1/2] Optimize TaskProgress.containsSkipTasks I was surprised to find this method in a flamegraph* so I optimized it. TaskProgress was actually on the hotpath of task evaluation so every single task was slower to be enqueued with the CompletionService. After this change, containsSkipTasks dropped out of the flamegraph. *The flamegraph was for a compile loop where sbt constantly modified a single source file and re-compiled it. The containsSkipTasks method appeared in over 2% of the method calls. --- .../src/main/scala/sbt/internal/TaskProgress.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/main/src/main/scala/sbt/internal/TaskProgress.scala b/main/src/main/scala/sbt/internal/TaskProgress.scala index c45878ecc..01819493d 100644 --- a/main/src/main/scala/sbt/internal/TaskProgress.scala +++ b/main/src/main/scala/sbt/internal/TaskProgress.scala @@ -128,6 +128,16 @@ private[sbt] final class TaskProgress(log: ManagedLogger) } } - private[this] def containsSkipTasks(tasks: Vector[Task[_]]): Boolean = - tasks.map(taskName).exists(n => skipReportTasks.exists(m => m == n || n.endsWith("/ " + m))) + private[this] def containsSkipTasks(tasks: Vector[Task[_]]): Boolean = { + tasks.map(taskName).exists { n => + val shortName = n.lastIndexOf('/') match { + case -1 => n + case i => + var j = i + 1 + while (n(j) == ' ') j += 1 + n.substring(j) + } + skipReportTasks.contains(shortName) + } + } } From 9b2bbdd4cc60c341aec7f5e3d84feb0b1a576aa3 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Mon, 4 May 2020 20:13:41 -0700 Subject: [PATCH 2/2] Report progress on background thread Having the progress reports directly generated in beforeWork delays the task from being submitted to the executor. This commit moves all of the reporting onto the background thread to avoid these delays since progress is less important than task evaluation. --- .../scala/sbt/internal/TaskProgress.scala | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/main/src/main/scala/sbt/internal/TaskProgress.scala b/main/src/main/scala/sbt/internal/TaskProgress.scala index 01819493d..1f89eaf80 100644 --- a/main/src/main/scala/sbt/internal/TaskProgress.scala +++ b/main/src/main/scala/sbt/internal/TaskProgress.scala @@ -9,6 +9,7 @@ package sbt package internal import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference } +import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit } import sbt.internal.util._ import sbt.util.Level @@ -31,6 +32,7 @@ private[sbt] final class TaskProgress(log: ManagedLogger) with AutoCloseable { private[this] val isClosed = new AtomicBoolean(false) private[this] val firstTime = new AtomicBoolean(true) + private[this] val tasks = new LinkedBlockingQueue[Task[_]] setDaemon(true) start() @tailrec override def run(): Unit = { @@ -40,12 +42,26 @@ private[sbt] final class TaskProgress(log: ManagedLogger) val duration = if (firstTime.compareAndSet(true, activeExceedingThreshold.nonEmpty)) threshold else sleepDuration - Thread.sleep(duration.toMillis) - } catch { case _: InterruptedException => isClosed.set(true) } + val limit = duration.fromNow + while (Deadline.now < limit) { + var task = tasks.poll((limit - Deadline.now).toMillis, TimeUnit.MILLISECONDS) + while (task != null) { + if (containsSkipTasks(Vector(task)) || lastTaskCount.get == 0) report() + task = tasks.poll + } + } + } catch { + case _: InterruptedException => + isClosed.set(true) + // One last report after close in case the last one hadn't gone through yet. + report() + } run() } } + def addTask(task: Task[_]): Unit = tasks.put(task) + override def close(): Unit = { isClosed.set(true) interrupt() @@ -56,7 +72,10 @@ private[sbt] final class TaskProgress(log: ManagedLogger) override def beforeWork(task: Task[_]): Unit = { super.beforeWork(task) - if (containsSkipTasks(Vector(task)) || lastTaskCount.get == 0) report() + currentProgressThread.get match { + case Some(t) => t.addTask(task) + case _ => maybeStartThread() + } } override def afterReady(task: Task[_]): Unit = ()