diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index e98d1fdaed754..95cf407f8a56f 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -28,6 +28,7 @@ message BarrierCompleteResponse {
     bool done = 2;
     uint64 consumed_epoch = 3;
     uint64 consumed_rows = 4;
+    uint32 pending_barrier_num = 5;
   }
   string request_id = 1;
   common.Status status = 2;
diff --git a/src/meta/src/barrier/creating_job/barrier_control.rs b/src/meta/src/barrier/creating_job/barrier_control.rs
index e1d965c969372..83a1dd9cfb5e7 100644
--- a/src/meta/src/barrier/creating_job/barrier_control.rs
+++ b/src/meta/src/barrier/creating_job/barrier_control.rs
@@ -12,9 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::Bound::{Excluded, Unbounded};
 use std::collections::{BTreeMap, HashSet, VecDeque};
 use std::mem::take;
+use std::ops::Bound::Unbounded;
+use std::ops::{Bound, RangeBounds};
 use std::time::Instant;
 
 use prometheus::HistogramTimer;
@@ -26,22 +27,13 @@ use tracing::debug;
 
 use crate::rpc::metrics::MetaMetrics;
 
-#[derive(Debug)]
-pub(super) enum CreatingStreamingJobBarrierType {
-    Snapshot,
-    LogStore,
-    Upstream,
-}
-
 #[derive(Debug)]
 struct CreatingStreamingJobEpochState {
     epoch: u64,
     node_to_collect: HashSet<WorkerId>,
     resps: Vec<BarrierCompleteResponse>,
-    upstream_epoch_to_notify: Option<u64>,
     is_checkpoint: bool,
     enqueue_time: Instant,
-    barrier_type: CreatingStreamingJobBarrierType,
 }
 
 #[derive(Debug)]
@@ -49,31 +41,30 @@ pub(super) struct CreatingStreamingJobBarrierControl {
     table_id: TableId,
     // key is prev_epoch of barrier
     inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
+    backfill_epoch: u64,
     initial_epoch: Option<u64>,
     max_collected_epoch: Option<u64>,
-    max_attached_epoch: Option<u64>,
-    // newer epoch at the front. should all be checkpoint barrier
+    // newer epoch at the front.
    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
     completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
 
     // metrics
     consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>,
     consuming_log_store_barrier_latency: LabelGuardedHistogram<2>,
-    consuming_upstream_barrier_latency: LabelGuardedHistogram<2>,
 
     wait_commit_latency: LabelGuardedHistogram<1>,
     inflight_barrier_num: LabelGuardedIntGauge<1>,
 }
 
 impl CreatingStreamingJobBarrierControl {
-    pub(super) fn new(table_id: TableId, metrics: &MetaMetrics) -> Self {
+    pub(super) fn new(table_id: TableId, backfill_epoch: u64, metrics: &MetaMetrics) -> Self {
         let table_id_str = format!("{}", table_id.table_id);
         Self {
             table_id,
             inflight_barrier_queue: Default::default(),
+            backfill_epoch,
             initial_epoch: None,
             max_collected_epoch: None,
-            max_attached_epoch: None,
             pending_barriers_to_complete: Default::default(),
             completing_barrier: None,
@@ -83,9 +74,6 @@ impl CreatingStreamingJobBarrierControl {
             consuming_log_store_barrier_latency: metrics
                 .snapshot_backfill_barrier_latency
                 .with_guarded_label_values(&[&table_id_str, "consuming_log_store"]),
-            consuming_upstream_barrier_latency: metrics
-                .snapshot_backfill_barrier_latency
-                .with_guarded_label_values(&[&table_id_str, "consuming_upstream"]),
             wait_commit_latency: metrics
                 .snapshot_backfill_wait_commit_latency
                 .with_guarded_label_values(&[&table_id_str]),
@@ -127,7 +115,6 @@ impl CreatingStreamingJobBarrierControl {
         epoch: u64,
         node_to_collect: HashSet<WorkerId>,
         is_checkpoint: bool,
-        barrier_type: CreatingStreamingJobBarrierType,
     ) {
         debug!(
             epoch,
@@ -142,17 +129,12 @@ impl CreatingStreamingJobBarrierControl {
         if let Some(latest_epoch) = self.latest_epoch() {
             assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
         }
-        if let Some(max_attached_epoch) = self.max_attached_epoch {
-            assert!(epoch > max_attached_epoch);
-        }
         let epoch_state = CreatingStreamingJobEpochState {
             epoch,
             node_to_collect,
             resps: vec![],
-            upstream_epoch_to_notify: None,
             is_checkpoint,
             enqueue_time: Instant::now(),
-            barrier_type,
         };
         if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
             self.add_collected(epoch_state);
@@ -163,41 +145,6 @@ impl CreatingStreamingJobBarrierControl {
             .set(self.inflight_barrier_queue.len() as _);
     }
 
-    pub(super) fn unattached_epochs(&self) -> impl Iterator<Item = (u64, bool)> + '_ {
-        let range_start = if let Some(max_attached_epoch) = self.max_attached_epoch {
-            Excluded(max_attached_epoch)
-        } else {
-            Unbounded
-        };
-        self.inflight_barrier_queue
-            .range((range_start, Unbounded))
-            .map(|(epoch, state)| (*epoch, state.is_checkpoint))
-    }
-
-    /// Attach an `upstream_epoch` to the `epoch` of the creating job.
-    ///
-    /// The `upstream_epoch` won't be completed until the `epoch` of the creating job is completed so that
-    /// the `upstream_epoch` should wait for the progress of creating job, and we can ensure that the downstream
-    /// creating job can eventually catch up with the upstream.
-    pub(super) fn attach_upstream_epoch(&mut self, epoch: u64, upstream_epoch: u64) {
-        debug!(
-            epoch,
-            upstream_epoch,
-            table_id = ?self.table_id.table_id,
-            "attach epoch"
-        );
-        if let Some(max_attached_epoch) = self.max_attached_epoch {
-            assert!(epoch > max_attached_epoch);
-        }
-        self.max_attached_epoch = Some(epoch);
-        let epoch_state = self
-            .inflight_barrier_queue
-            .get_mut(&epoch)
-            .expect("should exist");
-        assert!(epoch_state.upstream_epoch_to_notify.is_none());
-        epoch_state.upstream_epoch_to_notify = Some(upstream_epoch);
-    }
-
     pub(super) fn collect(
         &mut self,
         epoch: u64,
@@ -228,46 +175,47 @@ impl CreatingStreamingJobBarrierControl {
             .set(self.inflight_barrier_queue.len() as _);
     }
 
-    #[expect(clippy::type_complexity)]
-    /// Return (`upstream_epochs_to_notify`, Some((epoch, resps, `is_first_commit`)))
+    /// Return Some((epoch, resps, `is_first_commit`))
     ///
-    /// `upstream_epochs_to_notify` is the upstream epochs of non-checkpoint barriers to be notified about barrier completing.
-    /// These non-checkpoint barriers does not need to call `commit_epoch` and therefore can be completed as long as collected.
+    /// Only epochs within `epoch_end_bound` can start completing.
+    /// Usually `epoch_end_bound` is the upstream committed epoch. This ensures that
+    /// the creating job never has a higher committed epoch than the upstream.
     pub(super) fn start_completing(
         &mut self,
-    ) -> (Vec<u64>, Option<(u64, Vec<BarrierCompleteResponse>, bool)>) {
-        if self.completing_barrier.is_some() {
-            return (vec![], None);
-        }
-        let mut upstream_epochs_to_notify = vec![];
-        while let Some(mut epoch_state) = self.pending_barriers_to_complete.pop_back() {
+        epoch_end_bound: Bound<u64>,
+    ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
+        assert!(self.completing_barrier.is_none());
+        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
+        while let Some(epoch_state) = self.pending_barriers_to_complete.back()
+            && epoch_range.contains(&epoch_state.epoch)
+        {
+            let mut epoch_state = self
+                .pending_barriers_to_complete
+                .pop_back()
+                .expect("non-empty");
             let epoch = epoch_state.epoch;
             let is_first = self.initial_epoch.expect("should have set") == epoch;
             if is_first {
                 assert!(epoch_state.is_checkpoint);
             } else if !epoch_state.is_checkpoint {
-                if let Some(upstream_epoch) = epoch_state.upstream_epoch_to_notify {
-                    upstream_epochs_to_notify.push(upstream_epoch);
-                }
                 continue;
             }
             let resps = take(&mut epoch_state.resps);
             self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
-            return (upstream_epochs_to_notify, Some((epoch, resps, is_first)));
+            return Some((epoch, resps, is_first));
         }
-        (upstream_epochs_to_notify, None)
+        None
     }
 
     /// Ack on completing a checkpoint barrier.
-    ///
-    /// Return the upstream epoch to be notified when there is any.
- pub(super) fn ack_completed(&mut self, completed_epoch: u64) -> Option { + pub(super) fn ack_completed(&mut self, completed_epoch: u64) { let (epoch_state, wait_commit_timer) = self.completing_barrier.take().expect("should exist"); wait_commit_timer.observe_duration(); assert_eq!(epoch_state.epoch, completed_epoch); - epoch_state.upstream_epoch_to_notify } fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) { @@ -280,10 +228,10 @@ impl CreatingStreamingJobBarrierControl { } self.max_collected_epoch = Some(epoch_state.epoch); let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64(); - let barrier_latency_metrics = match &epoch_state.barrier_type { - CreatingStreamingJobBarrierType::Snapshot => &self.consuming_snapshot_barrier_latency, - CreatingStreamingJobBarrierType::LogStore => &self.consuming_log_store_barrier_latency, - CreatingStreamingJobBarrierType::Upstream => &self.consuming_upstream_barrier_latency, + let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch { + &self.consuming_snapshot_barrier_latency + } else { + &self.consuming_log_store_barrier_latency }; barrier_latency_metrics.observe(barrier_latency); self.pending_barriers_to_complete.push_front(epoch_state); diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 42e416d737881..f3ad5a44aa929 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -17,25 +17,21 @@ mod status; use std::cmp::max; use std::collections::HashMap; -use std::mem::take; +use std::ops::Bound::{Excluded, Unbounded}; use std::sync::Arc; -use std::time::Duration; -use prometheus::HistogramTimer; -use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge}; -use risingwave_common::util::epoch::Epoch; +use risingwave_common::catalog::TableId; +use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_meta_model::WorkerId; use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_service::BarrierCompleteResponse; -use tracing::{debug, info}; +use tracing::info; use crate::barrier::command::CommandContext; -use crate::barrier::creating_job::barrier_control::{ - CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType, -}; +use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierControl; use crate::barrier::creating_job::status::{ CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus, }; @@ -52,11 +48,12 @@ pub(super) struct CreatingStreamingJobControl { pub(super) snapshot_backfill_info: SnapshotBackfillInfo, backfill_epoch: u64, + graph_info: InflightGraphInfo, + barrier_control: CreatingStreamingJobBarrierControl, status: CreatingStreamingJobStatus, upstream_lag: LabelGuardedIntGauge<1>, - upstream_wait_progress_latency: LabelGuardedHistogram<1>, } impl CreatingStreamingJobControl { @@ -73,6 +70,7 @@ impl CreatingStreamingJobControl { definition = info.definition, "new creating job" ); + let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids(); let mut create_mview_tracker = CreateMviewProgressTracker::default(); create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat); let fragment_info: HashMap<_, _> = info.new_fragment_info().collect(); @@ -85,14 +83,19 @@ impl CreatingStreamingJobControl { Self { info, snapshot_backfill_info, - 
barrier_control: CreatingStreamingJobBarrierControl::new(table_id, metrics), + barrier_control: CreatingStreamingJobBarrierControl::new( + table_id, + backfill_epoch, + metrics, + ), backfill_epoch, + graph_info: InflightGraphInfo::new(fragment_info), status: CreatingStreamingJobStatus::ConsumingSnapshot { prev_epoch_fake_physical_time: 0, - pending_commands: vec![], + pending_upstream_barriers: vec![], version_stats: version_stat.clone(), create_mview_tracker, - graph_info: InflightGraphInfo::new(fragment_info), + snapshot_backfill_actors, backfill_epoch, pending_non_checkpoint_barriers: vec![], initial_barrier_info: Some((actors_to_create, initial_mutation)), @@ -100,29 +103,16 @@ impl CreatingStreamingJobControl { upstream_lag: metrics .snapshot_backfill_lag .with_guarded_label_values(&[&table_id_str]), - upstream_wait_progress_latency: metrics - .snapshot_backfill_upstream_wait_progress_latency - .with_guarded_label_values(&[&table_id_str]), } } - pub(super) fn start_wait_progress_timer(&self) -> HistogramTimer { - self.upstream_wait_progress_latency.start_timer() - } - pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool { self.barrier_control.is_wait_on_worker(worker_id) - || self - .status - .active_graph_info() - .map(|info| info.contains_worker(worker_id)) - .unwrap_or(false) + || (self.status.is_finishing() && self.graph_info.contains_worker(worker_id)) } pub(super) fn on_new_worker_node_map(&self, node_map: &HashMap) { - if let Some(info) = self.status.active_graph_info() { - info.on_new_worker_node_map(node_map) - } + self.graph_info.on_new_worker_node_map(node_map) } pub(super) fn gen_ddl_progress(&self) -> DdlProgress { @@ -142,32 +132,15 @@ impl CreatingStreamingJobControl { } } CreatingStreamingJobStatus::ConsumingLogStore { - start_consume_log_store_epoch, + log_store_progress_tracker, .. } => { - let max_collected_epoch = max( - self.barrier_control.max_collected_epoch().unwrap_or(0), - self.backfill_epoch, - ); - let lag = Duration::from_millis( - Epoch(*start_consume_log_store_epoch) - .physical_time() - .saturating_sub(Epoch(max_collected_epoch).physical_time()), - ); format!( - "LogStore [remain lag: {:?}, epoch cnt: {}]", - lag, - self.barrier_control.inflight_barrier_count() + "LogStore [{}]", + log_store_progress_tracker.gen_ddl_progress() ) } - CreatingStreamingJobStatus::ConsumingUpstream { .. } => { - format!( - "Upstream [unattached: {}, epoch cnt: {}]", - self.barrier_control.unattached_epochs().count(), - self.barrier_control.inflight_barrier_count(), - ) - } - CreatingStreamingJobStatus::Finishing { .. } => { + CreatingStreamingJobStatus::Finishing(_) => { format!( "Finishing [epoch count: {}]", self.barrier_control.inflight_barrier_count() @@ -182,84 +155,43 @@ impl CreatingStreamingJobControl { } pub(super) fn pinned_upstream_log_epoch(&self) -> Option { - let stop_consume_log_store_epoch = match &self.status { - CreatingStreamingJobStatus::ConsumingSnapshot { .. } - | CreatingStreamingJobStatus::ConsumingLogStore { .. } => None, - CreatingStreamingJobStatus::ConsumingUpstream { - start_consume_upstream_epoch, - .. - } - | CreatingStreamingJobStatus::Finishing { - start_consume_upstream_epoch, - .. 
- } => Some(*start_consume_upstream_epoch), - }; - if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() { - if max_collected_epoch < self.backfill_epoch { - Some(self.backfill_epoch) - } else if let Some(stop_consume_log_store_epoch) = stop_consume_log_store_epoch - && max_collected_epoch >= stop_consume_log_store_epoch - { - None - } else { - Some(max_collected_epoch) - } + if self.status.is_finishing() { + None } else { - Some(self.backfill_epoch) + // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed + Some(max( + self.barrier_control.max_collected_epoch().unwrap_or(0), + self.backfill_epoch, + )) } } - pub(super) fn may_inject_fake_barrier( - &mut self, + fn inject_barrier( + table_id: TableId, control_stream_manager: &mut ControlStreamManager, - upstream_prev_epoch: u64, - is_checkpoint: bool, + barrier_control: &mut CreatingStreamingJobBarrierControl, + pre_applied_graph_info: &InflightGraphInfo, + applied_graph_info: Option<&InflightGraphInfo>, + CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors, + mutation, + }: CreatingJobInjectBarrierInfo, ) -> MetaResult<()> { - if let Some((barriers_to_inject, graph_info)) = - self.status.may_inject_fake_barrier(is_checkpoint) - { - if let Some(graph_info) = graph_info { - info!( - table_id = self.info.table_fragments.table_id().table_id, - upstream_prev_epoch, "start consuming log store" - ); - self.status = CreatingStreamingJobStatus::ConsumingLogStore { - graph_info, - start_consume_log_store_epoch: upstream_prev_epoch, - }; - } - let graph_info = self - .status - .active_graph_info() - .expect("must exist when having barriers to inject"); - let table_id = self.info.table_fragments.table_id(); - for CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, - new_actors, - mutation, - } in barriers_to_inject - { - let node_to_collect = control_stream_manager.inject_barrier( - Some(table_id), - mutation, - (&curr_epoch, &prev_epoch), - &kind, - graph_info, - Some(graph_info), - new_actors, - vec![], - vec![], - )?; - self.barrier_control.enqueue_epoch( - prev_epoch.value().0, - node_to_collect, - kind.is_checkpoint(), - CreatingStreamingJobBarrierType::Snapshot, - ); - } - } + let node_to_collect = control_stream_manager.inject_barrier( + Some(table_id), + mutation, + (&curr_epoch, &prev_epoch), + &kind, + pre_applied_graph_info, + applied_graph_info, + new_actors, + vec![], + vec![], + )?; + barrier_control.enqueue_epoch(prev_epoch.value().0, node_to_collect, kind.is_checkpoint()); Ok(()) } @@ -267,7 +199,7 @@ impl CreatingStreamingJobControl { &mut self, control_stream_manager: &mut ControlStreamManager, command_ctx: &Arc, - ) -> MetaResult>> { + ) -> MetaResult<()> { let table_id = self.info.table_fragments.table_id(); let start_consume_upstream = if let Command::MergeSnapshotBackfillStreamingJobs( jobs_to_merge, @@ -277,6 +209,13 @@ impl CreatingStreamingJobControl { } else { false }; + if start_consume_upstream { + info!( + table_id = self.info.table_fragments.table_id().table_id, + prev_epoch = command_ctx.prev_epoch.value().0, + "start consuming upstream" + ); + } let progress_epoch = if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() { max(max_collected_epoch, self.backfill_epoch) @@ -290,125 +229,24 @@ impl CreatingStreamingJobControl { .0 .saturating_sub(progress_epoch) as _, ); - let graph_to_finish = match &mut self.status { - CreatingStreamingJobStatus::ConsumingSnapshot { - pending_commands, 
.. - } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job that are consuming snapshot" - ); - pending_commands.push(command_ctx.clone()); - None - } - CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. } => { - let node_to_collect = control_stream_manager.inject_barrier( - Some(table_id), - if start_consume_upstream { - // erase the mutation on upstream except the last command - command_ctx.to_mutation() - } else { - None - }, - (&command_ctx.curr_epoch, &command_ctx.prev_epoch), - &command_ctx.kind, - graph_info, - Some(graph_info), - None, - vec![], - vec![], - )?; - self.barrier_control.enqueue_epoch( - command_ctx.prev_epoch.value().0, - node_to_collect, - command_ctx.kind.is_checkpoint(), - CreatingStreamingJobBarrierType::LogStore, - ); - let prev_epoch = command_ctx.prev_epoch.value().0; + if let Some(barrier_to_inject) = self + .status + .on_new_upstream_epoch(command_ctx, start_consume_upstream) + { + Self::inject_barrier( + self.info.table_fragments.table_id(), + control_stream_manager, + &mut self.barrier_control, + &self.graph_info, if start_consume_upstream { - let graph_info = take(graph_info); - info!( - table_id = self.info.table_fragments.table_id().table_id, - prev_epoch, "start consuming upstream" - ); - self.status = CreatingStreamingJobStatus::ConsumingUpstream { - start_consume_upstream_epoch: prev_epoch, - graph_info, - }; - } - None - } - CreatingStreamingJobStatus::ConsumingUpstream { - start_consume_upstream_epoch, - graph_info, - } => { - assert!( - !start_consume_upstream, - "should not start consuming upstream for a job again" - ); - - let should_finish = command_ctx.kind.is_checkpoint() - && self.barrier_control.unattached_epochs().next().is_none(); - let node_to_collect = control_stream_manager.inject_barrier( - Some(table_id), - // do not send the upstream barrier mutation because in `ConsumingUpstream` stage, - // barriers are still injected and collected independently on the creating jobs. - None, - (&command_ctx.curr_epoch, &command_ctx.prev_epoch), - &command_ctx.kind, - graph_info, - if should_finish { - None - } else { - Some(graph_info) - }, - None, - vec![], - vec![], - )?; - let prev_epoch = command_ctx.prev_epoch.value().0; - self.barrier_control.enqueue_epoch( - prev_epoch, - node_to_collect, - command_ctx.kind.is_checkpoint(), - CreatingStreamingJobBarrierType::Upstream, - ); - let graph_info = if should_finish { - info!(prev_epoch, table_id = ?self.info.table_fragments.table_id(), "mark as finishing"); - self.barrier_control - .attach_upstream_epoch(prev_epoch, prev_epoch); - let graph_info = take(graph_info); - self.status = CreatingStreamingJobStatus::Finishing { - start_consume_upstream_epoch: *start_consume_upstream_epoch, - }; - Some(Some(graph_info)) + None } else { - let mut unattached_epochs_iter = self.barrier_control.unattached_epochs(); - let mut epoch_to_attach = unattached_epochs_iter.next().expect("non-empty").0; - let mut remain_count = 5; - while remain_count > 0 - && let Some((epoch, _)) = unattached_epochs_iter.next() - { - remain_count -= 1; - epoch_to_attach = epoch; - } - drop(unattached_epochs_iter); - self.barrier_control - .attach_upstream_epoch(epoch_to_attach, prev_epoch); - Some(None) - }; - - graph_info - } - CreatingStreamingJobStatus::Finishing { .. 
-            } => {
-                assert!(
-                    !start_consume_upstream,
-                    "should not start consuming upstream for a job again"
-                );
-                None
-            }
-        };
-        Ok(graph_to_finish)
+                    Some(&self.graph_info)
+                },
+                barrier_to_inject,
+            )?;
+        }
+        Ok(())
     }
 
     pub(super) fn collect(
@@ -416,57 +254,99 @@ impl CreatingStreamingJobControl {
         epoch: u64,
         worker_id: WorkerId,
         resp: BarrierCompleteResponse,
-    ) {
-        self.status.update_progress(&resp.create_mview_progress);
+        control_stream_manager: &mut ControlStreamManager,
+    ) -> MetaResult<()> {
+        let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
         self.barrier_control.collect(epoch, worker_id, resp);
+        if let Some(prev_barriers_to_inject) = prev_barriers_to_inject {
+            let table_id = self.info.table_fragments.table_id();
+            for info in prev_barriers_to_inject {
+                Self::inject_barrier(
+                    table_id,
+                    control_stream_manager,
+                    &mut self.barrier_control,
+                    &self.graph_info,
+                    Some(&self.graph_info),
+                    info,
+                )?;
+            }
+        }
+        Ok(())
     }
 
     pub(super) fn should_merge_to_upstream(&self) -> Option<InflightGraphInfo> {
-        if let (
-            CreatingStreamingJobStatus::ConsumingLogStore {
-                graph_info,
-                start_consume_log_store_epoch,
-            },
-            Some(max_collected_epoch),
-        ) = (&self.status, self.barrier_control.max_collected_epoch())
+        if let CreatingStreamingJobStatus::ConsumingLogStore {
+            ref log_store_progress_tracker,
+        } = &self.status
+            && log_store_progress_tracker.is_finished()
         {
-            if max_collected_epoch >= *start_consume_log_store_epoch {
-                Some(graph_info.clone())
-            } else {
-                let lag = Duration::from_millis(
-                    Epoch(*start_consume_log_store_epoch).physical_time()
-                        - Epoch(max_collected_epoch).physical_time(),
-                );
-                debug!(
-                    ?lag,
-                    max_collected_epoch, start_consume_log_store_epoch, "wait consuming log store"
-                );
-                None
-            }
+            Some(self.graph_info.clone())
         } else {
             None
         }
     }
+}
 
-    #[expect(clippy::type_complexity)]
+pub(super) enum CompleteJobType {
+    /// The first barrier
+    First,
+    Normal,
+    /// The last barrier to complete
+    Finished,
+}
+
+impl CreatingStreamingJobControl {
     pub(super) fn start_completing(
         &mut self,
-    ) -> (Vec<u64>, Option<(u64, Vec<BarrierCompleteResponse>, bool)>) {
-        self.barrier_control.start_completing()
+        min_upstream_inflight_epoch: Option<u64>,
+    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
+        let (finished_at_epoch, epoch_end_bound) = match &self.status {
+            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
+                let epoch_end_bound = min_upstream_inflight_epoch
+                    .map(|upstream_epoch| {
+                        if upstream_epoch < *finish_at_epoch {
+                            Excluded(upstream_epoch)
+                        } else {
+                            Unbounded
+                        }
+                    })
+                    .unwrap_or(Unbounded);
+                (Some(*finish_at_epoch), epoch_end_bound)
+            }
+            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
+            | CreatingStreamingJobStatus::ConsumingLogStore { ..
+            } => (
+                None,
+                min_upstream_inflight_epoch
+                    .map(Excluded)
+                    .unwrap_or(Unbounded),
+            ),
+        };
+        self.barrier_control.start_completing(epoch_end_bound).map(
+            |(epoch, resps, is_first_commit)| {
+                let status = if let Some(finish_at_epoch) = finished_at_epoch {
+                    assert!(!is_first_commit);
+                    if epoch == finish_at_epoch {
+                        self.barrier_control.ack_completed(epoch);
+                        assert!(self.barrier_control.is_empty());
+                        CompleteJobType::Finished
+                    } else {
+                        CompleteJobType::Normal
+                    }
+                } else if is_first_commit {
+                    CompleteJobType::First
+                } else {
+                    CompleteJobType::Normal
                };
+                (epoch, resps, status)
+            },
+        )
     }
 
-    pub(super) fn ack_completed(&mut self, completed_epoch: u64) -> Option<(u64, bool)> {
-        let upstream_epoch_to_notify = self.barrier_control.ack_completed(completed_epoch);
-        if let Some(upstream_epoch_to_notify) = upstream_epoch_to_notify {
-            Some((upstream_epoch_to_notify, self.is_finished()))
-        } else {
-            assert!(!self.is_finished());
-            None
-        }
+    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
+        self.barrier_control.ack_completed(completed_epoch);
     }
 
     pub(super) fn is_finished(&self) -> bool {
-        self.barrier_control.is_empty()
-            && matches!(&self.status, CreatingStreamingJobStatus::Finishing { .. })
+        self.barrier_control.is_empty() && self.status.is_finishing()
    }
 }
diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs
index bfc5b4bbe7faf..f599990eff999 100644
--- a/src/meta/src/barrier/creating_job/status.rs
+++ b/src/meta/src/barrier/creating_job/status.rs
@@ -12,30 +12,101 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet};
 use std::mem::take;
 use std::sync::Arc;
 
+use risingwave_common::hash::ActorId;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_plan::StreamActor;
-use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
+use risingwave_pb::stream_service::barrier_complete_response::{
+    CreateMviewProgress, PbCreateMviewProgress,
+};
+use tracing::warn;
 
 use crate::barrier::command::CommandContext;
-use crate::barrier::info::InflightGraphInfo;
 use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::barrier::{BarrierKind, TracedEpoch};
 
+#[derive(Debug)]
+pub(super) struct CreateMviewLogStoreProgressTracker {
+    /// `actor_id` -> `pending_barrier_count`
+    ongoing_actors: HashMap<ActorId, usize>,
+    finished_actors: HashSet<ActorId>,
+}
+
+impl CreateMviewLogStoreProgressTracker {
+    fn new(actors: impl Iterator<Item = ActorId>, initial_pending_count: usize) -> Self {
+        Self {
+            ongoing_actors: HashMap::from_iter(actors.map(|actor| (actor, initial_pending_count))),
+            finished_actors: HashSet::new(),
+        }
+    }
+
+    pub(super) fn gen_ddl_progress(&self) -> String {
+        let sum = self.ongoing_actors.values().sum::<usize>() as f64;
+        let count = if self.ongoing_actors.is_empty() {
+            1
+        } else {
+            self.ongoing_actors.len()
+        } as f64;
+        let avg = sum / count;
+        format!(
+            "finished: {}/{}, avg epoch count {}",
+            self.finished_actors.len(),
+            self.ongoing_actors.len() + self.finished_actors.len(),
+            avg
+        )
+    }
+
+    fn update(&mut self, progress: impl IntoIterator<Item = &PbCreateMviewProgress>) {
+        for progress in progress {
+            match self.ongoing_actors.entry(progress.backfill_actor_id) {
+                Entry::Occupied(mut entry) => {
+                    if progress.done
+                    {
+                        entry.remove_entry();
+                        assert!(
+                            self.finished_actors.insert(progress.backfill_actor_id),
+                            "non-duplicate"
+                        );
+                    } else {
+                        *entry.get_mut() = progress.pending_barrier_num as _;
+                    }
+                }
+                Entry::Vacant(_) => {
+                    if cfg!(debug_assertions) {
+                        panic!(
+                            "reporting progress on non-inflight actor: {:?} {:?}",
+                            progress, self
+                        );
+                    } else {
+                        warn!(?progress, progress_tracker = ?self, "reporting progress on non-inflight actor");
+                    }
+                }
+            }
+        }
+    }
+
+    pub(super) fn is_finished(&self) -> bool {
+        self.ongoing_actors.is_empty()
+    }
+}
+
 #[derive(Debug)]
 pub(super) enum CreatingStreamingJobStatus {
+    /// The creating job is consuming the upstream snapshot.
+    /// Transits to `ConsumingLogStore` in `update_progress` once
+    /// the snapshot has been fully consumed.
     ConsumingSnapshot {
         prev_epoch_fake_physical_time: u64,
-        pending_commands: Vec<Arc<CommandContext>>,
+        pending_upstream_barriers: Vec<(TracedEpoch, TracedEpoch, BarrierKind)>,
         version_stats: HummockVersionStats,
         create_mview_tracker: CreateMviewProgressTracker,
-        graph_info: InflightGraphInfo,
+        snapshot_backfill_actors: HashSet<ActorId>,
         backfill_epoch: u64,
         /// The `prev_epoch` of pending non checkpoint barriers
         pending_non_checkpoint_barriers: Vec<u64>,
@@ -43,17 +114,16 @@ pub(super) enum CreatingStreamingJobStatus {
         /// Take the mutation out when injecting the first barrier
         initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
     },
+    /// The creating job is consuming the log store.
+    ///
+    /// Transits to `Finishing` in `on_new_upstream_epoch` when `start_consume_upstream` is `true`.
     ConsumingLogStore {
-        graph_info: InflightGraphInfo,
-        start_consume_log_store_epoch: u64,
-    },
-    ConsumingUpstream {
-        start_consume_upstream_epoch: u64,
-        graph_info: InflightGraphInfo,
-    },
-    Finishing {
-        start_consume_upstream_epoch: u64,
+        log_store_progress_tracker: CreateMviewLogStoreProgressTracker,
     },
+    /// All backfill actors have started consuming upstream, and the job will be
+    /// finished once all previously injected barriers have been collected.
+    /// Stores the `prev_epoch` at which the job will finish.
+    Finishing(u64),
 }
 
 pub(super) struct CreatingJobInjectBarrierInfo {
@@ -65,79 +135,138 @@ pub(super) struct CreatingJobInjectBarrierInfo {
 }
 
 impl CreatingStreamingJobStatus {
-    pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> {
-        match self {
-            CreatingStreamingJobStatus::ConsumingSnapshot { graph_info, .. }
-            | CreatingStreamingJobStatus::ConsumingLogStore { graph_info, .. }
-            | CreatingStreamingJobStatus::ConsumingUpstream { graph_info, .. } => Some(graph_info),
-            CreatingStreamingJobStatus::Finishing { .. } => {
-                // when entering `Finishing`, the graph will have been added to the upstream graph,
-                // and therefore the separate graph info is inactive.
-                None
-            }
-        }
-    }
-
     pub(super) fn update_progress(
         &mut self,
         create_mview_progress: impl IntoIterator<Item = &CreateMviewProgress>,
-    ) {
-        if let Self::ConsumingSnapshot {
-            create_mview_tracker,
-            ref version_stats,
-            ..
-        } = self
-        {
-            create_mview_tracker.update_tracking_jobs(None, create_mview_progress, version_stats);
-        }
-    }
-
-    /// return
-    /// - Some(vec[(`curr_epoch`, `prev_epoch`, `barrier_kind`)]) of barriers to newly inject
-    /// - Some(`graph_info`) when the status should transit to `ConsumingLogStore`
-    pub(super) fn may_inject_fake_barrier(
-        &mut self,
-        is_checkpoint: bool,
-    ) -> Option<(Vec<CreatingJobInjectBarrierInfo>, Option<InflightGraphInfo>)> {
-        if let CreatingStreamingJobStatus::ConsumingSnapshot {
-            prev_epoch_fake_physical_time,
-            pending_commands,
-            create_mview_tracker,
-            graph_info,
-            pending_non_checkpoint_barriers,
-            ref backfill_epoch,
-            initial_barrier_info,
-            ..
-        } = self
-        {
-            if create_mview_tracker.has_pending_finished_jobs() {
-                assert!(initial_barrier_info.is_none());
-                pending_non_checkpoint_barriers.push(*backfill_epoch);
+    ) -> Option<Vec<CreatingJobInjectBarrierInfo>> {
+        match self {
+            Self::ConsumingSnapshot {
+                create_mview_tracker,
+                ref version_stats,
+                prev_epoch_fake_physical_time,
+                pending_upstream_barriers,
+                pending_non_checkpoint_barriers,
+                ref backfill_epoch,
+                initial_barrier_info,
+                ref snapshot_backfill_actors,
+                ..
+            } => {
+                create_mview_tracker.update_tracking_jobs(
+                    None,
+                    create_mview_progress,
+                    version_stats,
+                );
+                if create_mview_tracker.has_pending_finished_jobs() {
+                    let (new_actors, mutation) = match initial_barrier_info.take() {
+                        Some((new_actors, mutation)) => (Some(new_actors), Some(mutation)),
+                        None => (None, None),
+                    };
+                    assert!(initial_barrier_info.is_none());
+                    pending_non_checkpoint_barriers.push(*backfill_epoch);
 
-                let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
-                let barriers_to_inject =
-                    [CreatingJobInjectBarrierInfo {
+                    let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
+                    let barriers_to_inject: Vec<_> = [CreatingJobInjectBarrierInfo {
                         curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)),
                         prev_epoch: TracedEpoch::new(prev_epoch),
                         kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
-                        new_actors: None,
-                        mutation: None,
+                        new_actors,
+                        mutation,
                     }]
                     .into_iter()
-                    .chain(pending_commands.drain(..).map(|command_ctx| {
-                        CreatingJobInjectBarrierInfo {
-                            curr_epoch: command_ctx.curr_epoch.clone(),
-                            prev_epoch: command_ctx.prev_epoch.clone(),
-                            kind: command_ctx.kind.clone(),
+                    .chain(pending_upstream_barriers.drain(..).map(
+                        |(prev_epoch, curr_epoch, kind)| CreatingJobInjectBarrierInfo {
+                            curr_epoch,
+                            prev_epoch,
+                            kind,
                             new_actors: None,
                             mutation: None,
-                        }
-                    }))
+                        },
+                    ))
                     .collect();
-                let graph_info = take(graph_info);
-                Some((barriers_to_inject, Some(graph_info)))
-            } else {
+                    *self = CreatingStreamingJobStatus::ConsumingLogStore {
+                        log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
+                            snapshot_backfill_actors.iter().cloned(),
+                            barriers_to_inject.len(),
+                        ),
+                    };
+                    Some(barriers_to_inject)
+                } else {
+                    None
+                }
+            }
+            CreatingStreamingJobStatus::ConsumingLogStore {
+                log_store_progress_tracker,
+                ..
+            } => {
+                log_store_progress_tracker.update(create_mview_progress);
+                None
+            }
+            CreatingStreamingJobStatus::Finishing(_) => None,
+        }
+    }
+
+    pub(super) fn on_new_upstream_epoch(
+        &mut self,
+        command_ctx: &Arc<CommandContext>,
+        start_consume_upstream: bool,
+    ) -> Option<CreatingJobInjectBarrierInfo> {
+        match self {
+            CreatingStreamingJobStatus::ConsumingSnapshot {
+                pending_upstream_barriers,
+                prev_epoch_fake_physical_time,
+                pending_non_checkpoint_barriers,
+                initial_barrier_info,
+                ..
+ } => { + assert!( + !start_consume_upstream, + "should not start consuming upstream for a job that are consuming snapshot" + ); + pending_upstream_barriers.push(( + command_ctx.prev_epoch.clone(), + command_ctx.curr_epoch.clone(), + command_ctx.kind.clone(), + )); + Some(CreatingStreamingJobStatus::new_fake_barrier( + prev_epoch_fake_physical_time, + pending_non_checkpoint_barriers, + initial_barrier_info, + command_ctx.kind.is_checkpoint(), + )) + } + CreatingStreamingJobStatus::ConsumingLogStore { .. } => { + let prev_epoch = command_ctx.prev_epoch.value().0; + if start_consume_upstream { + assert!(command_ctx.kind.is_checkpoint()); + *self = CreatingStreamingJobStatus::Finishing(prev_epoch); + } + Some(CreatingJobInjectBarrierInfo { + curr_epoch: command_ctx.curr_epoch.clone(), + prev_epoch: command_ctx.prev_epoch.clone(), + kind: command_ctx.kind.clone(), + new_actors: None, + mutation: None, + }) + } + CreatingStreamingJobStatus::Finishing { .. } => { + assert!( + !start_consume_upstream, + "should not start consuming upstream for a job again" + ); + None + } + } + } + + pub(super) fn new_fake_barrier( + prev_epoch_fake_physical_time: &mut u64, + pending_non_checkpoint_barriers: &mut Vec, + initial_barrier_info: &mut Option<(HashMap>, Mutation)>, + is_checkpoint: bool, + ) -> CreatingJobInjectBarrierInfo { + { + { let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time)); *prev_epoch_fake_physical_time += 1; @@ -155,19 +284,18 @@ impl CreatingStreamingJobStatus { } else { Default::default() }; - Some(( - vec![CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, - new_actors, - mutation, - }], - None, - )) + CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors, + mutation, + } } - } else { - None } } + + pub(super) fn is_finishing(&self) -> bool { + matches!(self, Self::Finishing(_)) + } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 119aed6f0158d..efee9b715c316 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -50,7 +50,7 @@ use tracing::{debug, error, info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; -use crate::barrier::creating_job::CreatingStreamingJobControl; +use crate::barrier::creating_job::{CompleteJobType, CreatingStreamingJobControl}; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; @@ -294,7 +294,7 @@ impl CheckpointControl { command_ctx: Arc, notifiers: Vec, node_to_collect: HashSet, - jobs_to_wait: HashSet, + creating_jobs_to_wait: HashSet, ) { let timer = self.context.metrics.barrier_latency.start_timer(); @@ -307,27 +307,9 @@ impl CheckpointControl { tracing::trace!( prev_epoch = command_ctx.prev_epoch.value().0, - ?jobs_to_wait, + ?creating_jobs_to_wait, "enqueue command" ); - let creating_jobs_to_wait = jobs_to_wait - .into_iter() - .map(|table_id| { - ( - table_id, - if node_to_collect.is_empty() { - Some( - self.creating_streaming_job_controls - .get(&table_id) - .expect("should exist") - .start_wait_progress_timer(), - ) - } else { - None - }, - ) - }) - .collect(); self.command_ctx_queue.insert( command_ctx.prev_epoch.value().0, EpochNode { @@ -346,7 +328,11 @@ impl CheckpointControl { /// Change the state of this `prev_epoch` to `Completed`. 
    /// Return continuous nodes with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
-    fn barrier_collected(&mut self, resp: BarrierCompleteResponse) {
+    fn barrier_collected(
+        &mut self,
+        resp: BarrierCompleteResponse,
+        control_stream_manager: &mut ControlStreamManager,
+    ) -> MetaResult<()> {
         let worker_id = resp.worker_id;
         let prev_epoch = resp.epoch;
         tracing::trace!(
@@ -358,19 +344,6 @@ impl CheckpointControl {
         if resp.partial_graph_id == u32::MAX {
             if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                 assert!(node.state.node_to_collect.remove(&(worker_id as _)));
-                if node.state.node_to_collect.is_empty() {
-                    node.state
-                        .creating_jobs_to_wait
-                        .iter_mut()
-                        .for_each(|(table_id, timer)| {
-                            *timer = Some(
-                                self.creating_streaming_job_controls
-                                    .get(table_id)
-                                    .expect("should exist")
-                                    .start_wait_progress_timer(),
-                            );
-                        });
-                }
                 node.state.resps.push(resp);
             } else {
                 panic!(
@@ -383,8 +356,9 @@ impl CheckpointControl {
             self.creating_streaming_job_controls
                 .get_mut(&creating_table_id)
                 .expect("should exist")
-                .collect(prev_epoch, worker_id as _, resp);
+                .collect(prev_epoch, worker_id as _, resp, control_stream_manager)?;
         }
+        Ok(())
     }
 
     /// Pause inject barrier until True.
@@ -500,9 +474,9 @@ struct BarrierEpochState {
 
     resps: Vec<BarrierCompleteResponse>,
 
-    creating_jobs_to_wait: HashMap<TableId, Option<HistogramTimer>>,
+    creating_jobs_to_wait: HashSet<TableId>,
 
-    finished_jobs: HashMap<TableId, CreateStreamingJobCommandInfo>,
+    finished_jobs: HashMap<TableId, (CreateStreamingJobCommandInfo, Vec<BarrierCompleteResponse>)>,
 }
 
 impl BarrierEpochState {
@@ -789,12 +763,8 @@ impl GlobalBarrierManager {
                     }
                 }
                 (worker_id, resp_result) = self.control_stream_manager.next_complete_barrier_response() => {
-                    match resp_result {
-                        Ok(resp) => {
-                            self.checkpoint_control.barrier_collected(resp);
-
-                        }
-                        Err(e) => {
+                    if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) {
+                        {
                             let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id as _);
                             if failed_command.is_some()
                                 || self.state.inflight_graph_info.contains_worker(worker_id as _)
@@ -927,19 +897,6 @@ impl GlobalBarrierManager {
             );
         }
 
-        // may inject fake barrier
-        for creating_job in self
-            .checkpoint_control
-            .creating_streaming_job_controls
-            .values_mut()
-        {
-            creating_job.may_inject_fake_barrier(
-                &mut self.control_stream_manager,
-                prev_epoch.value().0,
-                checkpoint,
-            )?
- } - self.pending_non_checkpoint_barriers .push(prev_epoch.value().0); let kind = if checkpoint { @@ -958,8 +915,14 @@ impl GlobalBarrierManager { command = Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge); } - let (pre_applied_graph_info, pre_applied_subscription_info) = - self.state.apply_command(&command); + let command = command; + + let ( + pre_applied_graph_info, + pre_applied_subscription_info, + table_ids_to_commit, + jobs_to_wait, + ) = self.state.apply_command(&command); // Tracing related stuff prev_epoch.span().in_scope(|| { @@ -967,8 +930,6 @@ impl GlobalBarrierManager { }); span.record("epoch", curr_epoch.value().0); - let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); - let command_ctx = Arc::new(CommandContext::new( self.active_streaming_nodes.current().clone(), pre_applied_subscription_info, @@ -984,18 +945,12 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let mut jobs_to_wait = HashSet::new(); - - for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls + for creating_job in &mut self + .checkpoint_control + .creating_streaming_job_controls + .values_mut() { - if let Some(wait_job) = - creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)? - { - jobs_to_wait.insert(*table_id); - if let Some(graph_to_finish) = wait_job { - self.state.inflight_graph_info.extend(graph_to_finish); - } - } + creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)?; } let node_to_collect = match self.control_stream_manager.inject_command_ctx_barrier( @@ -1269,6 +1224,51 @@ impl CheckpointControl { &mut self, mut scheduled_barriers: Option<&mut ScheduledBarriers>, ) -> Option { + // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough + let mut creating_jobs_task = vec![]; + { + // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough + let mut finished_jobs = Vec::new(); + let min_upstream_inflight_barrier = self + .command_ctx_queue + .first_key_value() + .map(|(epoch, _)| *epoch); + for (table_id, job) in &mut self.creating_streaming_job_controls { + if let Some((epoch, resps, status)) = + job.start_completing(min_upstream_inflight_barrier) + { + let is_first_time = match status { + CompleteJobType::First => true, + CompleteJobType::Normal => false, + CompleteJobType::Finished => { + finished_jobs.push((*table_id, epoch, resps)); + continue; + } + }; + creating_jobs_task.push((*table_id, epoch, resps, is_first_time)); + } + } + for (table_id, epoch, resps) in finished_jobs { + let epoch_state = &mut self + .command_ctx_queue + .get_mut(&epoch) + .expect("should exist") + .state; + assert!(epoch_state.creating_jobs_to_wait.remove(&table_id)); + debug!(epoch, ?table_id, "finish creating job"); + // It's safe to remove the creating job, because on CompleteJobType::Finished, + // all previous barriers have been collected and completed. + let creating_streaming_job = self + .creating_streaming_job_controls + .remove(&table_id) + .expect("should exist"); + assert!(creating_streaming_job.is_finished()); + assert!(epoch_state + .finished_jobs + .insert(table_id, (creating_streaming_job.info, resps)) + .is_none()); + } + } let mut task = None; while let Some((_, EpochNode { state, .. 
})) = self.command_ctx_queue.first_key_value() && !state.is_inflight() @@ -1296,16 +1296,12 @@ impl CheckpointControl { } continue; } - let commit_info = collect_commit_epoch_info( - take(&mut node.state.resps), - &node.command_ctx, - self.collect_backfill_pinned_upstream_log_epoch(), - ); let table_ids_to_finish = node .state .finished_jobs .drain() - .map(|(table_id, info)| { + .map(|(table_id, (info, resps))| { + node.state.resps.extend(resps); finished_jobs.push(TrackingJob::New(TrackingCommand { info, replace_table_info: None, @@ -1313,6 +1309,11 @@ impl CheckpointControl { table_id }) .collect(); + let commit_info = collect_commit_epoch_info( + take(&mut node.state.resps), + &node.command_ctx, + self.collect_backfill_pinned_upstream_log_epoch(), + ); task = Some(CompleteBarrierTask { commit_info, finished_jobs, @@ -1324,35 +1325,21 @@ impl CheckpointControl { break; } } - { - { - for (table_id, job) in &mut self.creating_streaming_job_controls { - let (upstream_epochs_to_notify, commit_info) = job.start_completing(); - for upstream_epoch in upstream_epochs_to_notify { - let wait_progress_timer = self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .creating_jobs_to_wait - .remove(table_id) - .expect("should exist"); - if let Some(timer) = wait_progress_timer { - timer.observe_duration(); - } - } - if let Some((epoch, resps, is_first_time)) = commit_info { - let task = task.get_or_insert_default(); - GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( - &mut task.commit_info, - epoch, - resps, - job.info.table_fragments.all_table_ids().map(TableId::new), - is_first_time, - ); - task.creating_job_epochs.push((*table_id, epoch)); - } - } + if !creating_jobs_task.is_empty() { + let task = task.get_or_insert_default(); + for (table_id, epoch, resps, is_first_time) in creating_jobs_task { + GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( + &mut task.commit_info, + epoch, + resps, + self.creating_streaming_job_controls[&table_id] + .info + .table_fragments + .all_table_ids() + .map(TableId::new), + is_first_time, + ); + task.creating_job_epochs.push((table_id, epoch)); } } task @@ -1420,40 +1407,10 @@ impl CheckpointControl { { for (table_id, epoch) in creating_job_epochs { - if let Some((upstream_epoch, is_finished)) = self - .creating_streaming_job_controls + self.creating_streaming_job_controls .get_mut(&table_id) .expect("should exist") .ack_completed(epoch) - { - let wait_progress_timer = self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .creating_jobs_to_wait - .remove(&table_id) - .expect("should exist"); - if let Some(timer) = wait_progress_timer { - timer.observe_duration(); - } - if is_finished { - debug!(epoch, ?table_id, "finish creating job"); - let creating_streaming_job = self - .creating_streaming_job_controls - .remove(&table_id) - .expect("should exist"); - assert!(creating_streaming_job.is_finished()); - assert!(self - .command_ctx_queue - .get_mut(&upstream_epoch) - .expect("should exist") - .state - .finished_jobs - .insert(table_id, creating_streaming_job.info) - .is_none()); - } - } } Ok(BarrierCompleteOutput { diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index db2ded5629d7a..d9fe6f13c963c 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet;
+
+use risingwave_common::catalog::TableId;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::meta::PausedReason;
 
@@ -83,10 +86,17 @@ impl BarrierManagerState {
     /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
     /// will be removed from the state after the info get resolved.
+    ///
+    /// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`)
     pub fn apply_command(
         &mut self,
         command: &Command,
-    ) -> (InflightGraphInfo, InflightSubscriptionInfo) {
+    ) -> (
+        InflightGraphInfo,
+        InflightSubscriptionInfo,
+        HashSet<TableId>,
+        HashSet<TableId>,
+    ) {
         // update the fragment_infos outside pre_apply
         let fragment_changes = if let Command::CreateStreamingJob {
             job_type: CreateStreamingJobType::SnapshotBackfill(_),
@@ -108,8 +118,19 @@ impl BarrierManagerState {
         if let Some(fragment_changes) = fragment_changes {
             self.inflight_graph_info.post_apply(&fragment_changes);
         }
+
+        let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
+        let mut jobs_to_wait = HashSet::new();
+        if let Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) = command {
+            for (table_id, (_, graph_info)) in jobs_to_merge {
+                jobs_to_wait.insert(*table_id);
+                table_ids_to_commit.extend(graph_info.existing_table_ids());
+                self.inflight_graph_info.extend(graph_info.clone());
+            }
+        }
+
         self.inflight_subscription_info.post_apply(command);
-        (info, subscription_info)
+        (info, subscription_info, table_ids_to_commit, jobs_to_wait)
     }
 }
diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs
index 2d5020edb04be..8e0d639cf0ded 100644
--- a/src/meta/src/rpc/metrics.rs
+++ b/src/meta/src/rpc/metrics.rs
@@ -83,9 +83,6 @@ pub struct MetaMetrics {
     pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec<2>, // (table_id, barrier_type)
     /// The latency of commit epoch of `table_id`
     pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec<1>, // (table_id, )
-    /// The latency that the upstream waits on the snapshot backfill progress after the upstream
-    /// has collected the barrier.
-    pub snapshot_backfill_upstream_wait_progress_latency: LabelGuardedHistogramVec<1>, /* (table_id, ) */
     /// The lags between the upstream epoch and the downstream epoch.
pub snapshot_backfill_lag: LabelGuardedIntGaugeVec<1>, // (table_id, ) /// The number of inflight barriers of `table_id` @@ -282,13 +279,7 @@ impl MetaMetrics { ); let snapshot_backfill_wait_commit_latency = register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); - let opts = histogram_opts!( - "meta_snapshot_backfill_upstream_wait_progress_latency", - "snapshot backfill upstream_wait_progress_latency", - exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s - ); - let snapshot_backfill_upstream_wait_progress_latency = - register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); + let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!( "meta_snapshot_backfill_upstream_lag", "snapshot backfill upstream_lag", @@ -759,7 +750,6 @@ impl MetaMetrics { last_committed_barrier_time, snapshot_backfill_barrier_latency, snapshot_backfill_wait_commit_latency, - snapshot_backfill_upstream_wait_progress_latency, snapshot_backfill_lag, snapshot_backfill_inflight_barrier_num, recovery_failure_cnt, diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index 89801a3cf4133..eb1325141fdfd 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::min; use std::collections::VecDeque; use std::future::{pending, Future}; -use std::mem::replace; +use std::mem::{replace, take}; use std::sync::Arc; use anyhow::anyhow; use futures::future::Either; use futures::{pin_mut, Stream, TryStreamExt}; -use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -32,7 +31,6 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::ChangeLogRow; use risingwave_storage::StateStore; use tokio::select; -use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::backfill::utils::{create_builder, mapping_chunk}; @@ -40,7 +38,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{try_stream, StreamExt}; use crate::executor::{ expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, - DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation, + DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError, StreamExecutorResult, }; use crate::task::CreateMviewProgressReporter; @@ -101,35 +99,14 @@ impl SnapshotBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { debug!("snapshot backfill executor start"); - let upstream_table_id = self.upstream_table.table_id(); let first_barrier = expect_first_barrier(&mut self.upstream).await?; debug!(epoch = ?first_barrier.epoch, "get first upstream barrier"); let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier"); let should_backfill = first_barrier.epoch != first_recv_barrier.epoch; - let mut barrier_epoch = { + let (mut barrier_epoch, mut need_report_finish) = { if should_backfill { - let 
subscriber_ids = first_recv_barrier - .added_subscriber_on_mv_table(upstream_table_id) - .collect_vec(); - let snapshot_backfill_table_fragment_id = match subscriber_ids.as_slice() { - [] => { - return Err(anyhow!( - "first recv barrier on backfill should add subscriber on upstream" - ) - .into()); - } - [snapshot_backfill_table_fragment_id] => *snapshot_backfill_table_fragment_id, - multiple => { - return Err(anyhow!( - "first recv barrier on backfill have multiple subscribers {:?} on upstream table {}", - multiple, upstream_table_id.table_id - ) - .into()); - } - }; - let table_id_str = format!("{}", self.upstream_table.table_id().table_id); let actor_id_str = format!("{}", self.actor_ctx.id); @@ -138,12 +115,8 @@ impl SnapshotBackfillExecutor { .snapshot_backfill_consume_row_count .with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]); - let mut upstream_buffer = UpstreamBuffer::new( - &mut self.upstream, - upstream_table_id, - snapshot_backfill_table_fragment_id, - consume_upstream_row_count, - ); + let mut upstream_buffer = + UpstreamBuffer::new(&mut self.upstream, consume_upstream_row_count); let first_barrier_epoch = first_barrier.epoch; @@ -165,7 +138,7 @@ impl SnapshotBackfillExecutor { self.rate_limit, &mut self.barrier_rx, &self.output_indices, - self.progress, + &mut self.progress, first_recv_barrier, ); @@ -187,12 +160,11 @@ impl SnapshotBackfillExecutor { yield Message::Barrier(recv_barrier); } - let mut upstream_buffer = - upstream_buffer.start_consuming_log_store(&mut self.barrier_rx); + let mut upstream_buffer = upstream_buffer.start_consuming_log_store(); let mut barrier_epoch = first_barrier_epoch; - let initial_pending_barrier = upstream_buffer.state.barrier_count(); + let initial_pending_barrier = upstream_buffer.barrier_count(); info!( ?barrier_epoch, table_id = self.upstream_table.table_id().table_id, @@ -210,37 +182,50 @@ impl SnapshotBackfillExecutor { ]); // Phase 2: consume upstream log store - while let Some(barrier) = upstream_buffer.take_buffered_barrier().await? { - assert_eq!(barrier_epoch.curr, barrier.epoch.prev); - barrier_epoch = barrier.epoch; - - debug!(?barrier_epoch, kind = ?barrier.kind, "before consume change log"); - // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure - // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, - // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. - let stream = upstream_buffer - .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( - barrier_epoch.prev, - HummockReadEpoch::Committed(barrier_epoch.prev), - )) - .await?; - let data_types = self.upstream_table.schema().data_types(); - let builder = create_builder(None, self.chunk_size, data_types); - let stream = read_change_log(stream, builder); - pin_mut!(stream); - while let Some(chunk) = upstream_buffer.run_future(stream.try_next()).await? { - debug!( - ?barrier_epoch, - size = chunk.cardinality(), - "consume change log yield chunk", - ); - consuming_log_store_row_count.inc_by(chunk.cardinality() as _); - yield Message::Chunk(chunk); - } + while let Some(upstream_barriers) = + upstream_buffer.next_checkpoint_barrier().await? 
+ { + for upstream_barrier in upstream_barriers { + let barrier = receive_next_barrier(&mut self.barrier_rx).await?; + assert_eq!(upstream_barrier.epoch, barrier.epoch); + assert_eq!(barrier_epoch.curr, barrier.epoch.prev); + barrier_epoch = barrier.epoch; + + debug!(?barrier_epoch, kind = ?barrier.kind, "before consume change log"); + // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure + // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, + // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. + let stream = upstream_buffer + .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( + barrier_epoch.prev, + HummockReadEpoch::Committed(barrier_epoch.prev), + )) + .await?; + let data_types = self.upstream_table.schema().data_types(); + let builder = create_builder(None, self.chunk_size, data_types); + let stream = read_change_log(stream, builder); + pin_mut!(stream); + while let Some(chunk) = + upstream_buffer.run_future(stream.try_next()).await? + { + debug!( + ?barrier_epoch, + size = chunk.cardinality(), + "consume change log yield chunk", + ); + consuming_log_store_row_count.inc_by(chunk.cardinality() as _); + yield Message::Chunk(chunk); + } + + debug!(?barrier_epoch, "after consume change log"); - debug!(?barrier_epoch, "after consume change log"); + self.progress.update_create_mview_log_store_progress( + barrier.epoch, + upstream_buffer.barrier_count(), + ); - yield Message::Barrier(barrier); + yield Message::Barrier(barrier); + } } info!( @@ -248,7 +233,7 @@ impl SnapshotBackfillExecutor { table_id = self.upstream_table.table_id().table_id, "finish consuming log store" ); - barrier_epoch + (barrier_epoch, true) } else { info!( table_id = self.upstream_table.table_id().table_id, @@ -257,7 +242,7 @@ impl SnapshotBackfillExecutor { let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; assert_eq!(first_barrier.epoch, first_recv_barrier.epoch); yield Message::Barrier(first_recv_barrier); - first_barrier.epoch + (first_barrier.epoch, false) } }; let mut upstream = self.upstream.into_executor(self.barrier_rx).execute(); @@ -266,6 +251,10 @@ impl SnapshotBackfillExecutor { if let Message::Barrier(barrier) = &msg { assert_eq!(barrier.epoch.prev, barrier_epoch.curr); barrier_epoch = barrier.epoch; + if need_report_finish { + need_report_finish = false; + self.progress.finish_consuming_log_store(barrier_epoch); + } } yield msg; } @@ -328,146 +317,84 @@ async fn read_change_log( } } -trait UpstreamBufferState { - // The future must be cancellation-safe - async fn is_finished(&mut self) -> StreamExecutorResult; - fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier); -} - -struct StateOfConsumingSnapshot { - pending_barriers: Vec, -} - -impl UpstreamBufferState for StateOfConsumingSnapshot { - async fn is_finished(&mut self) -> StreamExecutorResult { - // never finish when consuming snapshot - Ok(false) - } - - fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) { - self.pending_barriers.push(upstream_barrier) - } -} - -struct StateOfConsumingLogStore<'a> { - barrier_rx: &'a mut mpsc::UnboundedReceiver, - /// Barriers received from upstream but not yet received the barrier from local barrier worker - /// newer barrier at the front - upstream_pending_barriers: VecDeque, - /// Barriers received from both upstream and local barrier worker - /// newer barrier at the front - barriers: 
-struct StateOfConsumingLogStore<'a> {
- barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
- /// Barriers received from upstream but not yet received the barrier from local barrier worker
- /// newer barrier at the front
- upstream_pending_barriers: VecDeque<DispatcherBarrier>,
- /// Barriers received from both upstream and local barrier worker
- /// newer barrier at the front
- barriers: VecDeque<Barrier>,
- is_finished: bool,
- current_subscriber_id: u32,
- upstream_table_id: TableId,
-}
-
-impl<'a> StateOfConsumingLogStore<'a> {
- fn barrier_count(&self) -> usize {
- self.upstream_pending_barriers.len() + self.barriers.len()
- }
-
- async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult<Barrier> {
- assert!(!self.is_finished);
- let barrier = receive_next_barrier(self.barrier_rx).await?;
- assert_eq!(
- self.upstream_pending_barriers
- .pop_back()
- .expect("non-empty")
- .epoch,
- barrier.epoch
- );
- if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) {
- self.is_finished = true;
- }
- Ok(barrier)
- }
-}
-
-impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
- async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
- while !self.upstream_pending_barriers.is_empty() {
- let barrier = self.handle_one_pending_barrier().await?;
- self.barriers.push_front(barrier);
- }
- if self.is_finished {
- assert!(self.upstream_pending_barriers.is_empty());
- }
- Ok(self.is_finished)
- }
-
- fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
- self.upstream_pending_barriers.push_front(upstream_barrier);
- }
-}
+struct ConsumingSnapshot;
+struct ConsumingLogStore;

 struct UpstreamBuffer<'a, S> {
 upstream: &'a mut MergeExecutorInput,
- state: S,
+ max_pending_checkpoint_barrier_num: usize,
+ pending_non_checkpoint_barriers: Vec<DispatcherBarrier>,
+ /// Barriers that have been received from the upstream but whose corresponding barrier
+ /// from the local barrier worker has not yet been received.
+ ///
+ /// In the outer `VecDeque`, newer barriers are at the front.
+ /// In the inner `Vec`, newer barriers are at the back; the last barrier is the
+ /// checkpoint barrier and all others are non-checkpoint barriers.
+ upstream_pending_barriers: VecDeque<Vec<DispatcherBarrier>>,
+ /// Whether we have started polling any upstream data before the next barrier.
+ /// When `true`, we should continue polling until the next barrier, because
+ /// some data in this epoch has been discarded and the data of this epoch
+ /// must be read from the log store.
+ is_polling_epoch_data: bool,
 consume_upstream_row_count: LabelGuardedIntCounter<3>,
- upstream_table_id: TableId,
- current_subscriber_id: u32,
+ _phase: S,
 }
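To make the ordering contract of `upstream_pending_barriers` concrete, here is a tiny self-contained model in which a barrier is just an `(epoch, is_checkpoint)` pair (toy types and values, not the executor's):

    use std::collections::VecDeque;
    use std::mem::take;

    fn main() {
        let mut pending_non_checkpoint: Vec<(u64, bool)> = vec![];
        let mut pending_groups: VecDeque<Vec<(u64, bool)>> = VecDeque::new();

        // Upstream delivers two non-checkpoint barriers, a checkpoint barrier,
        // one more non-checkpoint barrier, and a final checkpoint barrier.
        for barrier in [(1, false), (2, false), (3, true), (4, false), (5, true)] {
            pending_non_checkpoint.push(barrier);
            if barrier.1 {
                // a checkpoint barrier closes the current group;
                // newer groups go to the front of the outer queue
                pending_groups.push_front(take(&mut pending_non_checkpoint));
            }
        }

        // Consumption drains from the back: oldest group first,
        // each group ending in its checkpoint barrier.
        assert_eq!(
            pending_groups.pop_back(),
            Some(vec![(1, false), (2, false), (3, true)])
        );
        assert_eq!(pending_groups.pop_back(), Some(vec![(4, false), (5, true)]));
    }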
-impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
+impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
 fn new(
 upstream: &'a mut MergeExecutorInput,
- upstream_table_id: TableId,
- current_subscriber_id: u32,
 consume_upstream_row_count: LabelGuardedIntCounter<3>,
 ) -> Self {
 Self {
 upstream,
- state: StateOfConsumingSnapshot {
- pending_barriers: vec![],
- },
+ is_polling_epoch_data: false,
 consume_upstream_row_count,
- upstream_table_id,
- current_subscriber_id,
+ pending_non_checkpoint_barriers: vec![],
+ upstream_pending_barriers: Default::default(),
+ // no limit on the number of pending barriers in the beginning
+ max_pending_checkpoint_barrier_num: usize::MAX,
+ _phase: ConsumingSnapshot {},
 }
 }

- fn start_consuming_log_store<'s>(
- self,
- barrier_rx: &'s mut UnboundedReceiver<Barrier>,
- ) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
- let StateOfConsumingSnapshot { pending_barriers } = self.state;
- let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len());
- for pending_barrier in pending_barriers {
- upstream_pending_barriers.push_front(pending_barrier);
- }
+ fn start_consuming_log_store(self) -> UpstreamBuffer<'a, ConsumingLogStore> {
+ let max_pending_barrier_num = self.barrier_count();
 UpstreamBuffer {
 upstream: self.upstream,
- state: StateOfConsumingLogStore {
- barrier_rx,
- upstream_pending_barriers,
- barriers: Default::default(),
- is_finished: false,
- current_subscriber_id: self.current_subscriber_id,
- upstream_table_id: self.upstream_table_id,
- },
+ pending_non_checkpoint_barriers: self.pending_non_checkpoint_barriers,
+ upstream_pending_barriers: self.upstream_pending_barriers,
+ max_pending_checkpoint_barrier_num: max_pending_barrier_num,
+ is_polling_epoch_data: self.is_polling_epoch_data,
 consume_upstream_row_count: self.consume_upstream_row_count,
- upstream_table_id: self.upstream_table_id,
- current_subscriber_id: self.current_subscriber_id,
+ _phase: ConsumingLogStore {},
 }
 }
 }

-impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
+impl<'a, S> UpstreamBuffer<'a, S> {
 async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
- if let Err(e) = try {
- while !self.state.is_finished().await? {
- self.consume_until_next_barrier().await?;
+ {
+ loop {
+ if let Err(e) = try {
+ if self.upstream_pending_barriers.len()
+ >= self.max_pending_checkpoint_barrier_num
+ {
+ // pause the future to block consuming upstream
+ return pending().await;
+ }
+ let barrier = self.consume_until_next_checkpoint_barrier().await?;
+ self.upstream_pending_barriers.push_front(barrier);
+ } {
+ break e;
+ }
 }
- } {
- return e;
 }
- pending().await
 }
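The `_phase: S` field replaces the old runtime state trait with a typestate: two zero-sized structs select which impl blocks apply, and the phase transition consumes the buffer so snapshot-phase methods can no longer be called afterwards. A reduced sketch of the idea (a toy `Buffer` carrying only the cap field, not the real struct):

    struct ConsumingSnapshot;
    struct ConsumingLogStore;

    struct Buffer<S> {
        max_pending: usize,
        _phase: S,
    }

    impl Buffer<ConsumingSnapshot> {
        fn new() -> Self {
            // unlimited buffering while the snapshot is being consumed
            Buffer { max_pending: usize::MAX, _phase: ConsumingSnapshot }
        }

        fn start_consuming_log_store(self, pending_now: usize) -> Buffer<ConsumingLogStore> {
            // freeze the current backlog as the cap, so the lag can only shrink
            Buffer { max_pending: pending_now, _phase: ConsumingLogStore }
        }
    }

    impl Buffer<ConsumingLogStore> {
        // methods like `next_checkpoint_barrier` exist only in this phase
        fn cap(&self) -> usize {
            self.max_pending
        }
    }

    fn main() {
        let b = Buffer::new().start_consuming_log_store(3);
        assert_eq!(b.cap(), 3);
    }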
 /// Consume the upstream until seeing the next barrier.
 /// `pending_barriers` must be non-empty after this method returns.
- async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> {
+ async fn consume_until_next_checkpoint_barrier(
+ &mut self,
+ ) -> StreamExecutorResult<Vec<DispatcherBarrier>> {
 loop {
 let msg: DispatcherMessage = self
 .upstream
@@ -476,63 +403,54 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
 .ok_or_else(|| anyhow!("end of upstream"))?;
 match msg {
 DispatcherMessage::Chunk(chunk) => {
+ self.is_polling_epoch_data = true;
 self.consume_upstream_row_count
 .inc_by(chunk.cardinality() as _);
 }
 DispatcherMessage::Barrier(barrier) => {
- self.state.on_upstream_barrier(barrier);
- break Ok(());
+ let is_checkpoint = barrier.kind.is_checkpoint();
+ self.pending_non_checkpoint_barriers.push(barrier);
+ if is_checkpoint {
+ self.is_polling_epoch_data = false;
+ break Ok(take(&mut self.pending_non_checkpoint_barriers));
+ } else {
+ self.is_polling_epoch_data = true;
+ }
+ }
+ DispatcherMessage::Watermark(_) => {
+ self.is_polling_epoch_data = true;
+ }
- DispatcherMessage::Watermark(_) => {}
 }
 }
 }
 }

-impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
- async fn take_buffered_barrier(&mut self) -> StreamExecutorResult<Option<Barrier>> {
- Ok(if let Some(barrier) = self.state.barriers.pop_back() {
- Some(barrier)
- } else if !self.state.upstream_pending_barriers.is_empty() {
- let barrier = self.state.handle_one_pending_barrier().await?;
- Some(barrier)
- } else if self.state.is_finished {
- None
- } else {
- self.consume_until_next_barrier().await?;
- let barrier = self.state.handle_one_pending_barrier().await?;
- Some(barrier)
- })
- }
-}
-
-fn is_finish_barrier(
- barrier: &Barrier,
- current_subscriber_id: u32,
- upstream_table_id: TableId,
-) -> bool {
- if let Some(Mutation::DropSubscriptions {
- subscriptions_to_drop,
- }) = barrier.mutation.as_deref()
- {
- let is_finished = subscriptions_to_drop
- .iter()
- .any(|(subscriber_id, _)| *subscriber_id == current_subscriber_id);
- if is_finished {
- assert!(subscriptions_to_drop.iter().any(
- |(subscriber_id, subscribed_upstream_table_id)| {
- *subscriber_id == current_subscriber_id
- && upstream_table_id == *subscribed_upstream_table_id
+impl<'a> UpstreamBuffer<'a, ConsumingLogStore> {
+ async fn next_checkpoint_barrier(
+ &mut self,
+ ) -> StreamExecutorResult<Option<Vec<DispatcherBarrier>>> {
+ Ok(
+ if let Some(barriers) = self.upstream_pending_barriers.pop_back() {
+ // sub(1) to ensure that the lag is monotonically decreasing.
+ self.max_pending_checkpoint_barrier_num = min(
+ self.upstream_pending_barriers.len(),
+ self.max_pending_checkpoint_barrier_num.saturating_sub(1),
+ );
+ Some(barriers)
+ } else {
+ self.max_pending_checkpoint_barrier_num = 0;
+ if self.is_polling_epoch_data {
+ let barriers = self.consume_until_next_checkpoint_barrier().await?;
+ Some(barriers)
+ } else {
+ None
 }
- ))
- }
- is_finished
- } else {
- false
+ },
+ )
 }
 }
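The cap update in `next_checkpoint_barrier` is what guarantees catch-up: setting the cap to `min(len, cap - 1)` after every pop means the backlog can never grow and must shrink by at least one per consumed checkpoint barrier. A small simulation of this, under the simplifying assumption that exactly one new checkpoint-barrier group arrives per step whenever the cap allows (toy numbers, not the executor's scheduling):

    use std::cmp::min;

    fn main() {
        let mut queue_len: usize = 5; // pending checkpoint-barrier groups
        let mut cap: usize = 5; // max_pending_checkpoint_barrier_num
        let mut steps = 0;
        while queue_len > 0 {
            queue_len -= 1; // pop_back one group and replay it from the log store
            cap = min(queue_len, cap.saturating_sub(1)); // allowed lag only shrinks
            // upstream keeps producing, but concurrently_consume_upstream
            // parks once the cap is reached, so arrivals are bounded by it
            queue_len = min(queue_len + 1, cap);
            steps += 1;
        }
        assert_eq!(steps, 5); // bounded catch-up: one step per initial pending group
    }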
-impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
+impl<'a, S> UpstreamBuffer<'a, S> {
 /// Run a future while concurrently polling the upstream so that the upstream
 /// won't be back-pressured.
 async fn run_future<T, E: Into<StreamExecutorError>>(
@@ -550,6 +468,10 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
 }
 }
 }
+
+ fn barrier_count(&self) -> usize {
+ self.upstream_pending_barriers.len()
+ }
 }

 async fn receive_next_barrier(
@@ -589,7 +511,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
 rate_limit: Option,
 barrier_rx: &'a mut UnboundedReceiver<Barrier>,
 output_indices: &'a [usize],
- mut progress: CreateMviewProgressReporter,
+ progress: &'a mut CreateMviewProgressReporter,
 first_recv_barrier: Barrier,
 ) {
 let mut barrier_epoch = first_recv_barrier.epoch;
diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs
index 9b2820bb3bfed..a91f9b8476111 100644
--- a/src/stream/src/task/barrier_manager/progress.rs
+++ b/src/stream/src/task/barrier_manager/progress.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::assert_matches::assert_matches;
 use std::fmt::{Display, Formatter};

 use risingwave_common::util::epoch::EpochPair;
@@ -27,23 +28,30 @@ type ConsumedRows = u64;

 #[derive(Debug, Clone, Copy)]
 pub(crate) enum BackfillState {
- ConsumingUpstream(ConsumedEpoch, ConsumedRows),
- Done(ConsumedRows),
+ ConsumingUpstreamTable(ConsumedEpoch, ConsumedRows),
+ DoneConsumingUpstreamTable(ConsumedRows),
+ ConsumingLogStore { pending_barrier_num: usize },
+ DoneConsumingLogStore,
 }

 impl BackfillState {
 pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
+ let (done, consumed_epoch, consumed_rows, pending_barrier_num) = match self {
+ BackfillState::ConsumingUpstreamTable(consumed_epoch, consumed_rows) => {
+ (false, consumed_epoch, consumed_rows, 0)
+ }
+ BackfillState::DoneConsumingUpstreamTable(consumed_rows) => (true, 0, consumed_rows, 0), /* unused field for done */
+ BackfillState::ConsumingLogStore {
+ pending_barrier_num,
+ } => (false, 0, 0, pending_barrier_num as _),
+ BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
+ };
 PbCreateMviewProgress {
 backfill_actor_id: actor_id,
- done: matches!(self, BackfillState::Done(_)),
- consumed_epoch: match self {
- BackfillState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch,
- BackfillState::Done(_) => 0, // unused field for done
- },
- consumed_rows: match self {
- BackfillState::ConsumingUpstream(_, consumed_rows) => consumed_rows,
- BackfillState::Done(consumed_rows) => consumed_rows,
- },
+ done,
+ consumed_epoch,
+ consumed_rows,
+ pending_barrier_num,
 }
 }
 }
@@ -51,10 +59,27 @@ impl BackfillState {
 impl Display for BackfillState {
 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 match self {
- BackfillState::ConsumingUpstream(epoch, rows) => {
- write!(f, "ConsumingUpstream(epoch: {}, rows: {})", epoch, rows)
+ BackfillState::ConsumingUpstreamTable(epoch, rows) => {
+ write!(
+ f,
+ "ConsumingUpstreamTable(epoch: {}, rows: {})",
+ epoch, rows
+ )
+ }
+ BackfillState::DoneConsumingUpstreamTable(rows) => {
+ write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
+ }
+ BackfillState::ConsumingLogStore {
+ pending_barrier_num,
+ } => {
+ write!(
+ f,
+ "ConsumingLogStore(pending_barrier_num: {pending_barrier_num})"
+ )
+ }
+ BackfillState::DoneConsumingLogStore => {
+ write!(f, "DoneConsumingLogStore")
 }
- BackfillState::Done(rows) => write!(f, "Done(rows: {})", rows),
 }
 }
 }
@@ -165,7 +190,7 @@ impl CreateMviewProgressReporter {
 current_consumed_rows: ConsumedRows,
 ) {
 match self.state {
- Some(BackfillState::ConsumingUpstream(last, last_consumed_rows)) => {
+ Some(BackfillState::ConsumingUpstreamTable(last, last_consumed_rows)) => {
 assert!(
 last < consumed_epoch,
 "last_epoch: {:#?} must be less than consumed epoch: {:#?}",
@@ -174,22 +199,61 @@ impl CreateMviewProgressReporter {
 );
 assert!(last_consumed_rows <= current_consumed_rows);
 }
- Some(BackfillState::Done(_)) => unreachable!(),
+ Some(state) => {
+ panic!(
+ "should not update consuming progress at invalid state: {:?}",
+ state
+ )
+ }
 None => {}
 };
 self.update_inner(
 epoch,
- BackfillState::ConsumingUpstream(consumed_epoch, current_consumed_rows),
+ BackfillState::ConsumingUpstreamTable(consumed_epoch, current_consumed_rows),
 );
 }

 /// Finish the progress. If the progress is already finished, this is a no-op.
 /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
- if let Some(BackfillState::Done(_)) = self.state {
+ if let Some(BackfillState::DoneConsumingUpstreamTable(_)) = self.state {
 return;
 }
- self.update_inner(epoch, BackfillState::Done(current_consumed_rows));
+ self.update_inner(
+ epoch,
+ BackfillState::DoneConsumingUpstreamTable(current_consumed_rows),
+ );
+ }
+
+ pub(crate) fn update_create_mview_log_store_progress(
+ &mut self,
+ epoch: EpochPair,
+ pending_barrier_num: usize,
+ ) {
+ assert_matches!(
+ self.state,
+ Some(BackfillState::DoneConsumingUpstreamTable(_))
+ | Some(BackfillState::ConsumingLogStore { .. }),
+ "cannot update log store progress at state {:?}",
+ self.state
+ );
+ self.update_inner(
+ epoch,
+ BackfillState::ConsumingLogStore {
+ pending_barrier_num,
+ },
+ );
+ }
+
+ pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
+ assert_matches!(
+ self.state,
+ Some(BackfillState::DoneConsumingUpstreamTable(_))
+ | Some(BackfillState::ConsumingLogStore { .. }),
+ "cannot finish log store progress at state {:?}",
+ self.state
+ );
+ self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
 }
 }

@@ -201,7 +265,7 @@ impl LocalBarrierManager {
 ///
 /// When all backfill executors of the creating mview finish, the creation progress will be done at
 /// the frontend and the mview will be exposed to the user.
- pub fn register_create_mview_progress(
+ pub(crate) fn register_create_mview_progress(
 &self,
 backfill_actor_id: ActorId,
 ) -> CreateMviewProgressReporter {
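Taken together, the reporter now walks a four-state machine: ConsumingUpstreamTable, then DoneConsumingUpstreamTable, then ConsumingLogStore, then DoneConsumingLogStore, with the two new methods guarding the log-store transitions. A compact model of those guards follows; the free function mirrors the `assert_matches!` checks and is not the actual `CreateMviewProgressReporter` API.

    #[derive(Debug, Clone, Copy, PartialEq)]
    enum BackfillState {
        ConsumingUpstreamTable(u64, u64),
        DoneConsumingUpstreamTable(u64),
        ConsumingLogStore { pending_barrier_num: usize },
        DoneConsumingLogStore,
    }

    fn update_log_store_progress(state: &mut Option<BackfillState>, pending_barrier_num: usize) {
        match state {
            // only reachable once the snapshot phase has finished
            Some(BackfillState::DoneConsumingUpstreamTable(_))
            | Some(BackfillState::ConsumingLogStore { .. }) => {
                *state = Some(BackfillState::ConsumingLogStore { pending_barrier_num });
            }
            other => panic!("cannot update log store progress at state {:?}", other),
        }
    }

    fn main() {
        let mut state = Some(BackfillState::DoneConsumingUpstreamTable(42));
        update_log_store_progress(&mut state, 3); // snapshot done -> consuming log store
        update_log_store_progress(&mut state, 0); // progress within the log store phase
        state = Some(BackfillState::DoneConsumingLogStore); // reported on the next barrier
        assert_eq!(state, Some(BackfillState::DoneConsumingLogStore));
    }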