refactor(meta): reorganize global barrier manager field (#18920)
wenym1 authored Oct 28, 2024
1 parent f0988c4 commit 6263ea6
Showing 12 changed files with 545 additions and 456 deletions.
16 changes: 8 additions & 8 deletions src/meta/node/src/server.rs
@@ -25,6 +25,7 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::controller::IN_MEMORY_STORE;
@@ -80,7 +81,7 @@ use thiserror_ext::AsReport;
use tokio::sync::watch;

use crate::backup_restore::BackupManager;
use crate::barrier::{BarrierScheduler, GlobalBarrierManager};
use crate::barrier::BarrierScheduler;
use crate::controller::system_param::SystemParamsController;
use crate::controller::SqlMetaStore;
use crate::hummock::HummockManager;
@@ -490,7 +491,7 @@ pub async fn start_service_as_election_leader(
env.clone(),
));

let barrier_manager = GlobalBarrierManager::new(
let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
scheduled_barriers,
env.clone(),
metadata_manager.clone(),
@@ -501,6 +502,7 @@
scale_controller.clone(),
)
.await;
sub_tasks.push((join_handle, shutdown_rx));

{
let source_manager = source_manager.clone();
@@ -546,7 +548,7 @@
metadata_manager.clone(),
stream_manager.clone(),
source_manager.clone(),
barrier_manager.context().clone(),
barrier_manager.clone(),
sink_manager.clone(),
meta_metrics.clone(),
)
@@ -558,12 +560,11 @@
metadata_manager.clone(),
source_manager,
stream_manager.clone(),
barrier_manager.context().clone(),
barrier_manager.clone(),
scale_controller.clone(),
);

let cluster_srv =
ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.context().clone());
let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
let stream_srv = StreamServiceImpl::new(
env.clone(),
barrier_scheduler.clone(),
@@ -630,12 +631,11 @@
.await,
);

if cfg!(not(test)) {
{
sub_tasks.push(ClusterController::start_heartbeat_checker(
metadata_manager.cluster_controller.clone(),
Duration::from_secs(1),
));
sub_tasks.push(GlobalBarrierManager::start(barrier_manager));

if !env.opts.disable_automatic_parallelism_control {
sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
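
The server.rs change above boils down to one ownership shift: constructing and spawning the barrier worker is now a single `GlobalBarrierManager::start` call that hands back a cloneable handle plus the `(join_handle, shutdown_rx)` pair pushed onto the sub-task list, and downstream services clone that handle instead of calling `barrier_manager.context()`. The following is a minimal, self-contained sketch of that pattern, not the real `risingwave_meta` API: the types (`BarrierWorker`, `BarrierManagerHandle`), the channels, and the string request payload are hypothetical stand-ins.

```rust
// Hypothetical sketch of the start-and-hand-out-a-handle pattern.
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;

/// Lightweight handle that services clone (stand-in for the returned manager).
#[derive(Clone)]
struct BarrierManagerHandle {
    request_tx: mpsc::UnboundedSender<String>, // placeholder request type
}

/// The worker owns the heavy state and runs on its own task.
struct BarrierWorker {
    request_rx: mpsc::UnboundedReceiver<String>,
    shutdown_tx: watch::Sender<()>,
}

impl BarrierWorker {
    /// Construct *and* spawn the worker in one call, returning everything the
    /// caller needs: a cloneable handle, the join handle, and a shutdown signal
    /// to register as a sub-task (mirroring `sub_tasks.push((join_handle, shutdown_rx))`).
    fn start() -> (BarrierManagerHandle, JoinHandle<()>, watch::Receiver<()>) {
        let (request_tx, request_rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = watch::channel(());
        let worker = BarrierWorker {
            request_rx,
            shutdown_tx,
        };
        let join_handle = tokio::spawn(worker.run());
        (BarrierManagerHandle { request_tx }, join_handle, shutdown_rx)
    }

    async fn run(self) {
        let BarrierWorker {
            mut request_rx,
            shutdown_tx,
        } = self;
        while let Some(request) = request_rx.recv().await {
            // The real worker schedules barriers, collects them, commits epochs, ...
            println!("handling request: {request}");
        }
        // All handles dropped: notify whoever holds the shutdown receiver.
        let _ = shutdown_tx.send(());
    }
}

#[tokio::main]
async fn main() {
    let (barrier_manager, join_handle, _shutdown_rx) = BarrierWorker::start();

    // Services such as the DDL or cluster service now receive a clone of the
    // handle rather than `barrier_manager.context().clone()`.
    let for_ddl_service = barrier_manager.clone();
    let _ = for_ddl_service.request_tx.send("create streaming job".to_owned());

    drop(barrier_manager);
    drop(for_ddl_service);
    join_handle.await.unwrap();
}
```

The diff also drops the `if cfg!(not(test))` guard around the remaining sub-tasks, now that the barrier worker is started unconditionally earlier in the function.
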
114 changes: 50 additions & 64 deletions src/meta/src/barrier/command.rs
@@ -41,8 +41,7 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightGraphInfo};
use super::trace::TracedEpoch;
use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo};
use crate::barrier::{BarrierInfo, GlobalBarrierWorkerContext, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::{DdlType, StreamingJob};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
@@ -206,7 +205,7 @@ pub enum CreateStreamingJobType {
SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, Clone, strum::Display)]
@@ -439,22 +438,15 @@ pub struct CommandContext {
pub node_map: HashMap<WorkerId, PbWorkerNode>,
pub subscription_info: InflightSubscriptionInfo,

pub prev_epoch: TracedEpoch,
pub curr_epoch: TracedEpoch,
pub barrier_info: BarrierInfo,

pub table_ids_to_commit: HashSet<TableId>,

pub current_paused_reason: Option<PausedReason>,

pub command: Command,

pub kind: BarrierKind,

barrier_manager_context: GlobalBarrierManagerContext,

/// The tracing span of this command.
///
/// Differs from [`TracedEpoch`], this span focuses on the lifetime of the corresponding
/// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
/// barrier, including the process of waiting for the barrier to be sent, flowing through the
/// stream graph on compute nodes, and finishing its `post_collect` stuffs.
pub _span: tracing::Span,
@@ -463,9 +455,7 @@ impl std::fmt::Debug for CommandContext {
impl std::fmt::Debug for CommandContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CommandContext")
.field("prev_epoch", &self.prev_epoch.value().0)
.field("curr_epoch", &self.curr_epoch.value().0)
.field("kind", &self.kind)
.field("barrier_info", &self.barrier_info)
.field("command", &self.command)
.finish()
}
@@ -474,34 +464,26 @@ impl std::fmt::Debug for CommandContext {
impl CommandContext {
pub(super) fn new(
node_map: HashMap<WorkerId, PbWorkerNode>,
barrier_info: BarrierInfo,
subscription_info: InflightSubscriptionInfo,
prev_epoch: TracedEpoch,
curr_epoch: TracedEpoch,
table_ids_to_commit: HashSet<TableId>,
current_paused_reason: Option<PausedReason>,
command: Command,
kind: BarrierKind,
barrier_manager_context: GlobalBarrierManagerContext,
span: tracing::Span,
) -> Self {
Self {
node_map,
subscription_info,
prev_epoch,
curr_epoch,
barrier_info,
table_ids_to_commit,
current_paused_reason,
command,
kind,
barrier_manager_context,
_span: span,
}
}
}

impl Command {
/// Generate a mutation for the given command.
pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
pub fn to_mutation(&self, current_paused_reason: Option<PausedReason>) -> Option<Mutation> {
let mutation =
match self {
Command::Plain(mutation) => mutation.clone(),
@@ -517,7 +499,7 @@ impl Command {

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
if current_paused_reason == Some(reason) {
if current_paused_reason == Some(*reason) {
Some(Mutation::Resume(ResumeMutation {}))
} else {
None
@@ -903,41 +885,35 @@ impl Command {
..Default::default()
}))
}
}

impl CommandContext {
pub fn to_mutation(&self) -> Option<Mutation> {
self.command
.to_mutation(self.current_paused_reason.as_ref())
}

/// Returns the paused reason after executing the current command.
pub fn next_paused_reason(&self) -> Option<PausedReason> {
match &self.command {
pub fn next_paused_reason(
&self,
current_paused_reason: Option<PausedReason>,
) -> Option<PausedReason> {
match self {
Command::Pause(reason) => {
// Only pause when the cluster is not already paused.
if self.current_paused_reason.is_none() {
if current_paused_reason.is_none() {
Some(*reason)
} else {
self.current_paused_reason
current_paused_reason
}
}

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
if self.current_paused_reason == Some(*reason) {
if current_paused_reason == Some(*reason) {
None
} else {
self.current_paused_reason
current_paused_reason
}
}

_ => self.current_paused_reason,
_ => current_paused_reason,
}
}
}

impl Command {
/// For `CancelStreamingJob`, returns the table id of the target table.
pub fn table_to_cancel(&self) -> Option<TableId> {
match self {
@@ -948,22 +924,24 @@ impl Command {
}

impl CommandContext {
pub async fn wait_epoch_commit(&self) -> MetaResult<()> {
pub async fn wait_epoch_commit(
&self,
barrier_manager_context: &GlobalBarrierWorkerContext,
) -> MetaResult<()> {
let table_id = self.table_ids_to_commit.iter().next().cloned();
// try wait epoch on an existing random table id
let Some(table_id) = table_id else {
// no need to wait epoch when there is no table id
return Ok(());
};
let futures = self.node_map.values().map(|worker_node| async {
let client = self
.barrier_manager_context
let client = barrier_manager_context
.env
.stream_client_pool()
.get(worker_node)
.await?;
let request = WaitEpochCommitRequest {
epoch: self.prev_epoch.value().0,
epoch: self.barrier_info.prev_epoch(),
table_id: table_id.table_id,
};
client.wait_epoch_commit(request).await
@@ -976,7 +954,10 @@ impl CommandContext {

/// Do some stuffs after barriers are collected and the new storage version is committed, for
/// the given command.
pub async fn post_collect(&self) -> MetaResult<()> {
pub async fn post_collect(
&self,
barrier_manager_context: &GlobalBarrierWorkerContext,
) -> MetaResult<()> {
match &self.command {
Command::Plain(_) => {}

@@ -988,18 +969,18 @@ impl CommandContext {
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators
// may immediately initialize their states on that barrier.
self.wait_epoch_commit().await?;
self.wait_epoch_commit(barrier_manager_context).await?;
}
}

Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
self.barrier_manager_context
barrier_manager_context
.metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
self.barrier_manager_context
barrier_manager_context
.source_manager
.apply_source_change(None, None, Some(split_assignment.clone()), None)
.await;
@@ -1009,7 +990,7 @@ impl CommandContext {
unregistered_state_table_ids,
..
} => {
self.barrier_manager_context
barrier_manager_context
.hummock_manager
.unregister_table_ids(unregistered_state_table_ids.iter().cloned())
.await?;
@@ -1027,7 +1008,7 @@ impl CommandContext {
// It won't clean the tables on failure,
// since the failure could be recoverable.
// As such it needs to be handled here.
self.barrier_manager_context
barrier_manager_context
.hummock_manager
.unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
.await?;
@@ -1040,7 +1021,7 @@ impl CommandContext {
init_split_assignment,
..
} = info;
self.barrier_manager_context
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
@@ -1058,7 +1039,7 @@ impl CommandContext {
..
}) = job_type
{
self.barrier_manager_context
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
@@ -1073,7 +1054,7 @@ impl CommandContext {
// Extract the fragments that include source operators.
let source_fragments = table_fragments.stream_source_fragments();
let backfill_fragments = table_fragments.source_backfill_fragments()?;
self.barrier_manager_context
barrier_manager_context
.source_manager
.apply_source_change(
Some(source_fragments),
@@ -1088,7 +1069,7 @@ impl CommandContext {
table_parallelism,
..
} => {
self.barrier_manager_context
barrier_manager_context
.scale_controller
.post_apply_reschedule(reschedules, table_parallelism)
.await?;
@@ -1102,7 +1083,7 @@ impl CommandContext {
..
}) => {
// Update actors and actor_dispatchers for new table fragments.
self.barrier_manager_context
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
@@ -1114,14 +1095,14 @@ impl CommandContext {
.await?;

// Apply the split changes in source manager.
self.barrier_manager_context
barrier_manager_context
.source_manager
.drop_source_fragments_vec(std::slice::from_ref(old_table_fragments))
.await;
let source_fragments = new_table_fragments.stream_source_fragments();
// XXX: is it possible to have backfill fragments here?
let backfill_fragments = new_table_fragments.source_backfill_fragments()?;
self.barrier_manager_context
barrier_manager_context
.source_manager
.apply_source_change(
Some(source_fragments),
@@ -1135,7 +1116,7 @@ impl CommandContext {
Command::CreateSubscription {
subscription_id, ..
} => {
self.barrier_manager_context
barrier_manager_context
.metadata_manager
.catalog_controller
.finish_create_subscription_catalog(*subscription_id)
@@ -1150,10 +1131,15 @@ impl CommandContext {

pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
let Some(truncate_timestamptz) = Timestamptz::from_secs(
self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
self.barrier_info
.prev_epoch
.value()
.as_timestamptz()
.timestamp()
- retention_second as i64,
) else {
warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
return self.prev_epoch.value();
warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
return self.barrier_info.prev_epoch.value();
};
Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
}
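
In command.rs, the separate `prev_epoch`, `curr_epoch`, and `kind` fields collapse into a single `barrier_info: BarrierInfo`, `CommandContext` no longer stores a manager context (the worker context becomes an explicit `&GlobalBarrierWorkerContext` argument to `wait_epoch_commit` and `post_collect`), and paused-reason bookkeeping moves into `Command::next_paused_reason`, which takes the current reason from the caller. The sketch below models that data flow with hypothetical, heavily simplified types; the names follow the diff, but the real structs and enums carry far more state.

```rust
// Hypothetical, trimmed-down model of the reorganized CommandContext.
#[derive(Debug, Clone, Copy)]
enum BarrierKind {
    // The real enum has more variants (e.g. an initial barrier).
    Barrier,
    Checkpoint,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PausedReason {
    Manual,
}

/// Epochs and barrier kind now travel together as one value.
#[derive(Debug, Clone)]
struct BarrierInfo {
    prev_epoch: u64,
    curr_epoch: u64,
    kind: BarrierKind,
}

/// Stand-in for `GlobalBarrierWorkerContext`: the managers that used to be
/// reachable through a field stored inside every `CommandContext`.
struct GlobalBarrierWorkerContext;

#[derive(Debug)]
enum Command {
    Plain,
    Pause(PausedReason),
    Resume(PausedReason),
}

impl Command {
    /// Paused reason after this command, given the reason before it; the state
    /// is passed in by the caller instead of read from `CommandContext`.
    fn next_paused_reason(&self, current: Option<PausedReason>) -> Option<PausedReason> {
        match self {
            // Only pause when the cluster is not already paused.
            Command::Pause(reason) => current.or(Some(*reason)),
            // Only resume when the cluster is paused with the same reason.
            Command::Resume(reason) if current == Some(*reason) => None,
            _ => current,
        }
    }
}

struct CommandContext {
    barrier_info: BarrierInfo,
    command: Command,
}

impl CommandContext {
    /// Post-collect work now borrows the worker context instead of owning it.
    async fn post_collect(&self, _context: &GlobalBarrierWorkerContext) {
        println!(
            "post-collect of {:?} at prev_epoch {} -> curr_epoch {} ({:?})",
            self.command,
            self.barrier_info.prev_epoch,
            self.barrier_info.curr_epoch,
            self.barrier_info.kind
        );
    }
}

#[tokio::main]
async fn main() {
    let context = GlobalBarrierWorkerContext;
    let command_ctx = CommandContext {
        barrier_info: BarrierInfo {
            prev_epoch: 100,
            curr_epoch: 101,
            kind: BarrierKind::Checkpoint,
        },
        command: Command::Plain,
    };
    command_ctx.post_collect(&context).await;

    // Paused-reason state is threaded through explicitly by the caller.
    let mut paused = None;
    paused = Command::Pause(PausedReason::Manual).next_paused_reason(paused);
    paused = Command::Resume(PausedReason::Manual).next_paused_reason(paused);
    assert_eq!(paused, None);
}
```

The design mirrored here is that `CommandContext` becomes a plain description of one barrier, cheap to store and to debug-print through the single `barrier_info` field, while everything needed to act on it is borrowed at call time.
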