Unifying towards using events

This commit is contained in:
Eugene Yokota 2016-12-06 00:40:53 -05:00
parent 46d8f952e4
commit d96ef58605
16 changed files with 194 additions and 111 deletions

View File

@ -6,7 +6,7 @@ import sbt.internal.util.complete.{ Completion, Completions, DefaultParsers, His
import sbt.internal.util.Types.{ const, idFun }
import sbt.internal.inc.classpath.ClasspathUtilities.toLoader
import sbt.internal.inc.ModuleUtilities
import sbt.internal.{ Exec, CommandSource, CommandStatus }
import sbt.internal.{ Exec, ConsolePromptEvent, ConsoleUnpromptEvent }
import sbt.internal.client.NetworkClient
import DefaultParsers._
import Function.tupled
@ -196,10 +196,11 @@ object BasicCommands {
def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s0 =>
val exchange = State.exchange
val s1 = exchange.run(s0)
exchange.publishStatus(CommandStatus(s0, true), None)
val Exec(source, line) = exchange.blockUntilNextExec
exchange.publishEvent(ConsolePromptEvent(s0))
val Exec(line, source) = exchange.blockUntilNextExec
println(s"server (line, source): ($line, $source)")
val newState = s1.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s1.remainingCommands).setInteractive(true)
exchange.publishStatus(CommandStatus(newState, false), Some(source))
exchange.publishEvent(ConsoleUnpromptEvent(source))
if (line.trim.isEmpty) newState
else newState.clearGlobalLog
}

View File

@ -7,6 +7,7 @@ import sbt.internal.inc.ReflectUtilities
import sbt.internal.util.complete.{ DefaultParsers, EditDistance, Parser }
import sbt.internal.util.Types.const
import sbt.internal.util.{ AttributeKey, AttributeMap, Util }
import sbt.protocol.ExecStatusEvent
sealed trait Command {
def help: State => Help
@ -87,15 +88,18 @@ object Command {
}
}
/** This is the main function State transfer function of the sbt command processing, called by MainLoop.next, */
def process(command: String, state: State): State =
{
val parser = combine(state.definedCommands)
parse(command, parser(state)) match {
val newState = parse(command, parser(state)) match {
case Right(s) => s() // apply command. command side effects happen here
case Left(errMsg) =>
state.log.error(errMsg)
state.fail
}
State.exchange.publishEvent(ExecStatusEvent("Ready", newState.remainingCommands.toVector))
newState
}
def invalidValue(label: String, allowed: Iterable[String])(value: String): String =
"Not a valid " + label + ": " + value + similar(value, allowed)

View File

@ -2,6 +2,7 @@ package sbt
package internal
import java.util.concurrent.ConcurrentLinkedQueue
import sbt.protocol.EventMessage
/**
* A command channel represents an IO device such as network socket or human
@ -14,22 +15,21 @@ abstract class CommandChannel {
commandQueue.add(exec)
def poll: Option[Exec] = Option(commandQueue.poll)
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit
def publishEvent(event: EventMessage): Unit
def publishBytes(bytes: Array[Byte]): Unit
def shutdown(): Unit
}
case class Exec(source: CommandSource, commandLine: String)
case class Exec(commandLine: String, source: Option[CommandSource])
sealed trait CommandSource
object CommandSource {
case object Human extends CommandSource
case object Network extends CommandSource
}
case class CommandSource(channelName: String)
/**
* This is a data that is passed on to the channels.
* The canEnter paramter indicates that the console devise or UI
* should stop listening.
/*
* This is a data passed specifically for local prompting console.
*/
case class CommandStatus(state: State, canEnter: Boolean)
case class ConsolePromptEvent(state: State) extends EventMessage
/*
* This is a data passed specifically for unprompting local console.
*/
case class ConsoleUnpromptEvent(lastSource: Option[CommandSource]) extends EventMessage

View File

@ -5,11 +5,10 @@ import java.net.SocketException
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import sbt.internal.server._
import sbt.protocol.Serialization
import sbt.protocol.{ EventMessage, Serialization }
import scala.collection.mutable.ListBuffer
import scala.annotation.tailrec
import BasicKeys.serverPort
import sbt.protocol.StatusEvent
import java.net.Socket
/**
@ -21,6 +20,7 @@ import java.net.Socket
private[sbt] final class CommandExchange {
private val lock = new AnyRef {}
private var server: Option[ServerInstance] = None
private var consoleChannel: Option[ConsoleChannel] = None
private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer()
private val nextChannelId: AtomicInteger = new AtomicInteger(0)
@ -30,8 +30,6 @@ private[sbt] final class CommandExchange {
channelBuffer.append(c)
}
subscribe(new ConsoleChannel())
// periodically move all messages from all the channels
@tailrec def blockUntilNextExec: Exec =
{
@ -51,7 +49,17 @@ private[sbt] final class CommandExchange {
}
}
def run(s: State): State = runServer(s)
def run(s: State): State =
{
consoleChannel match {
case Some(x) => // do nothing
case _ =>
val x = new ConsoleChannel("console0")
consoleChannel = Some(x)
subscribe(x)
}
runServer(s)
}
private def newChannelName: String = s"channel-${nextChannelId.incrementAndGet()}"
@ -85,27 +93,33 @@ private[sbt] final class CommandExchange {
server = None
}
// fanout publishStatus to all channels
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
// fanout publisEvent
def publishEvent(event: EventMessage): Unit =
{
val toDel: ListBuffer[CommandChannel] = ListBuffer.empty
val event =
if (status.canEnter) StatusEvent("Ready", Vector())
else StatusEvent("Processing", status.state.remainingCommands.toVector)
// TODO do not do this on the calling thread
val bytes = Serialization.serializeEvent(event)
channels.foreach {
case c: ConsoleChannel =>
c.publishStatus(status, lastSource)
case c: NetworkChannel =>
try {
c.publishBytes(bytes)
} catch {
case e: SocketException =>
// log.debug(e.getMessage)
toDel += c
event match {
// Special treatment for ConsolePromptEvent since it's hand coded without codec.
case e: ConsolePromptEvent =>
channels collect {
case c: ConsoleChannel => c.publishEvent(e)
}
case e: ConsoleUnpromptEvent =>
channels collect {
case c: ConsoleChannel => c.publishEvent(e)
}
case _ =>
// TODO do not do this on the calling thread
val bytes = Serialization.serializeEvent(event)
channels.foreach {
case c: ConsoleChannel =>
c.publishEvent(event)
case c: NetworkChannel =>
try {
c.publishBytes(bytes)
} catch {
case e: SocketException =>
toDel += c
}
}
}
toDel.toList match {

View File

@ -4,11 +4,11 @@ package internal
import sbt.internal.util._
import BasicKeys._
import java.io.File
import sbt.protocol.EventMessage
private[sbt] final class ConsoleChannel extends CommandChannel {
private[sbt] final class ConsoleChannel(name: String) extends CommandChannel {
private var askUserThread: Option[Thread] = None
def makeAskUserThread(status: CommandStatus): Thread = new Thread("ask-user-thread") {
val s = status.state
def makeAskUserThread(s: State): Thread = new Thread("ask-user-thread") {
val history = (s get historyPath) getOrElse Some(new File(s.baseDir, ".history"))
val prompt = (s get shellPrompt) match {
case Some(pf) => pf(s)
@ -19,8 +19,8 @@ private[sbt] final class ConsoleChannel extends CommandChannel {
// This internally handles thread interruption and returns Some("")
val line = reader.readLine(prompt)
line match {
case Some(cmd) => append(Exec(CommandSource.Human, cmd))
case None => append(Exec(CommandSource.Human, "exit"))
case Some(cmd) => append(Exec(cmd, Some(CommandSource(name))))
case None => append(Exec("exit", Some(CommandSource(name))))
}
askUserThread = None
}
@ -30,25 +30,27 @@ private[sbt] final class ConsoleChannel extends CommandChannel {
def publishBytes(bytes: Array[Byte]): Unit = ()
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
if (status.canEnter) {
askUserThread match {
case Some(x) => //
case _ =>
val x = makeAskUserThread(status)
askUserThread = Some(x)
x.start
}
} else {
lastSource match {
case Some(src) if src != CommandSource.Human =>
askUserThread match {
case Some(x) =>
shutdown()
case _ =>
}
case _ =>
}
def publishEvent(event: EventMessage): Unit =
event match {
case e: ConsolePromptEvent =>
askUserThread match {
case Some(x) => //
case _ =>
val x = makeAskUserThread(e.state)
askUserThread = Some(x)
x.start
}
case e: ConsoleUnpromptEvent =>
e.lastSource match {
case Some(src) if src.channelName != name =>
askUserThread match {
case Some(x) =>
shutdown()
case _ =>
}
case _ =>
}
case _ => //
}
def shutdown(): Unit =

View File

@ -6,6 +6,7 @@ package internal
package client
import java.net.{ URI, Socket, InetAddress, SocketException }
import java.util.UUID
import sbt.protocol._
import sbt.internal.util.JLine
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
@ -39,7 +40,7 @@ class NetworkClient(arguments: List[String]) {
new ServerConnection(socket) {
override def onEvent(event: EventMessage): Unit =
event match {
case e: StatusEvent =>
case e: ExecStatusEvent =>
lock.synchronized {
status.set(e.status)
}
@ -61,12 +62,13 @@ class NetworkClient(arguments: List[String]) {
case Some("exit") =>
running.set(false)
case Some(s) =>
publishCommand(ExecCommand(s))
val execId = UUID.randomUUID.toString
publishCommand(ExecCommand(s, execId))
while (status.get != "Ready") {
Thread.sleep(100)
}
case _ => //
}
while (status.get != "Ready") {
Thread.sleep(100)
}
}
}

View File

@ -7,7 +7,7 @@ package server
import java.net.{ Socket, SocketTimeoutException }
import java.util.concurrent.atomic.AtomicBoolean
import sbt.protocol.{ Serialization, CommandMessage, ExecCommand }
import sbt.protocol.{ Serialization, CommandMessage, ExecCommand, EventMessage }
final class NetworkChannel(name: String, connection: Socket) extends CommandChannel {
private val running = new AtomicBoolean(true)
@ -50,10 +50,8 @@ final class NetworkChannel(name: String, connection: Socket) extends CommandChan
}
thread.start()
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
{
()
}
def publishEvent(event: EventMessage): Unit = ()
def publishBytes(event: Array[Byte]): Unit =
{
out.write(event)
@ -63,7 +61,7 @@ final class NetworkChannel(name: String, connection: Socket) extends CommandChan
def onCommand(command: CommandMessage): Unit =
command match {
case x: ExecCommand => append(Exec(CommandSource.Network, x.commandLine))
case x: ExecCommand => append(Exec(x.commandLine, Some(CommandSource(name))))
}
def shutdown(): Unit = {

View File

@ -35,21 +35,15 @@ object Aggregation {
runTasks(s, structure, ts, DummyTaskMap(Nil), show)
}
@deprecated("Use `timedRun` and `showRun` directly or use `runTasks`.", "0.13.0")
def runTasksWithResult[T](s: State, structure: BuildStructure, ts: Values[Task[T]], extra: DummyTaskMap, show: ShowConfig)(implicit display: Show[ScopedKey[_]]): (State, Result[Seq[KeyValue[T]]]) =
private def showRun[T](complete: Complete[T], show: ShowConfig)(implicit display: Show[ScopedKey[_]]): Unit =
{
val complete = timedRun[T](s, ts, extra)
showRun(complete, show)
(complete.state, complete.results)
import complete._
val log = state.log
val extracted = Project.extract(state)
val success = results match { case Value(_) => true; case Inc(_) => false }
results.toEither.right.foreach { r => if (show.taskValues) printSettings(r, show.print) }
if (show.success) printSuccess(start, stop, extracted, success, log)
}
def showRun[T](complete: Complete[T], show: ShowConfig)(implicit display: Show[ScopedKey[_]]): Unit = {
import complete._
val log = state.log
val extracted = Project.extract(state)
val success = results match { case Value(_) => true; case Inc(_) => false }
results.toEither.right.foreach { r => if (show.taskValues) printSettings(r, show.print) }
if (show.success) printSuccess(start, stop, extracted, success, log)
}
def timedRun[T](s: State, ts: Values[Task[T]], extra: DummyTaskMap): Complete[T] =
{
import EvaluateTask._

View File

@ -6,28 +6,37 @@
package sbt.protocol
/** Command to execute sbt command. */
final class ExecCommand private (
val commandLine: String) extends sbt.protocol.CommandMessage() with Serializable {
val commandLine: String,
val execId: Option[String]) extends sbt.protocol.CommandMessage() with Serializable {
private def this(commandLine: String) = this(commandLine, None)
override def equals(o: Any): Boolean = o match {
case x: ExecCommand => (this.commandLine == x.commandLine)
case x: ExecCommand => (this.commandLine == x.commandLine) && (this.execId == x.execId)
case _ => false
}
override def hashCode: Int = {
37 * (17 + commandLine.##)
37 * (37 * (17 + commandLine.##) + execId.##)
}
override def toString: String = {
"ExecCommand(" + commandLine + ")"
"ExecCommand(" + commandLine + ", " + execId + ")"
}
protected[this] def copy(commandLine: String = commandLine): ExecCommand = {
new ExecCommand(commandLine)
protected[this] def copy(commandLine: String = commandLine, execId: Option[String] = execId): ExecCommand = {
new ExecCommand(commandLine, execId)
}
def withCommandLine(commandLine: String): ExecCommand = {
copy(commandLine = commandLine)
}
def withExecId(execId: Option[String]): ExecCommand = {
copy(execId = execId)
}
def withExecId(execId: String): ExecCommand = {
copy(execId = Option(execId))
}
}
object ExecCommand {
def apply(commandLine: String): ExecCommand = new ExecCommand(commandLine)
def apply(commandLine: String): ExecCommand = new ExecCommand(commandLine, None)
def apply(commandLine: String, execId: Option[String]): ExecCommand = new ExecCommand(commandLine, execId)
def apply(commandLine: String, execId: String): ExecCommand = new ExecCommand(commandLine, Option(execId))
}

View File

@ -5,33 +5,33 @@
// DO NOT EDIT MANUALLY
package sbt.protocol
/** Status event. */
final class StatusEvent private (
final class ExecStatusEvent private (
val status: String,
val commandQueue: Vector[String]) extends sbt.protocol.EventMessage() with Serializable {
override def equals(o: Any): Boolean = o match {
case x: StatusEvent => (this.status == x.status) && (this.commandQueue == x.commandQueue)
case x: ExecStatusEvent => (this.status == x.status) && (this.commandQueue == x.commandQueue)
case _ => false
}
override def hashCode: Int = {
37 * (37 * (17 + status.##) + commandQueue.##)
}
override def toString: String = {
"StatusEvent(" + status + ", " + commandQueue + ")"
"ExecStatusEvent(" + status + ", " + commandQueue + ")"
}
protected[this] def copy(status: String = status, commandQueue: Vector[String] = commandQueue): StatusEvent = {
new StatusEvent(status, commandQueue)
protected[this] def copy(status: String = status, commandQueue: Vector[String] = commandQueue): ExecStatusEvent = {
new ExecStatusEvent(status, commandQueue)
}
def withStatus(status: String): StatusEvent = {
def withStatus(status: String): ExecStatusEvent = {
copy(status = status)
}
def withCommandQueue(commandQueue: Vector[String]): StatusEvent = {
def withCommandQueue(commandQueue: Vector[String]): ExecStatusEvent = {
copy(commandQueue = commandQueue)
}
}
object StatusEvent {
object ExecStatusEvent {
def apply(status: String, commandQueue: Vector[String]): StatusEvent = new StatusEvent(status, commandQueue)
def apply(status: String, commandQueue: Vector[String]): ExecStatusEvent = new ExecStatusEvent(status, commandQueue)
}

View File

@ -0,0 +1,27 @@
/**
* This code is generated using sbt-datatype.
*/
// DO NOT EDIT MANUALLY
package sbt.protocol.codec
import _root_.sjsonnew.{ deserializationError, serializationError, Builder, JsonFormat, Unbuilder }
trait ConsolePromptEventFormats { self: sjsonnew.BasicJsonProtocol =>
implicit lazy val ConsolePromptEventFormat: JsonFormat[sbt.protocol.ConsolePromptEvent] = new JsonFormat[sbt.protocol.ConsolePromptEvent] {
override def read[J](jsOpt: Option[J], unbuilder: Unbuilder[J]): sbt.protocol.ConsolePromptEvent = {
jsOpt match {
case Some(js) =>
unbuilder.beginObject(js)
unbuilder.endObject()
sbt.protocol.ConsolePromptEvent()
case None =>
deserializationError("Expected JsObject but found None")
}
}
override def write[J](obj: sbt.protocol.ConsolePromptEvent, builder: Builder[J]): Unit = {
builder.beginObject()
builder.endObject()
}
}
}

View File

@ -5,6 +5,6 @@
// DO NOT EDIT MANUALLY
package sbt.protocol.codec
import _root_.sjsonnew.{ deserializationError, serializationError, Builder, JsonFormat, Unbuilder }
trait EventMessageFormats { self: sjsonnew.BasicJsonProtocol with sbt.protocol.codec.LogEventFormats with sbt.protocol.codec.StatusEventFormats =>
implicit lazy val EventMessageFormat: JsonFormat[sbt.protocol.EventMessage] = flatUnionFormat2[sbt.protocol.EventMessage, sbt.protocol.LogEvent, sbt.protocol.StatusEvent]("type")
trait EventMessageFormats { self: sjsonnew.BasicJsonProtocol with sbt.protocol.codec.LogEventFormats with sbt.protocol.codec.ExecStatusEventFormats =>
implicit lazy val EventMessageFormat: JsonFormat[sbt.protocol.EventMessage] = flatUnionFormat2[sbt.protocol.EventMessage, sbt.protocol.LogEvent, sbt.protocol.ExecStatusEvent]("type")
}

View File

@ -12,8 +12,9 @@ implicit lazy val ExecCommandFormat: JsonFormat[sbt.protocol.ExecCommand] = new
case Some(js) =>
unbuilder.beginObject(js)
val commandLine = unbuilder.readField[String]("commandLine")
val execId = unbuilder.readField[Option[String]]("execId")
unbuilder.endObject()
sbt.protocol.ExecCommand(commandLine)
sbt.protocol.ExecCommand(commandLine, execId)
case None =>
deserializationError("Expected JsObject but found None")
}
@ -21,6 +22,7 @@ implicit lazy val ExecCommandFormat: JsonFormat[sbt.protocol.ExecCommand] = new
override def write[J](obj: sbt.protocol.ExecCommand, builder: Builder[J]): Unit = {
builder.beginObject()
builder.addField("commandLine", obj.commandLine)
builder.addField("execId", obj.execId)
builder.endObject()
}
}

View File

@ -0,0 +1,29 @@
/**
* This code is generated using sbt-datatype.
*/
// DO NOT EDIT MANUALLY
package sbt.protocol.codec
import _root_.sjsonnew.{ deserializationError, serializationError, Builder, JsonFormat, Unbuilder }
trait ExecStatusEventFormats { self: sjsonnew.BasicJsonProtocol =>
implicit lazy val ExecStatusEventFormat: JsonFormat[sbt.protocol.ExecStatusEvent] = new JsonFormat[sbt.protocol.ExecStatusEvent] {
override def read[J](jsOpt: Option[J], unbuilder: Unbuilder[J]): sbt.protocol.ExecStatusEvent = {
jsOpt match {
case Some(js) =>
unbuilder.beginObject(js)
val status = unbuilder.readField[String]("status")
val commandQueue = unbuilder.readField[Vector[String]]("commandQueue")
unbuilder.endObject()
sbt.protocol.ExecStatusEvent(status, commandQueue)
case None =>
deserializationError("Expected JsObject but found None")
}
}
override def write[J](obj: sbt.protocol.ExecStatusEvent, builder: Builder[J]): Unit = {
builder.beginObject()
builder.addField("status", obj.status)
builder.addField("commandQueue", obj.commandQueue)
builder.endObject()
}
}
}

View File

@ -4,5 +4,5 @@
// DO NOT EDIT MANUALLY
package sbt.protocol.codec
trait JsonProtocol extends sjsonnew.BasicJsonProtocol with sbt.protocol.codec.ExecCommandFormats with sbt.protocol.codec.CommandMessageFormats with sbt.protocol.codec.LogEventFormats with sbt.protocol.codec.StatusEventFormats with sbt.protocol.codec.EventMessageFormats
trait JsonProtocol extends sjsonnew.BasicJsonProtocol with sbt.protocol.codec.ExecCommandFormats with sbt.protocol.codec.CommandMessageFormats with sbt.protocol.codec.LogEventFormats with sbt.protocol.codec.ExecStatusEventFormats with sbt.protocol.codec.EventMessageFormats
object JsonProtocol extends JsonProtocol

View File

@ -10,6 +10,7 @@ interface CommandMessage {
## Command to execute sbt command.
type ExecCommand implements CommandMessage {
commandLine: String!
execId: String @since("0.0.1")
}
## Message for events.
@ -23,7 +24,7 @@ type LogEvent implements EventMessage {
}
## Status event.
type StatusEvent implements EventMessage {
type ExecStatusEvent implements EventMessage {
status: String!
commandQueue: [String]
}