@@ -32,6 +32,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
@@ -54,6 +55,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
   // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Names of executors whose pods have failed, guarded by RUNNING_EXECUTOR_PODS_LOCK.
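+  // Entries are added when a pod transitions to the "Failed" phase and removed
+  // when the corresponding executor is deleted.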
+  private val failedExecutors = new mutable.HashSet[String]
   // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningPodsToExecutors = new mutable.HashMap[String, String]
   private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
@@ -114,19 +117,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
     override def run(): Unit = {
       handleDisconnectedExecutors()
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
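+        // Failed executors are excluded (up to maxNumExecutorFailures) from
+        // runningExecutorSize(), so the loop below requests replacements for them.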
-        if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
+        if (totalRegisteredExecutors.get() < runningExecutorSize()) {
           logDebug("Waiting for pending executors before scaling")
-        } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
+        } else if (totalExpectedExecutors.get() <= runningExecutorSize()) {
           logDebug("Maximum allowed executor limit reached. Not scaling up further.")
         } else {
           val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+            totalExpectedExecutors.get - runningExecutorSize(), podAllocationSize)) {
             val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
             runningExecutorsToPods.put(executorId, pod)
             runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
             logInfo(
-              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+              s"Requesting a new executor $executorId, total executors is now " +
+                s"${runningExecutorSize()} (${failedExecutors.size} failed)")
           }
         }
       }
     }
@@ -172,9 +176,33 @@ private[spark] class KubernetesClusterSchedulerBackend(
         runningExecutorsToPods.remove(executorId).map { pod =>
           kubernetesClient.pods().delete(pod)
           runningPodsToExecutors.remove(pod.getMetadata.getName)
+          failedExecutors -= pod.getMetadata.getName
         }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
       }
     }
+
+    // Returns the number of executors created so far, excluding failed ones.
+    // To avoid creating too many failed executors, at most maxNumExecutorFailures
+    // failed executors are counted here; after creating
+    // totalExpectedExecutors + maxNumExecutorFailures executors in total, no more
+    // are created even if all of them have failed.
+    def runningExecutorSize(): Int = runningExecutorsToPods.size -
+      math.min(failedExecutors.size, maxNumExecutorFailures)
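+    // For example, with 10 entries in runningExecutorsToPods, 4 names in
+    // failedExecutors, and maxNumExecutorFailures = 3, this returns
+    // 10 - math.min(4, 3) = 7.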
+
+    // Default to twice the number of executors (twice the maximum number of
+    // executors if dynamic allocation is enabled), with a minimum of 3.
+    val maxNumExecutorFailures = {
+      val effectiveNumExecutors =
+        if (Utils.isDynamicAllocationEnabled(conf)) {
+          conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+        } else {
+          conf.get(EXECUTOR_INSTANCES).getOrElse(0)
+        }
+      // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation
+      // is enabled. We need to avoid integer overflow here.
+      math.max(3,
+        if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * effectiveNumExecutors)
+    }
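+    // For example, spark.executor.instances = 5 yields math.max(3, 2 * 5) = 10
+    // tolerated failures, while dynamic allocation with the default
+    // spark.dynamicAllocation.maxExecutors of Int.MaxValue is capped by the
+    // overflow guard at Int.MaxValue.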
   }
 
   private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
@@ -233,6 +261,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
       runningExecutorsToPods.clear()
       runningPodsToExecutors.clear()
+      failedExecutors.clear()
     }
     executorPodsByIPs.clear()
     val resource = executorWatchResource.getAndSet(null)
@@ -311,6 +340,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         kubernetesClient.pods().delete(executorPod)
         disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
         runningPodsToExecutors.remove(executorPod.getMetadata.getName)
+        failedExecutors -= executorPod.getMetadata.getName
       }
       if (maybeRemovedExecutor.isEmpty) {
         logWarning(s"Unable to remove pod for unknown executor $executor")
@@ -354,6 +384,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
           logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
           handleDeletedPod(pod)
         }
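+      // Kubernetes moves a pod to the "Failed" phase once all of its containers
+      // have terminated and at least one of them terminated in failure.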
+      } else if (action == Action.MODIFIED && pod.getStatus.getPhase == "Failed") {
+        logError(s"Executor pod ${pod.getMetadata.getName} failed with container status " +
+          s"${pod.getStatus.getContainerStatuses}")
+        handleFailedPod(pod)
       }
     }
@@ -407,6 +441,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
 
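+    // Records the failed pod's name under the lock guarding the executor
+    // bookkeeping, then reuses the existing errored-pod handling.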
+    def handleFailedPod(pod: Pod): Unit = {
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        failedExecutors += pod.getMetadata.getName
+      }
+      handleErroredPod(pod)
+    }
+
     def handleDeletedPod(pod: Pod): Unit = {
       val exitMessage = if (isPodAlreadyReleased(pod)) {
         s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."