/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.kubernetes

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.MockitoAnnotations
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.{constants, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl

class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter {
  private val driverPodName: String = "driver-pod"
  private val driverPodUid: String = "driver-uid"
  private val driverUrl: String = "driver-url"
  private val executorPrefix: String = "base"
  private val executorImage: String = "executor-image"
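  // A minimal driver pod fixture; every executor pod the factory creates should
  // list this pod as its controlling owner reference.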
  private val driverPod = new PodBuilder()
    .withNewMetadata()
      .withName(driverPodName)
      .withUid(driverPodUid)
      .endMetadata()
    .withNewSpec()
      .withNodeName("some-node")
      .endSpec()
    .withNewStatus()
      .withHostIP("192.168.99.100")
      .endStatus()
    .build()
  private var baseConf: SparkConf = _
  private var sc: SparkContext = _

  before {
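    // Clear the JVM-wide active-context reference (without stopping any running
    // context) so each test can create its own local SparkContext.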
    SparkContext.clearActiveContext()
    MockitoAnnotations.initMocks(this)
    baseConf = new SparkConf()
      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
      .set(EXECUTOR_DOCKER_IMAGE, executorImage)
    sc = new SparkContext("local", "test")
  }

  private var kubernetesClient: KubernetesClient = _

  test("basic executor pod has reasonable defaults") {
    val factory = new ExecutorPodFactoryImpl(baseConf,
      NodeAffinityExecutorPodModifierImpl, None, None, None, None, None)
    val executor = factory.createExecutorPod("1", "dummy", "dummy",
      Seq[(String, String)](), driverPod, Map[String, Int]())

    // The executor pod name and default labels.
    assert(executor.getMetadata.getName == s"$executorPrefix-exec-1")
    assert(executor.getMetadata.getLabels.size() == 3)

    // There is exactly 1 container with no volume mounts and default memory limits.
    // Default memory limit is 1024M + 384M (minimum overhead constant).
    assert(executor.getSpec.getContainers.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getImage == executorImage)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 0)
    assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getResources
      .getLimits.get("memory").getAmount == "1408Mi")
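    // (1024 MiB default executor memory + 384 MiB minimum overhead = 1408 MiB.)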

    // The pod has no node selector or volumes.
    assert(executor.getSpec.getNodeSelector.size() == 0)
    assert(executor.getSpec.getVolumes.size() == 0)

    checkEnv(executor, Set())
    checkOwnerReferences(executor, driverPodUid)
  }

  test("executor pod hostnames get truncated to 63 characters") {
    val conf = baseConf.clone()
    conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
      "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")

    val factory = new ExecutorPodFactoryImpl(conf,
      NodeAffinityExecutorPodModifierImpl, None, None, None, None, None)
    val executor = factory.createExecutorPod("1",
      "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

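    // Kubernetes pod hostnames must be valid DNS-1123 labels, which are capped
    // at 63 characters.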
    assert(executor.getSpec.getHostname.length == 63)
  }

  test("secrets get mounted") {
    val conf = baseConf.clone()

    val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1"))
    val factory = new ExecutorPodFactoryImpl(conf,
      NodeAffinityExecutorPodModifierImpl, Some(secretsBootstrap), None, None, None, None)
    val executor = factory.createExecutorPod("1",
      "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    assert(executor.getSpec.getContainers.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "secret1-volume")
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getMountPath == "/var/secret1")

    // Check that the secret volume itself is present on the pod spec.
    assert(executor.getSpec.getVolumes.size() == 1)
    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1")

    checkOwnerReferences(executor, driverPodUid)
  }

  test("init-container bootstrap step adds an init container") {
    val conf = baseConf.clone()

    val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl("init-image",
      "IfNotPresent", "/some/path/", "some/other/path", 10, "config-map-name", "config-map-key")
    val factory = new ExecutorPodFactoryImpl(conf,
      NodeAffinityExecutorPodModifierImpl, None, None, Some(initContainerBootstrap), None, None)
    val executor = factory.createExecutorPod("1",
      "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    assert(executor.getMetadata.getAnnotations.size() == 1)
    assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION))
    checkOwnerReferences(executor, driverPodUid)
  }

  test("the shuffle-service adds a volume mount") {
    val conf = baseConf.clone()
    conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value")
    conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default")

    val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
      SparkTransportConf.fromSparkConf(conf, "shuffle"),
      sc.env.securityManager,
      sc.env.securityManager.isAuthenticationEnabled())
    val shuffleManager = new KubernetesExternalShuffleManagerImpl(conf,
      kubernetesClient, kubernetesExternalShuffleClient)
    val factory = new ExecutorPodFactoryImpl(conf,
      NodeAffinityExecutorPodModifierImpl, None, None, None, None, Some(shuffleManager))
    val executor = factory.createExecutorPod("1",
      "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    assert(executor.getSpec.getContainers.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "0-tmp")
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getMountPath == "/tmp")
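    // (The "0-tmp" name appears to be derived from the shuffle dir's index plus
    // its sanitized path; /tmp is the single default local dir.)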
    checkOwnerReferences(executor, driverPodUid)
  }

  test("Small-files add a secret & secret volume mount to the container") {
    val conf = baseConf.clone()
    val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1")

    val factory = new ExecutorPodFactoryImpl(conf,
      NodeAffinityExecutorPodModifierImpl, None, Some(smallFiles), None, None, None)
    val executor = factory.createExecutorPod("1",
      "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    assert(executor.getSpec.getContainers.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getName == "submitted-files")
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getMountPath == "/var/secret1")

    assert(executor.getSpec.getVolumes.size() == 1)
    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1")

    checkOwnerReferences(executor, driverPodUid)
    checkEnv(executor, Set())
  }

  test("classpath and extra java options get translated into environment variables") {
    val conf = baseConf.clone()
    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")

    val factory = new ExecutorPodFactoryImpl(conf,
      NodeAffinityExecutorPodModifierImpl, None, None, None, None, None)
    val executor = factory.createExecutorPod("1",
      "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())

    checkEnv(executor, Set("SPARK_JAVA_OPT_0", "SPARK_EXECUTOR_EXTRA_CLASSPATH", "qux"))
    checkOwnerReferences(executor, driverPodUid)
  }

  // There is always exactly one controller reference, and it points to the driver pod.
  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
    assert(executor.getMetadata.getOwnerReferences.size() == 1)
    assert(executor.getMetadata.getOwnerReferences.get(0).getUid == driverPodUid)
    assert(executor.getMetadata.getOwnerReferences.get(0).getController == true)
  }

  // Check that the expected environment variables are present.
  private def checkEnv(executor: Pod, additionalEnvVars: Set[String]): Unit = {
    val defaultEnvs = Set(constants.ENV_EXECUTOR_ID,
      constants.ENV_DRIVER_URL, constants.ENV_EXECUTOR_CORES,
      constants.ENV_EXECUTOR_MEMORY, constants.ENV_APPLICATION_ID,
      constants.ENV_MOUNTED_CLASSPATH, constants.ENV_EXECUTOR_POD_IP,
      constants.ENV_EXECUTOR_PORT) ++ additionalEnvVars

    assert(executor.getSpec.getContainers.size() == 1)
    assert(executor.getSpec.getContainers.get(0).getEnv().size() == defaultEnvs.size)
    val setEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map {
      x => x.getName
    }.toSet
    assert(defaultEnvs == setEnvs)
  }
}