Skip to content

Commit

Permalink
fix: Avoid polling the consumer if it says it's paused (#412)
Browse files Browse the repository at this point in the history
There is a bug in StreamProcessor where self.__is_paused, but at the
same time we poll the consumer and get messages.

      File "sentry/runner/commands/run.py", line 455, in basic_consumer
        run_processor_with_signals(processor, consumer_name)
      File "sentry/utils/kafka.py", line 46, in run_processor_with_signals
        processor.run()
      File "arroyo/processing/processor.py", line 323, in run
        self._run_once()
      File "arroyo/processing/processor.py", line 442, in _run_once
        assert self.__consumer.poll(0.1) is None

Breadcrumbs indicate that rebalancing was happening at that time. Could
it be that we were paused, but the consumer got more partitions
assigned? It doesn't make sense since we don't have cooperative-sticky
rebalancing enabled, let's add more logs and prevent the crash.
  • Loading branch information
untitaker authored Dec 19, 2024
1 parent 1adc481 commit bb3aaf5
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,27 @@ def _run_once(self) -> None:
self.__is_paused = True

elif self.__is_paused:
# A paused consumer should still poll periodically to avoid it's partitions
# getting revoked by the broker after reaching the max.poll.interval.ms
# Polling a paused consumer should never yield a message.
assert self.__consumer.poll(0.1) is None
paused_partitions = set(self.__consumer.paused())
unpaused_partitions = (
set(self.__consumer.tell()) - paused_partitions
)
if unpaused_partitions:
logger.warning(
"Processor in paused state while consumer is partially unpaused: %s, paused: %s",
unpaused_partitions,
paused_partitions,
)
self.__is_paused = False
# unpause paused partitions... just in case a subset is paused
self.__metrics_buffer.incr_counter(
"arroyo.consumer.resume", 1
)
self.__consumer.resume([*paused_partitions])
else:
# A paused consumer should still poll periodically to avoid it's partitions
# getting revoked by the broker after reaching the max.poll.interval.ms
# Polling a paused consumer should never yield a message.
assert self.__consumer.poll(0.1) is None
else:
time.sleep(0.01)

Expand Down

0 comments on commit bb3aaf5

Please sign in to comment.