Skip to content

Commit

Permalink
datalake/metrics: add ducktape test
Browse files Browse the repository at this point in the history
(cherry picked from commit ebb5374)
  • Loading branch information
bharathv authored and vbotbuildovich committed Nov 27, 2024
1 parent 4d4a9f2 commit 3ae2c7e
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from random import randint

from rptest.services.redpanda import PandaproxyConfig, SchemaRegistryConfig, SISettings
from rptest.services.serde_client import SerdeClient
Expand All @@ -21,6 +22,12 @@
from rptest.tests.datalake.utils import supported_storage_types
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.services.metrics_check import MetricCheck

NO_SCHEMA_ERRORS = [
r'Must have parsed schema when using structured data mode',
r'Error translating data to binary record'
]


class DatalakeE2ETests(RedpandaTest):
Expand Down Expand Up @@ -184,3 +191,46 @@ def table_deleted():
dl.create_iceberg_enabled_topic(self.topic_name, partitions=5)
dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation(self.topic_name, msg_count=count)

@cluster(num_nodes=3, log_allow_list=NO_SCHEMA_ERRORS)
@matrix(cloud_storage_type=supported_storage_types())
def test_metrics(self, cloud_storage_type):

commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'

with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=False,
include_query_engines=[]) as dl:

dl.create_iceberg_enabled_topic(
self.topic_name,
partitions=1,
replicas=1,
iceberg_mode="value_schema_id_prefix")
count = randint(12, 21)
# Populate schemaless messages in schema-ed mode, this should
# hold up translation and commits
dl.produce_to_topic(self.topic_name, 1024, msg_count=count)

m = MetricCheck(self.redpanda.logger,
self.redpanda,
self.redpanda.nodes[0],
[commit_lag, translation_lag],
labels={
'namespace': 'kafka',
'topic': self.topic_name,
'partition': '0'
},
reduce=sum)
expectations = []
for metric in [commit_lag, translation_lag]:
expectations.append([metric, lambda _, val: val == count])

# Ensure lag metric builds up as expected.
wait_until(
lambda: m.evaluate(expectations),
timeout_sec=30,
backoff_sec=5,
err_msg=f"Timed out waiting for metrics to reach: {count}")

0 comments on commit 3ae2c7e

Please sign in to comment.