diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index a79b890cade20..195612bfbd8be 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -720,11 +720,14 @@ impl CatalogController {
 
     pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
         let inner = self.inner.read().await;
-        let job = StreamingJob::find_by_id(job_id)
+        let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
+            .select_only()
+            .column(streaming_job::Column::MaxParallelism)
+            .into_tuple()
             .one(&inner.db)
             .await?
             .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
-        Ok(job.max_parallelism as usize)
+        Ok(max_parallelism as usize)
     }
 
     /// Get all actor ids in the target streaming jobs.
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 516adbf8b6c90..bfcb7830ec6e9 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -700,12 +700,33 @@ impl CatalogController {
             ));
         }
 
+        // 3. check parallelism.
+        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
+            .select_only()
+            .column(streaming_job::Column::MaxParallelism)
+            .into_tuple()
+            .one(&txn)
+            .await?
+            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?;
+
+        if original_max_parallelism != max_parallelism as i32 {
+            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
+            // This should not happen in normal cases.
+            bail!(
+                "cannot use a different max parallelism \
+                 when altering or creating/dropping a sink into an existing table, \
+                 original: {}, new: {}",
+                original_max_parallelism,
+                max_parallelism
+            );
+        }
+
         let parallelism = match specified_parallelism {
             None => StreamingParallelism::Adaptive,
             Some(n) => StreamingParallelism::Fixed(n.get() as _),
         };
 
-        // 3. create streaming object for new replace table.
+        // 4. create streaming object for new replace table.
         let obj_id = Self::create_streaming_job_obj(
             &txn,
             ObjectType::Table,
@@ -719,7 +740,7 @@ impl CatalogController {
         )
         .await?;
 
-        // 4. record dependency for new replace table.
+        // 5. record dependency for new replace table.
         ObjectDependency::insert(object_dependency::ActiveModel {
             oid: Set(id as _),
             used_by: Set(obj_id as _),
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index baffdbd72fba9..cec9efa323a46 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1059,6 +1059,16 @@ impl DdlController {
             ..
         } = replace_table_info;
 
+        // Ensure the max parallelism unchanged before replacing table.
+        let original_max_parallelism = self
+            .metadata_manager
+            .get_job_max_parallelism(streaming_job.id().into())
+            .await?;
+        let fragment_graph = PbStreamFragmentGraph {
+            max_parallelism: original_max_parallelism as _,
+            ..fragment_graph
+        };
+
         let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
         streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
 
@@ -1198,6 +1208,16 @@ impl DdlController {
             panic!("additional replace table event only occurs when dropping sink into table")
         };
 
+        // Ensure the max parallelism unchanged before replacing table.
+        let original_max_parallelism = self
+            .metadata_manager
+            .get_job_max_parallelism(streaming_job.id().into())
+            .await?;
+        let fragment_graph = PbStreamFragmentGraph {
+            max_parallelism: original_max_parallelism as _,
+            ..fragment_graph
+        };
+
         let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
         streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
 
@@ -1344,6 +1364,16 @@ impl DdlController {
         let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
         let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
 
+        // Ensure the max parallelism unchanged before replacing table.
+        let original_max_parallelism = self
+            .metadata_manager
+            .get_job_max_parallelism(streaming_job.id().into())
+            .await?;
+        let fragment_graph = PbStreamFragmentGraph {
+            max_parallelism: original_max_parallelism as _,
+            ..fragment_graph
+        };
+
         // 1. build fragment graph.
         let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
         streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());