ref: Type out chains and steps #99

Merged: 27 commits, May 2, 2025
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -41,7 +41,8 @@ repos:
responses,
"sentry-arroyo>=2.18.2",
types-pyYAML,
types-jsonschema
types-jsonschema,
"sentry-kafka-schemas>=1.2.0",
]
files: ^sentry_streams/.+
- repo: https://github.com/pycqa/isort
1 change: 1 addition & 0 deletions sentry_streams/pyproject.toml
@@ -14,6 +14,7 @@ dependencies = [
"sentry-arroyo>=2.18.2",
"pyyaml>=6.0.2",
"jsonschema>=4.23.0",
"sentry-kafka-schemas>=1.2.0",
]

[dependency-groups]
14 changes: 13 additions & 1 deletion sentry_streams/sentry_streams/adapters/arroyo/adapter.py
@@ -17,6 +17,8 @@
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload, KafkaProducer
from arroyo.processing.processor import StreamProcessor
from arroyo.types import Topic
from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec

from sentry_streams.adapters.arroyo.consumer import (
ArroyoConsumer,
@@ -148,7 +150,17 @@ def source(self, step: Source) -> Route:
"""
source_name = step.name
self.__sources.add_source(step)
self.__consumers[source_name] = ArroyoConsumer(source_name)

# This is the Arroyo adapter, and it only supports consuming from StreamSource anyways
assert isinstance(step, StreamSource)
try:
schema: Codec[Any] = get_codec(step.stream_name)
except Exception:
raise ValueError(f"Kafka topic {step.stream_name} has no associated schema")

self.__consumers[source_name] = ArroyoConsumer(
source_name, step.stream_name, schema, step.header_filter
)

return Route(source_name, [])

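Note: the source() change above resolves the topic's codec before wiring up the consumer. A minimal sketch of that lookup, assuming sentry-kafka-schemas is installed (the topic name below is only an example):

```python
from typing import Any

from sentry_kafka_schemas import get_codec
from sentry_kafka_schemas.codecs import Codec


def resolve_schema(topic: str) -> Codec[Any]:
    # get_codec raises when the topic has no registered schema; re-raising as a
    # ValueError makes a misconfigured pipeline fail when the adapter is built
    # rather than when the first message is consumed.
    try:
        return get_codec(topic)
    except Exception as exc:
        raise ValueError(f"Kafka topic {topic} has no associated schema") from exc


# Example topic name; any topic registered in sentry-kafka-schemas works here.
codec = resolve_schema("ingest-metrics")
```
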
9 changes: 8 additions & 1 deletion sentry_streams/sentry_streams/adapters/arroyo/broadcaster.py
@@ -7,6 +7,7 @@
from arroyo.types import FilteredPayload, Message, Partition, Value

from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
from sentry_streams.pipeline.message import Message as StreamsMessage


@dataclass(eq=True)
@@ -78,6 +79,7 @@ def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None:
for branch in self.__downstream_branches:
msg_copy = cast(Message[RoutedValue], deepcopy(message))
copy_payload = msg_copy.value.payload
streams_msg = copy_payload.payload
routed_copy = Message(
Value(
committable=msg_copy.value.committable,
@@ -87,7 +89,12 @@
source=copy_payload.route.source,
waypoints=[*copy_payload.route.waypoints, branch],
),
payload=copy_payload.payload,
payload=StreamsMessage(
streams_msg.payload,
streams_msg.headers,
streams_msg.timestamp,
streams_msg.schema,
),
),
)
)
31 changes: 22 additions & 9 deletions sentry_streams/sentry_streams/adapters/arroyo/consumer.py
@@ -1,22 +1,21 @@
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Mapping, MutableSequence
from typing import Any, Mapping, MutableSequence, Optional, Tuple, Union

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.backends.kafka.consumer import Headers, KafkaPayload
from arroyo.processing.strategies import CommitOffsets
from arroyo.processing.strategies.abstract import (
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import (
Commit,
Message,
Partition,
)
from arroyo.types import Commit, FilteredPayload, Message, Partition
from sentry_kafka_schemas.codecs import Codec

from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
from sentry_streams.adapters.arroyo.steps import ArroyoStep
from sentry_streams.pipeline.message import Message as StreamsMessage

logger = logging.getLogger(__name__)

@@ -41,6 +40,9 @@ class ArroyoConsumer:
"""

source: str
stream_name: str
schema: Codec[Any]
header_filter: Optional[Tuple[str, bytes]] = None
steps: MutableSequence[ArroyoStep] = field(default_factory=list)

def add_step(self, step: ArroyoStep) -> None:
@@ -59,9 +61,20 @@ def build_strategy(self, commit: Commit) -> ProcessingStrategy[Any]:
follow.
"""

def add_route(message: Message[KafkaPayload]) -> RoutedValue:
def add_route(message: Message[KafkaPayload]) -> Union[FilteredPayload, RoutedValue]:
headers: Headers = message.payload.headers
if self.header_filter and self.header_filter not in headers:
return FilteredPayload()

broker_timestamp = message.timestamp.timestamp() if message.timestamp else time.time()
value = message.payload.value
return RoutedValue(route=Route(source=self.source, waypoints=[]), payload=value)

return RoutedValue(
route=Route(source=self.source, waypoints=[]),
payload=StreamsMessage(
payload=value, headers=headers, timestamp=broker_timestamp, schema=self.schema
),
)

strategy: ProcessingStrategy[Any] = CommitOffsets(commit)
for step in reversed(self.steps):
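
For context, a sketch of how the extended ArroyoConsumer could be constructed with the new fields; the topic and header values are illustrative assumptions, not taken from this PR:

```python
from sentry_kafka_schemas import get_codec

from sentry_streams.adapters.arroyo.consumer import ArroyoConsumer

consumer = ArroyoConsumer(
    source="myinput",              # name of the source step in the pipeline
    stream_name="ingest-metrics",  # physical Kafka topic (example value)
    schema=get_codec("ingest-metrics"),
    # Optional: only messages carrying exactly this (key, value) header pair are
    # processed; everything else becomes a FilteredPayload in add_route.
    header_filter=("event_type", b"metric"),
)
```
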
4 changes: 3 additions & 1 deletion sentry_streams/sentry_streams/adapters/arroyo/forwarder.py
@@ -28,8 +28,10 @@ def __init__(
def submit(self, message: Message[Union[FilteredPayload, RoutedValue]]) -> None:
message_payload = message.value.payload
if isinstance(message_payload, RoutedValue) and message_payload.route == self.__route:
# TODO: get headers from the StreamsMessage
assert isinstance(message_payload.payload.payload, bytes)
kafka_payload = message.value.replace(
KafkaPayload(None, str(message_payload.payload).encode("utf-8"), [])
KafkaPayload(None, message_payload.payload.payload, [])
)
self.__produce_step.submit(Message(kafka_payload))
else:
62 changes: 62 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/msg_wrapper.py
@@ -0,0 +1,62 @@
import time
from typing import Optional, TypeVar, Union, cast

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import FilteredPayload, Message, Value

from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
from sentry_streams.pipeline.message import Message as StreamsMessage

TPayload = TypeVar("TPayload")


class MessageWrapper(ProcessingStrategy[Union[FilteredPayload, TPayload]]):
"""
Custom processing strategy which can wrap payloads coming from the previous step
into a Message. In the case that the previous step already forwards a Message
or a FilteredPayload, this strategy will simply forward that as well to the
next step.
"""

def __init__(
self,
route: Route,
next_step: ProcessingStrategy[Union[FilteredPayload, RoutedValue]],
) -> None:
self.__next_step = next_step
self.__route = route

def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None:
now = time.time()
if not isinstance(message.payload, FilteredPayload):

if isinstance(message.payload, RoutedValue):
# No need to wrap a StreamsMessage in StreamsMessage() again
# This case occurs when prior strategy is forwarding a message that belongs on a separate route
assert isinstance(message.payload.payload, StreamsMessage)
self.__next_step.submit(cast(Message[Union[FilteredPayload, RoutedValue]], message))
else:
msg = StreamsMessage(message.payload, [], now, None)

routed_msg: Message[RoutedValue] = Message(
Value(
committable=message.value.committable,
payload=RoutedValue(self.__route, msg),
)
)
self.__next_step.submit(routed_msg)

else:
self.__next_step.submit(cast(Message[FilteredPayload], message))

def poll(self) -> None:
self.__next_step.poll()

def join(self, timeout: Optional[float] = None) -> None:
self.__next_step.join(timeout)

def close(self) -> None:
self.__next_step.close()

def terminate(self) -> None:
self.__next_step.terminate()
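
A usage sketch for MessageWrapper; the wiring below is assumed for illustration (in the adapter the next step comes from the generated strategy chain, not from a hand-built CommitOffsets):

```python
from typing import Mapping

from arroyo.processing.strategies import CommitOffsets
from arroyo.types import Partition

from sentry_streams.adapters.arroyo.msg_wrapper import MessageWrapper
from sentry_streams.adapters.arroyo.routes import Route


def noop_commit(offsets: Mapping[Partition, int], force: bool = False) -> None:
    # Stand-in for the commit callback Arroyo passes to the strategy factory.
    pass


# Raw payloads submitted by the previous step are wrapped into a StreamsMessage
# (empty headers, current timestamp, no schema) and tagged with this route;
# RoutedValue and FilteredPayload messages are forwarded unchanged.
wrapper = MessageWrapper(Route(source="myinput", waypoints=[]), CommitOffsets(noop_commit))
```
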
33 changes: 16 additions & 17 deletions sentry_streams/sentry_streams/adapters/arroyo/reduce.py
@@ -90,7 +90,7 @@ def add(self, value: Any) -> Self:
self.offsets[partition] = max(offsets[partition], self.offsets[partition])

else:
self.offsets.update(offsets)
self.offsets[partition] = offsets[partition]
Collaborator: ?

@ayirr7 (Member, Author), May 2, 2025: If you expand the code snippet, there was a bug in the else block, that got fixed here.

return self
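
For clarity, a plausible reading of the bug being fixed here, sketched from the visible context (the loop header is reconstructed, not copied from the file):

```python
# Before: for a partition seen for the first time, `self.offsets.update(offsets)`
# copied every partition's offset from the incoming value, which could overwrite
# an already max-reconciled offset for another partition with a smaller one.
# After: record only the offset of the partition currently being handled.
for partition in offsets:
    if partition in self.offsets:
        self.offsets[partition] = max(offsets[partition], self.offsets[partition])
    else:
        self.offsets[partition] = offsets[partition]
```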

@@ -127,15 +127,15 @@ def __init__(
window_size: float,
window_slide: float,
acc: Callable[[], Accumulator[Any, Any]],
next_step: ProcessingStrategy[TResult],
next_step: ProcessingStrategy[Union[FilteredPayload, TResult]],
route: Route,
) -> None:

self.window_count = int(window_size / window_slide)
self.window_size = int(window_size)
self.window_slide = int(window_slide)

self.next_step = next_step
self.msg_wrap_step = next_step
self.start_time = time.time()
self.route = route

@@ -180,9 +180,8 @@ def __merge_and_flush(self, window_id: int) -> None:

# If there is a gap in the data, it is possible to have empty flushes
if payload:
result = RoutedValue(self.route, payload)
self.next_step.submit(
Message(Value(cast(TResult, result), merged_window.get_offsets()))
self.msg_wrap_step.submit(
Message(Value(cast(TResult, payload), merged_window.get_offsets()))
)

# Refresh only the accumulator that was the first
@@ -206,12 +205,12 @@ def __maybe_flush(self, cur_time: float) -> None:
def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None:
value = message.payload
if isinstance(value, FilteredPayload):
self.next_step.submit(cast(Message[TResult], message))
self.msg_wrap_step.submit(cast(Message[Union[FilteredPayload, TResult]], message))
return

assert isinstance(value, RoutedValue)
if value.route != self.route:
self.next_step.submit(cast(Message[TResult], message))
self.msg_wrap_step.submit(cast(Message[Union[FilteredPayload, TResult]], message))
return

cur_time = time.time() - self.start_time
@@ -225,23 +224,23 @@ def poll(self) -> None:
cur_time = time.time() - self.start_time
self.__maybe_flush(cur_time)

self.next_step.poll()
self.msg_wrap_step.poll()

def close(self) -> None:
self.next_step.close()
self.msg_wrap_step.close()

def terminate(self) -> None:
self.next_step.terminate()
self.msg_wrap_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
self.next_step.close()
self.next_step.join()
self.msg_wrap_step.close()
self.msg_wrap_step.join()


def build_arroyo_windowed_reduce(
streams_window: Window[MeasurementUnit],
accumulator: Callable[[], Accumulator[Any, Any]],
next_step: ProcessingStrategy[Union[FilteredPayload, TPayload]],
msg_wrapper: ProcessingStrategy[Union[FilteredPayload, TPayload]],
route: Route,
) -> ProcessingStrategy[Union[FilteredPayload, TPayload]]:

@@ -268,7 +267,7 @@ def build_arroyo_windowed_reduce(
size,
slide,
accumulator,
next_step,
msg_wrapper,
route,
)

@@ -294,15 +293,15 @@
arroyo_acc.accumulator,
),
arroyo_acc.initial_value,
next_step,
msg_wrapper,
)

case timedelta():
return TimeWindowedReduce(
window_size.total_seconds(),
window_size.total_seconds(),
accumulator,
next_step,
msg_wrapper,
route,
)

4 changes: 3 additions & 1 deletion sentry_streams/sentry_streams/adapters/arroyo/routes.py
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from typing import Any, MutableSequence

from sentry_streams.pipeline.message import Message as StreamsMessage


@dataclass(frozen=True)
class Route:
@@ -23,4 +25,4 @@ class Route:
@dataclass(frozen=True)
class RoutedValue:
route: Route
payload: Any
payload: StreamsMessage[Any]
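
With the payload type narrowed from Any to StreamsMessage, a routed value always carries a wrapped message. A minimal construction sketch (argument order follows the calls elsewhere in this PR; the values are placeholders):

```python
import time

from sentry_streams.adapters.arroyo.routes import Route, RoutedValue
from sentry_streams.pipeline.message import Message as StreamsMessage

value = RoutedValue(
    route=Route(source="myinput", waypoints=["branch_1"]),
    # Constructor order: payload, headers, timestamp, schema. Empty headers and
    # schema=None are placeholder values for illustration.
    payload=StreamsMessage(b'{"example": true}', [], time.time(), None),
)
```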