diff --git a/arroyo/backends/kafka/commit.py b/arroyo/backends/kafka/commit.py index acdcad56..d28f183f 100644 --- a/arroyo/backends/kafka/commit.py +++ b/arroyo/backends/kafka/commit.py @@ -1,3 +1,4 @@ +import json from datetime import datetime from typing import Optional @@ -6,6 +7,8 @@ from arroyo.types import Partition, Topic from arroyo.utils.codecs import Codec +# Kept in decode method for backward compatibility. Will be +# remove in a future release of Arroyo DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -28,7 +31,7 @@ def encode(self, value: Commit) -> KafkaPayload: ], ) - def decode(self, value: KafkaPayload) -> Commit: + def decode_legacy(self, value: KafkaPayload) -> Commit: key = value.key if not isinstance(key, bytes): raise TypeError("payload key must be a bytes object") @@ -53,3 +56,30 @@ def decode(self, value: KafkaPayload) -> Commit: offset, orig_message_ts, ) + + def decode(self, value: KafkaPayload) -> Commit: + key = value.key + if not isinstance(key, bytes): + raise TypeError("payload key must be a bytes object") + + val = value.value + if not isinstance(val, bytes): + raise TypeError("payload value must be a bytes object") + + payload = val.decode("utf-8") + + if payload.isnumeric(): + return self.decode_legacy(value) + + decoded = json.loads(payload) + offset = decoded["offset"] + orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"]) + + topic_name, partition_index, group = key.decode("utf-8").split(":", 3) + + return Commit( + group, + Partition(Topic(topic_name), int(partition_index)), + offset, + orig_message_ts, + ) diff --git a/tests/backends/test_commit.py b/tests/backends/test_commit.py index 59e12614..2e1d958d 100644 --- a/tests/backends/test_commit.py +++ b/tests/backends/test_commit.py @@ -1,6 +1,7 @@ from datetime import datetime from arroyo.backends.kafka.commit import CommitCodec +from arroyo.backends.kafka import KafkaPayload from arroyo.commit import Commit from arroyo.types import Partition, Topic @@ -21,3 +22,9 @@ def test_encode_decode() -> None: encoded = commit_codec.encode(commit) assert commit_codec.decode(encoded) == commit + +def test_decode_legacy() -> None: + legacy = KafkaPayload(b"topic:0:leader-a", b"5", [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')]) + decoded = CommitCodec().decode(legacy) + assert decoded.offset == 5 + assert decoded.group == "leader-a"