This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 7afce3f

Addressed more comments

1 parent: 7f14b71

File tree

6 files changed, +46 -47 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

+7 -8

@@ -20,7 +20,7 @@ import java.io.File
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
 import io.fabric8.kubernetes.client.utils.HttpClientUtils
 import okhttp3.Dispatcher
 
@@ -88,16 +88,15 @@ private[spark] object SparkKubernetesClientFactory {
     new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
   }
 
-  private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {
+  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
+      extends AnyVal {
 
     def withOption[T]
-      (option: Option[T])
-      (configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = {
-      new OptionConfigurableConfigBuilder(option.map { opt =>
+        (option: Option[T])
+        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
+      option.map { opt =>
         configurator(opt, configBuilder)
-      }.getOrElse(configBuilder))
+      }.getOrElse(configBuilder)
     }
-
-    def build(): Config = configBuilder.build()
   }
 }
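Note on this change: making the implicit class a value class (extends AnyVal) lets the compiler elide the wrapper allocation on each chained call, and returning the underlying ConfigBuilder directly makes the pass-through build() method unnecessary. Below is a minimal, self-contained sketch of the same pattern; Builder, OptionSyntax, and Demo are hypothetical stand-ins for illustration, not the fabric8 API.

// Hypothetical stand-in for fabric8's ConfigBuilder; only the pattern matters.
case class Builder(settings: Map[String, String] = Map.empty) {
  def set(k: String, v: String): Builder = copy(settings + (k -> v))
}

object OptionSyntax {
  // Implicit value class: no wrapper object is allocated at runtime,
  // and withOption returns the plain Builder so calls keep chaining.
  implicit class OptionConfigurableBuilder(val b: Builder) extends AnyVal {
    def withOption[T](option: Option[T])(configurator: (T, Builder) => Builder): Builder =
      option.map(configurator(_, b)).getOrElse(b)
  }
}

object Demo extends App {
  import OptionSyntax._
  val namespace: Option[String] = Some("spark")
  val oauthToken: Option[String] = None
  val built = Builder()
    .withOption(namespace)((ns, b) => b.set("namespace", ns))
    .withOption(oauthToken)((t, b) => b.set("oauthToken", t)) // None: builder unchanged
  println(built.settings) // Map(namespace -> spark)
}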

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala

+24 -23

@@ -16,34 +16,35 @@
  */
 package org.apache.spark.deploy.k8s
 
-package object constants {
+private[spark] object constants {
+
   // Labels
-  private[spark] val SPARK_APP_ID_LABEL = "spark-app-selector"
-  private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
-  private[spark] val SPARK_ROLE_LABEL = "spark-role"
-  private[spark] val SPARK_POD_DRIVER_ROLE = "driver"
-  private[spark] val SPARK_POD_EXECUTOR_ROLE = "executor"
+  val SPARK_APP_ID_LABEL = "spark-app-selector"
+  val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+  val SPARK_ROLE_LABEL = "spark-role"
+  val SPARK_POD_DRIVER_ROLE = "driver"
+  val SPARK_POD_EXECUTOR_ROLE = "executor"
 
   // Default and fixed ports
-  private[spark] val DEFAULT_DRIVER_PORT = 7078
-  private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
-  private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
-  private[spark] val EXECUTOR_PORT_NAME = "executor"
+  val DEFAULT_DRIVER_PORT = 7078
+  val DEFAULT_BLOCKMANAGER_PORT = 7079
+  val BLOCK_MANAGER_PORT_NAME = "blockmanager"
+  val EXECUTOR_PORT_NAME = "executor"
 
   // Environment Variables
-  private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
-  private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
-  private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
-  private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
-  private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
-  private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
-  private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
-  private[spark] val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
-  private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
-  private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
+  val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
+  val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
+  val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
+  val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
+  val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
+  val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+  val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
+  val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
+  val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
+  val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
 
   // Miscellaneous
-  private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
-  private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
-  private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
+  val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+  val MEMORY_OVERHEAD_FACTOR = 0.10
+  val MEMORY_OVERHEAD_MIN_MIB = 384L
 }
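This hunk is a visibility refactor: instead of repeating private[spark] on every member of a package object, the object itself is declared private[spark] and the members stay unqualified (the added blank line accounts for the +1 net). A compilable sketch of the semantics; the caller package org.apache.spark.demo is hypothetical:

// One modifier on the object now covers every member: each val is public
// within the object, but the object is only reachable under org.apache.spark.
package org.apache.spark.deploy.k8s {
  private[spark] object constants {
    val SPARK_ROLE_LABEL = "spark-role"
  }
}

// Hypothetical caller, nested inside the enclosing org.apache.spark package.
package org.apache.spark.demo {
  object Caller {
    import org.apache.spark.deploy.k8s.constants._
    val label: String = SPARK_ROLE_LABEL // resolves: caller sits inside org.apache.spark
  }
}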

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

+2 -2

@@ -61,7 +61,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     require(
       !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
       s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
-        s" Spark.")
+        " Spark.")
     require(
       !executorLabels.contains(SPARK_ROLE_LABEL),
       s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
@@ -172,7 +172,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     }
 
     val executorContainer = new ContainerBuilder()
-      .withName(s"executor")
+      .withName("executor")
       .withImage(executorDockerImage)
      .withImagePullPolicy(dockerImagePullPolicy)
      .withNewResources()
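Both edits here drop an s interpolator from a literal that interpolates nothing; the prefix is dead weight and is commonly flagged by Scala linters. An illustrative snippet (not from the diff):

val withPrefix = s" Spark." // needless interpolation: no $ placeholder inside
val plain = " Spark."       // equivalent literal, as the diff now uses
assert(withPrefix == plain)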

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

+4 -5

@@ -330,7 +330,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
         log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
           s" watch received an event of type $action for this executor. The executor may" +
-          s" have failed to start in the first place and never registered with the driver.")
+          " have failed to start in the first place and never registered with the driver.")
       }
       disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
 
@@ -354,7 +354,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
       Option(containerStatus.getState).map { containerState =>
-        Option(containerState.getTerminated).map {containerStateTerminated =>
+        Option(containerState.getTerminated).map { containerStateTerminated =>
          containerStateTerminated.getExitCode.intValue()
        }.getOrElse(UNKNOWN_EXIT_CODE)
      }.getOrElse(UNKNOWN_EXIT_CODE)
@@ -403,8 +403,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint(
-    rpcEnv: RpcEnv,
-    sparkProperties: Seq[(String, String)])
+      rpcEnv: RpcEnv,
+      sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
@@ -427,4 +427,3 @@ private object KubernetesClusterSchedulerBackend {
   // and assume the executor failed for good, and attribute it to a framework fault.
   val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
 }
-
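Beyond the whitespace fixes, the getExecutorExitStatus hunk shows the null-safety idiom this file relies on: wrap possibly-null Java getters in Option(...) and fall back with getOrElse. A self-contained sketch; the Status/State/Terminated classes and the -1 sentinel are hypothetical stand-ins for the fabric8 types:

// Hypothetical stand-ins for fabric8's ContainerStatus/ContainerState types.
class Terminated(val exitCode: java.lang.Integer)
class State(val terminated: Terminated) // getter may return null
class Status(val state: State)          // getter may return null

object ExitStatusDemo extends App {
  val UNKNOWN_EXIT_CODE = -1 // assumed sentinel value for this sketch

  def exitStatus(status: Status): Int =
    Option(status.state).map { state =>  // Option(null) == None
      Option(state.terminated)
        .map(_.exitCode.intValue())
        .getOrElse(UNKNOWN_EXIT_CODE)
    }.getOrElse(UNKNOWN_EXIT_CODE)

  // A container that never reached a terminated state yields the sentinel:
  assert(exitStatus(new Status(new State(null))) == UNKNOWN_EXIT_CODE)
  assert(exitStatus(new Status(null)) == UNKNOWN_EXIT_CODE)
  assert(exitStatus(new Status(new State(new Terminated(137)))) == 137)
}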

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

+4 -5

@@ -39,8 +39,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{Register
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.ThreadUtils
 
-class KubernetesClusterSchedulerBackendSuite
-    extends SparkFunSuite with BeforeAndAfter {
+class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
 
   private val APP_ID = "test-spark-app"
   private val DRIVER_POD_NAME = "spark-driver-pod"
@@ -49,7 +48,7 @@ class KubernetesClusterSchedulerBackendSuite
   private val SPARK_DRIVER_PORT = 7077
   private val POD_ALLOCATION_INTERVAL = 60L
   private val DRIVER_URL = RpcEndpointAddress(
-      SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+    SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
   private val FIRST_EXECUTOR_POD = new PodBuilder()
     .withNewMetadata()
       .withName("pod1")
@@ -75,9 +74,9 @@ class KubernetesClusterSchedulerBackendSuite
 
   private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
   private type LABELED_PODS = FilterWatchListDeletable[
-      Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
+    Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
   private type IN_NAMESPACE_PODS = NonNamespaceOperation[
-      Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+    Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
 
   @Mock
   private var sparkContext: SparkContext = _
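These suite hunks are whitespace-only, applying the Spark style guide's indentation rules: two spaces per block level, a two-space continuation for wrapped arguments and type arguments, and a four-space continuation for declaration parameters. An illustrative, self-contained example of the two rules (IndentDemo and its members are hypothetical):

object IndentDemo {
  // Wrapped type arguments continue at +2 from the declaration:
  private type Lookup = Map[
    String, List[(Int, String)]]

  // Declaration parameters continue at +4; the extends clause sits at +2:
  class Widget(
      name: String,
      count: Int)
    extends AnyRef {
    def describe: String = s"$name x $count"
  }
}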

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

+5 -4

@@ -22,15 +22,16 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
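This hunk only reorders imports into the groups the Spark style guide prescribes: java/javax first, then scala, then third-party libraries (Hadoop/YARN counts as third-party), then org.apache.spark, each group separated by a blank line. A skeleton of the convention, assuming the Hadoop and Spark artifacts are on the classpath:

// 1. java / javax
import java.util.concurrent.atomic.AtomicInteger

// 2. scala
import scala.collection.mutable

// 3. third-party (Hadoop/YARN belongs here, not with Spark)
import org.apache.hadoop.yarn.conf.YarnConfiguration

// 4. org.apache.spark
import org.apache.spark.SparkConf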

0 commit comments