@@ -20,8 +20,15 @@ import scala.collection.JavaConverters._
20
20
21
21
import io .fabric8 .kubernetes .api .model ._
22
22
import io .fabric8 .kubernetes .client .KubernetesClient
23
- import org .mockito .MockitoAnnotations
23
+ import io .fabric8 .kubernetes .api .model .{Pod , Volume , VolumeBuilder , VolumeMount , VolumeMountBuilder }
24
+
24
25
import org .scalatest .BeforeAndAfter
26
+ import org .mockito .{AdditionalAnswers , ArgumentCaptor , Mock , MockitoAnnotations }
27
+ import org .mockito .Matchers .{any , eq => mockitoEq }
28
+ import org .mockito .Mockito .{doNothing , never , times , verify , when , mock }
29
+ import org .scalatest .mock .MockitoSugar ._
30
+
31
+ import org .apache .commons .io .FilenameUtils
25
32
26
33
import org .apache .spark .{SparkConf , SparkContext , SparkFunSuite }
27
34
import org .apache .spark .deploy .kubernetes .{constants , SparkPodInitContainerBootstrapImpl }
@@ -49,7 +56,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter {
49
56
.endStatus()
50
57
.build()
51
58
private var baseConf : SparkConf = _
52
- private var sc : SparkContext = _
59
+ // private var sc: SparkContext = mock(classOf[SparkContext])
53
60
54
61
before {
55
62
SparkContext .clearActiveContext()
@@ -58,7 +65,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter {
58
65
.set(KUBERNETES_DRIVER_POD_NAME , driverPodName)
59
66
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX , executorPrefix)
60
67
.set(EXECUTOR_DOCKER_IMAGE , executorImage)
61
- sc = new SparkContext (" local" , " test" )
68
+ // sc = new SparkContext("local", "test")
62
69
}
63
70
private var kubernetesClient : KubernetesClient = _
64
71
@@ -170,12 +177,32 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter {
170
177
conf.set(KUBERNETES_SHUFFLE_NAMESPACE , " default" )
171
178
conf.set(KUBERNETES_SHUFFLE_DIR , " /tmp" )
172
179
180
+ /*
173
181
val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
174
182
SparkTransportConf.fromSparkConf(conf, "shuffle"),
175
183
sc.env.securityManager,
176
184
sc.env.securityManager.isAuthenticationEnabled())
177
185
val shuffleManager = new KubernetesExternalShuffleManagerImpl(
178
186
conf, kubernetesClient, kubernetesExternalShuffleClient)
187
+ */
188
+
189
+ val shuffleManager = mock(classOf [KubernetesExternalShuffleManager ])
190
+ when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({
191
+ val shuffleDirs = Seq (" /tmp" )
192
+ shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) =>
193
+ val volumeName = s " $shuffleDirIndex- ${FilenameUtils .getBaseName(shuffleDir)}"
194
+ val volume = new VolumeBuilder ()
195
+ .withName(volumeName)
196
+ .withNewHostPath(shuffleDir)
197
+ .build()
198
+ val volumeMount = new VolumeMountBuilder ()
199
+ .withName(volumeName)
200
+ .withMountPath(shuffleDir)
201
+ .build()
202
+ (volume, volumeMount)
203
+ }
204
+ })
205
+
179
206
val factory = new ExecutorPodFactoryImpl (
180
207
conf,
181
208
NodeAffinityExecutorPodModifierImpl ,
0 commit comments