Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): support database checkpoint isolation #19173

Merged
merged 7 commits into the target branch on Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,9 @@ def section_streaming(outer_panels):
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
"current level of congestion within the system.",
[
panels.target(f"{metric('all_barrier_nums')}", "all_barrier"),
panels.target(f"{metric('all_barrier_nums')}", "all_barrier {{database_id}}"),
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier {{database_id}}"
),
panels.target(
f"{metric('meta_snapshot_backfill_inflight_barrier_num')}",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def section_overview(panels):
[
panels.target(
f"{metric('all_barrier_nums')} >= bool 200",
"Too Many Barriers",
"Too Many Barriers {{database_id}}",
),
panels.target(
f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) > bool 0 + sum({metric('recovery_failure_cnt')}) > bool 0",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
uint64 partial_graph_id = 6;

Check failure on line 17 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "partial_graph_id" on message "InjectBarrierRequest" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
kwannoel marked this conversation as resolved.
Show resolved Hide resolved

repeated common.ActorInfo broadcast_info = 8;
repeated stream_plan.StreamActor actors_to_build = 9;
Expand Down Expand Up @@ -48,7 +48,7 @@
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
uint32 partial_graph_id = 8;
uint64 partial_graph_id = 8;

Check failure on line 51 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "8" with name "partial_graph_id" on message "BarrierCompleteResponse" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
// prev_epoch of barrier
uint64 epoch = 9;
}
Expand All @@ -69,7 +69,7 @@
}

message RemovePartialGraphRequest {
repeated uint32 partial_graph_ids = 1;
repeated uint64 partial_graph_ids = 1;

Check failure on line 72 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "partial_graph_ids" on message "RemovePartialGraphRequest" changed type from "uint32" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

oneof request {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::SubscriptionId;
use crate::{ObjectId, SubscriptionId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscription")]
Expand All @@ -28,7 +28,7 @@ pub struct Model {
pub retention_seconds: i64,
pub definition: String,
pub subscription_state: i32,
pub dependent_table_id: i32,
pub dependent_table_id: ObjectId,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
3 changes: 1 addition & 2 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{RescheduleOptions, ScaleControllerRef, WorkerReschedule};
Expand Down Expand Up @@ -123,7 +123,6 @@ impl ScaleService for ScaleServiceImpl {
.split_fragment_map_by_database(worker_reschedules)
.await?
{
let database_id = DatabaseId::new(database_id as _);
let streaming_job_ids = self
.metadata_manager
.catalog_controller
Expand Down
10 changes: 2 additions & 8 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(
DatabaseId::new(database_id as _),
Command::pause(PausedReason::Manual),
)
.run_command(database_id, Command::pause(PausedReason::Manual))
.await?;
}
Ok(Response::new(PauseResponse {}))
Expand All @@ -96,10 +93,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(
DatabaseId::new(database_id as _),
Command::resume(PausedReason::Manual),
)
.run_command(database_id, Command::resume(PausedReason::Manual))
.await?;
}
Ok(Response::new(ResumeResponse {}))
Expand Down
18 changes: 8 additions & 10 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
use risingwave_pb::meta::table_fragments::PbActorStatus;
Expand Down Expand Up @@ -106,27 +106,26 @@ impl ReplaceTablePlan {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.database_id().into(),
self.streaming_job.id().into(),
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id as i32,
actor.actor_id,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.map(|table_id| TableId::new(*table_id))
.collect(),
},
);
Expand Down Expand Up @@ -172,19 +171,19 @@ impl CreateStreamingJobCommandInfo {
.iter()
.map(|actor| {
(
actor.actor_id as i32,
actor.actor_id,
self.table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.map(|table_id| TableId::new(*table_id))
.collect(),
},
)
Expand All @@ -207,7 +206,7 @@ pub enum CreateStreamingJobType {
/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, Clone, strum::Display)]
#[derive(Debug, strum::Display)]
pub enum Command {
/// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
/// all messages before the checkpoint barrier should have been committed.
Expand Down Expand Up @@ -336,7 +335,6 @@ impl Command {
(
fragment_id,
CommandFragmentChanges::NewFragment(
info.streaming_job.database_id().into(),
info.streaming_job.id().into(),
fragment_info,
),
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::cmp::max;
use std::collections::HashMap;
use std::ops::Bound::{Excluded, Unbounded};

use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -78,7 +78,6 @@ impl CreatingStreamingJobControl {
let actors_to_create = info.table_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
database_id: info.streaming_job.database_id().into(),
fragment_infos,
};

Expand Down Expand Up @@ -165,6 +164,7 @@ impl CreatingStreamingJobControl {
}

fn inject_barrier(
database_id: DatabaseId,
table_id: TableId,
control_stream_manager: &mut ControlStreamManager,
barrier_control: &mut CreatingStreamingJobBarrierControl,
Expand All @@ -177,6 +177,7 @@ impl CreatingStreamingJobControl {
}: CreatingJobInjectBarrierInfo,
) -> MetaResult<()> {
let node_to_collect = control_stream_manager.inject_barrier(
database_id,
Some(table_id),
mutation,
&barrier_info,
Expand Down Expand Up @@ -235,6 +236,7 @@ impl CreatingStreamingJobControl {
.on_new_upstream_epoch(barrier_info, start_consume_upstream)
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
Expand Down Expand Up @@ -263,6 +265,7 @@ impl CreatingStreamingJobControl {
let table_id = self.info.table_fragments.table_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
table_id,
control_stream_manager,
&mut self.barrier_control,
Expand Down
Loading
Loading