This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 0ab9ca7

One more round of comments
1 parent 71a971f commit 0ab9ca7

6 files changed: +94 -107 lines changed


resource-managers/kubernetes/core/pom.xml

+1 -9

@@ -54,15 +54,7 @@
       <exclusions>
         <exclusion>
           <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-databind</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-annotations</artifactId>
+          <artifactId>*</artifactId>
         </exclusion>
         <exclusion>
           <groupId>com.fasterxml.jackson.dataformat</groupId>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala

+1 -8

@@ -34,14 +34,7 @@ private[spark] object ConfigurationUtils {
       sparkConf: SparkConf,
       prefix: String,
       configType: String): Map[String, String] = {
-    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
-    fromPrefix.groupBy(_._1).foreach {
-      case (key, values) =>
-        require(values.size == 1,
-          s"Cannot have multiple values for a given $configType key, got key $key with" +
-            s" values $values")
-    }
-    fromPrefix.toMap
+    sparkConf.getAllWithPrefix(prefix).toMap
   }
 
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
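
The simplification relies on SparkConf storing at most one value per key, so the removed duplicate-key check could never fire. A minimal sketch of the resulting behaviour (the selector keys below are illustrative, not part of this commit):

import org.apache.spark.SparkConf

// SparkConf is a map underneath, so getAllWithPrefix(prefix) yields each
// suffix at most once; converting to a Map cannot drop or collide entries.
val sparkConf = new SparkConf(loadDefaults = false)
  .set("spark.kubernetes.node.selector.zone", "us-east-1a")     // illustrative key
  .set("spark.kubernetes.node.selector.disktype", "ssd")        // illustrative key

val selectors: Map[String, String] =
  sparkConf.getAllWithPrefix("spark.kubernetes.node.selector.").toMap
// selectors == Map("zone" -> "us-east-1a", "disktype" -> "ssd")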

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

+4 -4

@@ -40,13 +40,13 @@ private[spark] object SparkKubernetesClientFactory {
       namespace: Option[String],
       kubernetesAuthConfPrefix: String,
       sparkConf: SparkConf,
-      maybeServiceAccountToken: Option[File],
-      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+      defaultServiceAccountToken: Option[File],
+      defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
     val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
     val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
     val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
       .map(new File(_))
-      .orElse(maybeServiceAccountToken)
+      .orElse(defaultServiceAccountToken)
     val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
     ConfigurationUtils.requireNandDefined(
       oauthTokenFile,
@@ -56,7 +56,7 @@ private[spark] object SparkKubernetesClientFactory {
 
     val caCertFile = sparkConf
       .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
-      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+      .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath))
     val clientKeyFile = sparkConf
       .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
     val clientCertFile = sparkConf
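
The rename only changes naming; the precedence is unchanged: a token file configured under the auth prefix wins, and the service-account credentials mounted into the pod are the fallback. A standalone sketch of that orElse chain (the conf key is the driver prefix plus suffix from this change set; the mount path is the conventional Kubernetes one, shown for illustration):

import java.io.File
import org.apache.spark.SparkConf

val sparkConf = new SparkConf(loadDefaults = false)

// Default token Kubernetes mounts into every pod (illustrative path).
val defaultServiceAccountToken: Option[File] =
  Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/token"))

// An explicitly configured token file takes precedence; otherwise fall back
// to the mounted default.
val oauthTokenFile: Option[File] = sparkConf
  .getOption("spark.kubernetes.authenticate.driver.oauthTokenFile")
  .map(new File(_))
  .orElse(defaultServiceAccountToken)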

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

+23 -23

@@ -16,45 +16,45 @@
  */
 package org.apache.spark.deploy.k8s
 
-import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.network.util.ByteUnit
 
-package object config extends Logging {
+private[spark] object config extends Logging {
 
-  private[spark] val KUBERNETES_NAMESPACE =
+  val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods. When using" +
         " spark-submit in cluster mode, this can also be passed to spark-submit via the" +
         " --kubernetes-namespace command line argument.")
       .stringConf
       .createWithDefault("default")
 
-  private[spark] val EXECUTOR_DOCKER_IMAGE =
+  val EXECUTOR_DOCKER_IMAGE =
     ConfigBuilder("spark.kubernetes.executor.docker.image")
       .doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
         " format.")
       .stringConf
-      .createWithDefault(s"spark-executor:$sparkVersion")
+      .createWithDefault(s"spark-executor:$SPARK_VERSION")
 
-  private[spark] val DOCKER_IMAGE_PULL_POLICY =
+  val DOCKER_IMAGE_PULL_POLICY =
     ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
       .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
       .stringConf
       .createWithDefault("IfNotPresent")
 
-  private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver"
-  private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver.mounted"
-  private[spark] val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
-  private[spark] val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
-  private[spark] val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
-  private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
-  private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
-  private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
     ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
       .doc("Service account that is used when running the driver pod. The driver pod uses" +
         " this service account when requesting executor pods from the API server. If specific" +
@@ -66,49 +66,49 @@ package object config extends Logging {
   // Note that while we set a default for this when we start up the
   // scheduler, the specific default value is dynamically determined
   // based on the executor memory.
-  private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+  val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
     ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
       .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" +
         " is memory that accounts for things like VM overheads, interned strings, other native" +
         " overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
       .bytesConf(ByteUnit.MiB)
       .createOptional
 
-  private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
-  private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
 
-  private[spark] val KUBERNETES_DRIVER_POD_NAME =
+  val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")
       .stringConf
       .createOptional
 
-  private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
     ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
       .doc("Prefix to use in front of the executor pod names.")
       .internal()
       .stringConf
       .createWithDefault("spark")
 
-  private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
+  val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
       .intConf
       .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
       .createWithDefault(5)
 
-  private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
+  val KUBERNETES_ALLOCATION_BATCH_DELAY =
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
       .doc("Number of seconds to wait between each round of executor allocation.")
       .longConf
       .checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
       .createWithDefault(1)
 
-  private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
       .doc("Specify the hard cpu limit for a single executor pod")
       .stringConf
       .createOptional
 
-  private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 }
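
Turning config from a package object into a plain private[spark] object does not change the keys or defaults, only how the members are brought into scope: package-object members are visible without an import only to code in that same package, so code elsewhere already used import org.apache.spark.deploy.k8s.config._ and is unaffected. The entries can still be read through the public SparkConf string API; a small sketch using the keys and defaults defined above:

import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)

// Keys and defaults come straight from the entries above.
val namespace  = conf.get("spark.kubernetes.namespace", "default")
val pullPolicy = conf.get("spark.kubernetes.docker.image.pullPolicy", "IfNotPresent")
val image      = conf.get("spark.kubernetes.executor.docker.image",
  s"spark-executor:${org.apache.spark.SPARK_VERSION}")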

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

+18 -16

@@ -27,10 +27,13 @@ import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.util.Utils
 
 /**
- * Configures executor pods. Construct one of these with a SparkConf to set up properties that are
- * common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+ * A factory class for configuring and creating executor pods.
  */
 private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
   def createExecutorPod(
       executorId: String,
       applicationId: String,
@@ -161,12 +164,12 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
-      .map(port => {
+      .map { case (name, port) =>
         new ContainerPortBuilder()
-          .withName(port._1)
-          .withContainerPort(port._2)
+          .withName(name)
+          .withContainerPort(port)
           .build()
-      })
+      }
 
     val executorContainer = new ContainerBuilder()
       .withName(s"executor")
@@ -202,16 +205,15 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
       .endSpec()
       .build()
 
-    val containerWithExecutorLimitCores = executorLimitCores.map {
-      limitCores =>
-        val executorCpuLimitQuantity = new QuantityBuilder(false)
-          .withAmount(limitCores)
-          .build()
-        new ContainerBuilder(executorContainer)
-          .editResources()
-            .addToLimits("cpu", executorCpuLimitQuantity)
-          .endResources()
-          .build()
+    val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+      new ContainerBuilder(executorContainer)
+        .editResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+        .endResources()
+        .build()
     }.getOrElse(executorContainer)
 
     new PodBuilder(executorPod)
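
The port-building change is purely stylistic: destructuring the (name, port) pair in a pattern-matching closure replaces positional _1/_2 access. A self-contained sketch of the two styles (the port names and numbers below are illustrative):

// Illustrative data; the real code maps these pairs to ContainerPortBuilder calls.
val requiredPorts = Seq(("executor", 7078), ("blockmanager", 7079))

// Before: positional tuple access.
val before = requiredPorts.map(port => s"${port._1}:${port._2}")

// After: destructure the pair in a partial-function literal.
val after = requiredPorts.map { case (name, port) => s"$name:$port" }
// before == after == Seq("executor:7078", "blockmanager:7079")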
