ref: Type out chains and steps #99


Merged: 27 commits into main on May 2, 2025

Conversation

@ayirr7 (Member) commented Apr 14, 2025

Does the following:

  • Links the types of the different steps of an ExtensibleChain (a usage sketch follows the to-do list below)
  • Provides a simple Message class to carry message metadata through the pipeline. This allows for decoupling certain steps, like parsing/deserialization, from the stream source step.
  • Provides basic message decoders/encoders out of the box in the API (JSON, Protobuf)
  • Adds basic header filtering in the source
  • Adds schema validation, transforming raw bytes into a schema-enforced message type
  • Adds general type checks
  • Cleans up unit tests

Still to do:

  • Improve a bunch of naming (of interfaces, of files, etc.)
  • Provide full support for Protobuf
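
To illustrate the typing above, here is a minimal, self-contained sketch of the idea. The step functions and field names are simplified stand-ins, not this PR's actual API:

import json
from dataclasses import dataclass
from typing import Generic, TypeVar

TPayload = TypeVar("TPayload")

@dataclass
class Message(Generic[TPayload]):
    """Bare-bones wrapper; the payload type is what links chain steps."""
    payload: TPayload

def parse_json(msg: Message[bytes]) -> Message[dict]:
    """A Parser-like step: raw bytes in, parsed payload out."""
    return Message(payload=json.loads(msg.payload))

def serialize_json(msg: Message[dict]) -> bytes:
    """A Serializer-like step: parsed payload in, raw bytes out, ready for a sink."""
    return json.dumps(msg.payload).encode("utf-8")

# A type checker can follow the payload type from step to step:
raw = Message(payload=b'{"name": "spam", "value": 3}')
parsed = parse_json(raw)      # Message[dict]
out = serialize_json(parsed)  # bytes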

@ayirr7 marked this pull request as draft April 14, 2025 21:41
@ayirr7 (Member Author) commented Apr 18, 2025

Got some offline reviews from @untitaker

Currently working on:

  • Adding schema validation to the Parser step and updating an example to use that
  • Seeing if the types for certain steps should be narrower (e.g. what the Serializer outputs)

@ayirr7 changed the title from Riya/chain typing to ref: Type out chains and steps Apr 21, 2025


# a message with a generic payload
class Message(Generic[TIn]):
ayirr7 (Member Author):

I am trying to intentionally keep Message as bare-bones as possible. For example, I don't think the user should have to think about:

  • headers after the source step
  • partition key
  • offset management

However, they can provide a schema for messages and any additional info, e.g. for the generic metrics pipeline, maybe they annotate each message with the metric type.

Do let me know if something important is missing though.

Collaborator:

I agree on hiding offset management and partitions entirely. They have no place in this abstraction.
I would revisit a few decisions though:

  • Headers. They are something the application logic may have to deal with for routing messages before payload parsing. The platform itself will need a way to make sense of a message without parsing the payload. For example, we will want to mark invalid or stale messages.
  • Timestamp. I think a concept of message timestamp is needed. Then the question is whether to introduce the broker timestamp and a second, optional event timestamp.

A few ideas:

  • Separate the concepts of pre-parsing and post-parsing messages. Before parsing you can only access headers and the timestamp. After parsing you can access the parsed payload as well. This can be done with different Message classes or with the payload type. The goal is to discourage people from parsing bytes on their own: if the user wants to access the payload, it has to be a parsed payload.
  • Expose headers and separate them into internal ones, managed and mutable by the platform, and application ones: read-only and required to be present at the source. We will figure out later how to provide a more flexible set of application headers.
  • Add a timestamp field which is the source timestamp. We will figure out the event timestamp another time.
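
A minimal sketch of the first idea, using different classes so the type system enforces the pre-parsing/post-parsing split (all names here are illustrative, not the actual API):

from dataclasses import dataclass, field
from typing import Generic, Mapping, TypeVar

TPayload = TypeVar("TPayload")

@dataclass
class RawMessage:
    """Pre-parsing: only headers and timestamp are accessible."""
    headers: Mapping[str, str]
    timestamp: float
    _raw: bytes = field(repr=False)  # deliberately hidden: no DIY byte parsing

@dataclass
class ParsedMessage(Generic[TPayload]):
    """Post-parsing: the payload is available, already parsed."""
    headers: Mapping[str, str]
    timestamp: float
    payload: TPayload

# Only a Parser step would turn a RawMessage into a ParsedMessage[TPayload],
# so user code never touches raw bytes directly.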

Member:

> Expose headers and separate them into internal ones, managed and mutable by the platform, and application ones: read-only and required to be present at the source. We will figure out later how to provide a more flexible set of application headers.

With tasks, we have headers as a simple mutable map. Separating the headers out seems like additional complexity we could avoid. Our SDKs use message headers to pass trace_id and trace baggage, which I assume you'll want for streaming as well.

Collaborator:

For the time being: add headers as a map, propagate them in all steps, and leave them empty in reduce. Add a timestamp.
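
Roughly, that decision could look like this (a sketch, not the merged code):

from dataclasses import dataclass, field
from typing import Generic, MutableMapping, Optional, TypeVar

TPayload = TypeVar("TPayload")

@dataclass
class Message(Generic[TPayload]):
    payload: TPayload
    # headers as a plain mutable map, propagated through every step
    # (left empty by reduce, which cannot attribute headers to one input)
    headers: MutableMapping[str, str] = field(default_factory=dict)
    # source (broker) timestamp; an event timestamp can come later
    timestamp: Optional[float] = None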


return RoutedValue(
    route=Route(source=self.source, waypoints=[]),
    payload=StreamsMessage(schema=schema, payload=value),
@ayirr7 (Member Author) commented Apr 22, 2025:

This identifies the schema of the messages based on the topic they come from. I chose to wrap the value in this Message and pass Message throughout the pipeline so that the user can do flexible parsing/deserialization of messages. This way, instead of baking parsing into the Source, we can do it anywhere in the pipeline using the Parser() step.

If we could just bake parsing/deserialization into the Source, then none of this would be needed.
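
In other words, the source only attaches the schema, and the Parser step uses it later, wherever it is placed. A sketch, assuming the StreamsMessage constructor shown above and a sentry_kafka_schemas-style codec lookup (raw_bytes and parser_step are illustrative names):

from sentry_kafka_schemas import get_codec

# At the source: attach the schema inferred from the topic; do not parse yet.
wrapped = StreamsMessage(schema=schema, payload=raw_bytes)

# Anywhere later in the pipeline, a Parser() step can decode and validate:
def parser_step(msg):
    codec = get_codec(msg.schema)
    parsed = codec.decode(msg.payload, validate=True)
    return StreamsMessage(schema=msg.schema, payload=parsed)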

"myunbatch",
FlatMap(
function=cast(
Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str],
ayirr7 (Member Author):

Because unbatch is a function with generic arguments, the user unfortunately has to explicitly cast it to the concrete type in their pipeline. This shouldn't be difficult because the incoming ExtensibleChain already has concrete type hints.

Obviously this specific cast() can also be avoided if users write and use a custom unbatcher.
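
For example, a hand-written unbatcher with concrete types, assuming FlatMap accepts a generator-style callable (a sketch, not the example's actual code):

from typing import Iterator, MutableSequence

def unbatch_metrics(
    msg: Message[MutableSequence[IngestMetric]],
) -> Iterator[Message[IngestMetric]]:
    # Concrete parameter/return types mean no cast() at the call site.
    for metric in msg.payload:
        yield Message(payload=metric)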

Collaborator:

I wonder whether it would be easier if we had Batch and Unbatch appliers without having to go through the FlatMap.

Collaborator:

Out of scope for now.

    Serializer(serializer=json_serializer),
)  # ExtensibleChain[bytes]

chain4 = chain3.sink(
@ayirr7 (Member Author) commented Apr 22, 2025:

I split up the pipeline like this in the example so that you can see the inferred type hints of each chain in a code editor.

Collaborator:

Please write that in a comment so whoever reads the code knows.
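
For instance (step-registration method names and sink arguments here are placeholders):

chain3 = chain2.apply(
    "myserializer",
    Serializer(serializer=json_serializer),
)  # ExtensibleChain[bytes]: the inferred type, written down for readers

chain4 = chain3.sink(
    "mysink",
)  # the sink consumes the serialized bytes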

@ayirr7 marked this pull request as ready for review April 22, 2025 00:17
@fpacifici (Collaborator) left a comment:

I think it goes in a good direction.

See the inline comments for details, but my main feedback is to be stricter on the message interface. Specifically:

  • I would separate the concept of a serialized message (pre-parsing and post-serialization, where only the headers are available) from a parsed message, where the payload is available already parsed. We will figure out later whether to relax this constraint. This would also make it clear that serialization has to happen before a sink.
  • Maybe later, introduce a difference in types between serializable messages (where we can break segments) and any other type, where we cannot break segments.

Also, what happens today for invalid messages? We will have to add support for the DLQ. It is ok to do that separately, but please call it out in the parser.

TRoute = TypeVar("TRoute")

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


# a message with a generic payload
class Message(Generic[TIn]):
Collaborator:

Agreed.
I think you want to preserve the timestamp though. That can be useful.
Re: the schema, I think it may be a good idea.


from sentry_kafka_schemas.codecs import Codec

TIn = TypeVar("TIn")
Collaborator:

Nit: this would be better named TPayload. TIn was used in the chain to represent the input of a function, as opposed to TOut.
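
i.e., something like:

from typing import Generic, TypeVar

# TPayload: the type carried by a Message.
# TIn/TOut remain reserved for the input/output of functions in the chain.
TPayload = TypeVar("TPayload")

class Message(Generic[TPayload]): ...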




"myunbatch",
FlatMap(
function=cast(
Union[Callable[[Message[MutableSequence[IngestMetric]]], Message[IngestMetric]], str],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it would be easier if we had Batch and Unbatch appliers without having to go through the FlatMap.

@@ -0,0 +1,31 @@
from typing import Any

from sentry_streams.pipeline.message import Message
Collaborator:

Please add unit tests ensuring it works for Protobuf as well.

Member:

You'll need a published Protobuf message type to do that. If you need a hand with sentry-protos, let me know.

ayirr7 (Member Author):

I'll come back to this after PTO in another PR. Thanks!
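
For the record, such a test could be a simple encode/decode round trip, along these lines (a pytest-style sketch; the codec names and the Protobuf message type are hypothetical, not this PR's actual classes):

def test_json_codec_roundtrip() -> None:
    codec = JsonCodec()  # hypothetical name for the built-in JSON codec
    payload = {"name": "spam", "value": 3}
    assert codec.decode(codec.encode(payload)) == payload

def test_proto_codec_roundtrip() -> None:
    # Needs a published Protobuf message type, e.g. from sentry-protos.
    msg = ExampleProto(name="spam", value=3)  # hypothetical message type
    codec = ProtoCodec(ExampleProto)  # hypothetical name for the Protobuf codec
    assert codec.decode(codec.encode(msg)) == msg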

Comment on lines 31 to 35

msg = StreamsMessage(message.payload, [], now, None)

routed_msg: Message[RoutedValue] = Message(
    Value(committable=message.value.committable, payload=RoutedValue(self.__route, msg))
)
Collaborator:

This is a good sign that we will not stick with Arroyo in the long run: StreamsMessage is the payload of RoutedValue, which is the payload of the value of Message. That's a lot.

ayirr7 (Member Author):

Yeah, unfortunately we had to alias our top-level API's Message as StreamsMessage in order to not clash with Arroyo's Message.

ayirr7 (Member Author):

I had kind of hoped for a solution where all of it could be managed in the top-level API code, but given that we want to expose some metadata to the user, like processing time, the StreamsMessage type has to be managed in the Arroyo steps as well.
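
Spelling out the layering the thread is referring to (restating the snippet above, innermost to outermost; message, now, and self.__route come from that snippet):

# One Kafka record ends up wrapped four deep:
streams_msg = StreamsMessage(message.payload, [], now, None)  # user-facing message
routed = RoutedValue(self.__route, streams_msg)               # adds routing info
value = Value(committable=message.value.committable, payload=routed)  # Arroyo value
arroyo_msg: Message[RoutedValue] = Message(value)             # Arroyo's own Message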

@@ -90,7 +90,7 @@ def add(self, value: Any) -> Self:
             self.offsets[partition] = max(offsets[partition], self.offsets[partition])

         else:
-            self.offsets.update(offsets)
+            self.offsets[partition] = offsets[partition]
Collaborator:

?

@ayirr7 (Member Author) commented May 2, 2025:

If you expand the code snippet: there was a bug in the else block that got fixed here.
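
A standalone sketch of the fixed logic and why the old line was wrong (hypothetical free-standing version of the add() bookkeeping, assuming both dicts map partitions to offsets):

def merge_offsets(tracked: dict, offsets: dict, partition) -> None:
    if partition in tracked:
        # keep the highest offset seen so far for a known partition
        tracked[partition] = max(offsets[partition], tracked[partition])
    else:
        # fixed: adopt the offset for this partition only.
        # The old tracked.update(offsets) copied every partition from the
        # incoming map, clobbering the max() bookkeeping for partitions
        # that were already tracked.
        tracked[partition] = offsets[partition]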

def test_adapter(
    broker: LocalBroker[KafkaPayload],
    pipeline: Pipeline,
    metric: IngestMetric,
@ayirr7 (Member Author) commented May 2, 2025:

I'll probably stop spamming the IngestMetric type everywhere in tests and examples and use a different one for some of them, especially where the topic name doesn't even match.


@ayirr7 merged commit 800c11e into main May 2, 2025
10 checks passed