diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 819ea4a7b13d0..89134fad94542 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -356,6 +356,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val EXECUTOR_POD_CONTROLLER_CLASS =
+    ConfigBuilder("spark.kubernetes.executor.podController.class")
+      .doc("Experimental. Specify a class that handles the creation " +
+        "and deletion of executor pods.")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodController.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodController.scala
new file mode 100644
index 0000000000000..e91f6f186f40a
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodController.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+
+/**
+ * Responsible for the creation and deletion of executor Pods on behalf of
+ * the ExecutorPodsAllocator, the ExecutorPodsLifecycleManager, and the
+ * KubernetesClusterSchedulerBackend. The default implementation,
+ * ExecutorPodControllerImpl, communicates directly with the
+ * KubernetesClient to create and delete Pods. Custom implementations of
+ * this trait can instead manage pods through a dedicated CRD that
+ * satisfies specific SLA and security requirements.
+ */
+private[spark] trait ExecutorPodController {
+
+  def initialize(kubernetesClient: KubernetesClient): Unit
+
+  def addPod(pod: Pod): Unit
+
+  def commitAndGetTotalAllocated(): Int
+
+  def removePod(pod: Pod): Unit
+
+  def removePods(pods: util.List[Pod]): Boolean
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerImpl.scala
new file mode 100644
index 0000000000000..b77758c59b372
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerImpl.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodControllerImpl(
+    val conf: SparkConf)
+  extends ExecutorPodController {
+
+  private var kubernetesClient: KubernetesClient = _
+
+  private var numAdded: Int = _
+
+  override def initialize(kClient: KubernetesClient): Unit = {
+    kubernetesClient = kClient
+    numAdded = 0
+  }
+
+  override def addPod(pod: Pod): Unit = {
+    kubernetesClient.pods().create(pod)
+    synchronized {
+      numAdded += 1
+    }
+  }
+
+  // Read and reset the counter in a single synchronized block so that
+  // increments from concurrent addPod() calls are never lost.
+  override def commitAndGetTotalAllocated(): Int = synchronized {
+    val totalNumAdded = numAdded
+    numAdded = 0
+    totalNumAdded
+  }
+
+  override def removePod(pod: Pod): Unit = {
+    // If deletion failed on a previous try, we can try again if resync informs us the pod
+    // is still around.
+    // Delete as best attempt - duplicate deletes will throw an exception but the end state
+    // of getting rid of the pod is what matters.
+ Utils.tryLogNonFatalError { + kubernetesClient + .pods() + .withName(pod.getMetadata.getName) + .delete() + } + } + + override def removePods(pods: util.List[Pod]): Boolean = { + kubernetesClient.pods().delete(pods) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 2201bf91d3905..b302e90b2e7f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -35,6 +35,7 @@ private[spark] class ExecutorPodsAllocator( executorBuilder: KubernetesExecutorBuilder, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, + executorPodController: ExecutorPodController, clock: Clock) extends Logging { private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) @@ -115,12 +116,12 @@ private[spark] class ExecutorPodsAllocator( newlyCreatedExecutors --= timedOut if (shouldDeleteExecutors) { Utils.tryLogNonFatalError { - kubernetesClient + val pods = kubernetesClient .pods() .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*) - .delete() + executorPodController.removePods(pods.list().getItems) } } } @@ -170,13 +171,13 @@ private[spark] class ExecutorPodsAllocator( if (toDelete.nonEmpty) { logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).") Utils.tryLogNonFatalError { - kubernetesClient + val pods = kubernetesClient .pods() .withField("status.phase", "Pending") .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*) - .delete() + executorPodController.removePods(pods.list().getItems) newlyCreatedExecutors --= toDelete knownPendingCount -= knownPendingToDelete.size } @@ -203,7 +204,7 @@ private[spark] class ExecutorPodsAllocator( .addToContainers(executorPod.container) .endSpec() .build() - kubernetesClient.pods().create(podWithAttachedContainer) + executorPodController.addPod(podWithAttachedContainer) newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index d6b75824d05d8..62778d889e3ea 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -27,12 +27,12 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.ExecutorExited -import org.apache.spark.util.Utils private[spark] class ExecutorPodsLifecycleManager( val conf: SparkConf, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, + executorPodController: ExecutorPodController, // 
Use a best-effort to track which executors have been removed already. It's not generally // job-breaking if we remove executors more than once but it's ideal if we make an attempt // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond @@ -125,16 +125,7 @@ private[spark] class ExecutorPodsLifecycleManager( } private def removeExecutorFromK8s(updatedPod: Pod): Unit = { - // If deletion failed on a previous try, we can try again if resync informs us the pod - // is still around. - // Delete as best attempt - duplicate deletes will throw an exception but the end state - // of getting rid of the pod is what matters. - Utils.tryLogNonFatalError { - kubernetesClient - .pods() - .withName(updatedPod.getMetadata.getName) - .delete() - } + executorPodController.removePod(updatedPod) } private def removeExecutorFromSpark( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index c591214d10796..fdca2071ac87b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -22,13 +22,13 @@ import java.util.concurrent.TimeUnit import com.google.common.cache.CacheBuilder import io.fabric8.kubernetes.client.Config -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.util.{SystemClock, ThreadUtils} +import org.apache.spark.util.{SystemClock, ThreadUtils, Utils} private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { @@ -98,10 +98,26 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val removedExecutorsCache = CacheBuilder.newBuilder() .expireAfterWrite(3, TimeUnit.MINUTES) .build[java.lang.Long, java.lang.Long]() + + val executorPodControllers = sc.conf.get(EXECUTOR_POD_CONTROLLER_CLASS) + .map(clazz => Utils.loadExtensions( + classOf[ExecutorPodController], + Seq(clazz), sc.conf)) + .getOrElse(Seq(new ExecutorPodControllerImpl(sc.conf))) + + if (executorPodControllers.size > 1) { + throw new SparkException( + s"Multiple executorPodControllers listed: $executorPodControllers") + } + val executorPodController = executorPodControllers.head + + executorPodController.initialize(kubernetesClient) + val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager( sc.conf, kubernetesClient, snapshotsStore, + executorPodController, removedExecutorsCache) val executorPodsAllocator = new ExecutorPodsAllocator( @@ -110,6 +126,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, + executorPodController, new SystemClock()) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( @@ -129,6 +146,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit snapshotsStore, 
executorPodsAllocator, executorPodsLifecycleEventHandler, + executorPodController, podsWatchEventSource, podsPollingEventSource) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index e221a926daca8..4dbc45bf8f77b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -40,6 +40,7 @@ private[spark] class KubernetesClusterSchedulerBackend( snapshotsStore: ExecutorPodsSnapshotsStore, podAllocator: ExecutorPodsAllocator, lifecycleEventHandler: ExecutorPodsLifecycleManager, + executorPodController: ExecutorPodController, watchEvents: ExecutorPodsWatchSnapshotSource, pollEvents: ExecutorPodsPollingSnapshotSource) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { @@ -99,11 +100,11 @@ private[spark] class KubernetesClusterSchedulerBackend( if (shouldDeleteExecutors) { Utils.tryLogNonFatalError { - kubernetesClient + val pods = kubernetesClient .pods() .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) - .delete() + executorPodController.removePods(pods.list().getItems) } } @@ -146,11 +147,13 @@ private[spark] class KubernetesClusterSchedulerBackend( .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*) + .list() + .getItems() - if (!running.list().getItems().isEmpty()) { - logInfo(s"Forcefully deleting ${running.list().getItems().size()} pods " + + if (!running.isEmpty()) { + logInfo(s"Forcefully deleting ${running.size()} pods " + s"(out of ${executorIds.size}) that are still running after graceful shutdown period.") - running.delete() + executorPodController.removePods(running) } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerSuite.scala new file mode 100644 index 0000000000000..9580d4893687a --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.{times, verify, when}
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS
+
+class ExecutorPodControllerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var executorPodController: ExecutorPodController = _
+
+  private val sparkConf = new SparkConf(false)
+
+  private val execExampleId = "exec-id"
+
+  private def buildPod(execId: String): Pod = {
+    new PodBuilder()
+      .withNewMetadata()
+        .withName(execId)
+      .endMetadata()
+      .build()
+  }
+
+  private val execPod = buildPod(execExampleId)
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var execPodOperations: PodResource[Pod, DoneablePod] = _
+
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(execExampleId))
+      .thenReturn(execPodOperations)
+    executorPodController = new ExecutorPodControllerImpl(sparkConf)
+    executorPodController.initialize(kubernetesClient)
+  }
+
+  test("Adding pods and watching the counter go up correctly") {
+    val numAllocated = 5
+    for (_ <- 0 until numAllocated) {
+      executorPodController.addPod(execPod)
+    }
+    verify(podOperations, times(numAllocated)).create(execPod)
+    assert(executorPodController.commitAndGetTotalAllocated() == numAllocated)
+    assert(executorPodController.commitAndGetTotalAllocated() == 0)
+    executorPodController.addPod(execPod)
+    assert(executorPodController.commitAndGetTotalAllocated() == 1)
+  }
+
+  test("Remove a single pod") {
+    executorPodController.removePod(execPod)
+    verify(execPodOperations).delete()
+  }
+
+  test("Remove a list of pods") {
+    val execList = (1 to 3).map { num =>
+      buildPod(s"$execExampleId-$num")
+    }
+    executorPodController.removePods(execList.asJava)
+    verify(podOperations).delete(execList.asJava)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 4475d5db6f03a..dbbe87852ef4c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -16,12 +16,14 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
+import java.util
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{never, times, verify, when}
+import org.mockito.Mockito.{doNothing, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
@@ -65,12 +67,21 @@ class
ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var labeledPods: LABELED_PODS = _ + @Mock + private var podList: PodList = _ + + @Mock + private var pods: util.List[Pod] = _ + @Mock private var driverPodOperations: PodResource[Pod, DoneablePod] = _ @Mock private var executorBuilder: KubernetesExecutorBuilder = _ + @Mock + private var executorPodController: ExecutorPodController = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ @@ -84,8 +95,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { meq(kubernetesClient))).thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() waitForExecutorPodsClock = new ManualClock(0L) + doNothing().when(executorPodController).initialize(kubernetesClient) podsAllocatorUnderTest = new ExecutorPodsAllocator( - conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + conf, secMgr, executorBuilder, kubernetesClient, + snapshotsStore, executorPodController, waitForExecutorPodsClock) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID) } @@ -93,9 +106,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { " first has not finished.") { podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1) for (nextId <- 1 to podAllocationSize) { - verify(podOperations).create(podWithAttachedContainerForId(nextId)) + verify(executorPodController).addPod(podWithAttachedContainerForId(nextId)) } - verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1)) + verify(executorPodController, + never()).addPod(podWithAttachedContainerForId(podAllocationSize + 1)) } test("Request executors in batches. 
Allow another batch to be requested if" +
@@ -105,13 +119,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
     snapshotsStore.notifySubscribers()
-    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(executorPodController,
+      never()).addPod(podWithAttachedContainerForId(podAllocationSize + 1))
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
-    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(executorPodController).addPod(podWithAttachedContainerForId(podAllocationSize + 1))
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
-    verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
+    verify(executorPodController, times(podAllocationSize + 1)).addPod(any(classOf[Pod]))
   }
 
   test("When a current batch reaches error states immediately, re-request" +
@@ -123,7 +138,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
     snapshotsStore.updatePod(failedPod)
     snapshotsStore.notifySubscribers()
-    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(executorPodController).addPod(podWithAttachedContainerForId(podAllocationSize + 1))
   }
 
   test("When an executor is requested but the API does not report it in a reasonable time, retry" +
@@ -137,12 +152,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(podOperations
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
       .thenReturn(labeledPods)
+    when(labeledPods.list()).thenReturn(podList)
+    when(podList.getItems).thenReturn(pods)
     podsAllocatorUnderTest.setTotalExpectedExecutors(1)
-    verify(podOperations).create(podWithAttachedContainerForId(1))
+    verify(executorPodController).addPod(podWithAttachedContainerForId(1))
     waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
     snapshotsStore.notifySubscribers()
-    verify(labeledPods).delete()
-    verify(podOperations).create(podWithAttachedContainerForId(2))
+    verify(executorPodController).removePods(pods)
+    verify(executorPodController).addPod(podWithAttachedContainerForId(2))
   }
 
   test("SPARK-28487: scale up and down on target executor count changes") {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 9920f4d3ea737..b5c726e8f125b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -17,14 +17,11 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, never, times, verify, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
+import org.mockito.Mockito.{doNothing, never, times, verify, when}
 import 
org.scalatest.BeforeAndAfter import scala.collection.mutable @@ -37,8 +34,6 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter { - private var namedExecutorPods: mutable.Map[String, PodResource[Pod, DoneablePod]] = _ - @Mock private var kubernetesClient: KubernetesClient = _ @@ -48,6 +43,9 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + @Mock + private var executorPodController: ExecutorPodController = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var eventHandlerUnderTest: ExecutorPodsLifecycleManager = _ @@ -55,14 +53,14 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte MockitoAnnotations.initMocks(this) val removedExecutorsCache = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long] snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() - namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]] when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String]) when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer()) + doNothing().when(executorPodController).initialize(kubernetesClient) eventHandlerUnderTest = new ExecutorPodsLifecycleManager( new SparkConf(), kubernetesClient, snapshotsStore, + executorPodController, removedExecutorsCache) eventHandlerUnderTest.start(schedulerBackend) } @@ -74,7 +72,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte val msg = exitReasonMessage(1, failedPod) val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg) verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason) - verify(namedExecutorPods(failedPod.getMetadata.getName)).delete() + verify(executorPodController).removePod(any(classOf[Pod])) } test("Don't remove executors twice from Spark but remove from K8s repeatedly.") { @@ -85,7 +83,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte val msg = exitReasonMessage(1, failedPod) val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg) verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason) - verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete() + verify(executorPodController, times(2)).removePod(any(classOf[Pod])) } test("When the scheduler backend lists executor ids that aren't present in the cluster," + @@ -108,7 +106,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte val msg = exitReasonMessage(1, failedPod) val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg) verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason) - verify(podOperations, never()).delete() + verify(executorPodController, never()).removePod(any(classOf[Pod])) } private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = { @@ -123,11 +121,4 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte |${containersDescription(failedPod)} """.stripMargin } - - private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] = - (invocation: InvocationOnMock) => { - val podName: String = invocation.getArgument(0) - namedExecutorPods.getOrElseUpdate( - podName, mock(classOf[PodResource[Pod, DoneablePod]])) - } } diff 
--git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 7e1e39c85a183..bd6e898706d8a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -65,6 +65,12 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   @Mock
   private var labeledPods: LABELED_PODS = _
 
+  @Mock
+  private var podList: PodList = _
+
+  @Mock
+  private var pods: java.util.List[Pod] = _
+
   @Mock
   private var taskScheduler: TaskSchedulerImpl = _
 
@@ -77,6 +83,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   @Mock
   private var lifecycleEventHandler: ExecutorPodsLifecycleManager = _
 
+  @Mock
+  private var executorPodController: ExecutorPodController = _
+
   @Mock
   private var watchEvents: ExecutorPodsWatchSnapshotSource = _
 
@@ -107,6 +116,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       eventQueue,
       podAllocator,
       lifecycleEventHandler,
+      executorPodController,
       watchEvents,
       pollEvents)
   }
@@ -123,11 +133,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   test("Stop all components") {
     when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    when(labeledPods.list()).thenReturn(podList)
+    when(podList.getItems).thenReturn(pods)
     schedulerBackendUnderTest.stop()
     verify(eventQueue).stop()
     verify(watchEvents).stop()
     verify(pollEvents).stop()
-    verify(labeledPods).delete()
+    verify(executorPodController).removePods(pods)
     verify(kubernetesClient).close()
   }
 
@@ -151,23 +163,24 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
     when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
 
     when(labeledPods.list()).thenReturn(podList)
-    when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
+    when(podList.getItems()).thenReturn(pods)
+    when(pods.isEmpty()).thenReturn(true)
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
     verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
-    verify(labeledPods, never()).delete()
+    verify(executorPodController, never()).removePods(pods)
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
-    verify(labeledPods, never()).delete()
+    verify(executorPodController, never()).removePods(pods)
 
-    when(podList.getItems()).thenReturn(Arrays.asList(mock(classOf[Pod])))
+    when(pods.isEmpty()).thenReturn(false)
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
-    verify(labeledPods, never()).delete()
+    verify(executorPodController, never()).removePods(pods)
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
-    verify(labeledPods).delete()
+    verify(executorPodController).removePods(pods)
   }
}
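
Note for reviewers (not part of the patch): the sketch below illustrates how the new spark.kubernetes.executor.podController.class setting could be exercised. The class name AuditingExecutorPodController and its logging behavior are purely hypothetical. Because ExecutorPodController is declared private[spark], a custom implementation has to be compiled into an org.apache.spark package.

package org.apache.spark.scheduler.cluster.k8s

import java.util

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

// Hypothetical controller that logs every pod operation before delegating to the
// KubernetesClient. A real alternative could instead create a custom CRD object
// that a separate operator reconciles into executor pods.
class AuditingExecutorPodController(conf: SparkConf)
  extends ExecutorPodController with Logging {

  private var client: KubernetesClient = _
  private var numAdded = 0

  override def initialize(kubernetesClient: KubernetesClient): Unit = {
    client = kubernetesClient
  }

  override def addPod(pod: Pod): Unit = {
    logInfo(s"Creating executor pod ${pod.getMetadata.getName}")
    client.pods().create(pod)
    synchronized { numAdded += 1 }
  }

  override def commitAndGetTotalAllocated(): Int = synchronized {
    val total = numAdded
    numAdded = 0
    total
  }

  override def removePod(pod: Pod): Unit = {
    logInfo(s"Deleting executor pod ${pod.getMetadata.getName}")
    client.pods().withName(pod.getMetadata.getName).delete()
  }

  override def removePods(pods: util.List[Pod]): Boolean = {
    logInfo(s"Deleting ${pods.size()} executor pods")
    client.pods().delete(pods)
  }
}

It would then be enabled with:

  spark.kubernetes.executor.podController.class=org.apache.spark.scheduler.cluster.k8s.AuditingExecutorPodController

Utils.loadExtensions instantiates the class through a constructor taking a SparkConf (falling back to a zero-argument constructor), so the single-argument constructor above is sufficient.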