fix(stream): yield upstream updates for backfill after last barrier in the interval (#12654)
kwannoel authored Oct 7, 2023
1 parent 2dcd879 commit 4fe8294
Showing 1 changed file with 5 additions and 31 deletions.
36 changes: 5 additions & 31 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -333,30 +333,12 @@ where
     break;
 } else {
     // Process barrier:
-    // - consume upstream buffer chunk
     // - switch snapshot
-
-    // Consume upstream buffer chunk
-    // If no current_pos, means we did not process any snapshot
-    // yet. In that case
-    // we can just ignore the upstream buffer chunk, but still need to clean it.
-    if let Some(current_pos) = &current_pos {
-        for chunk in upstream_chunk_buffer.drain(..) {
-            cur_barrier_upstream_processed_rows +=
-                chunk.cardinality() as u64;
-            yield Message::Chunk(mapping_chunk(
-                mark_chunk(
-                    chunk,
-                    current_pos,
-                    &pk_in_output_indices,
-                    pk_order,
-                ),
-                &self.output_indices,
-            ));
-        }
-    } else {
-        upstream_chunk_buffer.clear()
-    }
+    // Upstream updates should only be read after the
+    // Nth barrier.
+    // Otherwise they would get filtered out,
+    // if they are larger than current pos,
+    // and we will lose them.

     self.metrics
         .backfill_snapshot_read_row_count
@@ -366,14 +348,6 @@ where
         ])
         .inc_by(cur_barrier_snapshot_processed_rows);

-    self.metrics
-        .backfill_upstream_output_row_count
-        .with_label_values(&[
-            upstream_table_id.to_string().as_str(),
-            self.actor_id.to_string().as_str(),
-        ])
-        .inc_by(cur_barrier_upstream_processed_rows);
-
     // Update snapshot read epoch.
     snapshot_read_epoch = barrier.epoch.prev;
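
The comment added in this commit says that buffered upstream updates larger than the current position get filtered out and lost if they are read too early. The sketch below is a toy model of that effect, not the executor's code. It assumes rows are plain `u64` primary keys and that the filter keeps only keys at or below `current_pos`. `mark_rows` and `drain_buffer` are hypothetical helpers standing in for `mark_chunk` and the removed `upstream_chunk_buffer.drain(..)` loop.

```rust
// Toy model (assumption): a row is just its u64 primary key, and
// `current_pos` is the largest key the snapshot read has reached so far.

/// Hypothetical stand-in for `mark_chunk`: keeps only upstream rows whose key
/// is at or below `current_pos`; every other row is filtered out.
fn mark_rows(rows: &[u64], current_pos: u64) -> Vec<u64> {
    rows.iter().copied().filter(|pk| *pk <= current_pos).collect()
}

/// Hypothetical stand-in for the removed drain loop: pushes the whole buffer
/// through the filter and empties it. Rows that fail the filter are gone,
/// because nothing keeps them once the buffer has been cleared.
fn drain_buffer(buffer: &mut Vec<u64>, current_pos: u64) -> Vec<u64> {
    let out = mark_rows(buffer.as_slice(), current_pos);
    buffer.clear();
    out
}

fn main() {
    // Early drain: the position is still at key 5, so key 7 is filtered out.
    let mut buffer = vec![3, 7];
    let early = drain_buffer(&mut buffer, 5);
    println!("early drain yields {:?}, buffer left: {:?}", early, buffer);

    // Deferred drain: the rows stay buffered until the position reaches 10.
    let mut buffer = vec![3, 7];
    let deferred = drain_buffer(&mut buffer, 10);
    println!("deferred drain yields {:?}", deferred);
}
```

In this model the early drain silently discards key 7, while the deferred drain yields both rows. That is the loss the new comment describes and the reason the drain is removed from this branch.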
