Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(filter): Handle MessageRejected correctly (INC-584) #313

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions arroyo/processing/strategies/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Callable, MutableMapping, Optional, Union, cast

from arroyo.commit import CommitPolicy, CommitPolicyState
from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
from arroyo.types import (
FILTERED_PAYLOAD,
FilteredPayload,
Expand Down Expand Up @@ -73,7 +73,10 @@ def submit(
) -> None:
assert not self.__closed

policy = self.__commit_policy_state
now = time.time()
if policy is not None and policy.should_commit(now, self.__uncommitted_offsets):
self.__flush_uncommitted_offsets(now, can_backpressure=True)

if not isinstance(message.payload, FilteredPayload) and self.__test_function(
cast(Message[TStrategyPayload], message)
Expand All @@ -86,19 +89,27 @@ def submit(
if self.__commit_policy_state is not None:
self.__uncommitted_offsets.update(message.committable)

policy = self.__commit_policy_state

if policy is not None and policy.should_commit(now, message.committable):
self.__flush_uncommitted_offsets(now)
if policy is not None and policy.should_commit(now, self.__uncommitted_offsets):
# We cannot let MessageRejected propagate here. The caller will
# think it is for `message` (which has already been successfully
# submitted), and will double-send it.
self.__flush_uncommitted_offsets(now, can_backpressure=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be removed, since you now flush on lines 78-79.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this so that the tests continue to pass, but the tests were just overly strict in the behavior we assert. We don't actually need this exact behavior where `should_commit` is called after every submit.


def __flush_uncommitted_offsets(self, now: float) -> None:
def __flush_uncommitted_offsets(self, now: float, can_backpressure: bool) -> None:
if not self.__uncommitted_offsets:
return

new_message: Message[Union[FilteredPayload, TStrategyPayload]] = Message(
Value(FILTERED_PAYLOAD, self.__uncommitted_offsets)
)
self.__next_step.submit(new_message)
try:
self.__next_step.submit(new_message)
except MessageRejected:
if can_backpressure:
raise
# We have little to gain from reattempting the submission.
# Filtering is not supposed to be that expensive.
return

if self.__commit_policy_state is not None:
self.__commit_policy_state.did_commit(now, self.__uncommitted_offsets)
Expand All @@ -115,6 +126,8 @@ def terminate(self) -> None:
self.__next_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
self.__flush_uncommitted_offsets(time.time())
# We cannot let MessageRejected propagate here. join() is not supposed
# to raise this exception at all.
self.__flush_uncommitted_offsets(time.time(), can_backpressure=False)
self.__next_step.close()
self.__next_step.join(timeout=timeout)
5 changes: 4 additions & 1 deletion arroyo/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class FilteredPayload:
def __eq__(self, other: Any) -> bool:
return isinstance(other, FilteredPayload)

def __repr__(self) -> str:
return "<FilteredPayload>"


FILTERED_PAYLOAD = FilteredPayload()

Expand Down Expand Up @@ -64,7 +67,7 @@ def __repr__(self) -> str:
# ``__slots__`` for performance reasons. The class variable names
# would conflict with the instance slot names, causing an error.

if type(self.payload) in (float, int):
if type(self.payload) in (float, int, bool, FilteredPayload):
# For the case where value is a float, int, bool or FilteredPayload, the
# repr is small and therefore safe. This is very useful in tests.
#
Expand Down
90 changes: 89 additions & 1 deletion tests/processing/strategies/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from typing import Union
from unittest.mock import Mock, call

import pytest

from arroyo.commit import CommitPolicy
from arroyo.processing.strategies.abstract import MessageRejected
from arroyo.processing.strategies.filter import FilterStep
from arroyo.types import (
FILTERED_PAYLOAD,
Expand Down Expand Up @@ -55,7 +58,7 @@ def test_function(message: Message[bool]) -> bool:
return message.payload

filter_step = FilterStep(
test_function, next_step, commit_policy=CommitPolicy(None, 3)
test_function, next_step, commit_policy=CommitPolicy(None, 4)
)

now = datetime.now()
Expand Down Expand Up @@ -160,3 +163,88 @@ def test_function(message: Message[bool]) -> bool:
call(Message(Value(True, {Partition(topic, 1): 3}, now))),
call(Message(Value(True, {Partition(topic, 1): 5}, now))),
]


def test_backpressure_in_join() -> None:
    """
    Submit alternating passing/filtered messages, call join(), and assert
    the exact sequence of messages forwarded to the next step.

    The mocked next step accepts its first six submissions and is set up to
    raise MessageRejected on a seventh one.
    """
    topic = Topic("topic")
    next_step = Mock()
    # Per-call outcomes of next_step.submit: six successes, then a rejection.
    next_step.submit.side_effect = [None] * 6 + [MessageRejected]  # type: ignore

    now = datetime.now()

    def test_function(message: Message[bool]) -> bool:
        # The boolean payload itself decides whether the message passes.
        return message.payload

    filter_step = FilterStep(
        test_function, next_step, commit_policy=CommitPolicy(None, 3)
    )

    # True payloads pass the filter, False payloads are filtered out.
    filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now)))
    filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now)))
    filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}, now)))
    filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}, now)))
    filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}, now)))
    filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}, now)))

    filter_step.join()

    # Passing messages are forwarded unchanged; filtered offsets show up as
    # FILTERED_PAYLOAD messages carrying the offsets to commit.
    # NOTE(review): only five calls are expected here, so the seventh
    # (MessageRejected) outcome configured above is never consumed if this
    # assertion holds -- confirm the test exercises backpressure as intended.
    assert next_step.submit.mock_calls == [
        call(Message(Value(True, {Partition(topic, 1): 1}, now))),
        call(Message(Value(True, {Partition(topic, 1): 3}, now))),
        call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 4}))),
        call(Message(Value(True, {Partition(topic, 1): 5}, now))),
        call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 6}))),
    ]


def test_backpressure_in_submit() -> None:
    """
    Assert that MessageRejected is propagated for the right messages, and
    handled correctly in join() (i.e. suppressed).

    The mocked next step rejects the first submission, accepts the second,
    rejects the third and fourth, and accepts the fifth.
    """
    topic = Topic("topic")
    next_step = Mock()
    # Per-call outcomes of next_step.submit, consumed in order.
    next_step.submit.side_effect = [
        MessageRejected,
        None,
        MessageRejected,
        MessageRejected,
        None,
    ]

    now = datetime.now()

    def test_function(message: Message[bool]) -> bool:
        # The boolean payload itself decides whether the message passes.
        return message.payload

    filter_step = FilterStep(
        test_function, next_step, commit_policy=CommitPolicy(None, 3)
    )

    # First attempt: the next step rejects the message and the rejection
    # must reach the caller.
    with pytest.raises(MessageRejected):
        filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now)))

    # Retry of the same message: accepted this time.
    filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now)))

    # A False payload is filtered; the assertion below expects that no
    # additional next-step submission has happened at this point.
    filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now)))

    assert next_step.submit.mock_calls == [
        call(Message(Value(True, {Partition(topic, 1): 1}, now))),
        call(Message(Value(True, {Partition(topic, 1): 1}, now))),
    ]

    next_step.submit.mock_calls.clear()

    # join() tries to flush the filtered offset; the mock rejects that
    # submission (third outcome) and join() is expected not to raise.
    filter_step.join()

    assert next_step.submit.mock_calls == [
        call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 2}))),
    ]

    next_step.submit.mock_calls.clear()

    # A second join() is expected to attempt the same flush again (fourth
    # outcome, also a rejection) without raising.
    filter_step.join()

    assert next_step.submit.mock_calls == [
        call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 2}))),
    ]
Loading