diff --git a/main-command/src/main/scala/sbt/BasicCommands.scala b/main-command/src/main/scala/sbt/BasicCommands.scala index 54f75a75d..e41ccd378 100644 --- a/main-command/src/main/scala/sbt/BasicCommands.scala +++ b/main-command/src/main/scala/sbt/BasicCommands.scala @@ -210,6 +210,7 @@ object BasicCommands { commandListers foreach { x => x.run(commandQueue, CommandStatus(s, true)) } + askingAlready = true } blockUntilNextCommand match { case Some(line) => diff --git a/main/command/src/main/scala/sbt/NetworkListener.scala b/main/command/src/main/scala/sbt/NetworkListener.scala index 6853b256c..9f1c31b60 100644 --- a/main/command/src/main/scala/sbt/NetworkListener.scala +++ b/main/command/src/main/scala/sbt/NetworkListener.scala @@ -1,22 +1,37 @@ package sbt import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicBoolean -import server.Server +import sbt.server._ + +private[sbt] final class NetworkListener extends CommandListener { + + private var server: Option[ServerInstance] = None -class NetworkListener extends CommandListener { def run(queue: ConcurrentLinkedQueue[Option[String]], status: CommandStatus): Unit = { - val server = Server.start("127.0.0.1", 12700) - // spawn thread and loop + def onCommand(command: sbt.server.Command): Unit = { + command match { + case Execution(cmd) => queue.add(Some(cmd)) + } + } + + server = Some(Server.start("127.0.0.1", 12700, onCommand)) } def shutdown(): Unit = { // interrupt and kill the thread + server.foreach(_.shutdown()) } - def setStatus(status: CommandStatus): Unit = ??? + def setStatus(cmdStatus: CommandStatus): Unit = { + server.foreach(server => + server.publish( + if (cmdStatus.canEnter) StatusEvent(Ready) + else StatusEvent(Processing("TODO current command", cmdStatus.state.remainingCommands)) + ) + ) + } } diff --git a/main/command/src/main/scala/sbt/server/ClientConnection.scala b/main/command/src/main/scala/sbt/server/ClientConnection.scala index 34bbb9865..42f914693 100644 --- a/main/command/src/main/scala/sbt/server/ClientConnection.scala +++ b/main/command/src/main/scala/sbt/server/ClientConnection.scala @@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicBoolean abstract class ClientConnection(connection: Socket) { - // TODO handle client disconnect private val running = new AtomicBoolean(true) private val delimiter: Byte = '\n'.toByte @@ -17,35 +16,38 @@ abstract class ClientConnection(connection: Socket) { val thread = new Thread(s"sbt-client-${connection.getPort}") { override def run(): Unit = { - val readBuffer = new Array[Byte](4096) - val in = connection.getInputStream - connection.setSoTimeout(5000) - var buffer: Vector[Byte] = Vector.empty - var bytesRead = 0 - while (bytesRead != -1 && running.get) { - try { - bytesRead = in.read(readBuffer) - val bytes = readBuffer.toVector.take(bytesRead) - buffer = buffer ++ bytes + try { + val readBuffer = new Array[Byte](4096) + val in = connection.getInputStream + connection.setSoTimeout(5000) + var buffer: Vector[Byte] = Vector.empty + var bytesRead = 0 + while (bytesRead != -1 && running.get) { + try { + bytesRead = in.read(readBuffer) + val bytes = readBuffer.toVector.take(bytesRead) + buffer = buffer ++ bytes - // handle un-framing - val delimPos = bytes.indexOf(delimiter) - if (delimPos > 0) { - val chunk = buffer.take(delimPos) - buffer = buffer.drop(delimPos) + // handle un-framing + val delimPos = bytes.indexOf(delimiter) + if (delimPos > 0) { + val chunk = buffer.take(delimPos) + buffer = buffer.drop(delimPos + 1) + + Serialization.deserialize(chunk).fold( + errorDesc => println("Got invalid chunk from client: " + errorDesc), + onCommand + ) + } - Serialization.deserialize(chunk).fold( - errorDesc => println("Got invalid chunk from client: " + errorDesc), - onCommand - ) + } catch { + case _: SocketTimeoutException => // its ok } - - } catch { - case _: SocketTimeoutException => // its ok } - } - shutdown() + } finally { + shutdown() + } } } thread.start() diff --git a/main/command/src/main/scala/sbt/server/Serialization.scala b/main/command/src/main/scala/sbt/server/Serialization.scala index 37a4742c9..0b8132a5c 100644 --- a/main/command/src/main/scala/sbt/server/Serialization.scala +++ b/main/command/src/main/scala/sbt/server/Serialization.scala @@ -17,25 +17,32 @@ object Serialization { } def toJson(event: Event): JObject = event match { - case LogEvent() => + case LogEvent(level, message) => JObject( "type" -> JString("log_event"), - "level" -> JString("INFO"), - "message" -> JString("todo") + "level" -> JString(level), + "message" -> JString(message) ) - case StatusEvent() => + case StatusEvent(Ready) => JObject( "type" -> JString("status_event"), "status" -> JString("ready"), "command_queue" -> JArray(List.empty) ) - case ExecutionEvent() => + case StatusEvent(Processing(command, commandQueue)) => + JObject( + "type" -> JString("status_event"), + "status" -> JString("processing"), + "command_queue" -> JArray(commandQueue.map(JString).toList) + ) + + case ExecutionEvent(command, status) => JObject( "type" -> JString("execution_event"), - "command" -> JString("project todo"), - "success" -> JArray(List.empty) + "command" -> JString(command), + "success" -> JBool(status) ) } @@ -47,13 +54,16 @@ object Serialization { val json = parse(new String(bytes.toArray, "UTF-8")) implicit val formats = DefaultFormats - // TODO: is using extract safe? - (json \ "type").extract[String] match { - case "execution" => Right(Execution((json \ "command_line").extract[String])) - case cmd => Left(s"Unknown command type $cmd") + (json \ "type").toOption match { + case Some(JString("execution")) => + (json \ "command_line").toOption match { + case Some(JString(cmd)) => Right(Execution(cmd)) + case _ => Left("Missing or invalid command_line field") + } + case Some(cmd) => Left(s"Unknown command type $cmd") + case None => Left("Invalid command, missing type field") } } catch { - case e: ParseException => Left(s"Parse error: ${e.getMessage}") - case e: MappingException => Left(s"Missing type field") + case e: ParseException => Left(s"Parse error: ${e.getMessage}") } } diff --git a/main/command/src/main/scala/sbt/server/Server.scala b/main/command/src/main/scala/sbt/server/Server.scala index 9a4302ff1..672b9ab89 100644 --- a/main/command/src/main/scala/sbt/server/Server.scala +++ b/main/command/src/main/scala/sbt/server/Server.scala @@ -8,27 +8,25 @@ import java.net.{ SocketTimeoutException, InetAddress, ServerSocket } import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicBoolean -sealed trait ServerInstance { +private[sbt] sealed trait ServerInstance { def shutdown(): Unit - def nextCommand(): Option[Command] + def publish(event: Event): Unit } -object Server { - def start(host: String, port: Int): ServerInstance = +private[sbt] object Server { + def start(host: String, port: Int, onIncommingCommand: Command => Unit): ServerInstance = new ServerInstance { - val commandQueue = new ConcurrentLinkedQueue[Command]() - val lock = new AnyRef {} var clients = Vector[ClientConnection]() val running = new AtomicBoolean(true) - val serverSocket = new ServerSocket(port, 50, InetAddress.getByName(host)) - serverSocket.setSoTimeout(5000) - val serverThread = new Thread("sbt-socket-server") { override def run(): Unit = { + val serverSocket = new ServerSocket(port, 50, InetAddress.getByName(host)) + serverSocket.setSoTimeout(5000) + println(s"SBT socket server started at $host:$port") while (running.get()) { try { @@ -37,8 +35,7 @@ object Server { val connection = new ClientConnection(socket) { override def onCommand(command: Command): Unit = { - println(s"onCommand $command") - commandQueue.add(command) + onIncommingCommand(command) } } @@ -64,13 +61,6 @@ object Server { } } - /** - * @return The next queued command if there is one. It will have to be consumed because it is taken off the queue. - */ - def nextCommand(): Option[Command] = { - Option(commandQueue.poll()) - } - override def shutdown(): Unit = { println("Shutting down server") running.set(false) diff --git a/main/command/src/main/scala/sbt/server/protocol.scala b/main/command/src/main/scala/sbt/server/protocol.scala index e96392edb..dab061021 100644 --- a/main/command/src/main/scala/sbt/server/protocol.scala +++ b/main/command/src/main/scala/sbt/server/protocol.scala @@ -4,12 +4,22 @@ package sbt package server -trait Event +/* + * These classes are the protocol for client-server interaction, + * commands can come from the client side, while events are emitted + * from sbt to inform the client of state changes etc. + */ +private[sbt] sealed trait Event -case class LogEvent() extends Event -case class StatusEvent() extends Event -case class ExecutionEvent() extends Event +private[sbt] final case class LogEvent(level: String, message: String) extends Event -trait Command +sealed trait Status +private[sbt] final case object Ready extends Status +private[sbt] final case class Processing(command: String, commandQueue: Seq[String]) extends Status -case class Execution(cmd: String) extends Command \ No newline at end of file +private[sbt] final case class StatusEvent(status: Status) extends Event +private[sbt] final case class ExecutionEvent(command: String, success: Boolean) extends Event + +private[sbt] sealed trait Command + +private[sbt] final case class Execution(cmd: String) extends Command \ No newline at end of file