From 71753f11223cd0b4d67ba6baf7ac636fa246e7fc Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 6 Sep 2024 17:02:32 +0800 Subject: [PATCH] fix(meta): correctly update serving parallelism mapping (#18439) --- src/meta/src/serving/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 69e17a978212e..86277c0f2a501 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -192,7 +192,16 @@ pub async fn start_serving_vnode_mapping_worker( continue; } let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; - let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); + let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id|{ + match streaming_parallelisms.get(frag_id) { + Some(parallelism) => Some((*frag_id, *parallelism)), + None => { + tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found"); + None + } + } + }).collect(); + let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers); if !upserted.is_empty() { tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys()); notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));