Skip to content

Commit 0fba119

Browse files
foxishash211
authored and committed
Add unit-testing for executorpodfactory (apache-spark-on-k8s#491)
* Unit test for executorpodfactory * Fix test * Indentation fix * Fix isEmpty and split between lines * Address issues with multi-line code fragments * Replace == with === * mock shuffleManager * .kubernetes. => .k8s. * move to k8s subdir * fix package clause to k8s * mock nodeAffinityExecutorPodModifier * remove commented code * move when clause to before{} block * mock initContainerBootstrap, smallFiles * insert actual logic into smallFiles mock * verify application of nodeAffinityExecutorPodModifier * avoid cumulative invocation * Fixed env-var check to include values, removed mock for small files (cherry picked from commit 887fdce)
1 parent 8314cbc commit 0fba119

File tree

1 file changed

+305
-0
lines changed

1 file changed

+305
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Pod, VolumeBuilder, VolumeMountBuilder, _}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.io.FilenameUtils
import org.mockito.{AdditionalAnswers, MockitoAnnotations}
import org.mockito.Matchers.{any, eq => mockitoEq}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{constants, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrap, MountSmallFilesBootstrapImpl}

/**
 * Unit tests for ExecutorPodFactoryImpl: verifies that executor pods are built
 * with the expected name, labels, container image, resource limits, volumes,
 * owner references, and environment variables, both in the default case and
 * when each optional bootstrap (secrets, small files, init-container, external
 * shuffle manager) is supplied.
 */
class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
  private val driverPodName: String = "driver-pod"
  private val driverPodUid: String = "driver-uid"
  private val driverUrl: String = "driver-url"
  private val executorPrefix: String = "base"
  private val executorImage: String = "executor-image"

  // Minimal driver pod fixture; each created executor pod is expected to carry
  // an owner reference pointing back at this pod's UID.
  private val driverPod = new PodBuilder()
    .withNewMetadata()
      .withName(driverPodName)
      .withUid(driverPodUid)
      .endMetadata()
    .withNewSpec()
      .withNodeName("some-node")
      .endSpec()
    .withNewStatus()
      .withHostIP("192.168.99.100")
      .endStatus()
    .build()

  private var baseConf: SparkConf = _

  // Shared mock; reset in beforeEach so verify(..., times(1)) counts do not
  // accumulate across tests.
  private val nodeAffinityExecutorPodModifier = mock(classOf[NodeAffinityExecutorPodModifier])

  before {
    MockitoAnnotations.initMocks(this)
    baseConf = new SparkConf()
      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
      .set(EXECUTOR_DOCKER_IMAGE, executorImage)
  }

  // Declared but never assigned in this suite; presumably reserved for future
  // tests — NOTE(review): consider removing if still unused.
  private var kubernetesClient: KubernetesClient = _

  // Stub the node-affinity modifier to pass the pod through unchanged, so
  // tests can assert on the factory's own output.
  override def beforeEach(cmap: org.scalatest.ConfigMap) {
    reset(nodeAffinityExecutorPodModifier)
    when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
      any(classOf[Pod]),
      any(classOf[Map[String, Int]]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
  }

  test("basic executor pod has reasonable defaults") {
    val factory = new ExecutorPodFactoryImpl(
      baseConf,
      nodeAffinityExecutorPodModifier,
      None,
      None,
      None,
      None,
      None)
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    // The executor pod name and default labels.
    assert(executor.getMetadata.getName === s"$executorPrefix-exec-1")
    assert(executor.getMetadata.getLabels.size() === 3)

    // There is exactly 1 container with no volume mounts and default memory limits.
    // Default memory limit is 1024M + 384M (minimum overhead constant).
    assert(executor.getSpec.getContainers.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getImage === executorImage)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty)
    assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getResources
      .getLimits.get("memory").getAmount === "1408Mi")

    // The pod has no node selector, volumes.
    assert(executor.getSpec.getNodeSelector.isEmpty)
    assert(executor.getSpec.getVolumes.isEmpty)

    checkEnv(executor, Map())
    checkOwnerReferences(executor, driverPodUid)
  }

  test("executor pod hostnames get truncated to 63 characters") {
    val conf = baseConf.clone()
    // 67-character prefix: long enough to force truncation of the hostname.
    conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
      "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")

    val factory = new ExecutorPodFactoryImpl(
      conf, nodeAffinityExecutorPodModifier, None, None, None, None, None)
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    // Kubernetes restricts hostnames to 63 characters (DNS label limit).
    assert(executor.getSpec.getHostname.length === 63)
  }

  test("secrets get mounted") {
    val conf = baseConf.clone()

    val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1"))
    val factory = new ExecutorPodFactoryImpl(
      conf,
      nodeAffinityExecutorPodModifier,
      Some(secretsBootstrap),
      None,
      None,
      None,
      None)
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    // The secret is exposed to the container as a single volume mount.
    assert(executor.getSpec.getContainers.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
      === "secret1-volume")
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getMountPath === "/var/secret1")

    // check volume mounted.
    assert(executor.getSpec.getVolumes.size() === 1)
    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")

    checkOwnerReferences(executor, driverPodUid)
  }

  test("init-container bootstrap step adds an init container") {
    val conf = baseConf.clone()
    // Pass-through mock: the bootstrap returns its input unchanged, so the
    // annotation added by the factory itself is what gets asserted below.
    val initContainerBootstrap = mock(classOf[SparkPodInitContainerBootstrap])
    when(initContainerBootstrap.bootstrapInitContainerAndVolumes(
      any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())

    val factory = new ExecutorPodFactoryImpl(
      conf,
      nodeAffinityExecutorPodModifier,
      None,
      None,
      Some(initContainerBootstrap),
      None,
      None)
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    assert(executor.getMetadata.getAnnotations.size() === 1)
    assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION))
    checkOwnerReferences(executor, driverPodUid)
  }

  test("the shuffle-service adds a volume mount") {
    val conf = baseConf.clone()
    conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value")
    conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default")
    conf.set(KUBERNETES_SHUFFLE_DIR, "/tmp")

    // Mock shuffle manager that reports one host-path shuffle dir ("/tmp"),
    // producing the volume/mount pair the factory should wire into the pod.
    val shuffleManager = mock(classOf[KubernetesExternalShuffleManager])
    when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({
      val shuffleDirs = Seq("/tmp")
      shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) =>
        val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
        val volume = new VolumeBuilder()
          .withName(volumeName)
          .withNewHostPath(shuffleDir)
          .build()
        val volumeMount = new VolumeMountBuilder()
          .withName(volumeName)
          .withMountPath(shuffleDir)
          .build()
        (volume, volumeMount)
      }
    })

    val factory = new ExecutorPodFactoryImpl(
      conf,
      nodeAffinityExecutorPodModifier,
      None,
      None,
      None,
      None,
      Some(shuffleManager))
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    assert(executor.getSpec.getContainers.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "0-tmp")
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getMountPath === "/tmp")
    checkOwnerReferences(executor, driverPodUid)
  }

  test("Small-files add a secret & secret volume mount to the container") {
    val conf = baseConf.clone()

    val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1")
    val factory = new ExecutorPodFactoryImpl(
      conf,
      nodeAffinityExecutorPodModifier,
      None,
      Some(smallFiles),
      None,
      None,
      None)
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    assert(executor.getSpec.getContainers.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getName === "submitted-files")
    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
      .getMountPath === "/var/secret1")

    assert(executor.getSpec.getVolumes.size() === 1)
    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")

    checkOwnerReferences(executor, driverPodUid)
    checkEnv(executor, Map("SPARK_MOUNTED_FILES_FROM_SECRET_DIR" -> "/var/secret1"))
  }

  test("classpath and extra java options get translated into environment variables") {
    val conf = baseConf.clone()
    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")

    val factory = new ExecutorPodFactoryImpl(
      conf, nodeAffinityExecutorPodModifier, None, None, None, None, None)
    val executor = factory.createExecutorPod(
      "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())

    verify(nodeAffinityExecutorPodModifier, times(1))
      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

    checkEnv(executor,
      Map("SPARK_JAVA_OPT_0" -> "foo=bar",
        "SPARK_EXECUTOR_EXTRA_CLASSPATH" -> "bar=baz",
        "qux" -> "quux"))
    checkOwnerReferences(executor, driverPodUid)
  }

  // There is always exactly one controller reference, and it points to the driver pod.
  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
    assert(executor.getMetadata.getOwnerReferences.size() === 1)
    assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid)
    assert(executor.getMetadata.getOwnerReferences.get(0).getController === true)
  }

  // Check that the expected environment variables are present, with their
  // values, and that no unexpected extras exist (exact map equality).
  private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = {
    val defaultEnvs = Map(
      constants.ENV_EXECUTOR_ID -> "1",
      constants.ENV_DRIVER_URL -> "dummy",
      constants.ENV_EXECUTOR_CORES -> "1",
      constants.ENV_EXECUTOR_MEMORY -> "1g",
      constants.ENV_APPLICATION_ID -> "dummy",
      constants.ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*",
      constants.ENV_EXECUTOR_POD_IP -> null,
      constants.ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars

    assert(executor.getSpec.getContainers.size() === 1)
    assert(executor.getSpec.getContainers.get(0).getEnv().size() === defaultEnvs.size)
    val mapEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map {
      x => (x.getName, x.getValue)
    }.toMap
    assert(defaultEnvs === mapEnvs)
  }
}

0 commit comments

Comments
 (0)