From 49b335f6abc2a1f4d72acde1de4b9db11ca061a1 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 5 Dec 2024 14:51:21 +0800 Subject: [PATCH 01/15] change `SourceCtrlOpts::rate_limit` to `split_txn: bool` Signed-off-by: Richard Chien --- src/batch/executors/src/executor/source.rs | 2 +- src/connector/src/parser/mod.rs | 14 +++++++------- src/connector/src/source/base.rs | 6 +++--- src/stream/src/common/rate_limit.rs | 6 +++--- src/stream/src/executor/source/fetch_executor.rs | 2 +- .../src/executor/source/fs_source_executor.rs | 2 +- .../executor/source/source_backfill_executor.rs | 2 +- src/stream/src/executor/source/source_executor.rs | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/batch/executors/src/executor/source.rs b/src/batch/executors/src/executor/source.rs index d5cb717df92a2..ec1e05b8c6fe8 100644 --- a/src/batch/executors/src/executor/source.rs +++ b/src/batch/executors/src/executor/source.rs @@ -150,7 +150,7 @@ impl SourceExecutor { self.metrics, SourceCtrlOpts { chunk_size: self.chunk_size, - rate_limit: None, + split_txn: false, }, ConnectorProperties::default(), None, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 30e1d1ef92ba3..d2aa5502ae53b 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -205,14 +205,14 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { } #[try_stream(ok = Vec, error = ConnectorError)] -async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) { +async fn ensure_max_chunk_size(stream: BoxSourceStream, max_chunk_size: usize) { #[for_await] for batch in stream { let mut batch = batch?; let mut start = 0; let end = batch.len(); while start < end { - let next = std::cmp::min(start + rate_limit as usize, end); + let next = std::cmp::min(start + max_chunk_size, end); yield std::mem::take(&mut batch[start..next].as_mut()).to_vec(); start = next; } @@ -234,12 +234,12 @@ impl P { let actor_id = self.source_ctx().actor_id; let source_id = self.source_ctx().source_id.table_id(); + // TODO(): remove this later // Ensure chunk size is smaller than rate limit - let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit { - Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit)) - } else { - data_stream - }; + let data_stream = Box::pin(ensure_max_chunk_size( + data_stream, + self.source_ctx().source_ctrl_opts.chunk_size, + )); // The parser stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 3525152a35c27..26bca3ba1c0e1 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -146,8 +146,8 @@ pub const MAX_CHUNK_SIZE: usize = 1024; pub struct SourceCtrlOpts { /// The max size of a chunk yielded by source stream. pub chunk_size: usize, - /// Rate limit of source - pub rate_limit: Option, + /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`. 
+ pub split_txn: bool, } // The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it, @@ -226,7 +226,7 @@ impl SourceContext { Arc::new(SourceMetrics::default()), SourceCtrlOpts { chunk_size: MAX_CHUNK_SIZE, - rate_limit: None, + split_txn: false, }, ConnectorProperties::default(), None, diff --git a/src/stream/src/common/rate_limit.rs b/src/stream/src/common/rate_limit.rs index 6144ae6810eba..57c518c971c9b 100644 --- a/src/stream/src/common/rate_limit.rs +++ b/src/stream/src/common/rate_limit.rs @@ -13,9 +13,9 @@ // limitations under the License. /// Get the rate-limited max chunk size. -pub(crate) fn limited_chunk_size(rate_limit: Option) -> usize { +pub(crate) fn limited_chunk_size(rate_limit_burst: Option) -> usize { let config_chunk_size = crate::config::chunk_size(); - rate_limit - .map(|limit| config_chunk_size.min(limit as usize)) + rate_limit_burst + .map(|burst| config_chunk_size.min(burst as usize)) .unwrap_or(config_chunk_size) } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index f1866379aee3c..650bd3ece8670 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -182,7 +182,7 @@ impl FsFetchExecutor { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), None, diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index b6c8e888f26a6..a1c2fe503c7a3 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -105,7 +105,7 @@ impl FsSourceExecutor { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), None, diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 4613b794055f2..1cb1dd95f38ad 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -297,7 +297,7 @@ impl SourceBackfillExecutorInner { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), None, diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 96dd4bd3f5818..545f281279f23 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -203,7 +203,7 @@ impl SourceExecutor { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), schema_change_tx, From a6cc2a5df6adeea3813c655eb01d7048f1f9ffbd Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 6 Dec 2024 15:38:01 +0800 Subject: [PATCH 02/15] minor Signed-off-by: Richard Chien --- 
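Patch 01 re-chunks every batch of source messages so that at most `chunk_size` messages are parsed into a single chunk, which is what the documentation added in this patch relies on. A minimal standalone sketch of that splitting logic, assuming a generic element type; the `split_batch` name is illustrative and not part of this series:

fn split_batch<T>(batch: Vec<T>, max_chunk_size: usize) -> Vec<Vec<T>> {
    // Re-chunk `batch` into pieces of at most `max_chunk_size` elements,
    // mirroring the loop in `ensure_max_chunk_size` from patch 01.
    assert!(max_chunk_size > 0);
    let mut out = Vec::new();
    let mut iter = batch.into_iter();
    loop {
        let piece: Vec<T> = iter.by_ref().take(max_chunk_size).collect();
        if piece.is_empty() {
            break;
        }
        out.push(piece);
    }
    out
}

For example, a batch of 5 messages with max_chunk_size = 2 is re-emitted as sub-batches of sizes [2, 2, 1], so each resulting chunk respects the configured size.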
src/connector/src/parser/mod.rs | 30 +++++++++++++++++------------- src/connector/src/source/base.rs | 2 +- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index d2aa5502ae53b..a247678a15748 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -49,7 +49,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceMessage, SourceMeta, + SourceContextRef, SourceCtrlOpts, SourceMessage, SourceMeta, }; mod access_builder; @@ -221,29 +221,32 @@ async fn ensure_max_chunk_size(stream: BoxSourceStream, max_chunk_size: usize) { #[easy_ext::ext(SourceParserIntoStreamExt)] impl P { - /// Parse a data stream of one source split into a stream of [`StreamChunk`]. + /// Parse a stream of vectors of [`SourceMessage`] into a stream of [`StreamChunk`]. /// /// # Arguments - /// - `data_stream`: A data stream of one source split. - /// To be able to split multiple messages from mq, so it is not a pure byte stream + /// + /// - `msg_stream`: A stream of vectors of [`SourceMessage`]. /// /// # Returns /// - /// A [`ChunkSourceStream`] which is a stream of parsed messages. - pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream { + /// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks + /// are guaranteed to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless + /// there's a large transaction and `source_ctrl_opts.split_txn` is false. + pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream { let actor_id = self.source_ctx().actor_id; let source_id = self.source_ctx().source_id.table_id(); // TODO(): remove this later // Ensure chunk size is smaller than rate limit - let data_stream = Box::pin(ensure_max_chunk_size( - data_stream, + let msg_stream = Box::pin(ensure_max_chunk_size( + msg_stream, self.source_ctx().source_ctrl_opts.chunk_size, )); - // The parser stream will be long-lived. We use `instrument_with` here to create + // The stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. - into_chunk_stream_inner(self, data_stream) + let source_ctrl_opts = self.source_ctx().source_ctrl_opts; + into_chunk_stream_inner(self, msg_stream, source_ctrl_opts) .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id)) } } @@ -257,7 +260,8 @@ const MAX_TRANSACTION_SIZE: usize = 4096; #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)] async fn into_chunk_stream_inner( mut parser: P, - data_stream: BoxSourceStream, + msg_stream: BoxSourceStream, + source_ctrl_ops: SourceCtrlOpts, ) { let columns = parser.columns().to_vec(); @@ -271,7 +275,7 @@ async fn into_chunk_stream_inner( let mut direct_cdc_event_lag_latency_metrics = HashMap::new(); #[for_await] - for batch in data_stream { + for batch in msg_stream { // It's possible that the split is not active, which means the next batch may arrive // very lately, so we should prefer emitting all records in current batch before the end // of each iteration, instead of merging them with the next batch. 
An exception is when @@ -490,7 +494,7 @@ pub enum ByteStreamSourceParserImpl { } impl ByteStreamSourceParserImpl { - /// Converts [`SourceMessage`] stream into [`StreamChunk`] stream. + /// Converts [`SourceMessage`] vec stream into [`StreamChunk`] stream. pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin { #[auto_enum(futures03::Stream)] let stream = match self { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 26bca3ba1c0e1..4fd5d5d424702 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -142,7 +142,7 @@ pub type SourceEnumeratorContextRef = Arc; /// The max size of a chunk yielded by source stream. pub const MAX_CHUNK_SIZE: usize = 1024; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct SourceCtrlOpts { /// The max size of a chunk yielded by source stream. pub chunk_size: usize, From 7a7f58c5bc6905882ac65d7d0584143cd0e556df Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 11 Dec 2024 17:05:26 +0800 Subject: [PATCH 03/15] store reference of SourceStreamChunkBuilder in SourceStreamChunkRowWriter Signed-off-by: Richard Chien --- src/connector/src/parser/chunk_builder.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index 574a1a5bacacb..01e06632f6168 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -55,10 +55,7 @@ impl SourceStreamChunkBuilder { pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> { SourceStreamChunkRowWriter { - descs: &self.descs, - builders: &mut self.builders, - op_builder: &mut self.op_builder, - vis_builder: &mut self.vis_builder, + builder: self, visible: true, // write visible rows by default row_meta: None, } @@ -104,10 +101,7 @@ impl SourceStreamChunkBuilder { /// - errors for non-primary key columns will be ignored and filled with default value instead; /// - other errors will be propagated. pub struct SourceStreamChunkRowWriter<'a> { - descs: &'a [SourceColumnDesc], - builders: &'a mut [ArrayBuilderImpl], - op_builder: &'a mut Vec, - vis_builder: &'a mut BitmapBuilder, + builder: &'a mut SourceStreamChunkBuilder, /// Whether the rows written by this writer should be visible in output `StreamChunk`. visible: bool, @@ -140,8 +134,8 @@ impl<'a> SourceStreamChunkRowWriter<'a> { impl SourceStreamChunkRowWriter<'_> { fn append_op(&mut self, op: Op) { - self.op_builder.push(op); - self.vis_builder.append(self.visible); + self.builder.op_builder.push(op); + self.builder.vis_builder.append(self.visible); } fn do_action<'a, A: OpAction>( @@ -296,8 +290,8 @@ impl SourceStreamChunkRowWriter<'_> { // Columns that changes have been applied to. Used to rollback when an error occurs. 
let mut applied_columns = 0; - let result = (self.descs.iter()) - .zip_eq_fast(self.builders.iter_mut()) + let result = (self.builder.descs.iter()) + .zip_eq_fast(self.builder.builders.iter_mut()) .try_for_each(|(desc, builder)| { wrapped_f(desc).map(|output| { A::apply(builder, output); @@ -312,7 +306,7 @@ impl SourceStreamChunkRowWriter<'_> { } Err(e) => { for i in 0..applied_columns { - A::rollback(&mut self.builders[i]); + A::rollback(&mut self.builder.builders[i]); } Err(e) } From 0cbf5525a8f6ffffdd3f1f43684713d0ac35d0d5 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 11 Dec 2024 18:02:08 +0800 Subject: [PATCH 04/15] add `RecordType::ops` method to get `Op`s corresponding to the record type Signed-off-by: Richard Chien --- src/common/src/array/stream_chunk.rs | 2 -- src/common/src/array/stream_record.rs | 11 +++++++++++ src/stream/src/executor/row_id_gen.rs | 3 +-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index b004329a084c1..307283726c340 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -101,8 +101,6 @@ impl Op { } } -pub type Ops<'a> = &'a [Op]; - /// `StreamChunk` is used to pass data over the streaming pathway. #[derive(Clone, PartialEq)] pub struct StreamChunk { diff --git a/src/common/src/array/stream_record.rs b/src/common/src/array/stream_record.rs index cfd474017d933..7682a62d3f33a 100644 --- a/src/common/src/array/stream_record.rs +++ b/src/common/src/array/stream_record.rs @@ -27,6 +27,17 @@ pub enum RecordType { Update, } +impl RecordType { + /// Get the corresponding `Op`s for this record type. + pub fn ops(self) -> &'static [Op] { + match self { + RecordType::Insert => &[Op::Insert], + RecordType::Delete => &[Op::Delete], + RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert], + } + } +} + /// Generic type to represent a row change. #[derive(Debug, Clone, Copy)] pub enum Record { diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 216d62432191b..5b59181af8dbf 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_common::array::stream_chunk::Ops; use risingwave_common::array::{Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder}; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::VnodeBitmapExt; @@ -57,7 +56,7 @@ impl RowIdGenExecutor { fn gen_row_id_column_by_op( &mut self, column: &ArrayRef, - ops: Ops<'_>, + ops: &'_ [Op], vis: &Bitmap, ) -> ArrayRef { let len = column.len(); From 44603ed3f773e4c60017e555c0b6f2eb8f528fee Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 11 Dec 2024 18:02:33 +0800 Subject: [PATCH 05/15] simplify SourceStreamChunkRowWriter Signed-off-by: Richard Chien --- src/connector/src/parser/chunk_builder.rs | 60 ++++++++++------------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index 01e06632f6168..ce29f4166c0f5 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -14,6 +14,7 @@ use std::sync::LazyLock; +use risingwave_common::array::stream_record::RecordType; use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::log::LogSuppresser; @@ -133,12 +134,7 @@ impl<'a> SourceStreamChunkRowWriter<'a> { } impl SourceStreamChunkRowWriter<'_> { - fn append_op(&mut self, op: Op) { - self.builder.op_builder.push(op); - self.builder.vis_builder.append(self.visible); - } - - fn do_action<'a, A: OpAction>( + fn do_action<'a, A: RowWriterAction>( &'a mut self, mut f: impl FnMut(&SourceColumnDesc) -> AccessResult>, ) -> AccessResult<()> { @@ -301,7 +297,12 @@ impl SourceStreamChunkRowWriter<'_> { match result { Ok(_) => { - A::finish(self); + // commit the action by appending `Op`s and visibility + for op in A::RECORD_TYPE.ops() { + self.builder.op_builder.push(*op); + self.builder.vis_builder.append(self.visible); + } + Ok(()) } Err(e) => { @@ -325,7 +326,7 @@ impl SourceStreamChunkRowWriter<'_> { where D: Into>, { - self.do_action::(|desc| f(desc).map(Into::into)) + self.do_action::(|desc| f(desc).map(Into::into)) } /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that @@ -340,7 +341,7 @@ impl SourceStreamChunkRowWriter<'_> { where D: Into>, { - self.do_action::(|desc| f(desc).map(Into::into)) + self.do_action::(|desc| f(desc).map(Into::into)) } /// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that @@ -356,27 +357,28 @@ impl SourceStreamChunkRowWriter<'_> { D1: Into>, D2: Into>, { - self.do_action::(|desc| f(desc).map(|(old, new)| (old.into(), new.into()))) + self.do_action::(|desc| f(desc).map(|(old, new)| (old.into(), new.into()))) } } -trait OpAction { +trait RowWriterAction { type Output<'a>; + const RECORD_TYPE: RecordType; fn output_for<'a>(datum: impl Into>) -> Self::Output<'a>; fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>); fn rollback(builder: &mut ArrayBuilderImpl); - - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>); } -struct OpActionInsert; +struct InsertAction; -impl OpAction for OpActionInsert { +impl RowWriterAction for InsertAction { type Output<'a> = DatumCow<'a>; + const RECORD_TYPE: RecordType = RecordType::Insert; + #[inline(always)] fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { datum.into() @@ -391,18 +393,15 @@ impl OpAction for OpActionInsert { fn rollback(builder: &mut ArrayBuilderImpl) { builder.pop().unwrap() } - - #[inline(always)] - fn finish(writer: &mut 
SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::Insert); - } } -struct OpActionDelete; +struct DeleteAction; -impl OpAction for OpActionDelete { +impl RowWriterAction for DeleteAction { type Output<'a> = DatumCow<'a>; + const RECORD_TYPE: RecordType = RecordType::Delete; + #[inline(always)] fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { datum.into() @@ -417,18 +416,15 @@ impl OpAction for OpActionDelete { fn rollback(builder: &mut ArrayBuilderImpl) { builder.pop().unwrap() } - - #[inline(always)] - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::Delete); - } } -struct OpActionUpdate; +struct UpdateAction; -impl OpAction for OpActionUpdate { +impl RowWriterAction for UpdateAction { type Output<'a> = (DatumCow<'a>, DatumCow<'a>); + const RECORD_TYPE: RecordType = RecordType::Update; + #[inline(always)] fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { let datum = datum.into(); @@ -446,10 +442,4 @@ impl OpAction for OpActionUpdate { builder.pop().unwrap(); builder.pop().unwrap(); } - - #[inline(always)] - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::UpdateDelete); - writer.append_op(Op::UpdateInsert); - } } From 0ab470c9fc8a8005d7d2152ed74889fb367cd94d Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 16 Dec 2024 16:56:26 +0800 Subject: [PATCH 06/15] rewrite `SourceStreamChunkBuilder` Signed-off-by: Richard Chien --- Cargo.lock | 1 + src/connector/Cargo.toml | 1 + src/connector/src/parser/chunk_builder.rs | 191 +++++++++++++++++++++- src/connector/src/parser/mod.rs | 123 +++++--------- 4 files changed, 223 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e38b9366b81ac..72ea825abef40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11138,6 +11138,7 @@ dependencies = [ "serde_with 3.8.1", "serde_yaml", "simd-json", + "smallvec", "sqlx", "strum 0.26.3", "strum_macros 0.26.4", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 8e3c58bbc2d89..5341d4bb40b77 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -138,6 +138,7 @@ serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } simd-json = { version = "0.14.2", features = ["hints"] } +smallvec = "1" sqlx = { workspace = true } strum = "0.26" strum_macros = "0.26" diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index ce29f4166c0f5..361028266844c 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -22,6 +22,7 @@ use risingwave_common::types::{Datum, DatumCow, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector_codec::decoder::{AccessError, AccessResult}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; +use smallvec::SmallVec; use thiserror_ext::AsReport; use super::MessageMeta; @@ -29,31 +30,121 @@ use crate::parser::utils::{ extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta, extract_subject_from_meta, extract_timestamp_from_meta, }; -use crate::source::{SourceColumnDesc, SourceColumnType, SourceMeta}; +use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta}; + +/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force +/// committed to avoid potential OOM. +const MAX_TRANSACTION_SIZE: usize = 4096; + +/// Represents an ongoing transaction. 
+struct Transaction { + id: Box, + len: usize, +} /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`]. pub struct SourceStreamChunkBuilder { - descs: Vec, + column_descs: Vec, + source_ctrl_ops: SourceCtrlOpts, builders: Vec, op_builder: Vec, vis_builder: BitmapBuilder, + ongoing_txn: Option, + ready_chunks: SmallVec<[StreamChunk; 1]>, } impl SourceStreamChunkBuilder { - pub fn with_capacity(descs: Vec, cap: usize) -> Self { - let builders = descs + // TODO(): remove + pub fn with_capacity(column_descs: Vec, cap: usize) -> Self { + let builders = column_descs .iter() .map(|desc| desc.data_type.create_array_builder(cap)) .collect(); Self { - descs, + column_descs, + source_ctrl_ops: SourceCtrlOpts { + chunk_size: 256, + split_txn: false, + }, builders, op_builder: Vec::with_capacity(cap), vis_builder: BitmapBuilder::with_capacity(cap), + ongoing_txn: None, + ready_chunks: SmallVec::new(), + } + } + + pub fn new(column_descs: Vec, source_ctrl_ops: SourceCtrlOpts) -> Self { + let (builders, op_builder, vis_builder) = + Self::create_builders(&column_descs, source_ctrl_ops.chunk_size); + + Self { + column_descs, + source_ctrl_ops, + builders, + op_builder, + vis_builder, + ongoing_txn: None, + ready_chunks: SmallVec::new(), + } + } + + fn create_builders( + column_descs: &[SourceColumnDesc], + chunk_size: usize, + ) -> (Vec, Vec, BitmapBuilder) { + let reserved_capacity = chunk_size + 1; // it's possible to have an additional `U-` at the end + let builders = column_descs + .iter() + .map(|desc| desc.data_type.create_array_builder(reserved_capacity)) + .collect(); + let op_builder = Vec::with_capacity(reserved_capacity); + let vis_builder = BitmapBuilder::with_capacity(reserved_capacity); + (builders, op_builder, vis_builder) + } + + /// Begin a (CDC) transaction with the given `txn_id`. + pub fn begin_transaction(&mut self, txn_id: Box) { + if let Some(ref txn) = self.ongoing_txn { + tracing::warn!( + ongoing_txn_id = txn.id, + new_txn_id = txn_id, + "already in a transaction" + ); + } + tracing::debug!(txn_id, "begin upstream transaction"); + self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 }); + } + + /// Commit the ongoing transaction with the given `txn_id`. + pub fn commit_transaction(&mut self, txn_id: Box) { + if let Some(txn) = self.ongoing_txn.take() { + if txn.id != txn_id { + tracing::warn!( + expected_txn_id = txn.id, + actual_txn_id = txn_id, + "unexpected transaction id" + ); + } + tracing::debug!(txn_id, "commit upstream transaction"); + + if self.current_chunk_len() >= self.source_ctrl_ops.chunk_size { + // if `split_txn` is on, we should've finished the chunk already + assert!(!self.source_ctrl_ops.split_txn); + self.finish_current_chunk(); + } + } else { + tracing::warn!(txn_id, "no ongoing transaction to commit"); } } + /// Check if the builder is in an ongoing transaction. + pub fn is_in_transaction(&self) -> bool { + self.ongoing_txn.is_some() + } + + /// Get a row writer for parser to write records to the builder. pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> { SourceStreamChunkRowWriter { builder: self, @@ -62,6 +153,60 @@ impl SourceStreamChunkBuilder { } } + /// Write a heartbeat record to the builder. The builder will decide whether to finish the + /// current chunk or not. Currently it ensures that heartbeats are always in separate chunks. + pub fn heartbeat(&mut self, meta: MessageMeta<'_>) { + if self.current_chunk_len() > 0 { + // If there are records in the chunk, finish it first. 
+ // If there's an ongoing transaction, `finish_current_chunk` will handle it properly. + // Note this + self.finish_current_chunk(); + } + + _ = self + .row_writer() + .invisible() + .with_meta(meta) + .do_insert(|_| Ok(Datum::None)); + self.finish_current_chunk(); // each heartbeat should be a separate chunk + } + + /// Finish and build a [`StreamChunk`] from the current pending records in the builder, + /// no matter whether the builder is in a transaction or not, `split_txn` or not. The + /// built chunk will be appended to the `ready_chunks` and the builder will be reset. + pub fn finish_current_chunk(&mut self) { + if self.op_builder.is_empty() { + return; + } + + let (builders, op_builder, vis_builder) = + Self::create_builders(&self.column_descs, self.source_ctrl_ops.chunk_size); + let chunk = StreamChunk::with_visibility( + std::mem::replace(&mut self.op_builder, op_builder), + std::mem::replace(&mut self.builders, builders) + .into_iter() + .map(|builder| builder.finish().into()) + .collect(), + std::mem::replace(&mut self.vis_builder, vis_builder).finish(), + ); + self.ready_chunks.push(chunk); + + if let Some(ref mut txn) = self.ongoing_txn { + tracing::warn!( + txn_id = txn.id, + len = txn.len, + "splitting an ongoing transaction" + ); + txn.len = 0; + } + } + + /// Consumes and returns the ready [`StreamChunk`]s. + pub fn consume_ready_chunks(&mut self) -> impl Iterator + '_ { + self.ready_chunks.drain(..) + } + + // TODO(): remove /// Consumes the builder and returns a [`StreamChunk`]. pub fn finish(self) -> StreamChunk { StreamChunk::with_visibility( @@ -74,22 +219,51 @@ impl SourceStreamChunkBuilder { ) } + // TODO(): remove /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for /// the builders of the next [`StreamChunk`]. #[must_use] pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk { - let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish` + let descs = std::mem::take(&mut self.column_descs); // we don't use `descs` in `finish` let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); builder.finish() } + // TODO(): remove pub fn len(&self) -> usize { self.op_builder.len() } + fn current_chunk_len(&self) -> usize { + self.op_builder.len() + } + + // TODO(): remove pub fn is_empty(&self) -> bool { self.op_builder.is_empty() } + + /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders. + /// This is supposed to be called via the `row_writer` only. + fn commit_record(&mut self, op: Op, vis: bool) { + self.op_builder.push(op); + self.vis_builder.append(vis); + + let curr_chunk_size = self.current_chunk_len(); + let max_chunk_size = self.source_ctrl_ops.chunk_size; + + if let Some(ref mut txn) = self.ongoing_txn { + txn.len += 1; + + if txn.len >= MAX_TRANSACTION_SIZE + || (self.source_ctrl_ops.split_txn && curr_chunk_size >= max_chunk_size) + { + self.finish_current_chunk(); + } + } else if curr_chunk_size >= max_chunk_size { + self.finish_current_chunk(); + } + } } /// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`], @@ -286,7 +460,7 @@ impl SourceStreamChunkRowWriter<'_> { // Columns that changes have been applied to. Used to rollback when an error occurs. 
let mut applied_columns = 0; - let result = (self.builder.descs.iter()) + let result = (self.builder.column_descs.iter()) .zip_eq_fast(self.builder.builders.iter_mut()) .try_for_each(|(desc, builder)| { wrapped_f(desc).map(|output| { @@ -299,8 +473,7 @@ impl SourceStreamChunkRowWriter<'_> { Ok(_) => { // commit the action by appending `Op`s and visibility for op in A::RECORD_TYPE.ops() { - self.builder.op_builder.push(*op); - self.builder.vis_builder.append(self.visible); + self.builder.commit_record(*op, self.visible); } Ok(()) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index a247678a15748..aca94d7d69a2c 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -31,7 +31,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME}; use risingwave_common::log::LogSuppresser; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; -use risingwave_common::types::{Datum, DatumCow, DatumRef}; +use risingwave_common::types::{DatumCow, DatumRef}; use risingwave_common::util::tracing::InstrumentStream; use risingwave_connector_codec::decoder::avro::MapHandling; use thiserror_ext::AsReport; @@ -198,10 +198,6 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { self.parse_one(key, payload, writer) .map_ok(|_| ParseResult::Rows) } - - fn append_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) { - _ = writer.do_insert(|_column| Ok(Datum::None)); - } } #[try_stream(ok = Vec, error = ConnectorError)] @@ -251,10 +247,6 @@ impl P { } } -/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force -/// committed to avoid potential OOM. -const MAX_TRANSACTION_SIZE: usize = 4096; - // TODO: when upsert is disabled, how to filter those empty payload // Currently, an err is returned for non upsert with empty payload #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)] @@ -263,15 +255,9 @@ async fn into_chunk_stream_inner( msg_stream: BoxSourceStream, source_ctrl_ops: SourceCtrlOpts, ) { - let columns = parser.columns().to_vec(); - - let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut chunk_builder = + SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_ops); - struct Transaction { - id: Box, - len: usize, - } - let mut current_transaction = None; let mut direct_cdc_event_lag_latency_metrics = HashMap::new(); #[for_await] @@ -296,49 +282,30 @@ async fn into_chunk_stream_inner( // heartbeat message. Note that all messages in `batch` should belong to the same // split, so we don't have to do a split to heartbeats mapping here. - if let Some(Transaction { id, len }) = &mut current_transaction { - // if there's an ongoing transaction, something may be wrong - tracing::warn!( - id, - len, - "got a batch of empty messages during an ongoing transaction" - ); - // for the sake of simplicity, let's force emit the partial transaction chunk - if *len > 0 { - *len = 0; // reset `len` while keeping `id` - yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat - } - } - - // According to the invariant we mentioned at the beginning of the `for batch` loop, - // there should be no data of previous batch in `chunk_builder`. 
- assert!(chunk_builder.is_empty()); - let heartbeat_msg = batch.last().unwrap(); tracing::debug!( offset = heartbeat_msg.offset, - "emitting a heartbeat message" + "handling a heartbeat message" ); - // TODO(rc): should be `chunk_builder.append_heartbeat` instead, which is simpler - parser.append_empty_row(chunk_builder.row_writer().invisible().with_meta( - MessageMeta { - meta: &heartbeat_msg.meta, - split_id: &heartbeat_msg.split_id, - offset: &heartbeat_msg.offset, - }, - )); - yield chunk_builder.take_and_reserve(batch_len); - - continue; + chunk_builder.heartbeat(MessageMeta { + meta: &heartbeat_msg.meta, + split_id: &heartbeat_msg.split_id, + offset: &heartbeat_msg.offset, + }); + + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; + } + continue; // continue to next batch } // When we reach here, there is at least one data message in the batch. We should ignore all // heartbeat messages. - let mut txn_started_in_last_batch = current_transaction.is_some(); + let mut txn_started_in_last_batch = chunk_builder.is_in_transaction(); let process_time_ms = chrono::Utc::now().timestamp_millis(); - for (i, msg) in batch.into_iter().enumerate() { + for msg in batch.into_iter() { if msg.is_cdc_heartbeat() { // ignore heartbeat messages continue; @@ -359,7 +326,9 @@ async fn into_chunk_stream_inner( direct_cdc_event_lag_latency.observe(lag_ms as f64); } - let old_len = chunk_builder.len(); + // Parse the message and write to the chunk builder, it's possible that the message + // contains multiple rows. When the chunk size reached the limit during parsing, the + // chunk builder may yield the chunk to `ready_chunks` and start a new chunk. match parser .parse_one_with_txn( msg.key, @@ -375,12 +344,6 @@ async fn into_chunk_stream_inner( // It's possible that parsing multiple rows in a single message PARTIALLY failed. // We still have to maintain the row number in this case. res @ (Ok(ParseResult::Rows) | Err(_)) => { - // Aggregate the number of new rows into the current transaction. - if let Some(Transaction { len, .. }) = &mut current_transaction { - let n_new_rows = chunk_builder.len() - old_len; - *len += n_new_rows; - } - if let Err(error) = res { // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern, // see #13105 @@ -405,28 +368,30 @@ async fn into_chunk_stream_inner( context.fragment_id.to_string(), ]); } + + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; + } } Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl { TransactionControl::Begin { id } => { - if let Some(Transaction { id: current_id, .. }) = ¤t_transaction { - tracing::warn!(current_id, id, "already in transaction"); - } - tracing::debug!(id, "begin upstream transaction"); - current_transaction = Some(Transaction { id, len: 0 }); + chunk_builder.begin_transaction(id); } TransactionControl::Commit { id } => { - let current_id = current_transaction.as_ref().map(|t| &t.id); - if current_id != Some(&id) { - tracing::warn!(?current_id, id, "transaction id mismatch"); - } - tracing::debug!(id, "commit upstream transaction"); - current_transaction = None; + chunk_builder.commit_transaction(id); + assert!(!chunk_builder.is_in_transaction()); if txn_started_in_last_batch { - yield chunk_builder.take_and_reserve(batch_len - (i + 1)); + // If a transaction is across multiple batches, we yield the chunk + // immediately after the transaction is committed. 
+ chunk_builder.finish_current_chunk(); txn_started_in_last_batch = false; } + + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; + } } }, @@ -454,22 +419,12 @@ async fn into_chunk_stream_inner( } } - if let Some(Transaction { len, id }) = &mut current_transaction { - // in transaction, check whether it's too large - if *len > MAX_TRANSACTION_SIZE { - // force commit - tracing::warn!( - id, - len, - "transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit" - ); - *len = 0; // reset `len` while keeping `id` - yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint - } - // TODO(rc): we will have better chunk size control later - } else if !chunk_builder.is_empty() { - // not in transaction, yield the chunk now - yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint + // Finish the remaining records in the batch. + if !chunk_builder.is_in_transaction() { + chunk_builder.finish_current_chunk(); + } + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; } } } From a4074e9a56b0e3bc755c9d9f8645aeb25088bc41 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Dec 2024 14:29:41 +0800 Subject: [PATCH 07/15] remove old methods of SourceStreamChunkBuilder Signed-off-by: Richard Chien --- src/connector/benches/debezium_json_parser.rs | 6 +- src/connector/benches/json_vs_plain_parser.rs | 18 +++++- src/connector/src/parser/bytes_parser.rs | 8 ++- .../src/parser/canal/simd_json_parser.rs | 34 +++++++---- src/connector/src/parser/chunk_builder.rs | 56 +----------------- src/connector/src/parser/csv_parser.rs | 13 +++-- .../src/parser/debezium/avro_parser.rs | 18 +++--- .../src/parser/debezium/debezium_parser.rs | 14 +++-- .../src/parser/debezium/mongo_json_parser.rs | 23 +++++--- .../src/parser/debezium/simd_json_parser.rs | 24 ++++---- .../src/parser/maxwell/simd_json_parser.rs | 14 +++-- src/connector/src/parser/plain_parser.rs | 24 +++++--- src/connector/src/source/base.rs | 10 ++++ .../src/executor/backfill/cdc/cdc_backfill.rs | 57 +++++++++---------- 14 files changed, 162 insertions(+), 157 deletions(-) diff --git a/src/connector/benches/debezium_json_parser.rs b/src/connector/benches/debezium_json_parser.rs index 73e356012aebc..239a1788bfac1 100644 --- a/src/connector/benches/debezium_json_parser.rs +++ b/src/connector/benches/debezium_json_parser.rs @@ -22,6 +22,7 @@ use json_common::*; use paste::paste; use rand::Rng; use risingwave_connector::parser::{DebeziumParser, SourceStreamChunkBuilder}; +use risingwave_connector::source::SourceCtrlOpts; fn generate_debezium_json_row(rng: &mut impl Rng, change_event: &str) -> String { let source = r#"{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null}"#; @@ -57,7 +58,10 @@ macro_rules! 
create_debezium_bench_helpers { || (block_on(DebeziumParser::new_for_test(get_descs())).unwrap(), records.clone()) , | (mut parser, records) | async move { let mut builder = - SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + SourceStreamChunkBuilder::new(get_descs(), SourceCtrlOpts { + chunk_size: NUM_RECORDS, + split_txn: false, + }); for record in records { let writer = builder.row_writer(); parser.parse_inner(None, record, writer).await.unwrap(); diff --git a/src/connector/benches/json_vs_plain_parser.rs b/src/connector/benches/json_vs_plain_parser.rs index b9a1139dcb03b..88bcb144a7280 100644 --- a/src/connector/benches/json_vs_plain_parser.rs +++ b/src/connector/benches/json_vs_plain_parser.rs @@ -23,7 +23,7 @@ use json_common::*; use old_json_parser::JsonParser; use risingwave_connector::parser::plain_parser::PlainParser; use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig}; -use risingwave_connector::source::SourceContext; +use risingwave_connector::source::{SourceContext, SourceCtrlOpts}; // The original implementation used to parse JSON prior to #13707. mod old_json_parser { @@ -130,7 +130,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) { (parser, records.clone()) }, |(mut parser, records)| async move { - let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + let mut builder = SourceStreamChunkBuilder::new( + get_descs(), + SourceCtrlOpts { + chunk_size: NUM_RECORDS, + split_txn: false, + }, + ); for record in records { let writer = builder.row_writer(); parser @@ -155,7 +161,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) { (parser, records.clone()) }, |(parser, records)| async move { - let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + let mut builder = SourceStreamChunkBuilder::new( + get_descs(), + SourceCtrlOpts { + chunk_size: NUM_RECORDS, + split_txn: false, + }, + ); for record in records { let writer = builder.row_writer(); parser.parse_inner(record, writer).await.unwrap(); diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index eeccf17be7a27..37bc099085c71 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -45,6 +45,7 @@ impl BytesAccessBuilder { #[cfg(test)] mod tests { + use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; @@ -54,7 +55,7 @@ mod tests { BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContext; + use crate::source::{SourceContext, SourceCtrlOpts}; fn get_payload() -> Vec> { vec![br#"t"#.to_vec(), br#"random"#.to_vec()] @@ -70,7 +71,7 @@ mod tests { .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); for payload in get_payload() { let writer = builder.row_writer(); @@ -80,7 +81,8 @@ mod tests { .unwrap(); } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 780b30654f22b..9f299b45c61e8 100644 --- 
a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -141,6 +141,7 @@ mod tests { use super::*; use crate::parser::SourceStreamChunkBuilder; + use crate::source::SourceCtrlOpts; #[tokio::test] async fn test_data_types() { @@ -162,12 +163,15 @@ mod tests { ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(payload.to_vec(), writer).await.unwrap(); + parser + .parse_inner(payload.to_vec(), builder.row_writer()) + .await + .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let (op, row) = chunk.rows().next().unwrap(); assert_eq!(op, Op::Insert); assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1))); @@ -233,12 +237,15 @@ mod tests { ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(payload.to_vec(), writer).await.unwrap(); + parser + .parse_inner(payload.to_vec(), builder.row_writer()) + .await + .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); @@ -287,12 +294,15 @@ mod tests { ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(payload.to_vec(), writer).await.unwrap(); + parser + .parse_inner(payload.to_vec(), builder.row_writer()) + .await + .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index 361028266844c..cc5bd10e7c483 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -54,27 +54,6 @@ pub struct SourceStreamChunkBuilder { } impl SourceStreamChunkBuilder { - // TODO(): remove - pub fn with_capacity(column_descs: Vec, cap: usize) -> Self { - let builders = column_descs - .iter() - .map(|desc| desc.data_type.create_array_builder(cap)) - .collect(); - - Self { - column_descs, - source_ctrl_ops: SourceCtrlOpts { - chunk_size: 256, - split_txn: false, - }, - builders, - op_builder: Vec::with_capacity(cap), - vis_builder: BitmapBuilder::with_capacity(cap), - ongoing_txn: None, - ready_chunks: SmallVec::new(), - } - } - pub fn new(column_descs: Vec, source_ctrl_ops: SourceCtrlOpts) -> Self { let (builders, op_builder, vis_builder) = Self::create_builders(&column_descs, source_ctrl_ops.chunk_size); @@ -202,47 +181,14 @@ impl SourceStreamChunkBuilder { } /// Consumes and returns the ready [`StreamChunk`]s. - pub fn consume_ready_chunks(&mut self) -> impl Iterator + '_ { + pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator + '_ { self.ready_chunks.drain(..) } - // TODO(): remove - /// Consumes the builder and returns a [`StreamChunk`]. 
- pub fn finish(self) -> StreamChunk { - StreamChunk::with_visibility( - self.op_builder, - self.builders - .into_iter() - .map(|builder| builder.finish().into()) - .collect(), - self.vis_builder.finish(), - ) - } - - // TODO(): remove - /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for - /// the builders of the next [`StreamChunk`]. - #[must_use] - pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk { - let descs = std::mem::take(&mut self.column_descs); // we don't use `descs` in `finish` - let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); - builder.finish() - } - - // TODO(): remove - pub fn len(&self) -> usize { - self.op_builder.len() - } - fn current_chunk_len(&self) -> usize { self.op_builder.len() } - // TODO(): remove - pub fn is_empty(&self) -> bool { - self.op_builder.is_empty() - } - /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders. /// This is supposed to be called via the `row_writer` only. fn commit_record(&mut self, op: Op, vis: bool) { diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 1024f22770a6c..d94f924da7a8f 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -175,12 +175,15 @@ impl ByteStreamSourceParser for CsvParser { #[cfg(test)] mod tests { + use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ToOwnedDatum}; use super::*; use crate::parser::SourceStreamChunkBuilder; + use crate::source::SourceCtrlOpts; + #[tokio::test] async fn test_csv_without_headers() { let data = vec![ @@ -204,14 +207,15 @@ mod tests { SourceContext::dummy().into(), ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); for item in data { parser .parse_inner(item.as_bytes().to_vec(), builder.row_writer()) .await .unwrap(); } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); @@ -311,13 +315,14 @@ mod tests { SourceContext::dummy().into(), ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); for item in data { let _ = parser .parse_inner(item.as_bytes().to_vec(), builder.row_writer()) .await; } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 4058f1d331c42..d5ccccb7977d3 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -207,7 +207,7 @@ mod tests { use super::*; use crate::parser::{DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig}; - use crate::source::{SourceColumnDesc, SourceContext}; + use crate::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts}; use crate::WithOptionsSecResolved; const DEBEZIUM_AVRO_DATA: &[u8] = 
b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00"; @@ -225,15 +225,13 @@ mod tests { columns: Vec, payload: Vec, ) -> Vec<(Op, OwnedRow)> { - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); - { - let writer = builder.row_writer(); - parser - .parse_inner(None, Some(payload), writer) - .await - .unwrap(); - } - let chunk = builder.finish(); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); + parser + .parse_inner(None, Some(payload), builder.row_writer()) + .await + .unwrap(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); chunk .rows() .map(|(op, row_ref)| (op, row_ref.into_owned_row())) diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index fe917de7d3696..cc228f8acd9f9 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -208,6 +208,7 @@ mod tests { use std::ops::Deref; use std::sync::Arc; + use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, CDC_SOURCE_COLUMN_NUM}; use risingwave_common::row::Row; use risingwave_common::types::Timestamptz; @@ -217,7 +218,7 @@ mod tests { use super::*; use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl}; - use crate::source::{ConnectorProperties, DataType}; + use crate::source::{ConnectorProperties, DataType, SourceCtrlOpts}; #[tokio::test] async fn test_parse_transaction_metadata() { @@ -249,7 +250,7 @@ mod tests { let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; @@ -258,7 +259,7 @@ mod tests { .parse_one_with_txn( None, Some(begin_msg.as_bytes().to_vec()), - builder.row_writer(), + dummy_builder.row_writer(), ) .await; match res { @@ -271,7 +272,7 @@ mod tests { .parse_one_with_txn( None, Some(commit_msg.as_bytes().to_vec()), - builder.row_writer(), + dummy_builder.row_writer(), ) .await; match res { @@ -321,7 +322,7 @@ mod tests { let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#; @@ -334,7 +335,8 @@ mod tests { .await; match res { Ok(ParseResult::Rows) => { - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); for (_, row) in chunk.rows() { let commit_ts = row.datum_at(5).unwrap().into_timestamptz(); assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap()); diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 4e3a679c31816..53a81de389923 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -136,6 +136,7 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser { #[cfg(test)] mod tests { + use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::catalog::ColumnId; use risingwave_common::row::Row; @@ -144,6 +145,7 @@ mod tests { use super::*; use crate::parser::unified::debezium::extract_bson_id; use crate::parser::SourceStreamChunkBuilder; + use crate::source::SourceCtrlOpts; #[test] fn test_parse_bson_value_id_int() { let data = r#"{"_id":{"$numberInt":"2345"}}"#; @@ -183,13 +185,14 @@ mod tests { ]; let mut parser = DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()).unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3); - let writer = builder.row_writer(); + let mut builder = + SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test()); parser - .parse_inner(Some(key), Some(payload), writer) + .parse_inner(Some(key), Some(payload), builder.row_writer()) .await .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); let (op, row) = rows.next().unwrap(); @@ -221,11 +224,15 @@ mod tests { DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3); + let mut builder = + SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - 
parser.parse_inner(None, Some(data), writer).await.unwrap(); - let chunk = builder.finish(); + parser + .parse_inner(None, Some(data), builder.row_writer()) + .await + .unwrap(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); let (op, row) = rows.next().unwrap(); assert_eq!(op, Op::Insert); diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index b0ad675302813..a7a2b8020ff09 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -107,6 +107,7 @@ impl AccessBuilder for DebeziumMongoJsonAccessBuilder { #[cfg(test)] mod tests { use chrono::{NaiveDate, NaiveTime}; + use itertools::Itertools; use risingwave_common::array::{Op, StructValue}; use risingwave_common::catalog::ColumnId; use risingwave_common::row::{OwnedRow, Row}; @@ -120,7 +121,7 @@ mod tests { DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContext; + use crate::source::{SourceContext, SourceCtrlOpts}; fn assert_json_eq(parse_result: &Option, json_str: &str) { if let Some(ScalarImpl::Jsonb(json_val)) = parse_result { @@ -153,15 +154,13 @@ mod tests { columns: Vec, payload: Vec, ) -> Vec<(Op, OwnedRow)> { - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); - { - let writer = builder.row_writer(); - parser - .parse_inner(None, Some(payload), writer) - .await - .unwrap(); - } - let chunk = builder.finish(); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); + parser + .parse_inner(None, Some(payload), builder.row_writer()) + .await + .unwrap(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); chunk .rows() .map(|(op, row_ref)| (op, row_ref.into_owned_row())) @@ -509,7 +508,8 @@ mod tests { ]; let mut parser = build_parser(columns.clone()).await; - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); + let mut dummy_builder = + SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); let normal_values = ["111", "1", "33", "444", "555.0", "666.0"]; let overflow_values = [ @@ -530,7 +530,7 @@ mod tests { ).as_bytes().to_vec(); let res = parser - .parse_inner(None, Some(data), builder.row_writer()) + .parse_inner(None, Some(data), dummy_builder.row_writer()) .await; if i < 5 { // For other overflow, the parsing succeeds but the type conversion fails diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index e5bc7291ccf07..26377631c61f1 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -14,6 +14,7 @@ #[cfg(test)] mod tests { + use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; @@ -23,7 +24,7 @@ mod tests { EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContext; + use crate::source::{SourceContext, SourceCtrlOpts}; #[tokio::test] async fn test_json_parser() { @@ -45,7 +46,7 @@ mod tests { .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + let mut builder = 
SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); let payloads = vec![ br#"{"database":"test","table":"t","type":"insert","ts":1666937996,"xid":1171,"commit":true,"data":{"id":1,"name":"tom","is_adult":0,"birthday":"2017-12-31 16:00:01"}}"#.to_vec(), br#"{"database":"test","table":"t","type":"insert","ts":1666938023,"xid":1254,"commit":true,"data":{"id":2,"name":"alex","is_adult":1,"birthday":"1999-12-31 16:00:01"}}"#.to_vec(), @@ -53,11 +54,14 @@ mod tests { ]; for payload in payloads { - let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(payload, builder.row_writer()) + .await + .unwrap(); } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index fe0113798e413..fcd8c9f308b6a 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -217,7 +217,7 @@ mod tests { use super::*; use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl}; use crate::source::cdc::DebeziumCdcMeta; - use crate::source::{ConnectorProperties, DataType, SourceMessage, SplitId}; + use crate::source::{ConnectorProperties, DataType, SourceCtrlOpts, SourceMessage, SplitId}; #[tokio::test] async fn test_emit_transactional_chunk() { @@ -252,7 +252,11 @@ mod tests { let mut transactional = false; // for untransactional source, we expect emit a chunk for each message batch let message_stream = source_message_stream(transactional); - let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed()); + let chunk_stream = crate::parser::into_chunk_stream_inner( + parser, + message_stream.boxed(), + SourceCtrlOpts::for_test(), + ); let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) .into_iter() .collect(); @@ -289,7 +293,11 @@ mod tests { // for transactional source, we expect emit a single chunk for the transaction transactional = true; let message_stream = source_message_stream(transactional); - let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed()); + let chunk_stream = crate::parser::into_chunk_stream_inner( + parser, + message_stream.boxed(), + SourceCtrlOpts::for_test(), + ); let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) .into_iter() .collect(); @@ -406,7 +414,7 @@ mod tests { ) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; @@ -451,8 +459,8 @@ mod tests { _ => panic!("unexpected parse result: {:?}", res), } - let output = builder.take_and_reserve(10); - assert_eq!(0, output.cardinality()); + builder.finish_current_chunk(); + assert!(builder.consume_ready_chunks().next().is_none()); } #[tokio::test] @@ -483,7 +491,7 @@ mod tests { ) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#; let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( @@ -501,7 +509,7 @@ mod tests { .parse_one_with_txn( None, Some(msg.as_bytes().to_vec()), - builder.row_writer().with_meta(msg_meta), + dummy_builder.row_writer().with_meta(msg_meta), ) .await; diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 4fd5d5d424702..1742334ee088c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -154,6 +154,16 @@ pub struct SourceCtrlOpts { // so that we can prevent any unintentional use of the default value. 
impl !Default for SourceCtrlOpts {} +impl SourceCtrlOpts { + #[cfg(test)] + pub fn for_test() -> Self { + SourceCtrlOpts { + chunk_size: 256, + split_txn: false, + } + } +} + #[derive(Debug)] pub struct SourceEnumeratorContext { pub info: SourceEnumeratorInfo, diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 356540884e162..ea665d0cf2f0e 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -22,14 +22,12 @@ use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::bail; use risingwave_common::catalog::ColumnDesc; -use risingwave_common::row::RowExt; -use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::parser::{ ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, }; use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl}; -use risingwave_connector::source::{SourceColumnDesc, SourceContext}; +use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts}; use rw_futures_util::pausable; use thiserror_ext::AsReport; use tracing::Instrument; @@ -822,24 +820,31 @@ async fn parse_debezium_chunk( parser: &mut DebeziumParser, chunk: &StreamChunk, ) -> StreamExecutorResult { - // here we transform the input chunk in (payload varchar, _rw_offset varchar, _rw_table_name varchar) schema + // here we transform the input chunk in `(payload varchar, _rw_offset varchar, _rw_table_name varchar)` schema // to chunk with downstream table schema `info.schema` of MergeNode contains the schema of the // table job with `_rw_offset` in the end // see `gen_create_table_plan_for_cdc_source` for details - let mut builder = - SourceStreamChunkBuilder::with_capacity(parser.columns().to_vec(), chunk.capacity()); - // The schema of input chunk (payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id) + // use `SourceStreamChunkBuilder` for convenience + let mut builder = SourceStreamChunkBuilder::new( + parser.columns().to_vec(), + SourceCtrlOpts { + chunk_size: chunk.capacity(), + split_txn: false, + }, + ); + + // The schema of input chunk `(payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id)` // We should use the debezium parser to parse the first column, // then chain the parsed row with `_rw_offset` row to get a new row. 
- let payloads = chunk.data_chunk().project(vec![0].as_slice()); - let offset_columns = chunk.data_chunk().project(vec![1].as_slice()); + let payloads = chunk.data_chunk().project(&[0]); + let offsets = chunk.data_chunk().project(&[1]).compact(); // TODO: preserve the transaction semantics for payload in payloads.rows() { let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist") else { - unreachable!("payload must be jsonb"); + panic!("payload must be jsonb"); }; parser @@ -851,31 +856,23 @@ async fn parse_debezium_chunk( .await .unwrap(); } + builder.finish_current_chunk(); - let parsed_chunk = builder.finish(); - let (data_chunk, ops) = parsed_chunk.into_parts(); - - // concat the rows in the parsed chunk with the _rw_offset column, we should also retain the Op column - let mut new_rows = Vec::with_capacity(chunk.capacity()); - let offset_columns = offset_columns.compact(); - for (data_row, offset_row) in data_chunk - .rows_with_holes() - .zip_eq_fast(offset_columns.rows_with_holes()) - { - let combined = data_row.chain(offset_row); - new_rows.push(combined); - } + let parsed_chunk = { + let mut iter = builder.consume_ready_chunks(); + assert_eq!(1, iter.len()); + iter.next().unwrap() + }; + assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row + let (ops, mut columns, vis) = parsed_chunk.into_inner(); + // note that `vis` is not necessarily the same as the original chunk's visibilities - let data_types = parser - .columns() - .iter() - .map(|col| col.data_type.clone()) - .chain(std::iter::once(DataType::Varchar)) // _rw_offset column - .collect_vec(); + // concat the rows in the parsed chunk with the `_rw_offset` column + columns.extend(offsets.into_parts().0); Ok(StreamChunk::from_parts( ops, - DataChunk::from_rows(new_rows.as_slice(), data_types.as_slice()), + DataChunk::from_parts(columns.into(), vis), )) } From 79849fa64972751a7e1f98afcbea918b02b7f858 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Dec 2024 14:42:46 +0800 Subject: [PATCH 08/15] fix clippy Signed-off-by: Richard Chien --- src/connector/src/parser/bytes_parser.rs | 1 - src/connector/src/parser/csv_parser.rs | 1 - src/connector/src/parser/debezium/debezium_parser.rs | 1 - src/connector/src/parser/debezium/mongo_json_parser.rs | 1 - src/connector/src/parser/debezium/simd_json_parser.rs | 1 - src/connector/src/parser/maxwell/simd_json_parser.rs | 1 - src/connector/src/parser/mod.rs | 2 +- 7 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 37bc099085c71..841dbfea4ab36 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -45,7 +45,6 @@ impl BytesAccessBuilder { #[cfg(test)] mod tests { - use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index d94f924da7a8f..3bb16b4bf60e6 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -175,7 +175,6 @@ impl ByteStreamSourceParser for CsvParser { #[cfg(test)] mod tests { - use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ToOwnedDatum}; diff --git a/src/connector/src/parser/debezium/debezium_parser.rs 
b/src/connector/src/parser/debezium/debezium_parser.rs index cc228f8acd9f9..d17f34add0058 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -208,7 +208,6 @@ mod tests { use std::ops::Deref; use std::sync::Arc; - use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, CDC_SOURCE_COLUMN_NUM}; use risingwave_common::row::Row; use risingwave_common::types::Timestamptz; diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 53a81de389923..1d18fa5c53f0c 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -136,7 +136,6 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser { #[cfg(test)] mod tests { - use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::catalog::ColumnId; use risingwave_common::row::Row; diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index a7a2b8020ff09..7e713e1a29e72 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -107,7 +107,6 @@ impl AccessBuilder for DebeziumMongoJsonAccessBuilder { #[cfg(test)] mod tests { use chrono::{NaiveDate, NaiveTime}; - use itertools::Itertools; use risingwave_common::array::{Op, StructValue}; use risingwave_common::catalog::ColumnId; use risingwave_common::row::{OwnedRow, Row}; diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 26377631c61f1..e3943480959c5 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -14,7 +14,6 @@ #[cfg(test)] mod tests { - use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index aca94d7d69a2c..70a5e8994ba1a 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -305,7 +305,7 @@ async fn into_chunk_stream_inner( let mut txn_started_in_last_batch = chunk_builder.is_in_transaction(); let process_time_ms = chrono::Utc::now().timestamp_millis(); - for msg in batch.into_iter() { + for msg in batch { if msg.is_cdc_heartbeat() { // ignore heartbeat messages continue; From 666d4151d77fd9ea284665873911dee6366ebd82 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Dec 2024 15:49:43 +0800 Subject: [PATCH 09/15] fix typo Signed-off-by: Richard Chien --- src/connector/src/parser/chunk_builder.rs | 18 +++++++++--------- src/connector/src/parser/mod.rs | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index cc5bd10e7c483..27b825ac90ed5 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -45,7 +45,7 @@ struct Transaction { /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`]. 
pub struct SourceStreamChunkBuilder { column_descs: Vec, - source_ctrl_ops: SourceCtrlOpts, + source_ctrl_opts: SourceCtrlOpts, builders: Vec, op_builder: Vec, vis_builder: BitmapBuilder, @@ -54,13 +54,13 @@ pub struct SourceStreamChunkBuilder { } impl SourceStreamChunkBuilder { - pub fn new(column_descs: Vec, source_ctrl_ops: SourceCtrlOpts) -> Self { + pub fn new(column_descs: Vec, source_ctrl_opts: SourceCtrlOpts) -> Self { let (builders, op_builder, vis_builder) = - Self::create_builders(&column_descs, source_ctrl_ops.chunk_size); + Self::create_builders(&column_descs, source_ctrl_opts.chunk_size); Self { column_descs, - source_ctrl_ops, + source_ctrl_opts, builders, op_builder, vis_builder, @@ -108,9 +108,9 @@ impl SourceStreamChunkBuilder { } tracing::debug!(txn_id, "commit upstream transaction"); - if self.current_chunk_len() >= self.source_ctrl_ops.chunk_size { + if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size { // if `split_txn` is on, we should've finished the chunk already - assert!(!self.source_ctrl_ops.split_txn); + assert!(!self.source_ctrl_opts.split_txn); self.finish_current_chunk(); } } else { @@ -159,7 +159,7 @@ impl SourceStreamChunkBuilder { } let (builders, op_builder, vis_builder) = - Self::create_builders(&self.column_descs, self.source_ctrl_ops.chunk_size); + Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size); let chunk = StreamChunk::with_visibility( std::mem::replace(&mut self.op_builder, op_builder), std::mem::replace(&mut self.builders, builders) @@ -196,13 +196,13 @@ impl SourceStreamChunkBuilder { self.vis_builder.append(vis); let curr_chunk_size = self.current_chunk_len(); - let max_chunk_size = self.source_ctrl_ops.chunk_size; + let max_chunk_size = self.source_ctrl_opts.chunk_size; if let Some(ref mut txn) = self.ongoing_txn { txn.len += 1; if txn.len >= MAX_TRANSACTION_SIZE - || (self.source_ctrl_ops.split_txn && curr_chunk_size >= max_chunk_size) + || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size) { self.finish_current_chunk(); } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 70a5e8994ba1a..47b8ebc6287fb 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -253,10 +253,10 @@ impl P { async fn into_chunk_stream_inner( mut parser: P, msg_stream: BoxSourceStream, - source_ctrl_ops: SourceCtrlOpts, + source_ctrl_opts: SourceCtrlOpts, ) { let mut chunk_builder = - SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_ops); + SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts); let mut direct_cdc_event_lag_latency_metrics = HashMap::new(); From f001393c0f41425686d35d1a18d40af04a330cf1 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Dec 2024 15:50:27 +0800 Subject: [PATCH 10/15] remove `ensure_max_chunk_size` Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 47b8ebc6287fb..531377c11f927 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -200,21 +200,6 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { } } -#[try_stream(ok = Vec, error = ConnectorError)] -async fn ensure_max_chunk_size(stream: BoxSourceStream, max_chunk_size: usize) { - #[for_await] - for batch in stream { - let mut batch = batch?; - let mut start = 0; - let end = batch.len(); - while start < end { - let 
next = std::cmp::min(start + max_chunk_size, end); - yield std::mem::take(&mut batch[start..next].as_mut()).to_vec(); - start = next; - } - } -} - #[easy_ext::ext(SourceParserIntoStreamExt)] impl P { /// Parse a stream of vectors of [`SourceMessage`] into a stream of [`StreamChunk`]. @@ -232,13 +217,6 @@ impl P { let actor_id = self.source_ctx().actor_id; let source_id = self.source_ctx().source_id.table_id(); - // TODO(): remove this later - // Ensure chunk size is smaller than rate limit - let msg_stream = Box::pin(ensure_max_chunk_size( - msg_stream, - self.source_ctx().source_ctrl_opts.chunk_size, - )); - // The stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. let source_ctrl_opts = self.source_ctx().source_ctrl_opts; From 6f1b54b06244a372da6aacd7b70e2d15b4b0645e Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Dec 2024 16:11:44 +0800 Subject: [PATCH 11/15] fix clippy Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 531377c11f927..6a1511aa66531 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -43,13 +43,13 @@ pub use self::sql_server::{sql_server_row_to_owned_row, ScalarImplTiberiusWrappe pub use self::unified::json::{JsonAccess, TimestamptzHandling}; pub use self::unified::Access; use self::upsert_parser::UpsertParser; -use crate::error::{ConnectorError, ConnectorResult}; +use crate::error::ConnectorResult; use crate::parser::maxwell::MaxwellParser; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceCtrlOpts, SourceMessage, SourceMeta, + SourceContextRef, SourceCtrlOpts, SourceMeta, }; mod access_builder; From 0d98fa981429a3b10695da1c6f81af10a1da1360 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 18 Dec 2024 13:30:52 +0800 Subject: [PATCH 12/15] fix ut Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 6a1511aa66531..b7a7f280f4041 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -496,9 +496,9 @@ impl ByteStreamSourceParserImpl { #[cfg(test)] pub mod test_utils { use futures::StreamExt as _; - use itertools::Itertools as _; use super::*; + use crate::source::SourceMessage; #[easy_ext::ext(ByteStreamSourceParserImplTestExt)] pub(crate) impl ByteStreamSourceParserImpl { From b32abd61c262f44281e4fb437b402b75db5cb90a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 18 Dec 2024 16:25:25 +0800 Subject: [PATCH 13/15] fix build Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index b7a7f280f4041..f50f2672d8ea3 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -495,7 +495,8 @@ impl ByteStreamSourceParserImpl { /// Test utilities for [`ByteStreamSourceParserImpl`]. 
#[cfg(test)] pub mod test_utils { - use futures::StreamExt as _; + use futures::StreamExt; + use itertools::Itertools; use super::*; use crate::source::SourceMessage; From b1a892c1836880c0696f6b12373f98f1b7951853 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 18 Dec 2024 17:22:57 +0800 Subject: [PATCH 14/15] fix check Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index f50f2672d8ea3..410a062f6875a 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -202,11 +202,11 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { #[easy_ext::ext(SourceParserIntoStreamExt)] impl P { - /// Parse a stream of vectors of [`SourceMessage`] into a stream of [`StreamChunk`]. + /// Parse a stream of vectors of `SourceMessage` into a stream of [`StreamChunk`]. /// /// # Arguments /// - /// - `msg_stream`: A stream of vectors of [`SourceMessage`]. + /// - `msg_stream`: A stream of vectors of `SourceMessage`. /// /// # Returns /// @@ -413,7 +413,7 @@ pub enum EncodingType { Value, } -/// The entrypoint of parsing. It parses [`SourceMessage`] stream (byte stream) into [`StreamChunk`] stream. +/// The entrypoint of parsing. It parses `SourceMessage` stream (byte stream) into [`StreamChunk`] stream. /// Used by [`crate::source::into_chunk_stream`]. #[derive(Debug)] pub enum ByteStreamSourceParserImpl { @@ -427,7 +427,7 @@ pub enum ByteStreamSourceParserImpl { } impl ByteStreamSourceParserImpl { - /// Converts [`SourceMessage`] vec stream into [`StreamChunk`] stream. + /// Converts `SourceMessage` vec stream into [`StreamChunk`] stream. pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin { #[auto_enum(futures03::Stream)] let stream = match self { From bd6796d50723a2918870f80d2ed847dccd627009 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Dec 2024 16:11:22 +0800 Subject: [PATCH 15/15] remove chunk splitting logic in `apply_rate_limit` Signed-off-by: Richard Chien --- src/stream/src/executor/source/mod.rs | 35 ++++++++++++++------------- src/stream/src/executor/utils.rs | 4 +-- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 7911d99a6c4b0..019ddfcf40d06 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -148,24 +148,25 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti } let limiter = limiter.as_ref().unwrap(); - let limit = rate_limit_rps.unwrap() as usize; - - let required_permits = compute_rate_limit_chunk_permits(&chunk, limit); - if required_permits <= limit { - let n = NonZeroU32::new(required_permits as u32).unwrap(); - // `InsufficientCapacity` should never happen because we have check the cardinality - limiter.until_n_ready(n).await.unwrap(); - yield chunk; - } else { - // Cut the chunk into smaller chunks - for chunk in chunk.split(limit) { - let n = NonZeroU32::new(compute_rate_limit_chunk_permits(&chunk, limit) as u32) - .unwrap(); - // chunks split should have effective chunk size <= limit - limiter.until_n_ready(n).await.unwrap(); - yield chunk; - } + let burst = rate_limit_rps.unwrap() as usize; + + let mut required_permits = compute_rate_limit_chunk_permits(&chunk, burst); + if required_permits > burst { + // This should not happen after 
https://github.com/risingwavelabs/risingwave/pull/19698.
+            // But if it does happen, let's not panic and just log an error.
+            tracing::error!(
+                chunk_size,
+                required_permits,
+                burst,
+                "unexpected large chunk size"
+            );
+            required_permits = burst;
         }
+
+        let n = NonZeroU32::new(required_permits as u32).unwrap();
+        // `InsufficientCapacity` should never happen because we have checked the cardinality
+        limiter.until_n_ready(n).await.unwrap();
+        yield chunk;
     }
 }
diff --git a/src/stream/src/executor/utils.rs b/src/stream/src/executor/utils.rs
index ad2e1b8a4268c..bd6db0b7c61c5 100644
--- a/src/stream/src/executor/utils.rs
+++ b/src/stream/src/executor/utils.rs
@@ -23,7 +23,7 @@ impl Execute for DummyExecutor {
     }
 }
 
-pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize {
+pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, burst: usize) -> usize {
     let chunk_size = chunk.capacity();
     let ends_with_update = if chunk_size >= 2 {
         // Note we have to check if the 2nd last is `U-` to be consistenct with `StreamChunkBuilder`.
@@ -32,7 +32,7 @@ pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> us
     } else {
         false
     };
-    if chunk_size == limit + 1 && ends_with_update {
+    if chunk_size == burst + 1 && ends_with_update {
         // If the chunk size exceed limit because of the last `Update` operation,
         // we should minus 1 to make sure the permits consumed is within the limit (max burst).
         chunk_size - 1