Skip to content

Commit

Permalink
Merge pull request redpanda-data#24058 from BenPope/core-8000-tiered-…
Browse files Browse the repository at this point in the history
…storage-sanctioning

[CORE-8000] tiered storage sanctioning for alter topics
  • Loading branch information
BenPope authored Nov 11, 2024
2 parents c165f59 + 28f4897 commit 497b802
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 66 deletions.
43 changes: 24 additions & 19 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -943,24 +943,9 @@ void incremental_update(
}
}

ss::future<std::error_code>
topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
_last_applied_revision_id = model::revision_id(o);
auto tp = _topics.find(cmd.key);
if (tp == _topics.end()) {
co_return make_error_code(errc::topic_not_exists);
}
const auto migration_state = _migrated_resources.get_topic_state(cmd.key);
if (
migration_state
!= data_migrations::migrated_resource_state::non_restricted) {
co_return errc::resource_is_being_migrated;
}
auto updated_properties = tp->second.get_configuration().properties;
topic_properties topic_table::update_topic_properties(
topic_properties updated_properties, update_topic_properties_cmd cmd) {
auto& overrides = cmd.value;
/**
* Update topic properties
*/
incremental_update(
updated_properties.cleanup_policy_bitflags,
overrides.cleanup_policy_bitflags);
Expand Down Expand Up @@ -1051,6 +1036,26 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
overrides.iceberg_translation_interval_ms);
incremental_update(
updated_properties.delete_retention_ms, overrides.delete_retention_ms);
return updated_properties;
}

ss::future<std::error_code>
topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
_last_applied_revision_id = model::revision_id(o);
auto key = cmd.key;
auto tp = _topics.find(key);
if (tp == _topics.end()) {
co_return make_error_code(errc::topic_not_exists);
}
const auto migration_state = _migrated_resources.get_topic_state(key);
if (
migration_state
!= data_migrations::migrated_resource_state::non_restricted) {
co_return errc::resource_is_being_migrated;
}

auto updated_properties = update_topic_properties(
tp->second.get_configuration().properties, std::move(cmd));

auto& properties = tp->second.get_configuration().properties;
// no configuration change, no need to generate delta
Expand All @@ -1073,14 +1078,14 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {

_pending_topic_deltas.emplace_back(
tp->second.get_revision(),
cmd.key,
key,
model::revision_id{o},
topic_table_topic_delta_type::properties_updated);

const auto& assignments = tp->second.get_assignments();
for (auto& [_, p_as] : assignments) {
_pending_ntp_deltas.emplace_back(
model::ntp(cmd.key.ns, cmd.key.tp, p_as.id),
model::ntp(key.ns, key.tp, p_as.id),
p_as.group,
model::revision_id(o),
topic_table_ntp_delta_type::properties_updated);
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,12 @@ class topic_table {
_partitions_to_force_reconfigure.end());
}

/**
* Return the result of applying the update to the topic properties.
*/
static topic_properties update_topic_properties(
topic_properties updated_properties, update_topic_properties_cmd cmd);

private:
friend topic_table_probe;

Expand Down
39 changes: 39 additions & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "rpc/types.h"
#include "ssx/future-util.h"
#include "topic_configuration.h"
#include "topic_properties.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
Expand Down Expand Up @@ -84,6 +85,31 @@ std::vector<std::string_view> get_enterprise_features(
return features;
}

std::vector<std::string_view> get_enterprise_features(
const cluster::metadata_cache& metadata,
const cluster::topic_properties_update& update) {
auto tp_metadata = metadata.get_topic_metadata_ref(update.tp_ns);
if (!tp_metadata.has_value()) {
// Topic does not exist, nothing to validate
return {};
}

const auto& properties = tp_metadata->get().get_configuration().properties;
auto updated_properties = cluster::topic_table::update_topic_properties(
properties, {update.tp_ns, update.properties});

std::vector<std::string_view> features;
const auto si_disabled = model::shadow_indexing_mode::disabled;
if (
(properties.shadow_indexing.value_or(si_disabled)
< updated_properties.shadow_indexing.value_or(si_disabled))
|| (properties.remote_delete < updated_properties.remote_delete)) {
features.emplace_back("tiered storage");
}

return features;
}

} // namespace

namespace cluster {
Expand Down Expand Up @@ -273,6 +299,19 @@ ss::future<std::vector<topic_result>> topics_frontend::update_topic_properties(

auto results = co_await ssx::parallel_transform(
std::move(updates), [this, timeout](topic_properties_update update) {
if (
_features.local().should_sanction()
&& is_user_topic(update.tp_ns)) {
if (auto f = get_enterprise_features(_metadata_cache, update);
!f.empty()) {
vlog(
clusterlog.warn,
"An enterprise license is required to enable {}.",
f);
return ss::make_ready_future<topic_result>(
topic_result(update.tp_ns, errc::topic_invalid_config));
}
}
return do_update_topic_properties(std::move(update), timeout);
});

Expand Down
Loading

0 comments on commit 497b802

Please sign in to comment.