From e379abac13dadb10500e50014eee74050745fd15 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 29 Nov 2023 10:23:26 -0800 Subject: [PATCH] . --- arroyo/dlq.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/arroyo/dlq.py b/arroyo/dlq.py index fb45164c..0a5f7464 100644 --- a/arroyo/dlq.py +++ b/arroyo/dlq.py @@ -329,16 +329,19 @@ def __init__( ) -> None: self.MAX_PENDING_FUTURES = 1000 # This is a per partition max self.__dlq_policy = policy - if policy: - self.__futures: MutableMapping[ - Partition, - Deque[ - Tuple[ - BrokerValue[TStrategyPayload], - Future[BrokerValue[TStrategyPayload]], - ] - ], - ] = defaultdict(deque) + if policy is None: + return + + self.__futures: MutableMapping[ + Partition, + Deque[ + Tuple[ + BrokerValue[TStrategyPayload], + Future[BrokerValue[TStrategyPayload]], + ] + ], + ] = defaultdict(deque) + self.reset_offsets({}) def reset_offsets(self, assignment: Mapping[Partition, int]) -> None: """