Report progress on background thread

Having the progress reports directly generated in beforeWork delays the
task from being submitted to the executor. This commit moves all of the
reporting onto the background thread to avoid these delays since
progress is less important than task evaluation.
This commit is contained in:
Ethan Atkins 2020-05-04 20:13:41 -07:00
parent 0a06bfe2d5
commit 9b2bbdd4cc
1 changed files with 22 additions and 3 deletions

View File

@ -9,6 +9,7 @@ package sbt
package internal
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicReference }
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }
import sbt.internal.util._
import sbt.util.Level
@ -31,6 +32,7 @@ private[sbt] final class TaskProgress(log: ManagedLogger)
with AutoCloseable {
private[this] val isClosed = new AtomicBoolean(false)
private[this] val firstTime = new AtomicBoolean(true)
private[this] val tasks = new LinkedBlockingQueue[Task[_]]
setDaemon(true)
start()
@tailrec override def run(): Unit = {
@ -40,12 +42,26 @@ private[sbt] final class TaskProgress(log: ManagedLogger)
val duration =
if (firstTime.compareAndSet(true, activeExceedingThreshold.nonEmpty)) threshold
else sleepDuration
Thread.sleep(duration.toMillis)
} catch { case _: InterruptedException => isClosed.set(true) }
val limit = duration.fromNow
while (Deadline.now < limit) {
var task = tasks.poll((limit - Deadline.now).toMillis, TimeUnit.MILLISECONDS)
while (task != null) {
if (containsSkipTasks(Vector(task)) || lastTaskCount.get == 0) report()
task = tasks.poll
}
}
} catch {
case _: InterruptedException =>
isClosed.set(true)
// One last report after close in case the last one hadn't gone through yet.
report()
}
run()
}
}
def addTask(task: Task[_]): Unit = tasks.put(task)
override def close(): Unit = {
isClosed.set(true)
interrupt()
@ -56,7 +72,10 @@ private[sbt] final class TaskProgress(log: ManagedLogger)
override def beforeWork(task: Task[_]): Unit = {
super.beforeWork(task)
if (containsSkipTasks(Vector(task)) || lastTaskCount.get == 0) report()
currentProgressThread.get match {
case Some(t) => t.addTask(task)
case _ => maybeStartThread()
}
}
override def afterReady(task: Task[_]): Unit = ()