move SparkSubmit to its own module with spark-core as dependency

This commit is contained in:
Han JU 2016-09-02 18:06:09 +02:00
parent 65a21adbd1
commit eac19c1a66
4 changed files with 62 additions and 24 deletions

View File

@ -561,3 +561,16 @@ lazy val `coursier` = project.in(file("."))
.settings(
moduleName := "coursier-root"
)
// Spark version the module below compiles and links against.
val sparkVersion = "1.6.1"
// Standalone `coursier-spark` module hosting the spark-submit integration,
// split out of the CLI so spark-core is only a dependency of this module.
lazy val spark = project
.dependsOn(cli)
.settings(commonSettings)
// NOTE(review): presumably skips publishing the Scala 2.10 build of this
// module — confirm against the helper's definition elsewhere in the build.
.settings(noPublishForScalaVersionSettings("2.10"))
.settings(packAutoSettings)
.settings(proguardSettings)
.settings(
name := "coursier-spark",
// spark-core pinned to the version declared above.
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
)

View File

@ -194,22 +194,6 @@ case class LaunchOptions(
common: CommonOptions
)
// Command-line options for the spark-submit command, parsed by caseapp.
// Annotations drive the CLI surface: @Short adds a flag alias, @Help supplies
// the usage text, @Value names the argument placeholder in help output.
case class SparkSubmitOptions(
// Fully-qualified main class to run (aliases: -M, --main).
@Short("M")
@Short("main")
mainClass: String,
@Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
@Value("file")
yarnIdFile: String,
@Help("Spark home (default: SPARK_HOME from the environment)")
sparkHome: String,
@Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
@Value("seconds")
maxIdleTime: Int,
// Options shared with the other commands, flattened into this one by caseapp.
@Recurse
common: CommonOptions
)
case class BootstrapOptions(
@Short("M")
@Short("main")

View File

@ -0,0 +1,19 @@
package coursier.cli
import caseapp.{ HelpMessage => Help, ValueDescription => Value, ExtraName => Short, _ }
// Command-line options for the spark-submit command, parsed by caseapp.
// Annotations drive the CLI surface: @Short adds a flag alias, @Help supplies
// the usage text, @Value names the argument placeholder in help output.
case class SparkSubmitOptions(
// Fully-qualified main class to run (aliases: -M, --main).
@Short("M")
@Short("main")
mainClass: String,
@Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
@Value("file")
yarnIdFile: String,
@Help("Spark home (default: SPARK_HOME from the environment)")
sparkHome: String,
@Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
@Value("seconds")
maxIdleTime: Int,
// Options shared with the other commands, flattened into this one by caseapp.
@Recurse
common: CommonOptions
)

View File

@ -1,7 +1,8 @@
package coursier.cli
import java.io.{ BufferedReader, File, InputStream, InputStreamReader, OutputStream }
import java.nio.file.{ Files, Paths }
import java.io.{BufferedReader, File, InputStream, InputStreamReader, OutputStream}
import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths}
import caseapp._
@ -29,22 +30,31 @@ case class SparkSubmit(
else
options.sparkHome
val sparkAssembly = {
// TODO Make this more reliable (assemblies can be found in other directories I think, this
// must be fine with spark 2.10 too, ...)
val dir = new File(sparkHome + "/assembly/target/scala-2.11")
// Searches `dir` for the single Spark assembly JAR and returns its absolute
// path. Throws when zero or more than one candidate is found, since silently
// picking an arbitrary assembly could run the wrong Spark build.
// (The diff residue here carried both the old and new version of the filter
// and error-message lines; this is the clean post-change implementation.)
def searchAssembly(dir: File): String = {
  // listFiles() returns null when `dir` does not exist or is not a directory;
  // wrap in Option so a missing directory degrades to the "not found" error.
  Option(dir.listFiles()).getOrElse(Array.empty).filter { f =>
    f.isFile && f.getName.endsWith(".jar") && f.getName.contains("spark-assembly")
  } match {
    case Array(assemblyFile) =>
      assemblyFile.getAbsolutePath
    case Array() =>
      throw new Exception(s"No spark assembly found under $dir")
    case jars =>
      throw new Exception(s"Found several assembly JARs under $dir: ${jars.mkString(",")}")
  }
}
// Locate the Spark assembly JAR, trying known locations in order:
// sbt builds put it under assembly/target/scala-2.11, binary distributions
// ship it under lib/.
// NOTE(review): the original comment says "spark 2.10" — presumably Scala 2.10
// was meant; confirm which Scala builds must be supported.
// TODO(han) maybe a conf or sys env ???
val sparkAssembly = {
  import scala.util.{Success, Try}
  val candidateDirs = List(
    new File(sparkHome + "/assembly/target/scala-2.11"),
    new File(sparkHome + "/lib")
  )
  // Keep the first directory that actually contains an assembly.
  // (The previous `dirs.map(searchAssembly).head` threw as soon as the FIRST
  // directory had no assembly, so the lib/ fallback was never consulted.)
  candidateDirs.iterator
    .map(dir => Try(searchAssembly(dir)))
    .collectFirst { case Success(path) => path }
    .getOrElse(
      throw new Exception(s"No spark assembly found under any of: ${candidateDirs.mkString(", ")}")
    )
}
val libManaged = {
val dir = new File(sparkHome + "/lib_managed/jars")
if (dir.isDirectory) {
@ -63,6 +73,16 @@ case class SparkSubmit(
sparkAssembly
) ++ libManaged ++ yarnConfOpt.toSeq
// Adds `path` to the running JVM's classpath by reflectively invoking the
// protected URLClassLoader.addURL on the system class loader.
// NOTE(review): this assumes the system class loader IS a URLClassLoader —
// true on Java 8, but on Java 9+ the system loader is a different type and
// this invoke would fail at runtime; confirm the supported JVM versions.
def addFileToCP(path: String): Unit = {
val file = new File(path)
// addURL is protected, so reach it via reflection and force accessibility.
val method = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
method.setAccessible(true)
method.invoke(ClassLoader.getSystemClassLoader(), file.toURI().toURL())
}
// Inject spark's runtime extra classpath (confs, yarn jars etc.) to the current class loader
cp.foreach(addFileToCP)
val idx = extraArgs.indexOf("--")
assert(idx >= 0)
@ -218,4 +238,6 @@ case class SparkSubmit(
sys.exit(exitValue)
//
SparkSubmit.main(sparkSubmitOptions.toArray)
}