From 6263ea63592bcfa159082658969a1a86585ed473 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Mon, 28 Oct 2024 12:40:57 +0800
Subject: [PATCH] refactor(meta): reorganize global barrier manager field (#18920)

---
 src/meta/node/src/server.rs                 |  16 +-
 src/meta/src/barrier/command.rs             | 114 ++--
 src/meta/src/barrier/creating_job/mod.rs    |  40 +-
 src/meta/src/barrier/creating_job/status.rs |  56 +-
 src/meta/src/barrier/info.rs                |  17 +-
 src/meta/src/barrier/mod.rs                 | 571 +++++++++++---------
 src/meta/src/barrier/progress.rs            |   2 +-
 src/meta/src/barrier/recovery.rs            |  67 +--
 src/meta/src/barrier/rpc.rs                 |  51 +-
 src/meta/src/barrier/schedule.rs            |   2 +-
 src/meta/src/barrier/state.rs               |  58 +-
 src/meta/src/rpc/ddl_controller.rs          |   7 +-
 12 files changed, 545 insertions(+), 456 deletions(-)

diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index c03914498d81a..1e9a71cdda2b0 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -25,6 +25,7 @@ use risingwave_common::telemetry::manager::TelemetryManager;
 use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
 use risingwave_common::util::tokio_util::sync::CancellationToken;
 use risingwave_common_service::{MetricsManager, TracingExtractLayer};
+use risingwave_meta::barrier::GlobalBarrierManager;
 use risingwave_meta::controller::catalog::CatalogController;
 use risingwave_meta::controller::cluster::ClusterController;
 use risingwave_meta::controller::IN_MEMORY_STORE;
@@ -80,7 +81,7 @@ use thiserror_ext::AsReport;
 use tokio::sync::watch;

 use crate::backup_restore::BackupManager;
-use crate::barrier::{BarrierScheduler, GlobalBarrierManager};
+use crate::barrier::BarrierScheduler;
 use crate::controller::system_param::SystemParamsController;
 use crate::controller::SqlMetaStore;
 use crate::hummock::HummockManager;
@@ -490,7 +491,7 @@ pub async fn start_service_as_election_leader(
         env.clone(),
     ));

-    let barrier_manager = GlobalBarrierManager::new(
+    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
         scheduled_barriers,
         env.clone(),
         metadata_manager.clone(),
@@ -501,6 +502,7 @@ pub async fn start_service_as_election_leader(
         scale_controller.clone(),
     )
     .await;
+    sub_tasks.push((join_handle, shutdown_rx));

     {
         let source_manager = source_manager.clone();
@@ -546,7 +548,7 @@ pub async fn start_service_as_election_leader(
         metadata_manager.clone(),
         stream_manager.clone(),
         source_manager.clone(),
-        barrier_manager.context().clone(),
+        barrier_manager.clone(),
         sink_manager.clone(),
         meta_metrics.clone(),
     )
@@ -558,12 +560,11 @@ pub async fn start_service_as_election_leader(
         metadata_manager.clone(),
         source_manager,
         stream_manager.clone(),
-        barrier_manager.context().clone(),
+        barrier_manager.clone(),
         scale_controller.clone(),
     );

-    let cluster_srv =
-        ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.context().clone());
+    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
     let stream_srv = StreamServiceImpl::new(
         env.clone(),
         barrier_scheduler.clone(),
@@ -630,12 +631,11 @@ pub async fn start_service_as_election_leader(
         .await,
     );

-    if cfg!(not(test)) {
+    {
         sub_tasks.push(ClusterController::start_heartbeat_checker(
             metadata_manager.cluster_controller.clone(),
             Duration::from_secs(1),
         ));
-        sub_tasks.push(GlobalBarrierManager::start(barrier_manager));

         if !env.opts.disable_automatic_parallelism_control {
             sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
diff --git
a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index a5b7efe63098d..ef4a571f67d11 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -41,8 +41,7 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest; use tracing::warn; use super::info::{CommandFragmentChanges, InflightGraphInfo}; -use super::trace::TracedEpoch; -use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo}; +use crate::barrier::{BarrierInfo, GlobalBarrierWorkerContext, InflightSubscriptionInfo}; use crate::controller::fragment::InflightFragmentInfo; use crate::manager::{DdlType, StreamingJob}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; @@ -206,7 +205,7 @@ pub enum CreateStreamingJobType { SnapshotBackfill(SnapshotBackfillInfo), } -/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands, +/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands, /// it will build different barriers to send, and may do different stuffs after the barrier is /// collected. #[derive(Debug, Clone, strum::Display)] @@ -439,22 +438,15 @@ pub struct CommandContext { pub node_map: HashMap, pub subscription_info: InflightSubscriptionInfo, - pub prev_epoch: TracedEpoch, - pub curr_epoch: TracedEpoch, + pub barrier_info: BarrierInfo, pub table_ids_to_commit: HashSet, - pub current_paused_reason: Option, - pub command: Command, - pub kind: BarrierKind, - - barrier_manager_context: GlobalBarrierManagerContext, - /// The tracing span of this command. /// - /// Differs from [`TracedEpoch`], this span focuses on the lifetime of the corresponding + /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding /// barrier, including the process of waiting for the barrier to be sent, flowing through the /// stream graph on compute nodes, and finishing its `post_collect` stuffs. pub _span: tracing::Span, @@ -463,9 +455,7 @@ pub struct CommandContext { impl std::fmt::Debug for CommandContext { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("CommandContext") - .field("prev_epoch", &self.prev_epoch.value().0) - .field("curr_epoch", &self.curr_epoch.value().0) - .field("kind", &self.kind) + .field("barrier_info", &self.barrier_info) .field("command", &self.command) .finish() } @@ -474,26 +464,18 @@ impl std::fmt::Debug for CommandContext { impl CommandContext { pub(super) fn new( node_map: HashMap, + barrier_info: BarrierInfo, subscription_info: InflightSubscriptionInfo, - prev_epoch: TracedEpoch, - curr_epoch: TracedEpoch, table_ids_to_commit: HashSet, - current_paused_reason: Option, command: Command, - kind: BarrierKind, - barrier_manager_context: GlobalBarrierManagerContext, span: tracing::Span, ) -> Self { Self { node_map, subscription_info, - prev_epoch, - curr_epoch, + barrier_info, table_ids_to_commit, - current_paused_reason, command, - kind, - barrier_manager_context, _span: span, } } @@ -501,7 +483,7 @@ impl CommandContext { impl Command { /// Generate a mutation for the given command. - pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option { + pub fn to_mutation(&self, current_paused_reason: Option) -> Option { let mutation = match self { Command::Plain(mutation) => mutation.clone(), @@ -517,7 +499,7 @@ impl Command { Command::Resume(reason) => { // Only resume when the cluster is paused with the same reason. 
- if current_paused_reason == Some(reason) { + if current_paused_reason == Some(*reason) { Some(Mutation::Resume(ResumeMutation {})) } else { None @@ -903,41 +885,35 @@ impl Command { ..Default::default() })) } -} - -impl CommandContext { - pub fn to_mutation(&self) -> Option { - self.command - .to_mutation(self.current_paused_reason.as_ref()) - } /// Returns the paused reason after executing the current command. - pub fn next_paused_reason(&self) -> Option { - match &self.command { + pub fn next_paused_reason( + &self, + current_paused_reason: Option, + ) -> Option { + match self { Command::Pause(reason) => { // Only pause when the cluster is not already paused. - if self.current_paused_reason.is_none() { + if current_paused_reason.is_none() { Some(*reason) } else { - self.current_paused_reason + current_paused_reason } } Command::Resume(reason) => { // Only resume when the cluster is paused with the same reason. - if self.current_paused_reason == Some(*reason) { + if current_paused_reason == Some(*reason) { None } else { - self.current_paused_reason + current_paused_reason } } - _ => self.current_paused_reason, + _ => current_paused_reason, } } -} -impl Command { /// For `CancelStreamingJob`, returns the table id of the target table. pub fn table_to_cancel(&self) -> Option { match self { @@ -948,7 +924,10 @@ impl Command { } impl CommandContext { - pub async fn wait_epoch_commit(&self) -> MetaResult<()> { + pub async fn wait_epoch_commit( + &self, + barrier_manager_context: &GlobalBarrierWorkerContext, + ) -> MetaResult<()> { let table_id = self.table_ids_to_commit.iter().next().cloned(); // try wait epoch on an existing random table id let Some(table_id) = table_id else { @@ -956,14 +935,13 @@ impl CommandContext { return Ok(()); }; let futures = self.node_map.values().map(|worker_node| async { - let client = self - .barrier_manager_context + let client = barrier_manager_context .env .stream_client_pool() .get(worker_node) .await?; let request = WaitEpochCommitRequest { - epoch: self.prev_epoch.value().0, + epoch: self.barrier_info.prev_epoch(), table_id: table_id.table_id, }; client.wait_epoch_commit(request).await @@ -976,7 +954,10 @@ impl CommandContext { /// Do some stuffs after barriers are collected and the new storage version is committed, for /// the given command. - pub async fn post_collect(&self) -> MetaResult<()> { + pub async fn post_collect( + &self, + barrier_manager_context: &GlobalBarrierWorkerContext, + ) -> MetaResult<()> { match &self.command { Command::Plain(_) => {} @@ -988,18 +969,18 @@ impl CommandContext { // storage version with this epoch is synced to all compute nodes before the // execution of the next command of `Update`, as some newly created operators // may immediately initialize their states on that barrier. - self.wait_epoch_commit().await?; + self.wait_epoch_commit(barrier_manager_context).await?; } } Command::Resume(_) => {} Command::SourceSplitAssignment(split_assignment) => { - self.barrier_manager_context + barrier_manager_context .metadata_manager .update_actor_splits_by_split_assignment(split_assignment) .await?; - self.barrier_manager_context + barrier_manager_context .source_manager .apply_source_change(None, None, Some(split_assignment.clone()), None) .await; @@ -1009,7 +990,7 @@ impl CommandContext { unregistered_state_table_ids, .. 
} => { - self.barrier_manager_context + barrier_manager_context .hummock_manager .unregister_table_ids(unregistered_state_table_ids.iter().cloned()) .await?; @@ -1027,7 +1008,7 @@ impl CommandContext { // It won't clean the tables on failure, // since the failure could be recoverable. // As such it needs to be handled here. - self.barrier_manager_context + barrier_manager_context .hummock_manager .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new)) .await?; @@ -1040,7 +1021,7 @@ impl CommandContext { init_split_assignment, .. } = info; - self.barrier_manager_context + barrier_manager_context .metadata_manager .catalog_controller .post_collect_table_fragments( @@ -1058,7 +1039,7 @@ impl CommandContext { .. }) = job_type { - self.barrier_manager_context + barrier_manager_context .metadata_manager .catalog_controller .post_collect_table_fragments( @@ -1073,7 +1054,7 @@ impl CommandContext { // Extract the fragments that include source operators. let source_fragments = table_fragments.stream_source_fragments(); let backfill_fragments = table_fragments.source_backfill_fragments()?; - self.barrier_manager_context + barrier_manager_context .source_manager .apply_source_change( Some(source_fragments), @@ -1088,7 +1069,7 @@ impl CommandContext { table_parallelism, .. } => { - self.barrier_manager_context + barrier_manager_context .scale_controller .post_apply_reschedule(reschedules, table_parallelism) .await?; @@ -1102,7 +1083,7 @@ impl CommandContext { .. }) => { // Update actors and actor_dispatchers for new table fragments. - self.barrier_manager_context + barrier_manager_context .metadata_manager .catalog_controller .post_collect_table_fragments( @@ -1114,14 +1095,14 @@ impl CommandContext { .await?; // Apply the split changes in source manager. - self.barrier_manager_context + barrier_manager_context .source_manager .drop_source_fragments_vec(std::slice::from_ref(old_table_fragments)) .await; let source_fragments = new_table_fragments.stream_source_fragments(); // XXX: is it possible to have backfill fragments here? let backfill_fragments = new_table_fragments.source_backfill_fragments()?; - self.barrier_manager_context + barrier_manager_context .source_manager .apply_source_change( Some(source_fragments), @@ -1135,7 +1116,7 @@ impl CommandContext { Command::CreateSubscription { subscription_id, .. 
} => { - self.barrier_manager_context + barrier_manager_context .metadata_manager .catalog_controller .finish_create_subscription_catalog(*subscription_id) @@ -1150,10 +1131,15 @@ impl CommandContext { pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch { let Some(truncate_timestamptz) = Timestamptz::from_secs( - self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64, + self.barrier_info + .prev_epoch + .value() + .as_timestamptz() + .timestamp() + - retention_second as i64, ) else { - warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value"); - return self.prev_epoch.value(); + warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value"); + return self.barrier_info.prev_epoch.value(); }; Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64) } diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index f3ad5a44aa929..1ffdfb25fa13e 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -18,7 +18,6 @@ mod status; use std::cmp::max; use std::collections::HashMap; use std::ops::Bound::{Excluded, Unbounded}; -use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; @@ -30,7 +29,6 @@ use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::info; -use crate::barrier::command::CommandContext; use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierControl; use crate::barrier::creating_job::status::{ CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus, @@ -38,7 +36,7 @@ use crate::barrier::creating_job::status::{ use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; -use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo}; +use crate::barrier::{BarrierInfo, Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo}; use crate::rpc::metrics::MetaMetrics; use crate::MetaResult; @@ -173,9 +171,7 @@ impl CreatingStreamingJobControl { pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, + barrier_info, new_actors, mutation, }: CreatingJobInjectBarrierInfo, @@ -183,36 +179,38 @@ impl CreatingStreamingJobControl { let node_to_collect = control_stream_manager.inject_barrier( Some(table_id), mutation, - (&curr_epoch, &prev_epoch), - &kind, + &barrier_info, pre_applied_graph_info, applied_graph_info, new_actors, vec![], vec![], )?; - barrier_control.enqueue_epoch(prev_epoch.value().0, node_to_collect, kind.is_checkpoint()); + barrier_control.enqueue_epoch( + barrier_info.prev_epoch(), + node_to_collect, + barrier_info.kind.is_checkpoint(), + ); Ok(()) } pub(super) fn on_new_command( &mut self, control_stream_manager: &mut ControlStreamManager, - command_ctx: &Arc, + command: &Command, + barrier_info: &BarrierInfo, ) -> MetaResult<()> { let table_id = self.info.table_fragments.table_id(); - let start_consume_upstream = if let Command::MergeSnapshotBackfillStreamingJobs( - jobs_to_merge, - ) = &command_ctx.command - { - jobs_to_merge.contains_key(&table_id) - } else { - false - }; + let start_consume_upstream = + if let Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) = command { + 
jobs_to_merge.contains_key(&table_id) + } else { + false + }; if start_consume_upstream { info!( table_id = self.info.table_fragments.table_id().table_id, - prev_epoch = command_ctx.prev_epoch.value().0, + prev_epoch = barrier_info.prev_epoch(), "start consuming upstream" ); } @@ -223,7 +221,7 @@ impl CreatingStreamingJobControl { self.backfill_epoch }; self.upstream_lag.set( - command_ctx + barrier_info .prev_epoch .value() .0 @@ -231,7 +229,7 @@ impl CreatingStreamingJobControl { ); if let Some(barrier_to_inject) = self .status - .on_new_upstream_epoch(command_ctx, start_consume_upstream) + .on_new_upstream_epoch(barrier_info, start_consume_upstream) { Self::inject_barrier( self.info.table_fragments.table_id(), diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index f599990eff999..f5d18400a98e1 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -15,7 +15,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::mem::take; -use std::sync::Arc; use risingwave_common::hash::ActorId; use risingwave_common::util::epoch::Epoch; @@ -28,9 +27,8 @@ use risingwave_pb::stream_service::barrier_complete_response::{ }; use tracing::warn; -use crate::barrier::command::CommandContext; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::{BarrierKind, TracedEpoch}; +use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch}; #[derive(Debug)] pub(super) struct CreateMviewLogStoreProgressTracker { @@ -103,7 +101,7 @@ pub(super) enum CreatingStreamingJobStatus { /// the snapshot has been fully consumed after `update_progress`. ConsumingSnapshot { prev_epoch_fake_physical_time: u64, - pending_upstream_barriers: Vec<(TracedEpoch, TracedEpoch, BarrierKind)>, + pending_upstream_barriers: Vec, version_stats: HummockVersionStats, create_mview_tracker: CreateMviewProgressTracker, snapshot_backfill_actors: HashSet, @@ -127,9 +125,7 @@ pub(super) enum CreatingStreamingJobStatus { } pub(super) struct CreatingJobInjectBarrierInfo { - pub curr_epoch: TracedEpoch, - pub prev_epoch: TracedEpoch, - pub kind: BarrierKind, + pub barrier_info: BarrierInfo, pub new_actors: Option>>, pub mutation: Option, } @@ -166,22 +162,22 @@ impl CreatingStreamingJobStatus { let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); let barriers_to_inject: Vec<_> = [CreatingJobInjectBarrierInfo { - curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)), - prev_epoch: TracedEpoch::new(prev_epoch), - kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), + barrier_info: BarrierInfo { + curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)), + prev_epoch: TracedEpoch::new(prev_epoch), + kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), + }, new_actors, mutation, }] .into_iter() - .chain(pending_upstream_barriers.drain(..).map( - |(prev_epoch, curr_epoch, kind)| CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, + .chain(pending_upstream_barriers.drain(..).map(|barrier_info| { + CreatingJobInjectBarrierInfo { + barrier_info, new_actors: None, mutation: None, - }, - )) + } + })) .collect(); *self = CreatingStreamingJobStatus::ConsumingLogStore { @@ -208,7 +204,7 @@ impl CreatingStreamingJobStatus { pub(super) fn on_new_upstream_epoch( &mut self, - command_ctx: &Arc, + barrier_info: &BarrierInfo, start_consume_upstream: bool, ) -> Option { match self { @@ -223,28 +219,22 @@ impl CreatingStreamingJobStatus { 
!start_consume_upstream, "should not start consuming upstream for a job that are consuming snapshot" ); - pending_upstream_barriers.push(( - command_ctx.prev_epoch.clone(), - command_ctx.curr_epoch.clone(), - command_ctx.kind.clone(), - )); + pending_upstream_barriers.push(barrier_info.clone()); Some(CreatingStreamingJobStatus::new_fake_barrier( prev_epoch_fake_physical_time, pending_non_checkpoint_barriers, initial_barrier_info, - command_ctx.kind.is_checkpoint(), + barrier_info.kind.is_checkpoint(), )) } CreatingStreamingJobStatus::ConsumingLogStore { .. } => { - let prev_epoch = command_ctx.prev_epoch.value().0; + let prev_epoch = barrier_info.prev_epoch(); if start_consume_upstream { - assert!(command_ctx.kind.is_checkpoint()); + assert!(barrier_info.kind.is_checkpoint()); *self = CreatingStreamingJobStatus::Finishing(prev_epoch); } Some(CreatingJobInjectBarrierInfo { - curr_epoch: command_ctx.curr_epoch.clone(), - prev_epoch: command_ctx.prev_epoch.clone(), - kind: command_ctx.kind.clone(), + barrier_info: barrier_info.clone(), new_actors: None, mutation: None, }) @@ -285,9 +275,11 @@ impl CreatingStreamingJobStatus { Default::default() }; CreatingJobInjectBarrierInfo { - curr_epoch, - prev_epoch, - kind, + barrier_info: BarrierInfo { + prev_epoch, + curr_epoch, + kind, + }, new_actors, mutation, } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index b02240f402ba0..144797d1295db 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -19,10 +19,23 @@ use risingwave_meta_model::WorkerId; use risingwave_pb::common::WorkerNode; use tracing::warn; -use crate::barrier::Command; +use crate::barrier::{BarrierKind, Command, TracedEpoch}; use crate::controller::fragment::InflightFragmentInfo; use crate::model::{ActorId, FragmentId}; +#[derive(Debug, Clone)] +pub(super) struct BarrierInfo { + pub prev_epoch: TracedEpoch, + pub curr_epoch: TracedEpoch, + pub kind: BarrierKind, +} + +impl BarrierInfo { + pub(super) fn prev_epoch(&self) -> u64 { + self.prev_epoch.value().0 + } +} + #[derive(Debug, Clone)] pub(crate) enum CommandFragmentChanges { NewFragment(InflightFragmentInfo), @@ -40,7 +53,7 @@ pub struct InflightSubscriptionInfo { } /// [`InflightGraphInfo`] resolves the actor info read from meta store for -/// [`crate::barrier::GlobalBarrierManager`]. +/// [`crate::barrier::GlobalBarrierWorker`]. 
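// Editor's note: the hunks around here replace three separate values
// (`prev_epoch`, `curr_epoch`, `kind`) with the new `BarrierInfo` struct.
// Below is a minimal, self-contained sketch of that grouping; `Epoch`,
// `TracedEpoch`, and `BarrierKind` are simplified stand-ins for the real
// risingwave_meta types (which also carry tracing spans and an `Initial`
// variant), so this illustrates the shape only, not the actual definitions.

#[derive(Debug, Clone, Copy)]
struct Epoch(u64);

#[derive(Debug, Clone)]
struct TracedEpoch(Epoch); // stand-in: the real type pairs the epoch with a span

impl TracedEpoch {
    fn value(&self) -> Epoch {
        self.0
    }
}

#[derive(Debug, Clone)]
enum BarrierKind {
    Barrier,
    Checkpoint(Vec<u64>), // prev epochs folded into this checkpoint
}

#[derive(Debug, Clone)]
struct BarrierInfo {
    prev_epoch: TracedEpoch,
    curr_epoch: TracedEpoch,
    kind: BarrierKind,
}

impl BarrierInfo {
    // Convenience accessor mirroring the patch: the raw `prev_epoch` value.
    fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}

// Call sites that used to take `(&curr_epoch, &prev_epoch), &kind` now take a
// single `&BarrierInfo`, for example:
fn enqueue_epoch(info: &BarrierInfo) -> (u64, bool) {
    (info.prev_epoch(), matches!(info.kind, BarrierKind::Checkpoint(_)))
}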
#[derive(Default, Clone, Debug)] pub struct InflightGraphInfo { /// `node_id` => actors diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index efee9b715c316..f5835f5f4f4c9 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -35,7 +35,7 @@ use risingwave_hummock_sdk::table_stats::from_prost_table_stats_map; use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; -use risingwave_hummock_sdk::{HummockSstableObjectId, LocalSstableInfo}; +use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId, LocalSstableInfo}; use risingwave_meta_model::WorkerId; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; @@ -43,6 +43,7 @@ use risingwave_pb::meta::{PausedReason, PbRecoveryStatus}; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_pb::stream_service::BarrierCompleteResponse; use thiserror_ext::AsReport; +use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; @@ -51,11 +52,11 @@ use tracing::{debug, error, info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; use crate::barrier::creating_job::{CompleteJobType, CreatingStreamingJobControl}; -use crate::barrier::info::InflightGraphInfo; +use crate::barrier::info::{BarrierInfo, InflightGraphInfo}; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; use crate::barrier::schedule::ScheduledBarriers; -use crate::barrier::state::BarrierManagerState; +use crate::barrier::state::BarrierWorkerState; use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; use crate::manager::sink_coordination::SinkCoordinatorManager; @@ -146,16 +147,12 @@ impl From<&BarrierManagerStatus> for PbRecoveryStatus { } } -pub enum BarrierManagerRequest { +pub(crate) enum BarrierManagerRequest { GetDdlProgress(Sender>), } #[derive(Clone)] -pub struct GlobalBarrierManagerContext { - status: Arc>, - - request_tx: mpsc::UnboundedSender, - +struct GlobalBarrierWorkerContext { metadata_manager: MetadataManager, hummock_manager: HummockManagerRef, @@ -171,16 +168,51 @@ pub struct GlobalBarrierManagerContext { env: MetaSrvEnv, } -/// [`crate::barrier::GlobalBarrierManager`] sends barriers to all registered compute nodes and +impl GlobalBarrierManager { + pub async fn start( + scheduled_barriers: schedule::ScheduledBarriers, + env: MetaSrvEnv, + metadata_manager: MetadataManager, + hummock_manager: HummockManagerRef, + source_manager: SourceManagerRef, + sink_manager: SinkCoordinatorManager, + meta_metrics: Arc, + scale_controller: ScaleControllerRef, + ) -> (Arc, JoinHandle<()>, oneshot::Sender<()>) { + let (request_tx, request_rx) = unbounded_channel(); + let barrier_worker = GlobalBarrierWorker::new( + scheduled_barriers, + env, + metadata_manager, + hummock_manager, + source_manager, + sink_manager, + meta_metrics, + scale_controller, + request_rx, + ) + .await; + let manager = Self { + status: barrier_worker.status.clone(), + hummock_manager: barrier_worker.context.hummock_manager.clone(), + request_tx, + metadata_manager: barrier_worker.context.metadata_manager.clone(), + }; + let (join_handle, shutdown_tx) = barrier_worker.start(); + (Arc::new(manager), join_handle, shutdown_tx) + } +} + +/// 
[`crate::barrier::GlobalBarrierWorker`] sends barriers to all registered compute nodes and /// collect them, with monotonic increasing epoch numbers. On compute nodes, `LocalBarrierManager` /// in `risingwave_stream` crate will serve these requests and dispatch them to source actors. /// /// Configuration change in our system is achieved by the mutation in the barrier. Thus, -/// [`crate::barrier::GlobalBarrierManager`] provides a set of interfaces like a state machine, +/// [`crate::barrier::GlobalBarrierWorker`] provides a set of interfaces like a state machine, /// accepting [`Command`] that carries info to build `Mutation`. To keep the consistency between /// barrier manager and meta store, some actions like "drop materialized view" or "create mv on mv" /// must be done in barrier manager transactional using [`Command`]. -pub struct GlobalBarrierManager { +struct GlobalBarrierWorker { /// Enable recovery or not when failover. enable_recovery: bool, @@ -190,18 +222,19 @@ pub struct GlobalBarrierManager { /// The max barrier nums in flight in_flight_barrier_nums: usize, - context: GlobalBarrierManagerContext, + context: GlobalBarrierWorkerContext, - env: MetaSrvEnv, + status: Arc>, - state: BarrierManagerState, + env: MetaSrvEnv, checkpoint_control: CheckpointControl, - request_rx: mpsc::UnboundedReceiver, + /// Command that has been collected but is still completing. + /// The join handle of the completing future is stored. + completing_task: CompletingTask, - /// The `prev_epoch` of pending non checkpoint barriers - pending_non_checkpoint_barriers: Vec, + request_rx: mpsc::UnboundedReceiver, active_streaming_nodes: ActiveStreamingWorkerNodes, @@ -210,32 +243,35 @@ pub struct GlobalBarrierManager { /// Controls the concurrent execution of commands. struct CheckpointControl { + state: BarrierWorkerState, + /// Save the state and message of barrier in order. /// Key is the `prev_epoch`. command_ctx_queue: BTreeMap, + /// The barrier that are completing. + /// Some((`prev_epoch`, `should_pause_inject_barrier`)) + completing_barrier: Option<(u64, bool)>, creating_streaming_job_controls: HashMap, - /// Command that has been collected but is still completing. - /// The join handle of the completing future is stored. - completing_task: CompletingTask, - hummock_version_stats: HummockVersionStats, create_mview_tracker: CreateMviewProgressTracker, - context: GlobalBarrierManagerContext, + context: GlobalBarrierWorkerContext, } impl CheckpointControl { async fn new( - context: GlobalBarrierManagerContext, + context: GlobalBarrierWorkerContext, create_mview_tracker: CreateMviewProgressTracker, + state: BarrierWorkerState, ) -> Self { Self { + state, command_ctx_queue: Default::default(), + completing_barrier: None, creating_streaming_job_controls: Default::default(), - completing_task: CompletingTask::None, hummock_version_stats: context.hummock_manager.get_version_stats().await, create_mview_tracker, context, @@ -244,12 +280,9 @@ impl CheckpointControl { fn total_command_num(&self) -> usize { self.command_ctx_queue.len() - + match &self.completing_task { - CompletingTask::Completing { - command_ctx: Some(_), - .. 
- } => 1, - _ => 0, + + match &self.completing_barrier { + Some(_) => 1, + None => 0, } } @@ -291,7 +324,7 @@ impl CheckpointControl { /// Enqueue a barrier command fn enqueue_command( &mut self, - command_ctx: Arc, + command_ctx: CommandContext, notifiers: Vec, node_to_collect: HashSet, creating_jobs_to_wait: HashSet, @@ -300,18 +333,18 @@ impl CheckpointControl { if let Some((_, node)) = self.command_ctx_queue.last_key_value() { assert_eq!( - command_ctx.prev_epoch.value(), - node.command_ctx.curr_epoch.value() + command_ctx.barrier_info.prev_epoch.value(), + node.command_ctx.barrier_info.curr_epoch.value() ); } tracing::trace!( - prev_epoch = command_ctx.prev_epoch.value().0, + prev_epoch = command_ctx.barrier_info.prev_epoch(), ?creating_jobs_to_wait, "enqueue command" ); self.command_ctx_queue.insert( - command_ctx.prev_epoch.value().0, + command_ctx.barrier_info.prev_epoch(), EpochNode { enqueue_time: timer, state: BarrierEpochState { @@ -374,37 +407,40 @@ impl CheckpointControl { let should_pause = self .command_ctx_queue .last_key_value() - .map(|(_, x)| &x.command_ctx) - .or(match &self.completing_task { - CompletingTask::None | CompletingTask::Err(_) => None, - CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), - }) - .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) + .map(|(_, x)| x.command_ctx.command.should_pause_inject_barrier()) + .or(self + .completing_barrier + .map(|(_, should_pause)| should_pause)) .unwrap_or(false); debug_assert_eq!( self.command_ctx_queue .values() - .map(|node| &node.command_ctx) + .map(|node| node.command_ctx.command.should_pause_inject_barrier()) .chain( - match &self.completing_task { - CompletingTask::None | CompletingTask::Err(_) => None, - CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(), - } - .into_iter() + self.completing_barrier + .map(|(_, should_pause)| should_pause) + .into_iter() ) - .any(|command_ctx| command_ctx.command.should_pause_inject_barrier()), + .any(|should_pause| should_pause), should_pause ); in_flight_not_full && !should_pause } +} +impl GlobalBarrierWorker { /// We need to make sure there are no changes when doing recovery pub async fn clear_on_err(&mut self, err: &MetaError) { // join spawned completing command to finish no matter it succeeds or not. let is_err = match replace(&mut self.completing_task, CompletingTask::None) { CompletingTask::None => false, - CompletingTask::Completing { join_handle, .. } => { + CompletingTask::Completing { + join_handle, + command_prev_epoch, + creating_job_epochs, + .. + } => { info!("waiting for completing command to finish in recovery"); match join_handle.await { Err(e) => { @@ -415,41 +451,66 @@ impl CheckpointControl { warn!(err = ?e.as_report(), "failed to complete barrier during clear"); true } - Ok(Ok(_)) => false, + Ok(Ok(hummock_version_stats)) => { + self.checkpoint_control + .ack_completed(BarrierCompleteOutput { + command_prev_epoch, + creating_job_epochs, + hummock_version_stats, + }); + false + } } } CompletingTask::Err(_) => true, }; if !is_err { // continue to finish the pending collected barrier. 
- while let Some(task) = self.next_complete_barrier_task(None) { - if let Err(e) = self.context.clone().complete_barrier(task).await { - error!( - err = ?e.as_report(), - "failed to complete barrier during recovery" - ); - break; - } else { - info!("succeed to complete barrier during recovery") + while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) { + let (command_prev_epoch, creating_job_epochs) = ( + task.command_context + .as_ref() + .map(|(command, _)| command.barrier_info.prev_epoch()), + task.creating_job_epochs.clone(), + ); + match self.context.clone().complete_barrier(task).await { + Ok(hummock_version_stats) => { + self.checkpoint_control + .ack_completed(BarrierCompleteOutput { + command_prev_epoch, + creating_job_epochs, + hummock_version_stats, + }); + } + Err(e) => { + error!( + err = ?e.as_report(), + "failed to complete barrier during recovery" + ); + break; + } } } } - for (_, node) in take(&mut self.command_ctx_queue) { + for (_, node) in take(&mut self.checkpoint_control.command_ctx_queue) { for notifier in node.notifiers { notifier.notify_failed(err.clone()); } node.enqueue_time.observe_duration(); } - self.create_mview_tracker.abort_all(); + self.checkpoint_control.create_mview_tracker.abort_all(); } +} +impl CheckpointControl { /// Return the earliest command waiting on the `worker_id`. - fn command_wait_collect_from_worker(&self, worker_id: WorkerId) -> Option<&CommandContext> { + fn barrier_wait_collect_from_worker(&self, worker_id: WorkerId) -> Option<&BarrierInfo> { for epoch_node in self.command_ctx_queue.values() { if epoch_node.state.node_to_collect.contains(&worker_id) { - return Some(&epoch_node.command_ctx); + return Some(&epoch_node.command_ctx.barrier_info); } } + // TODO: include barrier in creating jobs None } } @@ -461,8 +522,9 @@ struct EpochNode { /// Whether this barrier is in-flight or completed. state: BarrierEpochState, + /// Context of this command to generate barrier and do some post jobs. - command_ctx: Arc, + command_ctx: CommandContext, /// Notifiers of this barrier. notifiers: Vec, } @@ -488,8 +550,7 @@ impl BarrierEpochState { enum CompletingTask { None, Completing { - command_ctx: Option>, - table_ids_to_finish: HashSet, + command_prev_epoch: Option, creating_job_epochs: Vec<(TableId, u64)>, // The join handle of a spawned task that completes the barrier. @@ -501,9 +562,8 @@ enum CompletingTask { Err(MetaError), } -impl GlobalBarrierManager { - /// Create a new [`crate::barrier::GlobalBarrierManager`]. - #[allow(clippy::too_many_arguments)] +impl GlobalBarrierWorker { + /// Create a new [`crate::barrier::GlobalBarrierWorker`]. 
pub async fn new( scheduled_barriers: schedule::ScheduledBarriers, env: MetaSrvEnv, @@ -513,11 +573,12 @@ impl GlobalBarrierManager { sink_manager: SinkCoordinatorManager, metrics: Arc, scale_controller: ScaleControllerRef, + request_rx: mpsc::UnboundedReceiver, ) -> Self { let enable_recovery = env.opts.enable_recovery; let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; - let initial_invalid_state = BarrierManagerState::new( + let initial_invalid_state = BarrierWorkerState::new( None, InflightGraphInfo::default(), InflightSubscriptionInfo::default(), @@ -528,11 +589,9 @@ impl GlobalBarrierManager { let tracker = CreateMviewProgressTracker::default(); - let (request_tx, request_rx) = mpsc::unbounded_channel(); + let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))); - let context = GlobalBarrierManagerContext { - status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))), - request_tx, + let context = GlobalBarrierWorkerContext { metadata_manager, hummock_manager, source_manager, @@ -543,31 +602,28 @@ impl GlobalBarrierManager { }; let control_stream_manager = ControlStreamManager::new(context.clone()); - let checkpoint_control = CheckpointControl::new(context.clone(), tracker).await; + let checkpoint_control = + CheckpointControl::new(context.clone(), tracker, initial_invalid_state).await; Self { enable_recovery, scheduled_barriers, in_flight_barrier_nums, context, + status, env, - state: initial_invalid_state, checkpoint_control, + completing_task: CompletingTask::None, request_rx, - pending_non_checkpoint_barriers: Vec::new(), active_streaming_nodes, control_stream_manager, } } - pub fn context(&self) -> &GlobalBarrierManagerContext { - &self.context - } - - pub fn start(barrier_manager: GlobalBarrierManager) -> (JoinHandle<()>, Sender<()>) { + pub fn start(self) -> (JoinHandle<()>, Sender<()>) { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); let join_handle = tokio::spawn(async move { - barrier_manager.run(shutdown_rx).await; + self.run(shutdown_rx).await; }); (join_handle, shutdown_tx) @@ -630,8 +686,7 @@ impl GlobalBarrierManager { // consistency. // Even if there's no actor to recover, we still go through the recovery process to // inject the first `Initial` barrier. 
- self.context - .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)); + self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)); let span = tracing::info_span!("bootstrap_recovery"); crate::telemetry::report_event( risingwave_pb::telemetry::TelemetryEventStage::Recovery, @@ -648,7 +703,7 @@ impl GlobalBarrierManager { self.recovery(paused_reason, None).instrument(span).await; } - self.context.set_status(BarrierManagerStatus::Running); + self.set_status(BarrierManagerStatus::Running); let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -738,11 +793,11 @@ impl GlobalBarrierManager { info!(?changed_worker, "worker changed"); - self.state.inflight_graph_info + self.checkpoint_control.state.inflight_graph_info .on_new_worker_node_map(self.active_streaming_nodes.current()); self.checkpoint_control.creating_streaming_job_controls.values().for_each(|job| job.on_new_worker_node_map(self.active_streaming_nodes.current())); if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker { - self.control_stream_manager.add_worker(node, &self.state.inflight_subscription_info).await; + self.control_stream_manager.add_worker(node, &self.checkpoint_control.state.inflight_subscription_info).await; } } @@ -762,17 +817,33 @@ impl GlobalBarrierManager { _ => {} } } - (worker_id, resp_result) = self.control_stream_manager.next_complete_barrier_response() => { + complete_result = self + .completing_task + .next_completed_barrier( + &mut self.scheduled_barriers, + &mut self.checkpoint_control, + &mut self.control_stream_manager + ) => { + match complete_result { + Ok(output) => { + self.checkpoint_control.ack_completed(output); + } + Err(e) => { + self.failure_recovery(e).await; + } + } + }, + (worker_id, resp_result) = self.control_stream_manager.next_collect_barrier_response() => { if let Err(e) = resp_result.and_then(|resp| self.checkpoint_control.barrier_collected(resp, &mut self.control_stream_manager)) { { - let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id as _); - if failed_command.is_some() - || self.state.inflight_graph_info.contains_worker(worker_id as _) - || self.checkpoint_control.creating_streaming_job_controls.values().any(|job| job.is_wait_on_worker(worker_id as _)) { + let failed_barrier = self.checkpoint_control.barrier_wait_collect_from_worker(worker_id as _); + if failed_barrier.is_some() + || self.checkpoint_control.state.inflight_graph_info.contains_worker(worker_id as _) + || self.checkpoint_control.creating_streaming_job_controls.values().any(|job| job.is_wait_on_worker(worker_id)) { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = merge_node_rpc_errors("get error from control stream", errors); - if let Some(failed_command) = failed_command { - self.context.report_collect_failure(failed_command, &err); + if let Some(failed_barrier) = failed_barrier { + self.context.report_collect_failure(failed_barrier, &err); } self.failure_recovery(err).await; } else { @@ -781,25 +852,11 @@ impl GlobalBarrierManager { } } } - complete_result = self.checkpoint_control.next_completed_barrier(&mut self.scheduled_barriers) => { - match complete_result { - Ok(output) => { - if !output.table_ids_to_finish.is_empty() { - self.control_stream_manager.remove_partial_graph( - output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect() - ); - } - } - Err(e) => { - 
self.failure_recovery(e).await; - } - } - }, scheduled = self.scheduled_barriers.next_barrier(), if self .checkpoint_control .can_inject_barrier(self.in_flight_barrier_nums) => { - if let Err(e) = self.handle_new_barrier(scheduled) { + if let Err(e) = self.checkpoint_control.handle_new_barrier(scheduled, &mut self.control_stream_manager, &self.active_streaming_nodes) { self.failure_recovery(e).await; } } @@ -807,9 +864,16 @@ impl GlobalBarrierManager { self.checkpoint_control.update_barrier_nums_metrics(); } } +} +impl CheckpointControl { /// Handle the new barrier from the scheduled queue and inject it. - fn handle_new_barrier(&mut self, scheduled: Scheduled) -> MetaResult<()> { + fn handle_new_barrier( + &mut self, + scheduled: Scheduled, + control_stream_manager: &mut ControlStreamManager, + active_streaming_nodes: &ActiveStreamingWorkerNodes, + ) -> MetaResult<()> { let Scheduled { mut command, mut notifiers, @@ -820,7 +884,6 @@ impl GlobalBarrierManager { if let Some(table_to_cancel) = command.table_to_cancel() && self - .checkpoint_control .creating_streaming_job_controls .contains_key(&table_to_cancel) { @@ -836,11 +899,7 @@ impl GlobalBarrierManager { } if let Command::RescheduleFragment { .. } = &command { - if !self - .checkpoint_control - .creating_streaming_job_controls - .is_empty() - { + if !self.creating_streaming_job_controls.is_empty() { warn!("ignore reschedule when creating streaming job with snapshot backfill"); for notifier in notifiers { notifier.notify_start_failed( @@ -854,7 +913,7 @@ impl GlobalBarrierManager { } } - let Some((prev_epoch, curr_epoch)) = self.state.next_epoch_pair(&command) else { + let Some(barrier_info) = self.state.next_barrier_info(&command, checkpoint) else { // skip the command when there is nothing to do with the barrier for mut notifier in notifiers { notifier.notify_started(); @@ -882,35 +941,22 @@ impl GlobalBarrierManager { let mutation = command .to_mutation(None) .expect("should have some mutation in `CreateStreamingJob` command"); - self.checkpoint_control - .creating_streaming_job_controls - .insert( - info.table_fragments.table_id(), - CreatingStreamingJobControl::new( - info.clone(), - snapshot_backfill_info.clone(), - prev_epoch.value().0, - &self.checkpoint_control.hummock_version_stats, - &self.context.metrics, - mutation, - ), - ); + self.creating_streaming_job_controls.insert( + info.table_fragments.table_id(), + CreatingStreamingJobControl::new( + info.clone(), + snapshot_backfill_info.clone(), + barrier_info.prev_epoch(), + &self.hummock_version_stats, + &self.context.metrics, + mutation, + ), + ); } - self.pending_non_checkpoint_barriers - .push(prev_epoch.value().0); - let kind = if checkpoint { - let epochs = take(&mut self.pending_non_checkpoint_barriers); - BarrierKind::Checkpoint(epochs) - } else { - BarrierKind::Barrier - }; - - tracing::trace!(prev_epoch = prev_epoch.value().0, "inject barrier"); - // Collect the jobs to finish - if let (BarrierKind::Checkpoint(_), Command::Plain(None)) = (&kind, &command) - && let Some(jobs_to_merge) = self.checkpoint_control.jobs_to_merge() + if let (BarrierKind::Checkpoint(_), Command::Plain(None)) = (&barrier_info.kind, &command) + && let Some(jobs_to_merge) = self.jobs_to_merge() { command = Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge); } @@ -922,39 +968,25 @@ impl GlobalBarrierManager { pre_applied_subscription_info, table_ids_to_commit, jobs_to_wait, + prev_paused_reason, ) = self.state.apply_command(&command); // Tracing related stuff - 
prev_epoch.span().in_scope(|| { - tracing::info!(target: "rw_tracing", epoch = curr_epoch.value().0, "new barrier enqueued"); + barrier_info.prev_epoch.span().in_scope(|| { + tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued"); }); - span.record("epoch", curr_epoch.value().0); - - let command_ctx = Arc::new(CommandContext::new( - self.active_streaming_nodes.current().clone(), - pre_applied_subscription_info, - prev_epoch.clone(), - curr_epoch.clone(), - table_ids_to_commit, - self.state.paused_reason(), - command, - kind, - self.context.clone(), - span, - )); + span.record("epoch", barrier_info.curr_epoch.value().0); send_latency_timer.observe_duration(); - for creating_job in &mut self - .checkpoint_control - .creating_streaming_job_controls - .values_mut() - { - creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)?; + for creating_job in &mut self.creating_streaming_job_controls.values_mut() { + creating_job.on_new_command(control_stream_manager, &command, &barrier_info)?; } - let node_to_collect = match self.control_stream_manager.inject_command_ctx_barrier( - &command_ctx, + let node_to_collect = match control_stream_manager.inject_command_ctx_barrier( + &command, + &barrier_info, + prev_paused_reason, &pre_applied_graph_info, Some(&self.state.inflight_graph_info), ) { @@ -969,32 +1001,37 @@ impl GlobalBarrierManager { }; // Notify about the injection. - let curr_paused_reason = command_ctx.next_paused_reason(); - notifiers.iter_mut().for_each(|n| n.notify_started()); - // Update the paused state after the barrier is injected. - self.state.set_paused_reason(curr_paused_reason); - // Record the in-flight barrier. - self.checkpoint_control.enqueue_command( - command_ctx, - notifiers, - node_to_collect, - jobs_to_wait, + let command_ctx = CommandContext::new( + active_streaming_nodes.current().clone(), + barrier_info, + pre_applied_subscription_info, + table_ids_to_commit.clone(), + command, + span, ); + // Record the in-flight barrier. + self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait); + Ok(()) } +} + +impl GlobalBarrierWorker { + /// Set barrier manager status. + fn set_status(&self, new_status: BarrierManagerStatus) { + self.status.store(Arc::new(new_status)); + } async fn failure_recovery(&mut self, err: MetaError) { - self.checkpoint_control.clear_on_err(&err).await; - self.pending_non_checkpoint_barriers.clear(); + self.clear_on_err(&err).await; if self.enable_recovery { - self.context - .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover( - err.clone(), - ))); + self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover( + err.clone(), + ))); let span = tracing::info_span!( "failure_recovery", error = %err.as_report(), @@ -1012,7 +1049,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. 
self.recovery(None, Some(err)).instrument(span).await; - self.context.set_status(BarrierManagerStatus::Running); + self.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); } @@ -1020,10 +1057,9 @@ impl GlobalBarrierManager { async fn adhoc_recovery(&mut self) { let err = MetaErrorInner::AdhocRecovery.into(); - self.checkpoint_control.clear_on_err(&err).await; + self.clear_on_err(&err).await; - self.context - .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc)); + self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc)); let span = tracing::info_span!( "adhoc_recovery", error = %err.as_report(), @@ -1041,22 +1077,21 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. self.recovery(None, Some(err)).instrument(span).await; - self.context.set_status(BarrierManagerStatus::Running); + self.set_status(BarrierManagerStatus::Running); } } #[derive(Default)] -struct CompleteBarrierTask { +pub struct CompleteBarrierTask { commit_info: CommitEpochInfo, finished_jobs: Vec, notifiers: Vec, /// Some((`command_ctx`, `enqueue_time`)) - command_context: Option<(Arc, HistogramTimer)>, - table_ids_to_finish: HashSet, + command_context: Option<(CommandContext, HistogramTimer)>, creating_job_epochs: Vec<(TableId, u64)>, } -impl GlobalBarrierManagerContext { +impl GlobalBarrierWorkerContext { fn collect_creating_job_commit_epoch_info( commit_info: &mut CommitEpochInfo, epoch: u64, @@ -1093,7 +1128,7 @@ impl GlobalBarrierManagerContext { let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); self.hummock_manager.commit_epoch(task.commit_info).await?; if let Some((command_ctx, _)) = &task.command_context { - command_ctx.post_collect().await?; + command_ctx.post_collect(&self).await?; } wait_commit_timer.observe_duration(); @@ -1120,16 +1155,12 @@ impl GlobalBarrierManagerContext { self.report_complete_event(duration_sec, &command_ctx); self.metrics .last_committed_barrier_time - .set(command_ctx.curr_epoch.value().as_unix_secs() as i64); + .set(command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64); } } Ok(self.hummock_manager.get_version_stats().await) } - - pub fn hummock_manager(&self) -> &HummockManagerRef { - &self.hummock_manager - } } impl CreateMviewProgressTracker { @@ -1173,16 +1204,16 @@ impl CreateMviewProgressTracker { } } -impl GlobalBarrierManagerContext { +impl GlobalBarrierWorkerContext { fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) { // Record barrier latency in event log. 
use risingwave_pb::meta::event_log; let event = event_log::EventBarrierComplete { - prev_epoch: command_ctx.prev_epoch.value().0, - cur_epoch: command_ctx.curr_epoch.value().0, + prev_epoch: command_ctx.barrier_info.prev_epoch(), + cur_epoch: command_ctx.barrier_info.curr_epoch.value().0, duration_sec, command: command_ctx.command.to_string(), - barrier_kind: command_ctx.kind.as_str_name().to_string(), + barrier_kind: command_ctx.barrier_info.kind.as_str_name().to_string(), }; self.env .event_log_manager_ref() @@ -1191,7 +1222,9 @@ impl GlobalBarrierManagerContext { } struct BarrierCompleteOutput { - table_ids_to_finish: HashSet, + command_prev_epoch: Option, + creating_job_epochs: Vec<(TableId, u64)>, + hummock_version_stats: HummockVersionStats, } impl CheckpointControl { @@ -1222,7 +1255,7 @@ impl CheckpointControl { fn next_complete_barrier_task( &mut self, - mut scheduled_barriers: Option<&mut ScheduledBarriers>, + mut context: Option<(&mut ScheduledBarriers, &mut ControlStreamManager)>, ) -> Option { // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough let mut creating_jobs_task = vec![]; @@ -1248,6 +1281,16 @@ impl CheckpointControl { creating_jobs_task.push((*table_id, epoch, resps, is_first_time)); } } + if !finished_jobs.is_empty() + && let Some((_, control_stream_manager)) = &mut context + { + control_stream_manager.remove_partial_graph( + finished_jobs + .iter() + .map(|(table_id, _, _)| table_id.table_id) + .collect(), + ); + } for (table_id, epoch, resps) in finished_jobs { let epoch_state = &mut self .command_ctx_queue @@ -1270,6 +1313,7 @@ impl CheckpointControl { } } let mut task = None; + assert!(self.completing_barrier.is_none()); while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value() && !state.is_inflight() { @@ -1280,46 +1324,46 @@ impl CheckpointControl { let mut finished_jobs = self .create_mview_tracker .apply_collected_command(&node, &self.hummock_version_stats); - if !node.command_ctx.kind.is_checkpoint() { + if !node.command_ctx.barrier_info.kind.is_checkpoint() { assert!(finished_jobs.is_empty()); node.notifiers.into_iter().for_each(|notifier| { notifier.notify_collected(); }); - if let Some(scheduled_barriers) = &mut scheduled_barriers + if let Some((scheduled_barriers, _)) = &mut context && self.create_mview_tracker.has_pending_finished_jobs() && self .command_ctx_queue .values() - .all(|node| !node.command_ctx.kind.is_checkpoint()) + .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint()) { scheduled_barriers.force_checkpoint_in_next_barrier(); } continue; } - let table_ids_to_finish = node - .state + node.state .finished_jobs .drain() - .map(|(table_id, (info, resps))| { + .for_each(|(_, (info, resps))| { node.state.resps.extend(resps); finished_jobs.push(TrackingJob::New(TrackingCommand { info, replace_table_info: None, })); - table_id - }) - .collect(); + }); let commit_info = collect_commit_epoch_info( take(&mut node.state.resps), &node.command_ctx, self.collect_backfill_pinned_upstream_log_epoch(), ); + self.completing_barrier = Some(( + node.command_ctx.barrier_info.prev_epoch(), + node.command_ctx.command.should_pause_inject_barrier(), + )); task = Some(CompleteBarrierTask { commit_info, finished_jobs, notifiers: node.notifiers, command_context: Some((node.command_ctx, node.enqueue_time)), - table_ids_to_finish, creating_job_epochs: vec![], }); break; @@ -1328,7 +1372,7 @@ impl CheckpointControl { if !creating_jobs_task.is_empty() { let task = 
task.get_or_insert_default(); for (table_id, epoch, resps, is_first_time) in creating_jobs_task { - GlobalBarrierManagerContext::collect_creating_job_commit_epoch_info( + GlobalBarrierWorkerContext::collect_creating_job_commit_epoch_info( &mut task.commit_info, epoch, resps, @@ -1344,27 +1388,32 @@ impl CheckpointControl { } task } +} +impl CompletingTask { pub(super) fn next_completed_barrier<'a>( &'a mut self, scheduled_barriers: &mut ScheduledBarriers, + checkpoint_control: &mut CheckpointControl, + control_stream_manager: &mut ControlStreamManager, ) -> impl Future> + 'a { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. - if let CompletingTask::None = &self.completing_task { - if let Some(task) = self.next_complete_barrier_task(Some(scheduled_barriers)) { + if let CompletingTask::None = self { + if let Some(task) = checkpoint_control + .next_complete_barrier_task(Some((scheduled_barriers, control_stream_manager))) + { { - let command_ctx = task + let creating_job_epochs = task.creating_job_epochs.clone(); + let command_prev_epoch = task .command_context .as_ref() - .map(|(command_ctx, _)| command_ctx.clone()); - let table_ids_to_finish = task.table_ids_to_finish.clone(); - let creating_job_epochs = task.creating_job_epochs.clone(); - let join_handle = tokio::spawn(self.context.clone().complete_barrier(task)); - self.completing_task = CompletingTask::Completing { - command_ctx, + .map(|(command, _)| command.barrier_info.prev_epoch()); + let join_handle = + tokio::spawn(checkpoint_control.context.clone().complete_barrier(task)); + *self = CompletingTask::Completing { + command_prev_epoch, join_handle, - table_ids_to_finish, creating_job_epochs, }; } @@ -1375,11 +1424,11 @@ impl CheckpointControl { } async fn next_completed_barrier_inner(&mut self) -> MetaResult { - let CompletingTask::Completing { join_handle, .. } = &mut self.completing_task else { + let CompletingTask::Completing { join_handle, .. } = self else { return pending().await; }; - let (table_ids_to_finish, creating_job_epochs) = { + { { let join_result: MetaResult<_> = try { join_handle @@ -1393,34 +1442,46 @@ impl CheckpointControl { } else { CompletingTask::None }; - let completed_command = - replace(&mut self.completing_task, next_completing_command_status); - self.hummock_version_stats = join_result?; + let completed_command = replace(self, next_completing_command_status); + let hummock_version_stats = join_result?; must_match!(completed_command, CompletingTask::Completing { - table_ids_to_finish, creating_job_epochs, + command_prev_epoch, .. - } => (table_ids_to_finish, creating_job_epochs)) + } => { + Ok(BarrierCompleteOutput { + command_prev_epoch, + creating_job_epochs, + hummock_version_stats, + }) + }) } - }; + } + } +} +impl CheckpointControl { + fn ack_completed(&mut self, output: BarrierCompleteOutput) { { - for (table_id, epoch) in creating_job_epochs { + self.hummock_version_stats = output.hummock_version_stats; + assert_eq!( + self.completing_barrier + .take() + .map(|(prev_epoch, _)| prev_epoch), + output.command_prev_epoch + ); + for (table_id, epoch) in output.creating_job_epochs { self.creating_streaming_job_controls .get_mut(&table_id) .expect("should exist") .ack_completed(epoch) } - - Ok(BarrierCompleteOutput { - table_ids_to_finish, - }) } } } -impl GlobalBarrierManagerContext { +impl GlobalBarrierManager { /// Check the status of barrier manager, return error if it is not `Running`. 
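// Editor's note: below, `GlobalBarrierManager::get_ddl_progress` forwards a
// request to the worker loop over `request_tx` and waits on a oneshot reply.
// This is a minimal sketch of that request/response pattern with placeholder
// types (`Vec<String>` stands in for the DDL progress payload) and a
// hypothetical `BarrierManagerHandle` name, not the actual implementation.

use tokio::sync::{mpsc, oneshot};

enum BarrierManagerRequest {
    GetDdlProgress(oneshot::Sender<Vec<String>>),
}

struct BarrierManagerHandle {
    request_tx: mpsc::UnboundedSender<BarrierManagerRequest>,
}

impl BarrierManagerHandle {
    async fn get_ddl_progress(&self) -> Option<Vec<String>> {
        let (tx, rx) = oneshot::channel();
        // If the worker loop has already shut down, surface `None` instead of panicking.
        self.request_tx
            .send(BarrierManagerRequest::GetDdlProgress(tx))
            .ok()?;
        rx.await.ok()
    }
}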
pub fn check_status_running(&self) -> MetaResult<()> { let status = self.status.load(); @@ -1442,12 +1503,9 @@ impl GlobalBarrierManagerContext { pub fn get_recovery_status(&self) -> PbRecoveryStatus { (&**self.status.load()).into() } +} - /// Set barrier manager status. - fn set_status(&self, new_status: BarrierManagerStatus) { - self.status.store(Arc::new(new_status)); - } - +impl GlobalBarrierWorkerContext { /// Resolve actor information from cluster, fragment manager and `ChangedTableId`. /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor /// will create or drop before this barrier flow through them. @@ -1466,7 +1524,18 @@ impl GlobalBarrierManagerContext { .collect(), )) } +} + +pub struct GlobalBarrierManager { + status: Arc>, + hummock_manager: HummockManagerRef, + request_tx: mpsc::UnboundedSender, + metadata_manager: MetadataManager, +} +pub type BarrierManagerRef = Arc; + +impl GlobalBarrierManager { /// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress` pub async fn get_ddl_progress(&self) -> MetaResult> { let mut ddl_progress = { @@ -1496,9 +1565,11 @@ impl GlobalBarrierManagerContext { Ok(ddl_progress.into_values().collect()) } -} -pub type BarrierManagerRef = GlobalBarrierManagerContext; + pub async fn get_hummock_version_id(&self) -> HummockVersionId { + self.hummock_manager.get_version_id().await + } +} #[expect(clippy::type_complexity)] fn collect_resp_info( @@ -1607,11 +1678,11 @@ fn collect_commit_epoch_info( let table_new_change_log = build_table_change_log_delta( old_value_ssts.into_iter(), synced_ssts.iter().map(|sst| &sst.sst_info), - must_match!(&command_ctx.kind, BarrierKind::Checkpoint(epochs) => epochs), + must_match!(&command_ctx.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs), mv_log_store_truncate_epoch.into_iter(), ); - let epoch = command_ctx.prev_epoch.value().0; + let epoch = command_ctx.barrier_info.prev_epoch(); let tables_to_commit = command_ctx .table_ids_to_commit .iter() diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index f10c0e29b60f7..592f1920b68b9 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -417,7 +417,7 @@ impl CreateMviewProgressTracker { // for checkpoint, we should also clear it. 
self.cancel_command(table_id); } - if command_ctx.kind.is_checkpoint() { + if command_ctx.barrier_info.kind.is_checkpoint() { self.take_finished_jobs() } else { vec![] diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index d19f6ed1a8c34..ffd3c74ab6a11 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -32,19 +32,19 @@ use tokio::time::Instant; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, info, warn, Instrument}; -use super::{CheckpointControl, TracedEpoch}; -use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; +use super::{CheckpointControl, GlobalBarrierWorker, GlobalBarrierWorkerContext, TracedEpoch}; +use crate::barrier::info::{BarrierInfo, InflightGraphInfo, InflightSubscriptionInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::schedule::ScheduledBarriers; -use crate::barrier::state::BarrierManagerState; -use crate::barrier::{BarrierKind, GlobalBarrierManager, GlobalBarrierManagerContext}; +use crate::barrier::state::BarrierWorkerState; +use crate::barrier::BarrierKind; use crate::manager::ActiveStreamingWorkerNodes; use crate::model::{TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; use crate::{model, MetaError, MetaResult}; -impl GlobalBarrierManager { +impl GlobalBarrierWorker { // Migration timeout. const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300); // Retry base interval in milliseconds. @@ -61,7 +61,7 @@ impl GlobalBarrierManager { } } -impl GlobalBarrierManagerContext { +impl GlobalBarrierWorkerContext { /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted. async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> { self.metadata_manager @@ -125,7 +125,7 @@ impl GlobalBarrierManagerContext { } } -impl GlobalBarrierManager { +impl GlobalBarrierWorker { /// Recovery the whole cluster from the latest epoch. /// /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be @@ -313,15 +313,19 @@ impl GlobalBarrierManager { subscriptions_to_add: Default::default(), }); - let new_epoch = if let Some(prev_epoch) = &prev_epoch { + let new_epoch = if let Some(prev_epoch) = prev_epoch { // Use a different `curr_epoch` for each recovery attempt. 
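// A hedged sketch of the kind of retry policy `GlobalBarrierWorker` builds from the
// recovery constants in this hunk. It only relies on the `tokio_retry` strategy types the
// file already imports; the concrete base interval, cap, and attempt bound below are made
// up for illustration and are not the real constants.
use std::time::Duration;
use tokio_retry::strategy::{jitter, ExponentialBackoff};

fn recovery_retry_strategy() -> impl Iterator<Item = Duration> {
    ExponentialBackoff::from_millis(20) // assumed retry base interval
        .max_delay(Duration::from_secs(5)) // assumed per-attempt cap
        .map(jitter) // spread retries so repeated recoveries do not align
        .take(10) // assumed bound on attempts
}

fn main() {
    for (attempt, delay) in recovery_retry_strategy().enumerate() {
        // Jitter only ever shrinks a delay, so the cap still holds.
        assert!(delay <= Duration::from_secs(5));
        println!("recovery attempt {attempt} would wait {delay:?}");
    }
}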
- let new_epoch = prev_epoch.next(); + let curr_epoch = prev_epoch.next(); + let barrier_info = BarrierInfo { + prev_epoch, + curr_epoch, + kind: BarrierKind::Initial, + }; let mut node_to_collect = control_stream_manager.inject_barrier( None, Some(mutation), - (&new_epoch, prev_epoch), - &BarrierKind::Initial, + &barrier_info, &info, Some(&info), Some(node_actors), @@ -331,21 +335,21 @@ impl GlobalBarrierManager { debug!(?node_to_collect, "inject initial barrier"); while !node_to_collect.is_empty() { let (worker_id, result) = control_stream_manager - .next_complete_barrier_response() + .next_collect_barrier_response() .await; let resp = result?; - assert_eq!(resp.epoch, prev_epoch.value().0); + assert_eq!(resp.epoch, barrier_info.prev_epoch()); assert!(node_to_collect.remove(&worker_id)); } debug!("collected initial barrier"); - Some(new_epoch) + Some(barrier_info.curr_epoch) } else { assert!(info.is_empty()); None }; ( - BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason), + BarrierWorkerState::new(new_epoch, info, subscription_info, paused_reason), active_streaming_nodes, control_stream_manager, tracker, @@ -366,29 +370,30 @@ impl GlobalBarrierManager { let create_mview_tracker: CreateMviewProgressTracker; + let state: BarrierWorkerState; ( - self.state, + state, self.active_streaming_nodes, self.control_stream_manager, create_mview_tracker, ) = new_state; - self.checkpoint_control = - CheckpointControl::new(self.context.clone(), create_mview_tracker).await; - tracing::info!( - epoch = self.state.in_flight_prev_epoch().map(|epoch| epoch.value().0), - paused = ?self.state.paused_reason(), + epoch = state.in_flight_prev_epoch().map(|epoch| epoch.value().0), + paused = ?state.paused_reason(), "recovery success" ); + self.checkpoint_control = + CheckpointControl::new(self.context.clone(), create_mview_tracker, state).await; + self.env .notification_manager() .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {})); } } -impl GlobalBarrierManagerContext { +impl GlobalBarrierWorkerContext { /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. 
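// A standalone sketch of the `BarrierInfo` bundle that this refactor threads through
// injection and recovery in place of separate `(curr_epoch, prev_epoch)` and `BarrierKind`
// arguments. `Epoch`/`TracedEpoch` are reduced to plain u64 wrappers here, so this only
// illustrates the shape, not the real types.
#[derive(Clone, Debug)]
enum BarrierKind {
    Initial,
    Barrier,
    Checkpoint(Vec<u64>), // prev_epochs of the non-checkpoint barriers it commits
}

#[derive(Clone, Debug)]
struct TracedEpoch(u64);

struct BarrierInfo {
    prev_epoch: TracedEpoch,
    curr_epoch: TracedEpoch,
    kind: BarrierKind,
}

impl BarrierInfo {
    // The accessor used by the `resp.epoch` assertion and the event-log records in these hunks.
    fn prev_epoch(&self) -> u64 {
        self.prev_epoch.0
    }
}

fn main() {
    let initial = BarrierInfo {
        prev_epoch: TracedEpoch(100),
        curr_epoch: TracedEpoch(101),
        kind: BarrierKind::Initial,
    };
    assert_eq!(initial.prev_epoch(), 100);
    assert_eq!(initial.curr_epoch.0, initial.prev_epoch() + 1);
    assert!(matches!(initial.kind, BarrierKind::Initial));

    // A checkpoint barrier also carries the prev_epochs of the non-checkpoint barriers it
    // commits (see the `BarrierWorkerState` changes later in this patch).
    let checkpoint = BarrierInfo {
        prev_epoch: TracedEpoch(101),
        curr_epoch: TracedEpoch(102),
        kind: BarrierKind::Checkpoint(vec![100, 101]),
    };
    assert!(matches!(checkpoint.kind, BarrierKind::Checkpoint(ref epochs) if epochs.len() == 2));
}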
async fn migrate_actors( &self, @@ -447,7 +452,7 @@ impl GlobalBarrierManagerContext { let mut available_size = new_worker_slots.len(); if available_size < to_migration_size - && start.elapsed() > GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT + && start.elapsed() > GlobalBarrierWorker::RECOVERY_FORCE_MIGRATION_TIMEOUT { let mut factor = 2; @@ -507,7 +512,7 @@ impl GlobalBarrierManagerContext { let changed = active_nodes .wait_changed( Duration::from_millis(5000), - GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT, + GlobalBarrierWorker::RECOVERY_FORCE_MIGRATION_TIMEOUT, |active_nodes| { let current_nodes = active_nodes .current() @@ -751,7 +756,7 @@ mod tests { // total 10, assigned custom, actual 5, default full -> fixed(5) assert_eq!( TableParallelism::Fixed(5), - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Custom, Some(5), @@ -762,7 +767,7 @@ mod tests { // total 10, assigned custom, actual 10, default full -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Custom, Some(10), @@ -773,7 +778,7 @@ mod tests { // total 10, assigned custom, actual 11, default full -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Custom, Some(11), @@ -784,7 +789,7 @@ mod tests { // total 10, assigned fixed(5), actual _, default full -> fixed(5) assert_eq!( TableParallelism::Adaptive, - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Custom, None, @@ -795,7 +800,7 @@ mod tests { // total 10, assigned adaptive, actual _, default full -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Adaptive, None, @@ -806,7 +811,7 @@ mod tests { // total 10, assigned adaptive, actual 5, default 5 -> fixed(5) assert_eq!( TableParallelism::Fixed(5), - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Adaptive, Some(5), @@ -817,7 +822,7 @@ mod tests { // total 10, assigned adaptive, actual 6, default 5 -> adaptive assert_eq!( TableParallelism::Adaptive, - GlobalBarrierManagerContext::derive_target_parallelism( + GlobalBarrierWorkerContext::derive_target_parallelism( 10, TableParallelism::Adaptive, Some(6), diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 6e608489177be..62c2615eb37c4 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -28,6 +28,7 @@ use risingwave_common::util::tracing::TracingContext; use risingwave_hummock_sdk::HummockVersionId; use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; +use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, SubscriptionUpstreamInfo}; use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest; @@ -43,9 +44,8 @@ use tokio_retry::strategy::ExponentialBackoff; use tracing::{error, info, warn}; use uuid::Uuid; -use super::command::CommandContext; -use 
super::{BarrierKind, GlobalBarrierManagerContext, InflightSubscriptionInfo, TracedEpoch}; -use crate::barrier::info::InflightGraphInfo; +use super::{Command, GlobalBarrierWorkerContext, InflightSubscriptionInfo}; +use crate::barrier::info::{BarrierInfo, InflightGraphInfo}; use crate::{MetaError, MetaResult}; const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3); @@ -56,12 +56,12 @@ struct ControlStreamNode { } pub(super) struct ControlStreamManager { - context: GlobalBarrierManagerContext, + context: GlobalBarrierWorkerContext, nodes: HashMap, } impl ControlStreamManager { - pub(super) fn new(context: GlobalBarrierManagerContext) -> Self { + pub(super) fn new(context: GlobalBarrierWorkerContext) -> Self { Self { context, nodes: Default::default(), @@ -203,7 +203,7 @@ impl ControlStreamManager { Some((worker_id, result)) } - pub(super) async fn next_complete_barrier_response( + pub(super) async fn next_collect_barrier_response( &mut self, ) -> (WorkerId, MetaResult) { use streaming_control_stream_response::Response; @@ -248,11 +248,13 @@ impl ControlStreamManager { impl ControlStreamManager { pub(super) fn inject_command_ctx_barrier( &mut self, - command_ctx: &CommandContext, + command: &Command, + barrier_info: &BarrierInfo, + prev_paused_reason: Option, pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, ) -> MetaResult> { - let mutation = command_ctx.to_mutation(); + let mutation = command.to_mutation(prev_paused_reason); let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation { add.subscriptions_to_add.clone() } else { @@ -266,11 +268,10 @@ impl ControlStreamManager { self.inject_barrier( None, mutation, - (&command_ctx.curr_epoch, &command_ctx.prev_epoch), - &command_ctx.kind, + barrier_info, pre_applied_graph_info, applied_graph_info, - command_ctx.command.actors_to_create(), + command.actors_to_create(), subscriptions_to_add, subscriptions_to_remove, ) @@ -280,8 +281,7 @@ impl ControlStreamManager { &mut self, creating_table_id: Option, mutation: Option, - (curr_epoch, prev_epoch): (&TracedEpoch, &TracedEpoch), - kind: &BarrierKind, + barrier_info: &BarrierInfo, pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, mut new_actors: Option>>, @@ -351,12 +351,13 @@ impl ControlStreamManager { let mutation = mutation.clone(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch { - curr: curr_epoch.value().0, - prev: prev_epoch.value().0, + curr: barrier_info.curr_epoch.value().0, + prev: barrier_info.prev_epoch(), }), mutation: mutation.clone().map(|_| BarrierMutation { mutation }), - tracing_context: TracingContext::from_span(curr_epoch.span()).to_protobuf(), - kind: kind.to_protobuf() as i32, + tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span()) + .to_protobuf(), + kind: barrier_info.kind.to_protobuf() as i32, passed_actors: vec![], }; @@ -401,8 +402,8 @@ impl ControlStreamManager { // Record failure in event log. 
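// A simplified model of why `prev_paused_reason` now rides along with the command instead
// of a full `CommandContext`: whether a pause/resume barrier carries a mutation depends on
// the pause state *before* this barrier. All types are toy stand-ins and the match arms
// only model that dependency, not the real `Command::to_mutation`.
#[derive(Clone, Copy, PartialEq, Debug)]
enum PausedReason {
    Manual,
    ConfigChange,
}

enum Command {
    Pause(PausedReason),
    Resume(PausedReason),
    Plain,
}

#[derive(Debug, PartialEq)]
enum Mutation {
    Pause,
    Resume,
}

impl Command {
    fn to_mutation(&self, prev_paused_reason: Option<PausedReason>) -> Option<Mutation> {
        match self {
            // Pausing only emits a mutation if the graph was not already paused.
            Command::Pause(_) if prev_paused_reason.is_none() => Some(Mutation::Pause),
            // Resuming only takes effect if it matches the reason that paused the graph.
            Command::Resume(reason) if prev_paused_reason == Some(*reason) => Some(Mutation::Resume),
            _ => None,
        }
    }
}

fn main() {
    assert_eq!(
        Command::Pause(PausedReason::Manual).to_mutation(None),
        Some(Mutation::Pause)
    );
    assert_eq!(
        Command::Resume(PausedReason::Manual).to_mutation(Some(PausedReason::Manual)),
        Some(Mutation::Resume)
    );
    assert_eq!(
        Command::Resume(PausedReason::Manual).to_mutation(Some(PausedReason::ConfigChange)),
        None
    );
    assert_eq!(Command::Plain.to_mutation(None), None);
}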
use risingwave_pb::meta::event_log; let event = event_log::EventInjectBarrierFail { - prev_epoch: prev_epoch.value().0, - cur_epoch: curr_epoch.value().0, + prev_epoch: barrier_info.prev_epoch(), + cur_epoch: barrier_info.curr_epoch.value().0, error: e.to_report_string(), }; self.context @@ -434,7 +435,7 @@ impl ControlStreamManager { } } -impl GlobalBarrierManagerContext { +impl GlobalBarrierWorkerContext { async fn new_control_stream_node( &self, node: WorkerNode, @@ -455,16 +456,12 @@ impl GlobalBarrierManagerContext { } /// Send barrier-complete-rpc and wait for responses from all CNs - pub(super) fn report_collect_failure( - &self, - command_context: &CommandContext, - error: &MetaError, - ) { + pub(super) fn report_collect_failure(&self, barrier_info: &BarrierInfo, error: &MetaError) { // Record failure in event log. use risingwave_pb::meta::event_log; let event = event_log::EventCollectBarrierFail { - prev_epoch: command_context.prev_epoch.value().0, - cur_epoch: command_context.curr_epoch.value().0, + prev_epoch: barrier_info.prev_epoch(), + cur_epoch: barrier_info.curr_epoch.value().0, error: error.to_report_string(), }; self.env diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index c39fdb56e4fb7..fa1044bfb5fe8 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -330,7 +330,7 @@ impl BarrierScheduler { } /// The receiver side of the barrier scheduling queue. -/// Held by the [`super::GlobalBarrierManager`] to execute these commands. +/// Held by the [`super::GlobalBarrierWorker`] to execute these commands. pub struct ScheduledBarriers { min_interval: Option, diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index d9fe6f13c963c..9b59136750a2e 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -13,22 +13,26 @@ // limitations under the License. use std::collections::HashSet; +use std::mem::take; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; use risingwave_pb::meta::PausedReason; -use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; -use crate::barrier::{Command, CreateStreamingJobType, TracedEpoch}; +use crate::barrier::info::{BarrierInfo, InflightGraphInfo, InflightSubscriptionInfo}; +use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch}; -/// `BarrierManagerState` defines the necessary state of `GlobalBarrierManager`. -pub struct BarrierManagerState { +/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier. +pub(super) struct BarrierWorkerState { /// The last sent `prev_epoch` /// /// There's no need to persist this field. On recovery, we will restore this from the latest /// committed snapshot in `HummockManager`. in_flight_prev_epoch: Option, + /// The `prev_epoch` of pending non checkpoint barriers + pending_non_checkpoint_barriers: Vec, + /// Inflight running actors info. 
pub(crate) inflight_graph_info: InflightGraphInfo, @@ -38,7 +42,7 @@ pub struct BarrierManagerState { paused_reason: Option, } -impl BarrierManagerState { +impl BarrierWorkerState { pub fn new( in_flight_prev_epoch: Option, inflight_graph_info: InflightGraphInfo, @@ -47,6 +51,7 @@ impl BarrierManagerState { ) -> Self { Self { in_flight_prev_epoch, + pending_non_checkpoint_barriers: vec![], inflight_graph_info, inflight_subscription_info, paused_reason, @@ -57,7 +62,7 @@ impl BarrierManagerState { self.paused_reason } - pub fn set_paused_reason(&mut self, paused_reason: Option) { + fn set_paused_reason(&mut self, paused_reason: Option) { if self.paused_reason != paused_reason { tracing::info!(current = ?self.paused_reason, new = ?paused_reason, "update paused state"); self.paused_reason = paused_reason; @@ -68,8 +73,12 @@ impl BarrierManagerState { self.in_flight_prev_epoch.as_ref() } - /// Returns the epoch pair for the next barrier, and updates the state. - pub fn next_epoch_pair(&mut self, command: &Command) -> Option<(TracedEpoch, TracedEpoch)> { + /// Returns the `BarrierInfo` for the next barrier, and updates the state. + pub fn next_barrier_info( + &mut self, + command: &Command, + is_checkpoint: bool, + ) -> Option { if self.inflight_graph_info.is_empty() && !matches!(&command, Command::CreateStreamingJob { .. }) { @@ -79,15 +88,27 @@ impl BarrierManagerState { .in_flight_prev_epoch .get_or_insert_with(|| TracedEpoch::new(Epoch::now())); let prev_epoch = in_flight_prev_epoch.clone(); - let next_epoch = prev_epoch.next(); - *in_flight_prev_epoch = next_epoch.clone(); - Some((prev_epoch, next_epoch)) + let curr_epoch = prev_epoch.next(); + *in_flight_prev_epoch = curr_epoch.clone(); + self.pending_non_checkpoint_barriers + .push(prev_epoch.value().0); + let kind = if is_checkpoint { + let epochs = take(&mut self.pending_non_checkpoint_barriers); + BarrierKind::Checkpoint(epochs) + } else { + BarrierKind::Barrier + }; + Some(BarrierInfo { + prev_epoch, + curr_epoch, + kind, + }) } /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors /// will be removed from the state after the info get resolved. 
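// A standalone model of the `pending_non_checkpoint_barriers` bookkeeping added to
// `BarrierWorkerState` in this hunk: every injected barrier pushes its `prev_epoch`, and a
// checkpoint barrier drains the whole list into `BarrierKind::Checkpoint`. Epochs are plain
// u64 counters here, so this illustrates only the accumulation rule.
use std::mem::take;

#[derive(Debug, PartialEq)]
enum BarrierKind {
    Barrier,
    Checkpoint(Vec<u64>),
}

struct BarrierWorkerState {
    in_flight_prev_epoch: u64,
    pending_non_checkpoint_barriers: Vec<u64>,
}

impl BarrierWorkerState {
    fn next_barrier_kind(&mut self, is_checkpoint: bool) -> (u64, u64, BarrierKind) {
        let prev_epoch = self.in_flight_prev_epoch;
        let curr_epoch = prev_epoch + 1;
        self.in_flight_prev_epoch = curr_epoch;
        self.pending_non_checkpoint_barriers.push(prev_epoch);
        let kind = if is_checkpoint {
            // The checkpoint barrier commits everything accumulated since the last one.
            BarrierKind::Checkpoint(take(&mut self.pending_non_checkpoint_barriers))
        } else {
            BarrierKind::Barrier
        };
        (prev_epoch, curr_epoch, kind)
    }
}

fn main() {
    let mut state = BarrierWorkerState {
        in_flight_prev_epoch: 100,
        pending_non_checkpoint_barriers: vec![],
    };
    assert_eq!(state.next_barrier_kind(false).2, BarrierKind::Barrier);
    assert_eq!(state.next_barrier_kind(false).2, BarrierKind::Barrier);
    // The checkpoint barrier carries its own prev_epoch plus the two pending ones.
    assert_eq!(
        state.next_barrier_kind(true).2,
        BarrierKind::Checkpoint(vec![100, 101, 102])
    );
    assert!(state.pending_non_checkpoint_barriers.is_empty());
}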
/// - /// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`) + /// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`, `prev_paused_reason`) pub fn apply_command( &mut self, command: &Command, @@ -96,6 +117,7 @@ impl BarrierManagerState { InflightSubscriptionInfo, HashSet, HashSet, + Option, ) { // update the fragment_infos outside pre_apply let fragment_changes = if let Command::CreateStreamingJob { @@ -131,6 +153,16 @@ impl BarrierManagerState { self.inflight_subscription_info.post_apply(command); - (info, subscription_info, table_ids_to_commit, jobs_to_wait) + let prev_paused_reason = self.paused_reason; + let curr_paused_reason = command.next_paused_reason(prev_paused_reason); + self.set_paused_reason(curr_paused_reason); + + ( + info, + subscription_info, + table_ids_to_commit, + jobs_to_wait, + prev_paused_reason, + ) } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d2c46b284bddf..c2629cc3718f6 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -355,12 +355,7 @@ impl DdlController { let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??; Ok(Some(WaitVersion { catalog_version: notification_version, - hummock_version_id: self - .barrier_manager - .hummock_manager() - .get_version_id() - .await - .to_u64(), + hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(), })) }
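// A small sketch of the delegation visible in the `ddl_controller.rs` hunk above: DDL code
// now asks the barrier manager handle for the committed hummock version id instead of
// reaching through `hummock_manager()` itself. All types below are toy stand-ins.
struct HummockVersionId(u64);

impl HummockVersionId {
    fn to_u64(&self) -> u64 {
        self.0
    }
}

struct HummockManager {
    current_version_id: u64,
}

impl HummockManager {
    fn get_version_id(&self) -> HummockVersionId {
        HummockVersionId(self.current_version_id)
    }
}

struct BarrierManagerHandle {
    hummock_manager: HummockManager,
}

impl BarrierManagerHandle {
    // The only hummock detail the handle re-exports to DDL callers.
    fn get_hummock_version_id(&self) -> HummockVersionId {
        self.hummock_manager.get_version_id()
    }
}

fn main() {
    let handle = BarrierManagerHandle {
        hummock_manager: HummockManager { current_version_id: 7 },
    };
    assert_eq!(handle.get_hummock_version_id().to_u64(), 7);
}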