Skip to content

Commit

Permalink
fix: Temporarily bring back support for legacy commit log format (#298)
Browse files Browse the repository at this point in the history
We experienced issues when deploying #295
to production. It was discovered that very old commit log entries are still present.
This is likely because the commit log topic has cleanup.policy=compact set, which
causes messages to never expire if they have a unique key.

The process of fixing this is underway: getsentry/snuba#4941
and getsentry/ops#8361

However we have to keep support for the legacy commit log format around for a while
until this work is complete.
  • Loading branch information
lynnagara authored Oct 30, 2023
1 parent b629c7d commit a2772a3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
32 changes: 32 additions & 0 deletions arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import json
from datetime import datetime

from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic
from arroyo.utils.codecs import Codec

# Kept in decode method for backward compatibility. Will be
# remove in a future release of Arroyo
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class CommitCodec(Codec[KafkaPayload, Commit]):
def encode(self, value: Commit) -> KafkaPayload:
Expand Down Expand Up @@ -37,6 +42,9 @@ def decode(self, value: KafkaPayload) -> Commit:

payload = val.decode("utf-8")

if payload.isnumeric():
return self.decode_legacy(value)

decoded = json.loads(payload)
offset = decoded["offset"]
orig_message_ts = decoded["orig_message_ts"]
Expand All @@ -55,3 +63,27 @@ def decode(self, value: KafkaPayload) -> Commit:
orig_message_ts,
received_ts,
)

def decode_legacy(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")

val = value.value
if not isinstance(val, bytes):
raise TypeError("payload value must be a bytes object")

headers = {k: v for (k, v) in value.headers}
orig_message_ts = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
return Commit(
group,
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts.timestamp(),
None,
)
10 changes: 10 additions & 0 deletions tests/backends/test_commit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time

from arroyo.backends.kafka import KafkaPayload
from arroyo.backends.kafka.commit import CommitCodec
from arroyo.commit import Commit
from arroyo.types import Partition, Topic
Expand All @@ -24,3 +25,12 @@ def test_encode_decode() -> None:
encoded = commit_codec.encode(commit)

assert commit_codec.decode(encoded) == commit


def test_decode_legacy() -> None:
legacy = KafkaPayload(
b"topic:0:leader-a", b"5", [("orig_message_ts", b"2023-09-26T21:58:14.191325Z")]
)
decoded = CommitCodec().decode(legacy)
assert decoded.offset == 5
assert decoded.group == "leader-a"

0 comments on commit a2772a3

Please sign in to comment.