Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: A better commit log format pt2 #290

Merged
merged 2 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import datetime
from typing import Optional

Expand All @@ -6,29 +7,31 @@
from arroyo.types import Partition, Topic
from arroyo.utils.codecs import Codec

# Kept in decode method for backward compatibility. Will be
# remove in a future release of Arroyo
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class CommitCodec(Codec[KafkaPayload, Commit]):
def encode(self, value: Commit) -> KafkaPayload:
assert value.orig_message_ts is not None

payload = json.dumps(
{
"offset": value.offset,
"orig_message_ts": datetime.timestamp(value.orig_message_ts),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for the alerts e2e SLO ?

Copy link
Member Author

@lynnagara lynnagara Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, this is the same information that was already there, just in a float format and in the body not the header now so we don't need to do the custom decoding dance

}
).encode("utf-8")

return KafkaPayload(
f"{value.partition.topic.name}:{value.partition.index}:{value.group}".encode(
"utf-8"
),
f"{value.offset}".encode("utf-8"),
[
(
"orig_message_ts",
datetime.strftime(value.orig_message_ts, DATETIME_FORMAT).encode(
"utf-8"
),
)
],
payload,
[],
)

def decode(self, value: KafkaPayload) -> Commit:
def decode_legacy(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")
Expand All @@ -53,3 +56,30 @@ def decode(self, value: KafkaPayload) -> Commit:
offset,
orig_message_ts,
)

def decode(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")

val = value.value
if not isinstance(val, bytes):
raise TypeError("payload value must be a bytes object")

payload = val.decode("utf-8")

if payload.isnumeric():
return self.decode_legacy(value)

decoded = json.loads(payload)
offset = decoded["offset"]
orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"])

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)

return Commit(
group,
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts,
)
7 changes: 7 additions & 0 deletions tests/backends/test_commit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime

from arroyo.backends.kafka.commit import CommitCodec
from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic

Expand All @@ -21,3 +22,9 @@ def test_encode_decode() -> None:
encoded = commit_codec.encode(commit)

assert commit_codec.decode(encoded) == commit

def test_decode_legacy() -> None:
legacy = KafkaPayload(b"topic:0:leader-a", b"5", [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')])
decoded = CommitCodec().decode(legacy)
assert decoded.offset == 5
assert decoded.group == "leader-a"
Loading