Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): create source stream with retry during recovery #19805

Merged
merged 14 commits into from
Dec 18, 2024
181 changes: 166 additions & 15 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::array::ArrayRef;
Expand All @@ -38,6 +41,7 @@ use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::Instrument;

use super::executor_core::StreamSourceCore;
use super::{
Expand All @@ -46,6 +50,7 @@ use super::{
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::UpdateMutation;

Expand Down Expand Up @@ -474,6 +479,8 @@ impl<S: StateStore> SourceExecutor<S> {
};

core.split_state_store.init_epoch(first_epoch).await?;
// `initial_dispatch_num == 0` means the source executor has no downstream jobs
// and is newly created.
let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0;
for ele in &mut boot_state {
if let Some(recover_state) = core
Expand All @@ -492,6 +499,15 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

// Since a DDL is always accompanied by a checkpoint barrier, we can assume that
// if initial_dispatch_num != 0, the source state must have been initialized.
if self.actor_ctx.initial_dispatch_num != 0 {
assert!(
!is_uninitialized,
"is_uninitialized must be false if initial_dispatch_num != 0"
);
}
StrikeW marked this conversation as resolved.
Show resolved Hide resolved

// init in-memory split states with persisted state if any
core.init_split_state(boot_state.clone());

Expand All @@ -500,20 +516,133 @@ impl<S: StateStore> SourceExecutor<S> {

let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
tracing::debug!(state = ?recover_state, "start with state");
let (source_chunk_reader, latest_splits) = self
.build_stream_source_reader(
&source_desc,
recover_state,
// For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
// Note that shared CDC source is special. It already starts from latest.
self.is_shared_non_cdc && is_uninitialized,

let mut received_resume_during_build = false;
let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

// Build the source stream reader.
let (source_chunk_reader, latest_splits) = if is_uninitialized {
tracing::info!("source uninitialized, build source stream reader w/o retry.");
let (source_chunk_reader, latest_splits) = self
.build_stream_source_reader(
hzxa21 marked this conversation as resolved.
Show resolved Hide resolved
&source_desc,
recover_state,
// For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
// Note that shared CDC source is special. It already starts from latest.
self.is_shared_non_cdc && is_uninitialized,
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
)
.instrument_await("source_build_reader")
.await?;
(
source_chunk_reader.map_err(StreamExecutorError::connector_error),
latest_splits,
)
.instrument_await("source_build_reader")
.await?;
let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error);
} else {
tracing::info!("source initialized, build source stream reader with retry.");
// Build the source stream reader with retry during recovery.
// We only build source stream reader with retry during recovery,
// because we can rely on the persisted source states to recover the source stream
// and can avoid the potential race with "seek to latest"
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
None;
let source_reader = source_desc.source.clone();
let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
let source_ctx = Arc::new(source_ctx);
let mut build_source_stream_fut = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
match source_reader
.build_stream(
recover_state.clone(),
column_ids.clone(),
source_ctx.clone(),
false, // not need to seek to latest since source is initialized
)
.await {
Ok((stream, latest_splits)) => Ok((stream, latest_splits)),
Err(e) => {
tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying...");
Err(e)
}
}
})
.instrument(tracing::info_span!("build_source_stream_with_retry"))
.await
.expect("Retry build source stream until success.")
});

// loop to create source stream until success
loop {
if let Some(barrier) = build_source_stream_and_poll_barrier(
&mut barrier_stream,
&mut reader_and_splits,
&mut build_source_stream_fut,
)
.await?
{
if let Message::Barrier(barrier) = barrier {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Throttle(actor_to_apply) => {
if let Some(new_rate_limit) =
actor_to_apply.get(&self.actor_ctx.id)
&& *new_rate_limit != self.rate_limit_rps
{
tracing::info!(
"updating rate limit from {:?} to {:?}",
self.rate_limit_rps,
*new_rate_limit
);

// update the rate limit option, we will apply the rate limit
// when we finish building the source stream.
self.rate_limit_rps = *new_rate_limit;
}
}
Mutation::Resume => {
// We record the Resume mutation here and postpone resuming the source stream
// until after we have successfully built the source stream.
received_resume_during_build = true;
}
_ => {
// ignore other mutations and output a warn log
tracing::warn!(
"Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before
finish building source stream.",
mutation
);
}
}
}

// bump state store epoch
let _ = self.persist_state_and_clear_cache(barrier.epoch).await?;
yield Message::Barrier(barrier);
} else {
unreachable!(
"Only barrier message is expected when building source stream."
);
}
} else {
assert!(reader_and_splits.is_some());
tracing::info!("source stream created successfully");
break;
}
}
let (source_chunk_reader, latest_splits) =
reader_and_splits.expect("source chunk reader and splits must be created");

(
apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
.boxed()
.map_err(StreamExecutorError::connector_error),
latest_splits,
)
};

if let Some(latest_splits) = latest_splits {
// make sure it is written to state table later.
// Then even it receives no messages, we can observe it in state table.
Expand All @@ -525,13 +654,12 @@ impl<S: StateStore> SourceExecutor<S> {
}
// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream =
StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
let mut command_paused = false;

// - If the first barrier requires us to pause on startup, pause the stream.
if is_pause_on_startup {
if is_pause_on_startup && !received_resume_during_build {
tracing::info!("source paused on startup");
stream.pause_stream();
command_paused = true;
Expand Down Expand Up @@ -746,6 +874,29 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

/// Waits for either the source stream build to finish or the next item of the
/// barrier stream, whichever is ready first.
///
/// The build future is listed first in a `biased` select, so when both are
/// ready the finished build wins over a pending barrier.
///
/// Returns:
/// - `Ok(None)` if `reader_and_splits` is already populated, or if the build
///   future completes during this call (its output is then stored in
///   `reader_and_splits`).
/// - `Ok(Some(msg))` if the barrier stream yields a message first.
/// - `Err(_)` if the barrier stream yields an error first.
///
/// Note that an exhausted barrier stream also results in `Ok(None)`; callers
/// should inspect `reader_and_splits` to tell the cases apart.
async fn build_source_stream_and_poll_barrier(
    barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Message>>,
    reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
    build_future: &mut Pin<
        Box<impl Future<Output = (BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>,
    >,
) -> StreamExecutorResult<Option<Message>> {
    if reader_and_splits.is_none() {
        tokio::select! {
            // Poll the branches in declaration order: build first, barrier second.
            biased;
            built = build_future.as_mut() => {
                // Hand the freshly built reader (and its splits) back to the caller.
                *reader_and_splits = Some(built);
                Ok(None)
            }
            next_item = barrier_stream.next() => {
                // `Option<Result<_>>` -> `Result<Option<_>>`
                next_item.transpose()
            }
        }
    } else {
        // The reader has already been built; there is nothing left to wait for.
        Ok(None)
    }
}

impl<S: StateStore> Execute for SourceExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
if self.stream_source_core.is_some() {
Expand Down
Loading