Skip to content

Commit

Permalink
Merge pull request #23794 from BenPope/CORE-681-schema_registry_impro…
Browse files Browse the repository at this point in the history
…ve_metrics

schema_registry: Add metrics
  • Loading branch information
BenPope authored Nov 22, 2024
2 parents bcc6f67 + 7a2d9c2 commit d234c3b
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void probe::setup_metrics() {
return _request_metrics.hist().internal_histogram_logform();
})},
{},
{sm::shard_label, operation_label});
{sm::shard_label});
}

void probe::setup_public_metrics() {
Expand Down
138 changes: 130 additions & 8 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@

#pragma once

#include "config/configuration.h"
#include "container/fragmented_vector.h"
#include "metrics/metrics.h"
#include "metrics/prometheus_sanitize.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/types.h"

#include <seastar/core/metrics.hh>

#include <absl/algorithm/container.h>
#include <absl/container/btree_map.h>
#include <absl/container/btree_set.h>
#include <absl/container/node_hash_map.h>

#include <optional>
#include <ranges>

namespace pandaproxy::schema_registry {

///\brief A mapping of version and schema id for a subject.
Expand Down Expand Up @@ -58,10 +66,13 @@ class store {
public:
using schema_id_set = absl::btree_set<schema_id>;

explicit store() = default;
explicit store()
: store(is_mutable::no) {}

explicit store(is_mutable mut)
: _mutable(mut) {}
: _mutable(mut) {
setup_metrics();
}

struct insert_result {
schema_version version;
Expand Down Expand Up @@ -543,7 +554,7 @@ class store {
result<bool>
set_mode(seq_marker marker, const subject& sub, mode m, force f) {
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
auto& sub_entry = _subjects[sub];
auto& sub_entry = get_or_create_subject_entry(sub);
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.mode, m) != m;
}
Expand Down Expand Up @@ -589,7 +600,7 @@ class store {
seq_marker marker,
const subject& sub,
compatibility_level compatibility) {
auto& sub_entry = _subjects[sub];
auto& sub_entry = get_or_create_subject_entry(sub);
sub_entry.written_at.push_back(marker);
return std::exchange(sub_entry.compatibility, compatibility)
!= compatibility;
Expand Down Expand Up @@ -637,7 +648,7 @@ class store {
bool inserted;
};
insert_subject_result insert_subject(subject sub, schema_id id) {
auto& subject_entry = _subjects[std::move(sub)];
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
subject_entry.deleted = is_deleted::no;
auto& versions = subject_entry.versions;
const auto v_it = std::find_if(
Expand All @@ -661,7 +672,7 @@ class store {
schema_version version,
schema_id id,
is_deleted deleted) {
auto& subject_entry = _subjects[std::move(sub)];
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
auto& versions = subject_entry.versions;
subject_entry.written_at.push_back(marker);

Expand Down Expand Up @@ -705,6 +716,67 @@ class store {
return outcome::success();
}

void setup_metrics() {
namespace sm = ss::metrics;
const auto make_schema_count = [this]() {
return sm::make_gauge(
"schema_count",
[this] { return _schemas.size(); },
sm::description("The number of schemas in the store"));
};
const auto make_subject_count = [this](is_deleted deleted) {
return sm::make_gauge(
"subject_count",
[this, deleted] {
return std::ranges::count_if(
_subjects, [deleted](const auto& entry) {
return entry.second.deleted == deleted;
});
},
sm::description("The number of subjects in the store"),
{sm::label{"deleted"}(deleted)});
};
const auto make_schema_bytes = [this]() {
return sm::make_gauge(
"schema_memory_bytes",
[this] {
return absl::c_accumulate(
_schemas | std::views::transform([](const auto& s) {
return s.second.definition.raw()().size_bytes();
}),
size_t{0});
},
sm::description("The memory usage of schemas in the store"));
};
auto group_name = prometheus_sanitize::metrics_name(
"schema_registry_cache");
const std::vector<sm::label> agg{{sm::shard_label}};

if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
group_name,
{
make_schema_count(),
make_schema_bytes(),
make_subject_count(is_deleted::no),
make_subject_count(is_deleted::yes),
},
{},
agg);
}

if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
group_name,
{
make_schema_count().aggregate(agg),
make_schema_bytes().aggregate(agg),
make_subject_count(is_deleted::no).aggregate(agg),
make_subject_count(is_deleted::yes).aggregate(agg),
});
}
};

private:
struct schema_entry {
explicit schema_entry(canonical_schema_definition definition)
Expand All @@ -713,17 +785,65 @@ class store {
canonical_schema_definition definition;
};

struct subject_entry {
class subject_entry {
public:
explicit subject_entry(const subject& sub) { setup_metrics(sub); }
std::optional<compatibility_level> compatibility;
std::optional<mode> mode;
std::vector<subject_version_entry> versions;
is_deleted deleted{false};

std::vector<seq_marker> written_at;

private:
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;

void setup_metrics(const subject& sub) {
namespace sm = ss::metrics;
auto group_name = prometheus_sanitize::metrics_name(
"schema_registry_cache");
const auto make_subject_version_count = [this,
&sub](is_deleted deleted) {
return sm::make_gauge(
"subject_version_count",
[this, deleted] {
return std::ranges::count_if(
versions, [deleted](const subject_version_entry& v) {
return v.deleted == deleted;
});
},
sm::description("The number of versions in the subject"),
{
sm::label{"subject"}(sub),
sm::label{"deleted"}(deleted),
});
};
if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
group_name,
{make_subject_version_count(is_deleted::no),
make_subject_version_count(is_deleted::yes)},
{},
{sm::shard_label});
}
if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
group_name,
{make_subject_version_count(is_deleted::no)
.aggregate({sm::shard_label}),
make_subject_version_count(is_deleted::yes)
.aggregate({sm::shard_label})});
}
}
};
using schema_map = absl::btree_map<schema_id, schema_entry>;
using subject_map = absl::node_hash_map<subject, subject_entry>;

subject_entry& get_or_create_subject_entry(subject sub) {
return _subjects.try_emplace(sub, sub).first->second;
}

result<subject_map::iterator>
get_subject_iter(const subject& sub, include_deleted inc_del) {
const store* const_this = this;
Expand Down Expand Up @@ -781,7 +901,9 @@ class store {
subject_map _subjects;
compatibility_level _compatibility{compatibility_level::backward};
mode _mode{mode::read_write};
is_mutable _mutable{is_mutable::no};
is_mutable _mutable;
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};

} // namespace pandaproxy::schema_registry

0 comments on commit d234c3b

Please sign in to comment.