diff --git a/main-command/src/main/scala/sbt/BasicCommands.scala b/main-command/src/main/scala/sbt/BasicCommands.scala index 4fbae934d..506e28509 100644 --- a/main-command/src/main/scala/sbt/BasicCommands.scala +++ b/main-command/src/main/scala/sbt/BasicCommands.scala @@ -6,7 +6,7 @@ import sbt.internal.util.complete.{ Completion, Completions, DefaultParsers, His import sbt.internal.util.Types.{ const, idFun } import sbt.internal.inc.classpath.ClasspathUtilities.toLoader import sbt.internal.inc.ModuleUtilities -import sbt.internal.{ NetworkListener, ConsoleListener } +import sbt.internal.{ CommandRequest, CommandSource, CommandStatus } import DefaultParsers._ import Function.tupled import Command.applyEffect @@ -16,11 +16,9 @@ import BasicKeys._ import java.io.File import sbt.io.IO -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal -import scala.annotation.tailrec object BasicCommands { lazy val allBasicCommands = Seq(nop, ignore, help, completionsCommand, multi, ifLast, append, setOnFailure, clearOnFailure, stashOnFailure, popOnFailure, reboot, call, early, exit, continuous, history, shell, server, read, alias) ++ compatCommands @@ -195,39 +193,19 @@ object BasicCommands { } } - private[sbt] var askingAlready = false - private[sbt] val commandQueue: ConcurrentLinkedQueue[(String, Option[String])] = new ConcurrentLinkedQueue() - private[sbt] val commandListers = Seq(new ConsoleListener(commandQueue), new NetworkListener(commandQueue)) - @tailrec private[sbt] def blockUntilNextCommand: (String, Option[String]) = - Option(commandQueue.poll) match { - case Some(x) => x - case _ => - Thread.sleep(50) - blockUntilNextCommand - } def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s => - if (askingAlready) { - commandListers foreach { x => - x.resume(CommandStatus(s, true)) - } - } else { - commandListers foreach { x => - x.run(CommandStatus(s, true)) - } - askingAlready = true + val exchange = State.exchange + exchange.channels foreach { x => + x.runOrResume(CommandStatus(s, true)) + x.setStatus(CommandStatus(s, true), None) } - blockUntilNextCommand match { - case (source, Some(line)) => - if (source != "human") { - println(line) - } - commandListers foreach { x => - x.pause() - } - val newState = s.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s.remainingCommands).setInteractive(true) - if (line.trim.isEmpty) newState else newState.clearGlobalLog - case _ => s.setInteractive(false) + val CommandRequest(source, line) = exchange.blockUntilNextCommand + val newState = s.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s.remainingCommands).setInteractive(true) + exchange.channels foreach { x => + x.setStatus(CommandStatus(newState, false), Some(source)) } + if (line.trim.isEmpty) newState + else newState.clearGlobalLog } def read = Command.make(ReadCommand, Help.more(ReadCommand, ReadDetailed))(s => applyEffect(readParser(s))(doRead(s))) diff --git a/main-command/src/main/scala/sbt/State.scala b/main-command/src/main/scala/sbt/State.scala index f4b4041cf..61edef3e6 100644 --- a/main-command/src/main/scala/sbt/State.scala +++ b/main-command/src/main/scala/sbt/State.scala @@ -8,6 +8,7 @@ import java.util.concurrent.Callable import sbt.util.Logger import sbt.internal.util.{ AttributeKey, AttributeMap, ErrorHandling, ExitHook, ExitHooks, GlobalLogging } import sbt.internal.util.complete.HistoryCommands +import sbt.internal.CommandExchange import sbt.internal.inc.classpath.ClassLoaderCache /** @@ -178,6 +179,8 @@ object State { new Reboot(app.scalaProvider.version, state.remainingCommands, app.id, state.configuration.baseDirectory) } + private[sbt] lazy val exchange = new CommandExchange() + /** Provides operations and transformations on State. */ implicit def stateOps(s: State): StateOps = new StateOps { def process(f: (String, State) => State): State = diff --git a/main/command/src/main/scala/sbt/CommandStatus.scala b/main/command/src/main/scala/sbt/CommandStatus.scala deleted file mode 100644 index 60d0eb76c..000000000 --- a/main/command/src/main/scala/sbt/CommandStatus.scala +++ /dev/null @@ -1,3 +0,0 @@ -package sbt - -case class CommandStatus(state: State, canEnter: Boolean) diff --git a/main/command/src/main/scala/sbt/internal/CommandChannel.scala b/main/command/src/main/scala/sbt/internal/CommandChannel.scala new file mode 100644 index 000000000..c6bf79049 --- /dev/null +++ b/main/command/src/main/scala/sbt/internal/CommandChannel.scala @@ -0,0 +1,29 @@ +package sbt +package internal + +/** + * A command channel represents an IO device such as network socket or human + * that can issue command or listen for some outputs. + * We can think of a command channel to be an abstration of the terminal window. + */ +abstract class CommandChannel(exchange: CommandExchange) { + /** start listening for a command request. */ + def runOrResume(status: CommandStatus): Unit + def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit + def shutdown(): Unit +} + +case class CommandRequest(source: CommandSource, commandLine: String) + +sealed trait CommandSource +object CommandSource { + case object Human extends CommandSource + case object Network extends CommandSource +} + +/** + * This is a data that is passed on to the channels. + * The canEnter paramter indicates that the console devise or UI + * should stop listening. + */ +case class CommandStatus(state: State, canEnter: Boolean) diff --git a/main/command/src/main/scala/sbt/internal/CommandExchange.scala b/main/command/src/main/scala/sbt/internal/CommandExchange.scala new file mode 100644 index 000000000..11ecbed25 --- /dev/null +++ b/main/command/src/main/scala/sbt/internal/CommandExchange.scala @@ -0,0 +1,27 @@ +package sbt +package internal + +import java.util.concurrent.ConcurrentLinkedQueue +import scala.annotation.tailrec + +/** + * The command exchange merges multiple command channels (e.g. network and console), + * and acts as the central multiplexing point. + * Instead of blocking on JLine.readLine, the server commmand will block on + * this exchange, which could serve command request from either of the channel. + */ +private[sbt] final class CommandExchange { + private val commandQueue: ConcurrentLinkedQueue[CommandRequest] = new ConcurrentLinkedQueue() + lazy val channels = Seq(new ConsoleChannel(this), new NetworkChannel(this)) + + def append(request: CommandRequest): Boolean = + commandQueue.add(request) + + @tailrec def blockUntilNextCommand: CommandRequest = + Option(commandQueue.poll) match { + case Some(x) => x + case _ => + Thread.sleep(50) + blockUntilNextCommand + } +} diff --git a/main/command/src/main/scala/sbt/internal/CommandListener.scala b/main/command/src/main/scala/sbt/internal/CommandListener.scala deleted file mode 100644 index 2ea5fc7ee..000000000 --- a/main/command/src/main/scala/sbt/internal/CommandListener.scala +++ /dev/null @@ -1,14 +0,0 @@ -package sbt -package internal - -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicBoolean - -abstract class CommandListener(queue: ConcurrentLinkedQueue[(String, Option[String])]) { - // represents a loop that keeps asking an IO device for String input - def run(status: CommandStatus): Unit - def shutdown(): Unit - def setStatus(status: CommandStatus): Unit - def pause(): Unit - def resume(status: CommandStatus): Unit -} diff --git a/main/command/src/main/scala/sbt/internal/ConsoleListener.scala b/main/command/src/main/scala/sbt/internal/ConsoleChannel.scala similarity index 52% rename from main/command/src/main/scala/sbt/internal/ConsoleListener.scala rename to main/command/src/main/scala/sbt/internal/ConsoleChannel.scala index 59bee14a7..f845c02ef 100644 --- a/main/command/src/main/scala/sbt/internal/ConsoleListener.scala +++ b/main/command/src/main/scala/sbt/internal/ConsoleChannel.scala @@ -2,29 +2,33 @@ package sbt package internal import sbt.internal.util._ -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } import BasicKeys._ import java.io.File -private[sbt] final class ConsoleListener(queue: ConcurrentLinkedQueue[(String, Option[String])]) extends CommandListener(queue) { +private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends CommandChannel(exchange) { private var askUserThread: Option[Thread] = None def makeAskUserThread(status: CommandStatus): Thread = new Thread("ask-user-thread") { val s = status.state val history = (s get historyPath) getOrElse Some(new File(s.baseDir, ".history")) - val prompt = (s get shellPrompt) match { case Some(pf) => pf(s); case None => "> " } + val prompt = (s get shellPrompt) match { + case Some(pf) => pf(s) + case None => "> " + } val reader = new FullReader(history, s.combinedParser) override def run(): Unit = { try { val line = reader.readLine(prompt) - line map { x => queue.add(("human", Some(x))) } + line match { + case Some(cmd) => exchange.append(CommandRequest(CommandSource.Human, cmd)) + case None => exchange.append(CommandRequest(CommandSource.Human, "exit")) + } } catch { case e: InterruptedException => } } } - def run(status: CommandStatus): Unit = + def runOrResume(status: CommandStatus): Unit = askUserThread match { case Some(x) if x.isAlive => // case _ => @@ -33,6 +37,20 @@ private[sbt] final class ConsoleListener(queue: ConcurrentLinkedQueue[(String, O askUserThread = Some(x) } + def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit = + if (status.canEnter) () + else { + shutdown() + lastSource match { + case Some(src) if src != CommandSource.Human => + val s = status.state + s.remainingCommands.headOption map { + println(_) + } + case _ => // + } + } + def shutdown(): Unit = askUserThread match { case Some(x) if x.isAlive => @@ -40,17 +58,4 @@ private[sbt] final class ConsoleListener(queue: ConcurrentLinkedQueue[(String, O askUserThread = None case _ => () } - - def pause(): Unit = shutdown() - - def resume(status: CommandStatus): Unit = - askUserThread match { - case Some(x) if x.isAlive => // - case _ => - val x = makeAskUserThread(status) - x.start - askUserThread = Some(x) - } - - def setStatus(status: CommandStatus): Unit = () } diff --git a/main/command/src/main/scala/sbt/internal/NetworkListener.scala b/main/command/src/main/scala/sbt/internal/NetworkChannel.scala similarity index 58% rename from main/command/src/main/scala/sbt/internal/NetworkListener.scala rename to main/command/src/main/scala/sbt/internal/NetworkChannel.scala index e3bc7fda5..3d44a7b6a 100644 --- a/main/command/src/main/scala/sbt/internal/NetworkListener.scala +++ b/main/command/src/main/scala/sbt/internal/NetworkChannel.scala @@ -1,29 +1,30 @@ package sbt package internal -import java.util.concurrent.ConcurrentLinkedQueue - import sbt.internal.server._ -private[sbt] final class NetworkListener(queue: ConcurrentLinkedQueue[(String, Option[String])]) extends CommandListener(queue) { - +private[sbt] final class NetworkChannel(exchange: CommandExchange) extends CommandChannel(exchange) { private var server: Option[ServerInstance] = None - def run(status: CommandStatus): Unit = + def runOrResume(status: CommandStatus): Unit = { def onCommand(command: internal.server.Command): Unit = { command match { - case Execution(cmd) => queue.add(("network", Some(cmd))) + case Execution(cmd) => exchange.append(CommandRequest(CommandSource.Network, cmd)) } } - - server = Some(Server.start("127.0.0.1", 12700, onCommand)) + server match { + case Some(x) => // do nothing + case _ => + server = Some(Server.start("127.0.0.1", 12700, onCommand)) + } } def shutdown(): Unit = { // interrupt and kill the thread server.foreach(_.shutdown()) + server = None } // network doesn't pause or resume @@ -32,7 +33,7 @@ private[sbt] final class NetworkListener(queue: ConcurrentLinkedQueue[(String, O // network doesn't pause or resume def resume(status: CommandStatus): Unit = () - def setStatus(cmdStatus: CommandStatus): Unit = { + def setStatus(cmdStatus: CommandStatus, lastSource: Option[CommandSource]): Unit = { server.foreach(server => server.publish( if (cmdStatus.canEnter) StatusEvent(Ready)