diff --git a/.licenserc.yaml b/.licenserc.yaml index 93449f89340cb..6d63d62f092fb 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -18,7 +18,7 @@ header: - "src/sqlparser/**/*.rs" - "java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/*.java" - "java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/**/*.java" - - "src/meta/model_v2/migration/**/*.rs" + - "src/meta/model/migration/**/*.rs" - "lints/ui/**" comment: on-failure diff --git a/Cargo.lock b/Cargo.lock index f94f64782b5aa..00cc6a1386645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10412,7 +10412,7 @@ dependencies = [ "prost 0.13.1", "risingwave_common", "risingwave_hummock_sdk", - "risingwave_meta_model_v2", + "risingwave_meta_model", "risingwave_object_store", "risingwave_pb", "serde", @@ -11097,8 +11097,8 @@ dependencies = [ "risingwave_frontend", "risingwave_hummock_sdk", "risingwave_meta", + "risingwave_meta_model", "risingwave_meta_model_migration", - "risingwave_meta_model_v2", "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", @@ -11578,8 +11578,8 @@ dependencies = [ "risingwave_hummock_sdk", "risingwave_license", "risingwave_meta_dashboard", + "risingwave_meta_model", "risingwave_meta_model_migration", - "risingwave_meta_model_v2", "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", @@ -11629,28 +11629,28 @@ dependencies = [ ] [[package]] -name = "risingwave_meta_model_migration" +name = "risingwave_meta_model" version = "2.1.0-rc.1" dependencies = [ - "async-std", + "prost 0.13.1", + "risingwave_common", + "risingwave_hummock_sdk", + "risingwave_pb", "sea-orm", - "sea-orm-migration", "serde", "serde_json", - "uuid", ] [[package]] -name = "risingwave_meta_model_v2" +name = "risingwave_meta_model_migration" version = "2.1.0-rc.1" dependencies = [ - "prost 0.13.1", - "risingwave_common", - "risingwave_hummock_sdk", - "risingwave_pb", + "async-std", "sea-orm", + "sea-orm-migration", "serde", "serde_json", + "uuid", ] [[package]] @@ -11704,7 +11704,7 @@ dependencies = [ "risingwave_connector", "risingwave_hummock_sdk", "risingwave_meta", - "risingwave_meta_model_v2", + "risingwave_meta_model", "risingwave_pb", "sea-orm", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 8a402e419de13..70dfc27c8e7d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,8 +29,8 @@ members = [ "src/license", "src/meta", "src/meta/dashboard", - "src/meta/model_v2", - "src/meta/model_v2/migration", + "src/meta/model", + "src/meta/model/migration", "src/meta/node", "src/meta/service", "src/object_store", @@ -228,8 +228,8 @@ risingwave_mem_table_spill_test = { path = "./src/stream/spill_test" } risingwave_meta = { path = "./src/meta" } risingwave_meta_dashboard = { path = "./src/meta/dashboard" } risingwave_meta_service = { path = "./src/meta/service" } -risingwave_meta_model_migration = { path = "src/meta/model_v2/migration" } -risingwave_meta_model_v2 = { path = "./src/meta/model_v2" } +risingwave_meta_model = { path = "src/meta/model" } +risingwave_meta_model_migration = { path = "src/meta/model/migration" } risingwave_meta_node = { path = "./src/meta/node" } risingwave_object_store = { path = "./src/object_store" } risingwave_pb = { path = "./src/prost" } diff --git a/src/ctl/Cargo.toml b/src/ctl/Cargo.toml index d36a4fa2b93b7..7b0fc940de946 100644 --- a/src/ctl/Cargo.toml +++ b/src/ctl/Cargo.toml @@ -32,8 +32,8 @@ risingwave_connector = { workspace = true } risingwave_frontend = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_meta = { workspace = true } +risingwave_meta_model = { workspace = true } risingwave_meta_model_migration = { workspace = true } -risingwave_meta_model_v2 = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/ctl/src/cmd_impl/meta/reschedule.rs b/src/ctl/src/cmd_impl/meta/reschedule.rs index 2c46ceee8d6be..1b042b55d1fd1 100644 --- a/src/ctl/src/cmd_impl/meta/reschedule.rs +++ b/src/ctl/src/cmd_impl/meta/reschedule.rs @@ -19,7 +19,7 @@ use anyhow::{anyhow, Result}; use inquire::Confirm; use itertools::Itertools; use regex::Regex; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::WorkerNode; use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule}; use serde::{Deserialize, Serialize}; diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 28a281ba5b191..4773a88802184 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -58,8 +58,8 @@ risingwave_connector = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_license = { workspace = true } risingwave_meta_dashboard = { workspace = true } +risingwave_meta_model = { workspace = true } risingwave_meta_model_migration = { workspace = true } -risingwave_meta_model_v2 = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model/Cargo.toml similarity index 94% rename from src/meta/model_v2/Cargo.toml rename to src/meta/model/Cargo.toml index 942b6adffa070..991becc820642 100644 --- a/src/meta/model_v2/Cargo.toml +++ b/src/meta/model/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "risingwave_meta_model_v2" +name = "risingwave_meta_model" version = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/src/meta/model_v2/migration/Cargo.toml b/src/meta/model/migration/Cargo.toml similarity index 100% rename from src/meta/model_v2/migration/Cargo.toml rename to src/meta/model/migration/Cargo.toml diff --git a/src/meta/model_v2/migration/README.md b/src/meta/model/migration/README.md similarity index 100% rename from src/meta/model_v2/migration/README.md rename to src/meta/model/migration/README.md diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs similarity index 100% rename from src/meta/model_v2/migration/src/lib.rs rename to src/meta/model/migration/src/lib.rs diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model/migration/src/m20230908_072257_init.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20230908_072257_init.rs rename to src/meta/model/migration/src/m20230908_072257_init.rs diff --git a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs b/src/meta/model/migration/src/m20231008_020431_hummock.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20231008_020431_hummock.rs rename to src/meta/model/migration/src/m20231008_020431_hummock.rs diff --git a/src/meta/model_v2/migration/src/m20240304_074901_subscription.rs b/src/meta/model/migration/src/m20240304_074901_subscription.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240304_074901_subscription.rs rename to src/meta/model/migration/src/m20240304_074901_subscription.rs diff --git a/src/meta/model_v2/migration/src/m20240410_082733_with_version_column_migration.rs b/src/meta/model/migration/src/m20240410_082733_with_version_column_migration.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240410_082733_with_version_column_migration.rs rename to src/meta/model/migration/src/m20240410_082733_with_version_column_migration.rs diff --git a/src/meta/model_v2/migration/src/m20240410_154406_session_params.rs b/src/meta/model/migration/src/m20240410_154406_session_params.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240410_154406_session_params.rs rename to src/meta/model/migration/src/m20240410_154406_session_params.rs diff --git a/src/meta/model_v2/migration/src/m20240417_062305_subscription_internal_table_name.rs b/src/meta/model/migration/src/m20240417_062305_subscription_internal_table_name.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240417_062305_subscription_internal_table_name.rs rename to src/meta/model/migration/src/m20240417_062305_subscription_internal_table_name.rs diff --git a/src/meta/model_v2/migration/src/m20240418_142249_function_runtime.rs b/src/meta/model/migration/src/m20240418_142249_function_runtime.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240418_142249_function_runtime.rs rename to src/meta/model/migration/src/m20240418_142249_function_runtime.rs diff --git a/src/meta/model_v2/migration/src/m20240506_112555_subscription_partial_ckpt.rs b/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240506_112555_subscription_partial_ckpt.rs rename to src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs diff --git a/src/meta/model_v2/migration/src/m20240525_090457_secret.rs b/src/meta/model/migration/src/m20240525_090457_secret.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240525_090457_secret.rs rename to src/meta/model/migration/src/m20240525_090457_secret.rs diff --git a/src/meta/model_v2/migration/src/m20240617_070131_index_column_properties.rs b/src/meta/model/migration/src/m20240617_070131_index_column_properties.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240617_070131_index_column_properties.rs rename to src/meta/model/migration/src/m20240617_070131_index_column_properties.rs diff --git a/src/meta/model_v2/migration/src/m20240617_071625_sink_into_table_column.rs b/src/meta/model/migration/src/m20240617_071625_sink_into_table_column.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240617_071625_sink_into_table_column.rs rename to src/meta/model/migration/src/m20240617_071625_sink_into_table_column.rs diff --git a/src/meta/model_v2/migration/src/m20240618_072634_function_compressed_binary.rs b/src/meta/model/migration/src/m20240618_072634_function_compressed_binary.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240618_072634_function_compressed_binary.rs rename to src/meta/model/migration/src/m20240618_072634_function_compressed_binary.rs diff --git a/src/meta/model_v2/migration/src/m20240630_131430_remove_parallel_unit.rs b/src/meta/model/migration/src/m20240630_131430_remove_parallel_unit.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240630_131430_remove_parallel_unit.rs rename to src/meta/model/migration/src/m20240630_131430_remove_parallel_unit.rs diff --git a/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs b/src/meta/model/migration/src/m20240701_060504_hummock_time_travel.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs rename to src/meta/model/migration/src/m20240701_060504_hummock_time_travel.rs diff --git a/src/meta/model_v2/migration/src/m20240702_080451_system_param_value.rs b/src/meta/model/migration/src/m20240702_080451_system_param_value.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240702_080451_system_param_value.rs rename to src/meta/model/migration/src/m20240702_080451_system_param_value.rs diff --git a/src/meta/model_v2/migration/src/m20240702_084927_unnecessary_fk.rs b/src/meta/model/migration/src/m20240702_084927_unnecessary_fk.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240702_084927_unnecessary_fk.rs rename to src/meta/model/migration/src/m20240702_084927_unnecessary_fk.rs diff --git a/src/meta/model_v2/migration/src/m20240726_063833_auto_schema_change.rs b/src/meta/model/migration/src/m20240726_063833_auto_schema_change.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240726_063833_auto_schema_change.rs rename to src/meta/model/migration/src/m20240726_063833_auto_schema_change.rs diff --git a/src/meta/model_v2/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs b/src/meta/model/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs rename to src/meta/model/migration/src/m20240806_143329_add_rate_limit_to_source_catalog.rs diff --git a/src/meta/model_v2/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs b/src/meta/model/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs rename to src/meta/model/migration/src/m20240820_081248_add_time_travel_per_table_epoch.rs diff --git a/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs rename to src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs diff --git a/src/meta/model_v2/migration/src/m20241016_065621_hummock_gc_history.rs b/src/meta/model/migration/src/m20241016_065621_hummock_gc_history.rs similarity index 100% rename from src/meta/model_v2/migration/src/m20241016_065621_hummock_gc_history.rs rename to src/meta/model/migration/src/m20241016_065621_hummock_gc_history.rs diff --git a/src/meta/model_v2/migration/src/main.rs b/src/meta/model/migration/src/main.rs similarity index 100% rename from src/meta/model_v2/migration/src/main.rs rename to src/meta/model/migration/src/main.rs diff --git a/src/meta/model_v2/src/README.md b/src/meta/model/src/README.md similarity index 71% rename from src/meta/model_v2/src/README.md rename to src/meta/model/src/README.md index 48095d3e6d67f..539093816119c 100644 --- a/src/meta/model_v2/src/README.md +++ b/src/meta/model/src/README.md @@ -1,15 +1,18 @@ # How to define changes between versions and generate migration and model files -- Generate a new migration file and apply it to the database, check [migration](../migration/README.md) for more details. Let's take a local PG database as an example(`postgres://postgres:@localhost:5432/postgres`): +- Generate a new migration file and apply it to the database, check [migration](../migration/README.md) for more + details. Let's take a local PG database as an example(`postgres://postgres:@localhost:5432/postgres`): ```sh export DATABASE_URL=postgres://postgres:@localhost:5432/postgres; cargo run -- generate MIGRATION_NAME cargo run -- up ``` - - Define tables, indexes, foreign keys in the file. The new generated file will include a sample migration script, - you can replace it with your own migration scripts, like defining or changing tables, indexes, foreign keys and other - dml operation to do data correctness etc. Check [writing-migration](https://www.sea-ql.org/SeaORM/docs/migration/writing-migration/) - for more details. + - Define tables, indexes, foreign keys in the file. The new generated file will include a sample migration script, + you can replace it with your own migration scripts, like defining or changing tables, indexes, foreign keys and + other + dml operation to do data correctness etc. + Check [writing-migration](https://www.sea-ql.org/SeaORM/docs/migration/writing-migration/) + for more details. ```rust #[async_trait::async_trait] impl MigrationTrait for Migration { @@ -24,11 +27,12 @@ } } ``` -- Apply migration, and generate model files for new tables and indexes from the database, so you don't need to write them manually, +- Apply migration, and generate model files for new tables and indexes from the database, so you don't need to write + them manually, ```sh cargo run -- up sea-orm-cli generate entity -u postgres://postgres:@localhost:5432/postgres -s public -o {target_dir} - cp {target_dir}/xxx.rs src/meta/src/model_v2/ + cp {target_dir}/xxx.rs src/meta/src/model/ ``` - Defines enum and array types in the model files, since they're basically only supported in PG, and we need to define them in the model files manually. For example: diff --git a/src/meta/model_v2/src/actor.rs b/src/meta/model/src/actor.rs similarity index 100% rename from src/meta/model_v2/src/actor.rs rename to src/meta/model/src/actor.rs diff --git a/src/meta/model_v2/src/actor_dispatcher.rs b/src/meta/model/src/actor_dispatcher.rs similarity index 100% rename from src/meta/model_v2/src/actor_dispatcher.rs rename to src/meta/model/src/actor_dispatcher.rs diff --git a/src/meta/model_v2/src/catalog_version.rs b/src/meta/model/src/catalog_version.rs similarity index 100% rename from src/meta/model_v2/src/catalog_version.rs rename to src/meta/model/src/catalog_version.rs diff --git a/src/meta/model_v2/src/cluster.rs b/src/meta/model/src/cluster.rs similarity index 100% rename from src/meta/model_v2/src/cluster.rs rename to src/meta/model/src/cluster.rs diff --git a/src/meta/model_v2/src/compaction_config.rs b/src/meta/model/src/compaction_config.rs similarity index 100% rename from src/meta/model_v2/src/compaction_config.rs rename to src/meta/model/src/compaction_config.rs diff --git a/src/meta/model_v2/src/compaction_status.rs b/src/meta/model/src/compaction_status.rs similarity index 100% rename from src/meta/model_v2/src/compaction_status.rs rename to src/meta/model/src/compaction_status.rs diff --git a/src/meta/model_v2/src/compaction_task.rs b/src/meta/model/src/compaction_task.rs similarity index 100% rename from src/meta/model_v2/src/compaction_task.rs rename to src/meta/model/src/compaction_task.rs diff --git a/src/meta/model_v2/src/connection.rs b/src/meta/model/src/connection.rs similarity index 100% rename from src/meta/model_v2/src/connection.rs rename to src/meta/model/src/connection.rs diff --git a/src/meta/model_v2/src/database.rs b/src/meta/model/src/database.rs similarity index 100% rename from src/meta/model_v2/src/database.rs rename to src/meta/model/src/database.rs diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model/src/fragment.rs similarity index 100% rename from src/meta/model_v2/src/fragment.rs rename to src/meta/model/src/fragment.rs diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model/src/function.rs similarity index 100% rename from src/meta/model_v2/src/function.rs rename to src/meta/model/src/function.rs diff --git a/src/meta/model_v2/src/hummock_epoch_to_version.rs b/src/meta/model/src/hummock_epoch_to_version.rs similarity index 100% rename from src/meta/model_v2/src/hummock_epoch_to_version.rs rename to src/meta/model/src/hummock_epoch_to_version.rs diff --git a/src/meta/model_v2/src/hummock_gc_history.rs b/src/meta/model/src/hummock_gc_history.rs similarity index 100% rename from src/meta/model_v2/src/hummock_gc_history.rs rename to src/meta/model/src/hummock_gc_history.rs diff --git a/src/meta/model_v2/src/hummock_pinned_snapshot.rs b/src/meta/model/src/hummock_pinned_snapshot.rs similarity index 100% rename from src/meta/model_v2/src/hummock_pinned_snapshot.rs rename to src/meta/model/src/hummock_pinned_snapshot.rs diff --git a/src/meta/model_v2/src/hummock_pinned_version.rs b/src/meta/model/src/hummock_pinned_version.rs similarity index 100% rename from src/meta/model_v2/src/hummock_pinned_version.rs rename to src/meta/model/src/hummock_pinned_version.rs diff --git a/src/meta/model_v2/src/hummock_sequence.rs b/src/meta/model/src/hummock_sequence.rs similarity index 100% rename from src/meta/model_v2/src/hummock_sequence.rs rename to src/meta/model/src/hummock_sequence.rs diff --git a/src/meta/model_v2/src/hummock_sstable_info.rs b/src/meta/model/src/hummock_sstable_info.rs similarity index 100% rename from src/meta/model_v2/src/hummock_sstable_info.rs rename to src/meta/model/src/hummock_sstable_info.rs diff --git a/src/meta/model_v2/src/hummock_time_travel_delta.rs b/src/meta/model/src/hummock_time_travel_delta.rs similarity index 100% rename from src/meta/model_v2/src/hummock_time_travel_delta.rs rename to src/meta/model/src/hummock_time_travel_delta.rs diff --git a/src/meta/model_v2/src/hummock_time_travel_version.rs b/src/meta/model/src/hummock_time_travel_version.rs similarity index 100% rename from src/meta/model_v2/src/hummock_time_travel_version.rs rename to src/meta/model/src/hummock_time_travel_version.rs diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model/src/hummock_version_delta.rs similarity index 100% rename from src/meta/model_v2/src/hummock_version_delta.rs rename to src/meta/model/src/hummock_version_delta.rs diff --git a/src/meta/model_v2/src/hummock_version_stats.rs b/src/meta/model/src/hummock_version_stats.rs similarity index 100% rename from src/meta/model_v2/src/hummock_version_stats.rs rename to src/meta/model/src/hummock_version_stats.rs diff --git a/src/meta/model_v2/src/index.rs b/src/meta/model/src/index.rs similarity index 100% rename from src/meta/model_v2/src/index.rs rename to src/meta/model/src/index.rs diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model/src/lib.rs similarity index 100% rename from src/meta/model_v2/src/lib.rs rename to src/meta/model/src/lib.rs diff --git a/src/meta/model_v2/src/object.rs b/src/meta/model/src/object.rs similarity index 100% rename from src/meta/model_v2/src/object.rs rename to src/meta/model/src/object.rs diff --git a/src/meta/model_v2/src/object_dependency.rs b/src/meta/model/src/object_dependency.rs similarity index 100% rename from src/meta/model_v2/src/object_dependency.rs rename to src/meta/model/src/object_dependency.rs diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model/src/prelude.rs similarity index 100% rename from src/meta/model_v2/src/prelude.rs rename to src/meta/model/src/prelude.rs diff --git a/src/meta/model_v2/src/schema.rs b/src/meta/model/src/schema.rs similarity index 100% rename from src/meta/model_v2/src/schema.rs rename to src/meta/model/src/schema.rs diff --git a/src/meta/model_v2/src/secret.rs b/src/meta/model/src/secret.rs similarity index 100% rename from src/meta/model_v2/src/secret.rs rename to src/meta/model/src/secret.rs diff --git a/src/meta/model_v2/src/serde_seaql_migration.rs b/src/meta/model/src/serde_seaql_migration.rs similarity index 100% rename from src/meta/model_v2/src/serde_seaql_migration.rs rename to src/meta/model/src/serde_seaql_migration.rs diff --git a/src/meta/model_v2/src/session_parameter.rs b/src/meta/model/src/session_parameter.rs similarity index 100% rename from src/meta/model_v2/src/session_parameter.rs rename to src/meta/model/src/session_parameter.rs diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model/src/sink.rs similarity index 100% rename from src/meta/model_v2/src/sink.rs rename to src/meta/model/src/sink.rs diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model/src/source.rs similarity index 100% rename from src/meta/model_v2/src/source.rs rename to src/meta/model/src/source.rs diff --git a/src/meta/model_v2/src/streaming_job.rs b/src/meta/model/src/streaming_job.rs similarity index 100% rename from src/meta/model_v2/src/streaming_job.rs rename to src/meta/model/src/streaming_job.rs diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model/src/subscription.rs similarity index 100% rename from src/meta/model_v2/src/subscription.rs rename to src/meta/model/src/subscription.rs diff --git a/src/meta/model_v2/src/system_parameter.rs b/src/meta/model/src/system_parameter.rs similarity index 100% rename from src/meta/model_v2/src/system_parameter.rs rename to src/meta/model/src/system_parameter.rs diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model/src/table.rs similarity index 100% rename from src/meta/model_v2/src/table.rs rename to src/meta/model/src/table.rs diff --git a/src/meta/model_v2/src/user.rs b/src/meta/model/src/user.rs similarity index 100% rename from src/meta/model_v2/src/user.rs rename to src/meta/model/src/user.rs diff --git a/src/meta/model_v2/src/user_privilege.rs b/src/meta/model/src/user_privilege.rs similarity index 100% rename from src/meta/model_v2/src/user_privilege.rs rename to src/meta/model/src/user_privilege.rs diff --git a/src/meta/model_v2/src/view.rs b/src/meta/model/src/view.rs similarity index 100% rename from src/meta/model_v2/src/view.rs rename to src/meta/model/src/view.rs diff --git a/src/meta/model_v2/src/worker.rs b/src/meta/model/src/worker.rs similarity index 100% rename from src/meta/model_v2/src/worker.rs rename to src/meta/model/src/worker.rs diff --git a/src/meta/model_v2/src/worker_property.rs b/src/meta/model/src/worker_property.rs similarity index 100% rename from src/meta/model_v2/src/worker_property.rs rename to src/meta/model/src/worker_property.rs diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 69986f8570234..53c3708da0e12 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -26,7 +26,7 @@ risingwave_common = { workspace = true } risingwave_connector = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_meta = { workspace = true } -risingwave_meta_model_v2 = { workspace = true } +risingwave_meta_model = { workspace = true } risingwave_pb = { workspace = true } sea-orm = { workspace = true } serde_json = "1" diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index c609d202b0d66..e913b91826b6f 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -24,7 +24,7 @@ use risingwave_connector::source::{ }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; use risingwave_meta::manager::MetadataManager; -use risingwave_meta_model_v2::ConnectionId; +use risingwave_meta_model::ConnectionId; use risingwave_pb::catalog::connection::Info::PrivateLinkService; use risingwave_pb::cloud_service::cloud_service_server::CloudService; use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType}; diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs index 20a1ed6d6ba53..83aae536e7e56 100644 --- a/src/meta/service/src/cluster_limit_service.rs +++ b/src/meta/service/src/cluster_limit_service.rs @@ -19,7 +19,7 @@ use risingwave_common::util::cluster_limit::{ }; use risingwave_meta::manager::{MetaSrvEnv, MetadataManager}; use risingwave_meta::MetaResult; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService; diff --git a/src/meta/service/src/cluster_service.rs b/src/meta/service/src/cluster_service.rs index 5d008af75a6df..e346ebd446ea9 100644 --- a/src/meta/service/src/cluster_service.rs +++ b/src/meta/service/src/cluster_service.rs @@ -14,7 +14,7 @@ use risingwave_meta::barrier::BarrierManagerRef; use risingwave_meta::manager::MetadataManager; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::HostAddress; use risingwave_pb::meta::cluster_service_server::ClusterService; diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index a40dd259b102f..889df5b33904c 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -16,7 +16,7 @@ use risingwave_common::catalog::TableId; use risingwave_meta::manager::MetadataManager; use risingwave_meta::model::TableParallelism; use risingwave_meta::stream::{RescheduleOptions, ScaleControllerRef, WorkerReschedule}; -use risingwave_meta_model_v2::FragmentId; +use risingwave_meta_model::FragmentId; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; use risingwave_pb::meta::{ diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 43c698b22e64e..4adde9de62a26 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -21,7 +21,7 @@ use risingwave_meta::manager::{LocalNotification, MetadataManager}; use risingwave_meta::model; use risingwave_meta::model::ActorId; use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig}; -use risingwave_meta_model_v2::{SourceId, StreamingParallelism}; +use risingwave_meta_model::{SourceId, StreamingParallelism}; use risingwave_pb::meta::cancel_creating_jobs_request::Jobs; use risingwave_pb::meta::list_actor_splits_response::FragmentType; use risingwave_pb::meta::list_table_fragments_response::{ diff --git a/src/meta/service/src/telemetry_service.rs b/src/meta/service/src/telemetry_service.rs index 32cd653e830af..be76bc05bcf04 100644 --- a/src/meta/service/src/telemetry_service.rs +++ b/src/meta/service/src/telemetry_service.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_meta::controller::SqlMetaStore; -use risingwave_meta_model_v2::prelude::Cluster; +use risingwave_meta_model::prelude::Cluster; use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService; use risingwave_pb::meta::{GetTelemetryInfoRequest, TelemetryInfoResponse}; use sea_orm::EntityTrait; diff --git a/src/meta/service/src/user_service.rs b/src/meta/service/src/user_service.rs index 8e9571aea62ea..319bbbebbd04a 100644 --- a/src/meta/service/src/user_service.rs +++ b/src/meta/service/src/user_service.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use risingwave_meta::manager::MetadataManager; -use risingwave_meta_model_v2::UserId; +use risingwave_meta_model::UserId; use risingwave_pb::user::grant_privilege::Object; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::user_service_server::UserService; diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 0cdf660544c89..414c97bab1fd7 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -31,7 +31,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use thiserror_ext::AsReport; use tokio::task::JoinHandle; -use crate::backup_restore::meta_snapshot_builder_v2; +use crate::backup_restore::meta_snapshot_builder; use crate::backup_restore::metrics::BackupManagerMetrics; use crate::hummock::sequence::next_meta_backup_id; use crate::hummock::{HummockManagerRef, HummockVersionSafePoint}; @@ -353,7 +353,7 @@ impl BackupWorker { }; let meta_store = backup_manager_clone.env.meta_store(); let mut snapshot_builder = - meta_snapshot_builder_v2::MetaSnapshotV2Builder::new(meta_store); + meta_snapshot_builder::MetaSnapshotV2Builder::new(meta_store); // Reuse job id as snapshot id. snapshot_builder .build(job_id, hummock_version_builder) diff --git a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs similarity index 95% rename from src/meta/src/backup_restore/meta_snapshot_builder_v2.rs rename to src/meta/src/backup_restore/meta_snapshot_builder.rs index 9e4ad6a0c05a1..ab29020f2de51 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -19,7 +19,7 @@ use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2}; use risingwave_backup::MetaSnapshotId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; -use risingwave_meta_model_v2 as model_v2; +use risingwave_meta_model as model; use risingwave_pb::hummock::PbHummockVersionDelta; use sea_orm::{DbErr, EntityTrait, QueryOrder, TransactionTrait}; @@ -83,8 +83,8 @@ impl MetaSnapshotV2Builder { ) .await .map_err(map_db_err)?; - let version_deltas = model_v2::prelude::HummockVersionDelta::find() - .order_by_asc(model_v2::hummock_version_delta::Column::Id) + let version_deltas = model::prelude::HummockVersionDelta::find() + .order_by_asc(model::hummock_version_delta::Column::Id) .all(&txn) .await .map_err(map_db_err)? diff --git a/src/meta/src/backup_restore/mod.rs b/src/meta/src/backup_restore/mod.rs index dad089dbc8551..c493d98d06fdf 100644 --- a/src/meta/src/backup_restore/mod.rs +++ b/src/meta/src/backup_restore/mod.rs @@ -15,7 +15,7 @@ mod backup_manager; pub use backup_manager::*; mod error; -mod meta_snapshot_builder_v2; +mod meta_snapshot_builder; mod metrics; mod restore; mod restore_impl; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6190350627c3c..a5b7efe63098d 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -22,7 +22,7 @@ use risingwave_common::hash::ActorMapping; use risingwave_common::types::Timestamptz; use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; -use risingwave_meta_model_v2::{ObjectId, WorkerId}; +use risingwave_meta_model::{ObjectId, WorkerId}; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::common::PbWorkerNode; use risingwave_pb::meta::table_fragments::PbActorStatus; @@ -1116,7 +1116,7 @@ impl CommandContext { // Apply the split changes in source manager. self.barrier_manager_context .source_manager - .drop_source_fragments(std::slice::from_ref(old_table_fragments)) + .drop_source_fragments_vec(std::slice::from_ref(old_table_fragments)) .await; let source_fragments = new_table_fragments.stream_source_fragments(); // XXX: is it possible to have backfill fragments here? diff --git a/src/meta/src/barrier/creating_job/barrier_control.rs b/src/meta/src/barrier/creating_job/barrier_control.rs index 90ac3119f4c37..83a1dd9cfb5e7 100644 --- a/src/meta/src/barrier/creating_job/barrier_control.rs +++ b/src/meta/src/barrier/creating_job/barrier_control.rs @@ -21,7 +21,7 @@ use std::time::Instant; use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge}; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::debug; diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 0598cd319c590..f3ad5a44aa929 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index 093747249f1df..f599990eff999 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use risingwave_common::hash::ActorId; use risingwave_common::util::epoch::Epoch; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::StreamActor; diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 2fdfd40b091e6..b02240f402ba0 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::WorkerNode; use tracing::warn; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index d349e7bbfe0a4..efee9b715c316 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -36,7 +36,7 @@ use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo}; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::{PausedReason, PbRecoveryStatus}; diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 782b82739bbf9..f10c0e29b60f7 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -18,7 +18,7 @@ use std::mem::take; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; -use risingwave_meta_model_v2::ObjectId; +use risingwave_meta_model::ObjectId; use risingwave_pb::catalog::CreateType; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index a4bcbbe8b4a5c..6af1fe305dec3 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -21,7 +21,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::WorkerSlotId; use risingwave_common::util::epoch::Epoch; -use risingwave_meta_model_v2::{StreamingParallelism, WorkerId}; +use risingwave_meta_model::{StreamingParallelism, WorkerId}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{PausedReason, Recovery}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 320e09dc5eb1e..6e608489177be 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -26,7 +26,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::tracing::TracingContext; use risingwave_hummock_sdk::HummockVersionId; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo}; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 049d8806dba70..ec761a39e37d9 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -25,10 +25,10 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::cdc::build_cdc_table_id; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; -use risingwave_meta_model_v2::object::ObjectType; -use risingwave_meta_model_v2::prelude::*; -use risingwave_meta_model_v2::table::TableType; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::object::ObjectType; +use risingwave_meta_model::prelude::*; +use risingwave_meta_model::table::TableType; +use risingwave_meta_model::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 52de9c812c32d..8d58ba8fd6c1b 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -26,9 +26,9 @@ use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::RW_VERSION; use risingwave_license::LicenseManager; -use risingwave_meta_model_v2::prelude::{Worker, WorkerProperty}; -use risingwave_meta_model_v2::worker::{WorkerStatus, WorkerType}; -use risingwave_meta_model_v2::{worker, worker_property, TransactionId, WorkerId}; +use risingwave_meta_model::prelude::{Worker, WorkerProperty}; +use risingwave_meta_model::worker::{WorkerStatus, WorkerType}; +use risingwave_meta_model::{worker, worker_property, TransactionId, WorkerId}; use risingwave_pb::common::worker_node::{PbProperty, PbResource, PbState}; use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 693cf67597cd5..a79b890cade20 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -22,15 +22,15 @@ use risingwave_common::bail; use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::util::worker_util::WorkerNodeId; -use risingwave_meta_model_migration::{Alias, SelectStatement}; -use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::fragment::DistributionType; -use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::actor::ActorStatus; +use risingwave_meta_model::fragment::DistributionType; +use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; +use risingwave_meta_model::{ actor, actor_dispatcher, fragment, sink, streaming_job, ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; +use risingwave_meta_model_migration::{Alias, SelectStatement}; use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, @@ -1500,9 +1500,9 @@ mod tests { use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; - use risingwave_meta_model_v2::actor::ActorStatus; - use risingwave_meta_model_v2::fragment::DistributionType; - use risingwave_meta_model_v2::{ + use risingwave_meta_model::actor::ActorStatus; + use risingwave_meta_model::fragment::DistributionType; + use risingwave_meta_model::{ actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap, }; diff --git a/src/meta/src/controller/id.rs b/src/meta/src/controller/id.rs index 34cc51b326087..5318512afe6c2 100644 --- a/src/meta/src/controller/id.rs +++ b/src/meta/src/controller/id.rs @@ -15,8 +15,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use risingwave_meta_model_v2::prelude::{Actor, Fragment}; -use risingwave_meta_model_v2::{actor, fragment}; +use risingwave_meta_model::prelude::{Actor, Fragment}; +use risingwave_meta_model::{actor, fragment}; use sea_orm::sea_query::{Expr, Func}; use sea_orm::{DatabaseConnection, EntityTrait, QuerySelect}; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 43cb514df6aa9..c7cf45daad9e7 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; use anyhow::anyhow; use risingwave_common::hash::VnodeCount; use risingwave_common::util::epoch::Epoch; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::{ connection, database, function, index, object, schema, secret, sink, source, subscription, table, view, }; diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs index 4c7fcf482cbda..65dd58ff1d34e 100644 --- a/src/meta/src/controller/scale.rs +++ b/src/meta/src/controller/scale.rs @@ -19,18 +19,18 @@ use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash; use risingwave_connector::source::{SplitImpl, SplitMetaData}; +use risingwave_meta_model::actor::ActorStatus; +use risingwave_meta_model::actor_dispatcher::DispatcherType; +use risingwave_meta_model::fragment::DistributionType; +use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob}; +use risingwave_meta_model::{ + actor, actor_dispatcher, fragment, streaming_job, ActorId, ActorMapping, ActorUpstreamActors, + ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, +}; use risingwave_meta_model_migration::{ Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement, UnionType, WithClause, WithQuery, }; -use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; -use risingwave_meta_model_v2::fragment::DistributionType; -use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob}; -use risingwave_meta_model_v2::{ - actor, actor_dispatcher, fragment, streaming_job, ActorId, ActorMapping, ActorUpstreamActors, - ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, -}; use sea_orm::{ ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult, JoinType, QueryFilter, QuerySelect, RelationTrait, Statement, TransactionTrait, diff --git a/src/meta/src/controller/session_params.rs b/src/meta/src/controller/session_params.rs index 81ad467d573ef..ad3b93f12bd05 100644 --- a/src/meta/src/controller/session_params.rs +++ b/src/meta/src/controller/session_params.rs @@ -16,8 +16,8 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::session_config::{SessionConfig, SessionConfigError}; -use risingwave_meta_model_v2::prelude::SessionParameter; -use risingwave_meta_model_v2::session_parameter; +use risingwave_meta_model::prelude::SessionParameter; +use risingwave_meta_model::session_parameter; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SetSessionParamRequest; use sea_orm::ActiveValue::Set; diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 7d742f5aa64fc..e8adc309d10e2 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -19,15 +19,15 @@ use itertools::Itertools; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::{bail, current_cluster_version}; -use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; -use risingwave_meta_model_v2::object::ObjectType; -use risingwave_meta_model_v2::prelude::{ +use risingwave_meta_model::actor::ActorStatus; +use risingwave_meta_model::actor_dispatcher::DispatcherType; +use risingwave_meta_model::object::ObjectType; +use risingwave_meta_model::prelude::{ Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source, StreamingJob as StreamingJobModel, Table, }; -use risingwave_meta_model_v2::table::TableType; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::table::TableType; +use risingwave_meta_model::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId, ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SinkId, SourceId, @@ -1648,10 +1648,9 @@ impl CatalogController { // Only hash dispatcher needs mapping if dispatcher.dispatcher_type.as_ref() == &DispatcherType::Hash { - dispatcher.hash_mapping = - Set(upstream_dispatcher_mapping.as_ref().map(|m| { - risingwave_meta_model_v2::ActorMapping::from(&m.to_protobuf()) - })); + dispatcher.hash_mapping = Set(upstream_dispatcher_mapping + .as_ref() + .map(|m| risingwave_meta_model::ActorMapping::from(&m.to_protobuf()))); } let mut new_downstream_actor_ids = diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index fdeee871a898b..4bb36c8e1962c 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -22,8 +22,8 @@ use risingwave_common::system_param::{ check_missing_params, default, derive_missing_fields, set_system_param, }; use risingwave_common::{for_all_params, key_of}; -use risingwave_meta_model_v2::prelude::SystemParameter; -use risingwave_meta_model_v2::system_parameter; +use risingwave_meta_model::prelude::SystemParameter; +use risingwave_meta_model::system_parameter; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PbSystemParams; use sea_orm::ActiveValue::Set; diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index 744f3cf112f6e..3a8d728c84fd9 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -16,9 +16,9 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG}; -use risingwave_meta_model_v2::prelude::{Object, User, UserPrivilege}; -use risingwave_meta_model_v2::user_privilege::Action; -use risingwave_meta_model_v2::{object, user, user_privilege, AuthInfo, PrivilegeId, UserId}; +use risingwave_meta_model::prelude::{Object, User, UserPrivilege}; +use risingwave_meta_model::user_privilege::Action; +use risingwave_meta_model::{object, user, user_privilege, AuthInfo, PrivilegeId, UserId}; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; @@ -469,7 +469,7 @@ impl CatalogController { #[cfg(test)] mod tests { - use risingwave_meta_model_v2::DatabaseId; + use risingwave_meta_model::DatabaseId; use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; use super::*; diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 7a32dbd927db7..aaa71b1f21cb3 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -19,17 +19,17 @@ use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash; use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; -use risingwave_meta_model_migration::WithQuery; -use risingwave_meta_model_v2::actor::ActorStatus; -use risingwave_meta_model_v2::fragment::DistributionType; -use risingwave_meta_model_v2::object::ObjectType; -use risingwave_meta_model_v2::prelude::*; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::actor::ActorStatus; +use risingwave_meta_model::fragment::DistributionType; +use risingwave_meta_model::object::ObjectType; +use risingwave_meta_model::prelude::*; +use risingwave_meta_model::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege, view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId, SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId, }; +use risingwave_meta_model_migration::WithQuery; use risingwave_pb::catalog::{ PbConnection, PbFunction, PbIndex, PbSecret, PbSink, PbSource, PbSubscription, PbTable, PbView, }; diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index f863cf494e098..2bef0505ce443 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -57,7 +57,7 @@ pub(super) mod handlers { use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common_heap_profiling::COLLAPSED_SUFFIX; - use risingwave_meta_model_v2::WorkerId; + use risingwave_meta_model::WorkerId; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::{PbDatabase, PbSchema, Sink, Source, Subscription, Table, View}; use risingwave_pb::common::{WorkerNode, WorkerType}; diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 78051e28e7cbd..8bfe188d4a3fa 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -16,7 +16,7 @@ use risingwave_common::error::BoxedError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::{RpcError, ToTonicStatus}; diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index f71f6da0345ae..c2b5f54c1b512 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -135,7 +135,7 @@ pub struct CompactorManagerInner { impl CompactorManagerInner { pub async fn with_meta(env: MetaSrvEnv) -> MetaResult { - use risingwave_meta_model_v2::compaction_task; + use risingwave_meta_model::compaction_task; use sea_orm::EntityTrait; // Retrieve the existing task assignments from metastore. let task_assignment: Vec = compaction_task::Entity::find() diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index a1a85f57a466c..77822a0d6f3cc 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::version::GroupDelta; use risingwave_hummock_sdk::CompactionGroupId; -use risingwave_meta_model_v2::compaction_config; +use risingwave_meta_model::compaction_config; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 0a12f57a17d8b..6256c09ce75a7 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -23,7 +23,7 @@ use risingwave_hummock_sdk::{ HummockContextId, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, INVALID_VERSION_ID, }; -use risingwave_meta_model_v2::hummock_gc_history; +use risingwave_meta_model::hummock_gc_history; use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask}; use sea_orm::{DatabaseConnection, EntityTrait}; @@ -328,7 +328,7 @@ async fn check_gc_history( object_ids: impl IntoIterator, ) -> Result<()> { let futures = object_ids.into_iter().map(|id| async move { - let id: risingwave_meta_model_v2::HummockSstableObjectId = id.try_into().unwrap(); + let id: risingwave_meta_model::HummockSstableObjectId = id.try_into().unwrap(); hummock_gc_history::Entity::find_by_id(id) .one(db) .await diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index e57e9479c9487..6c8eb32e6a6c8 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -23,9 +23,9 @@ use futures::future::try_join_all; use itertools::Itertools; use parking_lot::Mutex; use risingwave_hummock_sdk::HummockSstableObjectId; +use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW; +use risingwave_meta_model::{hummock_gc_history, hummock_sequence}; use risingwave_meta_model_migration::OnConflict; -use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW; -use risingwave_meta_model_v2::{hummock_gc_history, hummock_sequence}; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::FullScanTask; use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 9db4c20383fb7..68479d64727a2 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -26,7 +26,7 @@ use risingwave_hummock_sdk::{ version_archive_dir, version_checkpoint_path, CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId, }; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::{ compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta, hummock_version_stats, }; diff --git a/src/meta/src/hummock/manager/sequence.rs b/src/meta/src/hummock/manager/sequence.rs index cbd6bbf362b0f..5e852ae155e66 100644 --- a/src/meta/src/hummock/manager/sequence.rs +++ b/src/meta/src/hummock/manager/sequence.rs @@ -17,11 +17,11 @@ use std::fmt::Display; use std::sync::LazyLock; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; -use risingwave_meta_model_v2::hummock_sequence; -use risingwave_meta_model_v2::hummock_sequence::{ +use risingwave_meta_model::hummock_sequence; +use risingwave_meta_model::hummock_sequence::{ COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID, }; -use risingwave_meta_model_v2::prelude::HummockSequence; +use risingwave_meta_model::prelude::HummockSequence; use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait}; use tokio::sync::Mutex; diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 2743824df98bc..987919d13fa42 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -121,7 +121,7 @@ fn get_compaction_group_object_ids( } async fn list_pinned_version_from_meta_store(env: &MetaSrvEnv) -> Vec { - use risingwave_meta_model_v2::hummock_pinned_version; + use risingwave_meta_model::hummock_pinned_version; use sea_orm::EntityTrait; hummock_pinned_version::Entity::find() .all(&env.meta_store_ref().conn) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 8e564ad231bb7..f9d2534fb8c6b 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -27,8 +27,8 @@ use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockV use risingwave_hummock_sdk::{ CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, }; -use risingwave_meta_model_v2::hummock_sstable_info::SstableInfoV2Backend; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend; +use risingwave_meta_model::{ hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta, hummock_time_travel_version, }; @@ -73,7 +73,7 @@ impl HummockManager { let version_watermark = hummock_epoch_to_version::Entity::find() .filter( hummock_epoch_to_version::Column::Epoch - .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), + .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()), ) .order_by_desc(hummock_epoch_to_version::Column::Epoch) .order_by_asc(hummock_epoch_to_version::Column::VersionId) @@ -86,7 +86,7 @@ impl HummockManager { let res = hummock_epoch_to_version::Entity::delete_many() .filter( hummock_epoch_to_version::Column::Epoch - .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), + .lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()), ) .exec(&txn) .await?; @@ -113,7 +113,7 @@ impl HummockManager { earliest_valid_version.get_sst_ids(), ) }; - let version_ids_to_delete: Vec = + let version_ids_to_delete: Vec = hummock_time_travel_version::Entity::find() .select_only() .column(hummock_time_travel_version::Column::VersionId) @@ -125,7 +125,7 @@ impl HummockManager { .into_tuple() .all(&txn) .await?; - let delta_ids_to_delete: Vec = + let delta_ids_to_delete: Vec = hummock_time_travel_delta::Entity::find() .select_only() .column(hummock_time_travel_delta::Column::VersionId) @@ -225,7 +225,7 @@ impl HummockManager { pub(crate) async fn all_object_ids_in_time_travel( &self, ) -> Result> { - let object_ids: Vec = + let object_ids: Vec = hummock_sstable_info::Entity::find() .select_only() .column(hummock_sstable_info::Column::ObjectId) @@ -265,7 +265,7 @@ impl HummockManager { ) .filter( hummock_epoch_to_version::Column::Epoch - .lte(risingwave_meta_model_v2::Epoch::try_from(query_epoch).unwrap()), + .lte(risingwave_meta_model::Epoch::try_from(query_epoch).unwrap()), ) .order_by_desc(hummock_epoch_to_version::Column::Epoch) .one(&sql_store.conn) @@ -419,7 +419,7 @@ impl HummockManager { ) .await?; let m = hummock_time_travel_version::ActiveModel { - version_id: Set(risingwave_meta_model_v2::HummockVersionId::try_from( + version_id: Set(risingwave_meta_model::HummockVersionId::try_from( version.id.to_u64(), ) .unwrap()), @@ -447,7 +447,7 @@ impl HummockManager { // Ignore delta which adds no data. if written > 0 { let m = hummock_time_travel_delta::ActiveModel { - version_id: Set(risingwave_meta_model_v2::HummockVersionId::try_from( + version_id: Set(risingwave_meta_model::HummockVersionId::try_from( delta.id.to_u64(), ) .unwrap()), diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index b66672a2057cb..9a625670efbc8 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -14,12 +14,12 @@ use itertools::Itertools; use risingwave_hummock_sdk::version::HummockVersionDelta; -use risingwave_meta_model_v2::compaction_config::CompactionConfig; -use risingwave_meta_model_v2::compaction_status::LevelHandlers; -use risingwave_meta_model_v2::compaction_task::CompactionTask; -use risingwave_meta_model_v2::hummock_version_delta::FullVersionDelta; -use risingwave_meta_model_v2::hummock_version_stats::TableStats; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::compaction_config::CompactionConfig; +use risingwave_meta_model::compaction_status::LevelHandlers; +use risingwave_meta_model::compaction_task::CompactionTask; +use risingwave_meta_model::hummock_version_delta::FullVersionDelta; +use risingwave_meta_model::hummock_version_stats::TableStats; +use risingwave_meta_model::{ compaction_config, compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version, hummock_version_delta, hummock_version_stats, CompactionGroupId, CompactionTaskId, HummockVersionId, WorkerId, diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 2fc54c06e3cb8..006b31475461d 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableIn use risingwave_hummock_sdk::{ CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, }; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::CompactionConfig; diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index 2022c6ae9764d..de8f983056a6a 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -22,7 +22,7 @@ use prometheus_http_query::response::Data::Vector; use risingwave_common::types::Timestamptz; use risingwave_common::util::StackTraceResponseExt; use risingwave_hummock_sdk::level::Level; -use risingwave_meta_model_v2::table::TableType; +use risingwave_meta_model::table::TableType; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::event_log::Event; use risingwave_pb::meta::EventLog; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 9e9bbcf41da4d..02a8753cb688a 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -20,8 +20,8 @@ use risingwave_common::config::{CompactionConfig, DefaultParallelism, ObjectStor use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::{bail, system_param}; +use risingwave_meta_model::prelude::Cluster; use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait}; -use risingwave_meta_model_v2::prelude::Cluster; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::{ FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef, diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index cf506e2da1edc..48dfbfaf55c0a 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -19,7 +19,7 @@ use std::time::Duration; use anyhow::anyhow; use futures::future::{select, Either}; use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_meta_model_v2::{ObjectId, SourceId, WorkerId}; +use risingwave_meta_model::{ObjectId, SourceId, WorkerId}; use risingwave_pb::catalog::{PbSink, PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index 399ee27a198b3..b49ce350c5501 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -31,7 +31,7 @@ pub use event_log::EventLogManagerRef; pub use idle::*; pub use metadata::*; pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *}; -pub use risingwave_meta_model_v2::prelude; +pub use risingwave_meta_model::prelude; use risingwave_pb::catalog::{PbSink, PbSource}; use risingwave_pb::common::PbHostAddress; pub use streaming_job::*; diff --git a/src/meta/src/manager/notification_version.rs b/src/meta/src/manager/notification_version.rs index 42738f207406a..ee4771d792416 100644 --- a/src/meta/src/manager/notification_version.rs +++ b/src/meta/src/manager/notification_version.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_meta_model_v2::catalog_version; -use risingwave_meta_model_v2::catalog_version::VersionCategory; -use risingwave_meta_model_v2::prelude::CatalogVersion; +use risingwave_meta_model::catalog_version; +use risingwave_meta_model::catalog_version::VersionCategory; +use risingwave_meta_model::prelude::CatalogVersion; use sea_orm::ActiveValue::Set; use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait}; diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 848a46476d928..5137c74b587cd 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::{VirtualNode, WorkerSlotId}; use risingwave_connector::source::SplitImpl; -use risingwave_meta_model_v2::{SourceId, WorkerId}; +use risingwave_meta_model::{SourceId, WorkerId}; use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 0cc4e82969c22..995643215d317 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -38,8 +38,8 @@ use risingwave_connector::source::{ UPSTREAM_SOURCE_KEY, }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; -use risingwave_meta_model_v2::object::ObjectType; -use risingwave_meta_model_v2::{ +use risingwave_meta_model::object::ObjectType; +use risingwave_meta_model::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, }; @@ -48,8 +48,8 @@ use risingwave_pb::catalog::connection::PrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret, - Sink, Source, Subscription, Table, View, + connection, Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, + Schema, Secret, Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -59,6 +59,7 @@ use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph, StreamFragmentGraph as StreamFragmentGraphProto, @@ -70,15 +71,17 @@ use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; +use crate::controller::catalog::ReleaseContext; use crate::controller::cluster::StreamingClusterInfo; use crate::error::{bail_invalid_parameter, bail_unavailable}; use crate::manager::{ DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, + IGNORED_NOTIFICATION_VERSION, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ - create_source_worker_handle, ActorGraphBuildResult, ActorGraphBuilder, + create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, StreamFragmentGraph, }; @@ -304,7 +307,7 @@ impl DdlController { _create_type, affected_table_replace_info, ) => { - ctrl.create_streaming_job_v2( + ctrl.create_streaming_job( stream_job, fragment_graph, affected_table_replace_info, @@ -320,7 +323,7 @@ impl DdlController { fragment_graph, col_index_mapping, }) => { - ctrl.replace_table_v2(streaming_job, fragment_graph, col_index_mapping) + ctrl.replace_table(streaming_job, fragment_graph, col_index_mapping) .await } DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await, @@ -424,7 +427,7 @@ impl DdlController { .await } - /// Shared source is handled in [`Self::create_streaming_job_v2`] + /// Shared source is handled in [`Self::create_streaming_job`] async fn create_source_without_streaming_job( &self, source: Source, @@ -924,6 +927,550 @@ impl DdlController { .push(upstream_fragment_id); } + /// For [`CreateType::Foreground`], the function will only return after backfilling finishes + /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]). + pub async fn create_streaming_job( + &self, + mut streaming_job: StreamingJob, + mut fragment_graph: StreamFragmentGraphProto, + affected_table_replace_info: Option, + ) -> MetaResult { + let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); + self.metadata_manager + .catalog_controller + .create_job_catalog( + &mut streaming_job, + &ctx, + &fragment_graph.parallelism, + fragment_graph.max_parallelism as _, + ) + .await?; + let job_id = streaming_job.id(); + + match &mut streaming_job { + StreamingJob::Table(src, table, job_type) => { + // If we're creating a table with connector, we should additionally fill its ID first. + fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph); + } + StreamingJob::Source(src) => { + // set the inner source id of source node. + for fragment in fragment_graph.fragments.values_mut() { + visit_fragment(fragment, |node_body| { + if let NodeBody::Source(source_node) = node_body { + source_node.source_inner.as_mut().unwrap().source_id = src.id; + } + }); + } + } + _ => {} + } + + tracing::debug!( + id = job_id, + definition = streaming_job.definition(), + create_type = streaming_job.create_type().as_str_name(), + "starting streaming job", + ); + let _permit = self + .creating_streaming_job_permits + .semaphore + .acquire() + .await + .unwrap(); + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; + + let id = streaming_job.id(); + let name = streaming_job.name(); + let definition = streaming_job.definition(); + let source_id = match &streaming_job { + StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id), + _ => None, + }; + + // create streaming job. + match self + .create_streaming_job_inner( + ctx, + streaming_job, + fragment_graph, + affected_table_replace_info, + ) + .await + { + Ok(version) => Ok(version), + Err(err) => { + tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job"); + let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail { + id, + name, + definition, + error: err.as_report().to_string(), + }; + self.env.event_log_manager_ref().add_event_logs(vec![ + risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event), + ]); + let aborted = self + .metadata_manager + .catalog_controller + .try_abort_creating_streaming_job(job_id as _, false) + .await?; + if aborted { + tracing::warn!(id = job_id, "aborted streaming job"); + if let Some(source_id) = source_id { + self.source_manager + .unregister_sources(vec![source_id as SourceId]) + .await; + } + } + Err(err) + } + } + } + + async fn create_streaming_job_inner( + &self, + ctx: StreamContext, + mut streaming_job: StreamingJob, + fragment_graph: StreamFragmentGraphProto, + affected_table_replace_info: Option, + ) -> MetaResult { + let mut fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + + // create internal table catalogs and refill table id. + let internal_tables = fragment_graph.internal_tables().into_values().collect_vec(); + let table_id_map = self + .metadata_manager + .catalog_controller + .create_internal_table_catalog(&streaming_job, internal_tables) + .await?; + fragment_graph.refill_internal_table_ids(table_id_map); + + let affected_table_replace_info = match affected_table_replace_info { + Some(replace_table_info) => { + let ReplaceTableInfo { + mut streaming_job, + fragment_graph, + .. + } = replace_table_info; + + let fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + Some((streaming_job, fragment_graph)) + } + None => None, + }; + + // create fragment and actor catalogs. + tracing::debug!(id = streaming_job.id(), "building streaming job"); + let (ctx, table_fragments) = self + .build_stream_job( + ctx, + streaming_job, + fragment_graph, + affected_table_replace_info, + ) + .await?; + + let streaming_job = &ctx.streaming_job; + + match streaming_job { + StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => { + Self::validate_cdc_table(table, &table_fragments).await?; + } + StreamingJob::Table(Some(source), ..) => { + // Register the source on the connector node. + self.source_manager.register_source(source).await?; + } + StreamingJob::Sink(sink, _) => { + // Validate the sink on the connector node. + validate_sink(sink).await?; + } + StreamingJob::Source(source) => { + // Register the source on the connector node. + self.source_manager.register_source(source).await?; + } + _ => {} + } + + self.metadata_manager + .catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false) + .await?; + + // create streaming jobs. + let stream_job_id = streaming_job.id(); + match (streaming_job.create_type(), &streaming_job) { + (CreateType::Unspecified, _) + | (CreateType::Foreground, _) + // FIXME(kwannoel): Unify background stream's creation path with MV below. + | (CreateType::Background, StreamingJob::Sink(_, _)) => { + let version = self.stream_manager + .create_streaming_job(table_fragments, ctx) + .await?; + Ok(version) + } + (CreateType::Background, _) => { + let ctrl = self.clone(); + let fut = async move { + let _ = ctrl + .stream_manager + .create_streaming_job(table_fragments, ctx) + .await.inspect_err(|err| { + tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job"); + }); + }; + tokio::spawn(fut); + Ok(IGNORED_NOTIFICATION_VERSION) + } + } + } + + pub async fn drop_object( + &self, + object_type: ObjectType, + object_id: ObjectId, + drop_mode: DropMode, + target_replace_info: Option, + ) -> MetaResult { + let (release_ctx, mut version) = match object_type { + ObjectType::Database => { + self.metadata_manager + .catalog_controller + .drop_database(object_id) + .await? + } + ObjectType::Schema => { + return self + .metadata_manager + .catalog_controller + .drop_schema(object_id, drop_mode) + .await; + } + ObjectType::Function => { + return self + .metadata_manager + .catalog_controller + .drop_function(object_id) + .await; + } + ObjectType::Connection => { + let (version, conn) = self + .metadata_manager + .catalog_controller + .drop_connection(object_id) + .await?; + if let Some(connection::Info::PrivateLinkService(svc)) = &conn.info { + self.delete_vpc_endpoint(svc).await?; + } + return Ok(version); + } + _ => { + self.metadata_manager + .catalog_controller + .drop_relation(object_type, object_id, drop_mode) + .await? + } + }; + + if let Some(replace_table_info) = target_replace_info { + let stream_ctx = + StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap()); + + let ReplaceTableInfo { + mut streaming_job, + fragment_graph, + .. + } = replace_table_info; + + let sink_id = if let ObjectType::Sink = object_type { + object_id as _ + } else { + panic!("additional replace table event only occurs when dropping sink into table") + }; + + let fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + let table = streaming_job.table().unwrap(); + + tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink"); + let dummy_id = self + .metadata_manager + .catalog_controller + .create_job_catalog_for_replace( + &streaming_job, + &stream_ctx, + table.get_version()?, + &fragment_graph.specified_parallelism(), + fragment_graph.max_parallelism(), + ) + .await? as u32; + + let (ctx, table_fragments) = self + .inject_replace_table_job_for_table_sink( + dummy_id, + &self.metadata_manager, + stream_ctx, + None, + None, + Some(sink_id), + &streaming_job, + fragment_graph, + ) + .await?; + + let result: MetaResult> = try { + let merge_updates = ctx.merge_updates.clone(); + + self.metadata_manager + .catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .await?; + + self.stream_manager + .replace_table(table_fragments, ctx) + .await?; + + merge_updates + }; + + version = match result { + Ok(merge_updates) => { + let version = self + .metadata_manager + .catalog_controller + .finish_replace_streaming_job( + dummy_id as _, + streaming_job, + merge_updates, + None, + None, + Some(sink_id), + vec![], + ) + .await?; + Ok(version) + } + Err(err) => { + tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table"); + let _ = self.metadata_manager + .catalog_controller + .try_abort_replacing_streaming_job(dummy_id as _) + .await + .inspect_err(|err| { + tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table"); + }); + Err(err) + } + }?; + } + + let ReleaseContext { + streaming_job_ids, + state_table_ids, + source_ids, + connections, + source_fragments, + removed_actors, + removed_fragments, + } = release_ctx; + + // delete vpc endpoints. + for conn in connections { + let _ = self + .delete_vpc_endpoint(&conn.to_protobuf()) + .await + .inspect_err(|err| { + tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); + }); + } + + // unregister sources. + self.source_manager + .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) + .await; + + // unregister fragments and actors from source manager. + self.source_manager + .drop_source_fragments( + source_fragments + .into_iter() + .map(|(source_id, fragments)| { + ( + source_id, + fragments.into_iter().map(|id| id as u32).collect(), + ) + }) + .collect(), + removed_actors.iter().map(|id| *id as _).collect(), + ) + .await; + + // drop streaming jobs. + self.stream_manager + .drop_streaming_jobs( + removed_actors.into_iter().map(|id| id as _).collect(), + streaming_job_ids, + state_table_ids, + removed_fragments.iter().map(|id| *id as _).collect(), + ) + .await; + + Ok(version) + } + + /// This is used for `ALTER TABLE ADD/DROP COLUMN`. + pub async fn replace_table( + &self, + mut streaming_job: StreamingJob, + fragment_graph: StreamFragmentGraphProto, + table_col_index_mapping: Option, + ) -> MetaResult { + let job_id = streaming_job.id(); + + let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; + let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); + + // 1. build fragment graph. + let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + let StreamingJob::Table(_, table, ..) = &streaming_job else { + unreachable!("unexpected job: {streaming_job:?}") + }; + let dummy_id = self + .metadata_manager + .catalog_controller + .create_job_catalog_for_replace( + &streaming_job, + &ctx, + table.get_version()?, + &fragment_graph.specified_parallelism(), + fragment_graph.max_parallelism(), + ) + .await?; + + tracing::debug!(id = streaming_job.id(), "building replace streaming job"); + let mut updated_sink_catalogs = vec![]; + + let result: MetaResult> = try { + let (mut ctx, mut table_fragments) = self + .build_replace_table( + ctx, + &streaming_job, + fragment_graph, + table_col_index_mapping.clone(), + dummy_id as _, + ) + .await?; + + let mut union_fragment_id = None; + + for (fragment_id, fragment) in &mut table_fragments.fragments { + for actor in &mut fragment.actors { + if let Some(node) = &mut actor.nodes { + visit_stream_node(node, |body| { + if let NodeBody::Union(_) = body { + if let Some(union_fragment_id) = union_fragment_id.as_mut() { + // The union fragment should be unique. + assert_eq!(*union_fragment_id, *fragment_id); + } else { + union_fragment_id = Some(*fragment_id); + } + } + }) + }; + } + } + + let target_fragment_id = + union_fragment_id.expect("fragment of placeholder merger not found"); + + let catalogs = self + .metadata_manager + .get_sink_catalog_by_ids(&table.incoming_sinks) + .await?; + + for sink in catalogs { + let sink_id = &sink.id; + + let sink_table_fragments = self + .metadata_manager + .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) + .await?; + + let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); + + Self::inject_replace_table_plan_for_sink( + Some(*sink_id), + &sink_fragment, + table, + &mut ctx, + &mut table_fragments, + target_fragment_id, + Some(&sink.unique_identity()), + ); + + if sink.original_target_columns.is_empty() { + updated_sink_catalogs.push(sink.id as _); + } + } + + let merge_updates = ctx.merge_updates.clone(); + + self.metadata_manager + .catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .await?; + + self.stream_manager + .replace_table(table_fragments, ctx) + .await?; + merge_updates + }; + + match result { + Ok(merge_updates) => { + let version = self + .metadata_manager + .catalog_controller + .finish_replace_streaming_job( + dummy_id, + streaming_job, + merge_updates, + table_col_index_mapping, + None, + None, + updated_sink_catalogs, + ) + .await?; + Ok(version) + } + Err(err) => { + tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table"); + let _ = self.metadata_manager + .catalog_controller + .try_abort_replacing_streaming_job(dummy_id) + .await.inspect_err(|err| { + tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table"); + }); + Err(err) + } + } + } + async fn drop_streaming_job( &self, job_id: StreamingJobId, diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs deleted file mode 100644 index 5ab5c0d3ad00a..0000000000000 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ /dev/null @@ -1,580 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use itertools::Itertools; -use risingwave_common::util::column_index_mapping::ColIndexMapping; -use risingwave_common::util::stream_graph_visitor::{visit_fragment, visit_stream_node}; -use risingwave_meta_model_v2::object::ObjectType; -use risingwave_meta_model_v2::{ObjectId, SourceId}; -use risingwave_pb::catalog::{connection, CreateType}; -use risingwave_pb::ddl_service::TableJobType; -use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; -use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; -use thiserror_ext::AsReport; - -use crate::controller::catalog::ReleaseContext; -use crate::manager::{NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION}; -use crate::model::StreamContext; -use crate::rpc::ddl_controller::{ - fill_table_stream_graph_info, DdlController, DropMode, ReplaceTableInfo, -}; -use crate::stream::{validate_sink, StreamFragmentGraph}; -use crate::MetaResult; - -impl DdlController { - /// For [`CreateType::Foreground`], the function will only return after backfilling finishes - /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]). - pub async fn create_streaming_job_v2( - &self, - mut streaming_job: StreamingJob, - mut fragment_graph: StreamFragmentGraphProto, - affected_table_replace_info: Option, - ) -> MetaResult { - let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); - self.metadata_manager - .catalog_controller - .create_job_catalog( - &mut streaming_job, - &ctx, - &fragment_graph.parallelism, - fragment_graph.max_parallelism as _, - ) - .await?; - let job_id = streaming_job.id(); - - match &mut streaming_job { - StreamingJob::Table(src, table, job_type) => { - // If we're creating a table with connector, we should additionally fill its ID first. - fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph); - } - StreamingJob::Source(src) => { - // set the inner source id of source node. - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - source_node.source_inner.as_mut().unwrap().source_id = src.id; - } - }); - } - } - _ => {} - } - - tracing::debug!( - id = job_id, - definition = streaming_job.definition(), - create_type = streaming_job.create_type().as_str_name(), - "starting streaming job", - ); - let _permit = self - .creating_streaming_job_permits - .semaphore - .acquire() - .await - .unwrap(); - let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; - - let id = streaming_job.id(); - let name = streaming_job.name(); - let definition = streaming_job.definition(); - let source_id = match &streaming_job { - StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id), - _ => None, - }; - - // create streaming job. - match self - .create_streaming_job_inner_v2( - ctx, - streaming_job, - fragment_graph, - affected_table_replace_info, - ) - .await - { - Ok(version) => Ok(version), - Err(err) => { - tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job"); - let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail { - id, - name, - definition, - error: err.as_report().to_string(), - }; - self.env.event_log_manager_ref().add_event_logs(vec![ - risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event), - ]); - let aborted = self - .metadata_manager - .catalog_controller - .try_abort_creating_streaming_job(job_id as _, false) - .await?; - if aborted { - tracing::warn!(id = job_id, "aborted streaming job"); - if let Some(source_id) = source_id { - self.source_manager - .unregister_sources(vec![source_id as SourceId]) - .await; - } - } - Err(err) - } - } - } - - async fn create_streaming_job_inner_v2( - &self, - ctx: StreamContext, - mut streaming_job: StreamingJob, - fragment_graph: StreamFragmentGraphProto, - affected_table_replace_info: Option, - ) -> MetaResult { - let mut fragment_graph = - StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); - - // create internal table catalogs and refill table id. - let internal_tables = fragment_graph.internal_tables().into_values().collect_vec(); - let table_id_map = self - .metadata_manager - .catalog_controller - .create_internal_table_catalog(&streaming_job, internal_tables) - .await?; - fragment_graph.refill_internal_table_ids(table_id_map); - - let affected_table_replace_info = match affected_table_replace_info { - Some(replace_table_info) => { - let ReplaceTableInfo { - mut streaming_job, - fragment_graph, - .. - } = replace_table_info; - - let fragment_graph = - StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); - let streaming_job = streaming_job; - - Some((streaming_job, fragment_graph)) - } - None => None, - }; - - // create fragment and actor catalogs. - tracing::debug!(id = streaming_job.id(), "building streaming job"); - let (ctx, table_fragments) = self - .build_stream_job( - ctx, - streaming_job, - fragment_graph, - affected_table_replace_info, - ) - .await?; - - let streaming_job = &ctx.streaming_job; - - match streaming_job { - StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => { - Self::validate_cdc_table(table, &table_fragments).await?; - } - StreamingJob::Table(Some(source), ..) => { - // Register the source on the connector node. - self.source_manager.register_source(source).await?; - } - StreamingJob::Sink(sink, _) => { - // Validate the sink on the connector node. - validate_sink(sink).await?; - } - StreamingJob::Source(source) => { - // Register the source on the connector node. - self.source_manager.register_source(source).await?; - } - _ => {} - } - - self.metadata_manager - .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false) - .await?; - - // create streaming jobs. - let stream_job_id = streaming_job.id(); - match (streaming_job.create_type(), &streaming_job) { - (CreateType::Unspecified, _) - | (CreateType::Foreground, _) - // FIXME(kwannoel): Unify background stream's creation path with MV below. - | (CreateType::Background, StreamingJob::Sink(_, _)) => { - let version = self.stream_manager - .create_streaming_job(table_fragments, ctx) - .await?; - Ok(version) - } - (CreateType::Background, _) => { - let ctrl = self.clone(); - let fut = async move { - let _ = ctrl - .stream_manager - .create_streaming_job(table_fragments, ctx) - .await.inspect_err(|err| { - tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job"); - }); - }; - tokio::spawn(fut); - Ok(IGNORED_NOTIFICATION_VERSION) - } - } - } - - pub async fn drop_object( - &self, - object_type: ObjectType, - object_id: ObjectId, - drop_mode: DropMode, - target_replace_info: Option, - ) -> MetaResult { - let (release_ctx, mut version) = match object_type { - ObjectType::Database => { - self.metadata_manager - .catalog_controller - .drop_database(object_id) - .await? - } - ObjectType::Schema => { - return self - .metadata_manager - .catalog_controller - .drop_schema(object_id, drop_mode) - .await; - } - ObjectType::Function => { - return self - .metadata_manager - .catalog_controller - .drop_function(object_id) - .await; - } - ObjectType::Connection => { - let (version, conn) = self - .metadata_manager - .catalog_controller - .drop_connection(object_id) - .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &conn.info { - self.delete_vpc_endpoint(svc).await?; - } - return Ok(version); - } - _ => { - self.metadata_manager - .catalog_controller - .drop_relation(object_type, object_id, drop_mode) - .await? - } - }; - - if let Some(replace_table_info) = target_replace_info { - let stream_ctx = - StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap()); - - let ReplaceTableInfo { - mut streaming_job, - fragment_graph, - .. - } = replace_table_info; - - let sink_id = if let ObjectType::Sink = object_type { - object_id as _ - } else { - panic!("additional replace table event only occurs when dropping sink into table") - }; - - let fragment_graph = - StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); - let streaming_job = streaming_job; - - let table = streaming_job.table().unwrap(); - - tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink"); - let dummy_id = self - .metadata_manager - .catalog_controller - .create_job_catalog_for_replace( - &streaming_job, - &stream_ctx, - table.get_version()?, - &fragment_graph.specified_parallelism(), - fragment_graph.max_parallelism(), - ) - .await? as u32; - - let (ctx, table_fragments) = self - .inject_replace_table_job_for_table_sink( - dummy_id, - &self.metadata_manager, - stream_ctx, - None, - None, - Some(sink_id), - &streaming_job, - fragment_graph, - ) - .await?; - - let result: MetaResult> = try { - let merge_updates = ctx.merge_updates.clone(); - - self.metadata_manager - .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) - .await?; - - self.stream_manager - .replace_table(table_fragments, ctx) - .await?; - - merge_updates - }; - - version = match result { - Ok(merge_updates) => { - let version = self - .metadata_manager - .catalog_controller - .finish_replace_streaming_job( - dummy_id as _, - streaming_job, - merge_updates, - None, - None, - Some(sink_id), - vec![], - ) - .await?; - Ok(version) - } - Err(err) => { - tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table"); - let _ = self.metadata_manager - .catalog_controller - .try_abort_replacing_streaming_job(dummy_id as _) - .await - .inspect_err(|err| { - tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table"); - }); - Err(err) - } - }?; - } - - let ReleaseContext { - streaming_job_ids, - state_table_ids, - source_ids, - connections, - source_fragments, - removed_actors, - removed_fragments, - } = release_ctx; - - // delete vpc endpoints. - for conn in connections { - let _ = self - .delete_vpc_endpoint(&conn.to_protobuf()) - .await - .inspect_err(|err| { - tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); - }); - } - - // unregister sources. - self.source_manager - .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) - .await; - - // unregister fragments and actors from source manager. - self.source_manager - .drop_source_fragments_v2( - source_fragments - .into_iter() - .map(|(source_id, fragments)| { - ( - source_id, - fragments.into_iter().map(|id| id as u32).collect(), - ) - }) - .collect(), - removed_actors.iter().map(|id| *id as _).collect(), - ) - .await; - - // drop streaming jobs. - self.stream_manager - .drop_streaming_jobs( - removed_actors.into_iter().map(|id| id as _).collect(), - streaming_job_ids, - state_table_ids, - removed_fragments.iter().map(|id| *id as _).collect(), - ) - .await; - - Ok(version) - } - - /// This is used for `ALTER TABLE ADD/DROP COLUMN`. - pub async fn replace_table_v2( - &self, - mut streaming_job: StreamingJob, - fragment_graph: StreamFragmentGraphProto, - table_col_index_mapping: Option, - ) -> MetaResult { - let job_id = streaming_job.id(); - - let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; - let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); - - // 1. build fragment graph. - let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; - streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); - let streaming_job = streaming_job; - - let StreamingJob::Table(_, table, ..) = &streaming_job else { - unreachable!("unexpected job: {streaming_job:?}") - }; - let dummy_id = self - .metadata_manager - .catalog_controller - .create_job_catalog_for_replace( - &streaming_job, - &ctx, - table.get_version()?, - &fragment_graph.specified_parallelism(), - fragment_graph.max_parallelism(), - ) - .await?; - - tracing::debug!(id = streaming_job.id(), "building replace streaming job"); - let mut updated_sink_catalogs = vec![]; - - let result: MetaResult> = try { - let (mut ctx, mut table_fragments) = self - .build_replace_table( - ctx, - &streaming_job, - fragment_graph, - table_col_index_mapping.clone(), - dummy_id as _, - ) - .await?; - - let mut union_fragment_id = None; - - for (fragment_id, fragment) in &mut table_fragments.fragments { - for actor in &mut fragment.actors { - if let Some(node) = &mut actor.nodes { - visit_stream_node(node, |body| { - if let NodeBody::Union(_) = body { - if let Some(union_fragment_id) = union_fragment_id.as_mut() { - // The union fragment should be unique. - assert_eq!(*union_fragment_id, *fragment_id); - } else { - union_fragment_id = Some(*fragment_id); - } - } - }) - }; - } - } - - let target_fragment_id = - union_fragment_id.expect("fragment of placeholder merger not found"); - - let catalogs = self - .metadata_manager - .get_sink_catalog_by_ids(&table.incoming_sinks) - .await?; - - for sink in catalogs { - let sink_id = &sink.id; - - let sink_table_fragments = self - .metadata_manager - .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) - .await?; - - let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); - - Self::inject_replace_table_plan_for_sink( - Some(*sink_id), - &sink_fragment, - table, - &mut ctx, - &mut table_fragments, - target_fragment_id, - Some(&sink.unique_identity()), - ); - - if sink.original_target_columns.is_empty() { - updated_sink_catalogs.push(sink.id as _); - } - } - - let merge_updates = ctx.merge_updates.clone(); - - self.metadata_manager - .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) - .await?; - - self.stream_manager - .replace_table(table_fragments, ctx) - .await?; - merge_updates - }; - - match result { - Ok(merge_updates) => { - let version = self - .metadata_manager - .catalog_controller - .finish_replace_streaming_job( - dummy_id, - streaming_job, - merge_updates, - table_col_index_mapping, - None, - None, - updated_sink_catalogs, - ) - .await?; - Ok(version) - } - Err(err) => { - tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table"); - let _ = self.metadata_manager - .catalog_controller - .try_abort_replacing_streaming_job(dummy_id) - .await.inspect_err(|err| { - tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table"); - }); - Err(err) - } - } - } -} diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index e9fa97b1f87a0..8e0d639cf0ded 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -34,7 +34,7 @@ use risingwave_common::{ register_guarded_int_gauge_vec_with_registry, }; use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_object_store::object::object_metrics::{ ObjectStoreMetrics, GLOBAL_OBJECT_STORE_METRICS, }; @@ -886,7 +886,7 @@ pub fn start_worker_info_monitor( (join_handle, shutdown_tx) } -pub async fn refresh_fragment_info_metrics_v2( +pub async fn refresh_fragment_info_metrics( catalog_controller: &CatalogControllerRef, cluster_controller: &ClusterControllerRef, hummock_manager: &HummockManagerRef, @@ -1028,7 +1028,7 @@ pub fn start_fragment_info_monitor( } } - refresh_fragment_info_metrics_v2( + refresh_fragment_info_metrics( &metadata_manager.catalog_controller, &metadata_manager.cluster_controller, &hummock_manager, diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 09fbf7e12f48e..8b256d1b2145e 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -14,7 +14,6 @@ pub mod cloud_provider; pub mod ddl_controller; -mod ddl_controller_v2; pub mod election; pub mod intercept; pub mod metrics; diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index aba20f1e30642..2dbd6364d4e2c 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -30,7 +30,7 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; use risingwave_common::util::iter_util::ZipEqDebug; -use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism, WorkerId}; +use risingwave_meta_model::{actor, fragment, ObjectId, StreamingParallelism, WorkerId}; use risingwave_pb::common::{PbActorLocation, WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; @@ -498,7 +498,7 @@ impl ScaleController { fragment_state: &mut HashMap, fragment_to_table: &mut HashMap, mgr: &MetadataManager, - fragment_ids: Vec, + fragment_ids: Vec, ) -> Result<(), MetaError> { let RescheduleWorkingSet { fragments, @@ -513,7 +513,7 @@ impl ScaleController { .await?; let mut fragment_actors: HashMap< - risingwave_meta_model_v2::FragmentId, + risingwave_meta_model::FragmentId, Vec, > = HashMap::new(); @@ -1843,7 +1843,7 @@ impl ScaleController { // index for fragment_id -> [actor_id] let mut fragment_actor_id_map = HashMap::new(); - async fn build_index_v2( + async fn build_index( no_shuffle_source_fragment_ids: &mut HashSet, no_shuffle_target_fragment_ids: &mut HashSet, fragment_distribution_map: &mut HashMap< @@ -1870,7 +1870,7 @@ impl ScaleController { for (fragment_id, downstreams) in fragment_downstreams { for (downstream_fragment_id, dispatcher_type) in downstreams { - if let risingwave_meta_model_v2::actor_dispatcher::DispatcherType::NoShuffle = + if let risingwave_meta_model::actor_dispatcher::DispatcherType::NoShuffle = dispatcher_type { no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId); @@ -1910,7 +1910,7 @@ impl ScaleController { .map(|id| *id as ObjectId) .collect(); - build_index_v2( + build_index( &mut no_shuffle_source_fragment_ids, &mut no_shuffle_target_fragment_ids, &mut fragment_distribution_map, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 6f6a9950bb363..c5bcc0c179ba3 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -29,7 +29,7 @@ use risingwave_connector::source::{ SplitEnumerator, SplitId, SplitImpl, SplitMetaData, }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; -use risingwave_meta_model_v2::SourceId; +use risingwave_meta_model::SourceId; use risingwave_pb::catalog::Source; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_pb::stream_plan::Dispatcher; @@ -752,7 +752,7 @@ impl SourceManager { }) } - pub async fn drop_source_fragments_v2( + pub async fn drop_source_fragments( &self, source_fragments: HashMap>, removed_actors: HashSet, @@ -762,7 +762,7 @@ impl SourceManager { } /// For dropping MV. - pub async fn drop_source_fragments(&self, table_fragments: &[TableFragments]) { + pub async fn drop_source_fragments_vec(&self, table_fragments: &[TableFragments]) { let mut core = self.core.lock().await; // Extract the fragments that include source operators. diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 099571bab2cea..3446a2661d962 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -23,7 +23,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor::visit_tables; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::meta::table_fragments::Fragment; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::stream_plan::stream_node::NodeBody; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index bb6a8f31c0fc6..a697716f05929 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -28,7 +28,7 @@ use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::table_fragments::Fragment; diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 23cb4489f8bce..5d465b19d195d 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -27,7 +27,7 @@ use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; -use risingwave_meta_model_v2::WorkerId; +use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::fragment::{ FragmentDistributionType, PbFragmentDistributionType, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 218fc09b5189a..21a642f1be1c7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -19,7 +19,7 @@ use futures::future::join_all; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::TableId; -use risingwave_meta_model_v2::{ObjectId, WorkerId}; +use risingwave_meta_model::{ObjectId, WorkerId}; use risingwave_pb::catalog::{CreateType, Subscription, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; @@ -489,7 +489,7 @@ impl GlobalStreamManager { &self, removed_actors: Vec, streaming_job_ids: Vec, - state_table_ids: Vec, + state_table_ids: Vec, fragment_ids: HashSet, ) { if !removed_actors.is_empty() diff --git a/src/prost/build.rs b/src/prost/build.rs index 0e1b2ea5c1db6..c4744e14c1b60 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -170,7 +170,7 @@ fn main() -> Result<(), Box> { .type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]") .type_attribute("common.OrderType", "#[derive(Eq, Hash)]") .type_attribute("common.Buffer", "#[derive(Eq)]") - // Eq is required to derive `FromJsonQueryResult` for models in risingwave_meta_model_v2. + // Eq is required to derive `FromJsonQueryResult` for models in risingwave_meta_model. .type_attribute("hummock.TableStats", "#[derive(Eq)]") .type_attribute("hummock.SstableInfo", "#[derive(Eq)]") .type_attribute("hummock.KeyRange", "#[derive(Eq)]") diff --git a/src/storage/backup/Cargo.toml b/src/storage/backup/Cargo.toml index ef994a38c4a15..3fd7d7ecde5ce 100644 --- a/src/storage/backup/Cargo.toml +++ b/src/storage/backup/Cargo.toml @@ -23,7 +23,7 @@ parking_lot = { workspace = true } prost = { workspace = true } risingwave_common = { workspace = true } risingwave_hummock_sdk = { workspace = true } -risingwave_meta_model_v2 = { workspace = true } +risingwave_meta_model = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } serde = { version = "1", features = ["derive"] } diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index 6afb90b1258dd..4fea523916fff 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -33,35 +33,35 @@ impl From for BackupError { macro_rules! for_all_metadata_models_v2 { ($macro:ident) => { $macro! { - {seaql_migrations, risingwave_meta_model_v2::serde_seaql_migration}, - {version_stats, risingwave_meta_model_v2::hummock_version_stats}, - {compaction_configs, risingwave_meta_model_v2::compaction_config}, - {actors, risingwave_meta_model_v2::actor}, - {clusters, risingwave_meta_model_v2::cluster}, - {actor_dispatchers, risingwave_meta_model_v2::actor_dispatcher}, - {catalog_versions, risingwave_meta_model_v2::catalog_version}, - {connections, risingwave_meta_model_v2::connection}, - {databases, risingwave_meta_model_v2::database}, - {fragments, risingwave_meta_model_v2::fragment}, - {functions, risingwave_meta_model_v2::function}, - {indexes, risingwave_meta_model_v2::index}, - {objects, risingwave_meta_model_v2::object}, - {object_dependencies, risingwave_meta_model_v2::object_dependency}, - {schemas, risingwave_meta_model_v2::schema}, - {sinks, risingwave_meta_model_v2::sink}, - {sources, risingwave_meta_model_v2::source}, - {streaming_jobs, risingwave_meta_model_v2::streaming_job}, - {subscriptions, risingwave_meta_model_v2::subscription}, - {system_parameters, risingwave_meta_model_v2::system_parameter}, - {tables, risingwave_meta_model_v2::table}, - {users, risingwave_meta_model_v2::user}, - {user_privileges, risingwave_meta_model_v2::user_privilege}, - {views, risingwave_meta_model_v2::view}, - {workers, risingwave_meta_model_v2::worker}, - {worker_properties, risingwave_meta_model_v2::worker_property}, - {hummock_sequences, risingwave_meta_model_v2::hummock_sequence}, - {session_parameters, risingwave_meta_model_v2::session_parameter}, - {secrets, risingwave_meta_model_v2::secret} + {seaql_migrations, risingwave_meta_model::serde_seaql_migration}, + {version_stats, risingwave_meta_model::hummock_version_stats}, + {compaction_configs, risingwave_meta_model::compaction_config}, + {actors, risingwave_meta_model::actor}, + {clusters, risingwave_meta_model::cluster}, + {actor_dispatchers, risingwave_meta_model::actor_dispatcher}, + {catalog_versions, risingwave_meta_model::catalog_version}, + {connections, risingwave_meta_model::connection}, + {databases, risingwave_meta_model::database}, + {fragments, risingwave_meta_model::fragment}, + {functions, risingwave_meta_model::function}, + {indexes, risingwave_meta_model::index}, + {objects, risingwave_meta_model::object}, + {object_dependencies, risingwave_meta_model::object_dependency}, + {schemas, risingwave_meta_model::schema}, + {sinks, risingwave_meta_model::sink}, + {sources, risingwave_meta_model::source}, + {streaming_jobs, risingwave_meta_model::streaming_job}, + {subscriptions, risingwave_meta_model::subscription}, + {system_parameters, risingwave_meta_model::system_parameter}, + {tables, risingwave_meta_model::table}, + {users, risingwave_meta_model::user}, + {user_privileges, risingwave_meta_model::user_privilege}, + {views, risingwave_meta_model::view}, + {workers, risingwave_meta_model::worker}, + {worker_properties, risingwave_meta_model::worker_property}, + {hummock_sequences, risingwave_meta_model::hummock_sequence}, + {session_parameters, risingwave_meta_model::session_parameter}, + {secrets, risingwave_meta_model::secret} } }; }