-
-
Notifications
You must be signed in to change notification settings - Fork 0
ref: Type out chains and steps #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
45a3873
basic typing between steps, serializer and deserializer
ayirr7 6d9cecd
some basic header filtering in source
ayirr7 33eda66
some minor clean up
ayirr7 66f7f37
remove random comment
ayirr7 efd8e1a
Merge remote-tracking branch 'origin' into riya/chain-typing
ayirr7 501dab8
some Message typing
ayirr7 794bf32
wip use Message[]
ayirr7 a81fb8b
remove concept of Message
ayirr7 dcd7a27
schema validation basics
ayirr7 63d5a73
working transformer example
ayirr7 a5c0204
fix sm batchin
ayirr7 dd68893
fix blq.py typing
ayirr7 b9840b2
type checking is fixed
ayirr7 4d9de0e
assert StreamSources in arroyo adapter
ayirr7 5f41865
somem cleanups
ayirr7 26abe36
more cleanup
ayirr7 9a537e3
support protobuf
ayirr7 ad4efd4
change name to msg_parser
ayirr7 1c226a2
some comment
ayirr7 c7dcabf
remove deserializer and serializer
ayirr7 1b5425e
an immutable message interface
ayirr7 619643a
somehow fixed all of the tests
ayirr7 39ae480
add some comments, don't get_codec for every msg
ayirr7 4e481f4
fix sm test
ayirr7 8ed489f
make flatmap use mutablesequence
ayirr7 dc0b600
make reduce tests better
ayirr7 418dc93
more comments
ayirr7 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
sentry_streams/sentry_streams/adapters/arroyo/msg_wrapper.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import time | ||
from typing import Optional, TypeVar, Union, cast | ||
|
||
from arroyo.processing.strategies.abstract import ProcessingStrategy | ||
from arroyo.types import FilteredPayload, Message, Value | ||
|
||
from sentry_streams.adapters.arroyo.routes import Route, RoutedValue | ||
from sentry_streams.pipeline.message import Message as StreamsMessage | ||
|
||
TPayload = TypeVar("TPayload") | ||
|
||
|
||
class MessageWrapper(ProcessingStrategy[Union[FilteredPayload, TPayload]]): | ||
""" | ||
Custom processing strategy which can wrap payloads coming from the previous step | ||
into a Message. In the case that the previous step already forwards a Message | ||
or a FilteredPayload, this strategy will simply forward that as well to the | ||
next step. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
route: Route, | ||
next_step: ProcessingStrategy[Union[FilteredPayload, RoutedValue]], | ||
) -> None: | ||
self.__next_step = next_step | ||
self.__route = route | ||
|
||
def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None: | ||
now = time.time() | ||
if not isinstance(message.payload, FilteredPayload): | ||
|
||
if isinstance(message.payload, RoutedValue): | ||
# No need to wrap a StreamsMessage in StreamsMessage() again | ||
# This case occurs when prior strategy is forwarding a message that belongs on a separate route | ||
assert isinstance(message.payload.payload, StreamsMessage) | ||
self.__next_step.submit(cast(Message[Union[FilteredPayload, RoutedValue]], message)) | ||
else: | ||
msg = StreamsMessage(message.payload, [], now, None) | ||
|
||
routed_msg: Message[RoutedValue] = Message( | ||
Value( | ||
committable=message.value.committable, | ||
payload=RoutedValue(self.__route, msg), | ||
) | ||
) | ||
self.__next_step.submit(routed_msg) | ||
|
||
else: | ||
self.__next_step.submit(cast(Message[FilteredPayload], message)) | ||
|
||
def poll(self) -> None: | ||
self.__next_step.poll() | ||
|
||
def join(self, timeout: Optional[float] = None) -> None: | ||
self.__next_step.join(timeout) | ||
|
||
def close(self) -> None: | ||
self.__next_step.close() | ||
|
||
def terminate(self) -> None: | ||
self.__next_step.terminate() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you expand the code snippet, there was a bug in the
else
block, that got fixed here