Merge pull request #396 from alexarchambault/topic/submit-spark-2

Add support for Spark 2 in spark-submit command
This commit is contained in:
Alexandre Archambault 2016-11-18 11:52:04 +01:00 committed by GitHub
commit 8813a31cd9
3 changed files with 113 additions and 42 deletions

View File

@ -264,13 +264,17 @@ case class SparkSubmitOptions(
@Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)") @Help("If master is yarn-cluster, write YARN app ID to a file. (The ID is deduced from the spark-submit output.)")
@Value("file") @Value("file")
yarnIdFile: String = "", yarnIdFile: String = "",
@Help("Spark assembly. If empty, automatically generate (default: empty)") @Help("Generate Spark Yarn assembly (Spark 1.x) or fetch Spark Yarn jars (Spark 2.x), and supply those to Spark via conf. (Default: true)")
sparkAssembly: String = "", autoAssembly: Boolean = true,
noDefaultAssemblyDependencies: Boolean = false, @Help("Include default dependencies in Spark Yarn assembly or jars (see --auto-assembly). If --auto-assembly is false, the corresponding dependencies will still be shunted from the job classpath if this option is true. (Default: same as --auto-assembly)")
defaultAssemblyDependencies: Option[Boolean] = None,
assemblyDependencies: List[String] = Nil, assemblyDependencies: List[String] = Nil,
noDefaultSubmitDependencies: Boolean = false, noDefaultSubmitDependencies: Boolean = false,
submitDependencies: List[String] = Nil, submitDependencies: List[String] = Nil,
sparkVersion: String = "", @Help("Spark version - if empty, deduced from the job classpath. (Default: empty)")
sparkVersion: String = "",
@Help("YARN version - only used with Spark 2. (Default: 2.7.3)")
yarnVersion: String = "2.7.3",
@Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)") @Help("Maximum idle time of spark-submit (time with no output). Exit early if no output from spark-submit for more than this duration. Set to 0 for unlimited. (Default: 0)")
@Value("seconds") @Value("seconds")
maxIdleTime: Int = 0, maxIdleTime: Int = 0,

View File

@ -90,31 +90,51 @@ case class SparkSubmit(
else else
(options.common.scalaVersion, options.sparkVersion) (options.common.scalaVersion, options.sparkVersion)
val assemblyOrError = val (sparkYarnExtraConf, sparkBaseJars) =
if (options.sparkAssembly.isEmpty) if (!options.autoAssembly || sparkVersion.startsWith("2.")) {
Assembly.spark(
val assemblyJars = Assembly.sparkJars(
scalaVersion, scalaVersion,
sparkVersion, sparkVersion,
options.noDefaultAssemblyDependencies, options.yarnVersion,
options.defaultAssemblyDependencies.getOrElse(options.autoAssembly),
options.assemblyDependencies.flatMap(_.split(",")).filter(_.nonEmpty), options.assemblyDependencies.flatMap(_.split(",")).filter(_.nonEmpty),
options.common options.common
) )
else {
val f = new File(options.sparkAssembly)
if (f.isFile)
Right((f, Nil))
else if (f.exists())
Left(s"${options.sparkAssembly} is not a file")
else
Left(s"${options.sparkAssembly} not found")
}
val (assembly, assemblyJars) = assemblyOrError match { val extraConf =
case Left(err) => if (options.autoAssembly && sparkVersion.startsWith("2."))
Console.err.println(s"Cannot get spark assembly: $err") Seq(
sys.exit(1) "spark.yarn.jars" -> assemblyJars.map(_.getAbsolutePath).mkString(",")
case Right(res) => res )
} else
Nil
(extraConf, assemblyJars)
} else {
val assemblyAndJarsOrError = Assembly.spark(
scalaVersion,
sparkVersion,
options.yarnVersion,
options.defaultAssemblyDependencies.getOrElse(true),
options.assemblyDependencies.flatMap(_.split(",")).filter(_.nonEmpty),
options.common
)
val (assembly, assemblyJars) = assemblyAndJarsOrError match {
case Left(err) =>
Console.err.println(s"Cannot get spark assembly: $err")
sys.exit(1)
case Right(res) => res
}
val extraConf = Seq(
"spark.yarn.jar" -> assembly.getAbsolutePath
)
(extraConf, assemblyJars)
}
val idx = { val idx = {
@ -146,16 +166,18 @@ case class SparkSubmit(
val (check, extraJars0) = jars.partition(_.getAbsolutePath == mainJar) val (check, extraJars0) = jars.partition(_.getAbsolutePath == mainJar)
val extraJars = extraJars0.filterNot(assemblyJars.toSet) val extraJars = extraJars0.filterNot(sparkBaseJars.toSet)
if (check.isEmpty) if (check.isEmpty)
Console.err.println( Console.err.println(
s"Warning: cannot find back $mainJar among the dependencies JARs (likely a coursier bug)" s"Warning: cannot find back $mainJar among the dependencies JARs (likely a coursier bug)"
) )
val extraSparkOpts = Seq( val extraSparkOpts = sparkYarnExtraConf.flatMap {
"--conf", "spark.yarn.jar=" + assembly.getAbsolutePath case (k, v) => Seq(
) "--conf", s"$k=$v"
)
}
val extraJarsOptions = val extraJarsOptions =
if (extraJars.isEmpty) if (extraJars.isEmpty)

View File

@ -140,32 +140,77 @@ object Assembly {
Rule.ExcludePattern("META-INF/.*\\.[rR][sS][aA]") Rule.ExcludePattern("META-INF/.*\\.[rR][sS][aA]")
) )
def sparkAssemblyDependencies( def sparkBaseDependencies(
scalaVersion: String, scalaVersion: String,
sparkVersion: String sparkVersion: String,
) = Seq( yarnVersion: String
s"org.apache.spark:spark-core_$scalaVersion:$sparkVersion", ) =
s"org.apache.spark:spark-bagel_$scalaVersion:$sparkVersion", if (sparkVersion.startsWith("2."))
s"org.apache.spark:spark-mllib_$scalaVersion:$sparkVersion", Seq(
s"org.apache.spark:spark-streaming_$scalaVersion:$sparkVersion", s"org.apache.spark::spark-hive-thriftserver:$sparkVersion",
s"org.apache.spark:spark-graphx_$scalaVersion:$sparkVersion", s"org.apache.spark::spark-repl:$sparkVersion",
s"org.apache.spark:spark-sql_$scalaVersion:$sparkVersion", s"org.apache.spark::spark-hive:$sparkVersion",
s"org.apache.spark:spark-repl_$scalaVersion:$sparkVersion", s"org.apache.spark::spark-graphx:$sparkVersion",
s"org.apache.spark:spark-yarn_$scalaVersion:$sparkVersion" s"org.apache.spark::spark-mllib:$sparkVersion",
) s"org.apache.spark::spark-streaming:$sparkVersion",
s"org.apache.spark::spark-yarn:$sparkVersion",
s"org.apache.spark::spark-sql:$sparkVersion",
s"org.apache.hadoop:hadoop-client:$yarnVersion",
s"org.apache.hadoop:hadoop-yarn-server-web-proxy:$yarnVersion",
s"org.apache.hadoop:hadoop-yarn-server-nodemanager:$yarnVersion"
)
else
Seq(
s"org.apache.spark:spark-core_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-bagel_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-mllib_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-streaming_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-graphx_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-sql_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-repl_$scalaVersion:$sparkVersion",
s"org.apache.spark:spark-yarn_$scalaVersion:$sparkVersion"
)
def sparkJarsHelper(
scalaVersion: String,
sparkVersion: String,
yarnVersion: String,
default: Boolean,
extraDependencies: Seq[String],
options: CommonOptions
): Helper = {
val base = if (default) sparkBaseDependencies(scalaVersion, sparkVersion, yarnVersion) else Seq()
new Helper(options, extraDependencies ++ base)
}
def sparkJars(
scalaVersion: String,
sparkVersion: String,
yarnVersion: String,
default: Boolean,
extraDependencies: Seq[String],
options: CommonOptions,
artifactTypes: Set[String] = Set("jar")
): Seq[File] = {
val helper = sparkJarsHelper(scalaVersion, sparkVersion, yarnVersion, default, extraDependencies, options)
helper.fetch(sources = false, javadoc = false, artifactTypes = artifactTypes)
}
def spark( def spark(
scalaVersion: String, scalaVersion: String,
sparkVersion: String, sparkVersion: String,
noDefault: Boolean, yarnVersion: String,
default: Boolean,
extraDependencies: Seq[String], extraDependencies: Seq[String],
options: CommonOptions, options: CommonOptions,
artifactTypes: Set[String] = Set("jar"), artifactTypes: Set[String] = Set("jar"),
checksumSeed: Array[Byte] = "v1".getBytes("UTF-8") checksumSeed: Array[Byte] = "v1".getBytes("UTF-8")
): Either[String, (File, Seq[File])] = { ): Either[String, (File, Seq[File])] = {
val base = if (noDefault) Seq() else sparkAssemblyDependencies(scalaVersion, sparkVersion) val helper = sparkJarsHelper(scalaVersion, sparkVersion, yarnVersion, default, extraDependencies, options)
val helper = new Helper(options, extraDependencies ++ base)
val artifacts = helper.artifacts(sources = false, javadoc = false, artifactTypes = artifactTypes) val artifacts = helper.artifacts(sources = false, javadoc = false, artifactTypes = artifactTypes)
val jars = helper.fetch(sources = false, javadoc = false, artifactTypes = artifactTypes) val jars = helper.fetch(sources = false, javadoc = false, artifactTypes = artifactTypes)