Keep reworking spark-submit command

Alexandre Archambault 2016-09-09 00:34:37 +02:00
parent 34d04fe45e
commit e55c6384bd
9 changed files with 497 additions and 324 deletions

View File

@@ -561,16 +561,3 @@ lazy val `coursier` = project.in(file("."))
.settings(
moduleName := "coursier-root"
)
val sparkVersion = "1.6.1"
lazy val spark = project
.dependsOn(cli)
.settings(commonSettings)
.settings(noPublishForScalaVersionSettings("2.10"))
.settings(packAutoSettings)
.settings(proguardSettings)
.settings(
name := "coursier-spark",
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
)

View File

@@ -581,35 +581,36 @@ class Helper(
files0
}
lazy val (parentLoader, filteredFiles) = {
def contextLoader = Thread.currentThread().getContextClassLoader
val contextLoader = Thread.currentThread().getContextClassLoader
// TODO Would ClassLoader.getSystemClassLoader be better here?
val baseLoader: ClassLoader =
Launch.mainClassLoader(contextLoader)
.flatMap(cl => Option(cl.getParent))
.getOrElse {
// proguarded -> no risk of conflicts, no absolute need to find a specific ClassLoader
val isProguarded = Try(contextLoader.loadClass("coursier.cli.Launch")).isFailure
if (warnBaseLoaderNotFound && !isProguarded && common.verbosityLevel >= 0)
Console.err.println(
"Warning: cannot find the main ClassLoader that launched coursier.\n" +
"Was coursier launched by its main launcher? " +
"The ClassLoader of the application that is about to be launched will be intertwined " +
"with the one of coursier, which may be a problem if their dependencies conflict."
)
contextLoader
}
lazy val (parentLoader, filteredFiles) = {
val files0 = fetch(sources = false, javadoc = false)
val parentLoader0: ClassLoader =
Launch.mainClassLoader(contextLoader)
.flatMap(cl => Option(cl.getParent))
.getOrElse {
// proguarded -> no risk of conflicts, no absolute need to find a specific ClassLoader
val isProguarded = Try(contextLoader.loadClass("coursier.cli.Launch")).isFailure
if (warnBaseLoaderNotFound && !isProguarded && common.verbosityLevel >= 0)
Console.err.println(
"Warning: cannot find the main ClassLoader that launched coursier.\n" +
"Was coursier launched by its main launcher? " +
"The ClassLoader of the application that is about to be launched will be intertwined " +
"with the one of coursier, which may be a problem if their dependencies conflict."
)
contextLoader
}
if (isolated.isolated.isEmpty)
(parentLoader0, files0)
(baseLoader, files0)
else {
val isolatedDeps = isolated.isolatedDeps(common.defaultArtifactType, common.scalaVersion)
val (isolatedLoader, filteredFiles0) = isolated.targets.foldLeft((parentLoader0, files0)) {
val (isolatedLoader, filteredFiles0) = isolated.targets.foldLeft((baseLoader, files0)) {
case ((parent, files0), target) =>
// FIXME These were already fetched above

View File

@@ -33,6 +33,43 @@ object Launch {
mainClassLoader(cl.getParent)
}
def run(
loader: ClassLoader,
mainClass: String,
args: Seq[String],
verbosity: Int,
beforeMain: => Unit = ()
): Unit = {
val cls =
try loader.loadClass(mainClass)
catch { case e: ClassNotFoundException =>
Helper.errPrintln(s"Error: class $mainClass not found")
sys.exit(255)
}
val method =
try cls.getMethod("main", classOf[Array[String]])
catch { case e: NoSuchMethodException =>
Helper.errPrintln(s"Error: method main not found in $mainClass")
sys.exit(255)
}
method.setAccessible(true)
if (verbosity >= 2)
Helper.errPrintln(s"Launching $mainClass ${args.mkString(" ")}")
else if (verbosity == 1)
Helper.errPrintln(s"Launching")
beforeMain
Thread.currentThread().setContextClassLoader(loader)
try method.invoke(null, args.toArray)
catch {
case e: java.lang.reflect.InvocationTargetException =>
throw Option(e.getCause).getOrElse(e)
}
}
}
class IsolatedClassLoader(
@@ -86,29 +123,10 @@ case class Launch(
else
options.mainClass
val cls =
try helper.loader.loadClass(mainClass)
catch { case e: ClassNotFoundException =>
Helper.errPrintln(s"Error: class $mainClass not found")
sys.exit(255)
}
val method =
try cls.getMethod("main", classOf[Array[String]])
catch { case e: NoSuchMethodException =>
Helper.errPrintln(s"Error: method main not found in $mainClass")
sys.exit(255)
}
method.setAccessible(true)
if (options.common.verbosityLevel >= 2)
Helper.errPrintln(s"Launching $mainClass ${userArgs.mkString(" ")}")
else if (options.common.verbosityLevel == 1)
Helper.errPrintln(s"Launching")
Thread.currentThread().setContextClassLoader(helper.loader)
try method.invoke(null, userArgs.toArray)
catch {
case e: java.lang.reflect.InvocationTargetException =>
throw Option(e.getCause).getOrElse(e)
}
Launch.run(
helper.loader,
mainClass,
userArgs,
options.common.verbosityLevel
)
}
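
Launch.run is factored out of the Launch command here so that other commands (the reworked spark-submit below) can reuse it. As a minimal sketch, not part of this commit, here is how it could be called directly, with a made-up jar path and main class; note that the beforeMain argument is evaluated after the verbosity logging, just before the context class loader is swapped and main is invoked:

// Illustrative only — the jar path and main class are hypothetical.
import java.io.File
import java.net.URLClassLoader
import coursier.cli.Launch

val appJars = Seq(new File("/tmp/app.jar"))
val loader = new URLClassLoader(appJars.map(_.toURI.toURL).toArray, null)

Launch.run(
  loader,
  "com.example.Main",
  Seq("--foo", "bar"),
  verbosity = 1,
  beforeMain = Console.err.println("handing over to the application")
)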

View File

@@ -220,3 +220,25 @@ case class BootstrapOptions(
@Recurse
common: CommonOptions
)
case class SparkSubmitOptions(
@Short("M")
@Short("main")
@Help("Main class to be launched (optional if in manifest)")
mainClass: String,
@Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
@Value("file")
yarnIdFile: String,
@Help("Spark assembly. If empty, automatically generate (default: empty)")
sparkAssembly: String,
noDefaultAssemblyDependencies: Boolean,
assemblyDependencies: List[String],
noDefaultSubmitDependencies: Boolean,
submitDependencies: List[String],
sparkVersion: String,
@Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
@Value("seconds")
maxIdleTime: Int,
@Recurse
common: CommonOptions
)
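
For reference, a sketch of these options filled in programmatically — the values are made up, the package is assumed to be coursier.cli as for the options file removed below, and CommonOptions() assumes that class has defaults for all of its fields, which the commit does not show:

// Illustrative only, not part of this commit.
import coursier.cli.{ CommonOptions, SparkSubmitOptions }

val opts = SparkSubmitOptions(
  mainClass = "com.example.MyJob",        // hypothetical job entry point
  yarnIdFile = "target/yarn-app-id.txt",  // where the detected YARN app ID gets written
  sparkAssembly = "",                     // empty: have coursier provide the assembly
  noDefaultAssemblyDependencies = false,
  assemblyDependencies = Nil,
  noDefaultSubmitDependencies = false,
  submitDependencies = Nil,
  sparkVersion = "1.6.1",                 // example version, matching the one removed from build.sbt
  maxIdleTime = 0,                        // 0: never give up on a silent spark-submit
  common = CommonOptions()                // assumption: CommonOptions has default values
)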

View File

@@ -0,0 +1,324 @@
package coursier.cli
import java.io.{PrintStream, BufferedReader, File, PipedInputStream, PipedOutputStream, InputStream, InputStreamReader}
import java.net.URLClassLoader
import java.nio.file.Files
import caseapp._
import coursier.{ Attributes, Dependency }
import coursier.cli.spark.{ Assembly, Submit }
import coursier.util.Parse
import scala.util.control.NonFatal
object SparkSubmit {
def scalaSparkVersions(dependencies: Iterable[Dependency]): Either[String, (String, String)] = {
val sparkCoreMods = dependencies.collect {
case dep if dep.module.organization == "org.apache.spark" &&
(dep.module.name == "spark-core_2.10" || dep.module.name == "spark-core_2.11") =>
(dep.module, dep.version)
}
if (sparkCoreMods.isEmpty)
Left("Cannot find spark among dependencies")
else if (sparkCoreMods.size == 1) {
val scalaVersion = sparkCoreMods.head._1.name match {
case "spark-core_2.10" => "2.10"
case "spark-core_2.11" => "2.11"
case _ => throw new Exception("Cannot happen")
}
val sparkVersion = sparkCoreMods.head._2
Right((scalaVersion, sparkVersion))
} else
Left(s"Found several spark code modules among dependencies (${sparkCoreMods.mkString(", ")})")
}
}
/**
* Submits spark applications.
*
* Can be run with no spark distributions around.
*
* @author Alexandre Archambault
* @author Han Ju
*/
@CommandName("spark-submit")
case class SparkSubmit(
@Recurse
options: SparkSubmitOptions
) extends App with ExtraArgsApp {
val helper = new Helper(options.common, remainingArgs)
val jars = helper.fetch(sources = false, javadoc = false)
val (scalaVersion, sparkVersion) =
if (options.sparkVersion.isEmpty)
SparkSubmit.scalaSparkVersions(helper.res.dependencies) match {
case Left(err) =>
Console.err.println(
s"Cannot get spark / scala versions from dependencies: $err\n" +
"Set them via --scala-version or --spark-version"
)
sys.exit(1)
case Right(versions) => versions
}
else
(options.common.scalaVersion, options.sparkVersion)
val assemblyOrError =
if (options.sparkAssembly.isEmpty) {
// FIXME Also vaguely done in Helper and below
val (errors, modVers) = Parse.moduleVersionConfigs(
options.assemblyDependencies,
options.common.scalaVersion
)
val deps = modVers.map {
case (module, version, configOpt) =>
Dependency(
module,
version,
attributes = Attributes(options.common.defaultArtifactType, ""),
configuration = configOpt.getOrElse(options.common.defaultConfiguration),
exclusions = helper.excludes
)
}
if (errors.isEmpty)
Assembly.spark(scalaVersion, sparkVersion, options.noDefaultAssemblyDependencies, deps)
else
Left(s"Cannot parse assembly dependencies:\n${errors.map(" " + _).mkString("\n")}")
} else {
val f = new File(options.sparkAssembly)
if (f.isFile)
Right((f, Nil))
else if (f.exists())
Left(s"${options.sparkAssembly} is not a file")
else
Left(s"${options.sparkAssembly} not found")
}
val (assembly, assemblyJars) = assemblyOrError match {
case Left(err) =>
Console.err.println(s"Cannot get spark assembly: $err")
sys.exit(1)
case Right(res) => res
}
val idx = {
val idx0 = extraArgs.indexOf("--")
if (idx0 < 0)
extraArgs.length
else
idx0
}
assert(idx >= 0)
val sparkOpts = extraArgs.take(idx)
val jobArgs = extraArgs.drop(idx + 1)
val mainClass =
if (options.mainClass.isEmpty)
helper.retainedMainClass
else
options.mainClass
val mainJar = helper
.loader
.loadClass(mainClass) // FIXME Check for errors, provide a nicer error message in that case
.getProtectionDomain
.getCodeSource
.getLocation
.getPath // TODO Safety check: protocol must be file
val (check, extraJars0) = jars.partition(_.getAbsolutePath == mainJar)
val extraJars = extraJars0.filterNot(assemblyJars.toSet)
if (check.isEmpty)
Console.err.println(
s"Warning: cannot find back $mainJar among the dependencies JARs (likely a coursier bug)"
)
val extraSparkOpts = Seq(
"--conf", "spark.yarn.jar=" + assembly.getAbsolutePath
)
val extraJarsOptions =
if (extraJars.isEmpty)
Nil
else
Seq("--jars", extraJars.mkString(","))
val mainClassOptions = Seq("--class", mainClass)
val sparkSubmitOptions = sparkOpts ++ extraSparkOpts ++ extraJarsOptions ++ mainClassOptions ++
Seq(mainJar) ++ jobArgs
val submitCp = Submit.cp(
scalaVersion,
sparkVersion,
options.noDefaultSubmitDependencies,
options.submitDependencies,
options.common
)
val submitLoader = new URLClassLoader(
submitCp.map(_.toURI.toURL).toArray,
helper.baseLoader
)
Launch.run(
submitLoader,
Submit.mainClassName,
sparkSubmitOptions,
options.common.verbosityLevel,
{
if (options.common.verbosityLevel >= 1)
Console.err.println(
s"Launching spark-submit with arguments:\n" +
sparkSubmitOptions.map(" " + _).mkString("\n")
)
OutputHelper.handleOutput(
Some(options.yarnIdFile).filter(_.nonEmpty).map(new File(_)),
Some(options.maxIdleTime).filter(_ > 0)
)
}
)
}
object OutputHelper {
def outputInspectThread(
name: String,
from: InputStream,
to: PrintStream,
handlers: Seq[String => Unit]
) = {
val t = new Thread {
override def run() = {
val in = new BufferedReader(new InputStreamReader(from))
var line: String = null
while ({
line = in.readLine()
line != null
}) {
to.println(line)
handlers.foreach(_(line))
}
}
}
t.setName(name)
t.setDaemon(true)
t
}
def handleOutput(yarnAppFileOpt: Option[File], maxIdleTimeOpt: Option[Int]): Unit = {
var handlers = Seq.empty[String => Unit]
var threads = Seq.empty[Thread]
for (yarnAppFile <- yarnAppFileOpt) {
val Pattern = ".*Application report for ([^ ]+) .*".r
@volatile var written = false
val lock = new AnyRef
def handleMessage(s: String): Unit =
if (!written)
s match {
case Pattern(id) =>
lock.synchronized {
if (!written) {
println(s"Detected YARN app ID $id")
val path = yarnAppFile.toPath
Option(path.getParent).foreach(_.toFile.mkdirs())
Files.write(path, id.getBytes("UTF-8"))
written = true
}
}
case _ =>
}
val f = { line: String =>
try handleMessage(line)
catch {
case NonFatal(_) =>
}
}
handlers = handlers :+ f
}
for (maxIdleTime <- maxIdleTimeOpt if maxIdleTime > 0) {
@volatile var lastMessageTs = -1L
def updateLastMessageTs() = {
lastMessageTs = System.currentTimeMillis()
}
val checkThread = new Thread {
override def run() =
try {
while (true) {
lastMessageTs = -1L
Thread.sleep(maxIdleTime * 1000L)
if (lastMessageTs < 0) {
Console.err.println(s"No output from spark-submit for more than $maxIdleTime s, exiting")
sys.exit(1)
}
}
} catch {
case t: Throwable =>
Console.err.println(s"Caught $t in check spark-submit output thread!")
throw t
}
}
checkThread.setName("check-spark-submit-output")
checkThread.setDaemon(true)
threads = threads :+ checkThread
val f = { line: String =>
updateLastMessageTs()
}
handlers = handlers :+ f
}
def createThread(name: String, replaces: PrintStream, install: PrintStream => Unit): Thread = {
val in = new PipedInputStream
val out = new PipedOutputStream(in)
install(new PrintStream(out))
outputInspectThread(name, in, replaces, handlers)
}
if (handlers.nonEmpty) {
threads = threads ++ Seq(
createThread("inspect-out", System.out, System.setOut),
createThread("inspect-err", System.err, System.setErr)
)
threads.foreach(_.start())
}
}
}
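
The YARN app ID detection above hinges on the "Application report for" regex. A quick standalone check of that regex, not part of this commit, against a made-up log line in the format spark-submit prints in yarn-cluster mode:

// Regex copied from OutputHelper.handleOutput; the log line is invented.
val Pattern = ".*Application report for ([^ ]+) .*".r

val line =
  "16/09/09 00:35:12 INFO Client: Application report for application_1473370000000_0042 (state: RUNNING)"

line match {
  case Pattern(id) => println(s"Detected YARN app ID $id") // application_1473370000000_0042
  case _           => println("no app ID on this line")
}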

View File

@@ -0,0 +1,36 @@
package coursier.cli.spark
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipInputStream, ZipOutputStream}
import coursier.Dependency
object Assembly {
sealed abstract class Rule extends Product with Serializable
object Rule {
case class Exclude(path: String) extends Rule
case class Append(path: String) extends Rule
}
def make(jars: Seq[File], output: File, rules: Seq[Rule]): Unit = {
val zos = new ZipOutputStream(new FileOutputStream(output))
for (jar <- jars) {
new ZipInputStream(new FileInputStream(jar))
}
???
}
def spark(
scalaVersion: String,
sparkVersion: String,
noDefault: Boolean,
extraDependencies: Seq[Dependency]
): Either[String, (File, Seq[File])] =
throw new Exception("Not implemented: automatic assembly generation")
}
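
Assembly.make is still a stub (???), and Assembly.spark throws for now. Purely as an illustration of where this could go, a sketch of a merge honoring the two rule types, under the assumption that Exclude drops matching entries outright, Append concatenates every occurrence of a matching entry (e.g. reference.conf), and other duplicated paths keep their first occurrence:

// Illustrative sketch only, not the commit's implementation.
import java.io.{ ByteArrayOutputStream, File, FileInputStream, FileOutputStream }
import java.util.zip.{ ZipEntry, ZipInputStream, ZipOutputStream }
import coursier.cli.spark.Assembly.Rule

def make(jars: Seq[File], output: File, rules: Seq[Rule]): Unit = {
  val seen = collection.mutable.HashSet.empty[String]
  val appended = collection.mutable.LinkedHashMap.empty[String, ByteArrayOutputStream]
  val zos = new ZipOutputStream(new FileOutputStream(output))

  // Read the bytes of the current entry of a ZipInputStream
  def readFully(zis: ZipInputStream): Array[Byte] = {
    val baos = new ByteArrayOutputStream
    val buf = new Array[Byte](16384)
    var read = zis.read(buf)
    while (read >= 0) {
      if (read > 0) baos.write(buf, 0, read)
      read = zis.read(buf)
    }
    baos.toByteArray
  }

  for (jar <- jars) {
    val zis = new ZipInputStream(new FileInputStream(jar))
    var entry = zis.getNextEntry
    while (entry != null) {
      val path = entry.getName
      val excluded = rules.exists { case Rule.Exclude(p) => p == path; case _ => false }
      val append = rules.exists { case Rule.Append(p) => p == path; case _ => false }
      if (!excluded) {
        if (append)
          // concatenated and written once all the JARs have been read
          appended.getOrElseUpdate(path, new ByteArrayOutputStream).write(readFully(zis))
        else if (seen.add(path)) {
          // first occurrence wins, later duplicates are dropped
          zos.putNextEntry(new ZipEntry(path))
          zos.write(readFully(zis))
          zos.closeEntry()
        }
      }
      entry = zis.getNextEntry
    }
    zis.close()
  }

  for ((path, content) <- appended) {
    zos.putNextEntry(new ZipEntry(path))
    zos.write(content.toByteArray)
    zos.closeEntry()
  }
  zos.close()
}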

View File

@@ -0,0 +1,51 @@
package coursier.cli.spark
import java.io.File
import coursier.cli.{ CommonOptions, Helper }
object Submit {
def cp(
scalaVersion: String,
sparkVersion: String,
noDefault: Boolean,
extraDependencies: Seq[String],
common: CommonOptions
): Seq[File] = {
var extraCp = Seq.empty[File]
for (yarnConf <- sys.env.get("YARN_CONF_DIR") if yarnConf.nonEmpty) {
val f = new File(yarnConf)
if (!f.isDirectory) {
Console.err.println(s"Error: YARN conf path ($yarnConf) is not a directory or doesn't exist.")
sys.exit(1)
}
extraCp = extraCp :+ f
}
def defaultDependencies = Seq(
// FIXME We should be able to pass these as (parsed) Dependency instances to Helper
s"org.apache.spark::spark-core:$sparkVersion",
s"org.apache.spark::spark-yarn:$sparkVersion"
)
val helper = new Helper(
common.copy(
intransitive = Nil,
classifier = Nil,
scalaVersion = scalaVersion
),
// FIXME We should be able to pass these as (parsed) Dependency instances to Helper
(if (noDefault) Nil else defaultDependencies) ++ extraDependencies
)
helper.fetch(sources = false, javadoc = false) ++ extraCp
}
def mainClassName = "org.apache.spark.deploy.SparkSubmit"
}
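
As an illustration of what this classpath resolution amounts to for a Scala 2.10 / Spark 1.6.1 job (versions picked only as an example), and assuming CommonOptions() can be instantiated with default values, which the commit does not show:

// Hypothetical call, not part of this commit.
import coursier.cli.CommonOptions
import coursier.cli.spark.Submit

val submitCp: Seq[java.io.File] = Submit.cp(
  scalaVersion = "2.10",
  sparkVersion = "1.6.1",
  noDefault = false,
  extraDependencies = Nil,
  common = CommonOptions()  // assumption: CommonOptions has default values
)
// Resolves org.apache.spark:spark-core_2.10:1.6.1 and
// org.apache.spark:spark-yarn_2.10:1.6.1 plus their transitive dependencies,
// and appends $YARN_CONF_DIR to the classpath if that variable is set.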

View File

@@ -1,19 +0,0 @@
package coursier.cli
import caseapp.{ HelpMessage => Help, ValueDescription => Value, ExtraName => Short, _ }
case class SparkSubmitOptions(
@Short("M")
@Short("main")
mainClass: String,
@Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
@Value("file")
yarnIdFile: String,
@Help("Spark home (default: SPARK_HOME from the environment)")
sparkHome: String,
@Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
@Value("seconds")
maxIdleTime: Int,
@Recurse
common: CommonOptions
)

View File

@@ -1,247 +0,0 @@
package coursier.cli
import java.io.{PrintStream, BufferedReader, File, PipedInputStream, PipedOutputStream, InputStream, InputStreamReader}
import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths}
import caseapp._
import org.apache.commons.io.output.TeeOutputStream
import scala.util.control.NonFatal
import org.apache.spark.deploy.{SparkSubmit => SparkMain}
@CommandName("spark-submit")
case class SparkSubmit(
@Recurse
options: SparkSubmitOptions
) extends App with ExtraArgsApp {
val helper = new Helper(options.common, remainingArgs)
val jars = helper.fetch(sources = false, javadoc = false)
val sparkHome =
if (options.sparkHome.isEmpty)
sys.env.getOrElse(
"SPARK_HOME", {
Console.err.println("Error: SPARK_HOME not set and the --spark-home option not given a value.")
sys.exit(1)
}
)
else
options.sparkHome
def searchAssembly(dir: File): Array[File] = {
Option(dir.listFiles()).getOrElse(Array.empty).filter { f =>
f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly")
}
}
val sparkAssembly = {
// TODO Make this more reliable (assemblies can be found in other directories I think, this
// must be fine with spark 2.10 too, ...)
// TODO(han) maybe a conf or sys env ???
val dirs = List(
new File(sparkHome + "/assembly/target/scala-2.11"),
new File(sparkHome + "/lib")
)
// take the first assembly jar
dirs.map(searchAssembly)
.foldLeft(Array(): Array[File])(_ ++ _) match {
case Array(assembly) =>
assembly.getAbsolutePath
case Array() =>
throw new Exception(s"No spark assembly found under ${dirs.mkString(",")}")
case jars =>
throw new Exception(s"Found several assembly JARs: ${jars.mkString(",")}")
}
}
val libManaged = {
val dir = new File(sparkHome + "/lib_managed/jars")
if (dir.isDirectory) {
dir.listFiles().toSeq.map(_.getAbsolutePath)
} else
Nil
}
val yarnConfOpt = sys.env.get("YARN_CONF_DIR").filter(_.nonEmpty)
for (yarnConf <- yarnConfOpt if !new File(yarnConf).isDirectory)
throw new Exception(s"Error: YARN conf path ($yarnConf) is not a directory or doesn't exist.")
val cp = Seq(
sparkHome + "/conf",
sparkAssembly
) ++ libManaged ++ yarnConfOpt.toSeq
def addFileToCP(path: String): Unit = {
val file = new File(path)
val method = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
method.setAccessible(true)
method.invoke(ClassLoader.getSystemClassLoader(), file.toURI().toURL())
}
// Inject spark's runtime extra classpath (confs, yarn jars etc.) to the current class loader
cp.foreach(addFileToCP)
val idx = extraArgs.indexOf("--")
assert(idx >= 0)
val sparkOpts = extraArgs.take(idx)
val jobArgs = extraArgs.drop(idx + 1)
val mainClass =
if (options.mainClass.isEmpty)
helper.retainedMainClass
else
options.mainClass
val mainJar = helper
.loader
.loadClass(mainClass) // FIXME Check for errors, provide a nicer error message in that case
.getProtectionDomain
.getCodeSource
.getLocation
.getPath // TODO Safety check: protocol must be file
val (check, extraJars) = jars.partition(_.getAbsolutePath == mainJar)
if (check.isEmpty)
Console.err.println(
s"Warning: cannot find back $mainJar among the dependencies JARs (likely a coursier bug)"
)
val extraJarsOptions =
if (extraJars.isEmpty)
Nil
else
Seq("--jars", extraJars.mkString(","))
val mainClassOptions = Seq("--class", mainClass)
val sparkSubmitOptions = sparkOpts ++ extraJarsOptions ++ mainClassOptions ++
Seq(mainJar) ++ jobArgs
Console.err.println(
"Running spark app with extra classpath:\n" +
s"${cp.mkString(File.pathSeparator).map(" "+_).mkString("\n")}\n")
Console.err.println(
s"Running spark app with options:\n${sparkSubmitOptions.map(" "+_).mkString("\n")}\n")
object YarnAppId {
val Pattern = ".*Application report for ([^ ]+) .*".r
val fileOpt = Some(options.yarnIdFile).filter(_.nonEmpty)
@volatile var written = false
val lock = new AnyRef
def handleMessage(s: String): Unit =
if (!written)
s match {
case Pattern(id) =>
lock.synchronized {
if (!written) {
println(s"Detected YARN app ID $id")
for (writeAppIdTo <- fileOpt) {
val path = Paths.get(writeAppIdTo)
Option(path.getParent).foreach(_.toFile.mkdirs())
Files.write(path, id.getBytes("UTF-8"))
}
written = true
}
}
case _ =>
}
}
object IdleChecker {
@volatile var lastMessageTs = -1L
def updateLastMessageTs() = {
lastMessageTs = System.currentTimeMillis()
}
val checkThreadOpt =
if (options.maxIdleTime > 0) {
val checkThread = new Thread {
override def run() =
try {
while (true) {
lastMessageTs = -1L
Thread.sleep(options.maxIdleTime * 1000L)
if (lastMessageTs < 0) {
Console.err.println(s"No output from spark-submit for more than ${options.maxIdleTime} s, exiting")
sys.exit(1)
}
}
} catch {
case t: Throwable =>
Console.err.println(s"Caught $t in check spark-submit output thread!")
throw t
}
}
checkThread.setName("check-spark-submit-output")
checkThread.setDaemon(true)
Some(checkThread)
} else
None
}
// Create a thread that inspects the spark's output
def outputInspectThread(from: InputStream) = {
val t = new Thread {
override def run() = {
val in = new BufferedReader(new InputStreamReader(from))
var line: String = null
while ({
line = in.readLine()
line != null
}) {
if (options.maxIdleTime > 0)
IdleChecker.updateLastMessageTs()
if (YarnAppId.fileOpt.nonEmpty)
try YarnAppId.handleMessage(line)
catch {
case NonFatal(_) =>
}
}
}
}
t.setName("spark-output")
t.setDaemon(true)
t
}
// setup the inspection of spark's output
// redirect stderr to stdout
System.setErr(System.out)
val orig = System.out
val in = new PipedInputStream()
val out = new PipedOutputStream(in)
// multiplexing stdout
val tee = new TeeOutputStream(orig, out)
System.setOut(new PrintStream(tee))
val isPipeThread = outputInspectThread(in)
IdleChecker.checkThreadOpt.foreach(_.start())
isPipeThread.start()
// After all the setup, finally launch spark
SparkMain.main(sparkSubmitOptions.toArray)
}