Merge pull request #5905 from eatkins/ci-hangs

Fix many instances of server test hangs
This commit is contained in:
Ethan Atkins 2020-09-28 08:02:24 -07:00 committed by GitHub
commit 1b2404b287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 146 additions and 77 deletions

View File

@ -30,6 +30,7 @@ matrix:
before_install:
- curl -sL https://raw.githubusercontent.com/shyiko/jabba/0.11.0/install.sh | bash && . ~/.jabba/jabba.sh
- if [ $SBT_LOCAL == true ]; then sbt -Dsbt.io.virtual=false publishLocalBin; fi
- rm -r $(find $HOME/.sbt/boot -name "*-SNAPSHOT") || true
install:
- $JABBA_HOME/bin/jabba install $TRAVIS_JDK
@ -45,6 +46,7 @@ before_cache:
- find $HOME/.cache/coursier/v1 -name "ivydata-*.properties" -delete
- find $HOME/.ivy2 -name "ivydata-*.properties" -delete
- find $HOME/.sbt -name "*.lock" -delete
- rm -r $(find $HOME/.sbt/boot -name "*-SNAPSHOT") || true
cache:
directories:

View File

@ -1075,7 +1075,7 @@ lazy val sbtProj = (project in file("sbt"))
.configure(addSbtIO, addSbtCompilerBridge)
lazy val serverTestProj = (project in file("server-test"))
.dependsOn(sbtProj % "test->test", scriptedSbtReduxProj % "test->test")
.dependsOn(sbtProj % "compile->test", scriptedSbtReduxProj % "compile->test")
.settings(
testedBaseSettings,
crossScalaVersions := Seq(baseScalaVersion),
@ -1086,15 +1086,26 @@ lazy val serverTestProj = (project in file("server-test"))
Test / run / connectInput := true,
Test / run / outputStrategy := Some(StdoutOutput),
Test / run / fork := true,
Test / fork := true,
Test / javaOptions ++= {
val cp = (Test / fullClasspathAsJars).value.map(_.data).mkString(java.io.File.pathSeparator)
List(
s"-Dsbt.server.classpath=$cp",
s"-Dsbt.server.version=${version.value}",
s"-Dsbt.server.scala.version=${scalaVersion.value}",
s"-Dsbt.supershell=false",
)
Test / sourceGenerators += Def.task {
val rawClasspath =
(Compile / fullClasspathAsJars).value.map(_.data).mkString(java.io.File.pathSeparator)
val cp =
if (scala.util.Properties.isWin) rawClasspath.replaceAllLiterally("\\", "\\\\")
else rawClasspath
val content = {
s"""|
|package testpkg
|
|object TestProperties {
| val classpath = "$cp"
| val version = "${version.value}"
| val scalaVersion = "${scalaVersion.value}"
|}
""".stripMargin
}
val file = (Test / target).value / "generated" / "src" / "test" / "scala" / "testpkg" / "TestProperties.scala"
IO.write(file, content)
file :: Nil
},
)

View File

@ -306,10 +306,10 @@ object Terminal {
}
}
private[sbt] lazy val formatEnabledInEnv: Boolean = logFormatEnabled.getOrElse(useColorDefault)
private[this] val hasConsole = Option(java.lang.System.console).isDefined
private[this] def useColorDefault: Boolean = {
// This approximates that both stdin and stdio are connected,
// so by default color will be turned off for pipes and redirects.
val hasConsole = Option(java.lang.System.console).isDefined
props.map(_.color).orElse(isColorEnabledProp).getOrElse(hasConsole)
}
private[this] lazy val isColorEnabledProp: Option[Boolean] =
@ -779,7 +779,7 @@ object Terminal {
private[util] val system: org.jline.terminal.Terminal,
) extends TerminalImpl(in, out, originalErr, "console0") {
private[this] val rawMode = new AtomicBoolean(false)
enterRawMode()
if (hasConsole) enterRawMode()
override private[sbt] def getSizeImpl: (Int, Int) = {
val size = system.getSize
(size.getColumns, size.getRows)
@ -811,16 +811,22 @@ object Terminal {
override private[sbt] def setSize(width: Int, height: Int): Unit =
system.setSize(new org.jline.terminal.Size(width, height))
override private[sbt] def enterRawMode(): Unit = if (rawMode.compareAndSet(false, true)) {
in.setRawMode(true)
try JLine3.enterRawMode(system)
catch { case _: java.io.IOError => }
}
override private[sbt] def exitRawMode(): Unit = if (rawMode.compareAndSet(true, false)) {
in.setRawMode(false)
try JLine3.exitRawMode(system)
catch { case _: java.io.IOError => }
override def inputStream: InputStream = {
if (hasConsole) in else BlockingInputStream
}
override private[sbt] def enterRawMode(): Unit =
if (rawMode.compareAndSet(false, true) && hasConsole) {
in.setRawMode(true)
try JLine3.enterRawMode(system)
catch { case _: java.io.IOError => }
}
override private[sbt] def exitRawMode(): Unit =
if (rawMode.compareAndSet(true, false) && hasConsole) {
in.setRawMode(false)
try JLine3.exitRawMode(system)
catch { case _: java.io.IOError => }
}
override def isColorEnabled: Boolean =
props
.map(_.color)
@ -970,8 +976,17 @@ object Terminal {
}
private[sbt] object NullTerminal extends DefaultTerminal
private[sbt] object SimpleTerminal extends DefaultTerminal {
override lazy val inputStream: InputStream = originalIn
override lazy val inputStream: InputStream =
if (isCI) BlockingInputStream
else originalIn
override lazy val outputStream: OutputStream = originalOut
override lazy val errorStream: OutputStream = originalErr
}
private[this] object BlockingInputStream extends SimpleInputStream {
override def read(): Int = {
try this.synchronized(this.wait)
catch { case _: InterruptedException => }
-1
}
}
}

View File

@ -445,6 +445,7 @@ object State {
s.fail
}
private[sbt] def logFullException(e: Throwable, log: Logger): Unit = {
e.printStackTrace(System.err)
log.trace(e)
log.error(ErrorHandling reducedToString e)
log.error("Use 'last' for the full log.")

View File

@ -59,6 +59,7 @@ import Serialization.{
setTerminalAttributes,
}
import NetworkClient.Arguments
import java.util.concurrent.TimeoutException
trait ConsoleInterface {
def appendLog(level: Level.Value, message: => String): Unit
@ -799,7 +800,10 @@ class NetworkClient(
val json = s"""{"query":"$query","level":1}"""
val execId = sendJson("sbt/completion", json)
pendingCompletions.put(execId, result.put)
val response = result.take
val response = result.poll(30, TimeUnit.SECONDS) match {
case null => throw new TimeoutException("no response from server within 30 seconds")
case r => r
}
def fillCompletions(label: String, regex: String, command: String): Seq[String] = {
def updateCompletions(): Seq[String] = {
errorStream.println()
@ -915,7 +919,9 @@ class NetworkClient(
if (mainThread != null && mainThread != Thread.currentThread) mainThread.interrupt
connectionHolder.get match {
case null =>
case c => c.shutdown()
case c =>
try sendExecCommand("exit")
finally c.shutdown()
}
Option(inputThread.get).foreach(_.interrupt())
} catch {

View File

@ -428,7 +428,7 @@ object EvaluateTask {
triggers: Triggers[Task],
config: EvaluateTaskConfig
)(implicit taskToNode: NodeView[Task]): (State, Result[T]) = {
import ConcurrentRestrictions.{ completionService, tagged, tagsKey }
import ConcurrentRestrictions.{ cancellableCompletionService, tagged, tagsKey }
val log = state.log
log.debug(
@ -439,15 +439,15 @@ object EvaluateTask {
val tags =
tagged[Task[_]](tagMap, Tags.predicate(config.restrictions))
val (service, shutdownThreads) =
completionService[Task[_], Completed](
cancellableCompletionService[Task[_], Completed](
tags,
(s: String) => log.warn(s),
(t: Task[_]) => tagMap(t).contains(Tags.Sentinel)
)
def shutdown(): Unit = {
def shutdownImpl(force: Boolean): Unit = {
// First ensure that all threads are stopped for task execution.
shutdownThreads()
shutdownThreads(force)
config.progressReporter.stop()
// Now we run the gc cleanup to force finalizers to clear out file handles (yay GC!)
@ -455,6 +455,7 @@ object EvaluateTask {
GCUtil.forceGcWithInterval(config.minForcegcInterval, log)
}
}
def shutdown(): Unit = shutdownImpl(false)
// propagate the defining key for reporting the origin
def overwriteNode(i: Incomplete): Boolean = i.node match {
case Some(t: Task[_]) => transformNode(t).isEmpty
@ -482,7 +483,7 @@ object EvaluateTask {
log.warn("Canceling execution...")
RunningProcesses.killAll()
ConcurrentRestrictions.cancelAll()
shutdown()
shutdownImpl(true)
}
}
currentlyRunningEngine.set((SafeState(state), runningEngine))

View File

@ -1041,19 +1041,17 @@ object BuiltinCommands {
.extract(s1)
.getOpt(Keys.minForcegcInterval)
.getOrElse(GCUtil.defaultMinForcegcInterval)
try {
val exec: Exec = getExec(s1, minGCInterval)
val newState = s1
.copy(
onFailure = Some(Exec(Shell, None)),
remainingCommands = exec +: Exec(Shell, None) +: s1.remainingCommands
)
.setInteractive(true)
val res =
if (exec.commandLine.trim.isEmpty) newState
else newState.clearGlobalLog
res
} catch { case _: InterruptedException => s1.exit(true) }
val exec: Exec = getExec(s1, minGCInterval)
val newState = s1
.copy(
onFailure = Some(Exec(Shell, None)),
remainingCommands = exec +: Exec(Shell, None) +: s1.remainingCommands
)
.setInteractive(true)
val res =
if (exec.commandLine.trim.isEmpty) newState
else newState.clearGlobalLog
res
}
}

View File

@ -91,7 +91,7 @@ private[sbt] final class CommandExchange {
case s @ Seq(_, _) => Some(s.min)
case s => s.headOption
}
Option(deadline match {
try Option(deadline match {
case Some(d: Deadline) =>
commandQueue.poll(d.timeLeft.toMillis + 1, TimeUnit.MILLISECONDS) match {
case null if idleDeadline.fold(false)(_.isOverdue) =>
@ -106,6 +106,7 @@ private[sbt] final class CommandExchange {
}
case _ => commandQueue.take
})
catch { case _: InterruptedException => None }
}
poll match {
case Some(exec) if exec.source.fold(true)(s => channels.exists(_.name == s.channelName)) =>

View File

@ -37,10 +37,10 @@ object RunFromSourceMain {
): Process = {
val fo = fo0
.withWorkingDirectory(workingDirectory)
.withRunJVMOptions(sys.props.get("sbt.ivy.home") match {
.withRunJVMOptions((sys.props.get("sbt.ivy.home") match {
case Some(home) => Vector(s"-Dsbt.ivy.home=$home")
case _ => Vector()
})
}) ++ fo0.runJVMOptions)
implicit val runner = new ForkRun(fo)
val options =
Vector(workingDirectory.toString, scalaVersion, sbtVersion, cp.mkString(pathSeparator))

View File

@ -8,6 +8,7 @@
package testpkg
import java.io.{ InputStream, OutputStream, PrintStream }
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit, TimeoutException }
import sbt.internal.client.NetworkClient
import sbt.internal.util.Util
import scala.collection.mutable
@ -41,27 +42,47 @@ object ClientTest extends AbstractServerTest {
} else -1
}
}
private def client(args: String*) =
NetworkClient.client(
testPath.toFile,
args.toArray,
NullInputStream,
NullPrintStream,
NullPrintStream,
false
private[this] def background[R](f: => R): R = {
val result = new LinkedBlockingQueue[R]
val thread = new Thread("client-bg-thread") {
setDaemon(true)
start()
override def run(): Unit = result.put(f)
}
result.poll(1, TimeUnit.MINUTES) match {
case null =>
thread.interrupt()
thread.join(5000)
throw new TimeoutException
case r => r
}
}
private def client(args: String*): Int = {
background(
NetworkClient.client(
testPath.toFile,
args.toArray,
NullInputStream,
NullPrintStream,
NullPrintStream,
false
)
)
}
// This ensures that the completion command will send a tab that triggers
// sbt to call definedTestNames or discoveredMainClasses if there hasn't
// been a necessary compilation
def tabs = new FixedInputStream('\t', '\t')
private def complete(completionString: String): Seq[String] = {
val cps = new CachingPrintStream
NetworkClient.complete(
testPath.toFile,
Array(s"--completions=sbtn $completionString"),
false,
tabs,
cps
background(
NetworkClient.complete(
testPath.toFile,
Array(s"--completions=sbtn $completionString"),
false,
tabs,
cps
)
)
cps.lines
}

View File

@ -54,6 +54,9 @@ object EventsTest extends AbstractServerTest {
svr.sendJsonRpc(
s"""{ "jsonrpc": "2.0", "id":$id, "method": "sbt/exec", "params": { "commandLine": "run" } }"""
)
assert(svr.waitForString(10.seconds) { s =>
s contains "Compiled events"
})
assert(svr.waitForString(10.seconds) { s =>
s contains "Waiting for"
})
@ -66,13 +69,15 @@ object EventsTest extends AbstractServerTest {
})
}
/* This test is timing out.
test("cancel on-going task with string id") { _ =>
import sbt.Exec
val id = Exec.newExecId
svr.sendJsonRpc(
s"""{ "jsonrpc": "2.0", "id": "$id", "method": "sbt/exec", "params": { "commandLine": "run" } }"""
)
assert(svr.waitForString(10.seconds) { s =>
s contains "Compiled events"
})
assert(svr.waitForString(10.seconds) { s =>
s contains "Waiting for"
})
@ -84,5 +89,4 @@ object EventsTest extends AbstractServerTest {
s contains """"result":{"status":"Task cancelled""""
})
}
*/
}

View File

@ -14,7 +14,7 @@ import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }
import java.util.concurrent.atomic.AtomicBoolean
import verify._
import sbt.RunFromSourceMain
import sbt.{ ForkOptions, OutputStrategy, RunFromSourceMain }
import sbt.io.IO
import sbt.io.syntax._
import sbt.protocol.ClientSocket
@ -43,18 +43,9 @@ trait AbstractServerTest extends TestSuite[Unit] {
"server-test"
)
temp = base.toFile
val classpath = sys.props.get("sbt.server.classpath") match {
case Some(s: String) => s.split(java.io.File.pathSeparator).map(file)
case _ => throw new IllegalStateException("No server classpath was specified.")
}
val sbtVersion = sys.props.get("sbt.server.version") match {
case Some(v: String) => v
case _ => throw new IllegalStateException("No server version was specified.")
}
val scalaVersion = sys.props.get("sbt.server.scala.version") match {
case Some(v: String) => v
case _ => throw new IllegalStateException("No server scala version was specified.")
}
val classpath = TestProperties.classpath.split(File.pathSeparator).map(new File(_))
val sbtVersion = TestProperties.version
val scalaVersion = TestProperties.scalaVersion
svr = TestServer.get(testDirectory, scalaVersion, sbtVersion, classpath, temp)
}
override def tearDownSuite(): Unit = {
@ -169,7 +160,12 @@ case class TestServer(
import TestServer.hostLog
hostLog("fork to a new sbt instance")
val process = RunFromSourceMain.fork(baseDirectory, scalaVersion, sbtVersion, classpath)
val forkOptions =
ForkOptions()
.withOutputStrategy(OutputStrategy.StdoutOutput)
.withRunJVMOptions(Vector("-Dsbt.ci=true", "-Dsbt.io.virtual=false"))
val process =
RunFromSourceMain.fork(forkOptions, baseDirectory, scalaVersion, sbtVersion, classpath)
lazy val portfile = baseDirectory / "project" / "target" / "active.json"

View File

@ -157,7 +157,7 @@ object ConcurrentRestrictions {
new Thread(r, s"sbt-completion-service-pool-$id-${i.getAndIncrement()}")
}
val service = completionService[A, R](pool, tags, warn)
(service, () => { service.close(); pool.shutdownNow(); () })
(service, () => { pool.shutdownNow(); () })
}
def completionService[A, R](
@ -168,7 +168,20 @@ object ConcurrentRestrictions {
val pool = Executors.newCachedThreadPool()
val service = completionService[A, R](pool, tags, warn, isSentinel)
(service, () => {
service.close()
pool.shutdownNow()
()
})
}
def cancellableCompletionService[A, R](
tags: ConcurrentRestrictions[A],
warn: String => Unit,
isSentinel: A => Boolean
): (CompletionService[A, R], Boolean => Unit) = {
val pool = Executors.newCachedThreadPool()
val service = completionService[A, R](pool, tags, warn, isSentinel)
(service, force => {
if (force) service.close()
pool.shutdownNow()
()
})