@@ -38,6 +38,7 @@ import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState
38
38
import org .apache .kyuubi .engine .KubernetesResourceEventTypes .KubernetesResourceEventType
39
39
import org .apache .kyuubi .operation .OperationState
40
40
import org .apache .kyuubi .server .KyuubiServer
41
+ import org .apache .kyuubi .server .metadata .api .KubernetesMetadata
41
42
import org .apache .kyuubi .session .KyuubiSessionManager
42
43
import org .apache .kyuubi .util .{KubernetesUtils , ThreadUtils }
43
44
@@ -336,7 +337,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
336
337
updateApplicationState(kubernetesInfo, newPod, eventType)
337
338
val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType)
338
339
if (isTerminated(appState)) {
339
- markApplicationTerminated(newPod, eventType)
340
+ markApplicationTerminated(kubernetesInfo, newPod, eventType)
340
341
}
341
342
KubernetesApplicationAuditLogger .audit(
342
343
eventType,
@@ -354,7 +355,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
354
355
if (isSparkEnginePod(pod)) {
355
356
val eventType = KubernetesResourceEventTypes .DELETE
356
357
updateApplicationState(kubernetesInfo, pod, eventType)
357
- markApplicationTerminated(pod, eventType)
358
+ markApplicationTerminated(kubernetesInfo, pod, eventType)
358
359
KubernetesApplicationAuditLogger .audit(
359
360
eventType,
360
361
kubernetesInfo,
@@ -450,13 +451,24 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
450
451
}
451
452
452
453
private def markApplicationTerminated (
454
+ kubernetesInfo : KubernetesInfo ,
453
455
pod : Pod ,
454
456
eventType : KubernetesResourceEventType ): Unit = synchronized {
455
457
val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY )
458
+ val (appState, appError) =
459
+ toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)
460
+ // upsert the kubernetes metadata when the application is terminated
461
+ metadataManager.foreach(_.upsertKubernetesMetadata(
462
+ KubernetesMetadata (
463
+ identifier = key,
464
+ context = kubernetesInfo.context,
465
+ namespace = kubernetesInfo.namespace,
466
+ podName = pod.getMetadata.getName,
467
+ appId = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL ),
468
+ appState = appState.toString,
469
+ appError = appError)))
456
470
if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null ) {
457
- cleanupTerminatedAppInfoTrigger.put(
458
- key,
459
- toApplicationState(pod, appStateSource, appStateContainer, eventType))
471
+ cleanupTerminatedAppInfoTrigger.put(key, appState)
460
472
}
461
473
}
462
474
0 commit comments