Direct call to Spark's SparkSubmit

Han JU 2016-09-02 18:39:13 +02:00
parent eac19c1a66
commit ff72eeb7d3
1 changed file with 40 additions and 36 deletions


@@ -1,13 +1,16 @@
package coursier.cli
import java.io.{BufferedReader, File, InputStream, InputStreamReader, OutputStream}
import java.io.{PrintStream, BufferedReader, File, PipedInputStream, PipedOutputStream, InputStream, InputStreamReader}
import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths}
import caseapp._
import org.apache.commons.io.output.TeeOutputStream
import scala.util.control.NonFatal
import org.apache.spark.deploy.{SparkSubmit => SparkMain}
@CommandName("spark-submit")
case class SparkSubmit(
@Recurse
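
For readers skimming the new imports: a minimal sketch (not part of the commit) of why the renaming import is needed. The CLI file already defines its own SparkSubmit case class, so Spark's org.apache.spark.deploy.SparkSubmit entry point is brought in under the alias SparkMain. The Launch object and the simplified case class below are invented for illustration and assume Spark is on the classpath.

import org.apache.spark.deploy.{SparkSubmit => SparkMain} // Spark's real submit entry point

// The plain name stays free for the CLI's own options type (heavily simplified here)
case class SparkSubmit(sparkHome: String)

object Launch {
  // Hands already-built spark-submit arguments to Spark inside the current JVM
  def run(sparkSubmitOptions: Seq[String]): Unit =
    SparkMain.main(sparkSubmitOptions.toArray)
}
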
@@ -30,16 +33,9 @@ case class SparkSubmit(
else
options.sparkHome
def searchAssembly(dir: File): String = {
def searchAssembly(dir: File): Array[File] = {
Option(dir.listFiles()).getOrElse(Array.empty).filter { f =>
f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly")
} match {
case Array(assemblyFile) =>
assemblyFile.getAbsolutePath
case Array() =>
throw new Exception(s"No spark assembly found under $dir")
case jars =>
throw new Exception(s"Found several assembly JARs under $dir: ${jars.mkString(",")}")
}
}
@@ -52,7 +48,16 @@ case class SparkSubmit(
new File(sparkHome + "/lib")
)
dirs.map(searchAssembly).head
// collect assembly jars from every candidate dir; exactly one must be found
dirs.map(searchAssembly)
.foldLeft(Array(): Array[File])(_ ++ _) match {
case Array(assembly) =>
assembly.getAbsolutePath
case Array() =>
throw new Exception(s"No spark assembly found under ${dirs.mkString(",")}")
case jars =>
throw new Exception(s"Found several assembly JARs: ${jars.mkString(",")}")
}
}
val libManaged = {
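
Read together with the previous hunk, the new assembly lookup can be restated as plain Scala roughly as below. This is a sketch of the behaviour, not the committed code: each candidate directory is scanned for spark-assembly JARs, the per-directory results are concatenated, and exactly one match is required overall. The AssemblyLookup object, the assemblyPath helper name and the directory paths in the usage comment are invented.

import java.io.File

object AssemblyLookup {

  // Candidate spark-assembly JARs in a single directory (possibly none)
  def searchAssembly(dir: File): Array[File] =
    Option(dir.listFiles()).getOrElse(Array.empty).filter { f =>
      f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly")
    }

  // Concatenate the per-directory results and insist on exactly one assembly
  def assemblyPath(dirs: Seq[File]): String =
    dirs.map(searchAssembly).foldLeft(Array.empty[File])(_ ++ _) match {
      case Array(assembly) => assembly.getAbsolutePath
      case Array()         => throw new Exception(s"No spark assembly found under ${dirs.mkString(",")}")
      case jars            => throw new Exception(s"Found several assembly JARs: ${jars.mkString(",")}")
    }

  // e.g. assemblyPath(Seq(new File("/usr/lib/spark/lib"), new File("/opt/spark/lib")))
}
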
@@ -121,12 +126,12 @@ case class SparkSubmit(
val sparkSubmitOptions = sparkOpts ++ extraJarsOptions ++ mainClassOptions ++
Seq(mainJar) ++ jobArgs
val cmd = Seq(
"java",
"-cp",
cp.mkString(File.pathSeparator),
"org.apache.spark.deploy.SparkSubmit"
) ++ sparkSubmitOptions
Console.err.println(
"Running spark app with extra classpath:\n" +
s"${cp.map(" " + _).mkString("\n")}\n")
Console.err.println(
s"Running spark app with options:\n${sparkSubmitOptions.map(" "+_).mkString("\n")}\n")
object YarnAppId {
val Pattern = ".*Application report for ([^ ]+) .*".r
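
The Pattern regex just added is what later extracts the YARN application id from the job's output. Below is a small, self-contained illustration of how such a pattern pulls the id out of a log line; the object name, the appIdOf helper and the sample line are made up for the example, only the regex itself comes from the diff.

object YarnAppIdExample {
  val Pattern = ".*Application report for ([^ ]+) .*".r

  // Regex group 1 is the application id, e.g. application_<cluster ts>_<seq>
  def appIdOf(line: String): Option[String] =
    line match {
      case Pattern(appId) => Some(appId)
      case _              => None
    }

  def main(args: Array[String]): Unit = {
    val line = "INFO yarn.Client: Application report for application_1472831953_0042 (state: RUNNING)"
    println(appIdOf(line)) // Some(application_1472831953_0042)
  }
}
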
@@ -191,14 +196,8 @@ case class SparkSubmit(
None
}
Console.err.println(s"Running command:\n${cmd.map(" "+_).mkString("\n")}\n")
val process = new ProcessBuilder()
.command(cmd: _*)
.redirectErrorStream(true) // merges error stream into output stream
.start()
def pipeThread(from: InputStream, to: OutputStream) = {
// Create a thread that inspects Spark's output
def outputInspectThread(from: InputStream) = {
val t = new Thread {
override def run() = {
val in = new BufferedReader(new InputStreamReader(from))
@@ -210,8 +209,6 @@ case class SparkSubmit(
if (options.maxIdleTime > 0)
IdleChecker.updateLastMessageTs()
to.write((line + "\n").getBytes("UTF-8"))
if (YarnAppId.fileOpt.nonEmpty)
try YarnAppId.handleMessage(line)
catch {
@@ -221,23 +218,30 @@ case class SparkSubmit(
}
}
t.setName("pipe-output")
t.setName("spark-output")
t.setDaemon(true)
t
}
val is = process.getInputStream
// set up the inspection of Spark's output
// redirect stderr to stdout
System.setErr(System.out)
val isPipeThread = pipeThread(is, System.out)
val orig = System.out
val in = new PipedInputStream()
val out = new PipedOutputStream(in)
// multiplexing stdout
val tee = new TeeOutputStream(orig, out)
System.setOut(new PrintStream(tee))
val isPipeThread = outputInspectThread(in)
IdleChecker.checkThreadOpt.foreach(_.start())
isPipeThread.start()
val exitValue = process.waitFor()
sys.exit(exitValue)
//
SparkSubmit.main(sparkSubmitOptions.toArray)
}
// After all the setup, finally launch Spark
SparkMain.main(sparkSubmitOptions.toArray)
}
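
To see the new flow in one place, here is a hedged, self-contained sketch of the stdout multiplexing the last hunks set up, assuming commons-io is on the classpath for TeeOutputStream: the original System.out is kept, a TeeOutputStream copies everything written to stdout into a PipedOutputStream, a daemon thread reads the connected PipedInputStream line by line to inspect the output, and only then does the real work run in the same JVM. The Spark call is replaced by plain System.out.println lines so the sketch runs on its own, and the contains check stands in for YarnAppId.handleMessage and the idle-time bookkeeping.

import java.io.{BufferedReader, InputStream, InputStreamReader, PipedInputStream, PipedOutputStream, PrintStream}
import org.apache.commons.io.output.TeeOutputStream

object TeeInspectExample {

  // Daemon thread that scans every line flowing through the pipe (mirrors outputInspectThread)
  def outputInspectThread(from: InputStream): Thread = {
    val t = new Thread {
      override def run(): Unit = {
        val in = new BufferedReader(new InputStreamReader(from))
        var line = in.readLine()
        while (line != null) {
          if (line.contains("Application report for")) // stand-in for YarnAppId.handleMessage(line)
            System.err.println(s"[inspector] saw: $line")
          line = in.readLine()
        }
      }
    }
    t.setName("spark-output")
    t.setDaemon(true)
    t
  }

  def main(args: Array[String]): Unit = {
    val orig = System.out
    val in   = new PipedInputStream()
    val out  = new PipedOutputStream(in)

    // Multiplex stdout: everything still reaches the console, and a copy feeds the pipe
    val tee = new TeeOutputStream(orig, out)
    System.setOut(new PrintStream(tee, true))

    val inspector = outputInspectThread(in)
    inspector.start()

    // Stand-in for SparkMain.main(sparkSubmitOptions.toArray); anything written to stdout
    // from here on is both printed and inspected
    System.out.println("INFO yarn.Client: Application report for application_1472831953_0042 (state: RUNNING)")
    System.out.println("job finished")

    System.setOut(orig) // put the real stdout back
    out.close()         // signal EOF so the inspector drains and terminates
    inspector.join(1000)
  }
}

Compared with the removed ProcessBuilder approach, the trade-off is that spark-submit now runs inside the launcher's JVM, so its output has to be captured by swapping System.out rather than by reading a child process's stream.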