Skip to content

Commit

Permalink
fix(collator): check if should collect before updating current proces…
Browse files Browse the repository at this point in the history
…sed offset

* otherwise we skip last messages collection on refill and get an incorrect messages buffers state after refill
  • Loading branch information
SmaGMan committed Feb 23, 2025
1 parent 962fa94 commit ca1a395
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,13 @@ impl<V: InternalMessageValue> MessagesReader<V> {
) -> Result<CollectMessageForPartitionResult> {
let mut res = CollectMessageForPartitionResult::default();

// on refill collect only until the last range processed offset reached
let int_prev_processed_offset_reached_on_refill =
read_mode == GetNextMessageGroupMode::Refill && par_reader.last_range_offset_reached();
let ext_prev_processed_offsets_reached_on_refill = read_mode
== GetNextMessageGroupMode::Refill
&& externals_reader.last_range_offset_reached(&par_reader.partition_id);

// update processed offset anyway
par_reader.increment_curr_processed_offset();
externals_reader.increment_curr_processed_offset(&par_reader.partition_id)?;
Expand All @@ -1017,13 +1024,6 @@ impl<V: InternalMessageValue> MessagesReader<V> {
let mut all_internals_collected_before = false;
let mut all_read_externals_collected_before = false;

// on refill collect only until the last range processed offset reached
let int_prev_processed_offset_reached_on_refill =
read_mode == GetNextMessageGroupMode::Refill && par_reader.last_range_offset_reached();
let ext_prev_processed_offsets_reached_on_refill = read_mode
== GetNextMessageGroupMode::Refill
&& externals_reader.last_range_offset_reached(&par_reader.partition_id);

// collect existing internals
if *par_reader_stage == MessagesReaderStage::ExistingAndExternals
&& !int_prev_processed_offset_reached_on_refill
Expand Down Expand Up @@ -1112,7 +1112,8 @@ impl<V: InternalMessageValue> MessagesReader<V> {
has_pending_externals = externals_reader.has_pending_externals(),
ext_reader_states = ?externals_reader.reader_state().by_partitions,
last_range_reader_state = ?externals_reader.get_last_range_reader().map(|(seqno, r)| (seqno, DebugExternalsRangeReaderState(r.reader_state()))),
"all read externals collected",
"all read externals collected when collecting from partition_id={}",
par_reader.partition_id,
);
}
}
Expand Down

0 comments on commit ca1a395

Please sign in to comment.