
Commit cb12fec

Another round of comments

1 parent 3b587b4 commit cb12fec

4 files changed, +28 -24 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala (+3, -2)

@@ -40,8 +40,9 @@ private[spark] object Config extends Logging {
 
   val DOCKER_IMAGE_PULL_POLICY =
     ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
-      .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
+      .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
       .stringConf
+      .checkValues(Set("Always", "Never", "IfNotPresent"))
       .createWithDefault("IfNotPresent")
 
   val APISERVER_AUTH_DRIVER_CONF_PREFIX =

@@ -101,7 +102,7 @@ private[spark] object Config extends Logging {
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
       .doc("Number of seconds to wait between each round of executor allocation.")
       .longConf
-      .checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
+      .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
       .createWithDefault(1)
 
   val KUBERNETES_EXECUTOR_LIMIT_CORES =
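
With the checkValues addition above, an unsupported pull policy fails when the configuration is parsed instead of surfacing later as a Kubernetes error. A minimal standalone sketch of that whitelist-validation style (toy names, not Spark's internal ConfigBuilder API):

object PullPolicyCheck {
  // Same set of allowed values as the checkValues(...) call in the diff above.
  private val allowed = Set("Always", "Never", "IfNotPresent")

  def validate(value: String): String = {
    // Fail fast with a descriptive message, in the spirit of a checkValues guard.
    require(allowed.contains(value),
      s"Invalid image pull policy '$value'; valid values are ${allowed.mkString(", ")}")
    value
  }

  def main(args: Array[String]): Unit = {
    println(validate("IfNotPresent"))  // accepted
    // validate("Sometimes")           // would throw IllegalArgumentException
  }
}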

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala (+2, -2)

@@ -92,8 +92,8 @@ private[spark] object SparkKubernetesClientFactory {
     extends AnyVal {
 
     def withOption[T]
-      (option: Option[T])
-      (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
+        (option: Option[T])
+        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
       option.map { opt =>
         configurator(opt, configBuilder)
       }.getOrElse(configBuilder)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala (+1, -1)

@@ -94,7 +94,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
-  private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   override def createExecutorPod(
       executorId: String,
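
The one-line change above reads the core limit through the typed config entry rather than looking up its raw key string. A toy sketch of why that is preferable: the key and the parsing live in one place, and call sites get a typed value back. OptionalEntry and Conf below are illustrative stand-ins, not Spark's internal ConfigEntry machinery:

object TypedConfigSketch {
  final case class OptionalEntry[T](key: String, parse: String => T)

  final class Conf(raw: Map[String, String]) {
    // Untyped access: every caller must know the key and interpret the string itself.
    def getOption(key: String): Option[String] = raw.get(key)
    // Typed access: the entry owns its key and parser, so call sites stay consistent.
    def get[T](entry: OptionalEntry[T]): Option[T] = raw.get(entry.key).map(entry.parse)
  }

  // Stand-in for KUBERNETES_EXECUTOR_LIMIT_CORES (a string-valued optional setting).
  val ExecutorLimitCores: OptionalEntry[String] =
    OptionalEntry("spark.kubernetes.executor.limit.cores", (s: String) => s)

  def main(args: Array[String]): Unit = {
    val conf = new Conf(Map("spark.kubernetes.executor.limit.cores" -> "2"))
    println(conf.get(ExecutorLimitCores))            // Some(2), via the typed entry
    println(conf.getOption(ExecutorLimitCores.key))  // same value, but untyped lookup
  }
}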

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala (+22, -19)

@@ -103,24 +103,26 @@ private[spark] class KubernetesClusterSchedulerBackend(
       val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
       val currentTotalExpectedExecutors = totalExpectedExecutors.get
       val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
-      if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
-        logDebug("Waiting for pending executors before scaling")
-      } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
-        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-      } else {
-        for (i <- 0 until math.min(
-          currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
-          val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-          val executorPod = executorPodFactory.createExecutorPod(
-            executorId,
-            applicationId(),
-            driverUrl,
-            conf.getExecutorEnv,
-            driverPod,
-            currentNodeToLocalTaskCount)
-          executorsToAllocate(executorId) = executorPod
-          logInfo(
-            s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
+          logDebug("Waiting for pending executors before scaling")
+        } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
+          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+        } else {
+          for (i <- 0 until math.min(
+            currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
+            val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
+            val executorPod = executorPodFactory.createExecutorPod(
+              executorId,
+              applicationId(),
+              driverUrl,
+              conf.getExecutorEnv,
+              driverPod,
+              currentNodeToLocalTaskCount)
+            executorsToAllocate(executorId) = executorPod
+            logInfo(
+              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+          }
         }
       }
 

@@ -182,7 +184,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
     deleteExecutorFromDataStructures(executorId).foreach { pod =>
-      kubernetesClient.pods().delete(pod) }
+      kubernetesClient.pods().delete(pod)
+    }
   }
 
   def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
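
The larger hunk above moves the size checks and the allocation loop inside RUNNING_EXECUTOR_PODS_LOCK.synchronized, so the decision to scale and the writes that follow see one consistent view of the shared pod map. A minimal sketch of that check-then-act-under-one-lock pattern, with stand-in names and plain strings in place of real Pod objects:

import scala.collection.mutable

object AllocationLockSketch {
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
  private val runningExecutorsToPods = mutable.Map.empty[String, String]
  private val executorsToAllocate = mutable.Map.empty[String, String]
  private var executorIdCounter = 0L

  def allocateBatch(expectedTotal: Int, batchSize: Int): Unit =
    // Reading runningExecutorsToPods.size and recording new allocations happen
    // under the same lock, so a concurrent update cannot slip in between the
    // check and the writes (the kind of race the change above guards against).
    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      val missing = expectedTotal - runningExecutorsToPods.size
      for (_ <- 0 until math.min(missing, batchSize)) {
        executorIdCounter += 1
        val executorId = executorIdCounter.toString
        executorsToAllocate(executorId) = s"pod-$executorId"  // a built Pod in the real code
      }
    }

  def main(args: Array[String]): Unit = {
    allocateBatch(expectedTotal = 3, batchSize = 5)
    println(executorsToAllocate.size)  // 3 pending allocations recorded
  }
}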
