Skip to content

Commit

Permalink
ref: A better commit log format (#289)
Browse files Browse the repository at this point in the history
I really don't want to port this abomination to Rust, let's simplify the commit log format

Two main changes:
- uses a float timestamp instead of the weird datetime format we had before
- avoids putting important information in headers. This was a hack to avoid changing the message format earlier, but it should be avoided. The timestamp is now in the payload of the message

This is the first of 3 changes: the encode method is unchanged, and the decode method now supports the new format.
  • Loading branch information
lynnagara authored Sep 28, 2023
1 parent 7c577d0 commit 9613627
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
32 changes: 31 additions & 1 deletion arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import datetime
from typing import Optional

Expand All @@ -6,6 +7,8 @@
from arroyo.types import Partition, Topic
from arroyo.utils.codecs import Codec

# Kept in decode method for backward compatibility. Will be
# removed in a future release of Arroyo.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


Expand All @@ -28,7 +31,7 @@ def encode(self, value: Commit) -> KafkaPayload:
],
)

def decode(self, value: KafkaPayload) -> Commit:
def decode_legacy(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")
Expand All @@ -53,3 +56,30 @@ def decode(self, value: KafkaPayload) -> Commit:
offset,
orig_message_ts,
)

def decode(self, value: KafkaPayload) -> Commit:
    """Decode a commit log payload into a ``Commit``.

    Two value formats are accepted:

    * Legacy: the value is a bare numeric string (the offset). Such
      payloads are handed to ``decode_legacy`` unchanged.
    * Current: the value is a JSON object with ``"offset"`` and
      ``"orig_message_ts"`` keys; the latter is passed to
      ``datetime.fromtimestamp``.

    The key is a UTF-8 string of the form ``topic:partition:group``.

    Raises:
        TypeError: if the payload key or value is not a ``bytes`` object.
    """
    key = value.key
    if not isinstance(key, bytes):
        raise TypeError("payload key must be a bytes object")

    val = value.value
    if not isinstance(val, bytes):
        raise TypeError("payload value must be a bytes object")

    payload = val.decode("utf-8")

    # A purely numeric value can only be the legacy format, where the
    # value holds just the offset.
    if payload.isnumeric():
        return self.decode_legacy(value)

    decoded = json.loads(payload)
    offset = decoded["offset"]
    orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"])

    # Split on the first two colons only: everything after the second colon
    # is the group, so a group id that itself contains ":" no longer breaks
    # the three-way unpacking.
    topic_name, partition_index, group = key.decode("utf-8").split(":", 2)

    return Commit(
        group,
        Partition(Topic(topic_name), int(partition_index)),
        offset,
        orig_message_ts,
    )
7 changes: 7 additions & 0 deletions tests/backends/test_commit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime

from arroyo.backends.kafka.commit import CommitCodec
from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic

Expand All @@ -21,3 +22,9 @@ def test_encode_decode() -> None:
encoded = commit_codec.encode(commit)

assert commit_codec.decode(encoded) == commit

def test_decode_legacy() -> None:
    """A legacy payload (bare offset value, timestamp header) still decodes."""
    legacy_headers = [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')]
    legacy_payload = KafkaPayload(b"topic:0:leader-a", b"5", legacy_headers)

    codec = CommitCodec()
    commit = codec.decode(legacy_payload)

    assert commit.offset == 5
    assert commit.group == "leader-a"

0 comments on commit 9613627

Please sign in to comment.