
Commit

refactor(meta): reorganize code of global barrier manager (part 1) (#19334)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
wenym1 and github-actions[bot] authored Nov 11, 2024
1 parent b689615 commit 3c2e81b
Showing 18 changed files with 2,672 additions and 2,463 deletions.
820 changes: 820 additions & 0 deletions src/meta/src/barrier/checkpoint/control.rs

Large diffs are not rendered by default.

@@ -19,19 +19,17 @@ use std::cmp::max;
use std::collections::HashMap;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::{CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus};
use tracing::info;

use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierControl;
use crate::barrier::creating_job::status::{
CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus,
};
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
@@ -41,8 +39,8 @@ use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::MetaResult;

#[derive(Debug)]
pub(super) struct CreatingStreamingJobControl {
pub(super) info: CreateStreamingJobCommandInfo,
pub(crate) struct CreatingStreamingJobControl {
pub(crate) info: CreateStreamingJobCommandInfo,
pub(super) snapshot_backfill_info: SnapshotBackfillInfo,
backfill_epoch: u64,

@@ -103,7 +101,7 @@ impl CreatingStreamingJobControl {
}
}

pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool {
pub(crate) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool {
self.barrier_control.is_wait_on_worker(worker_id)
|| (self.status.is_finishing()
&& InflightFragmentInfo::contains_worker(
@@ -112,7 +110,7 @@ impl CreatingStreamingJobControl {
))
}

pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
let progress = match &self.status {
CreatingStreamingJobStatus::ConsumingSnapshot {
create_mview_tracker,
File renamed without changes.
20 changes: 20 additions & 0 deletions src/meta/src/barrier/checkpoint/mod.rs
@@ -0,0 +1,20 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod control;
mod creating_job;
mod state;

pub(super) use control::{CheckpointControl, DatabaseCheckpointControl, EpochNode};
pub(super) use state::BarrierWorkerState;
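
Several hunks in this commit widen visibility from `pub(super)` to `pub(crate)`. That follows from the extra nesting level introduced by the new `checkpoint` module: an item that is `pub(super)` inside `barrier::checkpoint::control` is visible only within `checkpoint`, so code still living directly under `barrier` can no longer name it. A minimal standalone sketch of the rule, with hypothetical module and struct names rather than the real ones:

#![allow(dead_code)]

mod barrier {
    mod checkpoint {
        pub mod control {
            // Visible only within `checkpoint`, the immediate parent module.
            pub(super) struct OnlyCheckpointCanSeeMe;
            // Visible anywhere in the crate, including the `barrier` module.
            pub(crate) struct WholeCrateCanSeeMe;
        }
    }

    pub(crate) fn demo() {
        // let _ = checkpoint::control::OnlyCheckpointCanSeeMe; // error: not visible here
        let _ = checkpoint::control::WholeCrateCanSeeMe; // compiles: pub(crate)
    }
}

fn main() {
    barrier::demo();
}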
@@ -24,7 +24,7 @@ use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;

/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(super) struct BarrierWorkerState {
pub(crate) struct BarrierWorkerState {
/// The last sent `prev_epoch`
///
/// There's no need to persist this field. On recovery, we will restore this from the latest
247 changes: 17 additions & 230 deletions src/meta/src/barrier/command.rs
@@ -15,7 +15,6 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use futures::future::try_join_all;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
@@ -37,17 +36,15 @@ use risingwave_pb::stream_plan::{
DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::{GlobalBarrierWorkerContextImpl, InflightSubscriptionInfo};
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::{DdlType, StreamingJob};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in some fragment, like scaling or migrating.
@@ -203,7 +200,7 @@ pub enum CreateStreamingJobType {
SnapshotBackfill(SnapshotBackfillInfo),
}

/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, strum::Display)]
@@ -479,6 +476,21 @@ impl CommandContext {
_span: span,
}
}

pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
let Some(truncate_timestamptz) = Timestamptz::from_secs(
self.barrier_info
.prev_epoch
.value()
.as_timestamptz()
.timestamp()
- retention_second as i64,
) else {
warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
return self.barrier_info.prev_epoch.value();
};
Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
}
}

impl Command {
@@ -922,228 +934,3 @@ impl Command {
}
}
}

impl CommandContext {
pub async fn wait_epoch_commit(
&self,
barrier_manager_context: &GlobalBarrierWorkerContextImpl,
) -> MetaResult<()> {
let table_id = self.table_ids_to_commit.iter().next().cloned();
// try wait epoch on an existing random table id
let Some(table_id) = table_id else {
// no need to wait epoch when there is no table id
return Ok(());
};
let futures = self.node_map.values().map(|worker_node| async {
let client = barrier_manager_context
.env
.stream_client_pool()
.get(worker_node)
.await?;
let request = WaitEpochCommitRequest {
epoch: self.barrier_info.prev_epoch(),
table_id: table_id.table_id,
};
client.wait_epoch_commit(request).await
});

try_join_all(futures).await?;

Ok(())
}

/// Do some stuffs after barriers are collected and the new storage version is committed, for
/// the given command.
pub async fn post_collect(
&self,
barrier_manager_context: &GlobalBarrierWorkerContextImpl,
) -> MetaResult<()> {
let Some(command) = &self.command else {
return Ok(());
};
match command {
Command::Flush => {}

Command::Throttle(_) => {}

Command::Pause(reason) => {
if let PausedReason::ConfigChange = reason {
// After the `Pause` barrier is collected and committed, we must ensure that the
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators
// may immediately initialize their states on that barrier.
self.wait_epoch_commit(barrier_manager_context).await?;
}
}

Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
barrier_manager_context
.metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
barrier_manager_context
.source_manager
.apply_source_change(None, None, Some(split_assignment.clone()), None)
.await;
}

Command::DropStreamingJobs {
unregistered_state_table_ids,
..
} => {
barrier_manager_context
.hummock_manager
.unregister_table_ids(unregistered_state_table_ids.iter().cloned())
.await?;
}

Command::CancelStreamingJob(table_fragments) => {
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");

// NOTE(kwannoel): At this point, meta has already registered the table ids.
// We should unregister them.
// This is required for background ddl, for foreground ddl this is a no-op.
// Foreground ddl is handled entirely by stream manager, so it will unregister
// the table ids on failure.
// On the other hand background ddl could be handled by barrier manager.
// It won't clean the tables on failure,
// since the failure could be recoverable.
// As such it needs to be handled here.
barrier_manager_context
.hummock_manager
.unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
.await?;
}

Command::CreateStreamingJob { info, job_type } => {
let CreateStreamingJobCommandInfo {
table_fragments,
dispatchers,
init_split_assignment,
..
} = info;
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
table_fragments.table_id().table_id as _,
table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;

if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
new_table_fragments,
dispatchers,
init_split_assignment,
..
}) = job_type
{
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}

// Extract the fragments that include source operators.
let source_fragments = table_fragments.stream_source_fragments();
let backfill_fragments = table_fragments.source_backfill_fragments()?;
barrier_manager_context
.source_manager
.apply_source_change(
Some(source_fragments),
Some(backfill_fragments),
Some(init_split_assignment.clone()),
None,
)
.await;
}
Command::RescheduleFragment {
reschedules,
table_parallelism,
..
} => {
barrier_manager_context
.scale_controller
.post_apply_reschedule(reschedules, table_parallelism)
.await?;
}

Command::ReplaceTable(ReplaceTablePlan {
old_table_fragments,
new_table_fragments,
dispatchers,
init_split_assignment,
..
}) => {
// Update actors and actor_dispatchers for new table fragments.
barrier_manager_context
.metadata_manager
.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;

// Apply the split changes in source manager.
barrier_manager_context
.source_manager
.drop_source_fragments_vec(std::slice::from_ref(old_table_fragments))
.await;
let source_fragments = new_table_fragments.stream_source_fragments();
// XXX: is it possible to have backfill fragments here?
let backfill_fragments = new_table_fragments.source_backfill_fragments()?;
barrier_manager_context
.source_manager
.apply_source_change(
Some(source_fragments),
Some(backfill_fragments),
Some(init_split_assignment.clone()),
None,
)
.await;
}

Command::CreateSubscription {
subscription_id, ..
} => {
barrier_manager_context
.metadata_manager
.catalog_controller
.finish_create_subscription_catalog(*subscription_id)
.await?
}
Command::DropSubscription { .. } => {}
Command::MergeSnapshotBackfillStreamingJobs(_) => {}
}

Ok(())
}

pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
let Some(truncate_timestamptz) = Timestamptz::from_secs(
self.barrier_info
.prev_epoch
.value()
.as_timestamptz()
.timestamp()
- retention_second as i64,
) else {
warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
return self.barrier_info.prev_epoch.value();
};
Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
}
}
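
The `get_truncate_epoch` helper (moved above into the surviving part of `CommandContext`) subtracts the retention window from the wall-clock time of the previous barrier epoch, and falls back to the previous epoch itself when the subtraction would underflow. A rough standalone sketch of the same arithmetic, using plain millisecond timestamps instead of RisingWave's `Epoch` and `Timestamptz` types (names here are hypothetical):

// Sketch only: the real code goes through `Timestamptz::from_secs` and
// `Epoch::from_unix_millis`, and logs a warning on the fallback path.
fn truncate_epoch_millis(prev_epoch_millis: u64, retention_seconds: u64) -> u64 {
    match prev_epoch_millis.checked_sub(retention_seconds * 1000) {
        Some(millis) => millis,
        // Retention window reaches before the Unix epoch: keep the previous epoch.
        None => prev_epoch_millis,
    }
}

fn main() {
    // One hour of retention against a previous epoch at t = 10_000_000 ms.
    assert_eq!(truncate_epoch_millis(10_000_000, 3_600), 6_400_000);
    // An oversized retention window falls back to the previous epoch itself.
    assert_eq!(truncate_epoch_millis(10_000_000, 1_000_000), 10_000_000);
}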