diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 95cf407f8a56f..011c06284397a 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -24,9 +24,13 @@ message InjectBarrierRequest { message BarrierCompleteResponse { message CreateMviewProgress { + // Note: ideally we should use `executor_id`, but `actor_id` is ok-ish. + // See . uint32 backfill_actor_id = 1; bool done = 2; + // MV backfill snapshot read epoch (0 for Done / Source backfill) uint64 consumed_epoch = 3; + // MV backfill snapshot read rows / Source backfilled rows uint64 consumed_rows = 4; uint32 pending_barrier_num = 5; } diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index e28d41ef09b5e..f10c0e29b60f7 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -28,7 +28,7 @@ use crate::barrier::{ Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, EpochNode, ReplaceTablePlan, }; use crate::manager::{DdlType, MetadataManager}; -use crate::model::{ActorId, TableFragments}; +use crate::model::{ActorId, BackfillUpstreamType, TableFragments}; use crate::MetaResult; type ConsumedRows = u64; @@ -36,28 +36,29 @@ type ConsumedRows = u64; #[derive(Clone, Copy, Debug)] enum BackfillState { Init, - ConsumingUpstream(#[allow(dead_code)] Epoch, ConsumedRows), + ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows), Done(ConsumedRows), } /// Progress of all actors containing backfill executors while creating mview. #[derive(Debug)] pub(super) struct Progress { + // `states` and `done_count` decides whether the progress is done. See `is_done`. states: HashMap, - done_count: usize, + /// Tells whether the backfill is from source or mv. + backfill_upstream_types: HashMap, + + // The following row counts are used to calculate the progress. See `calculate_progress`. /// Upstream mv count. /// Keep track of how many times each upstream MV /// appears in this stream job. upstream_mv_count: HashMap, - - /// Total key count in the upstream materialized view - /// TODO: implement this for source backfill - upstream_total_key_count: u64, - - /// Consumed rows - consumed_rows: u64, + /// Total key count of all the upstream materialized views + upstream_mvs_total_key_count: u64, + mv_backfill_consumed_rows: u64, + source_backfill_consumed_rows: u64, /// DDL definition definition: String, @@ -66,47 +67,55 @@ pub(super) struct Progress { impl Progress { /// Create a [`Progress`] for some creating mview, with all `actors` containing the backfill executors. fn new( - actors: impl IntoIterator, + actors: impl IntoIterator, upstream_mv_count: HashMap, upstream_total_key_count: u64, definition: String, ) -> Self { - let states = actors - .into_iter() - .map(|a| (a, BackfillState::Init)) - .collect::>(); + let mut states = HashMap::new(); + let mut backfill_upstream_types = HashMap::new(); + for (actor, backfill_upstream_type) in actors { + states.insert(actor, BackfillState::Init); + backfill_upstream_types.insert(actor, backfill_upstream_type); + } assert!(!states.is_empty()); Self { states, + backfill_upstream_types, done_count: 0, upstream_mv_count, - upstream_total_key_count, - consumed_rows: 0, + upstream_mvs_total_key_count: upstream_total_key_count, + mv_backfill_consumed_rows: 0, + source_backfill_consumed_rows: 0, definition, } } /// Update the progress of `actor`. fn update(&mut self, actor: ActorId, new_state: BackfillState, upstream_total_key_count: u64) { - self.upstream_total_key_count = upstream_total_key_count; + self.upstream_mvs_total_key_count = upstream_total_key_count; let total_actors = self.states.len(); + let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap(); tracing::debug!(?actor, states = ?self.states, "update progress for actor"); + + let mut old = 0; + let mut new = 0; match self.states.remove(&actor).unwrap() { BackfillState::Init => {} BackfillState::ConsumingUpstream(_, old_consumed_rows) => { - self.consumed_rows -= old_consumed_rows; + old = old_consumed_rows; } BackfillState::Done(_) => panic!("should not report done multiple times"), }; match &new_state { BackfillState::Init => {} BackfillState::ConsumingUpstream(_, new_consumed_rows) => { - self.consumed_rows += new_consumed_rows; + new = *new_consumed_rows; } BackfillState::Done(new_consumed_rows) => { tracing::debug!("actor {} done", actor); - self.consumed_rows += new_consumed_rows; + new = *new_consumed_rows; self.done_count += 1; tracing::debug!( "{} actors out of {} complete", @@ -115,8 +124,19 @@ impl Progress { ); } }; + debug_assert!(new >= old, "backfill progress should not go backward"); + match backfill_upstream_type { + BackfillUpstreamType::MView => { + self.mv_backfill_consumed_rows += new - old; + } + BackfillUpstreamType::Source => { + self.source_backfill_consumed_rows += new - old; + } + BackfillUpstreamType::Values => { + // do not consider progress for values + } + } self.states.insert(actor, new_state); - self.calculate_progress(); } /// Returns whether all backfill executors are done. @@ -137,19 +157,52 @@ impl Progress { } /// `progress` = `consumed_rows` / `upstream_total_key_count` - fn calculate_progress(&self) -> f64 { + fn calculate_progress(&self) -> String { if self.is_done() || self.states.is_empty() { - return 1.0; + return "100%".to_string(); } - let mut upstream_total_key_count = self.upstream_total_key_count as f64; - if upstream_total_key_count == 0.0 { - upstream_total_key_count = 1.0 + let mut mv_count = 0; + let mut source_count = 0; + for backfill_upstream_type in self.backfill_upstream_types.values() { + match backfill_upstream_type { + BackfillUpstreamType::MView => mv_count += 1, + BackfillUpstreamType::Source => source_count += 1, + BackfillUpstreamType::Values => (), + } } - let mut progress = self.consumed_rows as f64 / upstream_total_key_count; - if progress >= 1.0 { - progress = 0.99; + + let mv_progress = (mv_count > 0).then_some({ + if self.upstream_mvs_total_key_count == 0 { + "99.99%".to_string() + } else { + let mut progress = self.mv_backfill_consumed_rows as f64 + / (self.upstream_mvs_total_key_count as f64); + if progress > 1.0 { + progress = 0.9999; + } + format!( + "{:.2}% ({}/{})", + progress * 100.0, + self.mv_backfill_consumed_rows, + self.upstream_mvs_total_key_count + ) + } + }); + let source_progress = (source_count > 0).then_some(format!( + "{} rows consumed", + self.source_backfill_consumed_rows + )); + match (mv_progress, source_progress) { + (Some(mv_progress), Some(source_progress)) => { + format!( + "MView Backfill: {}, Source Backfill: {}", + mv_progress, source_progress + ) + } + (Some(mv_progress), None) => mv_progress, + (None, Some(source_progress)) => source_progress, + (None, None) => "Unknown".to_string(), } - progress } } @@ -231,11 +284,9 @@ pub(super) struct TrackingCommand { /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. #[derive(Default, Debug)] pub(super) struct CreateMviewProgressTracker { - // TODO: add a specialized progress for source /// Progress of the create-mview DDL indicated by the `TableId`. progress_map: HashMap, - /// Find the epoch of the create-mview DDL by the actor containing the MV/source backfill executors. actor_map: HashMap, /// Stash of finished jobs. They will be finally finished on checkpoint. @@ -259,14 +310,17 @@ impl CreateMviewProgressTracker { let mut progress_map = HashMap::new(); for (creating_table_id, (definition, table_fragments)) in mview_map { let mut states = HashMap::new(); - let actors = table_fragments.backfill_actor_ids(); - for actor in actors { + let mut backfill_upstream_types = HashMap::new(); + let actors = table_fragments.tracking_progress_actor_ids(); + for (actor, backfill_upstream_type) in actors { actor_map.insert(actor, creating_table_id); states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0)); + backfill_upstream_types.insert(actor, backfill_upstream_type); } let progress = Self::recover_progress( states, + backfill_upstream_types, table_fragments.dependent_table_ids(), definition, &version_stats, @@ -284,28 +338,28 @@ impl CreateMviewProgressTracker { } } + /// ## How recovery works + /// + /// The progress (number of rows consumed) is persisted in state tables. + /// During recovery, the backfill executor will restore the number of rows consumed, + /// and then it will just report progress like newly created executors. fn recover_progress( states: HashMap, + backfill_upstream_types: HashMap, upstream_mv_count: HashMap, definition: String, version_stats: &HummockVersionStats, ) -> Progress { - let upstream_total_key_count = upstream_mv_count - .iter() - .map(|(upstream_mv, count)| { - *count as u64 - * version_stats - .table_stats - .get(&upstream_mv.table_id) - .map_or(0, |stat| stat.total_key_count as u64) - }) - .sum(); + let upstream_mvs_total_key_count = + calculate_total_key_count(&upstream_mv_count, version_stats); Progress { states, + backfill_upstream_types, done_count: 0, // Fill only after first barrier pass upstream_mv_count, - upstream_total_key_count, - consumed_rows: 0, // Fill only after first barrier pass + upstream_mvs_total_key_count, + mv_backfill_consumed_rows: 0, // Fill only after first barrier pass + source_backfill_consumed_rows: 0, // Fill only after first barrier pass definition, } } @@ -318,7 +372,7 @@ impl CreateMviewProgressTracker { let ddl_progress = DdlProgress { id: table_id as u64, statement: x.definition.clone(), - progress: format!("{:.2}%", x.calculate_progress() * 100.0), + progress: x.calculate_progress(), }; (table_id, ddl_progress) }) @@ -377,7 +431,7 @@ impl CreateMviewProgressTracker { /// Finish stashed jobs on checkpoint. pub(super) fn take_finished_jobs(&mut self) -> Vec { - tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "finishing jobs"); + tracing::trace!(finished_jobs=?self.pending_finished_jobs, progress_map=?self.progress_map, "take_finished_jobs"); take(&mut self.pending_finished_jobs) } @@ -449,16 +503,8 @@ impl CreateMviewProgressTracker { upstream_mv_count.insert(*table_id, dispatch_count / actors.len()); } - let upstream_total_key_count: u64 = upstream_mv_count - .iter() - .map(|(upstream_mv, count)| { - *count as u64 - * version_stats - .table_stats - .get(&upstream_mv.table_id) - .map_or(0, |stat| stat.total_key_count as u64) - }) - .sum(); + let upstream_total_key_count: u64 = + calculate_total_key_count(&upstream_mv_count, version_stats); ( upstream_mv_count, upstream_total_key_count, @@ -467,8 +513,8 @@ impl CreateMviewProgressTracker { ) }; - for &actor in &actors { - self.actor_map.insert(actor, creating_mv_id); + for (actor, _backfill_upstream_type) in &actors { + self.actor_map.insert(*actor, creating_mv_id); } let progress = Progress::new( @@ -537,18 +583,8 @@ impl CreateMviewProgressTracker { Entry::Occupied(mut o) => { let progress = &mut o.get_mut().0; - let upstream_total_key_count: u64 = progress - .upstream_mv_count - .iter() - .map(|(upstream_mv, count)| { - assert_ne!(*count, 0); - *count as u64 - * version_stats - .table_stats - .get(&upstream_mv.table_id) - .map_or(0, |stat| stat.total_key_count as u64) - }) - .sum(); + let upstream_total_key_count: u64 = + calculate_total_key_count(&progress.upstream_mv_count, version_stats); tracing::debug!(?table_id, "updating progress for table"); progress.update(actor, new_state, upstream_total_key_count); @@ -577,3 +613,20 @@ impl CreateMviewProgressTracker { } } } + +fn calculate_total_key_count( + table_count: &HashMap, + version_stats: &HummockVersionStats, +) -> u64 { + table_count + .iter() + .map(|(table_id, count)| { + assert_ne!(*count, 0); + *count as u64 + * version_stats + .table_stats + .get(&table_id.table_id) + .map_or(0, |stat| stat.total_key_count as u64) + }) + .sum() +} diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 498950a98014f..6af1fe305dec3 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -87,6 +87,7 @@ impl GlobalBarrierManagerContext { Ok(()) } + // FIXME: didn't consider Values here async fn recover_background_mv_progress(&self) -> MetaResult { let mgr = &self.metadata_manager; let mviews = mgr diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 7eb921e0befa4..5137c74b587cd 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -359,7 +359,7 @@ impl TableFragments { } /// Returns actor ids that need to be tracked when creating MV. - pub fn tracking_progress_actor_ids(&self) -> Vec { + pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> { let mut actor_ids = vec![]; for fragment in self.fragments.values() { if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 { @@ -373,7 +373,12 @@ impl TableFragments { | FragmentTypeFlag::SourceScan as u32)) != 0 { - actor_ids.extend(fragment.actors.iter().map(|actor| actor.actor_id)); + actor_ids.extend(fragment.actors.iter().map(|actor| { + ( + actor.actor_id, + BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask), + ) + })); } } actor_ids @@ -592,3 +597,36 @@ impl TableFragments { self } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BackfillUpstreamType { + MView, + Values, + Source, +} + +impl BackfillUpstreamType { + pub fn from_fragment_type_mask(mask: u32) -> Self { + let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0; + let is_values = (mask & FragmentTypeFlag::Values as u32) != 0; + let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0; + + // Note: in theory we can have multiple backfill executors in one fragment, but currently it's not possible. + // See . + debug_assert!( + is_mview as u8 + is_values as u8 + is_source as u8 == 1, + "a backfill fragment should either be mview, value or source, found {:?}", + mask + ); + + if is_mview { + BackfillUpstreamType::MView + } else if is_values { + BackfillUpstreamType::Values + } else if is_source { + BackfillUpstreamType::Source + } else { + unreachable!("invalid fragment type mask: {}", mask); + } + } +} diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index bd166f8082c4c..09b8ffeda81de 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -438,6 +438,8 @@ impl SourceBackfillExecutorInner { yield Message::Barrier(barrier); { + // TODO: persist backfilled row count? + let mut total_backfilled_rows: u64 = 0; let source_backfill_row_count = self .metrics .source_backfill_row_count @@ -581,13 +583,8 @@ impl SourceBackfillExecutorInner { .await?; if self.should_report_finished(&backfill_stage.states) { - // TODO: use a specialized progress for source - // Currently, `CreateMviewProgress` is designed for MV backfill, and rw_ddl_progress calculates - // progress based on the number of consumed rows and an estimated total number of rows from hummock. - // For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%. tracing::debug!("progress finish"); - let epoch = barrier.epoch; - self.progress.finish(epoch, 114514); + self.progress.finish(barrier.epoch, total_backfilled_rows); // yield barrier after reporting progress yield Message::Barrier(barrier); @@ -599,6 +596,11 @@ impl SourceBackfillExecutorInner { break 'backfill_loop; } } else { + self.progress.update_for_source_backfill( + barrier.epoch, + total_backfilled_rows, + ); + // yield barrier after reporting progress yield Message::Barrier(barrier); } } @@ -665,6 +667,7 @@ impl SourceBackfillExecutorInner { let new_chunk = chunk.clone_with_vis(new_vis); yield Message::Chunk(new_chunk); source_backfill_row_count.inc_by(card as u64); + total_backfilled_rows += card as u64; } } } diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index a91f9b8476111..e8ab8b9f254d7 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -213,6 +213,24 @@ impl CreateMviewProgressReporter { ); } + pub fn update_for_source_backfill( + &mut self, + epoch: EpochPair, + current_consumed_rows: ConsumedRows, + ) { + match self.state { + Some(BackfillState::ConsumingUpstream(last_epoch, _last_consumed_rows)) => { + debug_assert_eq!(last_epoch, 0); + } + Some(BackfillState::Done(_)) => unreachable!(), + None => {} + }; + self.update_inner( + epoch, + BackfillState::ConsumingUpstream(0, current_consumed_rows), + ); + } + /// Finish the progress. If the progress is already finished, then perform no-op. /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint. pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) { diff --git a/src/tests/simulation/tests/integration_tests/README.md b/src/tests/simulation/tests/integration_tests/README.md index db8672e299c56..9e0f46307acae 100644 --- a/src/tests/simulation/tests/integration_tests/README.md +++ b/src/tests/simulation/tests/integration_tests/README.md @@ -1,4 +1,4 @@ # Simulation Integration Tests -Take a look at [Developer Guide](https://risingwavelabs.github.io/risingwave/) for more information -about running the integration tests locally. \ No newline at end of file +Take a look at [Developer Guide](https://risingwavelabs.github.io/risingwave/tests/intro.html#integration-tests) for more information +about running the integration tests locally. diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs index fb7cb3db6fb8b..7d6569f6bae40 100644 --- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs +++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs @@ -262,7 +262,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> { let progress = session .run("SELECT progress FROM rw_catalog.rw_ddl_progress") .await?; - let progress = progress.replace('%', ""); + let progress = progress.split_once("%").unwrap().0; let progress = progress.parse::().unwrap(); assert!( (0.5..1.5).contains(&progress), @@ -276,7 +276,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> { let progress = session .run("SELECT progress FROM rw_catalog.rw_ddl_progress") .await?; - let progress = progress.replace('%', ""); + let progress = progress.split_once("%").unwrap().0; let progress = progress.parse::().unwrap(); assert!( (prev_progress - 0.5..prev_progress + 1.5).contains(&progress), diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index 9cc9a6a9ab716..1377fe8730f27 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -318,7 +318,7 @@ async fn test_high_barrier_latency_cancel(config: Configuration) -> Result<()> { .await .unwrap(); tracing::info!(progress, "get progress before cancel stream job"); - let progress = progress.replace('%', ""); + let progress = progress.split_once("%").unwrap().0; let progress = progress.parse::().unwrap(); if progress >= 0.01 { break;