Skip to content

[KYUUBI #7028] Persist the kubernetes application terminate state into metastore for app info store fallback #7029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ build/scala-*/**
**/metadata-store-schema*.sql
**/*.derby.sql
**/*.mysql.sql
**/*.postgresql.sql
**/*.sqlite.sql
**/node/**
**/web-ui/dist/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' ';

-- Kubernetes engine info table introduced by KYUUBI-7028.
-- Holds the pod level and engine level state of a Kubernetes engine, with at most
-- one row per identifier (enforced by unique_identifier_index at the end of the table).
CREATE TABLE IF NOT EXISTS k8s_engine_info(
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
context varchar(32) COMMENT 'the kubernetes context',
namespace varchar(255) COMMENT 'the kubernetes namespace',
pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name',
pod_state varchar(32) COMMENT 'the kubernetes pod state',
container_state mediumtext COMMENT 'the kubernetes container state',
engine_id varchar(128) COMMENT 'the engine id',
engine_name mediumtext COMMENT 'the engine name',
engine_state varchar(32) COMMENT 'the engine state',
engine_error mediumtext COMMENT 'the engine diagnose',
-- NOTE(review): nullable here, but the PostgreSQL variant of this table declares
-- update_time as NOT NULL - confirm which constraint is intended
update_time bigint COMMENT 'the metadata update time',
UNIQUE INDEX unique_identifier_index(identifier)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-- the metadata table ddl
-- Holds at most one row per identifier (see unique_identifier_index), describing the
-- request, the owning user and the engine application state of a SQL or BATCH session.
-- Secondary indexes cover user_name, engine_type, create_time and the
-- (priority, create_time) pair.

CREATE TABLE IF NOT EXISTS metadata(
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
session_type varchar(32) NOT NULL COMMENT 'the session type, SQL or BATCH',
real_user varchar(255) NOT NULL COMMENT 'the real user',
user_name varchar(255) NOT NULL COMMENT 'the user name, might be a proxy user',
ip_address varchar(128) COMMENT 'the client ip address',
kyuubi_instance varchar(1024) COMMENT 'the kyuubi instance that creates this',
state varchar(128) NOT NULL COMMENT 'the session state',
resource varchar(1024) COMMENT 'the main resource',
class_name varchar(1024) COMMENT 'the main class name',
request_name varchar(1024) COMMENT 'the request name',
request_conf mediumtext COMMENT 'the request config map',
request_args mediumtext COMMENT 'the request arguments',
create_time BIGINT NOT NULL COMMENT 'the metadata create time',
engine_type varchar(32) NOT NULL COMMENT 'the engine type',
cluster_manager varchar(128) COMMENT 'the engine cluster manager',
engine_open_time bigint COMMENT 'the engine open time',
engine_id varchar(128) COMMENT 'the engine application id',
engine_name mediumtext COMMENT 'the engine application name',
engine_url varchar(1024) COMMENT 'the engine tracking url',
engine_state varchar(32) COMMENT 'the engine application state',
engine_error mediumtext COMMENT 'the engine application diagnose',
end_time bigint COMMENT 'the metadata end time',
priority int NOT NULL DEFAULT 10 COMMENT 'the application priority, high value means high priority',
peer_instance_closed boolean default '0' COMMENT 'closed by peer kyuubi instance',
UNIQUE INDEX unique_identifier_index(identifier),
INDEX user_name_index(user_name),
INDEX engine_type_index(engine_type),
INDEX create_time_index(create_time),
-- See more detail about this index in ./005-KYUUBI-5327.mysql.sql
INDEX priority_create_time_index(priority DESC, create_time ASC)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- the k8s_engine_info table ddl
-- Holds the pod level and engine level state of a Kubernetes engine, with at most
-- one row per identifier (enforced by unique_identifier_index at the end of the table).
CREATE TABLE IF NOT EXISTS k8s_engine_info(
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
context varchar(32) COMMENT 'the kubernetes context',
namespace varchar(255) COMMENT 'the kubernetes namespace',
pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name',
pod_state varchar(32) COMMENT 'the kubernetes pod state',
container_state mediumtext COMMENT 'the kubernetes container state',
engine_id varchar(128) COMMENT 'the engine id',
engine_name mediumtext COMMENT 'the engine name',
engine_state varchar(32) COMMENT 'the engine state',
engine_error mediumtext COMMENT 'the engine diagnose',
-- NOTE(review): nullable here, but the PostgreSQL variant of this table declares
-- update_time as NOT NULL - confirm which constraint is intended
update_time bigint COMMENT 'the metadata update time',
UNIQUE INDEX unique_identifier_index(identifier)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
SOURCE 006-KYUUBI-7028.mysql.sql;
SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' ';

-- Kubernetes engine info table introduced by KYUUBI-7028.
-- Holds the pod level and engine level state of a Kubernetes engine.
-- Column descriptions are attached by the COMMENT ON COLUMN statements that follow,
-- and uniqueness of identifier is enforced by a separate CREATE UNIQUE INDEX statement.
CREATE TABLE IF NOT EXISTS k8s_engine_info(
key_id bigserial PRIMARY KEY,
identifier varchar(36) NOT NULL,
context varchar(32),
namespace varchar(255),
pod_name varchar(255) NOT NULL,
pod_state varchar(32),
container_state text,
engine_id varchar(128),
engine_name text,
engine_state varchar(32),
engine_error text,
-- NOTE(review): NOT NULL here, but the MySQL and SQLite variants of this table
-- leave update_time nullable - confirm which constraint is intended
update_time bigint NOT NULL
);

COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id';
COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID';
COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context';
COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace';
COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name';
COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state';
COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state';
COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id';
COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name';
COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state';
COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose';
COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time';

CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
-- the metadata table ddl
-- Describes the request, the owning user and the engine application state of a
-- SQL or BATCH session.
-- Column descriptions are attached by the COMMENT ON COLUMN statements that follow,
-- and all indexes (including the unique index on identifier) are created afterwards.
CREATE TABLE IF NOT EXISTS metadata(
key_id bigserial PRIMARY KEY,
identifier varchar(36) NOT NULL,
session_type varchar(32) NOT NULL,
real_user varchar(255) NOT NULL,
user_name varchar(255) NOT NULL,
ip_address varchar(128),
kyuubi_instance varchar(1024),
state varchar(128) NOT NULL,
resource varchar(1024),
class_name varchar(1024),
request_name varchar(1024),
request_conf text,
request_args text,
create_time bigint NOT NULL,
engine_type varchar(32) NOT NULL,
cluster_manager varchar(128),
engine_open_time bigint,
engine_id varchar(128),
engine_name text,
engine_url varchar(1024),
engine_state varchar(32),
engine_error text,
end_time bigint,
priority int NOT NULL DEFAULT 10,
peer_instance_closed boolean DEFAULT FALSE
);

COMMENT ON COLUMN metadata.key_id IS 'the auto increment key id';
COMMENT ON COLUMN metadata.identifier IS 'the identifier id, which is an UUID';
COMMENT ON COLUMN metadata.session_type IS 'the session type, SQL or BATCH';
COMMENT ON COLUMN metadata.real_user IS 'the real user';
COMMENT ON COLUMN metadata.user_name IS 'the user name, might be a proxy user';
COMMENT ON COLUMN metadata.ip_address IS 'the client ip address';
COMMENT ON COLUMN metadata.kyuubi_instance IS 'the kyuubi instance that creates this';
COMMENT ON COLUMN metadata.state IS 'the session state';
COMMENT ON COLUMN metadata.resource IS 'the main resource';
COMMENT ON COLUMN metadata.class_name IS 'the main class name';
COMMENT ON COLUMN metadata.request_name IS 'the request name';
COMMENT ON COLUMN metadata.request_conf IS 'the request config map';
COMMENT ON COLUMN metadata.request_args IS 'the request arguments';
COMMENT ON COLUMN metadata.create_time IS 'the metadata create time';
COMMENT ON COLUMN metadata.engine_type IS 'the engine type';
COMMENT ON COLUMN metadata.cluster_manager IS 'the engine cluster manager';
COMMENT ON COLUMN metadata.engine_open_time IS 'the engine open time';
COMMENT ON COLUMN metadata.engine_id IS 'the engine application id';
COMMENT ON COLUMN metadata.engine_name IS 'the engine application name';
COMMENT ON COLUMN metadata.engine_url IS 'the engine tracking url';
COMMENT ON COLUMN metadata.engine_state IS 'the engine application state';
COMMENT ON COLUMN metadata.engine_error IS 'the engine application diagnose';
COMMENT ON COLUMN metadata.end_time IS 'the metadata end time';
COMMENT ON COLUMN metadata.priority IS 'the application priority, high value means high priority';
COMMENT ON COLUMN metadata.peer_instance_closed IS 'closed by peer kyuubi instance';

CREATE UNIQUE INDEX IF NOT EXISTS unique_identifier_index ON metadata(identifier);
CREATE INDEX IF NOT EXISTS user_name_index ON metadata(user_name);
CREATE INDEX IF NOT EXISTS engine_type_index ON metadata(engine_type);
CREATE INDEX IF NOT EXISTS create_time_index ON metadata(create_time);
CREATE INDEX IF NOT EXISTS priority_create_time_index ON metadata(priority DESC, create_time ASC);

-- the k8s_engine_info table ddl
-- Holds the pod level and engine level state of a Kubernetes engine.
-- Column descriptions are attached by the COMMENT ON COLUMN statements that follow,
-- and uniqueness of identifier is enforced by a separate CREATE UNIQUE INDEX statement.
CREATE TABLE IF NOT EXISTS k8s_engine_info(
key_id bigserial PRIMARY KEY,
identifier varchar(36) NOT NULL,
context varchar(32),
namespace varchar(255),
pod_name varchar(255) NOT NULL,
pod_state varchar(32),
container_state text,
engine_id varchar(128),
engine_name text,
engine_state varchar(32),
engine_error text,
-- NOTE(review): NOT NULL here, but the MySQL and SQLite variants of this table
-- leave update_time nullable - confirm which constraint is intended
update_time bigint NOT NULL
);

COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id';
COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID';
COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context';
COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace';
COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name';
COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state';
COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state';
COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id';
COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name';
COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state';
COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose';
COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time';

CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT '< Upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS ' ';
\i 001-KYUUBI-7028.postgresql.sql
SELECT '< Finished upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS ' ';
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- the k8s_engine_info table ddl
-- Introduced by KYUUBI-7028. Holds the pod level and engine level state of a
-- Kubernetes engine. Uniqueness of identifier is enforced by the separate
-- CREATE UNIQUE INDEX statement after the table.
-- NOTE(review): update_time is nullable here, but the PostgreSQL variant of this
-- table declares it NOT NULL - confirm which constraint is intended
CREATE TABLE IF NOT EXISTS k8s_engine_info(
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
context varchar(32), -- the kubernetes context
namespace varchar(255), -- the kubernetes namespace
pod_name varchar(255) NOT NULL, -- the kubernetes pod name
pod_state varchar(32), -- the kubernetes pod state
container_state mediumtext, -- the kubernetes container state
engine_id varchar(128), -- the engine id
engine_name mediumtext, -- the engine name
engine_state varchar(32), -- the engine state
engine_error mediumtext, -- the engine diagnose
update_time bigint -- the metadata update time
);

CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-- the metadata table ddl
-- Describes the request, the owning user and the engine application state of a
-- SQL or BATCH session. All indexes, including the unique index on identifier,
-- are created by the CREATE INDEX statements after the table.
-- NOTE(review): metadata_priority_create_time_index is declared on
-- (priority, create_time) without the DESC / ASC ordering that the MySQL and
-- PostgreSQL schemas use for the same index - confirm this is intentional

CREATE TABLE IF NOT EXISTS metadata(
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
session_type varchar(32) NOT NULL, -- the session type, SQL or BATCH
real_user varchar(255) NOT NULL, -- the real user
user_name varchar(255) NOT NULL, -- the user name, might be a proxy user
ip_address varchar(128), -- the client ip address
kyuubi_instance varchar(1024), -- the kyuubi instance that creates this
state varchar(128) NOT NULL, -- the session state
resource varchar(1024), -- the main resource
class_name varchar(1024), -- the main class name
request_name varchar(1024), -- the request name
request_conf mediumtext, -- the request config map
request_args mediumtext, -- the request arguments
create_time BIGINT NOT NULL, -- the metadata create time
engine_type varchar(32) NOT NULL, -- the engine type
cluster_manager varchar(128), -- the engine cluster manager
engine_open_time bigint, -- the engine open time
engine_id varchar(128), -- the engine application id
engine_name mediumtext, -- the engine application name
engine_url varchar(1024), -- the engine tracking url
engine_state varchar(32), -- the engine application state
engine_error mediumtext, -- the engine application diagnose
end_time bigint, -- the metadata end time
priority INTEGER NOT NULL DEFAULT 10, -- the application priority, high value means high priority
peer_instance_closed boolean default '0' -- closed by peer kyuubi instance
);

CREATE UNIQUE INDEX IF NOT EXISTS metadata_unique_identifier_index ON metadata(identifier);

CREATE INDEX IF NOT EXISTS metadata_user_name_index ON metadata(user_name);

CREATE INDEX IF NOT EXISTS metadata_engine_type_index ON metadata(engine_type);

CREATE INDEX IF NOT EXISTS metadata_create_time_index ON metadata(create_time);

CREATE INDEX IF NOT EXISTS metadata_priority_create_time_index ON metadata(priority, create_time);

-- the k8s_engine_info table ddl
-- Holds the pod level and engine level state of a Kubernetes engine.
-- Uniqueness of identifier is enforced by the separate CREATE UNIQUE INDEX
-- statement after the table.
-- NOTE(review): update_time is nullable here, but the PostgreSQL variant of this
-- table declares it NOT NULL - confirm which constraint is intended
CREATE TABLE IF NOT EXISTS k8s_engine_info(
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
context varchar(32), -- the kubernetes context
namespace varchar(255), -- the kubernetes namespace
pod_name varchar(255) NOT NULL, -- the kubernetes pod name
pod_state varchar(32), -- the kubernetes pod state
container_state mediumtext, -- the kubernetes container state
engine_id varchar(128), -- the engine id
engine_name mediumtext, -- the engine name
engine_state varchar(32), -- the engine state
engine_error mediumtext, -- the engine diagnose
update_time bigint -- the metadata update time
);

CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
.read 001-KYUUBI-7028.sqlite.sql
SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.KubernetesEngineInfo
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}

class KubernetesApplicationOperation extends ApplicationOperation with Logging {
Expand Down Expand Up @@ -255,8 +256,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
try {
// need to initialize the kubernetes client if not exists
getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
val (_, appInfo) =
appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> ApplicationInfo.NOT_FOUND)
val appInfo = appInfoStore.get(tag) match {
case (_, info) => info
case _ =>
// try to get the application info from kubernetes engine info store
metadataManager.flatMap(
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
}
(appInfo.state, submitTime) match {
// Kyuubi should wait a while if the pod has not been created yet
case (NOT_FOUND, Some(_submitTime)) =>
Expand Down Expand Up @@ -340,7 +346,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
updateApplicationState(kubernetesInfo, newPod, eventType)
val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType)
if (isTerminated(appState)) {
markApplicationTerminated(newPod, eventType)
markApplicationTerminated(kubernetesInfo, newPod, eventType)
}
KubernetesApplicationAuditLogger.audit(
eventType,
Expand All @@ -358,7 +364,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
if (isSparkEnginePod(pod)) {
val eventType = KubernetesResourceEventTypes.DELETE
updateApplicationState(kubernetesInfo, pod, eventType)
markApplicationTerminated(pod, eventType)
markApplicationTerminated(kubernetesInfo, pod, eventType)
KubernetesApplicationAuditLogger.audit(
eventType,
kubernetesInfo,
Expand Down Expand Up @@ -456,13 +462,28 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}

/**
 * Records that the application backing `pod` has reached a terminated state.
 *
 * The application state and error are derived once from the pod via
 * `toApplicationStateAndError`. They are then (1) upserted, together with the pod and
 * container details, into the metadata store when a `metadataManager` is present, and
 * (2) registered in `cleanupTerminatedAppInfoTrigger` under the pod's Kyuubi unique key,
 * unless that key is already registered.
 *
 * The whole body runs under `synchronized`, so the check-then-put on the trigger cache
 * is not interleaved with another invocation on the same instance.
 *
 * @param kubernetesInfo the kubernetes context and namespace the pod belongs to
 * @param pod the pod whose application is terminated
 * @param eventType the kubernetes resource event that triggered this call
 */
private def markApplicationTerminated(
    kubernetesInfo: KubernetesInfo,
    pod: Pod,
    eventType: KubernetesResourceEventType): Unit = synchronized {
  val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
  val (appState, appError) =
    toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)
  // upsert the kubernetes engine info store when the application is terminated
  metadataManager.foreach(_.upsertKubernetesMetadata(
    KubernetesEngineInfo(
      identifier = key,
      context = kubernetesInfo.context,
      namespace = kubernetesInfo.namespace,
      podName = pod.getMetadata.getName,
      podState = pod.getStatus.getPhase,
      // flatten every container status into "name->state" pairs joined by commas
      containerState = pod.getStatus.getContainerStatuses.asScala.map(cs =>
        s"${cs.getName}->${cs.getState}").mkString(","),
      engineId = getPodAppId(pod),
      engineName = getPodAppName(pod),
      engineState = appState.toString,
      engineError = appError)))
  // Register the terminated state exactly once, reusing the state computed above.
  // A second put for the same key would only replace the entry that was just written.
  if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) {
    cleanupTerminatedAppInfoTrigger.put(key, appState)
  }
}

Expand Down
Loading
Loading