From d167623c19e2c59190ed812d0838a0fce5627bfe Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Tue, 15 Apr 2025 22:22:24 -0700 Subject: [PATCH 1/8] migration migrate upsert app app name Add app name column comments --- .rat-excludes | 1 + .../sql/mysql/006-KYUUBI-7028.mysql.sql | 18 ++ .../metadata-store-schema-1.11.0.mysql.sql | 52 +++++ .../mysql/upgrade-1.8.0-to-1.11.0.mysql.sql | 3 + .../postgresql/001-KYUUBI-7028.postgresql.sql | 33 +++ ...etadata-store-schema-1.11.0.postgresql.sql | 91 +++++++++ .../upgrade-1.9.0-to-1.11.0.postgresql.sql | 3 + .../sql/sqlite/001-KYUUBI-7028.sqlite.sql | 18 ++ .../metadata-store-schema-1.11.0.sqlite.sql | 58 ++++++ .../sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql | 3 + .../KubernetesApplicationOperation.scala | 35 +++- .../server/metadata/MetadataManager.scala | 36 +++- .../server/metadata/MetadataStore.scala | 25 ++- .../metadata/api/KubernetesEngineInfo.scala | 48 +++++ .../metadata/jdbc/JDBCMetadataStore.scala | 190 +++++++++++++++++- .../jdbc/JDBCMetadataStoreSuite.scala | 73 ++++++- 16 files changed, 676 insertions(+), 11 deletions(-) create mode 100644 kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql create mode 100644 kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql create mode 100644 kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql create mode 100644 kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql create mode 100644 kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql create mode 100644 kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql create mode 100644 kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql create mode 100644 kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql create mode 100644 kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql create mode 
100644 kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala diff --git a/.rat-excludes b/.rat-excludes index 431327294c3..f7a57ec3c9d 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -43,6 +43,7 @@ build/scala-*/** **/metadata-store-schema*.sql **/*.derby.sql **/*.mysql.sql +**/*.postgresql.sql **/*.sqlite.sql **/node/** **/web-ui/dist/** diff --git a/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql new file mode 100644 index 00000000000..f6ff28863c3 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql @@ -0,0 +1,18 @@ +SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' '; + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id', + identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID', + context varchar(32) COMMENT 'the kubernetes context', + namespace varchar(255) COMMENT 'the kubernetes namespace', + pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name', + pod_state varchar(32) COMMENT 'the kubernetes pod state', + container_state mediumtext COMMENT 'the kubernetes container state', + engine_id varchar(128) COMMENT 'the engine id', + engine_name mediumtext COMMENT 'the engine name', + engine_state varchar(32) COMMENT 'the engine state', + engine_error mediumtext COMMENT 'the engine diagnose', + create_time bigint COMMENT 'the metadata create time', + update_time bigint COMMENT 'the metadata update time', + UNIQUE INDEX unique_identifier_index(identifier) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql new file mode 100644 index 00000000000..58eda600aad --- /dev/null +++ 
b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql @@ -0,0 +1,52 @@ +-- the metadata table ddl + +CREATE TABLE IF NOT EXISTS metadata( + key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id', + identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID', + session_type varchar(32) NOT NULL COMMENT 'the session type, SQL or BATCH', + real_user varchar(255) NOT NULL COMMENT 'the real user', + user_name varchar(255) NOT NULL COMMENT 'the user name, might be a proxy user', + ip_address varchar(128) COMMENT 'the client ip address', + kyuubi_instance varchar(1024) COMMENT 'the kyuubi instance that creates this', + state varchar(128) NOT NULL COMMENT 'the session state', + resource varchar(1024) COMMENT 'the main resource', + class_name varchar(1024) COMMENT 'the main class name', + request_name varchar(1024) COMMENT 'the request name', + request_conf mediumtext COMMENT 'the request config map', + request_args mediumtext COMMENT 'the request arguments', + create_time BIGINT NOT NULL COMMENT 'the metadata create time', + engine_type varchar(32) NOT NULL COMMENT 'the engine type', + cluster_manager varchar(128) COMMENT 'the engine cluster manager', + engine_open_time bigint COMMENT 'the engine open time', + engine_id varchar(128) COMMENT 'the engine application id', + engine_name mediumtext COMMENT 'the engine application name', + engine_url varchar(1024) COMMENT 'the engine tracking url', + engine_state varchar(32) COMMENT 'the engine application state', + engine_error mediumtext COMMENT 'the engine application diagnose', + end_time bigint COMMENT 'the metadata end time', + priority int NOT NULL DEFAULT 10 COMMENT 'the application priority, high value means high priority', + peer_instance_closed boolean default '0' COMMENT 'closed by peer kyuubi instance', + UNIQUE INDEX unique_identifier_index(identifier), + INDEX user_name_index(user_name), + INDEX engine_type_index(engine_type), + INDEX 
create_time_index(create_time), + -- See more detail about this index in ./005-KYUUBI-5327.mysql.sql + INDEX priority_create_time_index(priority DESC, create_time ASC) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id', + identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID', + context varchar(32) COMMENT 'the kubernetes context', + namespace varchar(255) COMMENT 'the kubernetes namespace', + pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name', + pod_state varchar(32) COMMENT 'the kubernetes pod state', + container_state mediumtext COMMENT 'the kubernetes container state', + engine_id varchar(128) COMMENT 'the engine id', + engine_name mediumtext COMMENT 'the engine name', + engine_state varchar(32) COMMENT 'the engine state', + engine_error mediumtext COMMENT 'the engine diagnose', + create_time bigint COMMENT 'the metadata create time', + update_time bigint COMMENT 'the metadata update time', + UNIQUE INDEX unique_identifier_index(identifier) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql new file mode 100644 index 00000000000..02d46e8a00b --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql @@ -0,0 +1,3 @@ +SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; +SOURCE 006-KYUUBI-7028.mysql.sql; +SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; diff --git a/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql new file mode 100644 index 00000000000..c2773188520 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql @@ 
-0,0 +1,33 @@
+SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS " "; -- banner only; PostgreSQL requires the alias to be an identifier, so it is double-quoted (a string literal after AS is a syntax error)
+
+CREATE TABLE IF NOT EXISTS k8s_engine_info(
+    key_id bigserial PRIMARY KEY,
+    identifier varchar(36) NOT NULL,
+    context varchar(32),
+    namespace varchar(255),
+    pod_name varchar(255) NOT NULL,
+    pod_state varchar(32),
+    container_state text,
+    engine_id varchar(128),
+    engine_name text,
+    engine_state varchar(32),
+    engine_error text,
+    create_time bigint NOT NULL,
+    update_time bigint NOT NULL
+    );
+
+COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id';
+COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID';
+COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context';
+COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace';
+COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name';
+COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state';
+COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state';
+COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id';
+COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name';
+COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state';
+COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose';
+COMMENT ON COLUMN k8s_engine_info.create_time IS 'the metadata create time';
+COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time';
+
+CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
diff --git a/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql
new file mode 100644
index 00000000000..3d351089cb1
--- /dev/null
+++ b/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql
@@ -0,0 +1,91 @@
+CREATE TABLE IF NOT EXISTS
metadata( + key_id bigserial PRIMARY KEY, + identifier varchar(36) NOT NULL, + session_type varchar(32) NOT NULL, + real_user varchar(255) NOT NULL, + user_name varchar(255) NOT NULL, + ip_address varchar(128), + kyuubi_instance varchar(1024), + state varchar(128) NOT NULL, + resource varchar(1024), + class_name varchar(1024), + request_name varchar(1024), + request_conf text, + request_args text, + create_time bigint NOT NULL, + engine_type varchar(32) NOT NULL, + cluster_manager varchar(128), + engine_open_time bigint, + engine_id varchar(128), + engine_name text, + engine_url varchar(1024), + engine_state varchar(32), + engine_error text, + end_time bigint, + priority int NOT NULL DEFAULT 10, + peer_instance_closed boolean DEFAULT FALSE +); + +COMMENT ON COLUMN metadata.key_id IS 'the auto increment key id'; +COMMENT ON COLUMN metadata.identifier IS 'the identifier id, which is an UUID'; +COMMENT ON COLUMN metadata.session_type IS 'the session type, SQL or BATCH'; +COMMENT ON COLUMN metadata.real_user IS 'the real user'; +COMMENT ON COLUMN metadata.user_name IS 'the user name, might be a proxy user'; +COMMENT ON COLUMN metadata.ip_address IS 'the client ip address'; +COMMENT ON COLUMN metadata.kyuubi_instance IS 'the kyuubi instance that creates this'; +COMMENT ON COLUMN metadata.state IS 'the session state'; +COMMENT ON COLUMN metadata.resource IS 'the main resource'; +COMMENT ON COLUMN metadata.class_name IS 'the main class name'; +COMMENT ON COLUMN metadata.request_name IS 'the request name'; +COMMENT ON COLUMN metadata.request_conf IS 'the request config map'; +COMMENT ON COLUMN metadata.request_args IS 'the request arguments'; +COMMENT ON COLUMN metadata.create_time IS 'the metadata create time'; +COMMENT ON COLUMN metadata.engine_type IS 'the engine type'; +COMMENT ON COLUMN metadata.cluster_manager IS 'the engine cluster manager'; +COMMENT ON COLUMN metadata.engine_open_time IS 'the engine open time'; +COMMENT ON COLUMN metadata.engine_id IS 'the engine 
application id'; +COMMENT ON COLUMN metadata.engine_name IS 'the engine application name'; +COMMENT ON COLUMN metadata.engine_url IS 'the engine tracking url'; +COMMENT ON COLUMN metadata.engine_state IS 'the engine application state'; +COMMENT ON COLUMN metadata.engine_error IS 'the engine application diagnose'; +COMMENT ON COLUMN metadata.end_time IS 'the metadata end time'; +COMMENT ON COLUMN metadata.priority IS 'the application priority, high value means high priority'; +COMMENT ON COLUMN metadata.peer_instance_closed IS 'closed by peer kyuubi instance'; + +CREATE UNIQUE INDEX IF NOT EXISTS unique_identifier_index ON metadata(identifier); +CREATE INDEX IF NOT EXISTS user_name_index ON metadata(user_name); +CREATE INDEX IF NOT EXISTS engine_type_index ON metadata(engine_type); +CREATE INDEX IF NOT EXISTS create_time_index ON metadata(create_time); +CREATE INDEX IF NOT EXISTS priority_create_time_index ON metadata(priority DESC, create_time ASC); + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigserial PRIMARY KEY, + identifier varchar(36) NOT NULL, + context varchar(32), + namespace varchar(255), + pod_name varchar(255) NOT NULL, + pod_state varchar(32), + container_state text, + engine_id varchar(128), + engine_name text, + engine_state varchar(32), + engine_error text, + create_time bigint NOT NULL, + update_time bigint NOT NULL +); + +COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id'; +COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID'; +COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context'; +COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace'; +COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name'; +COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state'; +COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state'; +COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id'; +COMMENT ON 
COLUMN k8s_engine_info.engine_name IS 'the engine name';
+COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state';
+COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose';
+COMMENT ON COLUMN k8s_engine_info.create_time IS 'the metadata create time';
+COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time';
+
+CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
diff --git a/kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql
new file mode 100644
index 00000000000..48dea7959aa
--- /dev/null
+++ b/kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql
@@ -0,0 +1,3 @@
+SELECT '< Upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS " "; -- progress banner; the alias is double-quoted because PostgreSQL rejects a string literal after AS
+\i 001-KYUUBI-7028.postgresql.sql
+SELECT '< Finished upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS " ";
diff --git a/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql
new file mode 100644
index 00000000000..44bf6311643
--- /dev/null
+++ b/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql
@@ -0,0 +1,18 @@
+-- the k8s_engine_info table ddl
+CREATE TABLE IF NOT EXISTS k8s_engine_info(
+    key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
+    identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
+    context varchar(32), -- the kubernetes context
+    namespace varchar(255), -- the kubernetes namespace
+    pod_name varchar(255) NOT NULL, -- the kubernetes pod name
+    pod_state varchar(32), -- the kubernetes pod state
+    container_state mediumtext, -- the kubernetes container state
+    engine_id varchar(128), -- the engine id
+    engine_name mediumtext, -- the engine name
+    engine_state varchar(32), -- the engine state
+    engine_error
mediumtext, -- the engine diagnose + create_time bigint, -- the metadata create time + update_time bigint -- the metadata update time +); + +CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql new file mode 100644 index 00000000000..58a9f1fa9da --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql @@ -0,0 +1,58 @@ +-- the metadata table ddl + +CREATE TABLE IF NOT EXISTS metadata( + key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id + identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID + session_type varchar(32) NOT NULL, -- the session type, SQL or BATCH + real_user varchar(255) NOT NULL, -- the real user + user_name varchar(255) NOT NULL, -- the user name, might be a proxy user + ip_address varchar(128), -- the client ip address + kyuubi_instance varchar(1024), -- the kyuubi instance that creates this + state varchar(128) NOT NULL, -- the session state + resource varchar(1024), -- the main resource + class_name varchar(1024), -- the main class name + request_name varchar(1024), -- the request name + request_conf mediumtext, -- the request config map + request_args mediumtext, -- the request arguments + create_time BIGINT NOT NULL, -- the metadata create time + engine_type varchar(32) NOT NULL, -- the engine type + cluster_manager varchar(128), -- the engine cluster manager + engine_open_time bigint, -- the engine open time + engine_id varchar(128), -- the engine application id + engine_name mediumtext, -- the engine application name + engine_url varchar(1024), -- the engine tracking url + engine_state varchar(32), -- the engine application state + engine_error mediumtext, -- the engine application diagnose + end_time bigint, -- the metadata 
end time + priority INTEGER NOT NULL DEFAULT 10, -- the application priority, high value means high priority + peer_instance_closed boolean default '0' -- closed by peer kyuubi instance +); + +CREATE UNIQUE INDEX IF NOT EXISTS metadata_unique_identifier_index ON metadata(identifier); + +CREATE INDEX IF NOT EXISTS metadata_user_name_index ON metadata(user_name); + +CREATE INDEX IF NOT EXISTS metadata_engine_type_index ON metadata(engine_type); + +CREATE INDEX IF NOT EXISTS metadata_create_time_index ON metadata(create_time); + +CREATE INDEX IF NOT EXISTS metadata_priority_create_time_index ON metadata(priority, create_time); + +-- the k8s_engine_info table ddl +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id + identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID + context varchar(32), -- the kubernetes context + namespace varchar(255), -- the kubernetes namespace + pod_name varchar(255) NOT NULL, -- the kubernetes pod name + pod_state varchar(32), -- the kubernetes pod state + container_state mediumtext, -- the kubernetes container state + engine_id varchar(128), -- the engine id + engine_name mediumtext, -- the engine name + engine_state varchar(32), -- the engine state + engine_error mediumtext, -- the engine diagnose + create_time bigint, -- the metadata create time + update_time bigint -- the metadata update time +); + +CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql new file mode 100644 index 00000000000..c759b0bc307 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql @@ -0,0 +1,3 @@ +SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; +.read 001-KYUUBI-7028.sqlite.sql +SELECT '< 
Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index c53dda0e36c..116afd50dab 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -38,6 +38,7 @@ import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager +import org.apache.kyuubi.server.metadata.api.KubernetesEngineInfo import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { @@ -255,8 +256,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { try { // need to initialize the kubernetes client if not exists getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo) - val (_, appInfo) = - appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> ApplicationInfo.NOT_FOUND) + val appInfo = appInfoStore.get(tag) match { + case (_, info) => info + case _ => + // try to get the application info from kubernetes engine info store + metadataManager.flatMap( + _.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND) + } (appInfo.state, submitTime) match { // Kyuubi should wait second if pod is not be created case (NOT_FOUND, Some(_submitTime)) => @@ -340,7 +346,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { updateApplicationState(kubernetesInfo, newPod, eventType) val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType) if (isTerminated(appState)) { 
- markApplicationTerminated(newPod, eventType) + markApplicationTerminated(kubernetesInfo, newPod, eventType) } KubernetesApplicationAuditLogger.audit( eventType, @@ -358,7 +364,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { if (isSparkEnginePod(pod)) { val eventType = KubernetesResourceEventTypes.DELETE updateApplicationState(kubernetesInfo, pod, eventType) - markApplicationTerminated(pod, eventType) + markApplicationTerminated(kubernetesInfo, pod, eventType) KubernetesApplicationAuditLogger.audit( eventType, kubernetesInfo, @@ -456,13 +462,28 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } private def markApplicationTerminated( + kubernetesInfo: KubernetesInfo, pod: Pod, eventType: KubernetesResourceEventType): Unit = synchronized { val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + val (appState, appError) = + toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType) + // upsert the kubernetes engine info store when the application is terminated + metadataManager.foreach(_.upsertKubernetesMetadata( + KubernetesEngineInfo( + identifier = key, + context = kubernetesInfo.context, + namespace = kubernetesInfo.namespace, + podName = pod.getMetadata.getName, + podState = pod.getStatus.getPhase, + containerState = pod.getStatus.getContainerStatuses.asScala.map(cs => + s"${cs.getName}->${cs.getState}").mkString(","), + engineId = getPodAppId(pod), + engineName = getPodAppName(pod), + engineState = appState.toString, + engineError = appError))) if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) { - cleanupTerminatedAppInfoTrigger.put( - key, - toApplicationState(pod, appStateSource, appStateContainer, eventType)) + cleanupTerminatedAppInfoTrigger.put(key, appState) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala index 9ca1948dec6..fb9ec12704f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala @@ -26,9 +26,10 @@ import org.apache.kyuubi.{KyuubiException, Logging} import org.apache.kyuubi.client.api.v1.dto.Batch import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.METADATA_MAX_AGE +import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState} import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.operation.OperationState -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} import org.apache.kyuubi.service.AbstractService import org.apache.kyuubi.session.SessionType import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils} @@ -134,6 +135,15 @@ class MetadataManager extends AbstractService("MetadataManager") { .filter(_.sessionType == SessionType.BATCH) } + def getKubernetesApplicationInfo(identifier: String): Option[ApplicationInfo] = { + Option(withMetadataRequestMetrics(_metadataStore.getKubernetesMetaEngineInfo(identifier))).map { + metadata => + val appInfo = buildApplicationInfo(metadata) + info(s"Retrieve $appInfo from kubernetes engine info store: $metadata") + appInfo + } + } + def getBatches( filter: MetadataFilter, from: Int, @@ -204,16 +214,32 @@ class MetadataManager extends AbstractService("MetadataManager") { } } + def upsertKubernetesMetadata(kubernetesEngineInfo: KubernetesEngineInfo): Unit = { + try { + withMetadataRequestMetrics(_metadataStore.upsertKubernetesEngineInfo(kubernetesEngineInfo)) + } catch { + case e: Throwable => + error( + s"Error updating kubernetes engine info for session ${kubernetesEngineInfo.identifier}", + e) + } + } + 
def cleanupMetadataById(batchId: String): Unit = { withMetadataRequestMetrics(_metadataStore.cleanupMetadataByIdentifier(batchId)) } + def cleanupKubernetesMetadataById(identifier: String): Unit = { + withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByIdentifier(identifier)) + } + private def startMetadataCleaner(): Unit = { val stateMaxAge = conf.get(METADATA_MAX_AGE) val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL) val cleanerTask: Runnable = () => { try { withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge)) + withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(stateMaxAge)) } catch { case e: Throwable => error("Error cleaning up the metadata by age", e) } @@ -356,4 +382,12 @@ object MetadataManager extends Logging { batchMetadata.endTime, Map.empty[String, String].asJava) } + + def buildApplicationInfo(metadata: KubernetesEngineInfo): ApplicationInfo = { + ApplicationInfo( + id = metadata.engineId, + name = metadata.engineName, + state = ApplicationState.withName(metadata.engineState), + error = metadata.engineError) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala index b9fb52f779a..46a5505afe8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.server.metadata import java.io.Closeable -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} trait MetadataStore extends Closeable { @@ -82,6 +82,18 @@ trait MetadataStore extends Closeable { */ def updateMetadata(metadata: Metadata): Unit + /** + * Upsert the kubernetes engine info. 
Insert if not exists, otherwise update. + */ + def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit + + /** + * Get the persisted kubernetes engine info by unique identifier. + * @param identifier the identifier. + * @return selected engine info. + */ + def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo + /** * Cleanup meta data by identifier. */ @@ -92,4 +104,15 @@ trait MetadataStore extends Closeable { * @param maxAge the batch state info maximum age. */ def cleanupMetadataByAge(maxAge: Long): Unit + + /** + * Cleanup kubernetes engine info by identifier. + */ + def cleanupKubernetesEngineInfoByIdentifier(identifier: String): Unit + + /** + * Check and cleanup the kubernetes engine info with maxAge limitation. + * @param maxAge the kubernetes engine info maximum age. + */ + def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala new file mode 100644 index 00000000000..6e57b38d30e --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server.metadata.api + +/** + * The kubernetes engine info. + * + * @param identifier the kubernetes unique identifier. + * @param context the kubernetes context. + * @param namespace the kubernetes namespace. + * @param podName the kubernetes pod name. + * @param podState the kubernetes pod state. + * @param containerState the kubernetes container state. + * @param engineId the engine id. + * @param engineName the engine name. + * @param engineState the engine state. + * @param engineError the engine error diagnose. + * @param createTime the metadata create time. + * @param updateTime the metadata update time. + */ +case class KubernetesEngineInfo( + identifier: String, + context: Option[String], + namespace: Option[String], + podName: String, + podState: String, + containerState: String, + engineId: String, + engineName: String, + engineState: String, + engineError: Option[String], + createTime: Long = 0L, + updateTime: Long = 0L) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index af965a220f6..66f75f3ba1d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -35,7 +35,7 @@ import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataStore -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} import org.apache.kyuubi.server.metadata.jdbc.DatabaseType._ import 
org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._ import org.apache.kyuubi.session.SessionType @@ -419,6 +419,142 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } + private def insertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + val insertQuery = + s""" + |INSERT INTO $KUBERNETES_ENGINE_INFO_TABLE( + |identifier, + |context, + |namespace, + |pod_name, + |pod_state, + |container_state, + |engine_id, + |engine_name, + |engine_state, + |engine_error, + |create_time, + |update_time + |) + |SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? + |WHERE NOT EXISTS ( + | SELECT 1 FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ? + |) + |""".stripMargin + JdbcUtils.withConnection { connection => + val currentTime = System.currentTimeMillis() + execute( + connection, + insertQuery, + engineInfo.identifier, + engineInfo.context.orNull, + engineInfo.namespace.orNull, + engineInfo.podName, + engineInfo.podState, + engineInfo.containerState, + engineInfo.engineId, + engineInfo.engineName, + engineInfo.engineState, + engineInfo.engineError.orNull, + currentTime, + currentTime, + engineInfo.identifier) + } + } + + private def updateKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + val queryBuilder = new StringBuilder + val params = ListBuffer[Any]() + + queryBuilder.append(s"UPDATE $KUBERNETES_ENGINE_INFO_TABLE") + val setClauses = ListBuffer[String]() + engineInfo.context.foreach { context => + setClauses += "context = ?" + params += context + } + engineInfo.namespace.foreach { namespace => + setClauses += "namespace = ?" + params += namespace + } + Option(engineInfo.podName).foreach { pod => + setClauses += "pod_name = ?" + params += pod + } + Option(engineInfo.podState).foreach { podState => + setClauses += "pod_state = ?" + params += podState + } + Option(engineInfo.containerState).foreach { containerState => + setClauses += "container_state = ?" 
+ params += containerState + } + Option(engineInfo.engineId).foreach { appId => + setClauses += "engine_id = ?" + params += appId + } + Option(engineInfo.engineName).foreach { appName => + setClauses += "engine_name = ?" + params += appName + } + Option(engineInfo.engineState).foreach { appState => + setClauses += "engine_state = ?" + params += appState + } + engineInfo.engineError.foreach { appError => + setClauses += "engine_error = ?" + params += appError + } + setClauses += "update_time = ?" + params += System.currentTimeMillis() + + queryBuilder.append(setClauses.mkString(" SET ", ", ", "")) + queryBuilder.append(" WHERE identifier = ?") + params += engineInfo.identifier + + val query = queryBuilder.toString() + JdbcUtils.withConnection { connection => + withUpdateCount(connection, query, params.toSeq: _*) { updateCount => + if (updateCount == 0) { + throw new KyuubiException( + s"Error updating kubernetes engine info for ${engineInfo.identifier} by SQL: $query, " + + s"with params: ${params.mkString(", ")}") + } + } + } + + } + + override def upsertKubernetesEngineInfo(metadata: KubernetesEngineInfo): Unit = { + insertKubernetesEngineInfo(metadata) + updateKubernetesEngineInfo(metadata) + } + + override def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo = { + val query = + s"SELECT $KUBERNETES_ENGINE_INFO_COLUMNS FROM" + + s" $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ?" + JdbcUtils.withConnection { connection => + withResultSet(connection, query, identifier) { rs => + buildKubernetesMetadata(rs).headOption.orNull + } + } + } + + override def cleanupKubernetesEngineInfoByIdentifier(identifier: String): Unit = { + val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ?" 
+ JdbcUtils.withConnection { connection => + execute(connection, query, identifier) + } + } + + override def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit = { + val minUpdateTime = System.currentTimeMillis() - maxAge + val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time < ?" + JdbcUtils.withConnection { connection => + execute(connection, query, minUpdateTime) + } + } + private def buildMetadata(resultSet: ResultSet): Seq[Metadata] = { try { val metadataList = ListBuffer[Metadata]() @@ -479,6 +615,44 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } + private def buildKubernetesMetadata(resultSet: ResultSet): Seq[KubernetesEngineInfo] = { + try { + val metadataList = ListBuffer[KubernetesEngineInfo]() + while (resultSet.next()) { + val identifier = resultSet.getString("identifier") + val context = Option(resultSet.getString("context")) + val namespace = Option(resultSet.getString("namespace")) + val pod = resultSet.getString("pod_name") + val podState = resultSet.getString("pod_state") + val containerState = resultSet.getString("container_state") + val appId = resultSet.getString("engine_id") + val appName = resultSet.getString("engine_name") + val appState = resultSet.getString("engine_state") + val appError = Option(resultSet.getString("engine_error")) + val createTime = resultSet.getLong("create_time") + val updateTime = resultSet.getLong("update_time") + + val metadata = KubernetesEngineInfo( + identifier = identifier, + context = context, + namespace = namespace, + podName = pod, + podState = podState, + containerState = containerState, + engineId = appId, + engineName = appName, + engineState = appState, + engineError = appError, + createTime = createTime, + updateTime = updateTime) + metadataList += metadata + } + metadataList.toSeq + } finally { + Utils.tryLogNonFatalError(resultSet.close()) + } + } + private def execute(conn: Connection, sql: String, params: Any*): Unit = { debug(s"execute 
sql: $sql, with params: ${params.mkString(", ")}") var statement: PreparedStatement = null @@ -610,4 +784,18 @@ object JDBCMetadataStore { "engine_error", "end_time", "peer_instance_closed").mkString(",") + private val KUBERNETES_ENGINE_INFO_TABLE = "k8s_engine_info" + private val KUBERNETES_ENGINE_INFO_COLUMNS = Seq( + "identifier", + "context", + "namespace", + "pod_name", + "pod_state", + "container_state", + "engine_id", + "engine_name", + "engine_state", + "engine_error", + "create_time", + "update_time").mkString(",") } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala index 897d21cbf15..1c55b9bc1cc 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala @@ -24,7 +24,9 @@ import org.scalatest.time.SpanSugar._ import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.engine.ApplicationState +import org.apache.kyuubi.server.metadata.MetadataManager +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._ import org.apache.kyuubi.session.SessionType @@ -43,6 +45,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { batch => jdbcMetadataStore.cleanupMetadataByIdentifier(batch.identifier) } + jdbcMetadataStore.cleanupKubernetesEngineInfoByAge(0) jdbcMetadataStore.close() } @@ -266,4 +269,72 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { assert(jdbcMetadataStore.getLatestSchemaUrl(Seq(url1, url3, url4, url2)).get === url4) assert(jdbcMetadataStore.getLatestSchemaUrl(Seq(url1, url2, url3, url4, 
url5)).get === url5) } + + test("kubernetes engine info") { + val tag = UUID.randomUUID().toString + val metadata = KubernetesEngineInfo( + identifier = tag, + context = Some("context"), + namespace = Some("namespace"), + podName = "podName", + podState = "podState", + containerState = "containerState", + engineId = "appId", + engineName = "appName", + engineState = "FINISHED", + engineError = Some("appError")) + + jdbcMetadataStore.upsertKubernetesEngineInfo(metadata) + + val metadata2 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) + assert(metadata2.identifier == metadata.identifier) + assert(metadata2.context == metadata.context) + assert(metadata2.namespace == metadata.namespace) + assert(metadata2.podName == metadata.podName) + assert(metadata2.podState == metadata.podState) + assert(metadata2.containerState == metadata.containerState) + assert(metadata2.engineId == metadata.engineId) + assert(metadata2.engineName == metadata.engineName) + assert(metadata2.engineState == metadata.engineState) + assert(metadata2.engineError == metadata.engineError) + assert(metadata2.createTime > 0) + assert(metadata2.updateTime > 0) + + val metadata3 = KubernetesEngineInfo( + identifier = tag, + context = Some("context2"), + namespace = Some("namespace2"), + podName = "podName2", + podState = "podState2", + containerState = "containerState2", + engineId = "appId2", + engineName = "appName2", + engineState = "FAILED", + engineError = Some("appError2")) + jdbcMetadataStore.upsertKubernetesEngineInfo(metadata3) + + val metadata4 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) + assert(metadata4.identifier == metadata3.identifier) + assert(metadata4.context == metadata3.context) + assert(metadata4.namespace == metadata3.namespace) + assert(metadata4.podName == metadata3.podName) + assert(metadata4.podState == metadata3.podState) + assert(metadata4.containerState == metadata3.containerState) + assert(metadata4.engineId == metadata3.engineId) + assert(metadata4.engineName 
== metadata3.engineName) + assert(metadata4.engineState == metadata3.engineState) + assert(metadata4.engineError == metadata3.engineError) + assert(metadata4.createTime == metadata2.createTime) + assert(metadata4.updateTime > metadata2.updateTime) + + val applicationInfo = + MetadataManager.buildApplicationInfo(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag)) + assert(applicationInfo.id == "appId2") + assert(applicationInfo.name == "appName2") + assert(applicationInfo.state == ApplicationState.FAILED) + assert(applicationInfo.error == Some("appError2")) + + jdbcMetadataStore.cleanupKubernetesEngineInfoByIdentifier(tag) + assert(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) == null) + } } From becf9d1a7a86f820867226ceeb0310f0df8c6f32 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 24 Apr 2025 23:22:57 -0700 Subject: [PATCH 2/8] insert or replace --- .../metadata/jdbc/JDBCMetadataStore.scala | 51 ++++++++++++++++-- .../metadata/jdbc/JdbcDatabaseDialect.scala | 52 +++++++++++++++++-- 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index 66f75f3ba1d..65c5ee71852 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -524,9 +524,35 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } - override def upsertKubernetesEngineInfo(metadata: KubernetesEngineInfo): Unit = { - insertKubernetesEngineInfo(metadata) - updateKubernetesEngineInfo(metadata) + override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + dialect.insertOrReplace( + KUBERNETES_ENGINE_INFO_TABLE, + KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT, + KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE, + 
KUBERNETES_ENGINE_INFO_KEY_COLUMN) match { + case Some(query) => + JdbcUtils.withConnection { connection => + val currentTime = System.currentTimeMillis() + execute( + connection, + query, + engineInfo.identifier, + engineInfo.context.orNull, + engineInfo.namespace.orNull, + engineInfo.podName, + engineInfo.podState, + engineInfo.containerState, + engineInfo.engineId, + engineInfo.engineName, + engineInfo.engineState, + engineInfo.engineError.orNull, + currentTime, + currentTime) + } + case None => + insertKubernetesEngineInfo(engineInfo) + updateKubernetesEngineInfo(engineInfo) + } } override def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo = { @@ -785,7 +811,8 @@ object JDBCMetadataStore { "end_time", "peer_instance_closed").mkString(",") private val KUBERNETES_ENGINE_INFO_TABLE = "k8s_engine_info" - private val KUBERNETES_ENGINE_INFO_COLUMNS = Seq( + private val KUBERNETES_ENGINE_INFO_KEY_COLUMN = "identifier" + private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT = Seq( "identifier", "context", "namespace", @@ -797,5 +824,19 @@ object JDBCMetadataStore { "engine_state", "engine_error", "create_time", - "update_time").mkString(",") + "update_time") + private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE = Seq( + "context", + "namespace", + "pod_name", + "pod_state", + "container_state", + "engine_id", + "engine_name", + "engine_state", + "engine_error", + "update_time") + private val KUBERNETES_ENGINE_INFO_COLUMNS = + KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT.mkString(",") + } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala index c000b9b6743..9aebc04c731 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala @@ -19,6 +19,11 @@ package 
org.apache.kyuubi.server.metadata.jdbc trait JdbcDatabaseDialect { def limitClause(limit: Int, offset: Int): String + def insertOrReplace( + table: String, + colsToInsert: Seq[String], + colsToReplace: Seq[String], + keyCol: String): Option[String] = None } class GenericDatabaseDialect extends JdbcDatabaseDialect { @@ -27,6 +32,47 @@ class GenericDatabaseDialect extends JdbcDatabaseDialect { } } -class SQLiteDatabaseDialect extends GenericDatabaseDialect {} -class MySQLDatabaseDialect extends GenericDatabaseDialect {} -class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {} +class SQLiteDatabaseDialect extends GenericDatabaseDialect { + override def insertOrReplace( + table: String, + colsToInsert: Seq[String], + colsToReplace: Seq[String], + keyCol: String): Option[String] = { + Some( + s""" + |INSERT OR REPLACE INTO $table (${colsToInsert.mkString(",")}) + |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) + |""".stripMargin) + } +} +class MySQLDatabaseDialect extends GenericDatabaseDialect { + override def insertOrReplace( + table: String, + colsToInsert: Seq[String], + colsToReplace: Seq[String], + keyCol: String): Option[String] = { + Some( + s""" + |INSERT INTO $table (${colsToInsert.mkString(",")}) + |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) + |ON DUPLICATE KEY UPDATE + |${colsToReplace.map(c => s"$c = VALUES($c)").mkString(",")} + |""".stripMargin) + } +} +class PostgreSQLDatabaseDialect extends GenericDatabaseDialect { + override def insertOrReplace( + table: String, + colsToInsert: Seq[String], + colsToReplace: Seq[String], + keyCol: String): Option[String] = { + Some( + s""" + |INSERT INTO $table (${colsToInsert.mkString(",")}) + |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) + |ON CONFLICT ($keyCol) + |DO UPDATE SET + |${colsToReplace.map(c => s"$c = EXCLUDED.$c").mkString(",")} + |""".stripMargin) + } +} From 12c24b1d0bd2aa2aba9d16eea43aa79e2f3bc5ea Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Fri, 25 Apr 2025 
00:00:01 -0700 Subject: [PATCH 3/8] do not use MYSQL deprecated VALUES(col) --- .../kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala | 2 +- .../kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index 65c5ee71852..af9596a3044 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -436,7 +436,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { |create_time, |update_time |) - |SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? + |SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? |WHERE NOT EXISTS ( | SELECT 1 FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ? |) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala index 9aebc04c731..168069ccefa 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala @@ -54,9 +54,9 @@ class MySQLDatabaseDialect extends GenericDatabaseDialect { Some( s""" |INSERT INTO $table (${colsToInsert.mkString(",")}) - |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) + |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) AS new |ON DUPLICATE KEY UPDATE - |${colsToReplace.map(c => s"$c = VALUES($c)").mkString(",")} + |${colsToReplace.map(c => s"$c = new.$c").mkString(",")} |""".stripMargin) } } From 327a0d594c9c470770b9f92079ce1a32ca627b72 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Fri, 25 Apr 2025 10:45:28 -0700 
Subject: [PATCH 4/8] Remove create_time from k8s engine info --- .../sql/mysql/006-KYUUBI-7028.mysql.sql | 1 - .../metadata-store-schema-1.11.0.mysql.sql | 1 - .../postgresql/001-KYUUBI-7028.postgresql.sql | 2 -- ...etadata-store-schema-1.11.0.postgresql.sql | 2 -- .../sql/sqlite/001-KYUUBI-7028.sqlite.sql | 1 - .../metadata-store-schema-1.11.0.sqlite.sql | 1 - .../metadata/api/KubernetesEngineInfo.scala | 2 -- .../metadata/jdbc/JDBCMetadataStore.scala | 20 +++++++------------ .../jdbc/JDBCMetadataStoreSuite.scala | 20 +++++++++++++++++-- 9 files changed, 25 insertions(+), 25 deletions(-) diff --git a/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql index f6ff28863c3..7a0314e6b0d 100644 --- a/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql +++ b/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql @@ -12,7 +12,6 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_name mediumtext COMMENT 'the engine name', engine_state varchar(32) COMMENT 'the engine state', engine_error mediumtext COMMENT 'the engine diagnose', - create_time bigint COMMENT 'the metadata create time', update_time bigint COMMENT 'the metadata update time', UNIQUE INDEX unique_identifier_index(identifier) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql index 58eda600aad..e26d3b81ce1 100644 --- a/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql +++ b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql @@ -46,7 +46,6 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_name mediumtext COMMENT 'the engine name', engine_state varchar(32) COMMENT 'the engine state', engine_error mediumtext COMMENT 'the engine diagnose', - create_time bigint 
COMMENT 'the metadata create time', update_time bigint COMMENT 'the metadata update time', UNIQUE INDEX unique_identifier_index(identifier) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql index c2773188520..56988a7ae75 100644 --- a/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql +++ b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql @@ -12,7 +12,6 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_name text, engine_state varchar(32), engine_error text, - create_time bigint NOT NULL, update_time bigint NOT NULL ); @@ -27,7 +26,6 @@ COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id'; COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name'; COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state'; COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose'; -COMMENT ON COLUMN k8s_engine_info.create_time IS 'the metadata create time'; COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time'; CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql index 3d351089cb1..2d6c1c4e290 100644 --- a/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql +++ b/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql @@ -70,7 +70,6 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_name text, engine_state varchar(32), engine_error text, - create_time bigint NOT NULL, update_time bigint NOT NULL ); @@ -85,7 +84,6 @@ COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine 
id'; COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name'; COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state'; COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose'; -COMMENT ON COLUMN k8s_engine_info.create_time IS 'the metadata create time'; COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time'; CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql index 44bf6311643..a6f06f09583 100644 --- a/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql +++ b/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql @@ -11,7 +11,6 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_name mediumtext, -- the engine name engine_state varchar(32), -- the engine state engine_error mediumtext, -- the engine diagnose - create_time bigint, -- the metadata create time update_time bigint -- the metadata update time ); diff --git a/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql index 58a9f1fa9da..6ddfb96408e 100644 --- a/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql +++ b/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql @@ -51,7 +51,6 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_name mediumtext, -- the engine name engine_state varchar(32), -- the engine state engine_error mediumtext, -- the engine diagnose - create_time bigint, -- the metadata create time update_time bigint -- the metadata update time ); diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala index 6e57b38d30e..5ffab565457 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala @@ -30,7 +30,6 @@ package org.apache.kyuubi.server.metadata.api * @param engineName the engine name. * @param engineState the engine state. * @param engineError the engine error diagnose. - * @param createTime the metadata create time. * @param updateTime the metadata update time. */ case class KubernetesEngineInfo( @@ -44,5 +43,4 @@ case class KubernetesEngineInfo( engineName: String, engineState: String, engineError: Option[String], - createTime: Long = 0L, updateTime: Long = 0L) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index af9596a3044..c48c8e3d0cf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -419,7 +419,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } - private def insertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + // Visible for testing. + private[kyuubi] def insertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { val insertQuery = s""" |INSERT INTO $KUBERNETES_ENGINE_INFO_TABLE( @@ -433,16 +434,14 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { |engine_name, |engine_state, |engine_error, - |create_time, |update_time |) - |SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + |SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? |WHERE NOT EXISTS ( | SELECT 1 FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ? 
|) |""".stripMargin JdbcUtils.withConnection { connection => - val currentTime = System.currentTimeMillis() execute( connection, insertQuery, @@ -456,13 +455,13 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { engineInfo.engineName, engineInfo.engineState, engineInfo.engineError.orNull, - currentTime, - currentTime, + System.currentTimeMillis(), engineInfo.identifier) } } - private def updateKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + // Visible for testing. + private[kyuubi] def updateKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { val queryBuilder = new StringBuilder val params = ListBuffer[Any]() @@ -532,7 +531,6 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { KUBERNETES_ENGINE_INFO_KEY_COLUMN) match { case Some(query) => JdbcUtils.withConnection { connection => - val currentTime = System.currentTimeMillis() execute( connection, query, @@ -546,8 +544,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { engineInfo.engineName, engineInfo.engineState, engineInfo.engineError.orNull, - currentTime, - currentTime) + System.currentTimeMillis()) } case None => insertKubernetesEngineInfo(engineInfo) @@ -655,7 +652,6 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { val appName = resultSet.getString("engine_name") val appState = resultSet.getString("engine_state") val appError = Option(resultSet.getString("engine_error")) - val createTime = resultSet.getLong("create_time") val updateTime = resultSet.getLong("update_time") val metadata = KubernetesEngineInfo( @@ -669,7 +665,6 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { engineName = appName, engineState = appState, engineError = appError, - createTime = createTime, updateTime = updateTime) metadataList += metadata } @@ -823,7 +818,6 @@ object JDBCMetadataStore { "engine_name", "engine_state", "engine_error", - 
"create_time", "update_time") private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE = Seq( "context", diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala index 1c55b9bc1cc..54ef3818eaf 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala @@ -297,7 +297,6 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { assert(metadata2.engineName == metadata.engineName) assert(metadata2.engineState == metadata.engineState) assert(metadata2.engineError == metadata.engineError) - assert(metadata2.createTime > 0) assert(metadata2.updateTime > 0) val metadata3 = KubernetesEngineInfo( @@ -313,6 +312,9 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { engineError = Some("appError2")) jdbcMetadataStore.upsertKubernetesEngineInfo(metadata3) + // test generic insert if not exist + jdbcMetadataStore.insertKubernetesEngineInfo(metadata) + val metadata4 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) assert(metadata4.identifier == metadata3.identifier) assert(metadata4.context == metadata3.context) @@ -324,7 +326,6 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { assert(metadata4.engineName == metadata3.engineName) assert(metadata4.engineState == metadata3.engineState) assert(metadata4.engineError == metadata3.engineError) - assert(metadata4.createTime == metadata2.createTime) assert(metadata4.updateTime > metadata2.updateTime) val applicationInfo = @@ -334,6 +335,21 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { assert(applicationInfo.state == ApplicationState.FAILED) assert(applicationInfo.error == Some("appError2")) + // test generic update + jdbcMetadataStore.updateKubernetesEngineInfo(metadata) + val metadata5 = 
jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) + assert(metadata5.identifier == metadata.identifier) + assert(metadata5.context == metadata.context) + assert(metadata5.namespace == metadata.namespace) + assert(metadata5.podName == metadata.podName) + assert(metadata5.podState == metadata.podState) + assert(metadata5.containerState == metadata.containerState) + assert(metadata5.engineId == metadata.engineId) + assert(metadata5.engineName == metadata.engineName) + assert(metadata5.engineState == metadata.engineState) + assert(metadata5.engineError == metadata.engineError) + assert(metadata5.updateTime > 0) + jdbcMetadataStore.cleanupKubernetesEngineInfoByIdentifier(tag) assert(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) == null) } From 4c59bebb507580bf020cc2e58c58ac8110409e46 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Fri, 25 Apr 2025 12:30:34 -0700 Subject: [PATCH 5/8] Refine --- .../metadata/jdbc/JDBCMetadataStore.scala | 24 ++++------------ .../metadata/jdbc/JdbcDatabaseDialect.scala | 28 ++++++++----------- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index c48c8e3d0cf..37fbd1bbb53 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -526,8 +526,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { dialect.insertOrReplace( KUBERNETES_ENGINE_INFO_TABLE, - KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT, - KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE, + KUBERNETES_ENGINE_INFO_COLUMNS, KUBERNETES_ENGINE_INFO_KEY_COLUMN) match { case Some(query) => JdbcUtils.withConnection { connection => @@ 
-554,7 +553,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { override def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo = { val query = - s"SELECT $KUBERNETES_ENGINE_INFO_COLUMNS FROM" + + s"SELECT $KUBERNETES_ENGINE_INFO_COLUMNS_STR FROM" + s" $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ?" JdbcUtils.withConnection { connection => withResultSet(connection, query, identifier) { rs => @@ -807,19 +806,8 @@ object JDBCMetadataStore { "peer_instance_closed").mkString(",") private val KUBERNETES_ENGINE_INFO_TABLE = "k8s_engine_info" private val KUBERNETES_ENGINE_INFO_KEY_COLUMN = "identifier" - private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT = Seq( - "identifier", - "context", - "namespace", - "pod_name", - "pod_state", - "container_state", - "engine_id", - "engine_name", - "engine_state", - "engine_error", - "update_time") - private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE = Seq( + private val KUBERNETES_ENGINE_INFO_COLUMNS = Seq( + KUBERNETES_ENGINE_INFO_KEY_COLUMN, "context", "namespace", "pod_name", @@ -830,7 +818,5 @@ object JDBCMetadataStore { "engine_state", "engine_error", "update_time") - private val KUBERNETES_ENGINE_INFO_COLUMNS = - KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT.mkString(",") - + private val KUBERNETES_ENGINE_INFO_COLUMNS_STR = KUBERNETES_ENGINE_INFO_COLUMNS.mkString(",") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala index 168069ccefa..9d98ab7c5a5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala @@ -21,8 +21,7 @@ trait JdbcDatabaseDialect { def limitClause(limit: Int, offset: Int): String def insertOrReplace( table: String, - colsToInsert: Seq[String], - 
colsToReplace: Seq[String], + cols: Seq[String], keyCol: String): Option[String] = None } @@ -35,44 +34,41 @@ class GenericDatabaseDialect extends JdbcDatabaseDialect { class SQLiteDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( table: String, - colsToInsert: Seq[String], - colsToReplace: Seq[String], + cols: Seq[String], keyCol: String): Option[String] = { Some( s""" - |INSERT OR REPLACE INTO $table (${colsToInsert.mkString(",")}) - |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) + |INSERT OR REPLACE INTO $table (${cols.mkString(",")}) + |VALUES (${cols.map(_ => "?").mkString(",")}) |""".stripMargin) } } class MySQLDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( table: String, - colsToInsert: Seq[String], - colsToReplace: Seq[String], + cols: Seq[String], keyCol: String): Option[String] = { Some( s""" - |INSERT INTO $table (${colsToInsert.mkString(",")}) - |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) AS new + |INSERT INTO $table (${cols.mkString(",")}) + |VALUES (${cols.map(_ => "?").mkString(",")}) AS new |ON DUPLICATE KEY UPDATE - |${colsToReplace.map(c => s"$c = new.$c").mkString(",")} + |${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")} |""".stripMargin) } } class PostgreSQLDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( table: String, - colsToInsert: Seq[String], - colsToReplace: Seq[String], + cols: Seq[String], keyCol: String): Option[String] = { Some( s""" - |INSERT INTO $table (${colsToInsert.mkString(",")}) - |VALUES (${colsToInsert.map(_ => "?").mkString(",")}) + |INSERT INTO $table (${cols.mkString(",")}) + |VALUES (${cols.map(_ => "?").mkString(",")}) |ON CONFLICT ($keyCol) |DO UPDATE SET - |${colsToReplace.map(c => s"$c = EXCLUDED.$c").mkString(",")} + |${cols.filterNot(_ == keyCol).map(c => s"$c = EXCLUDED.$c").mkString(",")} |""".stripMargin) } } From 82ea6266968455ebd64f7ff611cf109e2624e7dc Mon Sep 17 00:00:00 2001 From: 
"Wang, Fei" Date: Fri, 25 Apr 2025 12:38:22 -0700 Subject: [PATCH 6/8] Add pod name --- .../org/apache/kyuubi/server/metadata/MetadataManager.scala | 3 ++- .../kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala index fb9ec12704f..522ac32dd69 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala @@ -388,6 +388,7 @@ object MetadataManager extends Logging { id = metadata.engineId, name = metadata.engineName, state = ApplicationState.withName(metadata.engineState), - error = metadata.engineError) + error = metadata.engineError, + podName = Option(metadata.podName)) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala index 54ef3818eaf..9bfdac1baa5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala @@ -334,6 +334,7 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { assert(applicationInfo.name == "appName2") assert(applicationInfo.state == ApplicationState.FAILED) assert(applicationInfo.error == Some("appError2")) + assert(applicationInfo.podName == Some("podName2")) // test generic update jdbcMetadataStore.updateKubernetesEngineInfo(metadata) From 186cc690d649e7f48205345aa8f8985de9e3f959 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Sat, 26 Apr 2025 23:25:40 -0700 Subject: [PATCH 7/8] nit --- .../resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql index 56988a7ae75..c695996de87 100644 --- a/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql +++ b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql @@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS k8s_engine_info( engine_state varchar(32), engine_error text, update_time bigint NOT NULL - ); +); COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id'; COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID'; From 9f2badef3470d7bcb2a15413ab03544e1215a8dd Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Sun, 27 Apr 2025 00:03:42 -0700 Subject: [PATCH 8/8] generic dialect --- .../metadata/jdbc/JDBCMetadataStore.scala | 124 ++---------------- .../metadata/jdbc/JdbcDatabaseDialect.scala | 63 +++++---- .../jdbc/JDBCMetadataStoreSuite.scala | 18 --- 3 files changed, 47 insertions(+), 158 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index 37fbd1bbb53..4a020ed6e45 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -419,32 +419,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } - // Visible for testing. 
- private[kyuubi] def insertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { - val insertQuery = - s""" - |INSERT INTO $KUBERNETES_ENGINE_INFO_TABLE( - |identifier, - |context, - |namespace, - |pod_name, - |pod_state, - |container_state, - |engine_id, - |engine_name, - |engine_state, - |engine_error, - |update_time - |) - |SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? - |WHERE NOT EXISTS ( - | SELECT 1 FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ? - |) - |""".stripMargin + override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + val query = dialect.insertOrReplace( + KUBERNETES_ENGINE_INFO_TABLE, + KUBERNETES_ENGINE_INFO_COLUMNS, + KUBERNETES_ENGINE_INFO_KEY_COLUMN, + engineInfo.identifier) JdbcUtils.withConnection { connection => execute( connection, - insertQuery, + query, engineInfo.identifier, engineInfo.context.orNull, engineInfo.namespace.orNull, @@ -455,99 +439,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { engineInfo.engineName, engineInfo.engineState, engineInfo.engineError.orNull, - System.currentTimeMillis(), - engineInfo.identifier) - } - } - - // Visible for testing. - private[kyuubi] def updateKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { - val queryBuilder = new StringBuilder - val params = ListBuffer[Any]() - - queryBuilder.append(s"UPDATE $KUBERNETES_ENGINE_INFO_TABLE") - val setClauses = ListBuffer[String]() - engineInfo.context.foreach { context => - setClauses += "context = ?" - params += context - } - engineInfo.namespace.foreach { namespace => - setClauses += "namespace = ?" - params += namespace - } - Option(engineInfo.podName).foreach { pod => - setClauses += "pod_name = ?" - params += pod - } - Option(engineInfo.podState).foreach { podState => - setClauses += "pod_state = ?" - params += podState - } - Option(engineInfo.containerState).foreach { containerState => - setClauses += "container_state = ?" 
- params += containerState - } - Option(engineInfo.engineId).foreach { appId => - setClauses += "engine_id = ?" - params += appId - } - Option(engineInfo.engineName).foreach { appName => - setClauses += "engine_name = ?" - params += appName - } - Option(engineInfo.engineState).foreach { appState => - setClauses += "engine_state = ?" - params += appState - } - engineInfo.engineError.foreach { appError => - setClauses += "engine_error = ?" - params += appError - } - setClauses += "update_time = ?" - params += System.currentTimeMillis() - - queryBuilder.append(setClauses.mkString(" SET ", ", ", "")) - queryBuilder.append(" WHERE identifier = ?") - params += engineInfo.identifier - - val query = queryBuilder.toString() - JdbcUtils.withConnection { connection => - withUpdateCount(connection, query, params.toSeq: _*) { updateCount => - if (updateCount == 0) { - throw new KyuubiException( - s"Error updating kubernetes engine info for ${engineInfo.identifier} by SQL: $query, " + - s"with params: ${params.mkString(", ")}") - } - } - } - - } - - override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { - dialect.insertOrReplace( - KUBERNETES_ENGINE_INFO_TABLE, - KUBERNETES_ENGINE_INFO_COLUMNS, - KUBERNETES_ENGINE_INFO_KEY_COLUMN) match { - case Some(query) => - JdbcUtils.withConnection { connection => - execute( - connection, - query, - engineInfo.identifier, - engineInfo.context.orNull, - engineInfo.namespace.orNull, - engineInfo.podName, - engineInfo.podState, - engineInfo.containerState, - engineInfo.engineId, - engineInfo.engineName, - engineInfo.engineState, - engineInfo.engineError.orNull, - System.currentTimeMillis()) - } - case None => - insertKubernetesEngineInfo(engineInfo) - updateKubernetesEngineInfo(engineInfo) + System.currentTimeMillis()) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala index 9d98ab7c5a5..24408e16131 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JdbcDatabaseDialect.scala @@ -22,53 +22,68 @@ trait JdbcDatabaseDialect { def insertOrReplace( table: String, cols: Seq[String], - keyCol: String): Option[String] = None + keyCol: String, + keyVal: String): String } class GenericDatabaseDialect extends JdbcDatabaseDialect { override def limitClause(limit: Int, offset: Int): String = { s"LIMIT $limit OFFSET $offset" } + + override def insertOrReplace( + table: String, + cols: Seq[String], + keyCol: String, + keyVal: String): String = { + s""" + |INSERT INTO $table (${cols.mkString(",")}) + |SELECT ${cols.map(_ => "?").mkString(",")} + |WHERE NOT EXISTS ( + | SELECT 1 FROM $table WHERE $keyCol = '$keyVal') + |) + |""".stripMargin + } } class SQLiteDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( table: String, cols: Seq[String], - keyCol: String): Option[String] = { - Some( - s""" - |INSERT OR REPLACE INTO $table (${cols.mkString(",")}) - |VALUES (${cols.map(_ => "?").mkString(",")}) - |""".stripMargin) + keyCol: String, + keyVal: String): String = { + s""" + |INSERT OR REPLACE INTO $table (${cols.mkString(",")}) + |VALUES (${cols.map(_ => "?").mkString(",")}) + |""".stripMargin } } class MySQLDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( table: String, cols: Seq[String], - keyCol: String): Option[String] = { - Some( - s""" - |INSERT INTO $table (${cols.mkString(",")}) - |VALUES (${cols.map(_ => "?").mkString(",")}) AS new - |ON DUPLICATE KEY UPDATE - |${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")} - |""".stripMargin) + keyCol: String, + keyVal: String): String = { + s""" + |INSERT INTO $table (${cols.mkString(",")}) + |VALUES 
(${cols.map(_ => "?").mkString(",")}) AS new + |ON DUPLICATE KEY UPDATE + |${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")} + |""".stripMargin } } class PostgreSQLDatabaseDialect extends GenericDatabaseDialect { override def insertOrReplace( table: String, cols: Seq[String], - keyCol: String): Option[String] = { - Some( - s""" - |INSERT INTO $table (${cols.mkString(",")}) - |VALUES (${cols.map(_ => "?").mkString(",")}) - |ON CONFLICT ($keyCol) - |DO UPDATE SET - |${cols.filterNot(_ == keyCol).map(c => s"$c = EXCLUDED.$c").mkString(",")} - |""".stripMargin) + keyCol: String, + keyVal: String): String = { + s""" + |INSERT INTO $table (${cols.mkString(",")}) + |VALUES (${cols.map(_ => "?").mkString(",")}) + |ON CONFLICT ($keyCol) + |DO UPDATE SET + |${cols.filterNot(_ == keyCol).map(c => s"$c = EXCLUDED.$c").mkString(",")} + |""".stripMargin } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala index 9bfdac1baa5..77f4ca57d53 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreSuite.scala @@ -312,9 +312,6 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { engineError = Some("appError2")) jdbcMetadataStore.upsertKubernetesEngineInfo(metadata3) - // test generic insert if not exist - jdbcMetadataStore.insertKubernetesEngineInfo(metadata) - val metadata4 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) assert(metadata4.identifier == metadata3.identifier) assert(metadata4.context == metadata3.context) @@ -336,21 +333,6 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite { assert(applicationInfo.error == Some("appError2")) assert(applicationInfo.podName == Some("podName2")) - // test generic update - 
jdbcMetadataStore.updateKubernetesEngineInfo(metadata) - val metadata5 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) - assert(metadata5.identifier == metadata.identifier) - assert(metadata5.context == metadata.context) - assert(metadata5.namespace == metadata.namespace) - assert(metadata5.podName == metadata.podName) - assert(metadata5.podState == metadata.podState) - assert(metadata5.containerState == metadata.containerState) - assert(metadata5.engineId == metadata.engineId) - assert(metadata5.engineName == metadata.engineName) - assert(metadata5.engineState == metadata.engineState) - assert(metadata5.engineError == metadata.engineError) - assert(metadata5.updateTime > 0) - jdbcMetadataStore.cleanupKubernetesEngineInfoByIdentifier(tag) assert(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) == null) }