diff --git a/e2e_test/source_inline/kafka/issue_19563.slt.serial b/e2e_test/source_inline/kafka/issue_19563.slt.serial
index cb3986f934df4..e080e831d9025 100644
--- a/e2e_test/source_inline/kafka/issue_19563.slt.serial
+++ b/e2e_test/source_inline/kafka/issue_19563.slt.serial
@@ -46,10 +46,6 @@ select array_length(upstream_fragment_ids) from rw_fragments where array_contain
 ----
 3
 
-# XXX: wait until source reader is ready. then produce data.
-# This is a temporary workaround for a data loss bug https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
-sleep 2s
-
 system ok
 cat <
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
+    public static void createPostgresPublicationInValidate(Map<String, String> properties)
+            throws SQLException {
+        boolean pubAutoCreate =
+                properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
+        var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
+        if (!pubAutoCreate) {
+            LOG.info(
+                    "Postgres publication auto creation is disabled, skip creation for publication {}.",
+                    pubName);
+            return;
+        }
+        createPostgresPublicationInner(properties, pubName);
+    }
+
     /**
      * This method is used to create a publication for the cdc source job or cdc table if it doesn't
      * exist.
      */
-    public static void createPostgresPublicationIfNeeded(
+    public static void createPostgresPublicationInSourceExecutor(
             Map<String, String> properties, long sourceId) throws SQLException {
         boolean pubAutoCreate =
                 properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
+        var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
         if (!pubAutoCreate) {
             LOG.info(
-                    "Postgres publication auto creation is disabled, skip creation for source {}.",
+                    "Postgres publication auto creation is disabled, skip creation for publication {}, sourceId = {}.",
+                    pubName,
                     sourceId);
             return;
         }
+        createPostgresPublicationInner(properties, pubName);
+    }
 
-        var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
+    private static void createPostgresPublicationInner(
+            Map<String, String> properties, String pubName) throws SQLException {
         var dbHost = properties.get(DbzConnectorConfig.HOST);
         var dbPort = properties.get(DbzConnectorConfig.PORT);
         var dbName = properties.get(DbzConnectorConfig.DB_NAME);
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
index 2199dd6168d62..1f9a1e8d86246 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java
@@ -354,22 +354,22 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
             }
         }
 
-        if (!isPublicationExists) {
+        if (!isPublicationExists && !pubAutoCreate) {
             // We require a publication on upstream to publish table cdc events
-            if (!pubAutoCreate) {
-                throw ValidatorUtils.invalidArgument(
-                        "Publication '" + pubName + "' doesn't exist and auto create is disabled");
-            } else {
-                // createPublicationIfNeeded(Optional.empty());
-                LOG.info(
-                        "Publication '{}' doesn't exist, will be created in the process of streaming job.",
-                        this.pubName);
-            }
+            throw ValidatorUtils.invalidArgument(
+                    "Publication '" + pubName + "' doesn't exist and auto create is disabled");
         }
 
         // If the source properties is created by share source, skip the following
         // check of publication
         if (isCdcSourceJob) {
+            if (!isPublicationExists) {
+                LOG.info(
+                        "creating cdc source job: publication '{}' doesn't exist, creating...",
+                        pubName);
+                DbzSourceUtils.createPostgresPublicationInValidate(userProps);
+                LOG.info("creating cdc source job: publication '{}' created successfully", pubName);
+            }
             return;
         }
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
index dd03d0adaca8a..ac159a202e8ba 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
@@ -60,7 +60,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
         boolean isCdcSourceJob = request.getIsSourceJob();
 
         if (request.getSourceType() == POSTGRES) {
-            DbzSourceUtils.createPostgresPublicationIfNeeded(
+            DbzSourceUtils.createPostgresPublicationInSourceExecutor(
                     request.getPropertiesMap(), request.getSourceId());
         }
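Note: `get_infinite_backoff_strategy()`, used by the Rust change below, is defined outside this diff. A minimal sketch of what such a helper might look like with `tokio_retry` — the constants here are illustrative assumptions, not the executor's actual values:

```rust
use std::time::Duration;

use tokio_retry::strategy::{jitter, ExponentialBackoff};

/// Illustrative stand-in for `get_infinite_backoff_strategy`: jittered
/// delays of roughly 1s, 2s, 4s, ... capped at 60s, repeating forever.
fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
    ExponentialBackoff::from_millis(2)
        .factor(500)
        .max_delay(Duration::from_secs(60))
        .map(jitter)
}
```

Because the iterator never terminates, `tokio_retry::Retry::spawn` can only resolve with `Ok`, which is what justifies the `.expect(...)` at the call site below.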
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index fe73710ba76de..96dd4bd3f5818 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -13,10 +13,13 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
 use std::time::Duration;
 
 use anyhow::anyhow;
 use either::Either;
+use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use risingwave_common::array::ArrayRef;
@@ -38,6 +41,7 @@ use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::{mpsc, oneshot};
 use tokio::time::Instant;
+use tracing::Instrument;
 
 use super::executor_core::StreamSourceCore;
 use super::{
@@ -46,6 +50,7 @@ use super::{
 };
 use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
+use crate::executor::source::get_infinite_backoff_strategy;
 use crate::executor::stream_reader::StreamReaderWithPause;
 use crate::executor::UpdateMutation;
 
@@ -474,6 +479,8 @@ impl<S: StateStore> SourceExecutor<S> {
         };
 
         core.split_state_store.init_epoch(first_epoch).await?;
+        // `initial_dispatch_num == 0` means the source executor has no downstream jobs
+        // and is newly created.
         let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0;
         for ele in &mut boot_state {
             if let Some(recover_state) = core
@@ -500,20 +507,134 @@
         let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
         tracing::debug!(state = ?recover_state, "start with state");
-        let (source_chunk_reader, latest_splits) = self
-            .build_stream_source_reader(
-                &source_desc,
-                recover_state,
-                // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
-                // It's highly probable that the work of scanning historical data cannot be shared,
-                // so don't waste work on it.
-                // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
-                // Note that shared CDC source is special. It already starts from latest.
-                self.is_shared_non_cdc && is_uninitialized,
+
+        let mut received_resume_during_build = false;
+        let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
+
+        // Build the source stream reader.
+        let (source_chunk_reader, latest_splits) = if is_uninitialized {
+            tracing::info!("source uninitialized, build source stream reader w/o retry.");
+            let (source_chunk_reader, latest_splits) = self
+                .build_stream_source_reader(
+                    &source_desc,
+                    recover_state,
+                    // For shared source, we start from latest and let the downstream SourceBackfillExecutors read historical data.
+                    // It's highly probable that the work of scanning historical data cannot be shared,
+                    // so don't waste work on it.
+                    // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
+                    // Note that shared CDC source is special. It already starts from latest.
+                    self.is_shared_non_cdc,
+                )
+                .instrument_await("source_build_reader")
+                .await?;
+            (
+                source_chunk_reader.map_err(StreamExecutorError::connector_error),
+                latest_splits,
             )
-            .instrument_await("source_build_reader")
-            .await?;
-        let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error);
+        } else {
+            tracing::info!("source initialized, build source stream reader with retry.");
+            // We only build the source stream reader with retry during recovery,
+            // because we can rely on the persisted source states to recover the source stream
+            // and can avoid the potential race with "seek to latest":
+            // https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
+            let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
+                None;
+            let source_reader = source_desc.source.clone();
+            let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
+            let source_ctx = Arc::new(source_ctx);
+            let mut build_source_stream_fut = Box::pin(async move {
+                let backoff = get_infinite_backoff_strategy();
+                tokio_retry::Retry::spawn(backoff, || async {
+                    match source_reader
+                        .build_stream(
+                            recover_state.clone(),
+                            column_ids.clone(),
+                            source_ctx.clone(),
+                            false, // no need to seek to latest since the source state is initialized
+                        )
+                        .await
+                    {
+                        Ok((stream, latest_splits)) => Ok((stream, latest_splits)),
+                        Err(e) => {
+                            tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying...");
+                            Err(e)
+                        }
+                    }
+                })
+                .instrument(tracing::info_span!("build_source_stream_with_retry"))
+                .await
+                .expect("Retry build source stream until success.")
+            });
+
+            // Loop until the source stream is created successfully.
+            loop {
+                if let Some(barrier) = build_source_stream_and_poll_barrier(
+                    &mut barrier_stream,
+                    &mut reader_and_splits,
+                    &mut build_source_stream_fut,
+                )
+                .await?
+                {
+                    if let Message::Barrier(barrier) = barrier {
+                        if let Some(mutation) = barrier.mutation.as_deref() {
+                            match mutation {
+                                Mutation::Throttle(actor_to_apply) => {
+                                    if let Some(new_rate_limit) =
+                                        actor_to_apply.get(&self.actor_ctx.id)
+                                        && *new_rate_limit != self.rate_limit_rps
+                                    {
+                                        tracing::info!(
+                                            "updating rate limit from {:?} to {:?}",
+                                            self.rate_limit_rps,
+                                            *new_rate_limit
+                                        );
+
+                                        // Update the rate limit option; we will apply the rate limit
+                                        // once we finish building the source stream.
+                                        self.rate_limit_rps = *new_rate_limit;
+                                    }
+                                }
+                                Mutation::Resume => {
+                                    // Record the Resume mutation here and postpone resuming the
+                                    // source stream until it has been successfully built.
+                                    received_resume_during_build = true;
+                                }
+                                _ => {
+                                    // Ignore other mutations and output a warn log.
+                                    tracing::warn!(
+                                        "ignoring mutation {:?}: only Throttle and Resume are handled before the source stream is built",
+                                        mutation
+                                    );
+                                }
+                            }
+                        }
+
+                        // Bump the state store epoch.
+                        let _ = self.persist_state_and_clear_cache(barrier.epoch).await?;
+                        yield Message::Barrier(barrier);
+                    } else {
+                        unreachable!(
+                            "Only barrier message is expected when building source stream."
+                        );
+                    }
+                } else {
+                    assert!(reader_and_splits.is_some());
+                    tracing::info!("source stream created successfully");
+                    break;
+                }
+            }
+            let (source_chunk_reader, latest_splits) =
+                reader_and_splits.expect("source chunk reader and splits must be created");
+
+            (
+                apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
+                    .boxed()
+                    .map_err(StreamExecutorError::connector_error),
+                latest_splits,
+            )
+        };
+
         if let Some(latest_splits) = latest_splits {
             // make sure it is written to state table later.
             // Then even it receives no messages, we can observe it in state table.
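The retry loop above relies on `build_source_stream_and_poll_barrier` (added in the last hunk below) to keep the barrier pipeline moving while the reader is being rebuilt. A self-contained toy version of the same `tokio::select! { biased; ... }` pattern, with illustrative names and types rather than the executor's real ones:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<u64>();

    // Stand-in for the reader build future, which may retry for a long time.
    let mut build = Box::pin(async {
        tokio::time::sleep(Duration::from_millis(250)).await;
        "reader"
    });

    // Inject "barriers" while the build is still in flight.
    tokio::spawn(async move {
        for epoch in 1..=3u64 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            let _ = tx.send(epoch);
        }
    });

    let reader = loop {
        tokio::select! {
            // `biased` polls the arms top-down, so a finished build wins
            // over a pending barrier.
            biased;
            reader = &mut build => break reader,
            Some(epoch) = rx.recv() => {
                // The real executor persists state and yields the barrier here,
                // so checkpoints keep making progress during the build.
                println!("forwarded barrier at epoch {epoch}");
            }
        }
    };
    println!("built {reader}");
}
```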
@@ -525,13 +646,12 @@
         }
         // Merge the chunks from source and the barriers into a single stream. We prioritize
         // barriers over source data chunks here.
-        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
         let mut stream =
             StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
         let mut command_paused = false;
 
         // - If the first barrier requires us to pause on startup, pause the stream.
-        if is_pause_on_startup {
+        if is_pause_on_startup && !received_resume_during_build {
             tracing::info!("source paused on startup");
             stream.pause_stream();
             command_paused = true;
@@ -746,6 +866,29 @@
     }
 }
 
+async fn build_source_stream_and_poll_barrier(
+    barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Message>>,
+    reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
+    build_future: &mut Pin<
+        Box<impl Future<Output = (BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>,
+    >,
+) -> StreamExecutorResult<Option<Message>> {
+    if reader_and_splits.is_some() {
+        return Ok(None);
+    }
+
+    tokio::select! {
+        biased;
+        build_ret = &mut *build_future => {
+            *reader_and_splits = Some(build_ret);
+            Ok(None)
+        }
+        msg = barrier_stream.next() => {
+            msg.transpose()
+        }
+    }
+}
+
 impl<S: StateStore> Execute for SourceExecutor<S> {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         if self.stream_source_core.is_some() {
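A small subtlety in the barrier arm above: `barrier_stream.next()` yields `Option<StreamExecutorResult<Message>>` (the stream may have ended), while the function returns `StreamExecutorResult<Option<Message>>`; `Option::transpose` performs exactly that flip. In isolation, with plain types standing in for the executor's:

```rust
fn main() {
    // Some(Ok(_)) -> Ok(Some(_)): a barrier message was received.
    let got: Option<Result<&str, String>> = Some(Ok("barrier"));
    assert_eq!(got.transpose(), Ok(Some("barrier")));

    // None -> Ok(None): the stream ended without an error.
    let ended: Option<Result<&str, String>> = None;
    assert_eq!(ended.transpose(), Ok(None));

    // Some(Err(_)) -> Err(_): the error propagates via `?` at the call site.
    let failed: Option<Result<&str, String>> = Some(Err("boom".to_string()));
    assert_eq!(failed.transpose(), Err("boom".to_string()));
}
```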