Skip to content

Commit

Permalink
datalake/metrics: add a metric for iceberg commit lag.
Browse files Browse the repository at this point in the history
(cherry picked from commit a511e22)
  • Loading branch information
bharathv authored and vbotbuildovich committed Nov 27, 2024
1 parent d06acc8 commit 4d4a9f2
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
sm::description("Total number of offsets that are pending "
"translation to iceberg."),
labels),
sm::make_gauge(
"iceberg_offsets_pending_commit",
[this] {
return _partition.log()->config().iceberg_enabled()
? _iceberg_commit_offset_lag
: metric_feature_disabled_state;
},
sm::description("Total number of offsets that are pending "
"commit to iceberg catalog."),
labels),
},
{},
{sm::shard_label, partition_label});
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster/partition_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class partition_probe {
virtual void add_bytes_fetched_from_follower(uint64_t) = 0;
virtual void add_schema_id_validation_failed() = 0;
virtual void update_iceberg_translation_offset_lag(int64_t) = 0;
// Records the latest number of offsets still pending commit to the
// iceberg catalog; implementations decide how the value is stored.
virtual void update_iceberg_commit_offset_lag(int64_t) = 0;
virtual void setup_metrics(const model::ntp&) = 0;
virtual void clear_metrics() = 0;
virtual ~impl() noexcept = default;
Expand Down Expand Up @@ -71,6 +72,10 @@ class partition_probe {
_impl->update_iceberg_translation_offset_lag(new_lag);
}

// Forwards the new iceberg commit lag to the underlying probe
// implementation.
void update_iceberg_commit_offset_lag(int64_t new_lag) {
_impl->update_iceberg_commit_offset_lag(new_lag);
}

void clear_metrics() { _impl->clear_metrics(); }

private:
Expand All @@ -97,6 +102,10 @@ class replicated_partition_probe : public partition_probe::impl {
_iceberg_translation_offset_lag = new_lag;
}

// Overwrites the stored iceberg commit lag with the supplied value. The
// stored value is what the "iceberg_offsets_pending_commit" gauge reports
// while iceberg is enabled for the partition's log.
void update_iceberg_commit_offset_lag(int64_t new_lag) final {
_iceberg_commit_offset_lag = new_lag;
}

void clear_metrics() final;

private:
Expand All @@ -117,6 +126,7 @@ class replicated_partition_probe : public partition_probe::impl {
uint64_t _bytes_fetched_from_follower{0};
uint64_t _schema_id_validation_records_failed{0};
int64_t _iceberg_translation_offset_lag{metric_default_initialized_state};
// Last reported count of offsets pending commit to the iceberg catalog.
// Holds metric_default_initialized_state until the first update arrives.
int64_t _iceberg_commit_offset_lag{metric_default_initialized_state};
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};
Expand Down
14 changes: 14 additions & 0 deletions src/v/datalake/translation/partition_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ void partition_translator::update_translation_lag(
_partition->probe().update_iceberg_translation_offset_lag(offset_lag);
}

// Publishes the iceberg commit lag for this partition to its probe.
//
// The lag is the distance between the highest offset currently eligible for
// translation and the last offset committed to the iceberg catalog. When
// nothing has been committed yet the committed offset is taken to be -1, so
// the lag equals the translatable offset plus one.
//
// The probe is left untouched when there is no translatable offset or when
// that offset is negative.
void partition_translator::update_commit_lag(
  std::optional<kafka::offset> max_committed_offset) const {
    const auto translatable_end = max_offset_for_translation();
    if (translatable_end && !(translatable_end.value() < kafka::offset{0})) {
        const auto committed_end = max_committed_offset.value_or(
          kafka::offset{-1});
        _partition->probe().update_iceberg_commit_offset_lag(
          translatable_end.value() - committed_end);
    }
}

ss::future<partition_translator::checkpoint_result>
partition_translator::checkpoint_translated_data(
retry_chain_node& rcn,
Expand Down Expand Up @@ -429,6 +442,7 @@ partition_translator::reconcile_with_coordinator() {
vlog(_logger.warn, "reconciliation failed, response: {}", resp);
co_return std::nullopt;
}
update_commit_lag(resp.last_iceberg_committed_offset);
// No file entry signifies the translation was just enabled on the
// topic. In such a case we start translation from the local start
// of the log. The underlying assumption is that there is a reasonable
Expand Down
2 changes: 2 additions & 0 deletions src/v/datalake/translation/partition_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class partition_translator {

void
update_translation_lag(kafka::offset max_translated_kafka_offset) const;
// Recomputes the iceberg commit lag relative to the given last committed
// offset (std::nullopt when nothing has been committed) and reports it to
// the partition probe.
void update_commit_lag(
std::optional<kafka::offset> max_committed_kafka_offset) const;

using translation_success = ss::bool_class<struct translation_success>;
ss::future<translation_success> do_translate_once(retry_chain_node& parent);
Expand Down

0 comments on commit 4d4a9f2

Please sign in to comment.