From ff72eeb7d336e01517b0275a7b6e7f7578cea11e Mon Sep 17 00:00:00 2001
From: Han JU
Date: Fri, 2 Sep 2016 18:39:13 +0200
Subject: [PATCH] Direct call to spark's SparkSubmit

---
 .../main/scala/coursier/cli/SparkSubmit.scala | 76 ++++++++++---------
 1 file changed, 40 insertions(+), 36 deletions(-)

diff --git a/spark/src/main/scala/coursier/cli/SparkSubmit.scala b/spark/src/main/scala/coursier/cli/SparkSubmit.scala
index 18f185ac0..79d2f0b00 100644
--- a/spark/src/main/scala/coursier/cli/SparkSubmit.scala
+++ b/spark/src/main/scala/coursier/cli/SparkSubmit.scala
@@ -1,13 +1,16 @@
 package coursier.cli
 
-import java.io.{BufferedReader, File, InputStream, InputStreamReader, OutputStream}
+import java.io.{PrintStream, BufferedReader, File, PipedInputStream, PipedOutputStream, InputStream, InputStreamReader}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.{Files, Paths}
 
 import caseapp._
+import org.apache.commons.io.output.TeeOutputStream
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.deploy.{SparkSubmit => SparkMain}
+
 @CommandName("spark-submit")
 case class SparkSubmit(
   @Recurse
@@ -30,16 +33,9 @@ case class SparkSubmit(
       else
         options.sparkHome
 
-    def searchAssembly(dir: File): String = {
+    def searchAssembly(dir: File): Array[File] = {
       Option(dir.listFiles()).getOrElse(Array.empty).filter { f =>
         f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly")
-      } match {
-        case Array(assemblyFile) =>
-          assemblyFile.getAbsolutePath
-        case Array() =>
-          throw new Exception(s"No spark assembly found under $dir")
-        case jars =>
-          throw new Exception(s"Found several assembly JARs under $dir: ${jars.mkString(",")}")
       }
     }
 
@@ -52,7 +48,16 @@ case class SparkSubmit(
       new File(sparkHome + "/lib")
     )
 
-    dirs.map(searchAssembly).head
+    // require exactly one assembly JAR across the candidate directories
+    dirs.map(searchAssembly)
+      .foldLeft(Array(): Array[File])(_ ++ _) match {
+        case Array(assembly) =>
+          assembly.getAbsolutePath
+        case Array() =>
+          throw new Exception(s"No spark assembly found under ${dirs.mkString(",")}")
+        case jars =>
+          throw new Exception(s"Found several assembly JARs: ${jars.mkString(",")}")
+      }
   }
 
   val libManaged = {
@@ -121,12 +126,12 @@ case class SparkSubmit(
     val sparkSubmitOptions = sparkOpts ++ extraJarsOptions ++ mainClassOptions ++
       Seq(mainJar) ++ jobArgs
 
-    val cmd = Seq(
-      "java",
-      "-cp",
-      cp.mkString(File.pathSeparator),
-      "org.apache.spark.deploy.SparkSubmit"
-    ) ++ sparkSubmitOptions
+    Console.err.println(
+      "Running spark app with extra classpath:\n" +
+        s"${cp.map(" "+_).mkString("\n")}\n")
+
+    Console.err.println(
+      s"Running spark app with options:\n${sparkSubmitOptions.map(" "+_).mkString("\n")}\n")
 
     object YarnAppId {
       val Pattern = ".*Application report for ([^ ]+) .*".r
@@ -191,14 +196,8 @@ case class SparkSubmit(
         None
     }
 
-    Console.err.println(s"Running command:\n${cmd.map(" "+_).mkString("\n")}\n")
-
-    val process = new ProcessBuilder()
-      .command(cmd: _*)
-      .redirectErrorStream(true) // merges error stream into output stream
-      .start()
-
-    def pipeThread(from: InputStream, to: OutputStream) = {
+    // Create a thread that inspects Spark's output line by line
+    def outputInspectThread(from: InputStream) = {
      val t = new Thread {
        override def run() = {
          val in = new BufferedReader(new InputStreamReader(from))
@@ -210,8 +209,6 @@
          if (options.maxIdleTime > 0)
            IdleChecker.updateLastMessageTs()
 
-          to.write((line + "\n").getBytes("UTF-8"))
-
          if (YarnAppId.fileOpt.nonEmpty)
            try YarnAppId.handleMessage(line)
            catch {
@@ -221,23 +218,30 @@ case class SparkSubmit(
        }
      }
 
-      t.setName("pipe-output")
+      t.setName("spark-output")
       t.setDaemon(true)
       t
     }
 
-    val is = process.getInputStream
+    // Set up the inspection of Spark's output:
+    // redirect stderr to stdout
+    System.setErr(System.out)
 
-    val isPipeThread = pipeThread(is, System.out)
+    val orig = System.out
+
+    val in = new PipedInputStream()
+    val out = new PipedOutputStream(in)
+
+    // tee stdout: one copy to the console, one into the pipe
+    val tee = new TeeOutputStream(orig, out)
+    System.setOut(new PrintStream(tee, true)) // auto-flush, so the inspector sees lines promptly
+
+    val isPipeThread = outputInspectThread(in)
 
     IdleChecker.checkThreadOpt.foreach(_.start())
 
     isPipeThread.start()
 
-    val exitValue = process.waitFor()
-
-    sys.exit(exitValue)
-
-    //
-    SparkSubmit.main(sparkSubmitOptions.toArray)
-}
+    // After all the setup, finally launch Spark in-process
+    SparkMain.main(sparkSubmitOptions.toArray)
+}
\ No newline at end of file
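
Note: with the forked "java -cp ... org.apache.spark.deploy.SparkSubmit" process gone,
there is no subprocess pipe left to read Spark's output from. The patch therefore tees
System.out into a PipedInputStream that a daemon thread scans for lines such as the
YARN "Application report for ..." messages. Below is a minimal, self-contained sketch
of that tee-and-inspect pattern, kept separate from the patch: the object name
TeeStdoutDemo and the sample report line are illustrative only, and commons-io must be
on the classpath for TeeOutputStream.

import java.io.{BufferedReader, InputStreamReader, PipedInputStream, PipedOutputStream, PrintStream}

import org.apache.commons.io.output.TeeOutputStream

object TeeStdoutDemo {
  def main(args: Array[String]): Unit = {
    val origOut = System.out

    // Pipe pair: bytes written to `sink` become readable from `source`.
    val source = new PipedInputStream()
    val sink   = new PipedOutputStream(source)

    // Tee stdout: every byte now goes both to the real console and into the
    // pipe. Auto-flush (the `true` flag) so the reader sees whole lines promptly.
    System.setOut(new PrintStream(new TeeOutputStream(origOut, sink), true))

    // Daemon thread scanning the teed copy, as outputInspectThread does above.
    val inspector = new Thread {
      override def run(): Unit = {
        val reader = new BufferedReader(new InputStreamReader(source))
        var line = reader.readLine()
        while (line != null) {
          if (line.contains("Application report for"))
            origOut.println(s"[inspector] matched: $line")
          line = reader.readLine()
        }
      }
    }
    inspector.setDaemon(true)
    inspector.start()

    // This reaches both the console and the inspector.
    System.out.println("Application report for application_0000_0001 (state: RUNNING)")
    Thread.sleep(200) // give the daemon thread a moment before the JVM exits
  }
}

One ordering note on this pattern: System.setOut and System.setErr capture the stream
object current at call time, so only output (re)directed to stdout after the tee is
installed actually flows through it.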