Server hooked in as a CommandListener

This commit is contained in:
Johan Andrén 2016-03-02 13:45:25 +01:00 committed by Eugene Yokota
parent f9dd8b73b7
commit ff211d08f9
6 changed files with 96 additions and 68 deletions

View File

@ -210,6 +210,7 @@ object BasicCommands {
commandListers foreach { x =>
x.run(commandQueue, CommandStatus(s, true))
}
askingAlready = true
}
blockUntilNextCommand match {
case Some(line) =>

View File

@ -1,22 +1,37 @@
package sbt
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import server.Server
import sbt.server._
private[sbt] final class NetworkListener extends CommandListener {
private var server: Option[ServerInstance] = None
class NetworkListener extends CommandListener {
def run(queue: ConcurrentLinkedQueue[Option[String]],
status: CommandStatus): Unit =
{
val server = Server.start("127.0.0.1", 12700)
// spawn thread and loop
def onCommand(command: sbt.server.Command): Unit = {
command match {
case Execution(cmd) => queue.add(Some(cmd))
}
}
server = Some(Server.start("127.0.0.1", 12700, onCommand))
}
def shutdown(): Unit =
{
// interrupt and kill the thread
server.foreach(_.shutdown())
}
def setStatus(status: CommandStatus): Unit = ???
def setStatus(cmdStatus: CommandStatus): Unit = {
server.foreach(server =>
server.publish(
if (cmdStatus.canEnter) StatusEvent(Ready)
else StatusEvent(Processing("TODO current command", cmdStatus.state.remainingCommands))
)
)
}
}

View File

@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicBoolean
abstract class ClientConnection(connection: Socket) {
// TODO handle client disconnect
private val running = new AtomicBoolean(true)
private val delimiter: Byte = '\n'.toByte
@ -17,35 +16,38 @@ abstract class ClientConnection(connection: Socket) {
val thread = new Thread(s"sbt-client-${connection.getPort}") {
override def run(): Unit = {
val readBuffer = new Array[Byte](4096)
val in = connection.getInputStream
connection.setSoTimeout(5000)
var buffer: Vector[Byte] = Vector.empty
var bytesRead = 0
while (bytesRead != -1 && running.get) {
try {
bytesRead = in.read(readBuffer)
val bytes = readBuffer.toVector.take(bytesRead)
buffer = buffer ++ bytes
try {
val readBuffer = new Array[Byte](4096)
val in = connection.getInputStream
connection.setSoTimeout(5000)
var buffer: Vector[Byte] = Vector.empty
var bytesRead = 0
while (bytesRead != -1 && running.get) {
try {
bytesRead = in.read(readBuffer)
val bytes = readBuffer.toVector.take(bytesRead)
buffer = buffer ++ bytes
// handle un-framing
val delimPos = bytes.indexOf(delimiter)
if (delimPos > 0) {
val chunk = buffer.take(delimPos)
buffer = buffer.drop(delimPos)
// handle un-framing
val delimPos = bytes.indexOf(delimiter)
if (delimPos > 0) {
val chunk = buffer.take(delimPos)
buffer = buffer.drop(delimPos + 1)
Serialization.deserialize(chunk).fold(
errorDesc => println("Got invalid chunk from client: " + errorDesc),
onCommand
)
}
Serialization.deserialize(chunk).fold(
errorDesc => println("Got invalid chunk from client: " + errorDesc),
onCommand
)
} catch {
case _: SocketTimeoutException => // its ok
}
} catch {
case _: SocketTimeoutException => // its ok
}
}
shutdown()
} finally {
shutdown()
}
}
}
thread.start()

View File

@ -17,25 +17,32 @@ object Serialization {
}
def toJson(event: Event): JObject = event match {
case LogEvent() =>
case LogEvent(level, message) =>
JObject(
"type" -> JString("log_event"),
"level" -> JString("INFO"),
"message" -> JString("todo")
"level" -> JString(level),
"message" -> JString(message)
)
case StatusEvent() =>
case StatusEvent(Ready) =>
JObject(
"type" -> JString("status_event"),
"status" -> JString("ready"),
"command_queue" -> JArray(List.empty)
)
case ExecutionEvent() =>
case StatusEvent(Processing(command, commandQueue)) =>
JObject(
"type" -> JString("status_event"),
"status" -> JString("processing"),
"command_queue" -> JArray(commandQueue.map(JString).toList)
)
case ExecutionEvent(command, status) =>
JObject(
"type" -> JString("execution_event"),
"command" -> JString("project todo"),
"success" -> JArray(List.empty)
"command" -> JString(command),
"success" -> JBool(status)
)
}
@ -47,13 +54,16 @@ object Serialization {
val json = parse(new String(bytes.toArray, "UTF-8"))
implicit val formats = DefaultFormats
// TODO: is using extract safe?
(json \ "type").extract[String] match {
case "execution" => Right(Execution((json \ "command_line").extract[String]))
case cmd => Left(s"Unknown command type $cmd")
(json \ "type").toOption match {
case Some(JString("execution")) =>
(json \ "command_line").toOption match {
case Some(JString(cmd)) => Right(Execution(cmd))
case _ => Left("Missing or invalid command_line field")
}
case Some(cmd) => Left(s"Unknown command type $cmd")
case None => Left("Invalid command, missing type field")
}
} catch {
case e: ParseException => Left(s"Parse error: ${e.getMessage}")
case e: MappingException => Left(s"Missing type field")
case e: ParseException => Left(s"Parse error: ${e.getMessage}")
}
}

View File

@ -8,27 +8,25 @@ import java.net.{ SocketTimeoutException, InetAddress, ServerSocket }
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
sealed trait ServerInstance {
private[sbt] sealed trait ServerInstance {
def shutdown(): Unit
def nextCommand(): Option[Command]
def publish(event: Event): Unit
}
object Server {
def start(host: String, port: Int): ServerInstance =
private[sbt] object Server {
def start(host: String, port: Int, onIncommingCommand: Command => Unit): ServerInstance =
new ServerInstance {
val commandQueue = new ConcurrentLinkedQueue[Command]()
val lock = new AnyRef {}
var clients = Vector[ClientConnection]()
val running = new AtomicBoolean(true)
val serverSocket = new ServerSocket(port, 50, InetAddress.getByName(host))
serverSocket.setSoTimeout(5000)
val serverThread = new Thread("sbt-socket-server") {
override def run(): Unit = {
val serverSocket = new ServerSocket(port, 50, InetAddress.getByName(host))
serverSocket.setSoTimeout(5000)
println(s"SBT socket server started at $host:$port")
while (running.get()) {
try {
@ -37,8 +35,7 @@ object Server {
val connection = new ClientConnection(socket) {
override def onCommand(command: Command): Unit = {
println(s"onCommand $command")
commandQueue.add(command)
onIncommingCommand(command)
}
}
@ -64,13 +61,6 @@ object Server {
}
}
/**
* @return The next queued command if there is one. It will have to be consumed because it is taken off the queue.
*/
def nextCommand(): Option[Command] = {
Option(commandQueue.poll())
}
override def shutdown(): Unit = {
println("Shutting down server")
running.set(false)

View File

@ -4,12 +4,22 @@
package sbt
package server
trait Event
/*
* These classes are the protocol for client-server interaction,
* commands can come from the client side, while events are emitted
* from sbt to inform the client of state changes etc.
*/
private[sbt] sealed trait Event
case class LogEvent() extends Event
case class StatusEvent() extends Event
case class ExecutionEvent() extends Event
private[sbt] final case class LogEvent(level: String, message: String) extends Event
trait Command
sealed trait Status
private[sbt] final case object Ready extends Status
private[sbt] final case class Processing(command: String, commandQueue: Seq[String]) extends Status
case class Execution(cmd: String) extends Command
private[sbt] final case class StatusEvent(status: Status) extends Event
private[sbt] final case class ExecutionEvent(command: String, success: Boolean) extends Event
private[sbt] sealed trait Command
private[sbt] final case class Execution(cmd: String) extends Command