diff --git a/build.sbt b/build.sbt
index c6dd9de5b..0b3972fc1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -561,16 +561,3 @@ lazy val `coursier` = project.in(file("."))
   .settings(
     moduleName := "coursier-root"
   )
-
-val sparkVersion = "1.6.1"
-
-lazy val spark = project
-  .dependsOn(cli)
-  .settings(commonSettings)
-  .settings(noPublishForScalaVersionSettings("2.10"))
-  .settings(packAutoSettings)
-  .settings(proguardSettings)
-  .settings(
-    name := "coursier-spark",
-    libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
-  )
diff --git a/cli/src/main/scala-2.11/coursier/cli/Helper.scala b/cli/src/main/scala-2.11/coursier/cli/Helper.scala
index 6f898c3e7..461b8eef8 100644
--- a/cli/src/main/scala-2.11/coursier/cli/Helper.scala
+++ b/cli/src/main/scala-2.11/coursier/cli/Helper.scala
@@ -581,35 +581,36 @@ class Helper(
     files0
   }
 
-  lazy val (parentLoader, filteredFiles) = {
+  def contextLoader = Thread.currentThread().getContextClassLoader
 
-    val contextLoader = Thread.currentThread().getContextClassLoader
+  // TODO Would ClassLoader.getSystemClassLoader be better here?
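+  // baseLoader is what the launched application's ClassLoader gets parented to:
+  // ideally the parent of the loader that loaded coursier's own classes, so that
+  // coursier's dependencies don't leak into the application being launched.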
+  val baseLoader: ClassLoader =
+    Launch.mainClassLoader(contextLoader)
+      .flatMap(cl => Option(cl.getParent))
+      .getOrElse {
+        // proguarded -> no risk of conflicts, no absolute need to find a specific ClassLoader
+        val isProguarded = Try(contextLoader.loadClass("coursier.cli.Launch")).isFailure
+        if (warnBaseLoaderNotFound && !isProguarded && common.verbosityLevel >= 0)
+          Console.err.println(
+            "Warning: cannot find the main ClassLoader that launched coursier.\n" +
+              "Was coursier launched by its main launcher? " +
+              "The ClassLoader of the application that is about to be launched will be intertwined " +
+              "with the one of coursier, which may be a problem if their dependencies conflict."
+          )
+        contextLoader
+      }
+
+  lazy val (parentLoader, filteredFiles) = {
 
     val files0 = fetch(sources = false, javadoc = false)
 
-    val parentLoader0: ClassLoader =
-      Launch.mainClassLoader(contextLoader)
-        .flatMap(cl => Option(cl.getParent))
-        .getOrElse {
-          // proguarded -> no risk of conflicts, no absolute need to find a specific ClassLoader
-          val isProguarded = Try(contextLoader.loadClass("coursier.cli.Launch")).isFailure
-          if (warnBaseLoaderNotFound && !isProguarded && common.verbosityLevel >= 0)
-            Console.err.println(
-              "Warning: cannot find the main ClassLoader that launched coursier.\n" +
-                "Was coursier launched by its main launcher? " +
-                "The ClassLoader of the application that is about to be launched will be intertwined " +
-                "with the one of coursier, which may be a problem if their dependencies conflict."
-            )
-          contextLoader
-        }
-
     if (isolated.isolated.isEmpty)
-      (parentLoader0, files0)
+      (baseLoader, files0)
     else {
 
       val isolatedDeps = isolated.isolatedDeps(common.defaultArtifactType, common.scalaVersion)
 
-      val (isolatedLoader, filteredFiles0) = isolated.targets.foldLeft((parentLoader0, files0)) {
+      val (isolatedLoader, filteredFiles0) = isolated.targets.foldLeft((baseLoader, files0)) {
         case ((parent, files0), target) =>
 
           // FIXME These were already fetched above
diff --git a/cli/src/main/scala-2.11/coursier/cli/Launch.scala b/cli/src/main/scala-2.11/coursier/cli/Launch.scala
index b4731215a..b7cc971e4 100644
--- a/cli/src/main/scala-2.11/coursier/cli/Launch.scala
+++ b/cli/src/main/scala-2.11/coursier/cli/Launch.scala
@@ -33,6 +33,43 @@ object Launch {
       mainClassLoader(cl.getParent)
   }
 
+  def run(
+    loader: ClassLoader,
+    mainClass: String,
+    args: Seq[String],
+    verbosity: Int,
+    beforeMain: => Unit = ()
+  ): Unit = {
+
+    val cls =
+      try loader.loadClass(mainClass)
+      catch { case e: ClassNotFoundException =>
+        Helper.errPrintln(s"Error: class $mainClass not found")
+        sys.exit(255)
+      }
+    val method =
+      try cls.getMethod("main", classOf[Array[String]])
+      catch { case e: NoSuchMethodException =>
+        Helper.errPrintln(s"Error: method main not found in $mainClass")
+        sys.exit(255)
+      }
+    method.setAccessible(true)
+
+    if (verbosity >= 2)
+      Helper.errPrintln(s"Launching $mainClass ${args.mkString(" ")}")
+    else if (verbosity == 1)
+      Helper.errPrintln(s"Launching")
+
+    beforeMain
+
+    Thread.currentThread().setContextClassLoader(loader)
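+    // the loaded main method is static, hence the null receiver below; unwrap
+    // InvocationTargetException so the application's own exceptions surface as-is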
+    try method.invoke(null, args.toArray)
+    catch {
+      case e: java.lang.reflect.InvocationTargetException =>
+        throw Option(e.getCause).getOrElse(e)
+    }
+  }
+
 }
 
 class IsolatedClassLoader(
@@ -86,29 +123,10 @@ case class Launch(
     else
       options.mainClass
 
-  val cls =
-    try helper.loader.loadClass(mainClass)
-    catch { case e: ClassNotFoundException =>
-      Helper.errPrintln(s"Error: class $mainClass not found")
-      sys.exit(255)
-    }
-  val method =
-    try cls.getMethod("main", classOf[Array[String]])
-    catch { case e: NoSuchMethodException =>
-      Helper.errPrintln(s"Error: method main not found in $mainClass")
-      sys.exit(255)
-    }
-  method.setAccessible(true)
-
-  if (options.common.verbosityLevel >= 2)
-    Helper.errPrintln(s"Launching $mainClass ${userArgs.mkString(" ")}")
-  else if (options.common.verbosityLevel == 1)
-    Helper.errPrintln(s"Launching")
-
-  Thread.currentThread().setContextClassLoader(helper.loader)
-  try method.invoke(null, userArgs.toArray)
-  catch {
-    case e: java.lang.reflect.InvocationTargetException =>
-      throw Option(e.getCause).getOrElse(e)
-  }
+  Launch.run(
+    helper.loader,
+    mainClass,
+    userArgs,
+    options.common.verbosityLevel
+  )
 }
\ No newline at end of file
diff --git a/cli/src/main/scala-2.11/coursier/cli/Options.scala b/cli/src/main/scala-2.11/coursier/cli/Options.scala
index 9a5dfeeff..7359826f3 100644
--- a/cli/src/main/scala-2.11/coursier/cli/Options.scala
+++ b/cli/src/main/scala-2.11/coursier/cli/Options.scala
@@ -220,3 +220,25 @@ case class BootstrapOptions(
   @Recurse
     common: CommonOptions
 )
+
+case class SparkSubmitOptions(
+  @Short("M")
+  @Short("main")
+  @Help("Main class to be launched (optional if in manifest)")
+    mainClass: String,
+  @Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
+  @Value("file")
+    yarnIdFile: String,
+  @Help("Spark assembly. If empty, automatically generate (default: empty)")
+    sparkAssembly: String,
+    noDefaultAssemblyDependencies: Boolean,
+    assemblyDependencies: List[String],
+    noDefaultSubmitDependencies: Boolean,
+    submitDependencies: List[String],
+    sparkVersion: String,
+  @Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
+  @Value("seconds")
+    maxIdleTime: Int,
+  @Recurse
+    common: CommonOptions
+)
\ No newline at end of file
diff --git a/cli/src/main/scala-2.11/coursier/cli/SparkSubmit.scala b/cli/src/main/scala-2.11/coursier/cli/SparkSubmit.scala
new file mode 100644
index 000000000..3d6a79584
--- /dev/null
+++ b/cli/src/main/scala-2.11/coursier/cli/SparkSubmit.scala
@@ -0,0 +1,324 @@
+package coursier.cli
+
+import java.io.{PrintStream, BufferedReader, File, PipedInputStream, PipedOutputStream, InputStream, InputStreamReader}
+import java.net.URLClassLoader
+import java.nio.file.Files
+
+import caseapp._
+
+import coursier.{ Attributes, Dependency }
+import coursier.cli.spark.{ Assembly, Submit }
+import coursier.util.Parse
+
+import scala.util.control.NonFatal
+
+object SparkSubmit {
+
+  def scalaSparkVersions(dependencies: Iterable[Dependency]): Either[String, (String, String)] = {
+
+    val sparkCoreMods = dependencies.collect {
+      case dep if dep.module.organization == "org.apache.spark" &&
+        (dep.module.name == "spark-core_2.10" || dep.module.name == "spark-core_2.11") =>
+        (dep.module, dep.version)
+    }
+
+    if (sparkCoreMods.isEmpty)
+      Left("Cannot find spark among dependencies")
+    else if (sparkCoreMods.size == 1) {
+      val scalaVersion = sparkCoreMods.head._1.name match {
+        case "spark-core_2.10" => "2.10"
+        case "spark-core_2.11" => "2.11"
+        case _ => throw new Exception("Cannot happen")
+      }
+
+      val sparkVersion = sparkCoreMods.head._2
+
+      Right((scalaVersion, sparkVersion))
+    } else
+      Left(s"Found several spark core modules among dependencies (${sparkCoreMods.mkString(", ")})")
+
+  }
+
+}
+
+/**
+ * Submits spark applications.
+ *
+ * Can be run with no spark distributions around.
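+ *
+ * Example invocation (hypothetical module and options, for illustration only):
+ * {{{
+ *   coursier spark-submit com.example::my-spark-job:0.1.0 -- --master yarn-cluster -- arg1 arg2
+ * }}}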
+ *
+ * @author Alexandre Archambault
+ * @author Han Ju
+ */
+@CommandName("spark-submit")
+case class SparkSubmit(
+  @Recurse
+    options: SparkSubmitOptions
+) extends App with ExtraArgsApp {
+
+  val helper = new Helper(options.common, remainingArgs)
+  val jars = helper.fetch(sources = false, javadoc = false)
+
+  val (scalaVersion, sparkVersion) =
+    if (options.sparkVersion.isEmpty)
+      SparkSubmit.scalaSparkVersions(helper.res.dependencies) match {
+        case Left(err) =>
+          Console.err.println(
+            s"Cannot get spark / scala versions from dependencies: $err\n" +
+              "Set them via --scala-version or --spark-version"
+          )
+          sys.exit(1)
+        case Right(versions) => versions
+      }
+    else
+      (options.common.scalaVersion, options.sparkVersion)
+
+  val assemblyOrError =
+    if (options.sparkAssembly.isEmpty) {
+
+      // FIXME Also vaguely done in Helper and below
+
+      val (errors, modVers) = Parse.moduleVersionConfigs(
+        options.assemblyDependencies,
+        options.common.scalaVersion
+      )
+
+      val deps = modVers.map {
+        case (module, version, configOpt) =>
+          Dependency(
+            module,
+            version,
+            attributes = Attributes(options.common.defaultArtifactType, ""),
+            configuration = configOpt.getOrElse(options.common.defaultConfiguration),
+            exclusions = helper.excludes
+          )
+      }
+
+      if (errors.isEmpty)
+        Assembly.spark(scalaVersion, sparkVersion, options.noDefaultAssemblyDependencies, deps)
+      else
+        Left(s"Cannot parse assembly dependencies:\n${errors.map("  " + _).mkString("\n")}")
+    } else {
+      val f = new File(options.sparkAssembly)
+      if (f.isFile)
+        Right((f, Nil))
+      else if (f.exists())
+        Left(s"${options.sparkAssembly} is not a file")
+      else
+        Left(s"${options.sparkAssembly} not found")
+    }
+
+  val (assembly, assemblyJars) = assemblyOrError match {
+    case Left(err) =>
+      Console.err.println(s"Cannot get spark assembly: $err")
+      sys.exit(1)
+    case Right(res) => res
+  }
+
+
+  val idx = {
+    val idx0 = extraArgs.indexOf("--")
+
+    if (idx0 < 0)
+      extraArgs.length
+    else
+      idx0
+  }
+
+  assert(idx >= 0)
+
+  val sparkOpts = extraArgs.take(idx)
+  val jobArgs = extraArgs.drop(idx + 1)
+
+  val mainClass =
+    if (options.mainClass.isEmpty)
+      helper.retainedMainClass
+    else
+      options.mainClass
+
+  val mainJar = helper
+    .loader
+    .loadClass(mainClass) // FIXME Check for errors, provide a nicer error message in that case
+    .getProtectionDomain
+    .getCodeSource
+    .getLocation
+    .getPath // TODO Safety check: protocol must be file
+
+  val (check, extraJars0) = jars.partition(_.getAbsolutePath == mainJar)
+
+  val extraJars = extraJars0.filterNot(assemblyJars.toSet)
+
+  if (check.isEmpty)
+    Console.err.println(
+      s"Warning: cannot find back $mainJar among the dependencies JARs (likely a coursier bug)"
+    )
+
+  val extraSparkOpts = Seq(
+    "--conf", "spark.yarn.jar=" + assembly.getAbsolutePath
+  )
+
+  val extraJarsOptions =
+    if (extraJars.isEmpty)
+      Nil
+    else
+      Seq("--jars", extraJars.mkString(","))
+
+  val mainClassOptions = Seq("--class", mainClass)
+
+  val sparkSubmitOptions = sparkOpts ++ extraSparkOpts ++ extraJarsOptions ++ mainClassOptions ++
+    Seq(mainJar) ++ jobArgs
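+  // The assembled arguments follow the usual spark-submit shape, roughly (illustrative):
+  //   <spark opts> --conf spark.yarn.jar=<assembly> --jars <deps> --class <main> <main jar> <job args>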
+
+  val submitCp = Submit.cp(
+    scalaVersion,
+    sparkVersion,
+    options.noDefaultSubmitDependencies,
+    options.submitDependencies,
+    options.common
+  )
+
+  val submitLoader = new URLClassLoader(
+    submitCp.map(_.toURI.toURL).toArray,
+    helper.baseLoader
+  )
+
+  Launch.run(
+    submitLoader,
+    Submit.mainClassName,
+    sparkSubmitOptions,
+    options.common.verbosityLevel,
+    {
+      if (options.common.verbosityLevel >= 1)
+        Console.err.println(
+          s"Launching spark-submit with arguments:\n" +
+            sparkSubmitOptions.map("  " + _).mkString("\n")
+        )
+
+      OutputHelper.handleOutput(
+        Some(options.yarnIdFile).filter(_.nonEmpty).map(new File(_)),
+        Some(options.maxIdleTime).filter(_ > 0)
+      )
+    }
+  )
+}
+
+
+object OutputHelper {
+
+  def outputInspectThread(
+    name: String,
+    from: InputStream,
+    to: PrintStream,
+    handlers: Seq[String => Unit]
+  ) = {
+
+    val t = new Thread {
+      override def run() = {
+        val in = new BufferedReader(new InputStreamReader(from))
+        var line: String = null
+        while ({
+          line = in.readLine()
+          line != null
+        }) {
+          to.println(line)
+          handlers.foreach(_(line))
+        }
+      }
+    }
+
+    t.setName(name)
+    t.setDaemon(true)
+
+    t
+  }
+
+
+  def handleOutput(yarnAppFileOpt: Option[File], maxIdleTimeOpt: Option[Int]): Unit = {
+
+    var handlers = Seq.empty[String => Unit]
+    var threads = Seq.empty[Thread]
+
+    for (yarnAppFile <- yarnAppFileOpt) {
+
+      val Pattern = ".*Application report for ([^ ]+) .*".r
+
+      @volatile var written = false
+      val lock = new AnyRef
+      def handleMessage(s: String): Unit =
+        if (!written)
+          s match {
+            case Pattern(id) =>
+              lock.synchronized {
+                if (!written) {
+                  println(s"Detected YARN app ID $id")
+                  val path = yarnAppFile.toPath
+                  Option(path.getParent).foreach(_.toFile.mkdirs())
+                  Files.write(path, id.getBytes("UTF-8"))
+                  written = true
+                }
+              }
+            case _ =>
+          }
+
+      val f = { line: String =>
+        try handleMessage(line)
+        catch {
+          case NonFatal(_) =>
+        }
+      }
+
+      handlers = handlers :+ f
+    }
+
+    for (maxIdleTime <- maxIdleTimeOpt if maxIdleTime > 0) {
+
+      @volatile var lastMessageTs = -1L
+
+      def updateLastMessageTs() = {
+        lastMessageTs = System.currentTimeMillis()
+      }
+
+      val checkThread = new Thread {
+        override def run() =
+          try {
+            while (true) {
+              lastMessageTs = -1L
+              Thread.sleep(maxIdleTime * 1000L)
+              if (lastMessageTs < 0) {
+                Console.err.println(s"No output from spark-submit for more than $maxIdleTime s, exiting")
+                sys.exit(1)
+              }
+            }
+          } catch {
+            case t: Throwable =>
+              Console.err.println(s"Caught $t in check spark-submit output thread!")
+              throw t
+          }
+      }
+
+      checkThread.setName("check-spark-submit-output")
+      checkThread.setDaemon(true)
+
+      threads = threads :+ checkThread
+
+      val f = { line: String =>
+        updateLastMessageTs()
+      }
+
+      handlers = handlers :+ f
+    }
+
+    def createThread(name: String, replaces: PrintStream, install: PrintStream => Unit): Thread = {
+      val in = new PipedInputStream
+      val out = new PipedOutputStream(in)
+      install(new PrintStream(out))
+      outputInspectThread(name, in, replaces, handlers)
+    }
+
+    if (handlers.nonEmpty) {
+      threads = threads ++ Seq(
+        createThread("inspect-out", System.out, System.setOut),
+        createThread("inspect-err", System.err, System.setErr)
+      )
+
+      threads.foreach(_.start())
+    }
+  }
+}
\ No newline at end of file
diff --git a/cli/src/main/scala-2.11/coursier/cli/spark/Assembly.scala b/cli/src/main/scala-2.11/coursier/cli/spark/Assembly.scala
new file mode 100644
index 000000000..18d6cafd9
--- /dev/null
+++ b/cli/src/main/scala-2.11/coursier/cli/spark/Assembly.scala
@@ -0,0 +1,36 @@
+package coursier.cli.spark
+
+import java.io.{File, FileInputStream, FileOutputStream}
+import java.util.zip.{ZipInputStream, ZipOutputStream}
+
+import coursier.Dependency
+
+object Assembly {
+
+  sealed abstract class Rule extends Product with Serializable
+
+  object Rule {
+    case class Exclude(path: String) extends Rule
+    case class Append(path: String) extends Rule
+  }
+
+  def make(jars: Seq[File], output: File, rules: Seq[Rule]): Unit = {
+
+    val zos = new ZipOutputStream(new FileOutputStream(output))
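+    // Intended logic (sketch, not implemented yet - see ??? below): walk the entries
+    // of each JAR, drop paths matched by Rule.Exclude, concatenate the contents of
+    // paths matched by Rule.Append, and keep the first occurrence of anything else.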
+
+    for (jar <- jars) {
+      new ZipInputStream(new FileInputStream(jar))
+    }
+
+    ???
+  }
+
+  def spark(
+    scalaVersion: String,
+    sparkVersion: String,
+    noDefault: Boolean,
+    extraDependencies: Seq[Dependency]
+  ): Either[String, (File, Seq[File])] =
+    throw new Exception("Not implemented: automatic assembly generation")
+
+}
diff --git a/cli/src/main/scala-2.11/coursier/cli/spark/Submit.scala b/cli/src/main/scala-2.11/coursier/cli/spark/Submit.scala
new file mode 100644
index 000000000..887dec1af
--- /dev/null
+++ b/cli/src/main/scala-2.11/coursier/cli/spark/Submit.scala
@@ -0,0 +1,51 @@
+package coursier.cli.spark
+
+import java.io.File
+
+import coursier.cli.{ CommonOptions, Helper }
+
+object Submit {
+
+  def cp(
+    scalaVersion: String,
+    sparkVersion: String,
+    noDefault: Boolean,
+    extraDependencies: Seq[String],
+    common: CommonOptions
+  ): Seq[File] = {
+
+    var extraCp = Seq.empty[File]
+
+    for (yarnConf <- sys.env.get("YARN_CONF_DIR") if yarnConf.nonEmpty) {
+      val f = new File(yarnConf)
+
+      if (!f.isDirectory) {
+        Console.err.println(s"Error: YARN conf path ($yarnConf) is not a directory or doesn't exist.")
+        sys.exit(1)
+      }
+
+      extraCp = extraCp :+ f
+    }
+
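+    // "::" requests the Scala cross-versioned module, e.g. spark-core_2.11
+    // when the scala version is 2.11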
(Default: 0)") - @Value("seconds") - maxIdleTime: Int, - @Recurse - common: CommonOptions -) \ No newline at end of file diff --git a/spark/src/main/scala/coursier/cli/SparkSubmit.scala b/spark/src/main/scala/coursier/cli/SparkSubmit.scala deleted file mode 100644 index 79d2f0b00..000000000 --- a/spark/src/main/scala/coursier/cli/SparkSubmit.scala +++ /dev/null @@ -1,247 +0,0 @@ -package coursier.cli - -import java.io.{PrintStream, BufferedReader, File, PipedInputStream, PipedOutputStream, InputStream, InputStreamReader} -import java.net.{URL, URLClassLoader} -import java.nio.file.{Files, Paths} - -import caseapp._ -import org.apache.commons.io.output.TeeOutputStream - -import scala.util.control.NonFatal - -import org.apache.spark.deploy.{SparkSubmit => SparkMain} - -@CommandName("spark-submit") -case class SparkSubmit( - @Recurse - options: SparkSubmitOptions -) extends App with ExtraArgsApp { - - val helper = new Helper(options.common, remainingArgs) - - val jars = helper.fetch(sources = false, javadoc = false) - - - val sparkHome = - if (options.sparkHome.isEmpty) - sys.env.getOrElse( - "SPARK_HOME", { - Console.err.println("Error: SPARK_HOME not set and the --spark-home option not given a value.") - sys.exit(1) - } - ) - else - options.sparkHome - - def searchAssembly(dir: File): Array[File] = { - Option(dir.listFiles()).getOrElse(Array.empty).filter { f => - f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly") - } - } - - val sparkAssembly = { - // TODO Make this more reliable (assemblies can be found in other directories I think, this - // must be fine with spark 2.10 too, ...) - // TODO(han) maybe a conf or sys env ??? - val dirs = List( - new File(sparkHome + "/assembly/target/scala-2.11"), - new File(sparkHome + "/lib") - ) - - // take the first assembly jar - dirs.map(searchAssembly) - .foldLeft(Array(): Array[File])(_ ++ _) match { - case Array(assembly) => - assembly.getAbsolutePath - case Array() => - throw new Exception(s"No spark assembly found under ${dirs.mkString(",")}") - case jars => - throw new Exception(s"Found several assembly JARs: ${jars.mkString(",")}") - } - } - - val libManaged = { - val dir = new File(sparkHome + "/lib_managed/jars") - if (dir.isDirectory) { - dir.listFiles().toSeq.map(_.getAbsolutePath) - } else - Nil - } - - val yarnConfOpt = sys.env.get("YARN_CONF_DIR").filter(_.nonEmpty) - - for (yarnConf <- yarnConfOpt if !new File(yarnConf).isDirectory) - throw new Exception(s"Error: YARN conf path ($yarnConf) is not a directory or doesn't exist.") - - val cp = Seq( - sparkHome + "/conf", - sparkAssembly - ) ++ libManaged ++ yarnConfOpt.toSeq - - def addFileToCP(path: String): Unit = { - val file = new File(path) - val method = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL]) - method.setAccessible(true) - method.invoke(ClassLoader.getSystemClassLoader(), file.toURI().toURL()) - } - - // Inject spark's runtime extra classpath (confs, yarn jars etc.) 
-  cp.foreach(addFileToCP)
-
-  val idx = extraArgs.indexOf("--")
-  assert(idx >= 0)
-
-  val sparkOpts = extraArgs.take(idx)
-  val jobArgs = extraArgs.drop(idx + 1)
-
-  val mainClass =
-    if (options.mainClass.isEmpty)
-      helper.retainedMainClass
-    else
-      options.mainClass
-
-  val mainJar = helper
-    .loader
-    .loadClass(mainClass) // FIXME Check for errors, provide a nicer error message in that case
-    .getProtectionDomain
-    .getCodeSource
-    .getLocation
-    .getPath // TODO Safety check: protocol must be file
-
-  val (check, extraJars) = jars.partition(_.getAbsolutePath == mainJar)
-
-  if (check.isEmpty)
-    Console.err.println(
-      s"Warning: cannot find back $mainJar among the dependencies JARs (likely a coursier bug)"
-    )
-
-  val extraJarsOptions =
-    if (extraJars.isEmpty)
-      Nil
-    else
-      Seq("--jars", extraJars.mkString(","))
-
-  val mainClassOptions = Seq("--class", mainClass)
-
-  val sparkSubmitOptions = sparkOpts ++ extraJarsOptions ++ mainClassOptions ++
-    Seq(mainJar) ++ jobArgs
-
-  Console.err.println(
-    "Running spark app with extra classpath:\n" +
-      s"${cp.mkString(File.pathSeparator).map("  "+_).mkString("\n")}\n")
-
-  Console.err.println(
-    s"Running spark app with options:\n${sparkSubmitOptions.map("  "+_).mkString("\n")}\n")
-
-  object YarnAppId {
-    val Pattern = ".*Application report for ([^ ]+) .*".r
-
-    val fileOpt = Some(options.yarnIdFile).filter(_.nonEmpty)
-
-    @volatile var written = false
-    val lock = new AnyRef
-    def handleMessage(s: String): Unit =
-      if (!written)
-        s match {
-          case Pattern(id) =>
-            lock.synchronized {
-              if (!written) {
-                println(s"Detected YARN app ID $id")
-                for (writeAppIdTo <- fileOpt) {
-                  val path = Paths.get(writeAppIdTo)
-                  Option(path.getParent).foreach(_.toFile.mkdirs())
-                  Files.write(path, id.getBytes("UTF-8"))
-                }
-                written = true
-              }
-            }
-          case _ =>
-        }
-  }
-
-  object IdleChecker {
-
-    @volatile var lastMessageTs = -1L
-
-    def updateLastMessageTs() = {
-      lastMessageTs = System.currentTimeMillis()
-    }
-
-    val checkThreadOpt =
-      if (options.maxIdleTime > 0) {
-
-        val checkThread = new Thread {
-          override def run() =
-            try {
-              while (true) {
-                lastMessageTs = -1L
-                Thread.sleep(options.maxIdleTime * 1000L)
-                if (lastMessageTs < 0) {
-                  Console.err.println(s"No output from spark-submit for more than ${options.maxIdleTime} s, exiting")
-                  sys.exit(1)
-                }
-              }
-            } catch {
-              case t: Throwable =>
-                Console.err.println(s"Caught $t in check spark-submit output thread!")
-                throw t
-            }
-        }
-
-        checkThread.setName("check-spark-submit-output")
-        checkThread.setDaemon(true)
-
-        Some(checkThread)
-      } else
-        None
-  }
-
-  // Create a thread that inspects the spark's output
-  def outputInspectThread(from: InputStream) = {
-    val t = new Thread {
-      override def run() = {
-        val in = new BufferedReader(new InputStreamReader(from))
-        var line: String = null
-        while ({
-          line = in.readLine()
-          line != null
-        }) {
-          if (options.maxIdleTime > 0)
-            IdleChecker.updateLastMessageTs()
-
-          if (YarnAppId.fileOpt.nonEmpty)
-            try YarnAppId.handleMessage(line)
-            catch {
-              case NonFatal(_) =>
-            }
-        }
-      }
-    }
-
-    t.setName("spark-output")
-    t.setDaemon(true)
-
-    t
-  }
-
-  // setup the inspection of spark's output
-  // redirect stderr to stdout
-  System.setErr(System.out)
-
-  val orig = System.out
-
-  val in = new PipedInputStream()
-  val out = new PipedOutputStream(in)
-
-  // multiplexing stdout
-  val tee = new TeeOutputStream(orig, out)
-  System.setOut(new PrintStream(tee))
-
-  val isPipeThread = outputInspectThread(in)
-
-  IdleChecker.checkThreadOpt.foreach(_.start())
-  isPipeThread.start()
-
-  // After all the setup, finally launch spark
-  SparkMain.main(sparkSubmitOptions.toArray)
-}
\ No newline at end of file