diff --git a/collator/src/collator/messages_reader/externals_reader.rs b/collator/src/collator/messages_reader/externals_reader.rs index 00b300821..f3c9ca398 100644 --- a/collator/src/collator/messages_reader/externals_reader.rs +++ b/collator/src/collator/messages_reader/externals_reader.rs @@ -643,8 +643,8 @@ impl ExternalsReader { let range_reader_state_by_partition = reader.reader_state.get_state_by_partition_mut(par_id)?; - // skip up to skip offset - if curr_processed_offset > range_reader_state_by_partition.skip_offset { + // skip below skip offset + if curr_processed_offset >= range_reader_state_by_partition.skip_offset { res.metrics.add_to_message_groups_timer.start(); let FillMessageGroupResult { collected_count, @@ -726,7 +726,7 @@ impl ExternalsReader { // collect messages from the next range // only when current range processed offset is reached - if curr_processed_offset <= range_reader_processed_offset { + if curr_processed_offset < range_reader_processed_offset { break; } } diff --git a/collator/src/collator/messages_reader/internals_reader.rs b/collator/src/collator/messages_reader/internals_reader.rs index 830ad71eb..975f3102c 100644 --- a/collator/src/collator/messages_reader/internals_reader.rs +++ b/collator/src/collator/messages_reader/internals_reader.rs @@ -827,8 +827,8 @@ impl InternalsPartitionReader { continue; } - // skip up to skip offset - if self.reader_state.curr_processed_offset > range_reader.reader_state.skip_offset { + // skip below skip offset + if self.reader_state.curr_processed_offset >= range_reader.reader_state.skip_offset { res.metrics.add_to_message_groups_timer.start(); let CollectMessagesFromRangeReaderResult { mut collected_queue_msgs_keys, @@ -855,7 +855,7 @@ impl InternalsPartitionReader { // collect messages from the next range // only when current range processed offset is reached - if self.reader_state.curr_processed_offset <= range_reader_processed_offset { + if self.reader_state.curr_processed_offset < range_reader_processed_offset { break; } }