Skip to content

Commit

Permalink
ref: Combine DlqLimitState methods (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
loewenheim authored Dec 6, 2023
1 parent 7a2a871 commit a1cca96
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
23 changes: 13 additions & 10 deletions arroyo/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,16 @@ def __init__(
last_invalid_offsets or {}
)

def update_invalid_value(self, value: BrokerValue[TStrategyPayload]) -> None:
def record_invalid_message(self, value: BrokerValue[TStrategyPayload]) -> bool:
"""
This method should be called (prior to should_accept) with each invalid value
to update the count of valid and invalid messages
Records an invalid message.
This updates the internal statistics about the message's partition and
returns True if the message should be produced to the DLQ according to the
configured limit.
"""
if self.__limit is None:
return
return True

partition = value.partition

Expand All @@ -138,10 +141,11 @@ def update_invalid_value(self, value: BrokerValue[TStrategyPayload]) -> None:
self.__invalid_messages.get(partition, 0) + 1
)
self.__last_invalid_offsets[partition] = value.offset

def should_accept(self, value: BrokerValue[TStrategyPayload]) -> bool:
if self.__limit is None:
return True
else:
self.__valid_messages[partition] = 0
self.__invalid_messages[partition] = 1
self.__invalid_consecutive_messages[partition] = 1
self.__last_invalid_offsets[partition] = value.offset

if self.__limit.max_invalid_ratio is not None:
invalid = self.__invalid_messages.get(value.partition, 0)
Expand Down Expand Up @@ -365,8 +369,7 @@ def produce(self, message: BrokerValue[TStrategyPayload]) -> None:
values[0][1].result()
values.popleft()

self.__dlq_limit_state.update_invalid_value(message)
should_accept = self.__dlq_limit_state.should_accept(message)
should_accept = self.__dlq_limit_state.record_invalid_message(message)
if should_accept:
future = self.__dlq_policy.producer.produce(message)
self.__futures[message.partition].append((message, future))
Expand Down
7 changes: 2 additions & 5 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ def test_dlq_limit_state() -> None:

# 1 valid message followed by 4 invalid
for i in range(4, 9):
value = BrokerValue(i, partition, i, datetime.now())
state.update_invalid_value(value)
assert state.should_accept(value)
assert state.record_invalid_message(BrokerValue(i, partition, i, datetime.now()))

# Next message should not be accepted
state.update_invalid_value(BrokerValue(9, partition, 9, datetime.now()))
assert state.should_accept(value) == False
assert not state.record_invalid_message(BrokerValue(9, partition, 9, datetime.now()))

0 comments on commit a1cca96

Please sign in to comment.