From 709894d8a44025bdba7835e9d9c352a2a607ba84 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Fri, 20 Dec 2024 16:13:41 +0800
Subject: [PATCH] fix: Fix the panic issue with the parallelism() call (#19849)

Signed-off-by: Shanicky Chen
---
 .../src/executor/join/local_lookup_join.rs           |  2 +-
 src/batch/src/worker_manager/worker_node_manager.rs  |  5 ++++-
 src/common/src/vnode_mapping/vnode_placement.rs      |  2 +-
 .../system_catalog/rw_catalog/rw_worker_nodes.rs     |  8 ++------
 src/frontend/src/handler/show.rs                     |  4 ++--
 src/meta/service/src/cluster_limit_service.rs        |  2 +-
 src/meta/src/barrier/context/recovery.rs             | 13 ++++++++-----
 src/meta/src/controller/cluster.rs                   |  2 +-
 src/meta/src/manager/diagnose.rs                     |  2 +-
 src/meta/src/stream/scale.rs                         |  4 ++--
 src/meta/src/stream/stream_graph/schedule.rs         |  2 +-
 src/meta/src/stream/stream_manager.rs                |  5 ++++-
 src/prost/src/lib.rs                                 | 10 +++++++++-
 src/tests/simulation/src/ctl_ext.rs                  |  2 +-
 .../tests/integration_tests/scale/schedulability.rs  |  2 +-
 15 files changed, 39 insertions(+), 26 deletions(-)

diff --git a/src/batch/executors/src/executor/join/local_lookup_join.rs b/src/batch/executors/src/executor/join/local_lookup_join.rs
index a8796c0b034fb..051c75b48b85d 100644
--- a/src/batch/executors/src/executor/join/local_lookup_join.rs
+++ b/src/batch/executors/src/executor/join/local_lookup_join.rs
@@ -404,7 +404,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
         let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
             .iter()
             .flat_map(|worker| {
-                (0..(worker.parallelism()))
+                (0..(worker.compute_node_parallelism()))
                     .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
             })
             .collect();
diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs
index ed8c30de61141..6895266745282 100644
--- a/src/batch/src/worker_manager/worker_node_manager.rs
+++ b/src/batch/src/worker_manager/worker_node_manager.rs
@@ -336,7 +336,10 @@ impl WorkerNodeSelector {
         } else {
             self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
         };
-        worker_nodes.iter().map(|node| node.parallelism()).sum()
+        worker_nodes
+            .iter()
+            .map(|node| node.compute_node_parallelism())
+            .sum()
     }
 
     pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerSlotMapping> {
diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs
index 33e544693d0ac..f09f38fd402d3 100644
--- a/src/common/src/vnode_mapping/vnode_placement.rs
+++ b/src/common/src/vnode_mapping/vnode_placement.rs
@@ -42,7 +42,7 @@ pub fn place_vnode(
         .iter()
         .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
         .sorted_by_key(|w| w.id)
-        .map(|w| (0..w.parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
+        .map(|w| (0..w.compute_node_parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
         .collect();
 
     // Set serving parallelism to the minimum of total number of worker slots, specified
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
index cbe584bee53a4..89f275d939776 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
@@ -29,7 +29,7 @@ struct RwWorkerNode {
     port: Option<String>,
     r#type: String,
     state: String,
-    parallelism: i32,
+    parallelism: Option<i32>,
     is_streaming: Option<bool>,
     is_serving: Option<bool>,
     is_unschedulable: Option<bool>,
@@ -59,11 +59,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwWorkerNode>> {
diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs
--- a/src/frontend/src/handler/show.rs
+++ b/src/frontend/src/handler/show.rs
     is_streaming: Option<bool>,
     is_serving: Option<bool>,
     is_unschedulable: Option<bool>,
@@ -483,7 +483,7 @@ pub async fn handle_show_object(
                 addr: addr.to_string(),
                 r#type: worker.get_type().unwrap().as_str_name().into(),
                 state: worker.get_state().unwrap().as_str_name().to_owned(),
-                parallelism: worker.parallelism() as _,
+                parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
                 is_streaming: property.map(|p| p.is_streaming),
                 is_serving: property.map(|p| p.is_serving),
                 is_unschedulable: property.map(|p| p.is_unschedulable),
diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs
index 83aae536e7e56..4a6d58be7bd90 100644
--- a/src/meta/service/src/cluster_limit_service.rs
+++ b/src/meta/service/src/cluster_limit_service.rs
@@ -51,7 +51,7 @@ impl ClusterLimitServiceImpl {
             .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running))
             .await?
             .into_iter()
-            .map(|e| (e.id as _, e.parallelism()))
+            .map(|e| (e.id as _, e.compute_node_parallelism()))
             .collect();
         let worker_actor_count: HashMap = self
             .metadata_manager
diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs
index 03575cff699e3..d2f974381fc90 100644
--- a/src/meta/src/barrier/context/recovery.rs
+++ b/src/meta/src/barrier/context/recovery.rs
@@ -410,7 +410,9 @@ impl GlobalBarrierWorkerContextImpl {
         let active_worker_slots: HashSet<_> = active_nodes
             .current()
             .values()
-            .flat_map(|node| (0..node.parallelism()).map(|idx| WorkerSlotId::new(node.id, idx)))
+            .flat_map(|node| {
+                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
+            })
             .collect();
 
         let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
@@ -439,7 +441,8 @@ impl GlobalBarrierWorkerContextImpl {
             .current()
             .values()
             .flat_map(|worker| {
-                (0..worker.parallelism()).map(move |i| WorkerSlotId::new(worker.id, i as _))
+                (0..worker.compute_node_parallelism())
+                    .map(move |i| WorkerSlotId::new(worker.id, i as _))
             })
             .collect_vec();
 
@@ -457,7 +460,7 @@ impl GlobalBarrierWorkerContextImpl {
             .current()
             .values()
             .flat_map(|worker| {
-                (0..worker.parallelism() * factor)
+                (0..worker.compute_node_parallelism() * factor)
                     .map(move |i| WorkerSlotId::new(worker.id, i as _))
             })
             .collect_vec();
@@ -513,7 +516,7 @@ impl GlobalBarrierWorkerContextImpl {
         let current_nodes = active_nodes
             .current()
             .values()
-            .map(|node| (node.id, &node.host, node.parallelism()))
+            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
             .collect_vec();
         warn!(
             current_nodes = ?current_nodes,
@@ -554,7 +557,7 @@ impl GlobalBarrierWorkerContextImpl {
         let available_parallelism = active_nodes
             .current()
             .values()
-            .map(|worker_node| worker_node.parallelism())
+            .map(|worker_node| worker_node.compute_node_parallelism())
             .sum();
 
         let table_parallelisms: HashMap<_, _> = {
diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs
index 3fbfcb19c1c98..e2a72e99dbcac 100644
--- a/src/meta/src/controller/cluster.rs
+++ b/src/meta/src/controller/cluster.rs
@@ -400,7 +400,7 @@ impl StreamingClusterInfo {
     pub fn parallelism(&self) -> usize {
         self.worker_nodes
             .values()
-            .map(|worker| worker.parallelism())
+            .map(|worker| worker.compute_node_parallelism())
             .sum()
     }
 }
diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs
index 062483374aa31..2322ab1f12082 100644
--- a/src/meta/src/manager/diagnose.rs
+++ b/src/meta/src/manager/diagnose.rs
@@ -171,7 +171,7 @@ impl DiagnoseCommand {
             &mut row,
             worker_node.get_state().ok().map(|s| s.as_str_name()),
         );
-        row.add_cell(worker_node.parallelism().into());
+        try_add_cell(&mut row, worker_node.parallelism());
         try_add_cell(
             &mut row,
             worker_node.property.as_ref().map(|p| p.is_streaming),
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 578ee101d0f27..f06d31b690815 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -1818,7 +1818,7 @@ impl ScaleController {
 
         let schedulable_worker_slots = workers
             .values()
-            .map(|worker| (worker.id as WorkerId, worker.parallelism()))
+            .map(|worker| (worker.id as WorkerId, worker.compute_node_parallelism()))
             .collect::<BTreeMap<_, _>>();
 
         // index for no shuffle relation
@@ -2579,7 +2579,7 @@ impl GlobalStreamManager {
 
                 match prev_worker {
                     // todo, add label checking in further changes
-                    Some(prev_worker) if prev_worker.parallelism() != worker.parallelism() => {
+                    Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                         tracing::info!(worker = worker.id, "worker parallelism changed");
                         should_trigger = true;
                     }
diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs
index 0cae1abf24654..ff46a7a8ca526 100644
--- a/src/meta/src/stream/stream_graph/schedule.rs
+++ b/src/meta/src/stream/stream_graph/schedule.rs
@@ -223,7 +223,7 @@ impl Scheduler {
 
         let slots = workers
             .iter()
-            .map(|(worker_id, worker)| (*worker_id as WorkerId, worker.parallelism()))
+            .map(|(worker_id, worker)| (*worker_id as WorkerId, worker.compute_node_parallelism()))
             .collect();
 
         let parallelism = default_parallelism.get();
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 957531140eac9..3b8886fb9e3b0 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -639,7 +639,10 @@ impl GlobalStreamManager {
             .collect::<BTreeSet<_>>();
 
         // Check if the provided parallelism is valid.
-        let available_parallelism = worker_nodes.iter().map(|w| w.parallelism()).sum::<usize>();
+        let available_parallelism = worker_nodes
+            .iter()
+            .map(|w| w.compute_node_parallelism())
+            .sum::<usize>();
         let max_parallelism = self
             .metadata_manager
             .get_job_max_parallelism(table_id)
diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs
index 0ebd4d5f4a096..4c57d4f0e472e 100644
--- a/src/prost/src/lib.rs
+++ b/src/prost/src/lib.rs
@@ -221,13 +221,21 @@ impl stream_plan::MaterializeNode {
 
 // Encapsulating the use of parallelism.
 impl common::WorkerNode {
-    pub fn parallelism(&self) -> usize {
+    pub fn compute_node_parallelism(&self) -> usize {
         assert_eq!(self.r#type(), WorkerType::ComputeNode);
         self.property
             .as_ref()
             .expect("property should be exist")
             .parallelism as usize
     }
+
+    pub fn parallelism(&self) -> Option<usize> {
+        if WorkerType::ComputeNode == self.r#type() {
+            Some(self.compute_node_parallelism())
+        } else {
+            None
+        }
+    }
 }
 
 impl stream_plan::SourceNode {
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index acb49a0df4d09..4284b21263247 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -227,7 +227,7 @@ impl Fragment {
         self.r
             .worker_nodes
             .iter()
-            .map(|w| (w.id, w.parallelism()))
+            .map(|w| (w.id, w.compute_node_parallelism()))
             .collect()
     }
 
diff --git a/src/tests/simulation/tests/integration_tests/scale/schedulability.rs b/src/tests/simulation/tests/integration_tests/scale/schedulability.rs
index 77275a39df70a..75535513e0a6f 100644
--- a/src/tests/simulation/tests/integration_tests/scale/schedulability.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/schedulability.rs
@@ -39,7 +39,7 @@ async fn test_cordon_normal() -> Result<()> {
     let rest_worker_slots: HashSet<_> = workers
         .iter()
         .flat_map(|worker| {
-            (0..worker.parallelism()).map(|idx| WorkerSlotId::new(worker.id, idx as _))
+            (0..worker.compute_node_parallelism()).map(|idx| WorkerSlotId::new(worker.id, idx as _))
         })
         .collect();
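
The prost change above splits the old asserting accessor in two: compute_node_parallelism() still asserts the worker is a compute node, while the new parallelism() returns Option<usize> and yields None for non-compute workers, so call sites that render mixed worker lists (SHOW CLUSTER, rw_worker_nodes, the diagnose report) no longer panic. Below is a minimal stand-alone sketch of the same pattern; Worker and WorkerKind are illustrative stand-ins, not the generated risingwave_pb::common::WorkerNode type touched by this patch.

// Illustrative sketch only: `Worker` and `WorkerKind` are hypothetical types.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum WorkerKind {
    ComputeNode,
    Frontend,
    Meta,
}

struct Worker {
    kind: WorkerKind,
    // Mirrors the idea that only compute nodes report a parallelism value.
    slots: Option<u32>,
}

impl Worker {
    // Asserting accessor, analogous to the renamed `compute_node_parallelism()`:
    // the caller must already know this is a compute node.
    fn compute_node_parallelism(&self) -> usize {
        assert_eq!(self.kind, WorkerKind::ComputeNode);
        self.slots.expect("compute node must report parallelism") as usize
    }

    // Total variant, analogous to the new `parallelism()`:
    // returns `None` instead of panicking for non-compute workers.
    fn parallelism(&self) -> Option<usize> {
        if self.kind == WorkerKind::ComputeNode {
            Some(self.compute_node_parallelism())
        } else {
            None
        }
    }
}

fn main() {
    let workers = [
        Worker { kind: WorkerKind::ComputeNode, slots: Some(4) },
        Worker { kind: WorkerKind::Frontend, slots: None },
    ];
    // A mixed worker list can now be rendered without panicking on the
    // frontend entry.
    for w in &workers {
        println!("{:?} -> {:?}", w.kind, w.parallelism());
    }
}

Call sites that only ever see compute nodes (the scheduler, scale controller, and batch worker manager in the diff) keep the asserting form, which is exactly the split this patch applies.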