Start lightweight client

This is the beginning of a lightweight client, which talks to the
server over Contraband-generated JSON API. Given that the server is
started on port 5173:

```
$ cd /tmp/bogus
$ sbt client localhost:5173
> compile
StatusEvent(Processing, Vector(compile, server))
StatusEvent(Ready, Vector())
StatusEvent(Processing, Vector(, server))
StatusEvent(Ready, Vector())
```
This commit is contained in:
Eugene Yokota 2016-12-02 18:09:16 -05:00
parent 272e733b87
commit fa7253ece3
8 changed files with 224 additions and 17 deletions

View File

@ -152,6 +152,9 @@ object BasicCommandStrings {
def Server = "server" def Server = "server"
def ServerDetailed = "Provides a network server and an interactive prompt from which commands can be run." def ServerDetailed = "Provides a network server and an interactive prompt from which commands can be run."
def Client = "client"
def ClientDetailed = "Provides an interactive prompt from which commands can be run on a server."
def StashOnFailure = "sbtStashOnFailure" def StashOnFailure = "sbtStashOnFailure"
def PopOnFailure = "sbtPopOnFailure" def PopOnFailure = "sbtPopOnFailure"

View File

@ -7,6 +7,7 @@ import sbt.internal.util.Types.{ const, idFun }
import sbt.internal.inc.classpath.ClasspathUtilities.toLoader import sbt.internal.inc.classpath.ClasspathUtilities.toLoader
import sbt.internal.inc.ModuleUtilities import sbt.internal.inc.ModuleUtilities
import sbt.internal.{ Exec, CommandSource, CommandStatus } import sbt.internal.{ Exec, CommandSource, CommandStatus }
import sbt.internal.client.NetworkClient
import DefaultParsers._ import DefaultParsers._
import Function.tupled import Function.tupled
import Command.applyEffect import Command.applyEffect
@ -16,12 +17,11 @@ import BasicKeys._
import java.io.File import java.io.File
import sbt.io.IO import sbt.io.IO
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal import scala.util.control.NonFatal
object BasicCommands { object BasicCommands {
lazy val allBasicCommands = Seq(nop, ignore, help, completionsCommand, multi, ifLast, append, setOnFailure, clearOnFailure, stashOnFailure, popOnFailure, reboot, call, early, exit, continuous, history, shell, server, read, alias) ++ compatCommands lazy val allBasicCommands = Seq(nop, ignore, help, completionsCommand, multi, ifLast, append, setOnFailure, clearOnFailure,
stashOnFailure, popOnFailure, reboot, call, early, exit, continuous, history, shell, server, client, read, alias) ++ compatCommands
def nop = Command.custom(s => success(() => s)) def nop = Command.custom(s => success(() => s))
def ignore = Command.command(FailureWall)(idFun) def ignore = Command.command(FailureWall)(idFun)
@ -204,6 +204,21 @@ object BasicCommands {
else newState.clearGlobalLog else newState.clearGlobalLog
} }
def client = Command.make(Client, Help.more(Client, ClientDetailed))(clientParser)
def clientParser(s0: State) =
{
val p = (token(Space) ~> repsep(StringBasic, token(Space))) | (token(EOF) map { case _ => Nil })
applyEffect(p)({ inputArg =>
val arguments = inputArg.toList ++
(s0.remainingCommands.toList match {
case "shell" :: Nil => Nil
case xs => xs
})
NetworkClient.run(arguments)
"exit" :: s0.copy(remainingCommands = Nil)
})
}
def read = Command.make(ReadCommand, Help.more(ReadCommand, ReadDetailed))(s => applyEffect(readParser(s))(doRead(s))) def read = Command.make(ReadCommand, Help.more(ReadCommand, ReadDetailed))(s => applyEffect(readParser(s))(doRead(s)))
def readParser(s: State) = def readParser(s: State) =
{ {

View File

@ -0,0 +1,96 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package sbt
package internal
package client
import java.net.{ URI, Socket, InetAddress, SocketException }
import sbt.protocol._
import sbt.internal.util.JLine
import java.util.concurrent.atomic.AtomicBoolean
class NetworkClient(arguments: List[String]) {
private var status: String = "Ready"
private val lock: AnyRef = new AnyRef {}
private val running = new AtomicBoolean(true)
def usageError = sys.error("Expecting: sbt client 127.0.0.1:port")
val connection = init()
start()
def init(): ServerConnection = {
val u = arguments match {
case List(x) =>
if (x contains "://") new URI(x)
else new URI("tcp://" + x)
case _ => usageError
}
val host = Option(u.getHost) match {
case None => usageError
case Some(x) => x
}
val port = Option(u.getPort) match {
case None => usageError
case Some(x) if x == -1 => usageError
case Some(x) => x
}
println(s"client on port $port")
val socket = new Socket(InetAddress.getByName(host), port)
new ServerConnection(socket) {
override def onEvent(event: EventMessage): Unit =
event match {
case e: StatusEvent =>
lock.synchronized {
status = e.status
}
println(event)
case e => println(e.toString)
}
override def onShutdown: Unit =
{
running.set(false)
}
}
}
def start(): Unit =
{
val reader = JLine.simple(None, JLine.HandleCONT, injectThreadSleep = true)
while (running.get) {
reader.readLine("> ", None) match {
case Some("exit") =>
running.set(false)
case Some(s) =>
publishCommand(ExecCommand(s))
case _ => //
}
while (status != "Ready") {
Thread.sleep(100)
}
}
}
def publishCommand(command: CommandMessage): Unit =
{
val bytes = Serialization.serializeCommand(command)
try {
connection.publish(bytes)
} catch {
case e: SocketException =>
// log.debug(e.getMessage)
// toDel += client
}
lock.synchronized {
status = "Processing"
}
}
}
object NetworkClient {
def run(arguments: List[String]): Unit =
try {
new NetworkClient(arguments)
} catch {
case e: Exception => println(e.getMessage)
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com>
*/
package sbt
package internal
package client
import java.net.{ SocketTimeoutException, Socket }
import java.util.concurrent.atomic.AtomicBoolean
import sbt.protocol._
abstract class ServerConnection(connection: Socket) {
private val running = new AtomicBoolean(true)
private val delimiter: Byte = '\n'.toByte
private val out = connection.getOutputStream
val thread = new Thread(s"sbt-serverconnection-${connection.getPort}") {
override def run(): Unit = {
try {
val readBuffer = new Array[Byte](4096)
val in = connection.getInputStream
connection.setSoTimeout(5000)
var buffer: Vector[Byte] = Vector.empty
var bytesRead = 0
while (bytesRead != -1 && running.get) {
try {
bytesRead = in.read(readBuffer)
buffer = buffer ++ readBuffer.toVector.take(bytesRead)
// handle un-framing
val delimPos = buffer.indexOf(delimiter)
if (delimPos > 0) {
val chunk = buffer.take(delimPos)
buffer = buffer.drop(delimPos + 1)
Serialization.deserializeEvent(chunk).fold({ errorDesc =>
val s = new String(chunk.toArray, "UTF-8")
println(s"Got invalid chunk from server: $s \n" + errorDesc)
},
onEvent
)
}
} catch {
case _: SocketTimeoutException => // its ok
}
}
} finally {
shutdown()
}
}
}
thread.start()
def publish(command: Array[Byte]): Unit = {
out.write(command)
out.write(delimiter.toInt)
out.flush()
}
def onEvent(event: EventMessage): Unit
def onShutdown: Unit
def shutdown(): Unit = {
println("Shutting down client connection")
running.set(false)
out.close()
onShutdown
}
}

View File

@ -16,7 +16,7 @@ abstract class ClientConnection(connection: Socket) {
private val out = connection.getOutputStream private val out = connection.getOutputStream
val thread = new Thread(s"sbt-client-${connection.getPort}") { val thread = new Thread(s"sbt-clientconnection-${connection.getPort}") {
override def run(): Unit = { override def run(): Unit = {
try { try {
val readBuffer = new Array[Byte](4096) val readBuffer = new Array[Byte](4096)
@ -27,16 +27,14 @@ abstract class ClientConnection(connection: Socket) {
while (bytesRead != -1 && running.get) { while (bytesRead != -1 && running.get) {
try { try {
bytesRead = in.read(readBuffer) bytesRead = in.read(readBuffer)
val bytes = readBuffer.toVector.take(bytesRead) buffer = buffer ++ readBuffer.toVector.take(bytesRead)
buffer = buffer ++ bytes
// handle un-framing // handle un-framing
val delimPos = bytes.indexOf(delimiter) val delimPos = buffer.indexOf(delimiter)
if (delimPos > 0) { if (delimPos > 0) {
val chunk = buffer.take(delimPos) val chunk = buffer.take(delimPos)
buffer = buffer.drop(delimPos + 1) buffer = buffer.drop(delimPos + 1)
Serialization.deserialize(chunk).fold( Serialization.deserializeCommand(chunk).fold(
errorDesc => println("Got invalid chunk from client: " + errorDesc), errorDesc => println("Got invalid chunk from client: " + errorDesc),
onCommand onCommand
) )
@ -56,7 +54,7 @@ abstract class ClientConnection(connection: Socket) {
def publish(event: Array[Byte]): Unit = { def publish(event: Array[Byte]): Unit = {
out.write(event) out.write(event)
out.write(delimiter) out.write(delimiter.toInt)
out.flush() out.flush()
} }

View File

@ -58,7 +58,7 @@ private[sbt] object Server {
/** Publish an event to all connected clients */ /** Publish an event to all connected clients */
def publish(event: EventMessage): Unit = { def publish(event: EventMessage): Unit = {
// TODO do not do this on the calling thread // TODO do not do this on the calling thread
val bytes = Serialization.serialize(event) val bytes = Serialization.serializeEvent(event)
lock.synchronized { lock.synchronized {
val toDel: mutable.ListBuffer[ClientConnection] = mutable.ListBuffer.empty val toDel: mutable.ListBuffer[ClientConnection] = mutable.ListBuffer.empty
clients.foreach { client => clients.foreach { client =>

View File

@ -115,7 +115,7 @@ object BuiltinCommands {
def DefaultCommands: Seq[Command] = Seq(ignore, help, completionsCommand, about, tasks, settingsCommand, loadProject, def DefaultCommands: Seq[Command] = Seq(ignore, help, completionsCommand, about, tasks, settingsCommand, loadProject,
projects, project, reboot, read, history, set, sessionCommand, inspect, loadProjectImpl, loadFailed, Cross.crossBuild, Cross.switchVersion, projects, project, reboot, read, history, set, sessionCommand, inspect, loadProjectImpl, loadFailed, Cross.crossBuild, Cross.switchVersion,
Cross.crossRestoreSession, setOnFailure, clearOnFailure, stashOnFailure, popOnFailure, setLogLevel, plugin, plugins, Cross.crossRestoreSession, setOnFailure, clearOnFailure, stashOnFailure, popOnFailure, setLogLevel, plugin, plugins,
ifLast, multi, shell, BasicCommands.server, continuous, eval, alias, append, last, lastGrep, export, boot, nop, call, exit, early, initialize, act) ++ ifLast, multi, shell, BasicCommands.server, BasicCommands.client, continuous, eval, alias, append, last, lastGrep, export, boot, nop, call, exit, early, initialize, act) ++
compatCommands compatCommands
def DefaultBootCommands: Seq[String] = LoadProject :: (IfLast + " " + Shell) :: Nil def DefaultBootCommands: Seq[String] = LoadProject :: (IfLast + " " + Shell) :: Nil

View File

@ -2,19 +2,23 @@
* Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com> * Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com>
*/ */
package sbt package sbt
package internal package protocol
package server
import sjsonnew.support.scalajson.unsafe.{ Converter, CompactPrinter } import sjsonnew.support.scalajson.unsafe.{ Converter, CompactPrinter }
import scala.json.ast.unsafe.JValue import scala.json.ast.unsafe.JValue
import sjsonnew.support.scalajson.unsafe.Parser import sjsonnew.support.scalajson.unsafe.Parser
import java.nio.ByteBuffer import java.nio.ByteBuffer
import scala.util.{ Success, Failure } import scala.util.{ Success, Failure }
import sbt.protocol._
object Serialization { object Serialization {
def serializeCommand(command: CommandMessage): Array[Byte] =
{
import codec.JsonProtocol._
val json: JValue = Converter.toJson[CommandMessage](command).get
CompactPrinter(json).getBytes("UTF-8")
}
def serialize(event: EventMessage): Array[Byte] = def serializeEvent(event: EventMessage): Array[Byte] =
{ {
import codec.JsonProtocol._ import codec.JsonProtocol._
val json: JValue = Converter.toJson[EventMessage](event).get val json: JValue = Converter.toJson[EventMessage](event).get
@ -24,7 +28,7 @@ object Serialization {
/** /**
* @return A command or an invalid input description * @return A command or an invalid input description
*/ */
def deserialize(bytes: Seq[Byte]): Either[String, CommandMessage] = def deserializeCommand(bytes: Seq[Byte]): Either[String, CommandMessage] =
{ {
val buffer = ByteBuffer.wrap(bytes.toArray) val buffer = ByteBuffer.wrap(bytes.toArray)
Parser.parseFromByteBuffer(buffer) match { Parser.parseFromByteBuffer(buffer) match {
@ -38,4 +42,22 @@ object Serialization {
Left(s"Parse error: ${e.getMessage}") Left(s"Parse error: ${e.getMessage}")
} }
} }
/**
* @return A command or an invalid input description
*/
def deserializeEvent(bytes: Seq[Byte]): Either[String, EventMessage] =
{
val buffer = ByteBuffer.wrap(bytes.toArray)
Parser.parseFromByteBuffer(buffer) match {
case Success(json) =>
import codec.JsonProtocol._
Converter.fromJson[EventMessage](json) match {
case Success(event) => Right(event)
case Failure(e) => Left(e.getMessage)
}
case Failure(e) =>
Left(s"Parse error: ${e.getMessage}")
}
}
} }