Exec and CommandExchange refactoring

This commit is contained in:
Eugene Yokota 2016-03-21 14:15:42 -04:00
parent 3359163636
commit a89fcb37ca
5 changed files with 74 additions and 52 deletions

View File

@ -6,7 +6,7 @@ import sbt.internal.util.complete.{ Completion, Completions, DefaultParsers, His
import sbt.internal.util.Types.{ const, idFun } import sbt.internal.util.Types.{ const, idFun }
import sbt.internal.inc.classpath.ClasspathUtilities.toLoader import sbt.internal.inc.classpath.ClasspathUtilities.toLoader
import sbt.internal.inc.ModuleUtilities import sbt.internal.inc.ModuleUtilities
import sbt.internal.{ CommandRequest, CommandSource, CommandStatus } import sbt.internal.{ Exec, CommandSource, CommandStatus }
import DefaultParsers._ import DefaultParsers._
import Function.tupled import Function.tupled
import Command.applyEffect import Command.applyEffect
@ -193,17 +193,13 @@ object BasicCommands {
} }
} }
def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s => def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s0 =>
val exchange = State.exchange val exchange = State.exchange
exchange.channels foreach { x => val s1 = exchange.run(s0)
x.runOrResume(CommandStatus(s, true)) exchange.publishStatus(CommandStatus(s0, true), None)
x.setStatus(CommandStatus(s, true), None) val Exec(source, line) = exchange.blockUntilNextExec
} val newState = s1.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s1.remainingCommands).setInteractive(true)
val CommandRequest(source, line) = exchange.blockUntilNextCommand exchange.publishStatus(CommandStatus(newState, false), Some(source))
val newState = s.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s.remainingCommands).setInteractive(true)
exchange.channels foreach { x =>
x.setStatus(CommandStatus(newState, false), Some(source))
}
if (line.trim.isEmpty) newState if (line.trim.isEmpty) newState
else newState.clearGlobalLog else newState.clearGlobalLog
} }

View File

@ -1,19 +1,26 @@
package sbt package sbt
package internal package internal
import java.util.concurrent.ConcurrentLinkedQueue
/** /**
* A command channel represents an IO device such as network socket or human * A command channel represents an IO device such as network socket or human
* that can issue command or listen for some outputs. * that can issue command or listen for some outputs.
* We can think of a command channel to be an abstration of the terminal window. * We can think of a command channel to be an abstration of the terminal window.
*/ */
abstract class CommandChannel(exchange: CommandExchange) { abstract class CommandChannel {
/** start listening for a command request. */ private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
def runOrResume(status: CommandStatus): Unit def append(exec: Exec): Boolean =
def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit commandQueue.add(exec)
def poll: Option[Exec] = Option(commandQueue.poll)
/** start listening for a command exec. */
def run(s: State): State
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit
def shutdown(): Unit def shutdown(): Unit
} }
case class CommandRequest(source: CommandSource, commandLine: String) case class Exec(source: CommandSource, commandLine: String)
sealed trait CommandSource sealed trait CommandSource
object CommandSource { object CommandSource {

View File

@ -1,8 +1,9 @@
package sbt package sbt
package internal package internal
import java.util.concurrent.ConcurrentLinkedQueue
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
import java.util.concurrent.ConcurrentLinkedQueue
/** /**
* The command exchange merges multiple command channels (e.g. network and console), * The command exchange merges multiple command channels (e.g. network and console),
@ -11,17 +12,41 @@ import scala.annotation.tailrec
* this exchange, which could serve command request from either of the channel. * this exchange, which could serve command request from either of the channel.
*/ */
private[sbt] final class CommandExchange { private[sbt] final class CommandExchange {
private val commandQueue: ConcurrentLinkedQueue[CommandRequest] = new ConcurrentLinkedQueue() private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue()
lazy val channels = Seq(new ConsoleChannel(this), new NetworkChannel(this)) private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer()
def channels: List[CommandChannel] = channelBuffer.toList
def subscribe(c: CommandChannel): Unit =
channelBuffer.append(c)
def append(request: CommandRequest): Boolean = subscribe(new ConsoleChannel())
commandQueue.add(request) subscribe(new NetworkChannel())
@tailrec def blockUntilNextCommand: CommandRequest = // periodically move all messages from all the channels
@tailrec def blockUntilNextExec: Exec =
{
@tailrec def slurpMessages(): Unit =
(((None: Option[Exec]) /: channels) { _ orElse _.poll }) match {
case Some(x) =>
commandQueue.add(x)
slurpMessages
case _ => ()
}
slurpMessages()
Option(commandQueue.poll) match { Option(commandQueue.poll) match {
case Some(x) => x case Some(x) => x
case _ => case _ =>
Thread.sleep(50) Thread.sleep(50)
blockUntilNextCommand blockUntilNextExec
}
}
// fanout run to all channels
def run(s: State): State =
(s /: channels) { (acc, c) => c.run(acc) }
// fanout publishStatus to all channels
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
channels foreach { c =>
c.publishStatus(status, lastSource)
} }
} }

View File

@ -5,7 +5,7 @@ import sbt.internal.util._
import BasicKeys._ import BasicKeys._
import java.io.File import java.io.File
private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends CommandChannel(exchange) { private[sbt] final class ConsoleChannel extends CommandChannel {
private var askUserThread: Option[Thread] = None private var askUserThread: Option[Thread] = None
def makeAskUserThread(status: CommandStatus): Thread = new Thread("ask-user-thread") { def makeAskUserThread(status: CommandStatus): Thread = new Thread("ask-user-thread") {
val s = status.state val s = status.state
@ -19,8 +19,8 @@ private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends Comma
try { try {
val line = reader.readLine(prompt) val line = reader.readLine(prompt)
line match { line match {
case Some(cmd) => exchange.append(CommandRequest(CommandSource.Human, cmd)) case Some(cmd) => append(Exec(CommandSource.Human, cmd))
case None => exchange.append(CommandRequest(CommandSource.Human, "exit")) case None => append(Exec(CommandSource.Human, "exit"))
} }
} catch { } catch {
case e: InterruptedException => case e: InterruptedException =>
@ -28,7 +28,10 @@ private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends Comma
} }
} }
def runOrResume(status: CommandStatus): Unit = def run(s: State): State = s
def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
if (status.canEnter) {
askUserThread match { askUserThread match {
case Some(x) if x.isAlive => // case Some(x) if x.isAlive => //
case _ => case _ =>
@ -36,10 +39,7 @@ private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends Comma
x.start x.start
askUserThread = Some(x) askUserThread = Some(x)
} }
} else {
def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit =
if (status.canEnter) ()
else {
shutdown() shutdown()
lastSource match { lastSource match {
case Some(src) if src != CommandSource.Human => case Some(src) if src != CommandSource.Human =>

View File

@ -4,19 +4,18 @@ package internal
import sbt.internal.server._ import sbt.internal.server._
import BasicKeys._ import BasicKeys._
private[sbt] final class NetworkChannel(exchange: CommandExchange) extends CommandChannel(exchange) { private[sbt] final class NetworkChannel extends CommandChannel {
private var server: Option[ServerInstance] = None private var server: Option[ServerInstance] = None
def runOrResume(status: CommandStatus): Unit = def run(s: State): State =
{ {
val s = status.state
val port = (s get serverPort) match { val port = (s get serverPort) match {
case Some(x) => x case Some(x) => x
case None => 5001 case None => 5001
} }
def onCommand(command: internal.server.Command): Unit = { def onCommand(command: internal.server.Command): Unit = {
command match { command match {
case Execution(cmd) => exchange.append(CommandRequest(CommandSource.Network, cmd)) case Execution(cmd) => append(Exec(CommandSource.Network, cmd))
} }
} }
server match { server match {
@ -24,6 +23,7 @@ private[sbt] final class NetworkChannel(exchange: CommandExchange) extends Comma
case _ => case _ =>
server = Some(Server.start("127.0.0.1", port, onCommand)) server = Some(Server.start("127.0.0.1", port, onCommand))
} }
s
} }
def shutdown(): Unit = def shutdown(): Unit =
@ -33,13 +33,7 @@ private[sbt] final class NetworkChannel(exchange: CommandExchange) extends Comma
server = None server = None
} }
// network doesn't pause or resume def publishStatus(cmdStatus: CommandStatus, lastSource: Option[CommandSource]): Unit = {
def pause(): Unit = ()
// network doesn't pause or resume
def resume(status: CommandStatus): Unit = ()
def setStatus(cmdStatus: CommandStatus, lastSource: Option[CommandSource]): Unit = {
server.foreach(server => server.foreach(server =>
server.publish( server.publish(
if (cmdStatus.canEnter) StatusEvent(Ready) if (cmdStatus.canEnter) StatusEvent(Ready)