Skip to content

Commit

Permalink
ref: Add extra logging for debugging commits (#302)
Browse files Browse the repository at this point in the history
There is a mystery right now which is causing (probably) very old
commit messages to still be present. This seems to be the case
even after we switched the cleanup policy on the topic to `compact,delete`
from `compact` so that messages eventually expire (they previously never expired).
This adds some logging to help us figure out where/when these old messages came
from and helps us determine when the legacy decoder can be safely removed.
  • Loading branch information
lynnagara authored Nov 8, 2023
1 parent 9301e0f commit 48c47e4
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
from datetime import datetime

from arroyo.backends.kafka import KafkaPayload
Expand All @@ -10,6 +11,10 @@
# remove in a future release of Arroyo
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

logger = logging.getLogger(__name__)

max_times_to_log_legacy_message = 10


class CommitCodec(Codec[KafkaPayload, Commit]):
def encode(self, value: Commit) -> KafkaPayload:
Expand Down Expand Up @@ -65,6 +70,7 @@ def decode(self, value: KafkaPayload) -> Commit:
)

def decode_legacy(self, value: KafkaPayload) -> Commit:
global max_times_to_log_legacy_message
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")
Expand All @@ -80,10 +86,17 @@ def decode_legacy(self, value: KafkaPayload) -> Commit:

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
return Commit(

commit = Commit(
group,
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts.timestamp(),
None,
)

if max_times_to_log_legacy_message > 0:
max_times_to_log_legacy_message -= 1
logger.warn(f"Legacy commit message found: {commit}")

return commit

0 comments on commit 48c47e4

Please sign in to comment.