diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a82fd5264337e..a5d8bbdbc0989 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -3377,7 +3377,7 @@ object SparkContext extends Logging { } } - private def getClusterManager(url: String): Option[ExternalClusterManager] = { + private[spark] def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c529e37e7e1bf..4bf8ce96dbea5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -260,9 +260,7 @@ private[spark] class SparkSubmit extends Logging { case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("k8s") => KUBERNETES case m if m.startsWith("local") => LOCAL - case _ => - error("Master must either be yarn or start with spark, k8s, or local") - -1 + case _ => EXTERNAL } case None => LOCAL // default master or remote mode. } @@ -332,6 +330,7 @@ private[spark] class SparkSubmit extends Logging { !sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) && args.proxyUser != null && (isYarnCluster || isStandAloneCluster || isKubernetesCluster) + val isExternalCluster = clusterManager == EXTERNAL && deployMode == CLUSTER if (!isStandAloneCluster) { // Resolve maven dependencies if there are any and add classpath to jars. 
Add them to py-files @@ -378,6 +377,38 @@ private[spark] class SparkSubmit extends Logging { val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) val targetDir = Utils.createTempDir() + if (clusterManager == EXTERNAL) { + val loader = getSubmitClassLoader(sparkConf) + sparkConf + .get(EXTERNAL_CLUSTER_MANAGER_JARS) + .foreach(_ + .split(",") + .map(downloadFile(_, targetDir, sparkConf, hadoopConf)) + .foreach(addJarToClasspath(_, loader)) + ) + + val maybeSubmitClass = sparkConf.get(EXTERNAL_CLUSTER_SUBMIT_CLASS) + if (maybeSubmitClass.isEmpty) { + error( + "External cluster submit class must be specified when running in external cluster mode." + ) + } + maybeSubmitClass.foreach { submitClass => + if (!Utils.classIsLoadable(submitClass) && !Utils.isTesting) { + error( + s"Could not load external cluster submit class: $submitClass. " + + "Make sure the classes are included in the classpath") + } + } + args.maybeMaster match { + case Some(url) if SparkContext.getClusterManager(url).isEmpty => + error(s"Master URL is not recognized: $url") + case Some(_) => + case None => + error("Master URL must be specified when running in external cluster mode.") + } + } + // Kerberos is not supported in standalone mode if (clusterManager != STANDALONE && args.principal != null @@ -688,24 +719,28 @@ private[spark] class SparkSubmit extends Logging { mergeFn = Some(mergeFileLists(_, _))), // Other options - OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner(args.numExecutors, YARN | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES, confKey = EXECUTOR_INSTANCES.key), - OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner( + args.executorCores, STANDALONE | YARN | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES, confKey = EXECUTOR_CORES.key), - OptionAssigner(args.executorMemory, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner( + args.executorMemory, STANDALONE | YARN | 
KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES, confKey = EXECUTOR_MEMORY.key), OptionAssigner(args.totalExecutorCores, STANDALONE, ALL_DEPLOY_MODES, confKey = CORES_MAX.key), - OptionAssigner(args.files, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner( + args.files, LOCAL | STANDALONE | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES, confKey = FILES.key), - OptionAssigner(args.archives, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner( + args.archives, LOCAL | STANDALONE | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES, confKey = ARCHIVES.key), OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key), - OptionAssigner(args.jars, STANDALONE | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner(args.jars, STANDALONE | KUBERNETES | EXTERNAL, ALL_DEPLOY_MODES, confKey = JARS.key), - OptionAssigner(args.driverMemory, STANDALONE | YARN | KUBERNETES, CLUSTER, + OptionAssigner(args.driverMemory, STANDALONE | YARN | KUBERNETES | EXTERNAL, CLUSTER, confKey = DRIVER_MEMORY.key), - OptionAssigner(args.driverCores, STANDALONE | YARN | KUBERNETES, CLUSTER, + OptionAssigner(args.driverCores, STANDALONE | YARN | KUBERNETES | EXTERNAL, CLUSTER, confKey = DRIVER_CORES.key), OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER, confKey = DRIVER_SUPERVISE.key), @@ -831,8 +866,15 @@ private[spark] class SparkSubmit extends Logging { } } - if (isKubernetesCluster) { - childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS + if (isKubernetesCluster || isExternalCluster) { + childMainClass = + if (isKubernetesCluster) KUBERNETES_CLUSTER_SUBMIT_CLASS + else sparkConf + .get(EXTERNAL_CLUSTER_SUBMIT_CLASS) + .getOrElse( + throw new SparkException( + s"Config option ${EXTERNAL_CLUSTER_SUBMIT_CLASS.key} must be set when " + + s"using an external cluster manager like ${args.maybeMaster.get}")) if (args.primaryResource != SparkLauncher.NO_RESOURCE) { if (args.isPython) { childArgs ++= Array("--primary-py-file", args.primaryResource) @@ -1066,7 +1108,8 @@ object SparkSubmit extends 
CommandLineUtils with Logging { private val STANDALONE = 2 private val LOCAL = 8 private val KUBERNETES = 16 - private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES + private val EXTERNAL = 32 + private val ALL_CLUSTER_MGRS = YARN | STANDALONE | LOCAL | KUBERNETES | EXTERNAL // Deploy modes private val CLIENT = 1 diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 039387cba719f..ed0c2a6cb6ee4 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2830,4 +2830,26 @@ package object config { .checkValues(Set("connect", "classic")) .createWithDefault( if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic") + + private[spark] val EXTERNAL_CLUSTER_SUBMIT_CLASS = + ConfigBuilder("spark.submit.external.class") + .doc( + """ + |The class to use for submitting applications to the cluster. + |This is used in external cluster mode to submit the application to the cluster. + |""".stripMargin) + .version("4.0.0") + .stringConf + .createOptional + + private[spark] val EXTERNAL_CLUSTER_MANAGER_JARS = + ConfigBuilder("spark.submit.external.jars") + .doc( + """ + |Comma-separated list of JARs which contain the external cluster manager implementation. + |This is used in external cluster mode to submit the application to the cluster. 
+ |""".stripMargin) + .version("4.0.0") + .stringConf + .createOptional } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index bd34e6f2bba3d..9c2562f1e635b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -464,6 +464,34 @@ class SparkSubmitSuite conf.get("spark.kubernetes.driver.container.image") should be ("bar") } + test("handles external cluster mode") { + val clArgs = Seq( + "--deploy-mode", "cluster", + "--proxy-user", "test.user", + "--master", "myclusterManager", + "--executor-memory", "5g", + "--class", "org.SomeClass", + "--driver-memory", "4g", + "--conf", "spark.submit.external.class=org.apache.spark.submit.ArmadaClusterSubmit", + "/home/thejar.jar", + "arg1") + val appArgs = new SparkSubmitArguments(clArgs) + val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs) + + val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap + childArgsMap.get("--primary-java-resource").get should endWith ("/home/thejar.jar") + childArgsMap.get("--main-class") should be (Some("org.SomeClass")) + childArgsMap.get("--arg") should be (Some("arg1")) + childArgsMap.get("--proxy-user") should be (Some("test.user")) + mainClass should be ("org.apache.spark.submit.ArmadaClusterSubmit") + classpath should have length (0) + conf.get("spark.master") should be ("myclusterManager") + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.driver.memory") should be ("4g") + conf.get("spark.submit.external.class") should be( + "org.apache.spark.submit.ArmadaClusterSubmit") + } + test("SPARK-35084: include jars of the --packages" + " in k8s client mode & driver runs inside a POD") { val unusedJar = TestUtils.createJarWithClasses(Seq.empty) @@ -812,6 +840,28 @@ class SparkSubmitSuite } } + test("launch simple application with 
spark-submit and external master in client mode") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val jar1 = TestUtils.createJarWithClasses(Seq("ExternalClusterSubmit")) + val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA")) + val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",") + val args = Seq( + "--name", "testApp", + "--master", "myclusterManager", + "--deploy-mode", "client", +// "--proxy-user", "test.user", + "--jars", jarsString, + "--executor-memory", "5g", + "--class", JarCreationTest.getClass.getName.stripSuffix("$"), + "--driver-memory", "4g", + "--conf", "spark.submit.external.class=ExternalClusterSubmit", + "--conf", "spark.ui.enabled=false", + "--conf", "spark.master.rest.enabled=false", + unusedJar.toString, "SparkSubmitClassA" + ) + runSparkSubmit(args) + } + test("includes jars passed in through --jars") { val unusedJar = TestUtils.createJarWithClasses(Seq.empty) val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))