diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index c90303b6ba506..cb4fcd1a6072f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -81,6 +81,18 @@ private[spark] object SparkKubernetesClientFactory {
       }.withOption(namespace) {
         (ns, configBuilder) => configBuilder.withNamespace(ns)
       }.build()
+    val connTimeoutPropertiesMs = sparkConf.get(
+      KUBERNETES_CLIENT_CONNECTION_TIMEOUT_SYSTEM_PROPERTY)
+    config.setConnectionTimeout(connTimeoutPropertiesMs.toInt)
+    val reqTimeoutPropertiesMs = sparkConf.get(
+      KUBERNETES_CLIENT_REQUEST_TIMEOUT_SYSTEM_PROPERTY)
+    config.setRequestTimeout(reqTimeoutPropertiesMs.toInt)
+    val reconnectIntervalPropertiesMs = sparkConf.get(
+      KUBERNETES_CLIENT_WATCH_RECONNECT_INTERVAL_SYSTEM_PROPERTY)
+    config.setWatchReconnectInterval(reconnectIntervalPropertiesMs.toInt)
+    val reconnectLimitProperties = sparkConf.get(
+      KUBERNETES_CLIENT_WATCH_RECONNECT_LIMIT_SYSTEM_PROPERTY)
+    config.setWatchReconnectLimit(reconnectLimitProperties)
     val baseHttpClient = HttpClientUtils.createHttpClient(config)
     val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
       .dispatcher(dispatcher)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
index e395fed810a3d..58cdaad0b3089 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
@@ -554,4 +554,28 @@ package object config extends Logging {
       resolvedURL
     }
   }
+
+  private[spark] val KUBERNETES_CLIENT_WATCH_RECONNECT_INTERVAL_SYSTEM_PROPERTY =
+    ConfigBuilder("spark.kubernetes.client.watch.reconnectIntervalInMs")
+      .doc("Connection retry interval for kubernetes client requests.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
+  private[spark] val KUBERNETES_CLIENT_WATCH_RECONNECT_LIMIT_SYSTEM_PROPERTY =
+    ConfigBuilder("spark.kubernetes.client.watch.reconnectLimit")
+      .doc("Limit of times connections can be attempted for kubernetes client requests.")
+      .intConf
+      .createWithDefault(-1)
+
+  private[spark] val KUBERNETES_CLIENT_CONNECTION_TIMEOUT_SYSTEM_PROPERTY =
+    ConfigBuilder("spark.kubernetes.client.connection.timeoutInMs")
+      .doc("Connection timeout for kubernetes client requests.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
+  private[spark] val KUBERNETES_CLIENT_REQUEST_TIMEOUT_SYSTEM_PROPERTY =
+    ConfigBuilder("spark.kubernetes.client.request.timeoutInMs")
+      .doc("Request timeout for kubernetes client requests.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index d30c88fcc74bf..e36a8763de7b3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -339,6 +339,21 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val clusterNodeName = pod.getSpec.getNodeName
         logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
         executorPodsByIPs.put(podIP, pod)
+      } else if (action == Action.MODIFIED && pod.getStatus.getPhase == "Failed"
+          && pod.getMetadata.getDeletionTimestamp == null) {
+        val podName = pod.getMetadata.getName
+        val podIP = pod.getStatus.getPodIP
+        if (podIP != null) {
+          executorPodsByIPs.remove(podIP)
+        }
+        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+          runningPodsToExecutors.get(podName).foreach { executorId =>
+            disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+            logInfo(s"executor $executorId Failed")
+          }
+        }
+        logInfo(s"Received pod $podName failed event. Reason: " + pod.getStatus.getReason)
+        handleErroredPod(pod)
       } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
           action == Action.DELETED || action == Action.ERROR) {
         val podName = pod.getMetadata.getName
@@ -393,7 +408,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
             " exited from explicit termination request.")
       } else {
         val containerExitReason = containerExitStatus match {
-          case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
+          case MEM_EXCEEDED_EXIT_CODE =>
             memLimitExceededLogMessage(pod.getStatus.getReason)
           case _ =>
             // Here we can't be sure that that exit was caused by the application but this seems
@@ -474,8 +489,7 @@
 }
 
 private object KubernetesClusterSchedulerBackend {
-  private val VMEM_EXCEEDED_EXIT_CODE = -103
-  private val PMEM_EXCEEDED_EXIT_CODE = -104
+  private val MEM_EXCEEDED_EXIT_CODE = 137
   private val UNKNOWN_EXIT_CODE = -111
   // Number of times we are allowed check for the loss reason for an executor before we give up
   // and assume the executor failed for good, and attribute it to a framework fault.
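Not part of the patch: a minimal sketch, assuming the config keys introduced in config.scala above, of how the new Kubernetes client tunables could be overridden from application code. The object name and the values shown are illustrative only, not recommendations from this change.

import org.apache.spark.SparkConf

// Hypothetical example, not shipped with the patch.
// Duration entries accept Spark time strings such as "30s"; the watch reconnect
// limit is a plain integer (the patch's default is -1).
object ClientTimeoutExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.kubernetes.client.connection.timeoutInMs", "30s")
      .set("spark.kubernetes.client.request.timeoutInMs", "30s")
      .set("spark.kubernetes.client.watch.reconnectIntervalInMs", "2s")
      .set("spark.kubernetes.client.watch.reconnectLimit", "10")
    // Print the resulting settings so the overrides can be inspected.
    println(conf.getAll.map { case (k, v) => s"$k=$v" }.mkString("\n"))
  }
}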