Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit f6fdd6a

Browse files
committed
Spark on Kubernetes - basic scheduler backend
1 parent bd4eb9c commit f6fdd6a

File tree

13 files changed

+1830
-0
lines changed

13 files changed

+1830
-0
lines changed

pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -2648,6 +2648,13 @@
26482648
</modules>
26492649
</profile>
26502650

2651+
<profile>
2652+
<id>kubernetes</id>
2653+
<modules>
2654+
<module>resource-managers/kubernetes/core</module>
2655+
</modules>
2656+
</profile>
2657+
26512658
<profile>
26522659
<id>hive-thriftserver</id>
26532660
<modules>
+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->
<!-- Build definition for the Spark-on-Kubernetes scheduler backend module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.3.0-SNAPSHOT</version>
    <relativePath>../../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-kubernetes_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Kubernetes</name>
  <properties>
    <sbt.project.name>kubernetes</sbt.project.name>
    <!-- Version of the fabric8 kubernetes-client used to talk to the Kubernetes API server. -->
    <kubernetes.client.version>2.2.13</kubernetes.client.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
      <version>${kubernetes.client.version}</version>
      <!-- Jackson is excluded here so the versions Spark itself manages are used instead of
           whatever kubernetes-client pulls in transitively (presumably to avoid Jackson
           version conflicts on the classpath — see the re-added dependency below). -->
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-annotations</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-yaml</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Required by kubernetes-client but we exclude it; re-added pinned to Spark's
         Jackson version. -->
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-yaml</artifactId>
      <version>${fasterxml.jackson.version}</version>
    </dependency>

    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <!-- End of shaded deps. -->

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>

  </dependencies>


  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.k8s
19+
20+
import org.apache.spark.{SparkConf, SparkException}
21+
import org.apache.spark.internal.Logging
22+
23+
private[spark] object ConfigurationUtils extends Logging {

  /**
   * Extracts all configuration entries under `prefix` from the given SparkConf and returns
   * them as a map from the key suffix (prefix stripped) to its value.
   *
   * @param sparkConf the Spark configuration to read from
   * @param prefix the configuration key prefix to match; stripped from the returned keys
   * @param configType human-readable name of this configuration group, used in error messages
   * @return map of stripped key -> configured value
   * @throws IllegalArgumentException if a stripped key maps to more than one value
   */
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String,
      configType: String): Map[String, String] = {
    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
    fromPrefix.groupBy(_._1).foreach {
      case (key, values) =>
        // Render the conflicting values explicitly: interpolating the Array directly would
        // only print its JVM identity hash (e.g. [Lscala.Tuple2;@1a2b3c), which is useless
        // in an error message.
        require(values.size == 1,
          s"Cannot have multiple values for a given $configType key, got key $key with" +
            s" values ${values.map(_._2).mkString(", ")}")
    }
    fromPrefix.toMap
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s
18+
19+
/**
 * Small helpers for validating relationships between pairs of optional values with
 * `require`-style precondition checks.
 */
private[spark] object OptionRequirements {

  /**
   * Enforces that `opt1` and `opt2` are either both present or both absent.
   *
   * @param errMessageWhenFirstIsMissing failure message when only `opt2` is present
   * @param errMessageWhenSecondIsMissing failure message when only `opt1` is present
   */
  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  /** Enforces that `opt2` is present whenever `opt1` is; absent `opt1` imposes nothing. */
  def requireSecondIfFirstIsDefined(
      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
    if (opt1.isDefined) {
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

  /** Enforces that `opt1` and `opt2` are not both present (logical NAND). */
  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    if (opt1.isDefined) {
      require(opt2.isEmpty, errMessage)
    }
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s
18+
19+
import java.io.File
20+
21+
import com.google.common.base.Charsets
22+
import com.google.common.io.Files
23+
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
24+
import io.fabric8.kubernetes.client.utils.HttpClientUtils
25+
import okhttp3.Dispatcher
26+
27+
import org.apache.spark.SparkConf
28+
import org.apache.spark.deploy.k8s.config._
29+
import org.apache.spark.util.ThreadUtils
30+
31+
/**
 * Spark-opinionated builder for Kubernetes clients. Configuration keys are resolved from a
 * per-component prefix combined with common suffixes, similar to the manner in which Spark's
 * SecurityManager parses SSL options for different components.
 */
private[spark] object SparkKubernetesClientFactory {

  /**
   * Creates a fabric8 KubernetesClient from Spark configuration.
   *
   * @param master URL of the Kubernetes API server
   * @param namespace namespace to scope the client to, if any
   * @param kubernetesAuthConfPrefix prefix under which auth options (OAuth token, certs,
   *                                 client keys) are looked up in `sparkConf`
   * @param sparkConf Spark configuration supplying the auth options
   * @param maybeServiceAccountToken fallback OAuth token file used when none is configured
   *                                 (presumably a mounted service-account token — confirm at
   *                                 the call sites)
   * @param maybeServiceAccountCaCert fallback CA cert file used when none is configured
   */
  def createKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
      sparkConf: SparkConf,
      maybeServiceAccountToken: Option[File],
      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
    val tokenFileKey = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
    val tokenValueKey = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
    // A token can come from an explicitly configured file, an inline value, or the
    // service-account fallback — the configured file wins over the fallback.
    val oauthTokenFile = sparkConf.getOption(tokenFileKey)
      .map(new File(_))
      .orElse(maybeServiceAccountToken)
    val oauthTokenValue = sparkConf.getOption(tokenValueKey)
    // File-based and inline tokens are mutually exclusive.
    OptionRequirements.requireNandDefined(
      oauthTokenFile,
      oauthTokenValue,
      s"Cannot specify OAuth token through both a file $tokenFileKey and a" +
        s" value $tokenValueKey.")

    val caCertFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
    val clientKeyFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
    val clientCertFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
    // Dispatch requests on a daemon-thread pool so the client never keeps the JVM alive.
    val requestDispatcher = new Dispatcher(
      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
    // Each withOption step applies its setting only when the corresponding Option is defined.
    val clientConfig = new ConfigBuilder()
      .withApiVersion("v1")
      .withMasterUrl(master)
      .withWebsocketPingInterval(0)
      .withOption(oauthTokenValue) { (token, builder) =>
        builder.withOauthToken(token)
      }.withOption(oauthTokenFile) { (file, builder) =>
        builder.withOauthToken(Files.toString(file, Charsets.UTF_8))
      }.withOption(caCertFile) { (file, builder) =>
        builder.withCaCertFile(file)
      }.withOption(clientKeyFile) { (file, builder) =>
        builder.withClientKeyFile(file)
      }.withOption(clientCertFile) { (file, builder) =>
        builder.withClientCertFile(file)
      }.withOption(namespace) { (ns, builder) =>
        builder.withNamespace(ns)
      }.build()
    // Rebuild the default http client so it uses our daemon-backed dispatcher.
    val httpClientWithCustomDispatcher = HttpClientUtils.createHttpClient(clientConfig)
      .newBuilder()
      .dispatcher(requestDispatcher)
      .build()
    new DefaultKubernetesClient(httpClientWithCustomDispatcher, clientConfig)
  }

  /** Enriches ConfigBuilder with a combinator that applies a setting only when present. */
  private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {

    /** Applies `configurator` to the wrapped builder iff `option` holds a value. */
    def withOption[T]
        (option: Option[T])
        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = {
      new OptionConfigurableConfigBuilder(
        option.map(configurator(_, configBuilder)).getOrElse(configBuilder))
    }

    def build(): Config = configBuilder.build()
  }
}

0 commit comments

Comments
 (0)