Skip to content

Commit

Permalink
Merge pull request redpanda-data#24074 from IoannisRP/ik-sanction-pin…
Browse files Browse the repository at this point in the history
…-leadership-create-topic

cluster: Sanction redpanda.leaders.preference on invalid license - create topic
  • Loading branch information
IoannisRP authored Nov 13, 2024
2 parents 1cc9493 + 1ddda34 commit 997168d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
8 changes: 8 additions & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "cluster/topic_recovery_validator.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/leaders_preference.h"
#include "data_migration_types.h"
#include "features/feature_table.h"
#include "fwd.h"
Expand Down Expand Up @@ -83,6 +84,13 @@ std::vector<std::string_view> get_enterprise_features(
if (cfg.is_schema_id_validation_enabled()) {
features.emplace_back("schema ID validation");
}
if (const auto& leaders_pref = cfg.cfg.properties.leaders_preference;
leaders_pref.has_value()
&& config::shard_local_cfg()
.default_leaders_preference.check_restricted(
leaders_pref.value())) {
features.emplace_back("leadership pinning");
}
return features;
}

Expand Down
40 changes: 25 additions & 15 deletions src/v/kafka/server/tests/create_topics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "config/leaders_preference.h"
#include "container/fragmented_vector.h"
#include "kafka/protocol/create_topics.h"
#include "kafka/protocol/metadata.h"
Expand Down Expand Up @@ -447,27 +448,36 @@ FIXTURE_TEST(unlicensed_rejected, create_topic_fixture) {
pandaproxy::schema_registry::schema_id_validation_mode::compat);

revoke_license();
auto si_props = {
ss::sstring{kafka::topic_property_remote_read},
ss::sstring{kafka::topic_property_remote_write},
ss::sstring{kafka::topic_property_recovery},
ss::sstring{kafka::topic_property_read_replica},
ss::sstring{kafka::topic_property_record_key_schema_id_validation},
ss::sstring{kafka::topic_property_record_key_schema_id_validation_compat},
ss::sstring{kafka::topic_property_record_value_schema_id_validation},
ss::sstring{
kafka::topic_property_record_value_schema_id_validation_compat},
using prop_t = std::map<ss::sstring, ss::sstring>;
const auto with = [](const std::string_view prop, const auto value) {
return std::make_pair(
prop, prop_t{{ss::sstring{prop}, ssx::sformat("{}", value)}});
};
std::vector<std::pair<std::string_view, prop_t>> enterprise_props{
// si_props
with(kafka::topic_property_remote_read, true),
with(kafka::topic_property_remote_write, true),
with(kafka::topic_property_recovery, true),
with(kafka::topic_property_read_replica, true),
// schema id validation
with(kafka::topic_property_record_key_schema_id_validation, true),
with(kafka::topic_property_record_key_schema_id_validation_compat, true),
with(kafka::topic_property_record_value_schema_id_validation, true),
with(
kafka::topic_property_record_value_schema_id_validation_compat, true),
// pin_leadership_props
with(
kafka::topic_property_leaders_preference,
config::leaders_preference{
.type = config::leaders_preference::type_t::racks,
.racks = {model::rack_id{"A"}}})};

auto client = make_kafka_client().get();
client.connect().get();

for (const auto& prop : si_props) {
for (const auto& [name, props] : enterprise_props) {
auto topic = make_topic(
ssx::sformat("topic_{}", prop),
std::nullopt,
std::nullopt,
std::map<ss::sstring, ss::sstring>{{prop, "true"}});
ssx::sformat("topic_{}", name), std::nullopt, std::nullopt, props);

auto resp
= client.dispatch(make_req({topic}), kafka::api_version(5)).get();
Expand Down

0 comments on commit 997168d

Please sign in to comment.