From a89fcb37cabbc6ee3e2ece5f965cd83b06495dac Mon Sep 17 00:00:00 2001 From: Eugene Yokota Date: Mon, 21 Mar 2016 14:15:42 -0400 Subject: [PATCH] Exec and CommandExchange refactoring --- .../src/main/scala/sbt/BasicCommands.scala | 18 +++---- .../scala/sbt/internal/CommandChannel.scala | 17 +++++-- .../scala/sbt/internal/CommandExchange.scala | 47 ++++++++++++++----- .../scala/sbt/internal/ConsoleChannel.scala | 28 +++++------ .../scala/sbt/internal/NetworkChannel.scala | 16 ++----- 5 files changed, 74 insertions(+), 52 deletions(-) diff --git a/main-command/src/main/scala/sbt/BasicCommands.scala b/main-command/src/main/scala/sbt/BasicCommands.scala index 506e28509..12e0ef49b 100644 --- a/main-command/src/main/scala/sbt/BasicCommands.scala +++ b/main-command/src/main/scala/sbt/BasicCommands.scala @@ -6,7 +6,7 @@ import sbt.internal.util.complete.{ Completion, Completions, DefaultParsers, His import sbt.internal.util.Types.{ const, idFun } import sbt.internal.inc.classpath.ClasspathUtilities.toLoader import sbt.internal.inc.ModuleUtilities -import sbt.internal.{ CommandRequest, CommandSource, CommandStatus } +import sbt.internal.{ Exec, CommandSource, CommandStatus } import DefaultParsers._ import Function.tupled import Command.applyEffect @@ -193,17 +193,13 @@ object BasicCommands { } } - def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s => + def server = Command.command(Server, Help.more(Server, ServerDetailed)) { s0 => val exchange = State.exchange - exchange.channels foreach { x => - x.runOrResume(CommandStatus(s, true)) - x.setStatus(CommandStatus(s, true), None) - } - val CommandRequest(source, line) = exchange.blockUntilNextCommand - val newState = s.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s.remainingCommands).setInteractive(true) - exchange.channels foreach { x => - x.setStatus(CommandStatus(newState, false), Some(source)) - } + val s1 = exchange.run(s0) + exchange.publishStatus(CommandStatus(s0, true), None) + val Exec(source, line) = exchange.blockUntilNextExec + val newState = s1.copy(onFailure = Some(Server), remainingCommands = line +: Server +: s1.remainingCommands).setInteractive(true) + exchange.publishStatus(CommandStatus(newState, false), Some(source)) if (line.trim.isEmpty) newState else newState.clearGlobalLog } diff --git a/main/command/src/main/scala/sbt/internal/CommandChannel.scala b/main/command/src/main/scala/sbt/internal/CommandChannel.scala index c6bf79049..f01d9ffbd 100644 --- a/main/command/src/main/scala/sbt/internal/CommandChannel.scala +++ b/main/command/src/main/scala/sbt/internal/CommandChannel.scala @@ -1,19 +1,26 @@ package sbt package internal +import java.util.concurrent.ConcurrentLinkedQueue + /** * A command channel represents an IO device such as network socket or human * that can issue command or listen for some outputs. * We can think of a command channel to be an abstration of the terminal window. */ -abstract class CommandChannel(exchange: CommandExchange) { - /** start listening for a command request. */ - def runOrResume(status: CommandStatus): Unit - def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit +abstract class CommandChannel { + private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue() + def append(exec: Exec): Boolean = + commandQueue.add(exec) + def poll: Option[Exec] = Option(commandQueue.poll) + + /** start listening for a command exec. */ + def run(s: State): State + def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit def shutdown(): Unit } -case class CommandRequest(source: CommandSource, commandLine: String) +case class Exec(source: CommandSource, commandLine: String) sealed trait CommandSource object CommandSource { diff --git a/main/command/src/main/scala/sbt/internal/CommandExchange.scala b/main/command/src/main/scala/sbt/internal/CommandExchange.scala index 11ecbed25..6bce31b0f 100644 --- a/main/command/src/main/scala/sbt/internal/CommandExchange.scala +++ b/main/command/src/main/scala/sbt/internal/CommandExchange.scala @@ -1,8 +1,9 @@ package sbt package internal -import java.util.concurrent.ConcurrentLinkedQueue import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer +import java.util.concurrent.ConcurrentLinkedQueue /** * The command exchange merges multiple command channels (e.g. network and console), @@ -11,17 +12,41 @@ import scala.annotation.tailrec * this exchange, which could serve command request from either of the channel. */ private[sbt] final class CommandExchange { - private val commandQueue: ConcurrentLinkedQueue[CommandRequest] = new ConcurrentLinkedQueue() - lazy val channels = Seq(new ConsoleChannel(this), new NetworkChannel(this)) + private val commandQueue: ConcurrentLinkedQueue[Exec] = new ConcurrentLinkedQueue() + private val channelBuffer: ListBuffer[CommandChannel] = new ListBuffer() + def channels: List[CommandChannel] = channelBuffer.toList + def subscribe(c: CommandChannel): Unit = + channelBuffer.append(c) - def append(request: CommandRequest): Boolean = - commandQueue.add(request) + subscribe(new ConsoleChannel()) + subscribe(new NetworkChannel()) - @tailrec def blockUntilNextCommand: CommandRequest = - Option(commandQueue.poll) match { - case Some(x) => x - case _ => - Thread.sleep(50) - blockUntilNextCommand + // periodically move all messages from all the channels + @tailrec def blockUntilNextExec: Exec = + { + @tailrec def slurpMessages(): Unit = + (((None: Option[Exec]) /: channels) { _ orElse _.poll }) match { + case Some(x) => + commandQueue.add(x) + slurpMessages + case _ => () + } + slurpMessages() + Option(commandQueue.poll) match { + case Some(x) => x + case _ => + Thread.sleep(50) + blockUntilNextExec + } + } + + // fanout run to all channels + def run(s: State): State = + (s /: channels) { (acc, c) => c.run(acc) } + + // fanout publishStatus to all channels + def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit = + channels foreach { c => + c.publishStatus(status, lastSource) } } diff --git a/main/command/src/main/scala/sbt/internal/ConsoleChannel.scala b/main/command/src/main/scala/sbt/internal/ConsoleChannel.scala index f845c02ef..8827c866c 100644 --- a/main/command/src/main/scala/sbt/internal/ConsoleChannel.scala +++ b/main/command/src/main/scala/sbt/internal/ConsoleChannel.scala @@ -5,7 +5,7 @@ import sbt.internal.util._ import BasicKeys._ import java.io.File -private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends CommandChannel(exchange) { +private[sbt] final class ConsoleChannel extends CommandChannel { private var askUserThread: Option[Thread] = None def makeAskUserThread(status: CommandStatus): Thread = new Thread("ask-user-thread") { val s = status.state @@ -19,8 +19,8 @@ private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends Comma try { val line = reader.readLine(prompt) line match { - case Some(cmd) => exchange.append(CommandRequest(CommandSource.Human, cmd)) - case None => exchange.append(CommandRequest(CommandSource.Human, "exit")) + case Some(cmd) => append(Exec(CommandSource.Human, cmd)) + case None => append(Exec(CommandSource.Human, "exit")) } } catch { case e: InterruptedException => @@ -28,18 +28,18 @@ private[sbt] final class ConsoleChannel(exchange: CommandExchange) extends Comma } } - def runOrResume(status: CommandStatus): Unit = - askUserThread match { - case Some(x) if x.isAlive => // - case _ => - val x = makeAskUserThread(status) - x.start - askUserThread = Some(x) - } + def run(s: State): State = s - def setStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit = - if (status.canEnter) () - else { + def publishStatus(status: CommandStatus, lastSource: Option[CommandSource]): Unit = + if (status.canEnter) { + askUserThread match { + case Some(x) if x.isAlive => // + case _ => + val x = makeAskUserThread(status) + x.start + askUserThread = Some(x) + } + } else { shutdown() lastSource match { case Some(src) if src != CommandSource.Human => diff --git a/main/command/src/main/scala/sbt/internal/NetworkChannel.scala b/main/command/src/main/scala/sbt/internal/NetworkChannel.scala index 701a4ecdb..f4a2abc02 100644 --- a/main/command/src/main/scala/sbt/internal/NetworkChannel.scala +++ b/main/command/src/main/scala/sbt/internal/NetworkChannel.scala @@ -4,19 +4,18 @@ package internal import sbt.internal.server._ import BasicKeys._ -private[sbt] final class NetworkChannel(exchange: CommandExchange) extends CommandChannel(exchange) { +private[sbt] final class NetworkChannel extends CommandChannel { private var server: Option[ServerInstance] = None - def runOrResume(status: CommandStatus): Unit = + def run(s: State): State = { - val s = status.state val port = (s get serverPort) match { case Some(x) => x case None => 5001 } def onCommand(command: internal.server.Command): Unit = { command match { - case Execution(cmd) => exchange.append(CommandRequest(CommandSource.Network, cmd)) + case Execution(cmd) => append(Exec(CommandSource.Network, cmd)) } } server match { @@ -24,6 +23,7 @@ private[sbt] final class NetworkChannel(exchange: CommandExchange) extends Comma case _ => server = Some(Server.start("127.0.0.1", port, onCommand)) } + s } def shutdown(): Unit = @@ -33,13 +33,7 @@ private[sbt] final class NetworkChannel(exchange: CommandExchange) extends Comma server = None } - // network doesn't pause or resume - def pause(): Unit = () - - // network doesn't pause or resume - def resume(status: CommandStatus): Unit = () - - def setStatus(cmdStatus: CommandStatus, lastSource: Option[CommandSource]): Unit = { + def publishStatus(cmdStatus: CommandStatus, lastSource: Option[CommandSource]): Unit = { server.foreach(server => server.publish( if (cmdStatus.canEnter) StatusEvent(Ready)