Skip to content

Commit 3feec05

Browse files
committed
migration
migrate upsert app
1 parent 82e1673 commit 3feec05

16 files changed

+591
-11
lines changed

.rat-excludes

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ build/scala-*/**
4343
**/metadata-store-schema*.sql
4444
**/*.derby.sql
4545
**/*.mysql.sql
46+
**/*.postgresql.sql
4647
**/*.sqlite.sql
4748
**/node/**
4849
**/web-ui/dist/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' ';
2+
3+
CREATE TABLE IF NOT EXISTS kubernetes_metadata(
4+
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
5+
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
6+
context varchar(32) COMMENT 'the kubernetes context',
7+
namespace varchar(255) COMMENT 'the kubernetes namespace',
8+
pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name',
9+
app_id varchar(128) COMMENT 'the application id',
10+
app_state varchar(32) COMMENT 'the application state',
11+
app_error mediumtext COMMENT NOT NULL COMMENT 'the application diagnose',
12+
create_time bigint COMMENT 'the metadata create time',
13+
update_time bigint COMMENT 'the metadata update time',
14+
UNIQUE INDEX unique_identifier_index(identifier)
15+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
-- the metadata table ddl
2+
3+
CREATE TABLE IF NOT EXISTS metadata(
4+
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
5+
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
6+
session_type varchar(32) NOT NULL COMMENT 'the session type, SQL or BATCH',
7+
real_user varchar(255) NOT NULL COMMENT 'the real user',
8+
user_name varchar(255) NOT NULL COMMENT 'the user name, might be a proxy user',
9+
ip_address varchar(128) COMMENT 'the client ip address',
10+
kyuubi_instance varchar(1024) COMMENT 'the kyuubi instance that creates this',
11+
state varchar(128) NOT NULL COMMENT 'the session state',
12+
resource varchar(1024) COMMENT 'the main resource',
13+
class_name varchar(1024) COMMENT 'the main class name',
14+
request_name varchar(1024) COMMENT 'the request name',
15+
request_conf mediumtext COMMENT 'the request config map',
16+
request_args mediumtext COMMENT 'the request arguments',
17+
create_time BIGINT NOT NULL COMMENT 'the metadata create time',
18+
engine_type varchar(32) NOT NULL COMMENT 'the engine type',
19+
cluster_manager varchar(128) COMMENT 'the engine cluster manager',
20+
engine_open_time bigint COMMENT 'the engine open time',
21+
engine_id varchar(128) COMMENT 'the engine application id',
22+
engine_name mediumtext COMMENT 'the engine application name',
23+
engine_url varchar(1024) COMMENT 'the engine tracking url',
24+
engine_state varchar(32) COMMENT 'the engine application state',
25+
engine_error mediumtext COMMENT 'the engine application diagnose',
26+
end_time bigint COMMENT 'the metadata end time',
27+
priority int NOT NULL DEFAULT 10 COMMENT 'the application priority, high value means high priority',
28+
peer_instance_closed boolean default '0' COMMENT 'closed by peer kyuubi instance',
29+
UNIQUE INDEX unique_identifier_index(identifier),
30+
INDEX user_name_index(user_name),
31+
INDEX engine_type_index(engine_type),
32+
INDEX create_time_index(create_time),
33+
-- See more detail about this index in ./005-KYUUBI-5327.mysql.sql
34+
INDEX priority_create_time_index(priority DESC, create_time ASC)
35+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
36+
37+
CREATE TABLE IF NOT EXISTS kubernetes_metadata(
38+
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
39+
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
40+
context varchar(32) COMMENT 'the kubernetes context',
41+
namespace varchar(255) COMMENT 'the kubernetes namespace',
42+
pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name',
43+
app_id varchar(128) COMMENT 'the application id',
44+
app_state varchar(32) COMMENT 'the application state',
45+
app_error mediumtext COMMENT NOT NULL COMMENT 'the application diagnose',
46+
create_time bigint COMMENT 'the metadata create time',
47+
update_time bigint COMMENT 'the metadata update time',
48+
UNIQUE INDEX unique_identifier_index(identifier)
49+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
2+
SOURCE 006-KYUUBI-7028.mysql.sql;
3+
SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' ';
2+
3+
CREATE TABLE IF NOT EXISTS kubernetes_metadata(
4+
key_id bigserial PRIMARY KEY,
5+
identifier varchar(36) NOT NULL,
6+
context varchar(32),
7+
namespace varchar(255),
8+
pod_name varchar(255) NOT NULL,
9+
app_id varchar(128),
10+
app_state varchar(32),
11+
app_error text,
12+
create_time bigint NOT NULL,
13+
update_time bigint NOT NULL
14+
);
15+
16+
COMMENT ON COLUMN kubernetes_metadata.key_id IS 'the auto increment key id';
17+
COMMENT ON COLUMN kubernetes_metadata.identifier IS 'the identifier id, which is an UUID';
18+
COMMENT ON COLUMN kubernetes_metadata.context IS 'the kubernetes context';
19+
COMMENT ON COLUMN kubernetes_metadata.namespace IS 'the kubernetes namespace';
20+
COMMENT ON COLUMN kubernetes_metadata.pod_name IS 'the kubernetes pod name';
21+
COMMENT ON COLUMN kubernetes_metadata.app_id IS 'the application id';
22+
COMMENT ON COLUMN kubernetes_metadata.app_state IS 'the application state';
23+
COMMENT ON COLUMN kubernetes_metadata.app_error IS 'the application diagnose';
24+
COMMENT ON COLUMN kubernetes_metadata.create_time IS 'the metadata create time';
25+
COMMENT ON COLUMN kubernetes_metadata.update_time IS 'the metadata update time';
26+
27+
CREATE UNIQUE INDEX IF NOT EXISTS kubernetes_metadata_unique_identifier_index ON kubernetes_metadata(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
CREATE TABLE IF NOT EXISTS metadata(
2+
key_id bigserial PRIMARY KEY,
3+
identifier varchar(36) NOT NULL,
4+
session_type varchar(32) NOT NULL,
5+
real_user varchar(255) NOT NULL,
6+
user_name varchar(255) NOT NULL,
7+
ip_address varchar(128),
8+
kyuubi_instance varchar(1024),
9+
state varchar(128) NOT NULL,
10+
resource varchar(1024),
11+
class_name varchar(1024),
12+
request_name varchar(1024),
13+
request_conf text,
14+
request_args text,
15+
create_time bigint NOT NULL,
16+
engine_type varchar(32) NOT NULL,
17+
cluster_manager varchar(128),
18+
engine_open_time bigint,
19+
engine_id varchar(128),
20+
engine_name text,
21+
engine_url varchar(1024),
22+
engine_state varchar(32),
23+
engine_error text,
24+
end_time bigint,
25+
priority int NOT NULL DEFAULT 10,
26+
peer_instance_closed boolean DEFAULT FALSE
27+
);
28+
29+
COMMENT ON COLUMN metadata.key_id IS 'the auto increment key id';
30+
COMMENT ON COLUMN metadata.identifier IS 'the identifier id, which is an UUID';
31+
COMMENT ON COLUMN metadata.session_type IS 'the session type, SQL or BATCH';
32+
COMMENT ON COLUMN metadata.real_user IS 'the real user';
33+
COMMENT ON COLUMN metadata.user_name IS 'the user name, might be a proxy user';
34+
COMMENT ON COLUMN metadata.ip_address IS 'the client ip address';
35+
COMMENT ON COLUMN metadata.kyuubi_instance IS 'the kyuubi instance that creates this';
36+
COMMENT ON COLUMN metadata.state IS 'the session state';
37+
COMMENT ON COLUMN metadata.resource IS 'the main resource';
38+
COMMENT ON COLUMN metadata.class_name IS 'the main class name';
39+
COMMENT ON COLUMN metadata.request_name IS 'the request name';
40+
COMMENT ON COLUMN metadata.request_conf IS 'the request config map';
41+
COMMENT ON COLUMN metadata.request_args IS 'the request arguments';
42+
COMMENT ON COLUMN metadata.create_time IS 'the metadata create time';
43+
COMMENT ON COLUMN metadata.engine_type IS 'the engine type';
44+
COMMENT ON COLUMN metadata.cluster_manager IS 'the engine cluster manager';
45+
COMMENT ON COLUMN metadata.engine_open_time IS 'the engine open time';
46+
COMMENT ON COLUMN metadata.engine_id IS 'the engine application id';
47+
COMMENT ON COLUMN metadata.engine_name IS 'the engine application name';
48+
COMMENT ON COLUMN metadata.engine_url IS 'the engine tracking url';
49+
COMMENT ON COLUMN metadata.engine_state IS 'the engine application state';
50+
COMMENT ON COLUMN metadata.engine_error IS 'the engine application diagnose';
51+
COMMENT ON COLUMN metadata.end_time IS 'the metadata end time';
52+
COMMENT ON COLUMN metadata.priority IS 'the application priority, high value means high priority';
53+
COMMENT ON COLUMN metadata.peer_instance_closed IS 'closed by peer kyuubi instance';
54+
55+
CREATE UNIQUE INDEX IF NOT EXISTS unique_identifier_index ON metadata(identifier);
56+
CREATE INDEX IF NOT EXISTS user_name_index ON metadata(user_name);
57+
CREATE INDEX IF NOT EXISTS engine_type_index ON metadata(engine_type);
58+
CREATE INDEX IF NOT EXISTS create_time_index ON metadata(create_time);
59+
CREATE INDEX IF NOT EXISTS priority_create_time_index ON metadata(priority DESC, create_time ASC);
60+
61+
CREATE TABLE IF NOT EXISTS kubernetes_metadata(
62+
key_id bigserial PRIMARY KEY,
63+
identifier varchar(36) NOT NULL,
64+
context varchar(32),
65+
namespace varchar(255),
66+
pod_name varchar(255) NOT NULL,
67+
app_id varchar(128),
68+
app_state varchar(32),
69+
app_error text,
70+
create_time bigint NOT NULL,
71+
update_time bigint NOT NULL
72+
);
73+
74+
COMMENT ON COLUMN kubernetes_metadata.key_id IS 'the auto increment key id';
75+
COMMENT ON COLUMN kubernetes_metadata.identifier IS 'the identifier id, which is an UUID';
76+
COMMENT ON COLUMN kubernetes_metadata.context IS 'the kubernetes context';
77+
COMMENT ON COLUMN kubernetes_metadata.namespace IS 'the kubernetes namespace';
78+
COMMENT ON COLUMN kubernetes_metadata.pod_name IS 'the kubernetes pod name';
79+
COMMENT ON COLUMN kubernetes_metadata.app_id IS 'the application id';
80+
COMMENT ON COLUMN kubernetes_metadata.app_state IS 'the application state';
81+
COMMENT ON COLUMN kubernetes_metadata.app_error IS 'the application diagnose';
82+
COMMENT ON COLUMN kubernetes_metadata.create_time IS 'the metadata create time';
83+
COMMENT ON COLUMN kubernetes_metadata.update_time IS 'the metadata update time';
84+
85+
CREATE UNIQUE INDEX IF NOT EXISTS kubernetes_metadata_unique_identifier_index ON kubernetes_metadata(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT '< Upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS ' ';
2+
\i 001-KYUUBI-7028.postgresql.sql
3+
SELECT '< Finished upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS ' ';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- the kubernetes_metadata table ddl
2+
CREATE TABLE IF NOT EXISTS kubernetes_metadata(
3+
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
4+
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
5+
context varchar(32), -- the kubernetes context
6+
namespace varchar(255), -- the kubernetes namespace
7+
pod_name varchar(255) NOT NULL, -- the kubernetes pod name
8+
app_id varchar(128), -- the application id
9+
app_state varchar(32), -- the application state
10+
app_error mediumtext, -- the application diagnose
11+
create_time bigint, -- the metadata create time
12+
update_time bigint -- the metadata update time
13+
);
14+
15+
CREATE UNIQUE INDEX IF NOT EXISTS kubernetes_metadata_unique_identifier_index ON kubernetes_metadata(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
-- the metadata table ddl
2+
3+
CREATE TABLE IF NOT EXISTS metadata(
4+
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
5+
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
6+
session_type varchar(32) NOT NULL, -- the session type, SQL or BATCH
7+
real_user varchar(255) NOT NULL, -- the real user
8+
user_name varchar(255) NOT NULL, -- the user name, might be a proxy user
9+
ip_address varchar(128), -- the client ip address
10+
kyuubi_instance varchar(1024), -- the kyuubi instance that creates this
11+
state varchar(128) NOT NULL, -- the session state
12+
resource varchar(1024), -- the main resource
13+
class_name varchar(1024), -- the main class name
14+
request_name varchar(1024), -- the request name
15+
request_conf mediumtext, -- the request config map
16+
request_args mediumtext, -- the request arguments
17+
create_time BIGINT NOT NULL, -- the metadata create time
18+
engine_type varchar(32) NOT NULL, -- the engine type
19+
cluster_manager varchar(128), -- the engine cluster manager
20+
engine_open_time bigint, -- the engine open time
21+
engine_id varchar(128), -- the engine application id
22+
engine_name mediumtext, -- the engine application name
23+
engine_url varchar(1024), -- the engine tracking url
24+
engine_state varchar(32), -- the engine application state
25+
engine_error mediumtext, -- the engine application diagnose
26+
end_time bigint, -- the metadata end time
27+
priority INTEGER NOT NULL DEFAULT 10, -- the application priority, high value means high priority
28+
peer_instance_closed boolean default '0' -- closed by peer kyuubi instance
29+
);
30+
31+
CREATE UNIQUE INDEX IF NOT EXISTS metadata_unique_identifier_index ON metadata(identifier);
32+
33+
CREATE INDEX IF NOT EXISTS metadata_user_name_index ON metadata(user_name);
34+
35+
CREATE INDEX IF NOT EXISTS metadata_engine_type_index ON metadata(engine_type);
36+
37+
CREATE INDEX IF NOT EXISTS metadata_create_time_index ON metadata(create_time);
38+
39+
CREATE INDEX IF NOT EXISTS metadata_priority_create_time_index ON metadata(priority, create_time);
40+
41+
-- the kubernetes_metadata table ddl
42+
CREATE TABLE IF NOT EXISTS kubernetes_metadata(
43+
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
44+
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
45+
context varchar(32), -- the kubernetes context
46+
namespace varchar(255), -- the kubernetes namespace
47+
pod_name varchar(255) NOT NULL, -- the kubernetes pod name
48+
app_id varchar(128), -- the application id
49+
app_state varchar(32), -- the application state
50+
app_error mediumtext, -- the application diagnose
51+
create_time bigint, -- the metadata create time
52+
update_time bigint -- the metadata update time
53+
);
54+
55+
CREATE UNIQUE INDEX IF NOT EXISTS kubernetes_metadata_unique_identifier_index ON kubernetes_metadata(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
2+
.read 001-KYUUBI-7028.sqlite.sql
3+
SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

+24-7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState
3838
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
3939
import org.apache.kyuubi.operation.OperationState
4040
import org.apache.kyuubi.server.KyuubiServer
41+
import org.apache.kyuubi.server.metadata.api.KubernetesMetadata
4142
import org.apache.kyuubi.session.KyuubiSessionManager
4243
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
4344

@@ -251,8 +252,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
251252
try {
252253
// need to initialize the kubernetes client if not exists
253254
getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
254-
val (_, appInfo) =
255-
appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> ApplicationInfo.NOT_FOUND)
255+
val appInfo = appInfoStore.get(tag) match {
256+
case (_, info) => info
257+
case _ =>
258+
// try to get the application info from kubernetes metadata
259+
metadataManager.flatMap(
260+
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
261+
}
256262
(appInfo.state, submitTime) match {
257263
// Kyuubi should wait second if pod is not be created
258264
case (NOT_FOUND, Some(_submitTime)) =>
@@ -336,7 +342,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
336342
updateApplicationState(kubernetesInfo, newPod, eventType)
337343
val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType)
338344
if (isTerminated(appState)) {
339-
markApplicationTerminated(newPod, eventType)
345+
markApplicationTerminated(kubernetesInfo, newPod, eventType)
340346
}
341347
KubernetesApplicationAuditLogger.audit(
342348
eventType,
@@ -354,7 +360,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
354360
if (isSparkEnginePod(pod)) {
355361
val eventType = KubernetesResourceEventTypes.DELETE
356362
updateApplicationState(kubernetesInfo, pod, eventType)
357-
markApplicationTerminated(pod, eventType)
363+
markApplicationTerminated(kubernetesInfo, pod, eventType)
358364
KubernetesApplicationAuditLogger.audit(
359365
eventType,
360366
kubernetesInfo,
@@ -450,13 +456,24 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
450456
}
451457

452458
private def markApplicationTerminated(
459+
kubernetesInfo: KubernetesInfo,
453460
pod: Pod,
454461
eventType: KubernetesResourceEventType): Unit = synchronized {
455462
val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
463+
val (appState, appError) =
464+
toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)
465+
// upsert the kubernetes metadata when the application is terminated
466+
metadataManager.foreach(_.upsertKubernetesMetadata(
467+
KubernetesMetadata(
468+
identifier = key,
469+
context = kubernetesInfo.context,
470+
namespace = kubernetesInfo.namespace,
471+
podName = pod.getMetadata.getName,
472+
appId = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
473+
appState = appState.toString,
474+
appError = appError)))
456475
if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) {
457-
cleanupTerminatedAppInfoTrigger.put(
458-
key,
459-
toApplicationState(pod, appStateSource, appStateContainer, eventType))
476+
cleanupTerminatedAppInfoTrigger.put(key, appState)
460477
}
461478
}
462479

0 commit comments

Comments
 (0)