refactor(cdc source): merge cdc heartbeat chunk builder & data chunk builder (#19671)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored Dec 6, 2024
1 parent 7333427 commit fcac311
Showing 4 changed files with 85 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/parser/chunk_builder.rs
@@ -79,7 +79,7 @@ impl SourceStreamChunkBuilder {
     /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
     /// the builders of the next [`StreamChunk`].
     #[must_use]
-    pub fn take(&mut self, next_cap: usize) -> StreamChunk {
+    pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk {
         let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish`
         let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
         builder.finish()
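
The rename makes the reserve-for-next-chunk side effect explicit at call sites. Below is a minimal, self-contained sketch of the take_and_reserve pattern; ToyChunkBuilder and its Vec<u64> rows are stand-ins invented for illustration, not the real SourceStreamChunkBuilder:

    // Toy stand-in for the real builder; `take_and_reserve` hands back the
    // accumulated rows and resets `self` with a capacity hint for the next chunk.
    struct ToyChunkBuilder {
        rows: Vec<u64>,
    }

    impl ToyChunkBuilder {
        fn with_capacity(cap: usize) -> Self {
            Self { rows: Vec::with_capacity(cap) }
        }

        #[must_use]
        fn take_and_reserve(&mut self, next_cap: usize) -> Vec<u64> {
            // Swap in a fresh builder reserving `next_cap`, then finish the old one.
            std::mem::replace(self, Self::with_capacity(next_cap)).rows
        }
    }

    fn main() {
        let mut builder = ToyChunkBuilder::with_capacity(4);
        builder.rows.extend([1, 2, 3]);
        let chunk = builder.take_and_reserve(8); // emit now, reserve 8 for the next chunk
        assert_eq!(chunk, vec![1, 2, 3]);
        assert!(builder.rows.is_empty());
        assert!(builder.rows.capacity() >= 8);
    }
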
125 changes: 78 additions & 47 deletions src/connector/src/parser/mod.rs
@@ -199,7 +199,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
             .map_ok(|_| ParseResult::Rows)
     }
 
-    fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
+    fn append_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
         _ = writer.do_insert(|_column| Ok(Datum::None));
     }
 }
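
For context, do_insert fills one row by asking a closure for each column's datum, so answering Datum::None for every column appends an all-NULL row. A rough sketch under simplified assumptions (ToyRowWriter and the Datum alias here are made up for illustration; the real writer also tracks visibility and message metadata):

    // Simplified stand-ins: `Datum` is really RisingWave's scalar type, and the
    // real row writer tracks column descriptors, visibility, and message metadata.
    type Datum = Option<i64>;

    struct ToyRowWriter {
        num_columns: usize,
        rows: Vec<Vec<Datum>>,
    }

    impl ToyRowWriter {
        // Ask the closure for a datum per column; push the row if all succeed.
        fn do_insert(&mut self, mut f: impl FnMut(usize) -> Result<Datum, ()>) -> Result<(), ()> {
            let row: Vec<Datum> = (0..self.num_columns).map(&mut f).collect::<Result<_, _>>()?;
            self.rows.push(row);
            Ok(())
        }
    }

    fn append_empty_row(writer: &mut ToyRowWriter) {
        // Every column is NULL; only the attached offset metadata matters downstream.
        let _ = writer.do_insert(|_column| Ok(Datum::None));
    }

    fn main() {
        let mut writer = ToyRowWriter { num_columns: 3, rows: Vec::new() };
        append_empty_row(&mut writer);
        assert_eq!(writer.rows, vec![vec![None, None, None]]);
    }
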
@@ -250,7 +250,7 @@ impl<P: ByteStreamSourceParser> P {
 
 /// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force
 /// committed to avoid potential OOM.
-const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
+const MAX_TRANSACTION_SIZE: usize = 4096;
 
 // TODO: when upsert is disabled, how to filter those empty payload
 // Currently, an err is returned for non upsert with empty payload
@@ -261,8 +261,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
 ) {
     let columns = parser.columns().to_vec();
 
-    let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
-    let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+    let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
 
     struct Transaction {
         id: Box<str>,
@@ -273,45 +272,71 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
 
     #[for_await]
     for batch in data_stream {
+        // It's possible that the split is not active, which means the next batch may arrive
+        // very late, so we should prefer emitting all records in the current batch before the end
+        // of each iteration, instead of merging them with the next batch. An exception is when
+        // a transaction is not committed yet, in which case we yield when the transaction is committed.
+
         let batch = batch?;
         let batch_len = batch.len();
 
-        let mut last_batch_not_yielded = false;
-        if let Some(Transaction { len, id }) = &mut current_transaction {
-            // Dirty state. The last batch is not yielded due to uncommitted transaction.
-            if *len > MAX_ROWS_FOR_TRANSACTION {
-                // Large transaction. Force commit.
+        if batch_len == 0 {
+            continue;
+        }
+
+        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
+            // This `.iter().all(...)` will short-circuit after seeing the first `false`, so in
+            // normal cases, this should only involve a constant time cost.
+
+            // Now we know that there is no data message in the batch, let's just emit the latest
+            // heartbeat message. Note that all messages in `batch` should belong to the same
+            // split, so we don't have to do a split to heartbeats mapping here.
+
+            if let Some(Transaction { id, len }) = &mut current_transaction {
+                // if there's an ongoing transaction, something may be wrong
                 tracing::warn!(
                     id,
                     len,
-                    "transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit"
+                    "got a batch of empty messages during an ongoing transaction"
                 );
-                *len = 0; // reset `len` while keeping `id`
-                yield builder.take(batch_len);
-            } else {
-                last_batch_not_yielded = true
+                // for the sake of simplicity, let's force emit the partial transaction chunk
+                if *len > 0 {
+                    *len = 0; // reset `len` while keeping `id`
+                    yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat
+                }
             }
-        } else {
-            // Clean state. Reserve capacity for the builder.
-            assert!(builder.is_empty());
-            let _ = builder.take(batch_len);
+
+            // According to the invariant we mentioned at the beginning of the `for batch` loop,
+            // there should be no data of the previous batch in `chunk_builder`.
+            assert!(chunk_builder.is_empty());
+
+            let heartbeat_msg = batch.last().unwrap();
+            tracing::debug!(
+                offset = heartbeat_msg.offset,
+                "emitting a heartbeat message"
+            );
+            // TODO(rc): should be `chunk_builder.append_heartbeat` instead, which is simpler
+            parser.append_empty_row(chunk_builder.row_writer().invisible().with_meta(
+                MessageMeta {
+                    meta: &heartbeat_msg.meta,
+                    split_id: &heartbeat_msg.split_id,
+                    offset: &heartbeat_msg.offset,
+                },
+            ));
+            yield chunk_builder.take_and_reserve(batch_len);
+
+            continue;
         }
 
+        // When we reach here, there is at least one data message in the batch. We should ignore all
+        // heartbeat messages.
+
+        let mut txn_started_in_last_batch = current_transaction.is_some();
         let process_time_ms = chrono::Utc::now().timestamp_millis();
+
         for (i, msg) in batch.into_iter().enumerate() {
-            if msg.key.is_none() && msg.payload.is_none() {
-                tracing::debug!(
-                    offset = msg.offset,
-                    "got a empty message, could be a heartbeat"
-                );
-                // Emit an empty invisible row for the heartbeat message.
-                parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta(
-                    MessageMeta {
-                        meta: &msg.meta,
-                        split_id: &msg.split_id,
-                        offset: &msg.offset,
-                    },
-                ));
+            if msg.is_cdc_heartbeat() {
+                // ignore heartbeat messages
                 continue;
             }
 
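The per-batch control flow above can be summarized independently of the stream machinery. A self-contained sketch with toy message types (Msg and process_batch are hypothetical stand-ins; the real code yields StreamChunks and carries split IDs and offsets in message metadata):

    // Toy model of the per-batch decision: a heartbeat-only batch emits one
    // invisible row for the latest heartbeat; a mixed batch is parsed normally
    // with heartbeats skipped.
    enum Msg {
        Heartbeat { offset: u64 },
        Data { offset: u64, payload: &'static str },
    }

    impl Msg {
        fn is_cdc_heartbeat(&self) -> bool {
            matches!(self, Msg::Heartbeat { .. })
        }
    }

    fn process_batch(batch: Vec<Msg>) -> Vec<String> {
        let mut rows = Vec::new();
        if batch.is_empty() {
            return rows; // mirrors `if batch_len == 0 { continue; }`
        }
        // Short-circuits at the first data message, so mixed batches cost O(1) here.
        if batch.iter().all(Msg::is_cdc_heartbeat) {
            // Only the latest offset matters for a heartbeat-only batch.
            if let Some(Msg::Heartbeat { offset }) = batch.last() {
                rows.push(format!("invisible heartbeat row @ offset {offset}"));
            }
            return rows;
        }
        for msg in batch {
            match msg {
                Msg::Heartbeat { .. } => continue, // ignore heartbeats among data messages
                Msg::Data { offset, payload } => rows.push(format!("{payload} @ offset {offset}")),
            }
        }
        rows
    }

    fn main() {
        let hb_only = vec![Msg::Heartbeat { offset: 1 }, Msg::Heartbeat { offset: 2 }];
        assert_eq!(process_batch(hb_only), ["invisible heartbeat row @ offset 2"]);

        let mixed = vec![Msg::Heartbeat { offset: 3 }, Msg::Data { offset: 4, payload: "row" }];
        assert_eq!(process_batch(mixed), ["row @ offset 4"]);
    }
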
@@ -330,12 +355,12 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
                 direct_cdc_event_lag_latency.observe(lag_ms as f64);
             }
 
-            let old_len = builder.len();
+            let old_len = chunk_builder.len();
             match parser
                 .parse_one_with_txn(
                     msg.key,
                     msg.payload,
-                    builder.row_writer().with_meta(MessageMeta {
+                    chunk_builder.row_writer().with_meta(MessageMeta {
                         meta: &msg.meta,
                         split_id: &msg.split_id,
                         offset: &msg.offset,
@@ -348,7 +373,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
                 res @ (Ok(ParseResult::Rows) | Err(_)) => {
                     // Aggregate the number of new rows into the current transaction.
                     if let Some(Transaction { len, .. }) = &mut current_transaction {
-                        let n_new_rows = builder.len() - old_len;
+                        let n_new_rows = chunk_builder.len() - old_len;
                         *len += n_new_rows;
                     }
 
@@ -394,9 +419,9 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
                         tracing::debug!(id, "commit upstream transaction");
                         current_transaction = None;
 
-                        if last_batch_not_yielded {
-                            yield builder.take(batch_len - (i + 1));
-                            last_batch_not_yielded = false;
+                        if txn_started_in_last_batch {
+                            yield chunk_builder.take_and_reserve(batch_len - (i + 1));
+                            txn_started_in_last_batch = false;
                         }
                     }
                 },
@@ -425,16 +450,22 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
             }
         }
 
-        // emit heartbeat for each message batch
-        // we must emit heartbeat chunk before the data chunk,
-        // otherwise the source offset could be backward due to the heartbeat
-        if !heartbeat_builder.is_empty() {
-            yield heartbeat_builder.take(0);
-        }
-
-        // If we are not in a transaction, we should yield the chunk now.
-        if current_transaction.is_none() {
-            yield builder.take(0);
+        if let Some(Transaction { len, id }) = &mut current_transaction {
+            // in transaction, check whether it's too large
+            if *len > MAX_TRANSACTION_SIZE {
+                // force commit
+                tracing::warn!(
+                    id,
+                    len,
+                    "transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit"
+                );
+                *len = 0; // reset `len` while keeping `id`
+                yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
+            }
+            // TODO(rc): we will have better chunk size control later
+        } else if !chunk_builder.is_empty() {
+            // not in transaction, yield the chunk now
+            yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
         }
     }
 }
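
The end-of-batch logic now has two arms: an oversized in-flight transaction is force-committed to bound memory, while outside a transaction the buffered rows are flushed immediately. A simplified sketch of that decision (flush_at_end_of_batch and the Vec<u64> row buffer are assumptions for illustration; the real code yields a StreamChunk via chunk_builder.take_and_reserve(batch_len)):

    // Simplified end-of-batch flush decision; the real code yields a StreamChunk
    // from `chunk_builder.take_and_reserve(batch_len)` instead of a Vec.
    const MAX_TRANSACTION_SIZE: usize = 4096;

    struct Transaction {
        id: Box<str>,
        len: usize,
    }

    fn flush_at_end_of_batch(
        current_transaction: &mut Option<Transaction>,
        buffered_rows: &mut Vec<u64>,
    ) -> Option<Vec<u64>> {
        if let Some(Transaction { id, len }) = current_transaction {
            if *len > MAX_TRANSACTION_SIZE {
                // Force commit: bound memory instead of waiting for the upstream commit.
                eprintln!("transaction {id} is larger than {MAX_TRANSACTION_SIZE} rows, force commit");
                *len = 0; // reset `len` while keeping `id`
                return Some(std::mem::take(buffered_rows));
            }
            None // keep buffering until the transaction commits
        } else if !buffered_rows.is_empty() {
            Some(std::mem::take(buffered_rows)) // not in a transaction: flush now
        } else {
            None
        }
    }

    fn main() {
        let mut txn = Some(Transaction { id: "txn-1".into(), len: MAX_TRANSACTION_SIZE + 1 });
        let mut rows: Vec<u64> = (0..10).collect();
        assert!(flush_at_end_of_batch(&mut txn, &mut rows).is_some()); // force committed
        assert!(rows.is_empty());
    }
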
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
@@ -451,7 +451,7 @@ mod tests {
             _ => panic!("unexpected parse result: {:?}", res),
         }
 
-        let output = builder.take(10);
+        let output = builder.take_and_reserve(10);
         assert_eq!(0, output.cardinality());
     }
 
5 changes: 5 additions & 0 deletions src/connector/src/source/base.rs
@@ -637,6 +637,11 @@ impl SourceMessage {
             meta: SourceMeta::Empty,
         }
     }
+
+    /// Check whether the source message is a CDC heartbeat message.
+    pub fn is_cdc_heartbeat(&self) -> bool {
+        self.key.is_none() && self.payload.is_none()
+    }
 }
 
 #[derive(Debug, Clone)]
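
This predicate replaces the inline msg.key.is_none() && msg.payload.is_none() check that previously lived in the parser loop. A tiny illustration with a trimmed-down struct (Message here is a hypothetical stand-in; the real SourceMessage also carries offset, split ID, and meta):

    // Trimmed-down message: a CDC heartbeat carries neither key nor payload.
    struct Message {
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
    }

    impl Message {
        fn is_cdc_heartbeat(&self) -> bool {
            self.key.is_none() && self.payload.is_none()
        }
    }

    fn main() {
        let heartbeat = Message { key: None, payload: None };
        let data = Message { key: None, payload: Some(b"value".to_vec()) };
        assert!(heartbeat.is_cdc_heartbeat());
        assert!(!data.is_cdc_heartbeat()); // any key or payload makes it a data message
    }
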
