add better support for integrating with external cluster manager #50770


Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -3377,7 +3377,7 @@ object SparkContext extends Logging {
}
}

- private def getClusterManager(url: String): Option[ExternalClusterManager] = {
+ private[spark] def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
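Widening getClusterManager from private to private[spark] lets SparkSubmit reuse the same ServiceLoader lookup to validate a master URL before submission. For context, a minimal sketch of a pluggable manager follows; the ExternalClusterManager trait and its META-INF/services discovery are existing Spark mechanisms, while the class name and the "myclusterManager" URL scheme are illustrative. Because the trait is private[spark], implementations live under the org.apache.spark package:

    package org.apache.spark.scheduler.external

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend,
      TaskScheduler, TaskSchedulerImpl}

    // Hypothetical manager; placed under org.apache.spark so it can reuse
    // private[spark] classes such as TaskSchedulerImpl.
    class MyClusterManager extends ExternalClusterManager {
      // SparkContext.getClusterManager(url) returns this manager iff canCreate(url) holds.
      override def canCreate(masterURL: String): Boolean =
        masterURL.startsWith("myclusterManager")

      override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
        new TaskSchedulerImpl(sc)

      override def createSchedulerBackend(
          sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend = {
        ??? // return a backend that acquires executors from the external scheduler
      }

      override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    }

ServiceLoader discovery additionally requires a classpath resource named META-INF/services/org.apache.spark.scheduler.ExternalClusterManager whose content is the implementation's fully qualified class name; that is what spark.submit.external.jars (introduced below) is meant to put on the classpath.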
71 changes: 57 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -260,9 +260,7 @@ private[spark] class SparkSubmit extends Logging {
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
- case _ =>
-   error("Master must either be yarn or start with spark, k8s, or local")
-   -1
+ case _ => EXTERNAL
}
case None => LOCAL // default master or remote mode.
}
@@ -332,6 +330,7 @@
!sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
args.proxyUser != null &&
(isYarnCluster || isStandAloneCluster || isKubernetesCluster)
+ val isExternalCluster = clusterManager == EXTERNAL && deployMode == CLUSTER

if (!isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -378,6 +377,38 @@
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()

+ if (clusterManager == EXTERNAL) {
+   val loader = getSubmitClassLoader(sparkConf)
+   sparkConf
+     .get(EXTERNAL_CLUSTER_MANAGER_JARS)
+     .foreach(_
+       .split(",")
+       .map(downloadFile(_, targetDir, sparkConf, hadoopConf))
+       .foreach(addJarToClasspath(_, loader))
+     )
+
+   val maybeSubmitClass = sparkConf.get(EXTERNAL_CLUSTER_SUBMIT_CLASS)
+   if (maybeSubmitClass.isEmpty) {
+     error(
+       "External cluster submit class must be specified when running in external cluster mode."
+     )
+   }
+   maybeSubmitClass.foreach { submitClass =>
+     if (!Utils.classIsLoadable(submitClass) && !Utils.isTesting) {
+       error(
+         s"Could not load external cluster submit class: $submitClass. " +
+           "Make sure the classes are included in the classpath")
+     }
+   }
+   args.maybeMaster match {
+     case Some(url) if SparkContext.getClusterManager(url).isEmpty =>
+       error(s"Master URL is not recognized: $url")
+     case Some(_) =>
+     case None =>
+       error("Master URL must be specified when running in external cluster mode.")
+   }
+ }

// Kerberos is not supported in standalone mode
if (clusterManager != STANDALONE
&& args.principal != null
@@ -688,24 +719,28 @@
mergeFn = Some(mergeFileLists(_, _))),

// Other options
- OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.numExecutors, YARN | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES,
confKey = EXECUTOR_INSTANCES.key),
- OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(
+   args.executorCores, STANDALONE | YARN | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES,
confKey = EXECUTOR_CORES.key),
- OptionAssigner(args.executorMemory, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(
+   args.executorMemory, STANDALONE | YARN | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES,
confKey = EXECUTOR_MEMORY.key),
OptionAssigner(args.totalExecutorCores, STANDALONE, ALL_DEPLOY_MODES,
confKey = CORES_MAX.key),
- OptionAssigner(args.files, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(
+   args.files, LOCAL | STANDALONE | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES,
confKey = FILES.key),
- OptionAssigner(args.archives, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(
+   args.archives, LOCAL | STANDALONE | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES,
confKey = ARCHIVES.key),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
- OptionAssigner(args.jars, STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.jars, STANDALONE | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES,
confKey = JARS.key),
- OptionAssigner(args.driverMemory, STANDALONE | YARN | KUBERNETES, CLUSTER,
+ OptionAssigner(args.driverMemory, STANDALONE | YARN | KUBERNETES | EXTERNAL, CLUSTER,
confKey = DRIVER_MEMORY.key),
- OptionAssigner(args.driverCores, STANDALONE | YARN | KUBERNETES, CLUSTER,
+ OptionAssigner(args.driverCores, STANDALONE | YARN | KUBERNETES | EXTERNAL, CLUSTER,
confKey = DRIVER_CORES.key),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
confKey = DRIVER_SUPERVISE.key),
@@ -831,8 +866,15 @@
}
}

- if (isKubernetesCluster) {
-   childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
+ if (isKubernetesCluster || isExternalCluster) {
+   childMainClass =
+     if (isKubernetesCluster) KUBERNETES_CLUSTER_SUBMIT_CLASS
+     else sparkConf
+       .get(EXTERNAL_CLUSTER_SUBMIT_CLASS)
+       .getOrElse(
+         throw new SparkException(
+           s"Config option ${EXTERNAL_CLUSTER_SUBMIT_CLASS} must be set when " +
+             s"using an external cluster manager like ${args.maybeMaster.get}"))
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
@@ -1066,7 +1108,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val LOCAL = 8
private val KUBERNETES = 16
- private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES
+ private val EXTERNAL = 32
+ private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES | EXTERNAL

// Deploy modes
private val CLIENT = 1
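In cluster mode this routes an EXTERNAL manager through the same path as Kubernetes: childMainClass becomes the class named by spark.submit.external.class, and SparkSubmit passes it the usual flag/value child arguments (--main-class, --primary-java-resource, --arg, and so on, as the test below asserts). A sketch of what such a submit class could look like, reusing the ArmadaClusterSubmit name from this PR's tests; SparkApplication is Spark's existing entry-point trait (private[spark], hence the org.apache.spark package), and everything inside start() is illustrative:

    package org.apache.spark.submit

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkApplication

    // Hypothetical submit class. SparkSubmit instantiates childMainClass and, when it
    // implements SparkApplication, calls start() with the child args and the conf.
    class ArmadaClusterSubmit extends SparkApplication {
      override def start(args: Array[String], conf: SparkConf): Unit = {
        // Child args arrive as flag/value pairs, e.g.
        //   --primary-java-resource /home/thejar.jar --main-class org.SomeClass --arg arg1
        val parsed = args.grouped(2).collect { case Array(flag, value) => flag -> value }.toMap
        val mainClass = parsed("--main-class")
        // Package mainClass plus the primary resource into a driver submission for the
        // external scheduler here.
      }
    }

A class that does not implement SparkApplication is also fine: SparkSubmit falls back to invoking its static main method.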
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2830,4 +2830,26 @@ package object config {
.checkValues(Set("connect", "classic"))
.createWithDefault(
if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic")

+ private[spark] val EXTERNAL_CLUSTER_SUBMIT_CLASS =
+   ConfigBuilder("spark.submit.external.class")
+     .doc(
+       """
+         |The fully qualified name of the class used to submit the application to the
+         |cluster when running in external cluster mode.
+         |""".stripMargin)
+     .version("4.0.0")
+     .stringConf
+     .createOptional
+
+ private[spark] val EXTERNAL_CLUSTER_MANAGER_JARS =
+   ConfigBuilder("spark.submit.external.jars")
+     .doc(
+       """
+         |Comma-separated list of JARs containing the external cluster manager
+         |implementation; they are added to the classpath in external cluster mode.
+         |""".stripMargin)
+     .version("4.0.0")
+     .stringConf
+     .createOptional
}
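Putting the two options together, a submission through a pluggable manager would look roughly like this; the master URL, class name, and jar path are illustrative values borrowed from this PR's tests:

    spark-submit \
      --master myclusterManager \
      --deploy-mode cluster \
      --conf spark.submit.external.class=org.apache.spark.submit.ArmadaClusterSubmit \
      --conf spark.submit.external.jars=/opt/spark/jars/armada-cluster-manager.jar \
      --class org.SomeClass \
      /home/thejar.jar arg1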
50 changes: 50 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -464,6 +464,34 @@ class SparkSubmitSuite
conf.get("spark.kubernetes.driver.container.image") should be ("bar")
}

test("handles external cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--proxy-user", "test.user",
"--master", "myclusterManager",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"--conf", "spark.submit.external.class=org.apache.spark.submit.ArmadaClusterSubmit",
"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource").get should endWith ("/home/thejar.jar")
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
childArgsMap.get("--proxy-user") should be (Some("test.user"))
mainClass should be ("org.apache.spark.submit.ArmadaClusterSubmit")
classpath should have length (0)
conf.get("spark.master") should be ("myclusterManager")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.submit.external.class") should be(
"org.apache.spark.submit.ArmadaClusterSubmit")
}

test("SPARK-35084: include jars of the --packages" +
" in k8s client mode & driver runs inside a POD") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
@@ -812,6 +840,28 @@
}
}

test("launch simple application with spark-submit and external master in client mode") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("ExternalClusterSubmit"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
val args = Seq(
"--name", "testApp",
"--master", "myclusterManager",
"--deploy-mode", "client",
// "--proxy-user", "test.user",
"--jars", jarsString,
"--executor-memory", "5g",
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
"--driver-memory", "4g",
"--conf", "spark.submit.external.class=ExternalClusterSubmit",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
unusedJar.toString, "SparkSubmitClassA"
)
runSparkSubmit(args)
}

test("includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
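A note on the client-mode test above: the master URL myclusterManager presumably resolves through the DummyExternalClusterManager that core's existing test resources already register via META-INF/services (as exercised by ExternalClusterManagerSuite); that is an assumption about the test fixtures, not something this diff shows. And since in client mode the submit class is never used as the entry point, only checked for loadability, the empty class generated by TestUtils.createJarWithClasses is all jar1 needs to provide:

    // Roughly what TestUtils.createJarWithClasses(Seq("ExternalClusterSubmit")) packs
    // into jar1: an empty, loadable class and nothing more.
    class ExternalClusterSubmit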