From e040eebd21be8f199ad0fe27cae506ba6f5255f8 Mon Sep 17 00:00:00 2001 From: Adrien Piquerez Date: Tue, 12 May 2020 09:00:44 +0200 Subject: [PATCH] "add failing json rpc response tests" --- .../src/server-test/response/build.sbt | 17 ++++++- .../src/test/scala/testpkg/ResponseTest.scala | 50 +++++++++++++++++++ .../src/test/scala/testpkg/TestServer.scala | 46 ++++++++++++----- 3 files changed, 99 insertions(+), 14 deletions(-) diff --git a/server-test/src/server-test/response/build.sbt b/server-test/src/server-test/response/build.sbt index 5d431a084..ee95370c3 100644 --- a/server-test/src/server-test/response/build.sbt +++ b/server-test/src/server-test/response/build.sbt @@ -25,12 +25,25 @@ Global / serverHandlers += ServerHandler({ callback => case r: JsonRpcRequestMessage if r.method == "foo/rootClasspath" => appendExec(Exec("fooClasspath", Some(r.id), Some(CommandSource(callback.name)))) () + case r if r.method == "foo/respondTwice" => + appendExec(Exec("fooClasspath", Some(r.id), Some(CommandSource(callback.name)))) + jsonRpcRespond("concurrent response", Some(r.id)) + () + case r if r.method == "foo/resultAndError" => + appendExec(Exec("fooCustomFail", Some(r.id), Some(CommandSource(callback.name)))) + jsonRpcRespond("concurrent response", Some(r.id)) + () }, - PartialFunction.empty + { + case r if r.method == "foo/customNotification" => + jsonRpcRespond("notification result", None) + () + } ) }) lazy val fooClasspath = taskKey[Unit]("") + lazy val root = (project in file(".")) .settings( name := "response", @@ -55,5 +68,5 @@ lazy val root = (project in file(".")) val s = state.value val cp = (Compile / fullClasspath).value s.respondEvent(cp.map(_.data)) - }, + } ) diff --git a/server-test/src/test/scala/testpkg/ResponseTest.scala b/server-test/src/test/scala/testpkg/ResponseTest.scala index 8f8ee3459..75cec4250 100644 --- a/server-test/src/test/scala/testpkg/ResponseTest.scala +++ b/server-test/src/test/scala/testpkg/ResponseTest.scala @@ -64,4 +64,54 @@ object ResponseTest extends AbstractServerTest { (s contains """{"jsonrpc":"2.0","method":"foo/something","params":"something"}""") }) } + + test("respond concurrently from a task and the handler") { _ => + svr.sendJsonRpc( + """{ "jsonrpc": "2.0", "id": "15", "method": "foo/respondTwice", "params": {} }""" + ) + assert { + svr.waitForString(1.seconds) { s => + println(s) + s contains "\"id\":\"15\"" + } + } + assert { + // the second response should never be sent + svr.neverReceive(500.milliseconds) { s => + println(s) + s contains "\"id\":\"15\"" + } + } + } + + test("concurrent result and error") { _ => + svr.sendJsonRpc( + """{ "jsonrpc": "2.0", "id": "16", "method": "foo/resultAndError", "params": {} }""" + ) + assert { + svr.waitForString(1.seconds) { s => + println(s) + s contains "\"id\":\"16\"" + } + } + assert { + // the second response (result or error) should never be sent + svr.neverReceive(500.milliseconds) { s => + println(s) + s contains "\"id\":\"16\"" + } + } + } + + test("response to a notification should not be sent") { _ => + svr.sendJsonRpc( + """{ "jsonrpc": "2.0", "method": "foo/customNotification", "params": {} }""" + ) + assert { + svr.neverReceive(500.milliseconds) { s => + println(s) + s contains "\"result\":\"notification result\"" + } + } + } } diff --git a/server-test/src/test/scala/testpkg/TestServer.scala b/server-test/src/test/scala/testpkg/TestServer.scala index ea8625026..ae2e8c2f1 100644 --- a/server-test/src/test/scala/testpkg/TestServer.scala +++ b/server-test/src/test/scala/testpkg/TestServer.scala @@ -8,13 +8,14 @@ package testpkg import java.io.{ File, IOException } +import java.util.concurrent.TimeoutException import verify._ import sbt.RunFromSourceMain import sbt.io.IO import sbt.io.syntax._ import sbt.protocol.ClientSocket -import scala.annotation.tailrec + import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{ Success, Try } @@ -150,6 +151,7 @@ case class TestServer( sbtVersion: String, classpath: Seq[File] ) { + import scala.concurrent.ExecutionContext.Implicits._ import TestServer.hostLog val readBuffer = new Array[Byte](40960) @@ -183,9 +185,15 @@ case class TestServer( waitForPortfile(90.seconds) // make connection to the socket described in the portfile - val (sk, tkn) = ClientSocket.socket(portfile) - val out = sk.getOutputStream - val in = sk.getInputStream + var (sk, _) = ClientSocket.socket(portfile) + var out = sk.getOutputStream + var in = sk.getInputStream + + def resetConnection() = { + sk = ClientSocket.socket(portfile)._1 + out = sk.getOutputStream + in = sk.getInputStream + } // initiate handshake sendJsonRpc( @@ -230,7 +238,7 @@ case class TestServer( writeEndLine } - def readFrame: Option[String] = { + def readFrame: Future[Option[String]] = Future { def getContentLength: Int = { readLine map { line => line.drop(16).toInt @@ -244,14 +252,28 @@ case class TestServer( final def waitForString(duration: FiniteDuration)(f: String => Boolean): Boolean = { val deadline = duration.fromNow - @tailrec def impl(): Boolean = { - if (deadline.isOverdue || !process.isAlive) false - else - readFrame.fold(false)(f) || { - Thread.sleep(100) - impl - } + try { + Await.result(readFrame, deadline.timeLeft).fold(false)(f) || impl + } catch { + case _: TimeoutException => + resetConnection() // create a new connection to invalidate the running readFrame future + false + } + } + impl() + } + + final def neverReceive(duration: FiniteDuration)(f: String => Boolean): Boolean = { + val deadline = duration.fromNow + def impl(): Boolean = { + try { + Await.result(readFrame, deadline.timeLeft).fold(true)(s => !f(s)) && impl + } catch { + case _: TimeoutException => + resetConnection() // create a new connection to invalidate the running readFrame future + true + } } impl() }