diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 06f73720..c9e7e619 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -446,10 +446,27 @@ def _run_once(self) -> None: self.__is_paused = True elif self.__is_paused: - # A paused consumer should still poll periodically to avoid it's partitions - # getting revoked by the broker after reaching the max.poll.interval.ms - # Polling a paused consumer should never yield a message. - assert self.__consumer.poll(0.1) is None + paused_partitions = set(self.__consumer.paused()) + unpaused_partitions = ( + set(self.__consumer.tell()) - paused_partitions + ) + if unpaused_partitions: + logger.warning( + "Processor in paused state while consumer is partially unpaused: %s, paused: %s", + unpaused_partitions, + paused_partitions, + ) + self.__is_paused = False + # unpause paused partitions... just in case a subset is paused + self.__metrics_buffer.incr_counter( + "arroyo.consumer.resume", 1 + ) + self.__consumer.resume([*paused_partitions]) + else: + # A paused consumer should still poll periodically to avoid it's partitions + # getting revoked by the broker after reaching the max.poll.interval.ms + # Polling a paused consumer should never yield a message. + assert self.__consumer.poll(0.1) is None else: time.sleep(0.01)