Skip to content

Commit 82e1673

Browse files
committed
[KYUUBI #7026] Audit the kubernetes pod event type and fix DELETE event process logical
### Why are the changes needed? 1. Audit the kubernetes resource event type. 2. Fix the process logical for DELETE event. Before this pr: I tried to delete the POD manually, then I saw that, kyuubi thought the `appState=PENDING`. ``` :2025-04-15 13:58:20.320 INFO [-1077768163-pool-36-thread-7] org.apache.kyuubi.engine.KubernetesApplicationAuditLogger: eventType=DELETE label=3c58e9fd-cf8c-4cc3-a9aa-82ae40e200d8 context=97 namespace=dls-prod pod=kyuubi-spark-3c58e9fd-cf8c-4cc3-a9aa-82ae40e200d8-driver podState=Pending containers=[] appId=spark-cd125bbd9fc84ffcae6d6b5d41d4d8ad appState=PENDING appError='' ``` It seems that, the pod status in the event is the snapshot before pod deleted. Then we would not receive any event for this POD, and finally the batch FINISHED with application `NOT_FOUND` . <img width="1389" alt="image" src="https://github.com/user-attachments/assets/5df03db6-0924-4a58-9538-b196fbf87f32" /> Seems we need to process the DELETE event specially. 1. get the app state from the pod/container states 2. if the applicationState got is terminated, return the applicationState directly 3. otherwise, the applicationState should be FAILED, as the pod has been deleted. ### How was this patch tested? <img width="1614" alt="image" src="https://github.com/user-attachments/assets/11e64c6f-ad53-4485-b8d2-a351bb23e8ca" /> ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7026 from turboFei/k8s_audit. Closes #7026 4e5695d [Wang, Fei] for delete c167572 [Wang, Fei] audit the pod event type Authored-by: Wang, Fei <[email protected]> Signed-off-by: Wang, Fei <[email protected]>
1 parent 4fc201e commit 82e1673

File tree

3 files changed

+72
-13
lines changed

3 files changed

+72
-13
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,22 @@ import io.fabric8.kubernetes.api.model.Pod
2424
import org.apache.kyuubi.Logging
2525
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
2626
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationStateAndError, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
27+
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
2728

2829
object KubernetesApplicationAuditLogger extends Logging {
2930
final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
3031
override protected def initialValue: StringBuilder = new StringBuilder()
3132
}
3233

3334
def audit(
35+
eventType: KubernetesResourceEventType,
3436
kubernetesInfo: KubernetesInfo,
3537
pod: Pod,
3638
appStateSource: KubernetesApplicationStateSource,
3739
appStateContainer: String): Unit = {
3840
val sb = AUDIT_BUFFER.get()
3941
sb.setLength(0)
42+
sb.append("eventType=").append(eventType).append("\t")
4043
sb.append(s"label=${pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)}").append("\t")
4144
sb.append(s"context=${kubernetesInfo.context.orNull}").append("\t")
4245
sb.append(s"namespace=${kubernetesInfo.namespace.orNull}").append("\t")
@@ -48,7 +51,7 @@ object KubernetesApplicationAuditLogger extends Logging {
4851
sb.append(s"containers=$containerStatuses").append("\t")
4952
sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t")
5053
val (appState, appError) =
51-
toApplicationStateAndError(pod, appStateSource, appStateContainer)
54+
toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)
5255
sb.append(s"appState=$appState").append("\t")
5356
sb.append(s"appError='${appError.getOrElse("")}'")
5457
info(sb.toString())

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
3535
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
3636
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
3737
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
38+
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
3839
import org.apache.kyuubi.operation.OperationState
3940
import org.apache.kyuubi.server.KyuubiServer
4041
import org.apache.kyuubi.session.KyuubiSessionManager
@@ -315,8 +316,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
315316

316317
override def onAdd(pod: Pod): Unit = {
317318
if (isSparkEnginePod(pod)) {
318-
updateApplicationState(kubernetesInfo, pod)
319+
val eventType = KubernetesResourceEventTypes.ADD
320+
updateApplicationState(kubernetesInfo, pod, eventType)
319321
KubernetesApplicationAuditLogger.audit(
322+
eventType,
320323
kubernetesInfo,
321324
pod,
322325
appStateSource,
@@ -327,14 +330,16 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
327330

328331
override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
329332
if (isSparkEnginePod(newPod)) {
333+
val eventType = KubernetesResourceEventTypes.UPDATE
330334
val kyuubiUniqueKey = newPod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
331335
val firstUpdate = appInfoStore.get(kyuubiUniqueKey) == null
332-
updateApplicationState(kubernetesInfo, newPod)
333-
val appState = toApplicationState(newPod, appStateSource, appStateContainer)
336+
updateApplicationState(kubernetesInfo, newPod, eventType)
337+
val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType)
334338
if (isTerminated(appState)) {
335-
markApplicationTerminated(newPod)
339+
markApplicationTerminated(newPod, eventType)
336340
}
337341
KubernetesApplicationAuditLogger.audit(
342+
eventType,
338343
kubernetesInfo,
339344
newPod,
340345
appStateSource,
@@ -347,9 +352,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
347352

348353
override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
349354
if (isSparkEnginePod(pod)) {
350-
updateApplicationState(kubernetesInfo, pod)
351-
markApplicationTerminated(pod)
355+
val eventType = KubernetesResourceEventTypes.DELETE
356+
updateApplicationState(kubernetesInfo, pod, eventType)
357+
markApplicationTerminated(pod, eventType)
352358
KubernetesApplicationAuditLogger.audit(
359+
eventType,
353360
kubernetesInfo,
354361
pod,
355362
appStateSource,
@@ -388,9 +395,12 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
388395
selectors.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && selectors.containsKey(SPARK_APP_ID_LABEL)
389396
}
390397

391-
private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = {
398+
private def updateApplicationState(
399+
kubernetesInfo: KubernetesInfo,
400+
pod: Pod,
401+
eventType: KubernetesResourceEventType): Unit = {
392402
val (appState, appError) =
393-
toApplicationStateAndError(pod, appStateSource, appStateContainer)
403+
toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)
394404
debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
395405
val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
396406
appInfoStore.synchronized {
@@ -439,12 +449,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
439449
}.getOrElse(warn(s"Spark UI port not found in service ${svc.getMetadata.getName}"))
440450
}
441451

442-
private def markApplicationTerminated(pod: Pod): Unit = synchronized {
452+
private def markApplicationTerminated(
453+
pod: Pod,
454+
eventType: KubernetesResourceEventType): Unit = synchronized {
443455
val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
444456
if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) {
445457
cleanupTerminatedAppInfoTrigger.put(
446458
key,
447-
toApplicationState(pod, appStateSource, appStateContainer))
459+
toApplicationState(pod, appStateSource, appStateContainer, eventType))
448460
}
449461
}
450462

@@ -504,11 +516,31 @@ object KubernetesApplicationOperation extends Logging {
504516
def toApplicationState(
505517
pod: Pod,
506518
appStateSource: KubernetesApplicationStateSource,
507-
appStateContainer: String): ApplicationState = {
508-
toApplicationStateAndError(pod, appStateSource, appStateContainer)._1
519+
appStateContainer: String,
520+
eventType: KubernetesResourceEventType): ApplicationState = {
521+
toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)._1
509522
}
510523

511524
def toApplicationStateAndError(
525+
pod: Pod,
526+
appStateSource: KubernetesApplicationStateSource,
527+
appStateContainer: String,
528+
eventType: KubernetesResourceEventType): (ApplicationState, Option[String]) = {
529+
eventType match {
530+
case KubernetesResourceEventTypes.ADD | KubernetesResourceEventTypes.UPDATE =>
531+
getApplicationStateAndErrorFromPod(pod, appStateSource, appStateContainer)
532+
case KubernetesResourceEventTypes.DELETE =>
533+
val (appState, appError) =
534+
getApplicationStateAndErrorFromPod(pod, appStateSource, appStateContainer)
535+
if (ApplicationState.isTerminated(appState)) {
536+
(appState, appError)
537+
} else {
538+
(ApplicationState.FAILED, Some(s"Pod ${pod.getMetadata.getName} is deleted"))
539+
}
540+
}
541+
}
542+
543+
private def getApplicationStateAndErrorFromPod(
512544
pod: Pod,
513545
appStateSource: KubernetesApplicationStateSource,
514546
appStateContainer: String): (ApplicationState, Option[String]) = {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine
19+
20+
object KubernetesResourceEventTypes extends Enumeration {
21+
type KubernetesResourceEventType = Value
22+
23+
val ADD, UPDATE, DELETE = Value
24+
}

0 commit comments

Comments
 (0)