Pluggable Pod Controller #11

Open · wants to merge 1 commit into upstream-master
Config.scala
@@ -356,6 +356,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val EXECUTOR_POD_CONTROLLER_CLASS =
+    ConfigBuilder("spark.kubernetes.executor.podController.class")
+      .doc("Experimental. Specify a class that can handle the creation " +
+        "and deletion of pods")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
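For context, a user would opt in to a custom controller by pointing this new option at an implementation on the driver classpath; when it is unset, the default ExecutorPodControllerImpl is used. A minimal sketch of the opt-in (the class name is hypothetical, matching the example controller sketched after the trait below):

    import org.apache.spark.SparkConf

    // Hypothetical opt-in to a custom pod controller; the class name is an example only.
    // Leaving spark.kubernetes.executor.podController.class unset keeps the default
    // ExecutorPodControllerImpl behavior.
    val conf = new SparkConf()
      .set("spark.kubernetes.executor.podController.class",
        "org.apache.spark.scheduler.cluster.k8s.LabelingExecutorPodController")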
ExecutorPodController.scala (new file)
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.k8s

import java.util

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient


/**
 * Responsible for the creation and deletion of pods as requested by the
 * ExecutorPodsAllocator, the ExecutorPodsLifecycleManager, and the
 * KubernetesClusterSchedulerBackend. The default implementation,
 * ExecutorPodControllerImpl, communicates directly with the KubernetesClient
 * to create and delete pods. This trait can be implemented so that pod
 * management instead goes through a custom CRD that satisfies specific SLA
 * and security requirements.
 */
private[spark] trait ExecutorPodController {

  def initialize(kubernetesClient: KubernetesClient): Unit

  def addPod(pod: Pod): Unit

  def commitAndGetTotalAllocated(): Int

  def removePod(pod: Pod): Unit

  def removePods(pods: util.List[Pod]): Boolean
}
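To make the extension point concrete, here is a sketch of a custom implementation; everything in it is hypothetical and not part of this PR. It tags each executor pod with an extra label before handing it to the Kubernetes client. Because the trait is private[spark], an implementation currently has to live under an org.apache.spark package, and, assuming Utils.loadExtensions behaves as it does for other Spark extensions, it needs either a no-arg constructor or one taking a SparkConf.

    package org.apache.spark.scheduler.cluster.k8s

    import java.util

    import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
    import io.fabric8.kubernetes.client.KubernetesClient

    import org.apache.spark.SparkConf

    // Hypothetical plug-in (not part of this PR): adds a marker label to every executor
    // pod before creation, otherwise delegating straight to the Kubernetes client.
    private[spark] class LabelingExecutorPodController(conf: SparkConf)
      extends ExecutorPodController {

      private var kubernetesClient: KubernetesClient = _
      private var numAdded: Int = 0

      override def initialize(kClient: KubernetesClient): Unit = {
        kubernetesClient = kClient
      }

      override def addPod(pod: Pod): Unit = {
        // Rebuild the pod with an extra label so a cluster operator can tell which
        // controller created it, then create it through the regular client.
        val labeled = new PodBuilder(pod)
          .editOrNewMetadata()
            .addToLabels("example.org/pod-controller", "labeling")
            .endMetadata()
          .build()
        kubernetesClient.pods().create(labeled)
        synchronized { numAdded += 1 }
      }

      override def commitAndGetTotalAllocated(): Int = synchronized {
        val total = numAdded
        numAdded = 0
        total
      }

      override def removePod(pod: Pod): Unit = {
        kubernetesClient.pods().withName(pod.getMetadata.getName).delete()
      }

      override def removePods(pods: util.List[Pod]): Boolean = {
        kubernetesClient.pods().delete(pods)
      }
    }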
ExecutorPodControllerImpl.scala (new file)
@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.k8s

import java.util

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

private[spark] class ExecutorPodControllerImpl(
    val conf: SparkConf)
  extends ExecutorPodController {

  private var kubernetesClient: KubernetesClient = _

  private var numAdded: Int = _

  override def initialize(kClient: KubernetesClient): Unit = {
    kubernetesClient = kClient
    numAdded = 0
  }

  override def addPod(pod: Pod): Unit = {
    kubernetesClient.pods().create(pod)
    synchronized {
      numAdded += 1
    }
  }

  override def commitAndGetTotalAllocated(): Int = synchronized {
    // Read and reset under the same lock so that increments from concurrent
    // addPod calls are not lost between the read and the reset.
    val totalNumAdded = numAdded
    numAdded = 0
    totalNumAdded
  }

  override def removePod(pod: Pod): Unit = {
    // If deletion failed on a previous try, we can try again if resync informs us the pod
    // is still around.
    // Delete as best attempt - duplicate deletes will throw an exception but the end state
    // of getting rid of the pod is what matters.
    Utils.tryLogNonFatalError {
      kubernetesClient
        .pods()
        .withName(pod.getMetadata.getName)
        .delete()
    }
  }

  override def removePods(pods: util.List[Pod]): Boolean = {
    kubernetesClient.pods().delete(pods)
  }
}
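For orientation, a sketch of the calling contract the rest of the diff appears to assume (illustrative only, not code from the PR): pods are created one at a time through addPod, commitAndGetTotalAllocated returns and resets the count of pods created since the last commit, and removePods takes the items of an already-listed, label-filtered query.

    package org.apache.spark.scheduler.cluster.k8s

    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.KubernetesClient

    // Hypothetical usage sketch, not part of this PR.
    private[spark] object ExecutorPodControllerUsageSketch {

      def roundTrip(
          controller: ExecutorPodController,
          client: KubernetesClient,
          newExecutorPods: Seq[Pod]): Int = {
        controller.initialize(client)

        // Allocation path: create this round's pods, then read how many were
        // requested since the last commit (the counter resets on each call).
        newExecutorPods.foreach(controller.addPod)
        val requestedThisRound = controller.commitAndGetTotalAllocated()

        // Teardown path mirrors the call sites below: list the label-filtered
        // pods first, then hand the items to the controller.
        val executorPods = client.pods()
          .withLabel("spark-role", "executor") // literal values of SPARK_ROLE_LABEL / SPARK_POD_EXECUTOR_ROLE
          .list()
          .getItems
        controller.removePods(executorPods)

        requestedThisRound
      }
    }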
ExecutorPodsAllocator.scala
@@ -35,6 +35,7 @@ private[spark] class ExecutorPodsAllocator(
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
+    executorPodController: ExecutorPodController,
     clock: Clock) extends Logging {
 
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
@@ -115,12 +116,12 @@ private[spark] class ExecutorPodsAllocator(
       newlyCreatedExecutors --= timedOut
       if (shouldDeleteExecutors) {
         Utils.tryLogNonFatalError {
-          kubernetesClient
+          val pods = kubernetesClient
             .pods()
             .withLabel(SPARK_APP_ID_LABEL, applicationId)
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
-            .delete()
+          executorPodController.removePods(pods.list().getItems)
         }
       }
     }
@@ -170,13 +171,13 @@ private[spark] class ExecutorPodsAllocator(
       if (toDelete.nonEmpty) {
         logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
         Utils.tryLogNonFatalError {
-          kubernetesClient
+          val pods = kubernetesClient
             .pods()
             .withField("status.phase", "Pending")
             .withLabel(SPARK_APP_ID_LABEL, applicationId)
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
-            .delete()
+          executorPodController.removePods(pods.list().getItems)
           newlyCreatedExecutors --= toDelete
           knownPendingCount -= knownPendingToDelete.size
         }
@@ -203,7 +204,7 @@ private[spark] class ExecutorPodsAllocator(
         .addToContainers(executorPod.container)
         .endSpec()
         .build()
-      kubernetesClient.pods().create(podWithAttachedContainer)
+      executorPodController.addPod(podWithAttachedContainer)
       newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
       logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
     }
ExecutorPodsLifecycleManager.scala
@@ -27,12 +27,12 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorExited
-import org.apache.spark.util.Utils
 
 private[spark] class ExecutorPodsLifecycleManager(
     val conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
+    executorPodController: ExecutorPodController,
     // Use a best-effort to track which executors have been removed already. It's not generally
     // job-breaking if we remove executors more than once but it's ideal if we make an attempt
     // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
@@ -125,16 +125,7 @@ private[spark] class ExecutorPodsLifecycleManager(
   }
 
   private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
-    // If deletion failed on a previous try, we can try again if resync informs us the pod
-    // is still around.
-    // Delete as best attempt - duplicate deletes will throw an exception but the end state
-    // of getting rid of the pod is what matters.
-    Utils.tryLogNonFatalError {
-      kubernetesClient
-        .pods()
-        .withName(updatedPod.getMetadata.getName)
-        .delete()
-    }
+    executorPodController.removePod(updatedPod)
   }
 
   private def removeExecutorFromSpark(
KubernetesClusterManager.scala
@@ -22,13 +22,13 @@ import java.util.concurrent.TimeUnit
 import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.client.Config
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-import org.apache.spark.util.{SystemClock, ThreadUtils}
+import org.apache.spark.util.{SystemClock, ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
 
@@ -98,10 +98,26 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
     val removedExecutorsCache = CacheBuilder.newBuilder()
       .expireAfterWrite(3, TimeUnit.MINUTES)
       .build[java.lang.Long, java.lang.Long]()
+
+    val executorPodControllers = sc.conf.get(EXECUTOR_POD_CONTROLLER_CLASS)
+      .map(clazz => Utils.loadExtensions(
+        classOf[ExecutorPodController],
+        Seq(clazz), sc.conf))
+      .getOrElse(Seq(new ExecutorPodControllerImpl(sc.conf)))
+
+    if (executorPodControllers.size > 1) {
+      throw new SparkException(
+        s"Multiple executorPodControllers listed: $executorPodControllers")
+    }
+    val executorPodController = executorPodControllers.head
+
+    executorPodController.initialize(kubernetesClient)
+
     val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
       sc.conf,
       kubernetesClient,
       snapshotsStore,
+      executorPodController,
       removedExecutorsCache)
 
     val executorPodsAllocator = new ExecutorPodsAllocator(
@@ -110,6 +126,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
       new KubernetesExecutorBuilder(),
       kubernetesClient,
       snapshotsStore,
+      executorPodController,
       new SystemClock())
 
     val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
@@ -129,6 +146,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
       snapshotsStore,
       executorPodsAllocator,
       executorPodsLifecycleEventHandler,
+      executorPodController,
       podsWatchEventSource,
       podsPollingEventSource)
   }
KubernetesClusterSchedulerBackend.scala
@@ -40,6 +40,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     snapshotsStore: ExecutorPodsSnapshotsStore,
     podAllocator: ExecutorPodsAllocator,
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
+    executorPodController: ExecutorPodController,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
@@ -99,11 +100,11 @@ private[spark] class KubernetesClusterSchedulerBackend(

     if (shouldDeleteExecutors) {
       Utils.tryLogNonFatalError {
-        kubernetesClient
+        val pods = kubernetesClient
           .pods()
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-          .delete()
+        executorPodController.removePods(pods.list().getItems)
       }
     }

@@ -146,11 +147,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
           .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
+          .list()
+          .getItems()
 
-        if (!running.list().getItems().isEmpty()) {
-          logInfo(s"Forcefully deleting ${running.list().getItems().size()} pods " +
+        if (!running.isEmpty()) {
+          logInfo(s"Forcefully deleting ${running.size()} pods " +
             s"(out of ${executorIds.size}) that are still running after graceful shutdown period.")
-          running.delete()
+          executorPodController.removePods(running)
         }
       }
     }