From efec2f0a801639eb6da19c7a5ba56ea1952791b3 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 11:00:49 +0800
Subject: [PATCH 01/10] source: build stream reader with retry during recovery

---
 .../src/executor/source/source_executor.rs    | 173 ++++++++++++++++--
 1 file changed, 158 insertions(+), 15 deletions(-)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index fe73710ba76de..e19d0626492c2 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -13,10 +13,13 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
 use std::time::Duration;
 
 use anyhow::anyhow;
 use either::Either;
+use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use risingwave_common::array::ArrayRef;
@@ -38,6 +41,7 @@ use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::{mpsc, oneshot};
 use tokio::time::Instant;
+use tracing::Instrument;
 
 use super::executor_core::StreamSourceCore;
 use super::{
@@ -46,6 +50,7 @@ use super::{
 };
 use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
+use crate::executor::source::get_infinite_backoff_strategy;
 use crate::executor::stream_reader::StreamReaderWithPause;
 use crate::executor::UpdateMutation;
 
@@ -474,6 +479,8 @@ impl<S: StateStore> SourceExecutor<S> {
         };
         core.split_state_store.init_epoch(first_epoch).await?;
 
+        // `initial_dispatch_num == 0` means the source executor has no downstream jobs
+        // and is newly created.
         let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0;
         for ele in &mut boot_state {
             if let Some(recover_state) = core
@@ -500,20 +507,133 @@
         let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
         tracing::debug!(state = ?recover_state, "start with state");
-        let (source_chunk_reader, latest_splits) = self
-            .build_stream_source_reader(
-                &source_desc,
-                recover_state,
-                // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
-                // It's highly probable that the work of scanning historical data cannot be shared,
-                // so don't waste work on it.
-                // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
-                // Note that shared CDC source is special. It already starts from latest.
-                self.is_shared_non_cdc && is_uninitialized,
+
+        let mut received_resume_during_build = false;
+        let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
+
+        // Build the source stream reader.
+        let (source_chunk_reader, latest_splits) = if is_uninitialized {
+            let (source_chunk_reader, latest_splits) = self
+                .build_stream_source_reader(
+                    &source_desc,
+                    recover_state,
+                    // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
+                    // It's highly probable that the work of scanning historical data cannot be shared,
+                    // so don't waste work on it.
+                    // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
+                    // Note that shared CDC source is special. It already starts from latest.
+                    self.is_shared_non_cdc && is_uninitialized,
+                )
+                .instrument_await("source_build_reader")
+                .await?;
+            (
+                source_chunk_reader.map_err(StreamExecutorError::connector_error),
+                latest_splits,
             )
-            .instrument_await("source_build_reader")
-            .await?;
-        let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error);
+        } else {
+            // Build the source stream reader with retry during recovery.
+            // We only build source stream reader with retry during recovery,
+            // becasue we can rely on the persisted source states to recover the source stream
+            // and can avoid the potential race with "seek to latest"
+            let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
+                None;
+            let seek_to_latest = self.is_shared_non_cdc && is_uninitialized;
+            let source_reader = source_desc.source.clone();
+            let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
+            let source_ctx = Arc::new(source_ctx);
+            let mut build_source_stream_fut = Box::pin(async move {
+                let backoff = get_infinite_backoff_strategy();
+                tokio_retry::Retry::spawn(backoff, || async {
+                    match source_reader
+                        .build_stream(
+                            recover_state.clone(),
+                            column_ids.clone(),
+                            source_ctx.clone(),
+                            seek_to_latest,
+                        )
+                        .await {
+                        Ok((stream, latest_splits)) => Ok((stream, latest_splits)),
+                        Err(e) => {
+                            tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying...");
+                            Err(e)
+                        }
+                    }
+                })
+                .instrument(tracing::info_span!("build_source_stream_with_retry"))
+                .await
+                .expect("Retry build source stream until success.")
+            });
+
+            // loop to create source stream until success
+            loop {
+                if let Some(barrier) = build_source_stream_and_poll_barrier(
+                    &mut barrier_stream,
+                    &mut reader_and_splits,
+                    &mut build_source_stream_fut,
+                )
+                .await?
+                {
+                    if let Message::Barrier(barrier) = barrier {
+                        if let Some(mutation) = barrier.mutation.as_deref() {
+                            match mutation {
+                                Mutation::Throttle(actor_to_apply) => {
+                                    if let Some(new_rate_limit) =
+                                        actor_to_apply.get(&self.actor_ctx.id)
+                                        && *new_rate_limit != self.rate_limit_rps
+                                    {
+                                        tracing::info!(
+                                            "updating rate limit from {:?} to {:?}",
+                                            self.rate_limit_rps,
+                                            *new_rate_limit
+                                        );
+
+                                        // update the rate limit option, we will apply the rate limit
+                                        // when we finish building the source stream.
+                                        self.rate_limit_rps = *new_rate_limit;
+                                    }
+                                }
+                                Mutation::Resume => {
+                                    // We record the Resume mutation here and postpone resuming the
+                                    // source stream until it has been built successfully.
+                                    received_resume_during_build = true;
+                                }
+                                _ => {
+                                    // ignore other mutations and output a warn log
+                                    tracing::warn!(
+                                        "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before finishing building the source stream.",
+                                        mutation
+                                    );
+                                }
+                            }
+                        }
+
+                        // bump state store epoch
+                        let _ = self.persist_state_and_clear_cache(barrier.epoch).await?;
+                        yield Message::Barrier(barrier);
+                    } else {
+                        unreachable!(
+                            "Only barrier message is expected when building source stream."
+                        );
+                    }
+                } else {
+                    assert!(reader_and_splits.is_some());
+                    tracing::info!("source stream created successfully");
+                    break;
+                }
+            }
+            let (source_chunk_reader, latest_splits) =
+                reader_and_splits.expect("source chunk reader and splits must be created");
+
+            (
+                apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
+                    .boxed()
+                    .map_err(StreamExecutorError::connector_error),
+                // source_chunk_reader.map_err(StreamExecutorError::connector_error),
+                latest_splits,
+            )
+        };
+
         if let Some(latest_splits) = latest_splits {
             // make sure it is written to state table later.
             // Then even it receives no messages, we can observe it in state table.
@@ -525,13 +645,13 @@ impl<S: StateStore> SourceExecutor<S> {
         }
         // Merge the chunks from source and the barriers into a single stream. We prioritize
         // barriers over source data chunks here.
-        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
+        // let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
         let mut stream =
             StreamReaderWithPause::<true, _>::new(barrier_stream, source_chunk_reader);
         let mut command_paused = false;
 
         // - If the first barrier requires us to pause on startup, pause the stream.
-        if is_pause_on_startup {
+        if is_pause_on_startup && !received_resume_during_build {
             tracing::info!("source paused on startup");
             stream.pause_stream();
             command_paused = true;
@@ -746,6 +866,29 @@ impl<S: StateStore> SourceExecutor<S> {
     }
 }
 
+async fn build_source_stream_and_poll_barrier(
+    barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Message>>,
+    reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
+    build_future: &mut Pin<
+        Box<impl Future<Output = (BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>,
+    >,
+) -> StreamExecutorResult<Option<Message>> {
+    if reader_and_splits.is_some() {
+        return Ok(None);
+    }
+
+    tokio::select! {
+        biased;
+        build_ret = &mut *build_future => {
+            *reader_and_splits = Some(build_ret);
+            Ok(None)
+        }
+        msg = barrier_stream.next() => {
+            msg.transpose()
+        }
+    }
+}
+
 impl<S: StateStore> Execute for SourceExecutor<S> {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         if self.stream_source_core.is_some() {
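The retry loop in patch 01 relies on `get_infinite_backoff_strategy()`, whose definition is outside this series. Below is a minimal sketch of such a strategy built on `tokio_retry`; the base delay, factor, and cap are illustrative assumptions, not the values used in the crate.

```rust
use std::time::Duration;

use tokio_retry::strategy::ExponentialBackoff;
use tokio_retry::Retry;

fn infinite_backoff() -> impl Iterator<Item = Duration> {
    // `ExponentialBackoff` never stops yielding, so `Retry::spawn` keeps
    // retrying forever; `max_delay` caps the sleep between two attempts.
    ExponentialBackoff::from_millis(2)
        .factor(100) // 200ms, 400ms, 800ms, ... capped at 10s
        .max_delay(Duration::from_secs(10))
}

// Stand-in for a fallible build step such as `source_reader.build_stream(..)`.
async fn try_build() -> Result<String, std::io::Error> {
    Ok("stream".to_owned())
}

async fn build_with_retry() -> String {
    Retry::spawn(infinite_backoff(), || async {
        try_build()
            .await
            .inspect_err(|e| eprintln!("build failed, retrying: {e}"))
    })
    .await
    .expect("an infinite strategy only returns on success")
}
```

Because the strategy iterator is infinite, the `.expect("Retry build source stream until success.")` in the patch can never fire; it only documents that the retry cannot surface an error.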
From 381a9854b7342a2be623c89037b3a5db7fcbc6d3 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 11:03:25 +0800
Subject: [PATCH 02/10] fix typo in comment

---
 src/stream/src/executor/source/source_executor.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index e19d0626492c2..113539f9c9907 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -533,7 +533,7 @@ impl<S: StateStore> SourceExecutor<S> {
         } else {
             // Build the source stream reader with retry during recovery.
             // We only build source stream reader with retry during recovery,
-            // becasue we can rely on the persisted source states to recover the source stream
+            // because we can rely on the persisted source states to recover the source stream
             // and can avoid the potential race with "seek to latest"
             let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
                 None;

From 84822f43c2b4d38d9f4f728bfcd325f88ceb6cda Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 11:11:58 +0800
Subject: [PATCH 03/10] remove commented-out code

---
 src/stream/src/executor/source/source_executor.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 113539f9c9907..f579f94c9e152 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -629,7 +629,6 @@ impl<S: StateStore> SourceExecutor<S> {
                 apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
                     .boxed()
                     .map_err(StreamExecutorError::connector_error),
-                // source_chunk_reader.map_err(StreamExecutorError::connector_error),
                 latest_splits,
             )
         };
@@ -645,7 +644,6 @@ impl<S: StateStore> SourceExecutor<S> {
         }
         // Merge the chunks from source and the barriers into a single stream. We prioritize
         // barriers over source data chunks here.
-        // let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
         let mut stream =
             StreamReaderWithPause::<true, _>::new(barrier_stream, source_chunk_reader);
         let mut command_paused = false;
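Patch 01's `build_source_stream_and_poll_barrier` races the in-flight build future against the barrier stream with a biased `tokio::select!`. A self-contained sketch of the same pattern, with hypothetical names and simplified types:

```rust
use std::future::Future;
use std::pin::Pin;

use futures::stream::BoxStream;
use futures::StreamExt;

// Poll the in-flight build future and the control-message stream together,
// preferring the build future. Returns a message if one arrived first.
async fn poll_build_or_message<T, B>(
    messages: &mut BoxStream<'static, T>,
    built: &mut Option<B>,
    build_future: &mut Pin<Box<dyn Future<Output = B> + Send>>,
) -> Option<T> {
    if built.is_some() {
        // The future already resolved; a completed future must not be polled again.
        return None;
    }
    tokio::select! {
        biased; // always give the build future the first chance to make progress
        out = &mut *build_future => {
            *built = Some(out);
            None
        }
        msg = messages.next() => msg,
    }
}
```

The early return on `built.is_some()` mirrors the guard in the patch: once the build has completed, later calls become barrier-only, which is also what lets the caller's loop keep yielding barriers until it observes the completed build and breaks.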
From a8ef7da0055064139ddaa49867cb32caf050dc0b Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 15:03:42 +0800
Subject: [PATCH 04/10] add log and assert

---
 src/stream/src/executor/source/source_executor.rs | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index f579f94c9e152..e3affd034edbf 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -499,6 +499,15 @@ impl<S: StateStore> SourceExecutor<S> {
             }
         }
 
+        // Since DDL always accompanies a checkpoint barrier, we can assume that
+        // if initial_dispatch_num != 0, the source state must be initialized.
+        if self.actor_ctx.initial_dispatch_num != 0 {
+            assert!(
+                !is_uninitialized,
+                "is_uninitialized must be false if initial_dispatch_num != 0"
+            );
+        }
+
         // init in-memory split states with persisted state if any
         core.init_split_state(boot_state.clone());
 
@@ -513,6 +522,7 @@ impl<S: StateStore> SourceExecutor<S> {
 
         // Build the source stream reader.
         let (source_chunk_reader, latest_splits) = if is_uninitialized {
+            tracing::info!("source uninitialized, build source stream reader w/o retry.");
             let (source_chunk_reader, latest_splits) = self
                 .build_stream_source_reader(
                     &source_desc,
@@ -531,6 +541,7 @@ impl<S: StateStore> SourceExecutor<S> {
                 latest_splits,
             )
         } else {
+            tracing::info!("source initialized, build source stream reader with retry.");
             // Build the source stream reader with retry during recovery.
             // We only build source stream reader with retry during recovery,
             // because we can rely on the persisted source states to recover the source stream
             // and can avoid the potential race with "seek to latest"
             let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
                 None;

From ccec558ad5e506780de8372f9d0221b0e9f6a4e7 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 15:06:42 +0800
Subject: [PATCH 05/10] hardcode seek_to_latest to false in retry path

---
 src/stream/src/executor/source/source_executor.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index e3affd034edbf..64b2aeb2b6860 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -548,7 +548,6 @@ impl<S: StateStore> SourceExecutor<S> {
             // and can avoid the potential race with "seek to latest"
             let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
                 None;
-            let seek_to_latest = self.is_shared_non_cdc && is_uninitialized;
             let source_reader = source_desc.source.clone();
             let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
             let source_ctx = Arc::new(source_ctx);
@@ -560,7 +559,7 @@ impl<S: StateStore> SourceExecutor<S> {
                         recover_state.clone(),
                         column_ids.clone(),
                         source_ctx.clone(),
-                        seek_to_latest,
+                        false, // no need to seek to latest since source is initialized
                     )
                     .await {
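For reference, the seek-to-latest rule the series converges on can be restated as a single predicate; the helper below is illustrative and does not appear in the patches themselves:

```rust
// Only a shared non-CDC source that has never persisted any split state may
// skip history; a recovering source must resume from its stored offsets.
fn should_seek_to_latest(is_shared_non_cdc: bool, is_uninitialized: bool) -> bool {
    // Shared CDC sources already start from latest inside the connector, and
    // an initialized source must not skip data it has already claimed to own.
    is_shared_non_cdc && is_uninitialized
}
```

This is also why the retry path can hardcode `seek_to_latest = false`: that branch only runs when persisted state exists, so `is_uninitialized` is false there by construction.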
From a32ec1d4073e946b45021435d8ae7a84e877a38d Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 17:47:21 +0800
Subject: [PATCH 06/10] simplify publication check and seek-to-latest flag

---
 .../connector/source/common/PostgresValidator.java | 13 +++----------
 src/stream/src/executor/source/source_executor.rs  |  4 ++--
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
index 2199dd6168d62..d8e6b17d9afb7 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
@@ -354,17 +354,10 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
             }
         }
 
-        if (!isPublicationExists) {
+        if (!isPublicationExists && !pubAutoCreate) {
             // We require a publication on upstream to publish table cdc events
-            if (!pubAutoCreate) {
-                throw ValidatorUtils.invalidArgument(
-                        "Publication '" + pubName + "' doesn't exist and auto create is disabled");
-            } else {
-                // createPublicationIfNeeded(Optional.empty());
-                LOG.info(
-                        "Publication '{}' doesn't exist, will be created in the process of streaming job.",
-                        this.pubName);
-            }
+            throw ValidatorUtils.invalidArgument(
+                    "Publication '" + pubName + "' doesn't exist and auto create is disabled");
         }
 
         // If the source properties is created by share source, skip the following

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 64b2aeb2b6860..3a2a6bf476cd9 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -532,7 +532,7 @@ impl<S: StateStore> SourceExecutor<S> {
                     // so don't waste work on it.
                     // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
                     // Note that shared CDC source is special. It already starts from latest.
-                    self.is_shared_non_cdc && is_uninitialized,
+                    self.is_shared_non_cdc,
                 )
                 .instrument_await("source_build_reader")
                 .await?;
@@ -559,7 +559,7 @@ impl<S: StateStore> SourceExecutor<S> {
                         recover_state.clone(),
                         column_ids.clone(),
                         source_ctx.clone(),
-                        false, // no need to seek to latest since source is initialized
+                        false, // no need to seek to latest since source state is initialized
                     )
                     .await {

From d23834a5c0de64b05a00e2b181f1c21ebbc91c32 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 18:19:55 +0800
Subject: [PATCH 07/10] create pg pub in meta

---
 .../source/common/DbzSourceUtils.java         | 25 ++++++++++++++++---
 .../source/common/PostgresValidator.java      |  7 ++++++
 .../source/core/JniDbzSourceHandler.java      |  2 +-
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
index d333b00f0fdbf..e544f4421dd10 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
@@ -33,22 +33,41 @@ public class DbzSourceUtils {
     static final Logger LOG = LoggerFactory.getLogger(DbzSourceUtils.class);
 
+    public static void createPostgresPublicationInValidate(Map<String, String> properties)
+            throws SQLException {
+        boolean pubAutoCreate =
+                properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
+        var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
+        if (!pubAutoCreate) {
+            LOG.info(
+                    "Postgres publication auto creation is disabled, skip creation for publication {}.",
+                    pubName);
+            return;
+        }
+        createPostgresPublicationInner(properties, pubName);
+    }
+
     /**
      * This method is used to create a publication for the cdc source job or cdc table if it doesn't
     * exist.
     */
-    public static void createPostgresPublicationIfNeeded(
+    public static void createPostgresPublicationInSourceExecutor(
             Map<String, String> properties, long sourceId) throws SQLException {
         boolean pubAutoCreate =
                 properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
+        var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
         if (!pubAutoCreate) {
             LOG.info(
-                    "Postgres publication auto creation is disabled, skip creation for source {}.",
+                    "Postgres publication auto creation is disabled, skip creation for publication {}, sourceId = {}.",
+                    pubName,
                     sourceId);
             return;
         }
+        createPostgresPublicationInner(properties, pubName);
+    }
 
-        var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
+    private static void createPostgresPublicationInner(
+            Map<String, String> properties, String pubName) throws SQLException {
         var dbHost = properties.get(DbzConnectorConfig.HOST);
         var dbPort = properties.get(DbzConnectorConfig.PORT);
         var dbName = properties.get(DbzConnectorConfig.DB_NAME);

diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
index d8e6b17d9afb7..1f9a1e8d86246 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
@@ -363,6 +363,13 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
         // If the source properties is created by share source, skip the following
         // check of publication
         if (isCdcSourceJob) {
+            if (!isPublicationExists) {
+                LOG.info(
+                        "creating cdc source job: publication '{}' doesn't exist, creating...",
+                        pubName);
+                DbzSourceUtils.createPostgresPublicationInValidate(userProps);
+                LOG.info("creating cdc source job: publication '{}' created successfully", pubName);
+            }
             return;
         }

diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
index dd03d0adaca8a..ac159a202e8ba 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
@@ -60,7 +60,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
         boolean isCdcSourceJob = request.getIsSourceJob();
 
         if (request.getSourceType() == POSTGRES) {
-            DbzSourceUtils.createPostgresPublicationIfNeeded(
+            DbzSourceUtils.createPostgresPublicationInSourceExecutor(
                     request.getPropertiesMap(), request.getSourceId());
         }
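On the Rust side, the series settles on deferring control mutations that arrive while the reader is still being built: `Throttle` only records the new rate limit and `Resume` is merely remembered, both taking effect once the stream exists. A sketch of that deferral pattern, with assumed field names close to the patch:

```rust
// Illustrative only: state accumulated while the source stream is under
// construction, applied after `build_stream` finally succeeds.
#[derive(Debug, Default)]
struct BuildPhaseControl {
    rate_limit_rps: Option<u32>,        // last Throttle value seen during the build
    received_resume_during_build: bool, // a Resume arrived before the stream existed
}

impl BuildPhaseControl {
    fn on_throttle(&mut self, new_limit: Option<u32>) {
        // Applied later via `apply_rate_limit(..)` once the stream is built.
        self.rate_limit_rps = new_limit;
    }

    fn on_resume(&mut self) {
        // Suppresses the pause-on-startup after the stream comes up.
        self.received_resume_during_build = true;
    }
}
```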
From e19afe1ea8ff4a95457bdc861c3f1c2bc473f1a5 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Mon, 16 Dec 2024 18:24:09 +0800
Subject: [PATCH 08/10] remove assertion on initial_dispatch_num

---
 src/stream/src/executor/source/source_executor.rs | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 3a2a6bf476cd9..9cb965db90602 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -499,15 +499,6 @@ impl<S: StateStore> SourceExecutor<S> {
             }
         }
 
-        // Since DDL always accompanies a checkpoint barrier, we can assume that
-        // if initial_dispatch_num != 0, the source state must be initialized.
-        if self.actor_ctx.initial_dispatch_num != 0 {
-            assert!(
-                !is_uninitialized,
-                "is_uninitialized must be false if initial_dispatch_num != 0"
-            );
-        }
-
         // init in-memory split states with persisted state if any
         core.init_split_state(boot_state.clone());

From 329bdc369ddb2f9eb5b5a461e15b2a4b3f02186c Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Tue, 17 Dec 2024 10:19:32 +0800
Subject: [PATCH 09/10] add issue link to comment

---
 src/stream/src/executor/source/source_executor.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 9cb965db90602..96dd4bd3f5818 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -537,6 +537,7 @@ impl<S: StateStore> SourceExecutor<S> {
             // We only build source stream reader with retry during recovery,
             // because we can rely on the persisted source states to recover the source stream
             // and can avoid the potential race with "seek to latest"
+            // https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
             let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
                 None;
             let source_reader = source_desc.source.clone();

From 06127417b8370fdc79fcf10943f73e34fe80f4a9 Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Tue, 17 Dec 2024 13:41:46 +0800
Subject: [PATCH 10/10] remove e2e workaround

---
 e2e_test/source_inline/kafka/issue_19563.slt.serial | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/e2e_test/source_inline/kafka/issue_19563.slt.serial b/e2e_test/source_inline/kafka/issue_19563.slt.serial
index cb3986f934df4..e080e831d9025 100644
--- a/e2e_test/source_inline/kafka/issue_19563.slt.serial
+++ b/e2e_test/source_inline/kafka/issue_19563.slt.serial
@@ -46,10 +46,6 @@ select array_length(upstream_fragment_ids) from rw_fragments where array_contain
 ----
 3
 
-# XXX: wait until source reader is ready. then produce data.
-# This is a temporary workaround for a data loss bug https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
-sleep 2s
-
 system ok
 cat <
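Finally, the merged stream in `source_executor.rs` prioritizes barriers over data chunks via `StreamReaderWithPause`. Its internals are not shown in this series; a plausible reduction of the same barrier-first behavior to plain `futures` combinators (an assumption, not the PR's code) looks like:

```rust
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

#[tokio::main]
async fn main() {
    let barriers = stream::iter(["barrier-1", "barrier-2"]);
    let chunks = stream::iter(["chunk-a", "chunk-b"]);

    // Always poll the left (barrier) stream first, so control messages are
    // never starved by a busy data stream.
    let mut merged =
        select_with_strategy(barriers, chunks, |_: &mut ()| PollNext::Left);

    while let Some(msg) = merged.next().await {
        println!("{msg}");
    }
}
```

Barrier-first polling is what makes pause/resume and checkpointing responsive even when the source is producing chunks at full speed, which is the same property the build loop in patch 01 preserves before the reader exists at all.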