From 778fc476d86ce0e720730bca10b6fe96345ab818 Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Wed, 5 Feb 2025 17:10:10 +0000
Subject: [PATCH] datalake: metric for invalid records

---
 src/v/datalake/BUILD                          |  3 +
 src/v/datalake/CMakeLists.txt                 |  2 +
 src/v/datalake/datalake_manager.cc            | 17 +++-
 src/v/datalake/datalake_manager.h             |  8 ++
 src/v/datalake/record_multiplexer.cc          | 13 ++-
 src/v/datalake/record_multiplexer.h           |  5 +-
 src/v/datalake/tests/BUILD                    |  1 +
 src/v/datalake/tests/CMakeLists.txt           |  1 +
 .../tests/gtest_record_multiplexer_test.cc    | 10 ++-
 .../tests/record_multiplexer_bench.cc         | 10 ++-
 .../datalake/tests/record_multiplexer_test.cc | 30 +++++++
 src/v/datalake/tests/translation_task_test.cc | 48 +++++------
 src/v/datalake/translation/BUILD              | 21 +++++
 src/v/datalake/translation/CMakeLists.txt     |  8 ++
 .../translation/partition_translator.cc       |  6 +-
 .../translation/partition_translator.h        |  5 +-
 .../datalake/translation/translation_probe.cc | 82 +++++++++++++++++++
 .../datalake/translation/translation_probe.h  | 68 +++++++++++++++
 src/v/datalake/translation_task.cc            |  7 +-
 src/v/datalake/translation_task.h             |  5 +-
 .../tests/datalake/datalake_dlq_test.py       | 35 ++++++++
 21 files changed, 343 insertions(+), 42 deletions(-)
 create mode 100644 src/v/datalake/translation/translation_probe.cc
 create mode 100644 src/v/datalake/translation/translation_probe.h

diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD
index f1b76c4b52c1d..469587c27b4f4 100644
--- a/src/v/datalake/BUILD
+++ b/src/v/datalake/BUILD
@@ -190,6 +190,7 @@ redpanda_cc_library(
         "//src/v/datalake/coordinator:catalog_factory",
         "//src/v/datalake/coordinator:frontend",
         "//src/v/datalake/translation:partition_translator",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/features",
         "//src/v/model",
         "//src/v/pandaproxy",
@@ -238,6 +239,7 @@ redpanda_cc_library(
         ":writer",
         "//src/v/base",
         "//src/v/datalake/coordinator:translated_offset_range",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/iceberg:values_bytes",
         "//src/v/model",
         "//src/v/utils:lazy_abort_source",
@@ -376,6 +378,7 @@ redpanda_cc_library(
         ":types",
         ":writer",
         "//src/v/container:chunked_hash_map",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/model",
         "@seastar",
     ],
diff --git a/src/v/datalake/CMakeLists.txt b/src/v/datalake/CMakeLists.txt
index eee9db2bf23eb..e8fdaa239d93a 100644
--- a/src/v/datalake/CMakeLists.txt
+++ b/src/v/datalake/CMakeLists.txt
@@ -18,6 +18,7 @@ v_cc_library(
     backlog_controller.cc
   DEPS
     v::datalake_translation
+    v::datalake_translation_probe
     v::datalake_coordinator
     v::cluster
     v::raft
@@ -60,6 +61,7 @@ v_cc_library(
   DEPS
     v::cloud_io
     v::datalake_common
+    v::datalake_translation_probe
     v::pandaproxy_schema_registry
     v::schema
     v::storage
diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc
index 1b04b2a684301..d2d29a623f476 100644
--- a/src/v/datalake/datalake_manager.cc
+++ b/src/v/datalake/datalake_manager.cc
@@ -144,6 +144,15 @@ double datalake_manager::average_translation_backlog() {
     return total_lag / translators_with_backlog;
 }

+ss::lw_shared_ptr<translation_probe>
+datalake_manager::get_or_create_probe(const model::ntp& ntp) {
+    auto [it, inserted] = _translation_probe_by_ntp.try_emplace(ntp, nullptr);
+    if (inserted) {
+        it->second = ss::make_lw_shared<translation_probe>(ntp);
+    }
+    return it->second;
+}
+
 ss::future<> datalake_manager::start() {
     /*
      * Ensure that datalake scratch space directory exists. This is run on each
@@ -178,6 +187,10 @@ ss::future<> datalake_manager::start() {
       = _partition_mgr->local().register_unmanage_notification(
         model::kafka_namespace, [this](model::topic_partition_view tp) {
             model::ntp ntp{model::kafka_namespace, tp.topic, tp.partition};
+            // We remove the probe only when the partition is moved off this
+            // shard, to avoid metrics disappearing during the much more
+            // common leadership changes.
+            _translation_probe_by_ntp.erase(ntp);
             stop_translator(ntp);
         });
     // Handle leadership changes
@@ -334,6 +347,7 @@ void datalake_manager::start_translator(
       "instance already exists",
       partition->ntp(),
       partition->term());
+
     auto translator = std::make_unique<translation::partition_translator>(
       partition,
       _coordinator_frontend,
@@ -348,7 +362,8 @@ void datalake_manager::start_translator(
      _effective_max_translator_buffered_data,
      &_parallel_translations,
      invalid_record_action,
-      _writer_scratch_space);
+      _writer_scratch_space,
+      get_or_create_probe(partition->ntp()));
     _translators.emplace(partition->ntp(), std::move(translator));
 }
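
A note on the lifecycle above: the probe lives in an NTP-keyed map on the
manager rather than in the translator itself, so invalid-record counts are
preserved across the frequent translator restarts. A minimal sketch of the
intended semantics, as seen from inside datalake_manager (illustrative only,
not part of the patch; get_or_create_probe() is internal to the class):

    // Both lookups hand back the same lw_shared_ptr instance, so counters
    // accumulated in a previous leadership term survive the restart.
    auto before = get_or_create_probe(ntp);
    stop_translator(ntp);                  // e.g. leadership moved away
    auto after = get_or_create_probe(ntp); // e.g. leadership regained
    assert(before.get() == after.get());
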
diff --git a/src/v/datalake/datalake_manager.h b/src/v/datalake/datalake_manager.h
index c24e40fc5b233..f697c8794035b 100644
--- a/src/v/datalake/datalake_manager.h
+++ b/src/v/datalake/datalake_manager.h
@@ -20,6 +20,7 @@
 #include "datalake/location.h"
 #include "datalake/record_schema_resolver.h"
 #include "datalake/translation/partition_translator.h"
+#include "datalake/translation/translation_probe.h"
 #include "features/fwd.h"
 #include "model/metadata.h"
 #include "pandaproxy/schema_registry/fwd.h"
@@ -91,6 +92,11 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
       model::iceberg_invalid_record_action);
     void stop_translator(const model::ntp&);
     double average_translation_backlog();
+
+    /// \note The probe is created on first use.
+    ss::lw_shared_ptr<translation_probe> get_or_create_probe(const model::ntp&);
+
+private:
     model::node_id _self;
     ss::sharded<raft::group_manager>* _group_mgr;
     ss::sharded<cluster::partition_manager>* _partition_mgr;
@@ -109,6 +115,8 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
     std::unique_ptr<type_resolver> _type_resolver;
     std::unique_ptr<schema_cache> _schema_cache;
     std::unique_ptr<backlog_controller> _backlog_controller;
+    chunked_hash_map<model::ntp, ss::lw_shared_ptr<translation_probe>>
+      _translation_probe_by_ntp;
     ss::sharded<ss::abort_source>* _as;
     ss::scheduling_group _sg;
     ss::gate _gate;
diff --git a/src/v/datalake/record_multiplexer.cc b/src/v/datalake/record_multiplexer.cc
index 9092aa3a79738..92ce1c480c878 100644
--- a/src/v/datalake/record_multiplexer.cc
+++ b/src/v/datalake/record_multiplexer.cc
@@ -18,6 +18,7 @@
 #include "datalake/record_translator.h"
 #include "datalake/table_creator.h"
 #include "datalake/table_id_provider.h"
+#include "datalake/translation/translation_probe.h"
 #include "model/metadata.h"
 #include "model/record.h"
 #include "storage/parser_utils.h"
@@ -36,6 +37,7 @@ record_multiplexer::record_multiplexer(
   table_creator& table_creator,
   model::iceberg_invalid_record_action invalid_record_action,
   location_provider location_provider,
+  translation_probe& translation_probe,
   lazy_abort_source& as)
   : _log(datalake_log, fmt::format("{}", ntp))
   , _ntp(ntp)
@@ -47,6 +49,7 @@ record_multiplexer::record_multiplexer(
   , _table_creator(table_creator)
   , _invalid_record_action(invalid_record_action)
   , _location_provider(std::move(location_provider))
+  , _translation_probe(translation_probe)
   , _as(as) {}

 ss::future<ss::stop_iteration>
@@ -97,6 +100,8 @@ record_multiplexer::operator()(model::record_batch batch) {
         case type_resolver::errc::bad_input:
         case type_resolver::errc::translation_error:
             auto invalid_res = co_await handle_invalid_record(
+              translation_probe::invalid_record_cause::
+                failed_kafka_schema_resolution,
              offset,
              record.share_key(),
              record.share_value(),
@@ -128,6 +133,8 @@
              offset,
              record_data_res.error());
            auto invalid_res = co_await handle_invalid_record(
+              translation_probe::invalid_record_cause::
+                failed_data_translation,
              offset,
              record.share_key(),
              record.share_value(),
@@ -151,6 +158,8 @@
         switch (e) {
         case table_creator::errc::incompatible_schema: {
             auto invalid_res = co_await handle_invalid_record(
+              translation_probe::invalid_record_cause::
+                failed_iceberg_schema_resolution,
              offset,
              record.share_key(),
              record.share_value(),
@@ -300,13 +309,15 @@ record_multiplexer::end_of_stream() {

 ss::future<checked<std::nullopt_t, writer_error>>
 record_multiplexer::handle_invalid_record(
+  translation_probe::invalid_record_cause cause,
   kafka::offset offset,
   std::optional<iobuf> key,
   std::optional<iobuf> val,
   model::timestamp ts,
   chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>
     headers) {
-    // TODO: add a metric!
+    _translation_probe.increment_invalid_record(cause);
+
     switch (_invalid_record_action) {
     case model::iceberg_invalid_record_action::drop:
         vlog(_log.debug, "Dropping invalid record at offset {}", offset);
diff --git a/src/v/datalake/record_multiplexer.h b/src/v/datalake/record_multiplexer.h
index d918a57f1369e..167ddffa9670d 100644
--- a/src/v/datalake/record_multiplexer.h
+++ b/src/v/datalake/record_multiplexer.h
@@ -15,6 +15,7 @@
 #include "datalake/location.h"
 #include "datalake/partitioning_writer.h"
 #include "datalake/schema_identifier.h"
+#include "datalake/translation/translation_probe.h"
 #include "model/record.h"
 #include "utils/lazy_abort_source.h"
 #include "utils/prefix_logger.h"
@@ -54,6 +55,7 @@ class record_multiplexer {
       table_creator&,
       model::iceberg_invalid_record_action,
       location_provider,
+      translation_probe&,
       lazy_abort_source& as);

     ss::future<ss::stop_iteration> operator()(model::record_batch batch);
@@ -62,8 +64,8 @@ class record_multiplexer {
 private:
     // Handles the components of a record that is invalid for the target
     // table.
-    // TODO: this just drops the data. Consider a separate table entirely.
     ss::future<checked<std::nullopt_t, writer_error>> handle_invalid_record(
+      translation_probe::invalid_record_cause,
       kafka::offset,
       std::optional<iobuf>,
       std::optional<iobuf>,
@@ -80,6 +82,7 @@ class record_multiplexer {
     table_creator& _table_creator;
     model::iceberg_invalid_record_action _invalid_record_action;
     location_provider _location_provider;
+    translation_probe& _translation_probe;
     lazy_abort_source& _as;
     chunked_hash_map<
       record_schema_components,
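
The multiplexer increments the per-cause counter before applying the
configured action, so the metric counts every invalid record regardless of
whether it is dropped or routed to the DLQ table. A standalone sketch of the
probe contract (type and member names are from this patch; the surrounding
setup, e.g. `ntp`, is assumed):

    datalake::translation_probe probe(ntp);
    using cause_t = datalake::translation_probe::invalid_record_cause;
    probe.increment_invalid_record(cause_t::failed_data_translation);
    // counter_ref() exposes the same storage that backs the public metric,
    // which is what the unit tests below assert against.
    assert(probe.counter_ref(cause_t::failed_data_translation) == 1);
    assert(probe.counter_ref(cause_t::failed_kafka_schema_resolution) == 0);
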
diff --git a/src/v/datalake/tests/BUILD b/src/v/datalake/tests/BUILD
index 12e04d0be4c04..81c02251f18d7 100644
--- a/src/v/datalake/tests/BUILD
+++ b/src/v/datalake/tests/BUILD
@@ -264,6 +264,7 @@ redpanda_cc_gtest(
         "//src/v/datalake:record_translator",
         "//src/v/datalake:table_definition",
         "//src/v/datalake:table_id_provider",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/iceberg:filesystem_catalog",
         "//src/v/model",
         "//src/v/pandaproxy",
diff --git a/src/v/datalake/tests/CMakeLists.txt b/src/v/datalake/tests/CMakeLists.txt
index 1534ce54216ba..dde8fcf4d061e 100644
--- a/src/v/datalake/tests/CMakeLists.txt
+++ b/src/v/datalake/tests/CMakeLists.txt
@@ -68,6 +68,7 @@ rp_test(
   LIBRARIES
     Boost::iostreams
     v::datalake_test_utils
+    v::datalake_translation_probe
     v::gtest_main
     v::datalake_writer
     v::cloud_io_utils
diff --git a/src/v/datalake/tests/gtest_record_multiplexer_test.cc b/src/v/datalake/tests/gtest_record_multiplexer_test.cc
index e855988709e77..3692d57d3beed 100644
--- a/src/v/datalake/tests/gtest_record_multiplexer_test.cc
+++ b/src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -19,6 +19,7 @@
 #include "datalake/tests/record_generator.h"
 #include "datalake/tests/test_data_writer.h"
 #include "datalake/tests/test_utils.h"
+#include "datalake/translation/translation_probe.h"
 #include "iceberg/filesystem_catalog.h"
 #include "model/fundamental.h"
 #include "model/metadata.h"
@@ -49,6 +50,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
     int start_offset = 1005;
     auto writer_factory = std::make_unique<datalake::test_data_writer_factory>(
       false);
+    translation_probe probe(ntp);
     datalake::record_multiplexer multiplexer(
       ntp,
       rev,
@@ -61,6 +63,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
       location_provider(
         cloud_io::s3_compat_provider{"s3"},
         cloud_storage_clients::bucket_name{"bucket"}),
+      probe,
       as);

     model::test::record_batch_spec batch_spec;
@@ -95,6 +98,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
     int batch_count = 10;
     auto writer_factory = std::make_unique<datalake::test_data_writer_factory>(
       true);
+    translation_probe probe(ntp);
     datalake::record_multiplexer multiplexer(
       ntp,
       rev,
@@ -107,6 +111,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
       location_provider(
         cloud_io::s3_compat_provider{"s3"},
         cloud_storage_clients::bucket_name{"bucket"}),
+      probe,
       as);

     model::test::record_batch_spec batch_spec;
@@ -140,7 +145,7 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {
       datalake::local_path(tmp_dir.get_path()),
       "data",
       ss::make_shared<datalake::serde_parquet_writer_factory>());
-
+    translation_probe probe(ntp);
     datalake::record_multiplexer multiplexer(
       ntp,
       rev,
@@ -153,6 +158,7 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {
       location_provider(
         cloud_io::s3_compat_provider{"s3"},
         cloud_storage_clients::bucket_name{"bucket"}),
+      probe,
       as);

     model::test::record_batch_spec batch_spec;
@@ -264,6 +270,7 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) {
       datalake::local_path(tmp_dir.get_path()),
       "data",
       ss::make_shared<datalake::serde_parquet_writer_factory>());
+    translation_probe probe(ntp);
     record_multiplexer mux(
       ntp,
       rev,
@@ -274,6 +281,7 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) {
       t_creator,
       model::iceberg_invalid_record_action::dlq_table,
       location_provider(scoped_remote->remote.local().provider(), bucket_name),
+      probe,
       as);
     auto res = reader.consume(std::move(mux), model::no_timeout).get();
     ASSERT_FALSE(res.has_error()) << res.error();
diff --git a/src/v/datalake/tests/record_multiplexer_bench.cc b/src/v/datalake/tests/record_multiplexer_bench.cc
index 1678429f485cb..79721c500c40f 100644
--- a/src/v/datalake/tests/record_multiplexer_bench.cc
+++ b/src/v/datalake/tests/record_multiplexer_bench.cc
@@ -312,6 +312,10 @@ class record_multiplexer_bench_fixture
     }

 private:
+    const model::ntp ntp{
+      model::ns{"rp"}, model::topic{"t"}, model::partition_id{0}};
+    const model::revision_id topic_rev{123};
+
     std::unordered_set<ss::sstring> _added_names;
     datalake::chunked_schema_cache _schema_cache;
     datalake::catalog_schema_manager _schema_mgr;
@@ -319,13 +323,10 @@ class record_multiplexer_bench_fixture
     datalake::tests::record_generator _record_gen;
     datalake::default_translator _translator;
     datalake::direct_table_creator _table_creator;
+    datalake::translation_probe _translation_probe{ntp};
     chunked_vector<model::record_batch> _batch_data;
     lazy_abort_source _as;

-    const model::ntp ntp{
-      model::ns{"rp"}, model::topic{"t"}, model::partition_id{0}};
-    const model::revision_id topic_rev{123};
-
     datalake::record_multiplexer create_mux() {
         return datalake::record_multiplexer(
           ntp,
@@ -338,6 +339,7 @@ class record_multiplexer_bench_fixture
           model::iceberg_invalid_record_action::dlq_table,
           datalake::location_provider(
             scoped_remote->remote.local().provider(), bucket_name),
+          _translation_probe,
           _as);
     }
diff --git a/src/v/datalake/tests/record_multiplexer_test.cc b/src/v/datalake/tests/record_multiplexer_test.cc
index 8bcf4ab98d46d..e6e86f4649099 100644
--- a/src/v/datalake/tests/record_multiplexer_test.cc
+++ b/src/v/datalake/tests/record_multiplexer_test.cc
@@ -17,6 +17,7 @@
 #include "datalake/tests/record_generator.h"
 #include "datalake/tests/test_data_writer.h"
 #include "datalake/tests/test_utils.h"
+#include "datalake/translation/translation_probe.h"
 #include "iceberg/filesystem_catalog.h"
 #include "model/fundamental.h"
 #include "model/record_batch_reader.h"
@@ -144,6 +145,7 @@ class RecordMultiplexerTestBase
           model::iceberg_invalid_record_action::dlq_table,
           location_provider(
             scoped_remote->remote.local().provider(), bucket_name),
+          *get_or_create_probe(ntp),
           as);
         auto res = reader.consume(std::move(mux), model::no_timeout).get();
         if (expect_error) {
@@ -191,9 +193,19 @@ class RecordMultiplexerTestBase
         }
     }

+    ss::lw_shared_ptr<translation_probe>
+    get_or_create_probe(const model::ntp& ntp) {
+        auto [it, inserted] = probes.try_emplace(ntp, nullptr);
+        if (inserted) {
+            it->second = ss::make_lw_shared<translation_probe>(ntp);
+        }
+        return it->second;
+    }
+
     catalog_schema_manager schema_mgr;
     record_schema_resolver type_resolver;
     direct_table_creator t_creator;
+    std::map<model::ntp, ss::lw_shared_ptr<translation_probe>> probes;
     lazy_abort_source as;

     static constexpr records_param default_param = {
@@ -370,6 +382,12 @@ TEST_F(RecordMultiplexerTest, TestMissingSchema) {

     assert_dlq_table(ntp.tp.topic, true);
     EXPECT_EQ(res.value().dlq_files.size(), default_param.hrs);
+
+    EXPECT_EQ(
+      get_or_create_probe(ntp)->counter_ref(
+        translation_probe::invalid_record_cause::
+          failed_kafka_schema_resolution),
+      default_param.num_records());
 }

 TEST_F(RecordMultiplexerTest, TestBadData) {
@@ -398,6 +416,11 @@ TEST_F(RecordMultiplexerTest, TestBadData) {

     assert_dlq_table(ntp.tp.topic, true);
     EXPECT_EQ(res.value().dlq_files.size(), default_param.hrs);
+
+    EXPECT_EQ(
+      get_or_create_probe(ntp)->counter_ref(
+        translation_probe::invalid_record_cause::failed_data_translation),
+      default_param.num_records());
 }

 TEST_F(RecordMultiplexerTest, TestBadSchemaChange) {
@@ -450,4 +473,11 @@ TEST_F(RecordMultiplexerTest, TestBadSchemaChange) {
     // The schema for the main table should not have changed.
     schema = get_current_schema();
     EXPECT_EQ(schema->highest_field_id(), 10);
+
+    // Metrics updated.
+    EXPECT_EQ(
+      get_or_create_probe(ntp)->counter_ref(
+        translation_probe::invalid_record_cause::
+          failed_iceberg_schema_resolution),
+      default_param.num_records());
 }
diff --git a/src/v/datalake/tests/translation_task_test.cc b/src/v/datalake/tests/translation_task_test.cc
index 35bbae14d2667..332491f16a61c 100644
--- a/src/v/datalake/tests/translation_task_test.cc
+++ b/src/v/datalake/tests/translation_task_test.cc
@@ -19,6 +19,7 @@
 #include "datalake/serde_parquet_writer.h"
 #include "datalake/table_creator.h"
 #include "datalake/tests/test_utils.h"
+#include "datalake/translation/translation_probe.h"
 #include "datalake/translation_task.h"
 #include "iceberg/uri.h"
 #include "model/record_batch_reader.h"
@@ -55,7 +56,8 @@ class TranslateTaskTest
             .to_uri(bucket_name, "test")))
       , t_creator(
           std::make_unique<direct_table_creator>(*schema_resolver, *schema_mgr))
-      , location_provider(sr->remote.local().provider(), bucket_name) {
+      , location_provider(sr->remote.local().provider(), bucket_name)
+      , probe(ntp) {
         set_expectations_and_listen({});
     }
@@ -133,6 +135,18 @@ class TranslateTaskTest
         co_return ret;
     }

+    translation_task create_task() {
+        return translation_task(
+          cloud_io,
+          *schema_mgr,
+          *schema_resolver,
+          *translator,
+          *t_creator,
+          model::iceberg_invalid_record_action::dlq_table,
+          location_provider,
+          probe);
+    }
+
     ss::abort_source as;
     temporary_dir tmp_dir;
     retry_chain_node test_rcn;
@@ -140,6 +154,7 @@ class TranslateTaskTest
     std::unique_ptr<schema_manager> schema_mgr;
     std::unique_ptr<table_creator> t_creator;
     datalake::location_provider location_provider;
+    translation_probe probe;
 };

 struct deleter {
@@ -171,16 +186,7 @@ struct deleter {
 };

 TEST_F(TranslateTaskTest, TestHappyPathTranslation) {
-    datalake::translation_task task(
-      cloud_io,
-      *schema_mgr,
-      *schema_resolver,
-      *translator,
-      *t_creator,
-      model::iceberg_invalid_record_action::dlq_table,
-      location_provider);
-
-    auto result = task
+    auto result = create_task()
                     .translate(
                       ntp,
                       rev,
@@ -207,19 +213,11 @@ TEST_F(TranslateTaskTest, TestHappyPathTranslation) {
 }

 TEST_F(TranslateTaskTest, TestDataFileMissing) {
-    datalake::translation_task task(
-      cloud_io,
-      *schema_mgr,
-      *schema_resolver,
-      *translator,
-      *t_creator,
-      model::iceberg_invalid_record_action::dlq_table,
-      location_provider);
     // create deleting task to cause local io error
     deleter del(tmp_dir.get_path().string());
     del.start();
     auto stop_deleter = ss::defer([&del] { del.stop().get(); });
-    auto result = task
+    auto result = create_task()
                     .translate(
                       ntp,
                       rev,
@@ -236,14 +234,6 @@ TEST_F(TranslateTaskTest, TestDataFileMissing) {
 }

 TEST_F(TranslateTaskTest, TestUploadError) {
-    datalake::translation_task task(
-      cloud_io,
-      *schema_mgr,
-      *schema_resolver,
-      *translator,
-      *t_creator,
-      model::iceberg_invalid_record_action::dlq_table,
-      location_provider);
     // fail all PUT requests
     fail_request_if(
       [](const http_test_utils::request_info& req) -> bool {
@@ -253,7 +243,7 @@ TEST_F(TranslateTaskTest, TestUploadError) {
        .body = "failed!",
        .status = ss::http::reply::status_type::internal_server_error});

-    auto result = task
+    auto result = create_task()
                     .translate(
                       ntp,
                       model::revision_id{123},
diff --git a/src/v/datalake/translation/BUILD b/src/v/datalake/translation/BUILD
index eaa1dbf440bd0..8bc310d514063 100644
--- a/src/v/datalake/translation/BUILD
+++ b/src/v/datalake/translation/BUILD
@@ -93,6 +93,7 @@ redpanda_cc_library(
     deps = [
         ":model",
         ":stm",
+        ":translation_probe",
         ":utils",
         "//src/v/base",
         "//src/v/cluster",
@@ -125,3 +126,23 @@ redpanda_cc_library(
         "@seastar",
     ],
 )
+
+redpanda_cc_library(
+    name = "translation_probe",
+    srcs = [
+        "translation_probe.cc",
+    ],
+    hdrs = [
+        "translation_probe.h",
+    ],
+    include_prefix = "datalake/translation",
+    visibility = [
+        "//src/v/datalake:__subpackages__",
+    ],
+    deps = [
+        "//src/v/config",
+        "//src/v/metrics",
+        "//src/v/model",
+        "@seastar",
+    ],
+)
diff --git a/src/v/datalake/translation/CMakeLists.txt b/src/v/datalake/translation/CMakeLists.txt
index ac1359eb53d19..298489d04dde8 100644
--- a/src/v/datalake/translation/CMakeLists.txt
+++ b/src/v/datalake/translation/CMakeLists.txt
@@ -16,4 +16,12 @@ v_cc_library(
     Seastar::seastar
 )

+v_cc_library(
+  NAME datalake_translation_probe
+  SRCS
+    translation_probe.cc
+  DEPS
+    v::metrics
+)
+
 add_subdirectory(tests)
diff --git a/src/v/datalake/translation/partition_translator.cc b/src/v/datalake/translation/partition_translator.cc
index 25952d34fbc06..93385afe4d0f0 100644
--- a/src/v/datalake/translation/partition_translator.cc
+++ b/src/v/datalake/translation/partition_translator.cc
@@ -23,6 +23,7 @@
 #include "datalake/serde_parquet_writer.h"
 #include "datalake/table_creator.h"
 #include "datalake/translation/state_machine.h"
+#include "datalake/translation/translation_probe.h"
 #include "datalake/translation_task.h"
 #include "kafka/utils/txn_reader.h"
 #include "model/fundamental.h"
@@ -170,7 +171,8 @@ partition_translator::partition_translator(
   size_t reader_max_bytes,
   std::unique_ptr<ssx::semaphore>* parallel_translations,
   model::iceberg_invalid_record_action invalid_record_action,
-  std::filesystem::path writer_scratch_space)
+  std::filesystem::path writer_scratch_space,
+  ss::lw_shared_ptr<translation_probe> probe)
   : _term(partition->raft()->term())
   , _partition(std::move(partition))
   , _stm(_partition->raft()
@@ -194,6 +196,7 @@ partition_translator::partition_translator(
   , _parallel_translations(parallel_translations)
   , _invalid_record_action(invalid_record_action)
   , _writer_scratch_space(writer_scratch_space)
+  , _probe(std::move(probe))
   , _logger(prefix_logger{
       datalake_log, fmt::format("{}-term-{}", _partition->ntp(), _term)}) {
     vassert(_stm, "No translation stm found for {}", _partition->ntp());
@@ -293,6 +296,7 @@ partition_translator::do_translation_for_range(
       *_table_creator,
       _invalid_record_action,
       _location_provider,
+      *_probe,
     };
     const auto& ntp = _partition->ntp();
     auto remote_path_prefix = remote_path{
diff --git a/src/v/datalake/translation/partition_translator.h b/src/v/datalake/translation/partition_translator.h
index 73f9b59b6f1b6..5d7c9b6d031da 100644
--- a/src/v/datalake/translation/partition_translator.h
+++ b/src/v/datalake/translation/partition_translator.h
@@ -17,6 +17,7 @@
 #include "datalake/errors.h"
 #include "datalake/fwd.h"
 #include "datalake/location.h"
+#include "datalake/translation/translation_probe.h"
 #include "features/fwd.h"
 #include "kafka/data/partition_proxy.h"
 #include "model/metadata.h"
@@ -83,7 +84,8 @@ class partition_translator {
       size_t reader_max_bytes,
       std::unique_ptr<ssx::semaphore>* parallel_translations,
       model::iceberg_invalid_record_action invalid_record_action,
-      std::filesystem::path writer_scratch_space);
+      std::filesystem::path writer_scratch_space,
+      ss::lw_shared_ptr<translation_probe> probe);

     ~partition_translator();
     void start_translation_in_background(ss::scheduling_group);
@@ -163,6 +165,7 @@ class partition_translator {
     std::unique_ptr<ssx::semaphore>* _parallel_translations;
     model::iceberg_invalid_record_action _invalid_record_action;
     std::filesystem::path _writer_scratch_space;
+    ss::lw_shared_ptr<translation_probe> _probe;
     ss::gate _gate;
     ss::abort_source _as;
     prefix_logger _logger;
diff --git a/src/v/datalake/translation/translation_probe.cc b/src/v/datalake/translation/translation_probe.cc
new file mode 100644
index 0000000000000..dfaffb4266a96
--- /dev/null
+++ b/src/v/datalake/translation/translation_probe.cc
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "datalake/translation/translation_probe.h"
+
+#include "config/configuration.h"
+#include "metrics/metrics.h"
+#include "metrics/prometheus_sanitize.h"
+
+namespace datalake {
+
+namespace {
+static const auto group_name = prometheus_sanitize::metrics_name(
+  "iceberg:translation");
+static const auto namespace_label = metrics::make_namespaced_label("namespace");
+static const auto topic_label = metrics::make_namespaced_label("topic");
+static const auto partition_label = metrics::make_namespaced_label("partition");
+static const auto cause_label = metrics::make_namespaced_label("cause");
+} // namespace
+
+translation_probe::translation_probe(model::ntp ntp)
+  : _ntp(std::move(ntp)) {
+    if (!config::shard_local_cfg().disable_public_metrics()) {
+        _public_metrics.emplace();
+        register_invalid_record_metric();
+    }
+}
+
+void translation_probe::register_invalid_record_metric() {
+    namespace sm = ss::metrics;
+
+    for (auto cause :
+         {invalid_record_cause::failed_kafka_schema_resolution,
+          invalid_record_cause::failed_data_translation,
+          invalid_record_cause::failed_iceberg_schema_resolution}) {
+        std::vector<sm::label_instance> labels{
+          namespace_label(_ntp.ns()),
+          topic_label(_ntp.tp.topic()),
+          partition_label(_ntp.tp.partition()),
+          cause_label(
+            prometheus_sanitize::metrics_name(fmt::format("{}", cause))),
+        };
+
+        _public_metrics->add_group(
+          group_name,
+          {
+            sm::make_counter(
+              "invalid_records",
+              counter_ref(cause),
+              sm::description(
+                "Number of invalid records handled by translation"),
+              labels)
+              .aggregate({
+                sm::shard_label,
+                partition_label,
+              }),
+          });
+    }
+}
+
+std::ostream&
+operator<<(std::ostream& os, translation_probe::invalid_record_cause cause) {
+    using enum translation_probe::invalid_record_cause;
+
+    switch (cause) {
+    case failed_kafka_schema_resolution:
+        return os << "failed_kafka_schema_resolution";
+    case failed_data_translation:
+        return os << "failed_data_translation";
+    case failed_iceberg_schema_resolution:
+        return os << "failed_iceberg_schema_resolution";
+    }
+}
+
+} // namespace datalake
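
Because the counter aggregates over sm::shard_label and the partition label,
the public endpoint exposes one series per namespace/topic/cause combination
rather than one per partition. Assuming the conventional public-metrics
prefix and the label keys implied by the test at the end of this patch, a
scrape would contain series shaped roughly like:

    redpanda_iceberg_translation_invalid_records_total{redpanda_namespace="kafka",redpanda_topic="events",cause="failed_data_translation"} 3

(the exact label keys are produced by metrics::make_namespaced_label and are
an assumption here, not something this hunk pins down).
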
diff --git a/src/v/datalake/translation/translation_probe.h b/src/v/datalake/translation/translation_probe.h
new file mode 100644
index 0000000000000..acb5e4f5a1820
--- /dev/null
+++ b/src/v/datalake/translation/translation_probe.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+#pragma once
+
+#include "metrics/metrics.h"
+#include "model/fundamental.h"
+
+namespace datalake {
+
+class translation_probe final {
+public:
+    // Note: do not forget to register new causes in
+    // register_invalid_record_metric.
+    enum class invalid_record_cause {
+        /// Failed to resolve the Kafka schema for the record. This covers
+        /// the cases where the magic byte is missing from the record or the
+        /// record references a non-existent schema in the registry.
+        failed_kafka_schema_resolution,
+        /// Failed to translate the record data to an equivalent Iceberg
+        /// representation, per the schema fetched from the schema registry.
+        failed_data_translation,
+        /// Failed to ensure that the table schema matches the inferred
+        /// Iceberg schema.
+        failed_iceberg_schema_resolution,
+    };
+
+public:
+    explicit translation_probe(model::ntp ntp);
+
+public:
+    void increment_invalid_record(invalid_record_cause cause) {
+        counter_ref(cause)++;
+    }
+
+    constexpr size_t& counter_ref(invalid_record_cause cause) {
+        switch (cause) {
+        case invalid_record_cause::failed_kafka_schema_resolution:
+            return _num_failed_kafka_schema_resolution;
+        case invalid_record_cause::failed_data_translation:
+            return _num_failed_data_translation;
+        case invalid_record_cause::failed_iceberg_schema_resolution:
+            return _num_failed_iceberg_schema_resolution;
+        }
+    }
+
+private:
+    void register_invalid_record_metric();
+
+private:
+    model::ntp _ntp;
+    std::optional<metrics::public_metric_groups> _public_metrics;
+
+    size_t _num_failed_kafka_schema_resolution = 0;
+    size_t _num_failed_data_translation = 0;
+    size_t _num_failed_iceberg_schema_resolution = 0;
+};
+
+std::ostream&
+operator<<(std::ostream& os, translation_probe::invalid_record_cause cause);
+
+} // namespace datalake
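
Two details of this header worth calling out: counter_ref() switches over the
enum with no default case, so a new cause that is not mapped to a counter is
flagged at compile time under -Werror=switch, and the counters are plain
size_t members, so they keep working when public metrics are disabled (the
optional metric group is simply never emplaced). A hypothetical checklist for
extending the probe (not part of this patch):

    // 1. Add the enumerator to invalid_record_cause.
    // 2. Map it to a new size_t member in counter_ref().
    // 3. Add its case to operator<< in translation_probe.cc.
    // 4. Add it to the cause list in register_invalid_record_metric().
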
diff --git a/src/v/datalake/translation_task.cc b/src/v/datalake/translation_task.cc
index 3585dce40d34c..6fb647ec91890 100644
--- a/src/v/datalake/translation_task.cc
+++ b/src/v/datalake/translation_task.cc
@@ -178,14 +178,16 @@ translation_task::translation_task(
   record_translator& record_translator,
   table_creator& table_creator,
   model::iceberg_invalid_record_action invalid_record_action,
-  location_provider location_provider)
+  location_provider location_provider,
+  translation_probe& probe)
   : _cloud_io(&cloud_io)
   , _schema_mgr(&schema_mgr)
   , _type_resolver(&type_resolver)
   , _record_translator(&record_translator)
   , _table_creator(&table_creator)
   , _invalid_record_action(invalid_record_action)
-  , _location_provider(std::move(location_provider)) {}
+  , _location_provider(std::move(location_provider))
+  , _translation_probe(&probe) {}

 ss::future<
   checked<coordinator::translated_offset_range, translation_task::errc>>
@@ -208,6 +210,7 @@ translation_task::translate(
       *_table_creator,
       _invalid_record_action,
       _location_provider,
+      *_translation_probe,
       lazy_as);
     // Write local files
     auto mux_result = co_await std::move(reader).consume(
diff --git a/src/v/datalake/translation_task.h b/src/v/datalake/translation_task.h
index 5312b0e40a8ab..9f0e82f9d5029 100644
--- a/src/v/datalake/translation_task.h
+++ b/src/v/datalake/translation_task.h
@@ -15,6 +15,7 @@
 #include "datalake/data_writer_interface.h"
 #include "datalake/fwd.h"
 #include "datalake/location.h"
+#include "datalake/translation/translation_probe.h"
 #include "model/metadata.h"
 #include "model/record_batch_reader.h"
 #include "utils/lazy_abort_source.h"
@@ -34,7 +35,8 @@ class translation_task {
       record_translator& record_translator,
       table_creator&,
       model::iceberg_invalid_record_action,
-      location_provider);
+      location_provider,
+      translation_probe&);

     enum class errc {
         file_io_error,
         cloud_io_error,
@@ -72,5 +74,6 @@ class translation_task {
     table_creator* _table_creator;
     model::iceberg_invalid_record_action _invalid_record_action;
     location_provider _location_provider;
+    translation_probe* _translation_probe;
 };
 } // namespace datalake
diff --git a/tests/rptest/tests/datalake/datalake_dlq_test.py b/tests/rptest/tests/datalake/datalake_dlq_test.py
index c262794910030..98944db269cda 100644
--- a/tests/rptest/tests/datalake/datalake_dlq_test.py
+++ b/tests/rptest/tests/datalake/datalake_dlq_test.py
@@ -18,6 +18,7 @@
 from rptest.clients.types import TopicSpec
 from rptest.services.cluster import cluster
 from rptest.services.redpanda import (
+    MetricsEndpoint,
     PandaproxyConfig,
     SchemaRegistryConfig,
     SISettings,
@@ -40,6 +41,15 @@ def __str__(self):
         return self.value


+class IcebergInvalidRecordCause(str, Enum):
+    FAILED_KAFKA_SCHEMA_RESOLUTION = "failed_kafka_schema_resolution"
+    FAILED_DATA_TRANSLATION = "failed_data_translation"
+    FAILED_ICEBERG_SCHEMA_RESOLUTION = "failed_iceberg_schema_resolution"
+
+    def __str__(self):
+        return self.value
+
+
 class DatalakeDLQPropertiesTest(RedpandaTest):
     def __init__(self, test_context):
         super(DatalakeDLQPropertiesTest,
@@ -346,6 +356,11 @@ def test_dlq_table_for_mixed_records(self, cloud_storage_type,
             5,
             table_override=f"{self.topic_name}~dlq")

+        invalid_schema_res = self._invalid_records_metric_sum(
+            self.topic_name,
+            IcebergInvalidRecordCause.FAILED_KAFKA_SCHEMA_RESOLUTION)
+        assert invalid_schema_res >= num_iter * num_invalid_per_iter, f"Expected {num_iter * num_invalid_per_iter} invalid records due to failed schema resolution but got {invalid_schema_res}"
+
     @cluster(num_nodes=4)
     @matrix(cloud_storage_type=supported_storage_types(),
             query_engine=[QueryEngineType.SPARK],
@@ -487,3 +502,23 @@ def test_invalid_record_action_runtime_change(self, cloud_storage_type,
         dlq_count = dl.query_engine(query_engine).count_table(
             "redpanda", f"{self.topic_name}~dlq")
         assert dlq_count == num_iter * num_invalid_per_iter, f"Didn't expect additional records in DLQ table: Expected {num_iter * num_invalid_per_iter} but got {dlq_count}"
+
+    def _invalid_records_metric_sum(self, topic: str,
+                                    cause: IcebergInvalidRecordCause) -> int:
+        metric_name = 'redpanda_iceberg_translation_invalid_records_total'
+
+        samples = self.redpanda.metrics_sample(metric_name,
+                                               self.redpanda.nodes,
+                                               MetricsEndpoint.PUBLIC_METRICS)
+        if not samples:
+            return 0
+
+        return sum([
+            s.value for s in samples.label_filter({
+                'redpanda_topic':
+                topic,
+                **({
+                    'cause': str(cause)
+                } if cause is not None else {})
+            }).samples
+        ])
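
With the counter exported, a deployment can alert on any invalid record. A
sample PromQL expression over the new series (the metric name is taken from
the ducktape test above; the label keys are an assumption):

    sum by (redpanda_topic, cause) (
      rate(redpanda_iceberg_translation_invalid_records_total[5m])
    ) > 0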