From eac19c1a66dd23429bb2390554d55f9677f7107f Mon Sep 17 00:00:00 2001
From: Han JU
Date: Fri, 2 Sep 2016 18:06:09 +0200
Subject: [PATCH] move SparkSubmit to its own module with spark-core as dependency

---
 build.sbt                                          | 13 +++++++
 .../scala-2.11/coursier/cli/Options.scala          | 16 --------
 .../src/main/scala/coursier/cli/Options.scala      | 19 ++++++++++
 .../scala}/coursier/cli/SparkSubmit.scala          | 38 +++++++++++++++----
 4 files changed, 62 insertions(+), 24 deletions(-)
 create mode 100644 spark/src/main/scala/coursier/cli/Options.scala
 rename {cli/src/main/scala-2.11 => spark/src/main/scala}/coursier/cli/SparkSubmit.scala (84%)

diff --git a/build.sbt b/build.sbt
index 0b3972fc1..c6dd9de5b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -561,3 +561,16 @@ lazy val `coursier` = project.in(file("."))
   .settings(
     moduleName := "coursier-root"
   )
+
+val sparkVersion = "1.6.1"
+
+lazy val spark = project
+  .dependsOn(cli)
+  .settings(commonSettings)
+  .settings(noPublishForScalaVersionSettings("2.10"))
+  .settings(packAutoSettings)
+  .settings(proguardSettings)
+  .settings(
+    name := "coursier-spark",
+    libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
+  )
diff --git a/cli/src/main/scala-2.11/coursier/cli/Options.scala b/cli/src/main/scala-2.11/coursier/cli/Options.scala
index 36a539856..9a5dfeeff 100644
--- a/cli/src/main/scala-2.11/coursier/cli/Options.scala
+++ b/cli/src/main/scala-2.11/coursier/cli/Options.scala
@@ -194,22 +194,6 @@ case class LaunchOptions(
   common: CommonOptions
 )
 
-case class SparkSubmitOptions(
-  @Short("M")
-  @Short("main")
-  mainClass: String,
-  @Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
-  @Value("file")
-  yarnIdFile: String,
-  @Help("Spark home (default: SPARK_HOME from the environment)")
-  sparkHome: String,
-  @Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
-  @Value("seconds")
-  maxIdleTime: Int,
-  @Recurse
-  common: CommonOptions
-)
-
 case class BootstrapOptions(
   @Short("M")
   @Short("main")
diff --git a/spark/src/main/scala/coursier/cli/Options.scala b/spark/src/main/scala/coursier/cli/Options.scala
new file mode 100644
index 000000000..7602a484c
--- /dev/null
+++ b/spark/src/main/scala/coursier/cli/Options.scala
@@ -0,0 +1,19 @@
+package coursier.cli
+
+import caseapp.{ HelpMessage => Help, ValueDescription => Value, ExtraName => Short, _ }
+
+case class SparkSubmitOptions(
+  @Short("M")
+  @Short("main")
+  mainClass: String,
+  @Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
+  @Value("file")
+  yarnIdFile: String,
+  @Help("Spark home (default: SPARK_HOME from the environment)")
+  sparkHome: String,
+  @Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
+  @Value("seconds")
+  maxIdleTime: Int,
+  @Recurse
+  common: CommonOptions
+)
\ No newline at end of file
diff --git a/cli/src/main/scala-2.11/coursier/cli/SparkSubmit.scala b/spark/src/main/scala/coursier/cli/SparkSubmit.scala
similarity index 84%
rename from cli/src/main/scala-2.11/coursier/cli/SparkSubmit.scala
rename to spark/src/main/scala/coursier/cli/SparkSubmit.scala
index 0e02eb8c0..18f185ac0 100644
--- a/cli/src/main/scala-2.11/coursier/cli/SparkSubmit.scala
+++ b/spark/src/main/scala/coursier/cli/SparkSubmit.scala
@@ -1,7 +1,8 @@
 package coursier.cli
 
-import java.io.{ BufferedReader, File, InputStream, InputStreamReader, OutputStream }
-import java.nio.file.{ Files, Paths }
+import java.io.{BufferedReader, File, InputStream, InputStreamReader, OutputStream}
+import java.net.{URL, URLClassLoader}
+import java.nio.file.{Files, Paths}
 
 import caseapp._
 
@@ -29,22 +30,31 @@ case class SparkSubmit(
       else
         options.sparkHome
 
-    val sparkAssembly = {
-      // TODO Make this more reliable (assemblies can be found in other directories I think, this
-      // must be fine with spark 2.10 too, ...)
-      val dir = new File(sparkHome + "/assembly/target/scala-2.11")
+    def searchAssembly(dir: File): String = {
       Option(dir.listFiles()).getOrElse(Array.empty).filter { f =>
-        f.isFile && f.getName.endsWith(".jar")
+        f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly")
       } match {
         case Array(assemblyFile) =>
           assemblyFile.getAbsolutePath
         case Array() =>
           throw new Exception(s"No spark assembly found under $dir")
         case jars =>
-          throw new Exception(s"Found several JARs under $dir")
+          throw new Exception(s"Found several assembly JARs under $dir: ${jars.mkString(",")}")
       }
     }
 
+    val sparkAssembly = {
+      // TODO Make this more reliable (assemblies can be found in other directories I think, this
+      // must be fine with spark 2.10 too, ...)
+      // TODO(han) maybe a conf or sys env ???
+      val dirs = List(
+        new File(sparkHome + "/assembly/target/scala-2.11"),
+        new File(sparkHome + "/lib")
+      )
+
+      dirs.map(searchAssembly).head
+    }
+
     val libManaged = {
       val dir = new File(sparkHome + "/lib_managed/jars")
       if (dir.isDirectory) {
@@ -63,6 +73,16 @@ case class SparkSubmit(
       sparkAssembly
     ) ++ libManaged ++ yarnConfOpt.toSeq
 
+    def addFileToCP(path: String): Unit = {
+      val file = new File(path)
+      val method = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
+      method.setAccessible(true)
+      method.invoke(ClassLoader.getSystemClassLoader(), file.toURI().toURL())
+    }
+
+    // Inject Spark's extra runtime classpath (confs, yarn jars, etc.) into the current class loader
+    cp.foreach(addFileToCP)
+
     val idx = extraArgs.indexOf("--")
 
     assert(idx >= 0)
@@ -218,4 +238,6 @@ case class SparkSubmit(
 
     sys.exit(exitValue)
+    // FIXME unreachable: sys.exit above terminates the JVM before this call
+    SparkSubmit.main(sparkSubmitOptions.toArray)
 
   }