Merge pull request redpanda-data#25062 from nvartolomei/nv/iceberg-invalid-record-action-metrics-2

datalake: metric for invalid records
nvartolomei authored Feb 18, 2025
2 parents 86741ac + 778fc47 commit 81c91ed
Showing 24 changed files with 361 additions and 52 deletions.
3 changes: 3 additions & 0 deletions src/v/datalake/BUILD
@@ -190,6 +190,7 @@ redpanda_cc_library(
         "//src/v/datalake/coordinator:catalog_factory",
         "//src/v/datalake/coordinator:frontend",
         "//src/v/datalake/translation:partition_translator",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/features",
         "//src/v/model",
         "//src/v/pandaproxy",
@@ -238,6 +239,7 @@ redpanda_cc_library(
         ":writer",
         "//src/v/base",
         "//src/v/datalake/coordinator:translated_offset_range",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/iceberg:values_bytes",
         "//src/v/model",
         "//src/v/utils:lazy_abort_source",
@@ -376,6 +378,7 @@ redpanda_cc_library(
         ":types",
         ":writer",
         "//src/v/container:chunked_hash_map",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/model",
         "@seastar",
     ],
2 changes: 2 additions & 0 deletions src/v/datalake/CMakeLists.txt
@@ -18,6 +18,7 @@ v_cc_library(
     backlog_controller.cc
   DEPS
     v::datalake_translation
+    v::datalake_translation_probe
     v::datalake_coordinator
     v::cluster
     v::raft
@@ -60,6 +61,7 @@ v_cc_library(
   DEPS
     v::cloud_io
     v::datalake_common
+    v::datalake_translation_probe
     v::pandaproxy_schema_registry
     v::schema
     v::storage
17 changes: 16 additions & 1 deletion src/v/datalake/datalake_manager.cc
@@ -144,6 +144,15 @@ double datalake_manager::average_translation_backlog() {
     return total_lag / translators_with_backlog;
 }
 
+ss::lw_shared_ptr<class translation_probe>
+datalake_manager::get_or_create_probe(const model::ntp& ntp) {
+    auto [it, inserted] = _translation_probe_by_ntp.try_emplace(ntp, nullptr);
+    if (inserted) {
+        it->second = ss::make_lw_shared<class translation_probe>(ntp);
+    }
+    return it->second;
+}
+
 ss::future<> datalake_manager::start() {
     /*
      * Ensure that datalake scratch space directory exists. This is run on each
@@ -178,6 +187,10 @@ ss::future<> datalake_manager::start() {
       = _partition_mgr->local().register_unmanage_notification(
         model::kafka_namespace, [this](model::topic_partition_view tp) {
             model::ntp ntp{model::kafka_namespace, tp.topic, tp.partition};
+            // We remove the probe only when partition is moved out of the
+            // shard to avoid metrics disappearing during much more common
+            // leadership changes.
+            _translation_probe_by_ntp.erase(ntp);
             stop_translator(ntp);
         });
     // Handle leadership changes
@@ -334,6 +347,7 @@ void datalake_manager::start_translator(
       "instance already exists",
       partition->ntp(),
       partition->term());
+
     auto translator = std::make_unique<translation::partition_translator>(
       partition,
       _coordinator_frontend,
@@ -348,7 +362,8 @@
       _effective_max_translator_buffered_data,
       &_parallel_translations,
       invalid_record_action,
-      _writer_scratch_space);
+      _writer_scratch_space,
+      get_or_create_probe(partition->ntp()));
     _translators.emplace(partition->ntp(), std::move(translator));
 }
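For context, the get-or-create pattern above in isolation: try_emplace inserts only when the key is absent, so repeated calls for the same partition return the same probe and its counters survive leadership changes; the entry is erased only when the partition leaves the shard. A minimal standalone sketch with simplified stand-in types (std::map and std::shared_ptr in place of chunked_hash_map and lw_shared_ptr; hypothetical, not part of this commit):

#include <map>
#include <memory>
#include <string>

// Hypothetical simplified stand-ins for model::ntp and translation_probe.
struct probe {
    explicit probe(std::string ntp) : partition(std::move(ntp)) {}
    std::string partition;
};

class manager {
public:
    // Returns the probe for the partition, creating it on first use.
    std::shared_ptr<probe> get_or_create_probe(const std::string& ntp) {
        // try_emplace inserts only if the key is absent, so an existing
        // probe is never overwritten and its counters keep accumulating
        // across leadership changes on the same shard.
        auto [it, inserted] = _probes.try_emplace(ntp, nullptr);
        if (inserted) {
            it->second = std::make_shared<probe>(ntp);
        }
        return it->second;
    }

    // Called when the partition is moved off the shard; erasing only here
    // (not on every leadership change) keeps the metrics stable.
    void on_partition_unmanaged(const std::string& ntp) { _probes.erase(ntp); }

private:
    std::map<std::string, std::shared_ptr<probe>> _probes;
};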
8 changes: 8 additions & 0 deletions src/v/datalake/datalake_manager.h
@@ -20,6 +20,7 @@
 #include "datalake/location.h"
 #include "datalake/record_schema_resolver.h"
 #include "datalake/translation/partition_translator.h"
+#include "datalake/translation/translation_probe.h"
 #include "features/fwd.h"
 #include "model/metadata.h"
 #include "pandaproxy/schema_registry/fwd.h"
@@ -91,6 +92,11 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
       model::iceberg_invalid_record_action);
     void stop_translator(const model::ntp&);
     double average_translation_backlog();
+
+    /// \note The probe is created on the first use.
+    ss::lw_shared_ptr<translation_probe> get_or_create_probe(const model::ntp&);
+
+private:
     model::node_id _self;
     ss::sharded<raft::group_manager>* _group_mgr;
     ss::sharded<cluster::partition_manager>* _partition_mgr;
@@ -109,6 +115,8 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
     std::unique_ptr<datalake::type_resolver> _type_resolver;
     std::unique_ptr<datalake::schema_cache> _schema_cache;
     std::unique_ptr<backlog_controller> _backlog_controller;
+    chunked_hash_map<model::ntp, ss::lw_shared_ptr<class translation_probe>>
+      _translation_probe_by_ntp;
     ss::sharded<ss::abort_source>* _as;
     ss::scheduling_group _sg;
     ss::gate _gate;
13 changes: 12 additions & 1 deletion src/v/datalake/record_multiplexer.cc
@@ -18,6 +18,7 @@
 #include "datalake/record_translator.h"
 #include "datalake/table_creator.h"
 #include "datalake/table_id_provider.h"
+#include "datalake/translation/translation_probe.h"
 #include "model/metadata.h"
 #include "model/record.h"
 #include "storage/parser_utils.h"
@@ -36,6 +37,7 @@ record_multiplexer::record_multiplexer(
   table_creator& table_creator,
   model::iceberg_invalid_record_action invalid_record_action,
   location_provider location_provider,
+  translation_probe& translation_probe,
   lazy_abort_source& as)
   : _log(datalake_log, fmt::format("{}", ntp))
   , _ntp(ntp)
@@ -47,6 +49,7 @@ record_multiplexer::record_multiplexer(
   , _table_creator(table_creator)
   , _invalid_record_action(invalid_record_action)
   , _location_provider(std::move(location_provider))
+  , _translation_probe(translation_probe)
   , _as(as) {}
 
 ss::future<ss::stop_iteration>
@@ -97,6 +100,8 @@ record_multiplexer::operator()(model::record_batch batch) {
         case type_resolver::errc::bad_input:
         case type_resolver::errc::translation_error:
             auto invalid_res = co_await handle_invalid_record(
+              translation_probe::invalid_record_cause::
+                failed_kafka_schema_resolution,
               offset,
               record.share_key(),
               record.share_value(),
@@ -128,6 +133,8 @@
               offset,
               record_data_res.error());
             auto invalid_res = co_await handle_invalid_record(
+              translation_probe::invalid_record_cause::
+                failed_data_translation,
              offset,
              record.share_key(),
              record.share_value(),
@@ -151,6 +158,8 @@
         switch (e) {
         case table_creator::errc::incompatible_schema: {
             auto invalid_res = co_await handle_invalid_record(
+              translation_probe::invalid_record_cause::
+                failed_iceberg_schema_resolution,
               offset,
               record.share_key(),
               record.share_value(),
@@ -300,13 +309,15 @@ record_multiplexer::end_of_stream() {
 
 ss::future<result<std::nullopt_t, writer_error>>
 record_multiplexer::handle_invalid_record(
+  translation_probe::invalid_record_cause cause,
   kafka::offset offset,
   std::optional<iobuf> key,
   std::optional<iobuf> val,
   model::timestamp ts,
   chunked_vector<std::pair<std::optional<iobuf>, std::optional<iobuf>>>
     headers) {
-    // TODO: add a metric!
+    _translation_probe.increment_invalid_record(cause);
+
     switch (_invalid_record_action) {
     case model::iceberg_invalid_record_action::drop:
         vlog(_log.debug, "Dropping invalid record at offset {}", offset);
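The translation_probe class itself lives in src/v/datalake/translation, which is not among the loaded hunks. Judging only from the call sites above, a plausible minimal shape is one counter per failure cause, keyed by the invalid_record_cause enum whose values appear in this diff; the member names and the accessor below are assumptions, not the committed code:

#include <array>
#include <cstddef>
#include <cstdint>

// Hypothetical sketch of the probe's shape; the real class presumably also
// registers these counters with seastar::metrics so they are exported per
// partition, which is omitted here.
class translation_probe_sketch {
public:
    enum class invalid_record_cause {
        failed_kafka_schema_resolution,
        failed_data_translation,
        failed_iceberg_schema_resolution,
    };

    // Called from record_multiplexer::handle_invalid_record.
    void increment_invalid_record(invalid_record_cause cause) {
        ++_invalid_records[static_cast<size_t>(cause)];
    }

    // Assumed accessor for tests/metrics; not visible in the loaded hunks.
    uint64_t invalid_records(invalid_record_cause cause) const {
        return _invalid_records[static_cast<size_t>(cause)];
    }

private:
    std::array<uint64_t, 3> _invalid_records{};
};

Counting by cause rather than with a single counter lets operators distinguish schema-registry resolution failures from data translation and Iceberg schema failures without digging through logs.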
5 changes: 4 additions & 1 deletion src/v/datalake/record_multiplexer.h
@@ -15,6 +15,7 @@
 #include "datalake/location.h"
 #include "datalake/partitioning_writer.h"
 #include "datalake/schema_identifier.h"
+#include "datalake/translation/translation_probe.h"
 #include "model/record.h"
 #include "utils/lazy_abort_source.h"
 #include "utils/prefix_logger.h"
@@ -54,6 +55,7 @@ class record_multiplexer {
       table_creator&,
       model::iceberg_invalid_record_action,
       location_provider,
+      translation_probe&,
       lazy_abort_source& as);
 
     ss::future<ss::stop_iteration> operator()(model::record_batch batch);
@@ -62,8 +64,8 @@ class record_multiplexer {
 private:
     // Handles the given record components of a record that is invalid for the
     // target table.
-    // TODO: this just drops the data. Consider a separate table entirely.
     ss::future<result<std::nullopt_t, writer_error>> handle_invalid_record(
+      translation_probe::invalid_record_cause,
       kafka::offset,
       std::optional<iobuf>,
       std::optional<iobuf>,
@@ -80,6 +82,7 @@ class record_multiplexer {
     table_creator& _table_creator;
     model::iceberg_invalid_record_action _invalid_record_action;
     location_provider _location_provider;
+    translation_probe& _translation_probe;
     lazy_abort_source& _as;
     chunked_hash_map<
       record_schema_components,
1 change: 1 addition & 0 deletions src/v/datalake/tests/BUILD
@@ -264,6 +264,7 @@ redpanda_cc_gtest(
         "//src/v/datalake:record_translator",
         "//src/v/datalake:table_definition",
         "//src/v/datalake:table_id_provider",
+        "//src/v/datalake/translation:translation_probe",
         "//src/v/iceberg:filesystem_catalog",
         "//src/v/model",
         "//src/v/pandaproxy",
1 change: 1 addition & 0 deletions src/v/datalake/tests/CMakeLists.txt
@@ -68,6 +68,7 @@ rp_test(
   LIBRARIES
     Boost::iostreams
     v::datalake_test_utils
+    v::datalake_translation_probe
     v::gtest_main
     v::datalake_writer
     v::cloud_io_utils
10 changes: 9 additions & 1 deletion src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -19,6 +19,7 @@
 #include "datalake/tests/record_generator.h"
 #include "datalake/tests/test_data_writer.h"
 #include "datalake/tests/test_utils.h"
+#include "datalake/translation/translation_probe.h"
 #include "iceberg/filesystem_catalog.h"
 #include "model/fundamental.h"
 #include "model/metadata.h"
@@ -49,6 +50,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
     int start_offset = 1005;
     auto writer_factory = std::make_unique<datalake::test_data_writer_factory>(
       false);
+    translation_probe probe(ntp);
     datalake::record_multiplexer multiplexer(
       ntp,
       rev,
@@ -61,6 +63,7 @@
       location_provider(
         cloud_io::s3_compat_provider{"s3"},
         cloud_storage_clients::bucket_name{"bucket"}),
+      probe,
       as);
 
     model::test::record_batch_spec batch_spec;
@@ -95,6 +98,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
     int batch_count = 10;
     auto writer_factory = std::make_unique<datalake::test_data_writer_factory>(
      true);
+    translation_probe probe(ntp);
     datalake::record_multiplexer multiplexer(
       ntp,
       rev,
@@ -107,6 +111,7 @@
       location_provider(
         cloud_io::s3_compat_provider{"s3"},
         cloud_storage_clients::bucket_name{"bucket"}),
+      probe,
       as);
 
     model::test::record_batch_spec batch_spec;
@@ -140,7 +145,7 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {
       datalake::local_path(tmp_dir.get_path()),
       "data",
       ss::make_shared<datalake::serde_parquet_writer_factory>());
-
+    translation_probe probe(ntp);
     datalake::record_multiplexer multiplexer(
       ntp,
       rev,
@@ -153,6 +158,7 @@
       location_provider(
         cloud_io::s3_compat_provider{"s3"},
         cloud_storage_clients::bucket_name{"bucket"}),
+      probe,
       as);
 
     model::test::record_batch_spec batch_spec;
@@ -264,6 +270,7 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) {
       datalake::local_path(tmp_dir.get_path()),
       "data",
       ss::make_shared<datalake::serde_parquet_writer_factory>());
+    translation_probe probe(ntp);
     record_multiplexer mux(
       ntp,
       rev,
@@ -274,6 +281,7 @@
       t_creator,
       model::iceberg_invalid_record_action::dlq_table,
       location_provider(scoped_remote->remote.local().provider(), bucket_name),
+      probe,
       as);
     auto res = reader.consume(std::move(mux), model::no_timeout).get();
     ASSERT_FALSE(res.has_error()) << res.error();
14 changes: 8 additions & 6 deletions src/v/datalake/tests/record_generator.cc
@@ -30,23 +30,25 @@
 
 namespace datalake::tests {
 
-ss::future<checked<std::nullopt_t, record_generator::error>>
+ss::future<
+  checked<pandaproxy::schema_registry::schema_id, record_generator::error>>
 record_generator::register_avro_schema(
   std::string_view name, std::string_view schema) {
     using namespace pandaproxy::schema_registry;
-    auto id = co_await ss::coroutine::as_future(
+    auto id_fut = co_await ss::coroutine::as_future(
       _sr->create_schema(unparsed_schema{
         subject{"foo"},
         unparsed_schema_definition{schema, schema_type::avro}}));
-    if (id.failed()) {
+    if (id_fut.failed()) {
        co_return error{fmt::format(
-          "Error creating schema {}: {}", name, id.get_exception())};
+          "Error creating schema {}: {}", name, id_fut.get_exception())};
     }
-    auto [_, added] = _id_by_name.emplace(name, id.get());
+    auto id = id_fut.get();
+    auto [_, added] = _id_by_name.emplace(name, id);
     if (!added) {
         co_return error{fmt::format("Failed to add schema {} to map", name)};
     }
-    co_return std::nullopt;
+    co_return id;
 }
 
 ss::future<checked<std::nullopt_t, record_generator::error>>
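Besides renaming id to id_fut so the future is not confused with the value it carries, this change makes register_avro_schema return the registered schema id instead of discarding it. The surrounding error handling follows a common shape: trap the failure of a throwing call and surface it as an error value rather than an exception. A simplified non-Seastar sketch of that shape (hypothetical stand-in types, not the committed code):

#include <exception>
#include <stdexcept>
#include <string>
#include <variant>

// Hypothetical stand-in for checked<T, E>: either a value or an error string.
template <typename T>
using checked = std::variant<T, std::string>;

// Stand-in for the schema registry call, which reports failure by throwing.
int create_schema_or_throw(const std::string& schema) {
    if (schema.empty()) {
        throw std::runtime_error("empty schema definition");
    }
    return 42; // pretend this is the freshly registered schema id
}

// Mirrors the shape of the fix: convert the exception into an error value,
// and on success hand the new schema id back to the caller.
checked<int>
register_schema(const std::string& name, const std::string& schema) {
    try {
        return create_schema_or_throw(schema);
    } catch (const std::exception& e) {
        return std::string("Error creating schema ") + name + ": " + e.what();
    }
}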
2 changes: 1 addition & 1 deletion src/v/datalake/tests/record_generator.h
@@ -35,7 +35,7 @@ class record_generator {
     using error = named_type<ss::sstring, struct error_tag>;
 
     // Registers the given schema with the given name.
-    ss::future<checked<std::nullopt_t, error>>
+    ss::future<checked<pandaproxy::schema_registry::schema_id, error>>
     register_avro_schema(std::string_view name, std::string_view schema);
 
     // Registers the given schema with the given name.
10 changes: 6 additions & 4 deletions src/v/datalake/tests/record_multiplexer_bench.cc
@@ -312,20 +312,21 @@ class record_multiplexer_bench_fixture
     }
 
 private:
+    const model::ntp ntp{
+      model::ns{"rp"}, model::topic{"t"}, model::partition_id{0}};
+    const model::revision_id topic_rev{123};
+
     std::unordered_set<std::string> _added_names;
     datalake::chunked_schema_cache _schema_cache;
     datalake::catalog_schema_manager _schema_mgr;
     datalake::record_schema_resolver _type_resolver;
     datalake::tests::record_generator _record_gen;
     datalake::default_translator _translator;
     datalake::direct_table_creator _table_creator;
+    datalake::translation_probe _translation_probe{ntp};
     chunked_vector<model::record_batch> _batch_data;
     lazy_abort_source _as;
 
-    const model::ntp ntp{
-      model::ns{"rp"}, model::topic{"t"}, model::partition_id{0}};
-    const model::revision_id topic_rev{123};
-
     datalake::record_multiplexer create_mux() {
         return datalake::record_multiplexer(
           ntp,
@@ -338,6 +339,7 @@
           model::iceberg_invalid_record_action::dlq_table,
           datalake::location_provider(
             scoped_remote->remote.local().provider(), bucket_name),
+          _translation_probe,
           _as);
     }
 
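The member reordering in this fixture is not cosmetic: C++ initializes non-static data members in declaration order, so _translation_probe{ntp} is only well-defined if ntp is declared before it. A minimal illustration of the hazard (hypothetical types, not the committed code):

#include <string>

struct probe {
    explicit probe(const std::string& ntp) : label(ntp) {}
    std::string label;
};

struct fixture_wrong {
    // Bug: members are constructed top to bottom, so `p` is built from
    // `ntp` before `ntp` itself has been initialized -- undefined behavior.
    probe p{ntp};
    std::string ntp{"kafka/t/0"};
};

struct fixture_right {
    // The fix mirrors this commit: declare the dependency first.
    std::string ntp{"kafka/t/0"};
    probe p{ntp};
};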
(The remaining 12 of the 24 changed files were not loaded in this view.)