refactor(source): remove chunk splitting logic in apply_rate_limit #19826

Open · wants to merge 15 commits into base: main
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/source.rs
@@ -150,7 +150,7 @@ impl SourceExecutor {
             self.metrics,
             SourceCtrlOpts {
                 chunk_size: self.chunk_size,
-                rate_limit: None,
+                split_txn: false,
             },
             ConnectorProperties::default(),
             None,
2 changes: 0 additions & 2 deletions src/common/src/array/stream_chunk.rs
@@ -101,8 +101,6 @@ impl Op {
     }
 }
 
-pub type Ops<'a> = &'a [Op];
-
 /// `StreamChunk` is used to pass data over the streaming pathway.
 #[derive(Clone, PartialEq)]
 pub struct StreamChunk {
11 changes: 11 additions & 0 deletions src/common/src/array/stream_record.rs
@@ -27,6 +27,17 @@ pub enum RecordType {
     Update,
 }
 
+impl RecordType {
+    /// Get the corresponding `Op`s for this record type.
+    pub fn ops(self) -> &'static [Op] {
+        match self {
+            RecordType::Insert => &[Op::Insert],
+            RecordType::Delete => &[Op::Delete],
+            RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
+        }
+    }
+}
+
 /// Generic type to represent a row change.
 #[derive(Debug, Clone, Copy)]
 pub enum Record<R: Row> {
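The new `RecordType::ops` helper centralizes the mapping from a record type to its canonical op sequence. A minimal usage sketch; the import paths and the surrounding function are assumptions for illustration, not part of this diff:

// Illustrative only: assumes `stream_record` is publicly re-exported from
// `risingwave_common::array`, as other crates in this repo appear to use it.
use risingwave_common::array::Op;
use risingwave_common::array::stream_record::RecordType;

/// Rows a record of this type contributes to a stream chunk:
/// one op for Insert/Delete, the UpdateDelete/UpdateInsert pair for Update.
fn rows_per_record(ty: RecordType) -> usize {
    ty.ops().len()
}

fn main() {
    assert_eq!(rows_per_record(RecordType::Update), 2);
    assert_eq!(RecordType::Insert.ops(), &[Op::Insert]);
}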
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -138,6 +138,7 @@ serde_derive = "1"
 serde_json = "1"
 serde_with = { version = "3", features = ["json"] }
 simd-json = { version = "0.14.2", features = ["hints"] }
+smallvec = "1"
 sqlx = { workspace = true }
 strum = "0.26"
 strum_macros = "0.26"
6 changes: 5 additions & 1 deletion src/connector/benches/debezium_json_parser.rs
@@ -22,6 +22,7 @@ use json_common::*;
 use paste::paste;
 use rand::Rng;
 use risingwave_connector::parser::{DebeziumParser, SourceStreamChunkBuilder};
+use risingwave_connector::source::SourceCtrlOpts;
 
 fn generate_debezium_json_row(rng: &mut impl Rng, change_event: &str) -> String {
     let source = r#"{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null}"#;
@@ -57,7 +58,10 @@ macro_rules! create_debezium_bench_helpers {
                 || (block_on(DebeziumParser::new_for_test(get_descs())).unwrap(), records.clone()) ,
                 | (mut parser, records) | async move {
                     let mut builder =
-                        SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
+                        SourceStreamChunkBuilder::new(get_descs(), SourceCtrlOpts {
+                            chunk_size: NUM_RECORDS,
+                            split_txn: false,
+                        });
                     for record in records {
                         let writer = builder.row_writer();
                         parser.parse_inner(None, record, writer).await.unwrap();
18 changes: 15 additions & 3 deletions src/connector/benches/json_vs_plain_parser.rs
@@ -23,7 +23,7 @@ use json_common::*;
 use old_json_parser::JsonParser;
 use risingwave_connector::parser::plain_parser::PlainParser;
 use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig};
-use risingwave_connector::source::SourceContext;
+use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
 
 // The original implementation used to parse JSON prior to #13707.
 mod old_json_parser {
@@ -130,7 +130,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
                 (parser, records.clone())
             },
             |(mut parser, records)| async move {
-                let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
+                let mut builder = SourceStreamChunkBuilder::new(
+                    get_descs(),
+                    SourceCtrlOpts {
+                        chunk_size: NUM_RECORDS,
+                        split_txn: false,
+                    },
+                );
                 for record in records {
                     let writer = builder.row_writer();
                     parser
@@ -155,7 +161,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
                 (parser, records.clone())
             },
             |(parser, records)| async move {
-                let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
+                let mut builder = SourceStreamChunkBuilder::new(
+                    get_descs(),
+                    SourceCtrlOpts {
+                        chunk_size: NUM_RECORDS,
+                        split_txn: false,
+                    },
+                );
                 for record in records {
                     let writer = builder.row_writer();
                     parser.parse_inner(record, writer).await.unwrap();
7 changes: 4 additions & 3 deletions src/connector/src/parser/bytes_parser.rs
@@ -54,7 +54,7 @@ mod tests {
         BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
         SourceStreamChunkBuilder, SpecificParserConfig,
     };
-    use crate::source::SourceContext;
+    use crate::source::{SourceContext, SourceCtrlOpts};
 
     fn get_payload() -> Vec<Vec<u8>> {
         vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
@@ -70,7 +70,7 @@
         .await
         .unwrap();
 
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
 
         for payload in get_payload() {
             let writer = builder.row_writer();
@@ -80,7 +80,8 @@
             .unwrap();
         }
 
-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         let mut rows = chunk.rows();
         {
            let (op, row) = rows.next().unwrap();
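These test updates show the builder lifecycle after the refactor: the one-shot `finish()` is replaced by sealing the current chunk and then draining whatever chunks are ready. A sketch under the assumption that the APIs behave as these tests use them; the column setup, `chunk_size`, and the drain loop are illustrative placeholders:

// Sketch of the post-refactor flow; everything here is inferred from the
// tests in this diff, and `descs`/`chunk_size` are placeholder values.
use risingwave_connector::parser::{SourceColumnDesc, SourceStreamChunkBuilder};
use risingwave_connector::source::SourceCtrlOpts;

fn drain_chunks(descs: Vec<SourceColumnDesc>) {
    let mut builder = SourceStreamChunkBuilder::new(
        descs,
        SourceCtrlOpts {
            chunk_size: 256,  // upper bound on rows per emitted chunk
            split_txn: false, // rate_limit is no longer set at these call sites after this PR
        },
    );

    // ... feed rows through `builder.row_writer()` as the parsers above do ...

    // Seal whatever is buffered into a ready chunk, then drain all ready chunks.
    builder.finish_current_chunk();
    for chunk in builder.consume_ready_chunks() {
        println!("emitted chunk with {} rows", chunk.cardinality());
    }
}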
34 changes: 22 additions & 12 deletions src/connector/src/parser/canal/simd_json_parser.rs
@@ -141,6 +141,7 @@ mod tests {
 
     use super::*;
     use crate::parser::SourceStreamChunkBuilder;
+    use crate::source::SourceCtrlOpts;
 
     #[tokio::test]
     async fn test_data_types() {
@@ -162,12 +163,15 @@
         )
         .unwrap();
 
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
 
-        let writer = builder.row_writer();
-        parser.parse_inner(payload.to_vec(), writer).await.unwrap();
+        parser
+            .parse_inner(payload.to_vec(), builder.row_writer())
+            .await
+            .unwrap();
 
-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
         let (op, row) = chunk.rows().next().unwrap();
         assert_eq!(op, Op::Insert);
         assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
@@ -233,12 +237,15 @@
         )
         .unwrap();
 
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
 
-        let writer = builder.row_writer();
-        parser.parse_inner(payload.to_vec(), writer).await.unwrap();
+        parser
+            .parse_inner(payload.to_vec(), builder.row_writer())
+            .await
+            .unwrap();
 
-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
 
         let mut rows = chunk.rows();
 
@@ -287,12 +294,15 @@
         )
         .unwrap();
 
-        let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
+        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
 
-        let writer = builder.row_writer();
-        parser.parse_inner(payload.to_vec(), writer).await.unwrap();
+        parser
+            .parse_inner(payload.to_vec(), builder.row_writer())
+            .await
+            .unwrap();
 
-        let chunk = builder.finish();
+        builder.finish_current_chunk();
+        let chunk = builder.consume_ready_chunks().next().unwrap();
 
         let mut rows = chunk.rows();
 