From 845ed68183de5cb63525417063eb4b30064b9707 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 2 Dec 2024 22:58:29 +0800 Subject: [PATCH] feat: add rw_internal_table_info to identify which streaming job the internal table belongs to (#19642) --- proto/catalog.proto | 3 + .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../rw_catalog/rw_fragment_id_to_ddl.rs | 2 +- .../rw_catalog/rw_internal_table_info.rs | 55 +++++++++++++++++++ .../rw_catalog/rw_internal_tables.rs | 2 + .../rw_catalog/rw_streaming_parallelism.rs | 2 + src/frontend/src/catalog/table_catalog.rs | 6 ++ .../optimizer/plan_node/stream_materialize.rs | 1 + src/frontend/src/optimizer/plan_node/utils.rs | 1 + .../src/scheduler/distributed/query.rs | 1 + src/meta/model/src/table.rs | 2 +- src/meta/src/controller/mod.rs | 1 + src/meta/src/controller/streaming_job.rs | 1 + src/storage/src/compaction_catalog_manager.rs | 1 + 14 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_table_info.rs diff --git a/proto/catalog.proto b/proto/catalog.proto index 2f4cc2968a232..990f6ad0d68a1 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -470,6 +470,9 @@ message Table { // The information used by webhook source to validate the incoming data. optional WebhookSourceInfo webhook_info = 41; + // This field stores the job ID for internal tables. + optional uint32 job_id = 42; + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. Not to be confused with the global catalog version for // notification service. 
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 947560e44e62e..d959a375db447 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -61,4 +61,5 @@ mod rw_worker_nodes; mod rw_actor_id_to_ddl; mod rw_actor_splits; mod rw_fragment_id_to_ddl; +mod rw_internal_table_info; mod rw_worker_actor_count; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_id_to_ddl.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_id_to_ddl.rs index 094e85903a31a..49b7f5006b7d6 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_id_to_ddl.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_id_to_ddl.rs @@ -15,7 +15,7 @@ use risingwave_common::types::Fields; use risingwave_frontend_macro::system_catalog; -/// Provides a mapping from `actor_id` to its ddl info. +/// Provides a mapping from `fragment_id` to its ddl info. #[system_catalog( view, "rw_catalog.rw_fragment_id_to_ddl", diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_table_info.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_table_info.rs new file mode 100644 index 0000000000000..2e7a947f9ad57 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_table_info.rs @@ -0,0 +1,55 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; + +#[system_catalog( + view, + "rw_catalog.rw_internal_table_info", + "WITH all_streaming_jobs AS ( + SELECT id, name, 'table' as job_type, schema_id, owner FROM rw_tables + UNION ALL + SELECT id, name, 'materialized view' as job_type, schema_id, owner FROM rw_materialized_views + UNION ALL + SELECT id, name, 'sink' as job_type, schema_id, owner FROM rw_sinks + UNION ALL + SELECT id, name, 'index' as job_type, schema_id, owner FROM rw_indexes + UNION ALL + SELECT id, name, 'source' as job_type, schema_id, owner FROM rw_sources WHERE is_shared = true + ) + + SELECT i.id, + i.name, + j.id as job_id, + j.name as job_name, + j.job_type, + s.name as schema_name, + u.name as owner + FROM rw_catalog.rw_internal_tables i + JOIN all_streaming_jobs j ON i.job_id = j.id + JOIN rw_catalog.rw_schemas s ON j.schema_id = s.id + JOIN rw_catalog.rw_users u ON j.owner = u.id" +)] +#[derive(Fields)] +struct RwInternalTableInfo { + #[primary_key] + id: i32, + name: String, + job_id: i32, + job_name: String, + job_type: String, + schema_name: String, + owner: String, +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs index 989226d104a5e..10394ebd23a13 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs @@ -25,6 +25,7 @@ struct RwInternalTable { id: i32, name: String, schema_id: i32, + job_id: i32, owner: i32, definition: String, acl: Vec, @@ -48,6 +49,7 @@ fn read_rw_internal_tables(reader: &SysCatalogReaderImpl) -> Result, + + pub job_id: Option, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -467,6 +469,7 @@ impl TableCatalog { cdc_table_id: 
self.cdc_table_id.clone(), maybe_vnode_count: self.vnode_count.to_protobuf(), webhook_info: self.webhook_info.clone(), + job_id: self.job_id.map(|id| id.table_id), } } @@ -660,6 +663,7 @@ impl From for TableCatalog { cdc_table_id: tb.cdc_table_id, vnode_count, webhook_info: tb.webhook_info, + job_id: tb.job_id.map(TableId::from), } } } @@ -752,6 +756,7 @@ mod tests { cdc_table_id: None, maybe_vnode_count: VnodeCount::set(233).to_protobuf(), webhook_info: None, + job_id: None, } .into(); @@ -820,6 +825,7 @@ mod tests { cdc_table_id: None, vnode_count: VnodeCount::set(233), webhook_info: None, + job_id: None, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index c8f673f11b52a..8b1e6d622aea3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -291,6 +291,7 @@ impl StreamMaterialize { cdc_table_id: None, vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later webhook_info, + job_id: None, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 327380088f29c..b4ced0dbe7c70 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -197,6 +197,7 @@ impl TableCatalogBuilder { cdc_table_id: None, vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later webhook_info: None, + job_id: None, } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index d6d63720af04f..12378110ae7ab 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -591,6 +591,7 @@ pub(crate) mod tests { cdc_table_id: None, vnode_count: VnodeCount::set(vnode_count), webhook_info: 
None, + job_id: None, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index 20fae0c926b1c..ec354d09c211d 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -243,7 +243,7 @@ impl From for ActiveModel { name: Set(pb_table.name), optional_associated_source_id, table_type: Set(table_type.into()), - belongs_to_job_id: Set(None), + belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)), columns: Set(pb_table.columns.into()), pk: Set(pb_table.pk.into()), distribution_key: Set(pb_table.distribution_key.into()), diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index c2b6fccf97562..a432bbb6c5d40 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -203,6 +203,7 @@ impl From> for PbTable { cdc_table_id: value.0.cdc_table_id, maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(), webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()), + job_id: value.0.belongs_to_job_id.map(|id| id as _), } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 497b4e6b4c631..8b6e46dd9ec2f 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -383,6 +383,7 @@ impl CatalogController { .oid; table_id_map.insert(table.id, table_id as u32); table.id = table_id as _; + table.job_id = Some(job_id as _); let table_model = table::ActiveModel { table_id: Set(table_id as _), diff --git a/src/storage/src/compaction_catalog_manager.rs b/src/storage/src/compaction_catalog_manager.rs index 22ed5cf4fd0b5..86742b3cc1ec8 100644 --- a/src/storage/src/compaction_catalog_manager.rs +++ b/src/storage/src/compaction_catalog_manager.rs @@ -569,6 +569,7 @@ mod tests { cdc_table_id: None, maybe_vnode_count: None, webhook_info: None, + job_id: None, } }