Merge pull request #4227 from eed3si9n/wip/thin

Implement thin client
This commit is contained in:
eugene yokota 2018-06-27 15:27:42 -04:00 committed by GitHub
commit b0e025915a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 345 additions and 108 deletions

View File

@ -459,6 +459,9 @@ lazy val commandProj = (project in file("main-command"))
exclude[DirectMissingMethodProblem]("sbt.CommandSource.copy$default$*"),
exclude[DirectMissingMethodProblem]("sbt.Exec.copy"),
exclude[DirectMissingMethodProblem]("sbt.Exec.copy$default$*"),
// internal
exclude[ReversedMissingMethodProblem]("sbt.internal.client.ServerConnection.*"),
),
unmanagedSources in (Compile, headerCreate) := {
val old = (unmanagedSources in (Compile, headerCreate)).value

View File

@ -194,6 +194,8 @@ $AliasCommand name=
def Client = "client"
def ClientDetailed = "Provides an interactive prompt from which commands can be run on a server."
def DashClient = "-client"
def DashDashClient = "--client"
def StashOnFailure = "sbtStashOnFailure"
def PopOnFailure = "sbtPopOnFailure"

View File

@ -297,7 +297,7 @@ object BasicCommands {
case e :: Nil if e.commandLine == "shell" => Nil
case xs => xs map (_.commandLine)
})
NetworkClient.run(arguments)
NetworkClient.run(s0.configuration, arguments)
"exit" :: s0.copy(remainingCommands = Nil)
}

View File

@ -15,6 +15,7 @@ import sbt.internal.util.complete.Parser
import sbt.internal.util.complete.DefaultParsers._
import sbt.io.IO
import sbt.io.syntax._
object CommandUtil {
def readLines(files: Seq[File]): Seq[String] =
@ -89,4 +90,7 @@ object CommandUtil {
details.map { case (k, v) => k + "\n\n " + v } mkString ("\n", "\n\n", "\n")
final val HelpPatternFlags = Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE
private[sbt] def isSbtBuild(baseDir: File) =
(baseDir / "project").exists() || (baseDir * "*.sbt").get.nonEmpty
}

View File

@ -9,17 +9,23 @@ package sbt
package internal
package client
import java.io.IOException
import java.net.{ URI, Socket, InetAddress }
import java.io.{ File, IOException }
import java.util.UUID
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import scala.util.{ Success, Failure }
import scala.sys.process.{ BasicIO, Process, ProcessLogger }
import sbt.protocol._
import sbt.internal.util.{ JLine, StringEvent, ConsoleAppender }
import sbt.internal.protocol._
import sbt.internal.langserver.{ LogMessageParams, MessageType, PublishDiagnosticsParams }
import sbt.internal.util.{ JLine, ConsoleAppender }
import sbt.util.Level
import sbt.io.syntax._
import sbt.io.IO
import sjsonnew.support.scalajson.unsafe.Converter
class NetworkClient(arguments: List[String]) { self =>
class NetworkClient(configuration: xsbti.AppConfiguration, arguments: List[String]) { self =>
private val channelName = new AtomicReference("_")
private val status = new AtomicReference("Ready")
private val lock: AnyRef = new AnyRef {}
@ -27,78 +33,198 @@ class NetworkClient(arguments: List[String]) { self =>
private val pendingExecIds = ListBuffer.empty[String]
private val console = ConsoleAppender("thin1")
private def baseDirectory: File = configuration.baseDirectory
lazy val connection = init()
def usageError = sys.error("Expecting: sbt client 127.0.0.1:port")
val connection = init()
start()
// Open server connection based on the portfile
def init(): ServerConnection = {
val u = arguments match {
case List(x) =>
if (x contains "://") new URI(x)
else new URI("tcp://" + x)
case _ => usageError
val portfile = baseDirectory / "project" / "target" / "active.json"
if (!portfile.exists) {
forkServer(portfile)
}
val host = Option(u.getHost) match {
case None => usageError
case Some(x) => x
}
val port = Option(u.getPort) match {
case None => usageError
case Some(x) if x == -1 => usageError
case Some(x) => x
}
println(s"client on port $port")
val socket = new Socket(InetAddress.getByName(host), port)
new ServerConnection(socket) {
override def onEvent(event: EventMessage): Unit = self.onEvent(event)
override def onLogEntry(event: StringEvent): Unit = self.onLogEntry(event)
val (sk, tkn) = ClientSocket.socket(portfile)
val conn = new ServerConnection(sk) {
override def onNotification(msg: JsonRpcNotificationMessage): Unit = self.onNotification(msg)
override def onRequest(msg: JsonRpcRequestMessage): Unit = self.onRequest(msg)
override def onResponse(msg: JsonRpcResponseMessage): Unit = self.onResponse(msg)
override def onShutdown(): Unit = {
running.set(false)
}
}
// initiate handshake
val execId = UUID.randomUUID.toString
val initCommand = InitCommand(tkn, Option(execId))
conn.sendString(Serialization.serializeCommandAsJsonMessage(initCommand))
conn
}
def onLogEntry(event: StringEvent): Unit = {
val level = event.level match {
case "debug" => Level.Debug
case "info" => Level.Info
case "warn" => Level.Warn
case "error" => Level.Error
/**
* Forks another instance of sbt in the background.
* This instance must be shutdown explicitly via `sbt -client shutdown`
*/
def forkServer(portfile: File): Unit = {
console.appendLog(Level.Info, "server was not detected. starting an instance")
val args = List[String]()
val launchOpts = List("-Xms2048M", "-Xmx2048M", "-Xss2M")
val launcherJarString = sys.props.get("java.class.path") match {
case Some(cp) =>
cp.split(File.pathSeparator)
.toList
.headOption
.getOrElse(sys.error("launcher JAR classpath not found"))
case _ => sys.error("property java.class.path expected")
}
console.appendLog(level, event.message)
}
def onEvent(event: EventMessage): Unit =
event match {
case e: ChannelAcceptedEvent =>
channelName.set(e.channelName)
println(event)
case e: ExecStatusEvent =>
status.set(e.status)
// println(event)
e.execId foreach { execId =>
if (e.status == "Done" && (pendingExecIds contains execId)) {
lock.synchronized {
pendingExecIds -= execId
}
val cmd = "java" :: launchOpts ::: "-jar" :: launcherJarString :: args
// val cmd = "sbt"
val io = BasicIO(false, ProcessLogger(_ => ()))
val _ = Process(cmd, baseDirectory).run(io)
def waitForPortfile(n: Int): Unit =
if (portfile.exists) {
console.appendLog(Level.Info, "server found")
} else {
if (n <= 0) sys.error(s"timeout. $portfile is not found.")
else {
Thread.sleep(1000)
if ((n - 1) % 10 == 0) {
console.appendLog(Level.Info, "waiting for the server...")
}
waitForPortfile(n - 1)
}
case e => println(e.toString)
}
waitForPortfile(90)
}
/** Called on the response for a returning message. */
def onReturningReponse(msg: JsonRpcResponseMessage): Unit = {
def printResponse(): Unit = {
msg.result match {
case Some(result) =>
// ignore result JSON
console.success("completed")
case _ =>
msg.error match {
case Some(err) =>
// ignore err details
console.appendLog(Level.Error, "completed")
case _ => // ignore
}
}
}
printResponse()
}
def onResponse(msg: JsonRpcResponseMessage): Unit = {
msg.id foreach {
case execId if pendingExecIds contains execId =>
onReturningReponse(msg)
lock.synchronized {
pendingExecIds -= execId
}
case _ =>
}
}
def onNotification(msg: JsonRpcNotificationMessage): Unit = {
def splitToMessage: Vector[(Level.Value, String)] =
(msg.method, msg.params) match {
case ("window/logMessage", Some(json)) =>
import sbt.internal.langserver.codec.JsonProtocol._
Converter.fromJson[LogMessageParams](json) match {
case Success(params) => splitLogMessage(params)
case Failure(e) => Vector()
}
case ("textDocument/publishDiagnostics", Some(json)) =>
import sbt.internal.langserver.codec.JsonProtocol._
Converter.fromJson[PublishDiagnosticsParams](json) match {
case Success(params) => splitDiagnostics(params)
case Failure(e) => Vector()
}
case _ =>
Vector(
(
Level.Warn,
s"unknown event: ${msg.method} " + Serialization.compactPrintJsonOpt(msg.params)
)
)
}
splitToMessage foreach {
case (level, msg) => console.appendLog(level, msg)
}
}
def splitLogMessage(params: LogMessageParams): Vector[(Level.Value, String)] = {
val level = messageTypeToLevel(params.`type`)
if (level == Level.Debug) Vector()
else Vector((level, params.message))
}
def messageTypeToLevel(severity: Long): Level.Value = {
severity match {
case MessageType.Error => Level.Error
case MessageType.Warning => Level.Warn
case MessageType.Info => Level.Info
case MessageType.Log => Level.Debug
}
}
def splitDiagnostics(params: PublishDiagnosticsParams): Vector[(Level.Value, String)] = {
val uri = new URI(params.uri)
val f = IO.toFile(uri)
params.diagnostics map { d =>
val level = d.severity match {
case Some(severity) => messageTypeToLevel(severity)
case _ => Level.Error
}
val line = d.range.start.line + 1
val offset = d.range.start.character + 1
val msg = s"$f:$line:$offset: ${d.message}"
(level, msg)
}
}
def onRequest(msg: JsonRpcRequestMessage): Unit = {
// ignore
}
def start(): Unit = {
console.appendLog(Level.Info, "entering *experimental* thin client - BEEP WHIRR")
val _ = connection
val userCommands = arguments filterNot { cmd =>
cmd.startsWith("-")
}
if (userCommands.isEmpty) shell()
else batchExecute(userCommands)
}
def batchExecute(userCommands: List[String]): Unit = {
userCommands foreach { cmd =>
println("> " + cmd)
val execId =
if (cmd == "shutdown") sendExecCommand("exit")
else sendExecCommand(cmd)
while (pendingExecIds contains execId) {
Thread.sleep(100)
}
}
}
def shell(): Unit = {
val reader = JLine.simple(None, JLine.HandleCONT, injectThreadSleep = true)
while (running.get) {
reader.readLine("> ", None) match {
case Some("shutdown") =>
// `sbt -client shutdown` shuts down the server
sendExecCommand("exit")
Thread.sleep(100)
running.set(false)
case Some("exit") =>
running.set(false)
case Some(s) =>
val execId = UUID.randomUUID.toString
publishCommand(ExecCommand(s, execId))
lock.synchronized {
pendingExecIds += execId
}
case Some(s) if s.trim.nonEmpty =>
val execId = sendExecCommand(s)
while (pendingExecIds contains execId) {
Thread.sleep(100)
}
@ -107,10 +233,19 @@ class NetworkClient(arguments: List[String]) { self =>
}
}
def publishCommand(command: CommandMessage): Unit = {
val bytes = Serialization.serializeCommand(command)
def sendExecCommand(commandLine: String): String = {
val execId = UUID.randomUUID.toString
sendCommand(ExecCommand(commandLine, execId))
lock.synchronized {
pendingExecIds += execId
}
execId
}
def sendCommand(command: CommandMessage): Unit = {
try {
connection.publish(bytes)
val s = Serialization.serializeCommandAsJsonMessage(command)
connection.sendString(s)
} catch {
case _: IOException =>
// log.debug(e.getMessage)
@ -123,9 +258,9 @@ class NetworkClient(arguments: List[String]) { self =>
}
object NetworkClient {
def run(arguments: List[String]): Unit =
def run(configuration: xsbti.AppConfiguration, arguments: List[String]): Unit =
try {
new NetworkClient(arguments)
new NetworkClient(configuration, arguments)
()
} catch {
case NonFatal(e) => println(e.getMessage)

View File

@ -12,11 +12,12 @@ package client
import java.net.{ SocketTimeoutException, Socket }
import java.util.concurrent.atomic.AtomicBoolean
import sbt.protocol._
import sbt.internal.util.StringEvent
import sbt.internal.protocol._
abstract class ServerConnection(connection: Socket) {
private val running = new AtomicBoolean(true)
private val retByte: Byte = '\r'.toByte
private val delimiter: Byte = '\n'.toByte
private val out = connection.getOutputStream
@ -28,32 +29,63 @@ abstract class ServerConnection(connection: Socket) {
val in = connection.getInputStream
connection.setSoTimeout(5000)
var buffer: Vector[Byte] = Vector.empty
var bytesRead = 0
while (bytesRead != -1 && running.get) {
try {
bytesRead = in.read(readBuffer)
buffer = buffer ++ readBuffer.toVector.take(bytesRead)
// handle un-framing
var delimPos = buffer.indexOf(delimiter)
while (delimPos > -1) {
val chunk = buffer.take(delimPos)
buffer = buffer.drop(delimPos + 1)
def readFrame: Array[Byte] = {
def getContentLength: Int = {
readLine.drop(16).toInt
}
val l = getContentLength
readLine
readLine
readContentLength(l)
}
Serialization
.deserializeEvent(chunk)
.fold(
{ errorDesc =>
val s = new String(chunk.toArray, "UTF-8")
println(s"Got invalid chunk from server: $s \n" + errorDesc)
},
_ match {
case event: EventMessage => onEvent(event)
case event: StringEvent => onLogEntry(event)
}
)
delimPos = buffer.indexOf(delimiter)
def readLine: String = {
if (buffer.isEmpty) {
val bytesRead = in.read(readBuffer)
if (bytesRead > 0) {
buffer = buffer ++ readBuffer.toVector.take(bytesRead)
}
}
val delimPos = buffer.indexOf(delimiter)
if (delimPos > 0) {
val chunk0 = buffer.take(delimPos)
buffer = buffer.drop(delimPos + 1)
// remove \r at the end of line.
val chunk1 = if (chunk0.lastOption contains retByte) chunk0.dropRight(1) else chunk0
new String(chunk1.toArray, "utf-8")
} else readLine
}
def readContentLength(length: Int): Array[Byte] = {
if (buffer.size < length) {
val bytesRead = in.read(readBuffer)
if (bytesRead > 0) {
buffer = buffer ++ readBuffer.toVector.take(bytesRead)
}
}
if (length <= buffer.size) {
val chunk = buffer.take(length)
buffer = buffer.drop(length)
chunk.toArray
} else readContentLength(length)
}
while (running.get) {
try {
val frame = readFrame
Serialization
.deserializeJsonMessage(frame)
.fold(
{ errorDesc =>
val s = new String(frame.toArray, "UTF-8")
println(s"Got invalid chunk from server: $s \n" + errorDesc)
},
_ match {
case msg: JsonRpcRequestMessage => onRequest(msg)
case msg: JsonRpcResponseMessage => onResponse(msg)
case msg: JsonRpcNotificationMessage => onNotification(msg)
}
)
} catch {
case _: SocketTimeoutException => // its ok
}
@ -65,14 +97,28 @@ abstract class ServerConnection(connection: Socket) {
}
thread.start()
def publish(command: Array[Byte]): Unit = {
out.write(command)
out.write(delimiter.toInt)
out.flush()
def sendString(message: String): Unit = {
val a = message.getBytes("UTF-8")
writeLine(s"""Content-Length: ${a.length + 2}""".getBytes("UTF-8"))
writeLine(Array())
writeLine(a)
}
def onEvent(event: EventMessage): Unit
def onLogEntry(event: StringEvent): Unit
def writeLine(a: Array[Byte]): Unit = {
def writeEndLine(): Unit = {
out.write(retByte.toInt)
out.write(delimiter.toInt)
out.flush
}
if (a.nonEmpty) {
out.write(a)
}
writeEndLine
}
def onRequest(msg: JsonRpcRequestMessage): Unit
def onResponse(msg: JsonRpcResponseMessage): Unit
def onNotification(msg: JsonRpcNotificationMessage): Unit
def onShutdown(): Unit

View File

@ -64,15 +64,30 @@ import CommandStrings.BootCommand
final class xMain extends xsbti.AppMain {
def run(configuration: xsbti.AppConfiguration): xsbti.MainResult = {
import BasicCommands.early
import BasicCommandStrings.runEarly
import BasicCommandStrings.{ runEarly, DashClient, DashDashClient }
import BuiltinCommands.defaults
import sbt.internal.CommandStrings.{ BootCommand, DefaultsCommand, InitCommand }
val state = StandardMain.initialState(
configuration,
Seq(defaults, early),
runEarly(DefaultsCommand) :: runEarly(InitCommand) :: BootCommand :: Nil
)
StandardMain.runManaged(state)
import sbt.internal.client.NetworkClient
// if we detect -Dsbt.client=true or -client, run thin client.
val clientModByEnv = java.lang.Boolean.getBoolean("sbt.client")
val userCommands = configuration.arguments.map(_.trim)
if (clientModByEnv || (userCommands.exists { cmd =>
(cmd == DashClient) || (cmd == DashDashClient)
})) {
val args = userCommands.toList filterNot { cmd =>
(cmd == DashClient) || (cmd == DashDashClient)
}
NetworkClient.run(configuration, args)
Exit(0)
} else {
val state = StandardMain.initialState(
configuration,
Seq(defaults, early),
runEarly(DefaultsCommand) :: runEarly(InitCommand) :: BootCommand :: Nil
)
StandardMain.runManaged(state)
}
}
}
@ -861,9 +876,6 @@ object BuiltinCommands {
private val sbtVersionRegex = """sbt\.version\s*=.*""".r
private def isSbtVersionLine(s: String) = sbtVersionRegex.pattern matcher s matches ()
private def isSbtProject(baseDir: File, projectDir: File) =
projectDir.exists() || (baseDir * "*.sbt").get.nonEmpty
private def writeSbtVersionUnconditionally(state: State) = {
val baseDir = state.baseDir
val sbtVersion = BuiltinCommands.sbtVersion(state)
@ -877,7 +889,7 @@ object BuiltinCommands {
if (sbtVersionAbsent) {
val warnMsg = s"No sbt.version set in project/build.properties, base directory: $baseDir"
try {
if (isSbtProject(baseDir, projectDir)) {
if (isSbtBuild(baseDir)) {
val line = s"sbt.version=$sbtVersion"
IO.writeLines(buildProps, line :: buildPropsLines)
state.log info s"Updated file $buildProps: set sbt.version to $sbtVersion"

View File

@ -12,6 +12,7 @@ import sjsonnew.{ JsonFormat, JsonWriter }
import sjsonnew.support.scalajson.unsafe.{ Parser, Converter, CompactPrinter }
import sjsonnew.shaded.scalajson.ast.unsafe.{ JValue, JObject, JString }
import java.nio.ByteBuffer
import java.util.UUID
import scala.util.{ Success, Failure }
import sbt.internal.util.StringEvent
import sbt.internal.protocol.{
@ -35,6 +36,33 @@ object Serialization {
CompactPrinter(json).getBytes("UTF-8")
}
private[sbt] def serializeCommandAsJsonMessage(command: CommandMessage): String = {
import sjsonnew.BasicJsonProtocol._
command match {
case x: InitCommand =>
val execId = x.execId.getOrElse(UUID.randomUUID.toString)
val opt = x.token match {
case Some(t) =>
val json: JValue = Converter.toJson[String](t).get
val v = CompactPrinter(json)
s"""{ "token": $v }"""
case None => "{}"
}
s"""{ "jsonrpc": "2.0", "id": "$execId", "method": "initialize", "params": { "initializationOptions": $opt } }"""
case x: ExecCommand =>
val execId = x.execId.getOrElse(UUID.randomUUID.toString)
val json: JValue = Converter.toJson[String](x.commandLine).get
val v = CompactPrinter(json)
s"""{ "jsonrpc": "2.0", "id": "$execId", "method": "sbt/exec", "params": { "commandLine": $v } }"""
case x: SettingQuery =>
val execId = UUID.randomUUID.toString
val json: JValue = Converter.toJson[String](x.setting).get
val v = CompactPrinter(json)
s"""{ "jsonrpc": "2.0", "id": "$execId", "method": "sbt/setting", "params": { "setting": $v } }"""
}
}
def serializeEventMessage(event: EventMessage): Array[Byte] = {
import codec.JsonProtocol._
val json: JValue = Converter.toJson[EventMessage](event).get
@ -141,18 +169,25 @@ object Serialization {
private[sbt] def deserializeJsonMessage(bytes: Seq[Byte]): Either[String, JsonRpcMessage] = {
val buffer = ByteBuffer.wrap(bytes.toArray)
Parser.parseFromByteBuffer(buffer) match {
case Success(json) =>
case Success(json @ JObject(fields)) =>
import sbt.internal.protocol.codec.JsonRPCProtocol._
Converter.fromJson[JsonRpcRequestMessage](json) match {
case Success(request) if (request.id.nonEmpty) => Right(request)
case Failure(e) => throw e
case _ => {
if ((fields find { _.field == "method" }).isDefined) {
if ((fields find { _.field == "id" }).isDefined)
Converter.fromJson[JsonRpcRequestMessage](json) match {
case Success(request) => Right(request)
case Failure(e) => Left(s"Conversion error: ${e.getMessage}")
} else
Converter.fromJson[JsonRpcNotificationMessage](json) match {
case Success(notification) => Right(notification)
case Failure(e) => throw e
case Failure(e) => Left(s"Conversion error: ${e.getMessage}")
}
} else
Converter.fromJson[JsonRpcResponseMessage](json) match {
case Success(res) => Right(res)
case Failure(e) => Left(s"Conversion error: ${e.getMessage}")
}
}
case Success(json) =>
Left(s"Expected JSON object but found $json")
case Failure(e) =>
Left(s"Parse error: ${e.getMessage}")
}