ref: Type out chains and steps #99
Conversation
Got some offline reviews from @untitaker. Currently working on:
# a message with a generic payload
class Message(Generic[TIn]):
I am trying to intentionally keep Message as bare-bones as possible. For example, I don't think the user should have to think about:
- headers after the source step
- partition key
- offset management
However, they can provide a schema for messages and any additional info. E.g. for the generic metrics pipeline, maybe they annotate each message with the metric type.
Do let me know if something important is missing though.
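A minimal sketch of what that bare-bones shape could look like (the field and class layout here is an assumption for illustration, not the final API):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

TIn = TypeVar("TIn")

# Sketch only: the user sees the payload and an optional schema annotation;
# partitions, offsets, and post-source headers stay hidden in the adapter.
@dataclass(frozen=True)
class Message(Generic[TIn]):
    payload: TIn
    schema: Optional[str] = None  # e.g. the schema name for the source topic
```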
I agree on hiding offset management and partitions entirely. They have no place in this abstraction.
I would revisit a few decisions though:
- Headers. They are something the application logic may have to deal with when routing messages before payload parsing. The platform itself will need a way to make sense of a message without parsing the payload. For example, we will want to mark invalid messages or stale messages.
- Timestamp. I think a concept of message timestamp is needed. Then the question is whether to introduce the broker timestamp and a second, optional event timestamp.
A few ideas (a sketch of the first one follows this list):
- Separate the concepts of pre-parsing and post-parsing messages. Before parsing you can only access headers and timestamp. After parsing you can access the parsed payload as well. This can be done with different Message classes or with the payload type. The goal is to discourage people from parsing bytes on their own: if the user wants to access the payload, it has to be a parsed payload.
- Expose headers and separate them into internal ones, managed and mutable by the platform, and application ones: read-only and required to be present at the source. We will figure out later how to provide a more flexible set of application headers.
- Add a timestamp field which is the source timestamp. We will figure out the event timestamp another time.
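A rough sketch of the pre-/post-parsing split (class names are hypothetical):

```python
from dataclasses import dataclass
from typing import Generic, Mapping, TypeVar

TPayload = TypeVar("TPayload")

@dataclass(frozen=True)
class RawMessage:
    # Pre-parsing: only headers and the source timestamp are accessible;
    # the raw bytes are deliberately not exposed to application code.
    headers: Mapping[str, bytes]
    timestamp: float

@dataclass(frozen=True)
class ParsedMessage(Generic[TPayload]):
    # Post-parsing: the payload is available, already parsed.
    headers: Mapping[str, bytes]
    timestamp: float
    payload: TPayload
```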
> Expose headers and separate them into internal ones, managed and mutable by the platform, and application ones: read-only and required to be present at the source. We will figure out later how to provide a more flexible set of application headers.

With tasks, we have headers as a simple mutable map. Separating the headers out seems like additional complexity we could avoid. Our SDKs use message headers to pass trace_id and trace baggage, which I assume you'll want for streaming as well.
For the time being: add headers as a map, propagate them in all steps, and leave them empty in reduce. Add a timestamp.
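A sketch of that resolution (the step shapes below are assumptions; only the header and timestamp handling is the point):

```python
from typing import Any, Callable, MutableMapping, Sequence, Tuple

Headers = MutableMapping[str, bytes]

def apply_map(headers: Headers, timestamp: float, payload: Any,
              fn: Callable[[Any], Any]) -> Tuple[Headers, float, Any]:
    # map/filter/flatMap-style steps: propagate headers and timestamp as-is.
    return headers, timestamp, fn(payload)

def apply_reduce(inputs: Sequence[Tuple[Headers, float, Any]],
                 window_end: float) -> Tuple[Headers, float, Sequence[Any]]:
    # reduce: many inputs fold into one output, so headers are left empty.
    return {}, window_end, [payload for _, _, payload in inputs]
```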
return RoutedValue(
    route=Route(source=self.source, waypoints=[]),
    payload=StreamsMessage(schema=schema, payload=value),
This identifies the schema of the messages based on the topic they come from. I chose to do it this way, wrapping the value in this Message and passing Message throughout the pipeline, so that the user can do flexible parsing/deserialization of messages. This way, instead of baking it into the Source, we can do parsing anywhere in the pipeline using the Parser() step.
If we could just bake parsing/deserialization into the Source, then all this would not be needed.
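A self-contained sketch of why this buys flexibility (the names here are illustrative, not the PR's actual Parser implementation): the same generic step turns Message[bytes] into Message[T] at whatever point in the chain parsing is actually needed.

```python
import json
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")

@dataclass(frozen=True)
class Message(Generic[TIn]):
    payload: TIn

def make_parser(
    deserializer: Callable[[bytes], TOut],
) -> Callable[[Message[bytes]], Message[TOut]]:
    # Returns a step usable anywhere in the pipeline, not only at the source.
    def step(msg: Message[bytes]) -> Message[TOut]:
        return Message(payload=deserializer(msg.payload))
    return step

# usage: json_parser = make_parser(json.loads)
#        parsed = json_parser(Message(b'{"metric": "c:count"}'))
```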
"myunbatch", | ||
FlatMap( | ||
function=cast( | ||
Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str], |
Because unbatch is a function with generic arguments, the user unfortunately has to explicitly cast it to the concrete type in their pipeline. This shouldn't be difficult because the incoming ExtensibleChain already has concrete type hints.
Obviously, this specific cast() can also be avoided if users write and use a custom unbatcher.
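For instance, a custom unbatcher written against concrete types needs no cast() when wired into the chain. A sketch, with several assumptions: the Message import follows the snippet in this PR, the IngestMetric import path and the Message constructor shape are guesses, and the exact signature FlatMap expects may differ.

```python
from typing import Iterable, MutableSequence

from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric
from sentry_streams.pipeline.message import Message

def unbatch_metrics(
    batch: Message[MutableSequence[IngestMetric]],
) -> Iterable[Message[IngestMetric]]:
    # Concrete input/output types, so the chain's inferred hints line up
    # directly and no Union/cast dance is required.
    for metric in batch.payload:
        yield Message(metric)  # constructor shape assumed
```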
I wonder whether it would be easier if we had Batch and Unbatch appliers without having to go through the FlatMap.
Out of scope for now.
Serializer(serializer=json_serializer),
)  # ExtensibleChain[bytes]

chain4 = chain3.sink(
I split up the pipeline like this in the example because you can see the inferred type hints of each chain in a code editor.
Please write that in a comment so whoever reads the code knows.
I think it goes in a good direction.
See the inline comments for details, but my main feedback is to be stricter on the message interface. Specifically:
- I would separate the concept of a serialized message (pre-parsing and post-serialization, where only headers are available) from a parsed message (where the payload is available already parsed). We will figure out later whether to relax this constraint. This would also make it clear that serialization has to happen before a sink.
- Maybe later, introduce a difference in types between serializable messages (where we can break segments) and any other type (where we cannot break segments).
Also, what happens today for invalid messages? We will have to add support for the DLQ. It is OK to do that separately, but please call it out in the parser.
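On the invalid-message question, a minimal sketch of what the parser could call out (all names here are hypothetical; real DLQ support is deferred):

```python
from typing import Any, Callable, Optional

def parse_or_dlq(
    raw: bytes,
    deserializer: Callable[[bytes], Any],
    dlq_produce: Callable[[bytes, str], None],
) -> Optional[Any]:
    # On a decode failure, hand the raw bytes and a reason to the DLQ
    # producer and drop the message from the main flow instead of crashing.
    try:
        return deserializer(raw)
    except Exception as exc:
        dlq_produce(raw, f"parse error: {exc}")
        return None
```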
TRoute = TypeVar("TRoute")

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


# a message with a generic payload
class Message(Generic[TIn]):
Agreed.
I think you want to preserve the timestamp though. That can be useful.
re: schema, I think it may be a good idea.
from sentry_kafka_schemas.codecs import Codec

TIn = TypeVar("TIn")
nit: this would be better named TPayload. TIn was used in the chain to represent the input of a function, as opposed to TOut.
@@ -0,0 +1,31 @@
from typing import Any

from sentry_streams.pipeline.message import Message
Unit tests please, ensuring this works for protobuf as well.
You'll need a protobuf message type published to do that. If you need a hand with sentry-protos let me know.
I'll come back to this after PTO in another PR. Thanks!
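A sketch of what such a test could look like once a message type is published in sentry-protos (the proto class and the parser entry point below are assumptions; SerializeToString is the standard protobuf API):

```python
def test_parse_protobuf_roundtrip() -> None:
    # Hypothetical generated class from sentry-protos.
    original = ExampleProtoMessage(name="value")
    raw = original.SerializeToString()

    # Hypothetical parser entry point; the real API may differ.
    parsed = parse_message(raw, msg_type=ExampleProtoMessage)

    assert parsed == original
```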
msg = StreamsMessage(message.payload, [], now, None)

routed_msg: Message[RoutedValue] = Message(
    Value(committable=message.value.committable, payload=RoutedValue(self.__route, msg))
)
This is a good sign that we will not stick with Arroyo in the long run: StreamsMessage is the payload of RoutedValue, which is the payload of the value of Message. That's a lot.
Yeah, unfortunately we had to alias our top-level API's Message as StreamsMessage in order to not clash with Arroyo's Message.
I had kind of hoped for a solution where all of it could be managed in the top-level API code, but given that we want to expose some metadata to the user, like processing time, the StreamsMessage type has to be managed in the Arroyo steps as well.
@@ -90,7 +90,7 @@ def add(self, value: Any) -> Self:
            self.offsets[partition] = max(offsets[partition], self.offsets[partition])

        else:
-            self.offsets.update(offsets)
+            self.offsets[partition] = offsets[partition]
?
If you expand the code snippet: there was a bug in the else block that got fixed here.
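The fixed behavior, reconstructed from the diff as a standalone sketch (only the merge logic is the point):

```python
from typing import MutableMapping

def merge_offset(acc: MutableMapping[int, int],
                 offsets: MutableMapping[int, int], partition: int) -> None:
    # Merge one partition at a time. The old `acc.update(offsets)` copied
    # every incoming partition wholesale, overwriting offsets for partitions
    # that should have gone through the max() branch.
    if partition in acc:
        acc[partition] = max(offsets[partition], acc[partition])
    else:
        acc[partition] = offsets[partition]
```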
def test_adapter(
    broker: LocalBroker[KafkaPayload],
    pipeline: Pipeline,
    metric: IngestMetric,
I'll probably stop spamming the IngestMetric type everywhere in tests and examples, and use a different one instead for some of them, especially where the topic name doesn't even match.