diff --git a/.rat-excludes b/.rat-excludes index 431327294c3..f7a57ec3c9d 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -43,6 +43,7 @@ build/scala-*/** **/metadata-store-schema*.sql **/*.derby.sql **/*.mysql.sql +**/*.postgresql.sql **/*.sqlite.sql **/node/** **/web-ui/dist/** diff --git a/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql new file mode 100644 index 00000000000..7a0314e6b0d --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/mysql/006-KYUUBI-7028.mysql.sql @@ -0,0 +1,17 @@ +SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' '; + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id', + identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID', + context varchar(32) COMMENT 'the kubernetes context', + namespace varchar(255) COMMENT 'the kubernetes namespace', + pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name', + pod_state varchar(32) COMMENT 'the kubernetes pod state', + container_state mediumtext COMMENT 'the kubernetes container state', + engine_id varchar(128) COMMENT 'the engine id', + engine_name mediumtext COMMENT 'the engine name', + engine_state varchar(32) COMMENT 'the engine state', + engine_error mediumtext COMMENT 'the engine diagnose', + update_time bigint COMMENT 'the metadata update time', + UNIQUE INDEX unique_identifier_index(identifier) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql new file mode 100644 index 00000000000..e26d3b81ce1 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-1.11.0.mysql.sql @@ -0,0 +1,51 @@ +-- the metadata table ddl + +CREATE TABLE IF NOT EXISTS metadata( + key_id bigint PRIMARY 
KEY AUTO_INCREMENT COMMENT 'the auto increment key id', + identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID', + session_type varchar(32) NOT NULL COMMENT 'the session type, SQL or BATCH', + real_user varchar(255) NOT NULL COMMENT 'the real user', + user_name varchar(255) NOT NULL COMMENT 'the user name, might be a proxy user', + ip_address varchar(128) COMMENT 'the client ip address', + kyuubi_instance varchar(1024) COMMENT 'the kyuubi instance that creates this', + state varchar(128) NOT NULL COMMENT 'the session state', + resource varchar(1024) COMMENT 'the main resource', + class_name varchar(1024) COMMENT 'the main class name', + request_name varchar(1024) COMMENT 'the request name', + request_conf mediumtext COMMENT 'the request config map', + request_args mediumtext COMMENT 'the request arguments', + create_time BIGINT NOT NULL COMMENT 'the metadata create time', + engine_type varchar(32) NOT NULL COMMENT 'the engine type', + cluster_manager varchar(128) COMMENT 'the engine cluster manager', + engine_open_time bigint COMMENT 'the engine open time', + engine_id varchar(128) COMMENT 'the engine application id', + engine_name mediumtext COMMENT 'the engine application name', + engine_url varchar(1024) COMMENT 'the engine tracking url', + engine_state varchar(32) COMMENT 'the engine application state', + engine_error mediumtext COMMENT 'the engine application diagnose', + end_time bigint COMMENT 'the metadata end time', + priority int NOT NULL DEFAULT 10 COMMENT 'the application priority, high value means high priority', + peer_instance_closed boolean default '0' COMMENT 'closed by peer kyuubi instance', + UNIQUE INDEX unique_identifier_index(identifier), + INDEX user_name_index(user_name), + INDEX engine_type_index(engine_type), + INDEX create_time_index(create_time), + -- See more detail about this index in ./005-KYUUBI-5327.mysql.sql + INDEX priority_create_time_index(priority DESC, create_time ASC) +) ENGINE=InnoDB DEFAULT 
CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id', + identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID', + context varchar(32) COMMENT 'the kubernetes context', + namespace varchar(255) COMMENT 'the kubernetes namespace', + pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name', + pod_state varchar(32) COMMENT 'the kubernetes pod state', + container_state mediumtext COMMENT 'the kubernetes container state', + engine_id varchar(128) COMMENT 'the engine id', + engine_name mediumtext COMMENT 'the engine name', + engine_state varchar(32) COMMENT 'the engine state', + engine_error mediumtext COMMENT 'the engine diagnose', + update_time bigint COMMENT 'the metadata update time', + UNIQUE INDEX unique_identifier_index(identifier) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql new file mode 100644 index 00000000000..02d46e8a00b --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/mysql/upgrade-1.8.0-to-1.11.0.mysql.sql @@ -0,0 +1,3 @@ +SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; +SOURCE 006-KYUUBI-7028.mysql.sql; +SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; diff --git a/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql new file mode 100644 index 00000000000..c695996de87 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/postgresql/001-KYUUBI-7028.postgresql.sql @@ -0,0 +1,32 @@ +-- PostgreSQL output aliases must be identifiers, so the blank alias is double-quoted +SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore >' AS " "; + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigserial PRIMARY KEY, + identifier varchar(36) NOT NULL, + context varchar(32), + namespace 
varchar(255), + pod_name varchar(255) NOT NULL, + pod_state varchar(32), + container_state text, + engine_id varchar(128), + engine_name text, + engine_state varchar(32), + engine_error text, + update_time bigint NOT NULL +); + +COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id'; +COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID'; +COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context'; +COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace'; +COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name'; +COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state'; +COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state'; +COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id'; +COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name'; +COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state'; +COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose'; +COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time'; + +CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql new file mode 100644 index 00000000000..2d6c1c4e290 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/postgresql/metadata-store-schema-1.11.0.postgresql.sql @@ -0,0 +1,89 @@ +CREATE TABLE IF NOT EXISTS metadata( + key_id bigserial PRIMARY KEY, + identifier varchar(36) NOT NULL, + session_type varchar(32) NOT NULL, + real_user varchar(255) NOT NULL, + user_name varchar(255) NOT NULL, + ip_address varchar(128), + kyuubi_instance varchar(1024), + state varchar(128) NOT NULL, + resource varchar(1024), + class_name varchar(1024), + request_name 
varchar(1024), + request_conf text, + request_args text, + create_time bigint NOT NULL, + engine_type varchar(32) NOT NULL, + cluster_manager varchar(128), + engine_open_time bigint, + engine_id varchar(128), + engine_name text, + engine_url varchar(1024), + engine_state varchar(32), + engine_error text, + end_time bigint, + priority int NOT NULL DEFAULT 10, + peer_instance_closed boolean DEFAULT FALSE +); + +COMMENT ON COLUMN metadata.key_id IS 'the auto increment key id'; +COMMENT ON COLUMN metadata.identifier IS 'the identifier id, which is an UUID'; +COMMENT ON COLUMN metadata.session_type IS 'the session type, SQL or BATCH'; +COMMENT ON COLUMN metadata.real_user IS 'the real user'; +COMMENT ON COLUMN metadata.user_name IS 'the user name, might be a proxy user'; +COMMENT ON COLUMN metadata.ip_address IS 'the client ip address'; +COMMENT ON COLUMN metadata.kyuubi_instance IS 'the kyuubi instance that creates this'; +COMMENT ON COLUMN metadata.state IS 'the session state'; +COMMENT ON COLUMN metadata.resource IS 'the main resource'; +COMMENT ON COLUMN metadata.class_name IS 'the main class name'; +COMMENT ON COLUMN metadata.request_name IS 'the request name'; +COMMENT ON COLUMN metadata.request_conf IS 'the request config map'; +COMMENT ON COLUMN metadata.request_args IS 'the request arguments'; +COMMENT ON COLUMN metadata.create_time IS 'the metadata create time'; +COMMENT ON COLUMN metadata.engine_type IS 'the engine type'; +COMMENT ON COLUMN metadata.cluster_manager IS 'the engine cluster manager'; +COMMENT ON COLUMN metadata.engine_open_time IS 'the engine open time'; +COMMENT ON COLUMN metadata.engine_id IS 'the engine application id'; +COMMENT ON COLUMN metadata.engine_name IS 'the engine application name'; +COMMENT ON COLUMN metadata.engine_url IS 'the engine tracking url'; +COMMENT ON COLUMN metadata.engine_state IS 'the engine application state'; +COMMENT ON COLUMN metadata.engine_error IS 'the engine application diagnose'; +COMMENT ON COLUMN 
metadata.end_time IS 'the metadata end time'; +COMMENT ON COLUMN metadata.priority IS 'the application priority, high value means high priority'; +COMMENT ON COLUMN metadata.peer_instance_closed IS 'closed by peer kyuubi instance'; + +CREATE UNIQUE INDEX IF NOT EXISTS unique_identifier_index ON metadata(identifier); +CREATE INDEX IF NOT EXISTS user_name_index ON metadata(user_name); +CREATE INDEX IF NOT EXISTS engine_type_index ON metadata(engine_type); +CREATE INDEX IF NOT EXISTS create_time_index ON metadata(create_time); +CREATE INDEX IF NOT EXISTS priority_create_time_index ON metadata(priority DESC, create_time ASC); + +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id bigserial PRIMARY KEY, + identifier varchar(36) NOT NULL, + context varchar(32), + namespace varchar(255), + pod_name varchar(255) NOT NULL, + pod_state varchar(32), + container_state text, + engine_id varchar(128), + engine_name text, + engine_state varchar(32), + engine_error text, + update_time bigint NOT NULL +); + +COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id'; +COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID'; +COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context'; +COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace'; +COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name'; +COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state'; +COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state'; +COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id'; +COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name'; +COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state'; +COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose'; +COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time'; + +CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON 
k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql b/kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql new file mode 100644 index 00000000000..48dea7959aa --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/postgresql/upgrade-1.9.0-to-1.11.0.postgresql.sql @@ -0,0 +1,4 @@ +-- PostgreSQL output aliases must be identifiers, so the blank alias is double-quoted +SELECT '< Upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS " "; +\i 001-KYUUBI-7028.postgresql.sql +SELECT '< Finished upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS " "; diff --git a/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql new file mode 100644 index 00000000000..a6f06f09583 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/sqlite/001-KYUUBI-7028.sqlite.sql @@ -0,0 +1,17 @@ +-- the k8s_engine_info table ddl +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id + identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID + context varchar(32), -- the kubernetes context + namespace varchar(255), -- the kubernetes namespace + pod_name varchar(255) NOT NULL, -- the kubernetes pod name + pod_state varchar(32), -- the kubernetes pod state + container_state mediumtext, -- the kubernetes container state + engine_id varchar(128), -- the engine id + engine_name mediumtext, -- the engine name + engine_state varchar(32), -- the engine state + engine_error mediumtext, -- the engine diagnose + update_time bigint -- the metadata update time +); + +CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql new file mode 100644 index 00000000000..6ddfb96408e --- /dev/null 
+++ b/kyuubi-server/src/main/resources/sql/sqlite/metadata-store-schema-1.11.0.sqlite.sql @@ -0,0 +1,57 @@ +-- the metadata table ddl + +CREATE TABLE IF NOT EXISTS metadata( + key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id + identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID + session_type varchar(32) NOT NULL, -- the session type, SQL or BATCH + real_user varchar(255) NOT NULL, -- the real user + user_name varchar(255) NOT NULL, -- the user name, might be a proxy user + ip_address varchar(128), -- the client ip address + kyuubi_instance varchar(1024), -- the kyuubi instance that creates this + state varchar(128) NOT NULL, -- the session state + resource varchar(1024), -- the main resource + class_name varchar(1024), -- the main class name + request_name varchar(1024), -- the request name + request_conf mediumtext, -- the request config map + request_args mediumtext, -- the request arguments + create_time BIGINT NOT NULL, -- the metadata create time + engine_type varchar(32) NOT NULL, -- the engine type + cluster_manager varchar(128), -- the engine cluster manager + engine_open_time bigint, -- the engine open time + engine_id varchar(128), -- the engine application id + engine_name mediumtext, -- the engine application name + engine_url varchar(1024), -- the engine tracking url + engine_state varchar(32), -- the engine application state + engine_error mediumtext, -- the engine application diagnose + end_time bigint, -- the metadata end time + priority INTEGER NOT NULL DEFAULT 10, -- the application priority, high value means high priority + peer_instance_closed boolean default '0' -- closed by peer kyuubi instance +); + +CREATE UNIQUE INDEX IF NOT EXISTS metadata_unique_identifier_index ON metadata(identifier); + +CREATE INDEX IF NOT EXISTS metadata_user_name_index ON metadata(user_name); + +CREATE INDEX IF NOT EXISTS metadata_engine_type_index ON metadata(engine_type); + +CREATE INDEX IF NOT EXISTS 
metadata_create_time_index ON metadata(create_time); + +CREATE INDEX IF NOT EXISTS metadata_priority_create_time_index ON metadata(priority, create_time); + +-- the k8s_engine_info table ddl +CREATE TABLE IF NOT EXISTS k8s_engine_info( + key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id + identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID + context varchar(32), -- the kubernetes context + namespace varchar(255), -- the kubernetes namespace + pod_name varchar(255) NOT NULL, -- the kubernetes pod name + pod_state varchar(32), -- the kubernetes pod state + container_state mediumtext, -- the kubernetes container state + engine_id varchar(128), -- the engine id + engine_name mediumtext, -- the engine name + engine_state varchar(32), -- the engine state + engine_error mediumtext, -- the engine diagnose + update_time bigint -- the metadata update time +); + +CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier); diff --git a/kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql b/kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql new file mode 100644 index 00000000000..c759b0bc307 --- /dev/null +++ b/kyuubi-server/src/main/resources/sql/sqlite/upgrade-1.8.0-to-1.11.0.sqlite.sql @@ -0,0 +1,3 @@ +SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; +.read 001-KYUUBI-7028.sqlite.sql +SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' '; diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index c53dda0e36c..116afd50dab 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -38,6 +38,7 @@ import 
org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager +import org.apache.kyuubi.server.metadata.api.KubernetesEngineInfo import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { @@ -255,8 +256,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { try { // need to initialize the kubernetes client if not exists getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo) - val (_, appInfo) = - appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> ApplicationInfo.NOT_FOUND) + val appInfo = appInfoStore.get(tag) match { + case (_, info) => info + case _ => + // try to get the application info from kubernetes engine info store + metadataManager.flatMap( + _.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND) + } (appInfo.state, submitTime) match { // Kyuubi should wait second if pod is not be created case (NOT_FOUND, Some(_submitTime)) => @@ -340,7 +346,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { updateApplicationState(kubernetesInfo, newPod, eventType) val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType) if (isTerminated(appState)) { - markApplicationTerminated(newPod, eventType) + markApplicationTerminated(kubernetesInfo, newPod, eventType) } KubernetesApplicationAuditLogger.audit( eventType, @@ -358,7 +364,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { if (isSparkEnginePod(pod)) { val eventType = KubernetesResourceEventTypes.DELETE updateApplicationState(kubernetesInfo, pod, eventType) - markApplicationTerminated(pod, eventType) + markApplicationTerminated(kubernetesInfo, pod, eventType) 
KubernetesApplicationAuditLogger.audit( eventType, kubernetesInfo, @@ -456,13 +462,28 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } private def markApplicationTerminated( + kubernetesInfo: KubernetesInfo, pod: Pod, eventType: KubernetesResourceEventType): Unit = synchronized { val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + val (appState, appError) = + toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType) + // upsert the kubernetes engine info store when the application is terminated + metadataManager.foreach(_.upsertKubernetesMetadata( + KubernetesEngineInfo( + identifier = key, + context = kubernetesInfo.context, + namespace = kubernetesInfo.namespace, + podName = pod.getMetadata.getName, + podState = pod.getStatus.getPhase, + containerState = pod.getStatus.getContainerStatuses.asScala.map(cs => + s"${cs.getName}->${cs.getState}").mkString(","), + engineId = getPodAppId(pod), + engineName = getPodAppName(pod), + engineState = appState.toString, + engineError = appError))) if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) { - cleanupTerminatedAppInfoTrigger.put( - key, - toApplicationState(pod, appStateSource, appStateContainer, eventType)) + cleanupTerminatedAppInfoTrigger.put(key, appState) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala index 9ca1948dec6..522ac32dd69 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala @@ -26,9 +26,10 @@ import org.apache.kyuubi.{KyuubiException, Logging} import org.apache.kyuubi.client.api.v1.dto.Batch import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.METADATA_MAX_AGE +import org.apache.kyuubi.engine.{ApplicationInfo, 
ApplicationState} import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.operation.OperationState -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} import org.apache.kyuubi.service.AbstractService import org.apache.kyuubi.session.SessionType import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils} @@ -134,6 +135,15 @@ class MetadataManager extends AbstractService("MetadataManager") { .filter(_.sessionType == SessionType.BATCH) } + def getKubernetesApplicationInfo(identifier: String): Option[ApplicationInfo] = { + Option(withMetadataRequestMetrics(_metadataStore.getKubernetesMetaEngineInfo(identifier))).map { + metadata => + val appInfo = buildApplicationInfo(metadata) + info(s"Retrieve $appInfo from kubernetes engine info store: $metadata") + appInfo + } + } + def getBatches( filter: MetadataFilter, from: Int, @@ -204,16 +214,32 @@ class MetadataManager extends AbstractService("MetadataManager") { } } + def upsertKubernetesMetadata(kubernetesEngineInfo: KubernetesEngineInfo): Unit = { + try { + withMetadataRequestMetrics(_metadataStore.upsertKubernetesEngineInfo(kubernetesEngineInfo)) + } catch { + case e: Throwable => + error( + s"Error updating kubernetes engine info for session ${kubernetesEngineInfo.identifier}", + e) + } + } + def cleanupMetadataById(batchId: String): Unit = { withMetadataRequestMetrics(_metadataStore.cleanupMetadataByIdentifier(batchId)) } + def cleanupKubernetesMetadataById(identifier: String): Unit = { + withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByIdentifier(identifier)) + } + private def startMetadataCleaner(): Unit = { val stateMaxAge = conf.get(METADATA_MAX_AGE) val interval = conf.get(KyuubiConf.METADATA_CLEANER_INTERVAL) val cleanerTask: Runnable = () => { try { withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(stateMaxAge)) + 
withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(stateMaxAge)) } catch { case e: Throwable => error("Error cleaning up the metadata by age", e) } @@ -356,4 +382,13 @@ object MetadataManager extends Logging { batchMetadata.endTime, Map.empty[String, String].asJava) } + + def buildApplicationInfo(metadata: KubernetesEngineInfo): ApplicationInfo = { + ApplicationInfo( + id = metadata.engineId, + name = metadata.engineName, + state = ApplicationState.withName(metadata.engineState), + error = metadata.engineError, + podName = Option(metadata.podName)) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala index b9fb52f779a..46a5505afe8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.server.metadata import java.io.Closeable -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} trait MetadataStore extends Closeable { @@ -82,6 +82,18 @@ trait MetadataStore extends Closeable { */ def updateMetadata(metadata: Metadata): Unit + /** + * Upsert the kubernetes engine info. Insert if not exists, otherwise update. + */ + def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit + + /** + * Get the persisted kubernetes engine info by unique identifier. + * @param identifier the identifier. + * @return selected engine info. + */ + def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo + /** * Cleanup meta data by identifier. */ @@ -92,4 +104,15 @@ trait MetadataStore extends Closeable { * @param maxAge the batch state info maximum age. 
*/ def cleanupMetadataByAge(maxAge: Long): Unit + + /** + * Cleanup kubernetes engine info by identifier. + */ + def cleanupKubernetesEngineInfoByIdentifier(identifier: String): Unit + + /** + * Check and cleanup the kubernetes engine info with maxAge limitation. + * @param maxAge the kubernetes engine info maximum age. + */ + def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala new file mode 100644 index 00000000000..5ffab565457 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/KubernetesEngineInfo.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server.metadata.api + +/** + * The kubernetes engine info. + * + * @param identifier the kubernetes unique identifier. + * @param context the kubernetes context. + * @param namespace the kubernetes namespace. + * @param podName the kubernetes pod name. + * @param podState the kubernetes pod state. + * @param containerState the kubernetes container state. 
+ * @param engineId the engine id. + * @param engineName the engine name. + * @param engineState the engine state. + * @param engineError the engine error diagnose. + * @param updateTime the metadata update time. + */ +case class KubernetesEngineInfo( + identifier: String, + context: Option[String], + namespace: Option[String], + podName: String, + podState: String, + containerState: String, + engineId: String, + engineName: String, + engineState: String, + engineError: Option[String], + updateTime: Long = 0L) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index af965a220f6..4a020ed6e45 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -35,7 +35,7 @@ import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataStore -import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.server.metadata.api.{KubernetesEngineInfo, Metadata, MetadataFilter} import org.apache.kyuubi.server.metadata.jdbc.DatabaseType._ import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf._ import org.apache.kyuubi.session.SessionType @@ -419,6 +419,56 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } + override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = { + val query = dialect.insertOrReplace( + KUBERNETES_ENGINE_INFO_TABLE, + KUBERNETES_ENGINE_INFO_COLUMNS, + KUBERNETES_ENGINE_INFO_KEY_COLUMN, + engineInfo.identifier) + JdbcUtils.withConnection { connection => + execute( + connection, + query, + engineInfo.identifier, + 
engineInfo.context.orNull, + engineInfo.namespace.orNull, + engineInfo.podName, + engineInfo.podState, + engineInfo.containerState, + engineInfo.engineId, + engineInfo.engineName, + engineInfo.engineState, + engineInfo.engineError.orNull, + System.currentTimeMillis()) + } + } + + override def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo = { + val query = + s"SELECT $KUBERNETES_ENGINE_INFO_COLUMNS_STR FROM" + + s" $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ?" + JdbcUtils.withConnection { connection => + withResultSet(connection, query, identifier) { rs => + buildKubernetesMetadata(rs).headOption.orNull + } + } + } + + override def cleanupKubernetesEngineInfoByIdentifier(identifier: String): Unit = { + val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE identifier = ?" + JdbcUtils.withConnection { connection => + execute(connection, query, identifier) + } + } + + override def cleanupKubernetesEngineInfoByAge(maxAge: Long): Unit = { + val minUpdateTime = System.currentTimeMillis() - maxAge + val query = s"DELETE FROM $KUBERNETES_ENGINE_INFO_TABLE WHERE update_time < ?" 
+ JdbcUtils.withConnection { connection => + execute(connection, query, minUpdateTime) + } + } + private def buildMetadata(resultSet: ResultSet): Seq[Metadata] = { try { val metadataList = ListBuffer[Metadata]() @@ -479,6 +529,42 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { } } + private def buildKubernetesMetadata(resultSet: ResultSet): Seq[KubernetesEngineInfo] = { + try { + val metadataList = ListBuffer[KubernetesEngineInfo]() + while (resultSet.next()) { + val identifier = resultSet.getString("identifier") + val context = Option(resultSet.getString("context")) + val namespace = Option(resultSet.getString("namespace")) + val pod = resultSet.getString("pod_name") + val podState = resultSet.getString("pod_state") + val containerState = resultSet.getString("container_state") + val appId = resultSet.getString("engine_id") + val appName = resultSet.getString("engine_name") + val appState = resultSet.getString("engine_state") + val appError = Option(resultSet.getString("engine_error")) + val updateTime = resultSet.getLong("update_time") + + val metadata = KubernetesEngineInfo( + identifier = identifier, + context = context, + namespace = namespace, + podName = pod, + podState = podState, + containerState = containerState, + engineId = appId, + engineName = appName, + engineState = appState, + engineError = appError, + updateTime = updateTime) + metadataList += metadata + } + metadataList.toSeq + } finally { + Utils.tryLogNonFatalError(resultSet.close()) + } + } + private def execute(conn: Connection, sql: String, params: Any*): Unit = { debug(s"execute sql: $sql, with params: ${params.mkString(", ")}") var statement: PreparedStatement = null @@ -610,4 +696,19 @@ object JDBCMetadataStore { "engine_error", "end_time", "peer_instance_closed").mkString(",") + private val KUBERNETES_ENGINE_INFO_TABLE = "k8s_engine_info" + private val KUBERNETES_ENGINE_INFO_KEY_COLUMN = "identifier" + private val KUBERNETES_ENGINE_INFO_COLUMNS = Seq( + 
/**
 * SQL syntax that differs between the databases the JDBC metadata store can run on.
 */
trait JdbcDatabaseDialect {

  /** Returns the clause that restricts a query to `limit` rows starting at `offset`. */
  def limitClause(limit: Int, offset: Int): String

  /**
   * Builds the statement used to write one row of `table` identified by `keyCol`.
   *
   * Every column in `cols` is represented by one positional `?` placeholder, in `cols` order,
   * so the caller binds exactly `cols.size` parameters.
   *
   * @param table  target table name
   * @param cols   all columns being written, including `keyCol`
   * @param keyCol column that identifies the row
   * @param keyVal key of the row being written; only used by dialects that have to embed the
   *               key in the SQL text instead of binding it
   * @return the SQL text of the statement
   */
  def insertOrReplace(
      table: String,
      cols: Seq[String],
      keyCol: String,
      keyVal: String): String
}

/** Fallback dialect for databases without a dedicated implementation. */
class GenericDatabaseDialect extends JdbcDatabaseDialect {
  override def limitClause(limit: Int, offset: Int): String = {
    s"LIMIT $limit OFFSET $offset"
  }

  /**
   * Generic variant: inserts the row only when no row with the same key exists yet.
   *
   * NOTE(review): unlike the dialect-specific overrides this statement never updates an
   * existing row, so it is "insert if absent" rather than a true upsert.
   */
  override def insertOrReplace(
      table: String,
      cols: Seq[String],
      keyCol: String,
      keyVal: String): String = {
    // The key is embedded as a SQL string literal, so double any embedded single quote to
    // keep the value inside the literal. A null key renders as the text "null", as before.
    val keyLiteral = s"$keyVal".replace("'", "''")
    s"""
       |INSERT INTO $table (${cols.mkString(",")})
       |SELECT ${cols.map(_ => "?").mkString(",")}
       |WHERE NOT EXISTS (
       |  SELECT 1 FROM $table WHERE $keyCol = '$keyLiteral'
       |)
       |""".stripMargin
  }
}

/** SQLite dialect: relies on `INSERT OR REPLACE` to overwrite a conflicting row. */
class SQLiteDatabaseDialect extends GenericDatabaseDialect {
  override def insertOrReplace(
      table: String,
      cols: Seq[String],
      keyCol: String,
      keyVal: String): String = {
    s"""
       |INSERT OR REPLACE INTO $table (${cols.mkString(",")})
       |VALUES (${cols.map(_ => "?").mkString(",")})
       |""".stripMargin
  }
}

/**
 * MySQL dialect: `INSERT ... ON DUPLICATE KEY UPDATE`, copying every non-key column from the
 * new row.
 *
 * NOTE(review): the `AS new` row alias is MySQL 8.0.19+ syntax; confirm it matches the
 * minimum MySQL version the metadata store supports.
 */
class MySQLDatabaseDialect extends GenericDatabaseDialect {
  override def insertOrReplace(
      table: String,
      cols: Seq[String],
      keyCol: String,
      keyVal: String): String = {
    s"""
       |INSERT INTO $table (${cols.mkString(",")})
       |VALUES (${cols.map(_ => "?").mkString(",")}) AS new
       |ON DUPLICATE KEY UPDATE
       |${cols.filterNot(_ == keyCol).map(c => s"$c = new.$c").mkString(",")}
       |""".stripMargin
  }
}

/**
 * PostgreSQL dialect: `INSERT ... ON CONFLICT (keyCol) DO UPDATE`, copying every non-key
 * column from the `EXCLUDED` row.
 */
class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {
  override def insertOrReplace(
      table: String,
      cols: Seq[String],
      keyCol: String,
      keyVal: String): String = {
    s"""
       |INSERT INTO $table (${cols.mkString(",")})
       |VALUES (${cols.map(_ => "?").mkString(",")})
       |ON CONFLICT ($keyCol)
       |DO UPDATE SET
       |${cols.filterNot(_ == keyCol).map(c => s"$c = EXCLUDED.$c").mkString(",")}
       |""".stripMargin
  }
}
// Round trip of a Kubernetes engine info record through the JDBC metadata store:
// insert, read back, overwrite under the same identifier, convert to application info, delete.
test("kubernetes engine info") {
  val tag = UUID.randomUUID().toString
  val metadata = KubernetesEngineInfo(
    identifier = tag,
    context = Some("context"),
    namespace = Some("namespace"),
    podName = "podName",
    podState = "podState",
    containerState = "containerState",
    engineId = "appId",
    engineName = "appName",
    engineState = "FINISHED",
    engineError = Some("appError"))

  jdbcMetadataStore.upsertKubernetesEngineInfo(metadata)

  // First write: every field must come back unchanged and an update time must be set.
  val metadata2 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag)
  assert(metadata2.identifier == metadata.identifier)
  assert(metadata2.context == metadata.context)
  assert(metadata2.namespace == metadata.namespace)
  assert(metadata2.podName == metadata.podName)
  assert(metadata2.podState == metadata.podState)
  assert(metadata2.containerState == metadata.containerState)
  assert(metadata2.engineId == metadata.engineId)
  assert(metadata2.engineName == metadata.engineName)
  assert(metadata2.engineState == metadata.engineState)
  assert(metadata2.engineError == metadata.engineError)
  assert(metadata2.updateTime > 0)

  // The stored update time is a wall-clock millisecond stamp. Wait until the clock has moved
  // past the first stamp so the strict `>` comparison below cannot fail when both upserts
  // happen within the same clock tick.
  while (System.currentTimeMillis() <= metadata2.updateTime) {
    Thread.sleep(1)
  }

  val metadata3 = KubernetesEngineInfo(
    identifier = tag,
    context = Some("context2"),
    namespace = Some("namespace2"),
    podName = "podName2",
    podState = "podState2",
    containerState = "containerState2",
    engineId = "appId2",
    engineName = "appName2",
    engineState = "FAILED",
    engineError = Some("appError2"))
  jdbcMetadataStore.upsertKubernetesEngineInfo(metadata3)

  // Second write with the same identifier: the stored row must now carry the new values.
  val metadata4 = jdbcMetadataStore.getKubernetesMetaEngineInfo(tag)
  assert(metadata4.identifier == metadata3.identifier)
  assert(metadata4.context == metadata3.context)
  assert(metadata4.namespace == metadata3.namespace)
  assert(metadata4.podName == metadata3.podName)
  assert(metadata4.podState == metadata3.podState)
  assert(metadata4.containerState == metadata3.containerState)
  assert(metadata4.engineId == metadata3.engineId)
  assert(metadata4.engineName == metadata3.engineName)
  assert(metadata4.engineState == metadata3.engineState)
  assert(metadata4.engineError == metadata3.engineError)
  assert(metadata4.updateTime > metadata2.updateTime)

  // The stored record must convert into the matching application info.
  val applicationInfo =
    MetadataManager.buildApplicationInfo(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag))
  assert(applicationInfo.id == "appId2")
  assert(applicationInfo.name == "appName2")
  assert(applicationInfo.state == ApplicationState.FAILED)
  assert(applicationInfo.error == Some("appError2"))
  assert(applicationInfo.podName == Some("podName2"))

  // After cleanup by identifier the lookup yields null.
  jdbcMetadataStore.cleanupKubernetesEngineInfoByIdentifier(tag)
  assert(jdbcMetadataStore.getKubernetesMetaEngineInfo(tag) == null)
}