From aa6bd34bb6b9539a10a016a3220d43ff2ccc8bf4 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 18 Oct 2023 10:05:57 -0700 Subject: [PATCH] feat: Add optional received_p99 timestamp to commit log (#295) The value from the received field can be used in the future for subscription scheduling if this is provided. This is better than the `orig_message_ts` field as `received` is assigned at the very start of the pipeline when Sentry receives the event (as opposed to when Snuba gets the event). Switching to this field means any delays in ingestion will be properly accounted for when determining the window on which to schedule subscriptions. This PR also: - deprecates the legacy decoder since we have fully switched over to the new format - switches orig_message_ts from datetime to float. Converting between the two in encode/decode is pointless, and it introduces the possibility of timezone issues. Simpler to just keep it a unix timestamp everywhere. --- arroyo/backends/kafka/commit.py | 41 ++++++++------------------------- arroyo/commit.py | 6 ++--- tests/backends/test_commit.py | 14 ++++------- tests/backends/test_kafka.py | 6 +++-- 4 files changed, 21 insertions(+), 46 deletions(-) diff --git a/arroyo/backends/kafka/commit.py b/arroyo/backends/kafka/commit.py index b15ead84..151ea0b4 100644 --- a/arroyo/backends/kafka/commit.py +++ b/arroyo/backends/kafka/commit.py @@ -1,15 +1,10 @@ import json -from datetime import datetime from arroyo.backends.kafka import KafkaPayload from arroyo.commit import Commit from arroyo.types import Partition, Topic from arroyo.utils.codecs import Codec -# Kept in decode method for backward compatibility. Will be -# remove in a future release of Arroyo -DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - class CommitCodec(Codec[KafkaPayload, Commit]): def encode(self, value: Commit) -> KafkaPayload: @@ -18,7 +13,8 @@ def encode(self, value: Commit) -> KafkaPayload: payload = json.dumps( { "offset": value.offset, - "orig_message_ts": datetime.timestamp(value.orig_message_ts), + "orig_message_ts": value.orig_message_ts, + "received_p99": value.received_p99, } ).encode("utf-8") @@ -30,28 +26,6 @@ def encode(self, value: Commit) -> KafkaPayload: [], ) - def decode_legacy(self, value: KafkaPayload) -> Commit: - key = value.key - if not isinstance(key, bytes): - raise TypeError("payload key must be a bytes object") - - val = value.value - if not isinstance(val, bytes): - raise TypeError("payload value must be a bytes object") - - headers = {k: v for (k, v) in value.headers} - orig_message_ts = datetime.strptime( - headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT - ) - topic_name, partition_index, group = key.decode("utf-8").split(":", 3) - offset = int(val.decode("utf-8")) - return Commit( - group, - Partition(Topic(topic_name), int(partition_index)), - offset, - orig_message_ts, - ) - def decode(self, value: KafkaPayload) -> Commit: key = value.key if not isinstance(key, bytes): @@ -63,12 +37,14 @@ def decode(self, value: KafkaPayload) -> Commit: payload = val.decode("utf-8") - if payload.isnumeric(): - return self.decode_legacy(value) - decoded = json.loads(payload) offset = decoded["offset"] - orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"]) + orig_message_ts = decoded["orig_message_ts"] + + if decoded.get("received_p99"): + received_ts = decoded["received_p99"] + else: + received_ts = None topic_name, partition_index, group = key.decode("utf-8").split(":", 3) @@ -77,4 +53,5 @@ def decode(self, value: KafkaPayload) -> Commit: Partition(Topic(topic_name), int(partition_index)), offset, orig_message_ts, + received_ts, ) diff --git a/arroyo/commit.py b/arroyo/commit.py index 691305ea..25b97577 100644 --- a/arroyo/commit.py +++ b/arroyo/commit.py @@ -2,7 +2,6 @@ import time from dataclasses import dataclass, field -from datetime import datetime from typing import Mapping, MutableMapping, Optional from arroyo.types import Partition @@ -61,9 +60,10 @@ def did_commit(self, now: float, offsets: Mapping[Partition, int]) -> None: @dataclass(frozen=True) class Commit: - __slots__ = ["group", "partition", "offset", "orig_message_ts"] + __slots__ = ["group", "partition", "offset", "orig_message_ts", "received_p99"] group: str partition: Partition offset: int - orig_message_ts: datetime + orig_message_ts: float + received_p99: Optional[float] diff --git a/tests/backends/test_commit.py b/tests/backends/test_commit.py index 2e1d958d..43888ab2 100644 --- a/tests/backends/test_commit.py +++ b/tests/backends/test_commit.py @@ -1,7 +1,6 @@ -from datetime import datetime +import time from arroyo.backends.kafka.commit import CommitCodec -from arroyo.backends.kafka import KafkaPayload from arroyo.commit import Commit from arroyo.types import Partition, Topic @@ -12,19 +11,16 @@ def test_encode_decode() -> None: offset_to_commit = 5 + now = time.time() + commit = Commit( "leader-a", Partition(topic, 0), offset_to_commit, - datetime.now(), + now, + now - 5, ) encoded = commit_codec.encode(commit) assert commit_codec.decode(encoded) == commit - -def test_decode_legacy() -> None: - legacy = KafkaPayload(b"topic:0:leader-a", b"5", [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')]) - decoded = CommitCodec().decode(legacy) - assert decoded.offset == 5 - assert decoded.group == "leader-a" diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 56c99278..296766b6 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -2,9 +2,9 @@ import itertools import os import pickle +import time import uuid from contextlib import closing -from datetime import datetime from pickle import PickleBuffer from typing import Any, Iterator, Mapping, MutableSequence, Optional from unittest import mock @@ -195,7 +195,9 @@ def test_consumer_stream_processor_shutdown(self) -> None: def test_commit_codec() -> None: - commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now()) + commit = Commit( + "group", Partition(Topic("topic"), 0), 0, time.time(), time.time() - 5 + ) assert commit_codec.decode(commit_codec.encode(commit)) == commit