From bb3aaf57a7496b9cd99ba726307a13338de05b91 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 18 Dec 2024 17:57:24 -0800 Subject: [PATCH] fix: Avoid polling the consumer if it says it's paused (#412) There is a bug in StreamProcessor where self.__is_paused, but at the same time we poll the consumer and get messages. File "sentry/runner/commands/run.py", line 455, in basic_consumer run_processor_with_signals(processor, consumer_name) File "sentry/utils/kafka.py", line 46, in run_processor_with_signals processor.run() File "arroyo/processing/processor.py", line 323, in run self._run_once() File "arroyo/processing/processor.py", line 442, in _run_once assert self.__consumer.poll(0.1) is None Breadcrumbs indicate that rebalancing was happening at that time. Could it be that we were paused, but the consumer got more partitions assigned? It doesn't make sense since we don't have cooperative-sticky rebalancing enabled, let's add more logs and prevent the crash. --- arroyo/processing/processor.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 2d69ec19..61c6de20 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -446,10 +446,27 @@ def _run_once(self) -> None: self.__is_paused = True elif self.__is_paused: - # A paused consumer should still poll periodically to avoid it's partitions - # getting revoked by the broker after reaching the max.poll.interval.ms - # Polling a paused consumer should never yield a message. - assert self.__consumer.poll(0.1) is None + paused_partitions = set(self.__consumer.paused()) + unpaused_partitions = ( + set(self.__consumer.tell()) - paused_partitions + ) + if unpaused_partitions: + logger.warning( + "Processor in paused state while consumer is partially unpaused: %s, paused: %s", + unpaused_partitions, + paused_partitions, + ) + self.__is_paused = False + # unpause paused partitions... just in case a subset is paused + self.__metrics_buffer.incr_counter( + "arroyo.consumer.resume", 1 + ) + self.__consumer.resume([*paused_partitions]) + else: + # A paused consumer should still poll periodically to avoid it's partitions + # getting revoked by the broker after reaching the max.poll.interval.ms + # Polling a paused consumer should never yield a message. + assert self.__consumer.poll(0.1) is None else: time.sleep(0.01)