Refactor to CommandExchange and CommandChannel

This commit is contained in:
Eugene Yokota 2016-03-19 02:36:16 -04:00
parent 48d3b01e6b
commit ebf4715dd1
8 changed files with 104 additions and 78 deletions

View File

@ -6,7 +6,7 @@ import sbt.internal.util.complete.{ Completion, Completions, DefaultParsers, His
import sbt.internal.util.Types.{ const, idFun }
import sbt.internal.inc.classpath.ClasspathUtilities.toLoader
import sbt.internal.inc.ModuleUtilities
import sbt.internal.{ NetworkListener, ConsoleListener }
import sbt.internal.{ CommandRequest, CommandSource, CommandStatus }
import DefaultParsers._
import Function.tupled
import Command.applyEffect
@ -16,11 +16,9 @@ import BasicKeys._
import java.io.File
import sbt.io.IO
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal
import scala.annotation.tailrec
object BasicCommands {
lazy val allBasicCommands = Seq(nop, ignore, help, completionsCommand, multi, ifLast, append, setOnFailure, clearOnFailure, stashOnFailure, popOnFailure, reboot, call, early, exit, continuous, history, shell, server, read, alias) ++ compatCommands
@ -195,39 +193,19 @@ object BasicCommands {
}
}
private[sbt] var askingAlready = false
private[sbt] val commandQueue: ConcurrentLinkedQueue[(String, Option[String])] = new ConcurrentLinkedQueue()
private[sbt] val commandListers = Seq(new ConsoleListener(commandQueue), new NetworkListener(commandQueue))
@tailrec private[sbt] def blockUntilNextCommand: (String, Option[String]) =
Option(commandQueue.poll) match {
case Some(x) => x
case _ =>
Thread.sleep(50)
blockUntilNextCommand
}
def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s =>
if (askingAlready) {
commandListers foreach { x =>
x.resume(CommandStatus(s, true))
}
} else {
commandListers foreach { x =>
x.run(CommandStatus(s, true))
}
askingAlready = true
val exchange = State.exchange
exchange.channels foreach { x =>
x.runOrResume(CommandStatus(s, true))
x.setStatus(CommandStatus(s, true), None)
}
blockUntilNextCommand match {
case (source, Some(line)) =>
if (source != "human") {
println(line)
}
commandListers foreach { x =>
x.pause()
}
val newState = s.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s.remainingCommands).setInteractive(true)
if (line.trim.isEmpty) newState else newState.clearGlobalLog
case _ => s.setInteractive(false)
val CommandRequest(source, line) = exchange.blockUntilNextCommand
val newState = s.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s.remainingCommands).setInteractive(true)
exchange.channels foreach { x =>
x.setStatus(CommandStatus(newState, false), Some(source))
}
if (line.trim.isEmpty) newState
else newState.clearGlobalLog
}
def read = Command.make(ReadCommand, Help.more(ReadCommand, ReadDetailed))(s => applyEffect(readParser(s))(doRead(s)))

View File

@ -8,6 +8,7 @@ import java.util.concurrent.Callable
import sbt.util.Logger
import sbt.internal.util.{ AttributeKey, AttributeMap, ErrorHandling, ExitHook, ExitHooks, GlobalLogging }
import sbt.internal.util.complete.HistoryCommands
import sbt.internal.CommandExchange
import sbt.internal.inc.classpath.ClassLoaderCache
/**
@ -178,6 +179,8 @@ object State {
new Reboot(app.scalaProvider.version, state.remainingCommands, app.id, state.configuration.baseDirectory)
}
private[sbt] lazy val exchange = new CommandExchange()
/** Provides operations and transformations on State. */
implicit def stateOps(s: State): StateOps = new StateOps {
def process(f: (String, State) => State): State =

View File

@ -1,3 +0,0 @@
package sbt
case class CommandStatus(state: State, canEnter: Boolean)

View File

@ -0,0 +1,29 @@
package sbt
package internal
/**
* A command channel represents an IO device such as network socket or human
* that can issue command or listen for some outputs.
* We can think of a command channel to be an abstration of the terminal window.
*/
abstract class CommandChannel(exchange: CommandExchange) {
/** start listening for a command request. */
def runOrResume(status: CommandStatus): Unit
def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit
def shutdown(): Unit
}
case class CommandRequest(source: CommandSource, commandLine: String)
sealed trait CommandSource
object CommandSource {
case object Human extends CommandSource
case object Network extends CommandSource
}
/**
* This is a data that is passed on to the channels.
* The canEnter paramter indicates that the console devise or UI
* should stop listening.
*/
case class CommandStatus(state: State, canEnter: Boolean)

View File

@ -0,0 +1,27 @@
package sbt
package internal
import java.util.concurrent.ConcurrentLinkedQueue
import scala.annotation.tailrec
/**
* The command exchange merges multiple command channels (e.g. network and console),
* and acts as the central multiplexing point.
* Instead of blocking on JLine.readLine, the server commmand will block on
* this exchange, which could serve command request from either of the channel.
*/
private[sbt] final class CommandExchange {
private val commandQueue: ConcurrentLinkedQueue[CommandRequest] = new ConcurrentLinkedQueue()
lazy val channels = Seq(new ConsoleChannel(this), new NetworkChannel(this))
def append(request: CommandRequest): Boolean =
commandQueue.add(request)
@tailrec def blockUntilNextCommand: CommandRequest =
Option(commandQueue.poll) match {
case Some(x) => x
case _ =>
Thread.sleep(50)
blockUntilNextCommand
}
}

View File

@ -1,14 +0,0 @@
package sbt
package internal
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
abstract class CommandListener(queue: ConcurrentLinkedQueue[(String, Option[String])]) {
// represents a loop that keeps asking an IO device for String input
def run(status: CommandStatus): Unit
def shutdown(): Unit
def setStatus(status: CommandStatus): Unit
def pause(): Unit
def resume(status: CommandStatus): Unit
}

View File

@ -2,29 +2,33 @@ package sbt
package internal
import sbt.internal.util._
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import BasicKeys._
import java.io.File
private[sbt] final class ConsoleListener(queue: ConcurrentLinkedQueue[(String, Option[String])]) extends CommandListener(queue) {
private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends CommandChannel(exchange) {
private var askUserThread: Option[Thread] = None
def makeAskUserThread(status: CommandStatus): Thread = new Thread("ask-user-thread") {
val s = status.state
val history = (s get historyPath) getOrElse Some(new File(s.baseDir, ".history"))
val prompt = (s get shellPrompt) match { case Some(pf) => pf(s); case None => "> " }
val prompt = (s get shellPrompt) match {
case Some(pf) => pf(s)
case None => "> "
}
val reader = new FullReader(history, s.combinedParser)
override def run(): Unit = {
try {
val line = reader.readLine(prompt)
line map { x => queue.add(("human", Some(x))) }
line match {
case Some(cmd) => exchange.append(CommandRequest(CommandSource.Human, cmd))
case None => exchange.append(CommandRequest(CommandSource.Human, "exit"))
}
} catch {
case e: InterruptedException =>
}
}
}
def run(status: CommandStatus): Unit =
def runOrResume(status: CommandStatus): Unit =
askUserThread match {
case Some(x) if x.isAlive => //
case _ =>
@ -33,6 +37,20 @@ private[sbt] final class ConsoleListener(queue: ConcurrentLinkedQueue[(String, O
askUserThread = Some(x)
}
def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
if (status.canEnter) ()
else {
shutdown()
lastSource match {
case Some(src) if src != CommandSource.Human =>
val s = status.state
s.remainingCommands.headOption map {
println(_)
}
case _ => //
}
}
def shutdown(): Unit =
askUserThread match {
case Some(x) if x.isAlive =>
@ -40,17 +58,4 @@ private[sbt] final class ConsoleListener(queue: ConcurrentLinkedQueue[(String, O
askUserThread = None
case _ => ()
}
def pause(): Unit = shutdown()
def resume(status: CommandStatus): Unit =
askUserThread match {
case Some(x) if x.isAlive => //
case _ =>
val x = makeAskUserThread(status)
x.start
askUserThread = Some(x)
}
def setStatus(status: CommandStatus): Unit = ()
}

View File

@ -1,29 +1,30 @@
package sbt
package internal
import java.util.concurrent.ConcurrentLinkedQueue
import sbt.internal.server._
private[sbt] final class NetworkListener(queue: ConcurrentLinkedQueue[(String, Option[String])]) extends CommandListener(queue) {
private[sbt] final class NetworkChannel(exchange: CommandExchange) extends CommandChannel(exchange) {
private var server: Option[ServerInstance] = None
def run(status: CommandStatus): Unit =
def runOrResume(status: CommandStatus): Unit =
{
def onCommand(command: internal.server.Command): Unit = {
command match {
case Execution(cmd) => queue.add(("network", Some(cmd)))
case Execution(cmd) => exchange.append(CommandRequest(CommandSource.Network, cmd))
}
}
server = Some(Server.start("127.0.0.1", 12700, onCommand))
server match {
case Some(x) => // do nothing
case _ =>
server = Some(Server.start("127.0.0.1", 12700, onCommand))
}
}
def shutdown(): Unit =
{
// interrupt and kill the thread
server.foreach(_.shutdown())
server = None
}
// network doesn't pause or resume
@ -32,7 +33,7 @@ private[sbt] final class NetworkListener(queue: ConcurrentLinkedQueue[(String, O
// network doesn't pause or resume
def resume(status: CommandStatus): Unit = ()
def setStatus(cmdStatus: CommandStatus): Unit = {
def setStatus(cmdStatus: CommandStatus, lastSource: Option[CommandSource]): Unit = {
server.foreach(server =>
server.publish(
if (cmdStatus.canEnter) StatusEvent(Ready)