First sloppy stab at an embedded server in sbt

This commit is contained in:
Johan Andrén 2016-03-01 17:29:02 +01:00 committed by Eugene Yokota
parent 2aa58b43f4
commit 9557107c97
6 changed files with 223 additions and 2 deletions

View File

@ -193,9 +193,15 @@ lazy val mainSettingsProj = (project in file("main-settings")).
utilLogging, sbtIO, utilCompletion, compilerClasspath, libraryManagement)
)
lazy val serverProj = (project in mainPath / "server").
settings(
baseSettings,
libraryDependencies ++= Seq(json4s, json4sNative) // to transitively get json4s
)
// The main integration project for sbt. It brings all of the Projsystems together, configures them, and provides for overriding conventions.
lazy val mainProj = (project in file("main")).
dependsOn(actionsProj, mainSettingsProj, runProj, commandProj).
dependsOn(actionsProj, mainSettingsProj, runProj, commandProj, serverProj).
settings(
testedBaseSettings,
name := "Main",
@ -244,7 +250,7 @@ lazy val myProvided = config("provided") intransitive
def allProjects = Seq(
testingProj, testAgentProj, taskProj, stdTaskProj, runProj,
scriptedSbtProj, scriptedPluginProj,
actionsProj, commandProj, mainSettingsProj, mainProj, sbtProj, bundledLauncherProj)
actionsProj, commandProj, mainSettingsProj, serverProj, mainProj, sbtProj, bundledLauncherProj)
def projectsWithMyProvided = allProjects.map(p => p.copy(configurations = (p.configurations.filter(_ != Provided)) :+ myProvided))
lazy val nonRoots = projectsWithMyProvided.map(p => LocalProject(p.id))

View File

@ -0,0 +1,66 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com>
*/
package sbt.server
import java.net.{ SocketTimeoutException, Socket }
import java.util.concurrent.atomic.AtomicBoolean
abstract class ClientConnection(connection: Socket) {
// TODO handle client disconnect
private val running = new AtomicBoolean(true)
private val delimiter = '\0'.toByte
private val out = connection.getOutputStream
val thread = new Thread(s"sbt-client-${connection.getPort}") {
override def run(): Unit = {
val readBuffer = new Array[Byte](4096)
val in = connection.getInputStream
connection.setSoTimeout(5000)
var buffer: Vector[Byte] = Vector.empty
var bytesRead = 0
while (bytesRead != -1 && running.get) {
try {
bytesRead = in.read(readBuffer)
val bytes = readBuffer.toVector.take(bytesRead)
buffer = buffer ++ bytes
// handle un-framing
val delimPos = bytes.indexOf(delimiter)
if (delimPos > 0) {
val chunk = buffer.take(delimPos)
buffer = buffer.drop(delimPos)
Serialization.deserialize(chunk).fold(
errorDesc => println("Got invalid chunk from client: " + errorDesc),
onCommand
)
}
} catch {
case _: SocketTimeoutException => // its ok
}
}
shutdown()
}
}
thread.start()
def publish(event: Array[Byte]): Unit = {
out.write(event)
out.write(delimiter)
out.flush()
}
def onCommand(command: Command): Unit
def shutdown(): Unit = {
println("Shutting down client connection")
running.set(false)
out.close()
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com>
*/
package sbt.server
import org.json4s.JsonAST.{ JArray, JString }
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
object Serialization {
def serialize(event: Event): Array[Byte] = {
compact(render(toJson(event))).getBytes("UTF-8")
}
def toJson(event: Event): JObject = event match {
case LogEvent() =>
JObject(
"type" -> JString("log_event"),
"level" -> JString("INFO"),
"message" -> JString("todo")
)
case StatusEvent() =>
JObject(
"type" -> JString("status_event"),
"status" -> JString("ready"),
"command_queue" -> JArray(List.empty)
)
case ExecutionEvent() =>
JObject(
"type" -> JString("execution_event"),
"command" -> JString("project todo"),
"success" -> JArray(List.empty)
)
}
/**
* @return A command or an invalid input description
*/
def deserialize(bytes: Seq[Byte]): Either[String, Command] = {
val json = parse(new String(bytes.toArray, "UTF-8"))
implicit val formats = DefaultFormats
(json \ "type").extract[String] match {
case "command" => Right(Execution((json \ "command_line").extract[String]))
case cmd => Left(s"Unknown command type $cmd")
}
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com>
*/
package sbt.server
import java.net.{ SocketTimeoutException, InetAddress, ServerSocket }
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
object Server {
trait ServerInstance {
def shutdown(): Unit
}
def start(host: String, port: Int): ServerInstance =
new ServerInstance {
val commandQueue = new ConcurrentLinkedQueue[Command]()
val lock = new AnyRef {}
var clients = Vector[ClientConnection]()
val running = new AtomicBoolean(true)
val serverSocket = new ServerSocket(port, 50, InetAddress.getByName(host))
serverSocket.setSoTimeout(5000)
val serverThread = new Thread("sbt-socket-server") {
override def run(): Unit = {
println(s"SBT socket server started at $host:$port")
while (running.get()) {
try {
val socket = serverSocket.accept()
println(s"New client connected from: ${socket.getPort}")
val connection = new ClientConnection(socket) {
override def onCommand(command: Command): Unit = {
commandQueue.add(command)
}
}
lock.synchronized {
clients = clients :+ connection
}
} catch {
case _: SocketTimeoutException => // its ok
}
}
}
}
serverThread.start()
/** Publish an event to all connected clients */
def publish(event: Event): Unit = {
// TODO do not do this on the calling thread
val bytes = Serialization.serialize(event)
lock.synchronized {
clients.foreach(_.publish(bytes))
}
}
/**
* @return The next queued command if there is one. It will have to be consumed because it is taken off the queue.
*/
def nextCommand(): Option[Command] = {
Option(commandQueue.poll())
}
override def shutdown(): Unit = {
println("Shutting down server")
running.set(false)
}
}
}

View File

@ -0,0 +1,14 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.typesafe.com>
*/
package sbt.server
trait Event
case class LogEvent() extends Event
case class StatusEvent() extends Event
case class ExecutionEvent() extends Event
trait Command
case class Execution(cmd: String) extends Command

View File

@ -84,6 +84,9 @@ object StandardMain {
/** The common interface to standard output, used for all built-in ConsoleLoggers. */
val console = ConsoleOut.systemOutOverwrite(ConsoleOut.overwriteContaining("Resolving "))
// TODO hook it in, start in the right place, shutdown on termination
val server = sbt.server.Server.start("127.0.0.1", 12700)
def initialGlobalLogging: GlobalLogging = GlobalLogging.initial(MainLogging.globalDefault(console), File.createTempFile("sbt", ".log"), console)
def initialState(configuration: xsbti.AppConfiguration, initialDefinitions: Seq[Command], preCommands: Seq[String]): State =