diff --git a/src/v/cloud_topics/BUILD b/src/v/cloud_topics/BUILD index 5372da6e90a10..7b11c8412ccab 100644 --- a/src/v/cloud_topics/BUILD +++ b/src/v/cloud_topics/BUILD @@ -45,13 +45,6 @@ redpanda_cc_library( ], ) -redpanda_cc_library( - name = "dl_version", - hdrs = ["dl_version.h"], - include_prefix = "cloud_topics", - deps = ["//src/v/utils:named_type"], -) - redpanda_cc_library( name = "dl_overlay", srcs = [ @@ -86,6 +79,33 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "dl_version", + hdrs = ["dl_version.h"], + include_prefix = "cloud_topics", + deps = ["//src/v/utils:named_type"], +) + +redpanda_cc_library( + name = "dl_snapshot", + srcs = [ + "dl_snapshot.cc", + ], + hdrs = [ + "dl_snapshot.h", + ], + implementation_deps = [ + "@fmt", + ], + include_prefix = "cloud_topics", + deps = [ + ":dl_overlay", + ":dl_version", + "//src/v/container:fragmented_vector", + "//src/v/serde", + ], +) + redpanda_cc_library( name = "app", srcs = [ diff --git a/src/v/cloud_topics/CMakeLists.txt b/src/v/cloud_topics/CMakeLists.txt index f95db8d50ab59..65e5bf75a2f7d 100644 --- a/src/v/cloud_topics/CMakeLists.txt +++ b/src/v/cloud_topics/CMakeLists.txt @@ -3,6 +3,7 @@ v_cc_library( NAME cloud_topics_base SRCS dl_overlay.cc + dl_snapshot.cc logger.cc types.cc DEPS diff --git a/src/v/cloud_topics/dl_snapshot.cc b/src/v/cloud_topics/dl_snapshot.cc new file mode 100644 index 0000000000000..95a64ab13760c --- /dev/null +++ b/src/v/cloud_topics/dl_snapshot.cc @@ -0,0 +1,16 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cloud_topics/dl_snapshot.h" + +auto fmt::formatter::format( + const experimental::cloud_topics::dl_snapshot_id& id, + fmt::format_context& ctx) const -> decltype(ctx.out()) { + return fmt::format_to(ctx.out(), "{}", id.version); +} diff --git a/src/v/cloud_topics/dl_snapshot.h b/src/v/cloud_topics/dl_snapshot.h new file mode 100644 index 0000000000000..498870802aac6 --- /dev/null +++ b/src/v/cloud_topics/dl_snapshot.h @@ -0,0 +1,55 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "cloud_topics/dl_overlay.h" +#include "cloud_topics/dl_version.h" +#include "container/fragmented_vector.h" +#include "serde/envelope.h" + +namespace experimental::cloud_topics { + +struct dl_snapshot_id + : serde:: + envelope, serde::compat_version<0>> { + dl_snapshot_id() noexcept = default; + + explicit dl_snapshot_id(dl_version version) noexcept + : version(version) {} + + auto serde_fields() { return std::tie(version); } + + bool operator==(const dl_snapshot_id& other) const noexcept = default; + + /// Version for which the snapshot is created. + dl_version version; +}; + +struct dl_snapshot_payload + : serde::checksum_envelope< + dl_snapshot_id, + serde::version<0>, + serde::compat_version<0>> { + /// Version for which the snapshot is created. + dl_snapshot_id id; + + /// Overlays visible at the snapshot version. + fragmented_vector overlays; +}; + +}; // namespace experimental::cloud_topics + +template<> +struct fmt::formatter + : fmt::formatter { + auto format( + const experimental::cloud_topics::dl_snapshot_id&, + fmt::format_context& ctx) const -> decltype(ctx.out()); +}; diff --git a/src/v/cloud_topics/dl_stm/BUILD b/src/v/cloud_topics/dl_stm/BUILD index ba2447e7edd1c..5eb84bc75b424 100644 --- a/src/v/cloud_topics/dl_stm/BUILD +++ b/src/v/cloud_topics/dl_stm/BUILD @@ -7,6 +7,7 @@ redpanda_cc_library( visibility = [":__subpackages__"], deps = [ "//src/v/cloud_topics:dl_overlay", + "//src/v/cloud_topics:dl_version", "//src/v/serde", ], ) @@ -19,6 +20,7 @@ redpanda_cc_library( visibility = [":__subpackages__"], deps = [ "//src/v/cloud_topics:dl_overlay", + "//src/v/cloud_topics:dl_snapshot", "//src/v/cloud_topics:dl_version", "//src/v/container:fragmented_vector", "//src/v/model", @@ -58,6 +60,8 @@ redpanda_cc_library( deps = [ "//src/v/base", "//src/v/cloud_topics:dl_overlay", + "//src/v/cloud_topics:dl_snapshot", + "//src/v/cloud_topics:dl_version", "//src/v/model", "@seastar", ], diff --git a/src/v/cloud_topics/dl_stm/dl_stm.cc b/src/v/cloud_topics/dl_stm/dl_stm.cc index db91721310ce7..5532f2921d21f 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm.cc +++ b/src/v/cloud_topics/dl_stm/dl_stm.cc @@ -46,6 +46,17 @@ ss::future<> dl_stm::do_apply(const model::record_batch& batch) { _state.push_overlay(new_dl_version, std::move(cmd.overlay)); break; } + case dl_stm_key::start_snapshot: { + std::ignore = serde::from_iobuf( + r.release_value()); + _state.start_snapshot(new_dl_version); + break; + } + case dl_stm_key::remove_snapshots_before_version: + auto cmd = serde::from_iobuf( + r.release_value()); + _state.remove_snapshots_before(cmd.last_version_to_keep); + break; } }); diff --git a/src/v/cloud_topics/dl_stm/dl_stm_api.cc b/src/v/cloud_topics/dl_stm/dl_stm_api.cc index 016ec4f10f16a..3cd1fcb8f25e6 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_api.cc +++ b/src/v/cloud_topics/dl_stm/dl_stm_api.cc @@ -17,6 +17,8 @@ #include "serde/rw/uuid.h" #include "storage/record_batch_builder.h" +#include + namespace experimental::cloud_topics { std::ostream& operator<<(std::ostream& o, dl_stm_api_errc errc) { @@ -32,13 +34,15 @@ dl_stm_api::dl_stm_api(ss::logger& logger, ss::shared_ptr stm) : _logger(logger) , _stm(std::move(stm)) {} +ss::future<> dl_stm_api::stop() { co_await _gate.close(); } + ss::future> dl_stm_api::push_overlay(dl_overlay overlay) { + vlog(_logger.debug, "Replicating dl_stm_cmd::push_overlay_cmd"); + auto h = _gate.hold(); + // TODO: Sync state and consider whether we need to encode invariants in the // command. - model::term_id term = _stm->_raft->term(); - - vlog(_logger.debug, "Replicating dl_stm_cmd::push_overlay_cmd"); storage::record_batch_builder builder( model::record_batch_type::dl_stm_command, model::offset(0)); @@ -47,6 +51,82 @@ dl_stm_api::push_overlay(dl_overlay overlay) { serde::to_iobuf(push_overlay_cmd(std::move(overlay)))); auto batch = std::move(builder).build(); + auto apply_result = co_await replicated_apply(std::move(batch)); + if (apply_result.has_failure()) { + co_return apply_result.error(); + } + + co_return outcome::success(true); +} + +std::optional dl_stm_api::lower_bound(kafka::offset offset) const { + return _stm->_state.lower_bound(offset); +} + +ss::future> +dl_stm_api::start_snapshot() { + vlog(_logger.debug, "Replicating dl_stm_cmd::start_snapshot_cmd"); + auto h = _gate.hold(); + + storage::record_batch_builder builder( + model::record_batch_type::dl_stm_command, model::offset(0)); + builder.add_raw_kv( + serde::to_iobuf(dl_stm_key::start_snapshot), + serde::to_iobuf(start_snapshot_cmd())); + + auto batch = std::move(builder).build(); + + auto apply_result = co_await replicated_apply(std::move(batch)); + if (apply_result.has_failure()) { + co_return apply_result.error(); + } + + // We abuse knowledge of implementation detail here to construct the + // dl_snapshot_id without having to setup listeners and notifiers of command + // apply. + auto expected_id = dl_snapshot_id(dl_version(apply_result.value())); + + // Ensure that the expected snapshot was created. + if (!_stm->_state.snapshot_exists(expected_id)) { + throw std::runtime_error(fmt::format( + "Snapshot with expected id not found after waiting for command to be " + "applied: {}", + expected_id)); + } + + co_return outcome::success(expected_id); +} + +std::optional +dl_stm_api::read_snapshot(dl_snapshot_id id) { + return _stm->_state.read_snapshot(id); +} + +ss::future> +dl_stm_api::remove_snapshots_before(dl_version last_version_to_keep) { + vlog(_logger.debug, "Replicating dl_stm_cmd::remove_snapshots_cmd"); + auto h = _gate.hold(); + + storage::record_batch_builder builder( + model::record_batch_type::dl_stm_command, model::offset(0)); + builder.add_raw_kv( + serde::to_iobuf(dl_stm_key::remove_snapshots_before_version), + serde::to_iobuf( + remove_snapshots_before_version_cmd(last_version_to_keep))); + + auto batch = std::move(builder).build(); + auto apply_result = co_await replicated_apply(std::move(batch)); + if (apply_result.has_failure()) { + co_return apply_result.error(); + } + + co_return outcome::success(); +} + +ss::future> +dl_stm_api::replicated_apply(model::record_batch&& batch) { + model::term_id term = _stm->_raft->term(); + auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto opts = raft::replicate_options(raft::consistency_level::quorum_ack); @@ -58,14 +138,10 @@ dl_stm_api::push_overlay(dl_overlay overlay) { fmt::format("Failed to replicate overlay: {}", res.error())); } - co_await _stm->wait_no_throw( + co_await _stm->wait( res.value().last_offset, model::timeout_clock::now() + 30s); - co_return outcome::success(true); -} - -std::optional dl_stm_api::lower_bound(kafka::offset offset) const { - return _stm->_state.lower_bound(offset); + co_return res.value().last_offset; } }; // namespace experimental::cloud_topics diff --git a/src/v/cloud_topics/dl_stm/dl_stm_api.h b/src/v/cloud_topics/dl_stm/dl_stm_api.h index a47d59a2caa63..efbcba1fa1394 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_api.h +++ b/src/v/cloud_topics/dl_stm/dl_stm_api.h @@ -11,7 +11,11 @@ #include "base/outcome.h" #include "cloud_topics/dl_overlay.h" +#include "cloud_topics/dl_snapshot.h" +#include "cloud_topics/dl_version.h" +#include "model/record.h" +#include #include #include @@ -32,6 +36,13 @@ class dl_stm_api { dl_stm_api(ss::logger& logger, ss::shared_ptr stm); dl_stm_api(dl_stm_api&&) noexcept = default; + ~dl_stm_api() { + vassert(_gate.is_closed(), "object destroyed before calling stop()"); + } + +public: + ss::future<> stop(); + public: /// Attempt to add a new overlay. ss::future> push_overlay(dl_overlay overlay); @@ -41,9 +52,31 @@ class dl_stm_api { /// available offset. std::optional lower_bound(kafka::offset offset) const; + /// Request a new snapshot to be created. + ss::future> start_snapshot(); + + /// Read the payload of a snapshot. + std::optional read_snapshot(dl_snapshot_id id); + + /// Remove all snapshots with version less than the given version. + /// This must be called periodically as new snapshots are being created + /// to avoid the state growing indefinitely. + ss::future> + remove_snapshots_before(dl_version last_version_to_keep); + +private: + /// Replicate a record batch and wait for it to be applied to the dl_stm. + /// Returns the offset at which the batch was applied. + ss::future> + replicated_apply(model::record_batch&& batch); + private: ss::logger& _logger; + /// Gate held by async operations to ensure that the API is not destroyed + /// while an operation is in progress. + ss::gate _gate; + /// The API can only read the state of the stm. The state can be mutated /// only via \ref consensus::replicate calls. ss::shared_ptr _stm; diff --git a/src/v/cloud_topics/dl_stm/dl_stm_commands.h b/src/v/cloud_topics/dl_stm/dl_stm_commands.h index 7f51c6c15ed9c..0791be2e9ef63 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_commands.h +++ b/src/v/cloud_topics/dl_stm/dl_stm_commands.h @@ -10,6 +10,7 @@ #pragma once #include "cloud_topics/dl_overlay.h" +#include "cloud_topics/dl_version.h" #include "model/fundamental.h" #include "model/timestamp.h" #include "serde/envelope.h" @@ -28,4 +29,29 @@ struct push_overlay_cmd dl_overlay overlay; }; +struct start_snapshot_cmd + : public serde::envelope< + start_snapshot_cmd, + serde::version<0>, + serde::compat_version<0>> { + start_snapshot_cmd() noexcept = default; + + auto serde_fields() { return std::tie(); } +}; + +struct remove_snapshots_before_version_cmd + : public serde::envelope< + remove_snapshots_before_version_cmd, + serde::version<0>, + serde::compat_version<0>> { + remove_snapshots_before_version_cmd() noexcept = default; + explicit remove_snapshots_before_version_cmd( + dl_version last_version_to_keep) + : last_version_to_keep(last_version_to_keep) {} + + auto serde_fields() { return std::tie(last_version_to_keep); } + + dl_version last_version_to_keep{}; +}; + } // namespace experimental::cloud_topics diff --git a/src/v/cloud_topics/dl_stm/dl_stm_state.cc b/src/v/cloud_topics/dl_stm/dl_stm_state.cc index 37dc4669346d8..7b5d2e56c93c1 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_state.cc +++ b/src/v/cloud_topics/dl_stm/dl_stm_state.cc @@ -10,6 +10,7 @@ #include "cloud_topics/dl_stm/dl_stm_state.h" #include "cloud_topics/dl_overlay.h" +#include "cloud_topics/dl_snapshot.h" #include "model/fundamental.h" #include @@ -71,4 +72,79 @@ dl_stm_state::lower_bound(kafka::offset offset) const { return best_match; } +dl_snapshot_id dl_stm_state::start_snapshot(dl_version version) noexcept { + _version_invariant.set_last_snapshot_version(version); + + auto id = dl_snapshot_id(version); + _snapshots.push_back(id); + + return id; +} + +bool dl_stm_state::snapshot_exists(dl_snapshot_id id) const noexcept { + return std::binary_search( + _snapshots.begin(), + _snapshots.end(), + id, + [](const dl_snapshot_id& a, const dl_snapshot_id& b) { + return a.version < b.version; + }); +} + +std::optional +dl_stm_state::read_snapshot(dl_snapshot_id id) const { + auto it = std::find_if( + _snapshots.begin(), _snapshots.end(), [&id](const dl_snapshot_id& s) { + return s.version == id.version; + }); + + // Snapshot not found. + if (it == _snapshots.end()) { + return std::nullopt; + } + + // Collect overlays that are visible at the snapshot version. + fragmented_vector overlays; + for (const auto& entry : _overlays) { + if ( + entry.added_at <= id.version + && (entry.removed_at == dl_version{} || entry.removed_at > id.version)) { + overlays.push_back(entry.overlay); + } + } + + return dl_snapshot_payload{ + .id = *it, + .overlays = std::move(overlays), + }; +} + +void dl_stm_state::remove_snapshots_before(dl_version last_version_to_keep) { + if (_snapshots.empty()) { + throw std::runtime_error(fmt::format( + "Attempt to remove snapshots before version {} but no snapshots " + "exist", + last_version_to_keep)); + } + + // Find the first snapshot to keep. It is the first snapshot with a version + // equal or greater than the version to keep. + auto it = std::lower_bound( + _snapshots.begin(), + _snapshots.end(), + last_version_to_keep, + [](const dl_snapshot_id& a, dl_version b) { return a.version < b; }); + + if (it == _snapshots.begin()) { + // Short circuit if there are no snapshots to remove + return; + } else if (it == _snapshots.end()) { + throw std::runtime_error(fmt::format( + "Trying to remove snapshots before an non-existent snapshot", + last_version_to_keep)); + } else { + _snapshots.erase(_snapshots.begin(), it); + } +} + } // namespace experimental::cloud_topics diff --git a/src/v/cloud_topics/dl_stm/dl_stm_state.h b/src/v/cloud_topics/dl_stm/dl_stm_state.h index 09b876dd91b04..442c0b57ec0f8 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_state.h +++ b/src/v/cloud_topics/dl_stm/dl_stm_state.h @@ -10,9 +10,12 @@ #pragma once #include "cloud_topics/dl_overlay.h" +#include "cloud_topics/dl_snapshot.h" #include "cloud_topics/dl_version.h" #include "container/fragmented_vector.h" +#include + namespace experimental::cloud_topics { struct dl_overlay_entry { @@ -25,17 +28,34 @@ struct dl_overlay_entry { class dl_version_monotonic_invariant { public: void set_version(dl_version version) noexcept { - // Greater or equal is required to handle retries. + // Greater or equal for `_last_version` is required to handle retries. + // Greater for `_last_snapshot_version` to avoid mutating an existing + // snapshot. vassert( - version >= _last_version, - "Version can't go backwards. Current version: {}, new version: {}", + version >= _last_version && version > _last_snapshot_version, + "Version can't go backwards. Current version: {}, new version: {}, " + "last snapshot version: {}", _last_version, - version); + version, + _last_snapshot_version); _last_version = version; } + void set_last_snapshot_version(dl_version version) noexcept { + // Greater or equal is required to handle retries. + vassert( + version >= _last_snapshot_version, + "Snapshot version can't go backwards. Current snapshot version: {}, " + "new snapshot version: {}", + _last_snapshot_version, + version); + set_version(version); + _last_snapshot_version = version; + } + private: dl_version _last_version; + dl_version _last_snapshot_version; }; /// In-memory state of the data layout state machine (dl_stm). @@ -55,11 +75,29 @@ class dl_stm_state { /// available offset. std::optional lower_bound(kafka::offset offset) const; + /// Create a handle to a snapshot of the state at the current version. + /// The snapshot id can be used later to read snapshot contents. + dl_snapshot_id start_snapshot(dl_version version) noexcept; + + bool snapshot_exists(dl_snapshot_id id) const noexcept; + + /// Snapshot of the state at the given version. + std::optional read_snapshot(dl_snapshot_id id) const; + + /// Remove all snapshots with version less than the given version. + void remove_snapshots_before(dl_version last_version_to_keep); + private: // A list of overlays that are stored in the cloud storage. // The order of elements is undefined. std::deque _overlays; + // A list of snapshot handles that are currently open. + // The list is ordered by version in ascending order to efficiently find the + // oldest snapshot when running state garbage collection and to remove + // closed snapshots. + std::deque _snapshots; + dl_version_monotonic_invariant _version_invariant; }; diff --git a/src/v/cloud_topics/dl_stm/tests/BUILD b/src/v/cloud_topics/dl_stm/tests/BUILD index becec908c7f5d..f9644e11b3cb1 100644 --- a/src/v/cloud_topics/dl_stm/tests/BUILD +++ b/src/v/cloud_topics/dl_stm/tests/BUILD @@ -8,6 +8,7 @@ redpanda_cc_gtest( ], deps = [ "//src/v/cloud_topics:dl_overlay", + "//src/v/cloud_topics:dl_version", "//src/v/cloud_topics:logger", "//src/v/cloud_topics:types", "//src/v/cloud_topics/dl_stm:dl_stm_state", diff --git a/src/v/cloud_topics/dl_stm/tests/dl_stm_state_test.cc b/src/v/cloud_topics/dl_stm/tests/dl_stm_state_test.cc index 8c7a6b410e2c1..95b5733540017 100644 --- a/src/v/cloud_topics/dl_stm/tests/dl_stm_state_test.cc +++ b/src/v/cloud_topics/dl_stm/tests/dl_stm_state_test.cc @@ -9,6 +9,8 @@ #include "cloud_topics/dl_overlay.h" #include "cloud_topics/dl_stm/dl_stm_state.h" +#include "cloud_topics/dl_version.h" +#include "gmock/gmock.h" #include "gtest/gtest.h" #include "random/generators.h" #include "test_utils/test.h" @@ -167,3 +169,144 @@ TEST(dl_stm_state, lower_bound) { } while (std::next_permutation( push_order.begin(), push_order.end(), base_offset_less_cmp)); } + +TEST(dl_stm_state_death, start_snapshot) { + ct::dl_stm_state state; + + auto snapshot_id1 = state.start_snapshot(ct::dl_version(1)); + ASSERT_EQ(snapshot_id1.version, ct::dl_version(1)); + + auto snapshot1 = state.read_snapshot(snapshot_id1); + ASSERT_TRUE(snapshot1.has_value()); + ASSERT_EQ(snapshot1->id, snapshot_id1); + ASSERT_TRUE(snapshot1->overlays.empty()); + + // This does not exist yet. + ASSERT_FALSE( + state.read_snapshot(ct::dl_snapshot_id(ct::dl_version(2))).has_value()); + + auto snapshot_id2 = state.start_snapshot(ct::dl_version(2)); + ASSERT_EQ(snapshot_id2.version, ct::dl_version(2)); + + auto snapshot2 = state.read_snapshot(snapshot_id2); + ASSERT_TRUE(snapshot2.has_value()); + ASSERT_EQ(snapshot2->id, snapshot_id2); + ASSERT_TRUE(snapshot2->overlays.empty()); + + // Starting a snapshot without advancing the version should throw. + ASSERT_DEATH( + { state.start_snapshot(ct::dl_version(1)); }, + "Snapshot version can't go backwards. Current snapshot version: 2, new " + "snapshot version: 1"); + + ASSERT_DEATH( + { + state.push_overlay( + ct::dl_version(2), + make_overlay(kafka::offset(0), kafka::offset(10))); + }, + "Version can't go backwards. Current version: 2, new version: 2, last " + "snapshot version: 2"); +} + +TEST(dl_stm_state, start_snapshot) { + ct::dl_stm_state state; + + auto overlay0 = make_overlay(kafka::offset(0), kafka::offset(10)); + state.push_overlay(ct::dl_version(1), overlay0); + + auto snapshot_id0 = state.start_snapshot(ct::dl_version(1)); + + // Mark the overlay as removed. + q::overlays(state).front().removed_at = ct::dl_version(2); + + auto snapshot_id1 = state.start_snapshot(ct::dl_version(2)); + + auto overlay1 = make_overlay(kafka::offset(5), kafka::offset(8)); + state.push_overlay(ct::dl_version(3), overlay1); + + auto snapshot_id2 = state.start_snapshot(ct::dl_version(3)); + + auto overlay2 = make_overlay(kafka::offset(6), kafka::offset(20)); + state.push_overlay(ct::dl_version(4), overlay2); + + auto snapshot_id3 = state.start_snapshot(ct::dl_version(5)); + + auto snapshot0 = state.read_snapshot(snapshot_id0); + ASSERT_TRUE(snapshot0.has_value()); + ASSERT_EQ(snapshot0->id, snapshot_id0); + ASSERT_EQ(snapshot0->overlays.size(), 1) << snapshot0->overlays; + ASSERT_EQ(snapshot0->overlays[0], overlay0); + + auto snapshot1 = state.read_snapshot(snapshot_id1); + ASSERT_TRUE(snapshot1.has_value()); + ASSERT_EQ(snapshot1->id, snapshot_id1); + ASSERT_EQ(snapshot1->overlays.size(), 0); + + auto snapshot2 = state.read_snapshot(snapshot_id2); + ASSERT_TRUE(snapshot2.has_value()); + ASSERT_EQ(snapshot2->id, snapshot_id2); + ASSERT_EQ(snapshot2->overlays.size(), 1) << snapshot2->overlays; + ASSERT_EQ(snapshot2->overlays[0], overlay1); + + auto snapshot3 = state.read_snapshot(snapshot_id3); + ASSERT_TRUE(snapshot3.has_value()); + ASSERT_EQ(snapshot3->id, snapshot_id3); + ASSERT_EQ(snapshot3->overlays.size(), 2); + ASSERT_EQ(snapshot3->overlays[0], overlay1) << snapshot3->overlays; + ASSERT_EQ(snapshot3->overlays[1], overlay2) << snapshot3->overlays; +} + +TEST(dl_stm_state, remove_snapshots_before) { + ct::dl_stm_state state; + + EXPECT_THAT( + [&]() { state.remove_snapshots_before(ct::dl_version(42)); }, + ThrowsMessage( + testing::HasSubstr("Attempt to remove snapshots before version 42 but " + "no snapshots exist"))); + + auto overlay0 = make_overlay(kafka::offset(0), kafka::offset(10)); + state.push_overlay(ct::dl_version(1), overlay0); + + auto snapshot_id0 = state.start_snapshot(ct::dl_version(1)); + + // Mark the overlay as removed. + q::overlays(state).front().removed_at = ct::dl_version(2); + + auto snapshot_id1 = state.start_snapshot(ct::dl_version(2)); + + auto overlay1 = make_overlay(kafka::offset(5), kafka::offset(8)); + state.push_overlay(ct::dl_version(3), overlay1); + + auto snapshot_id2 = state.start_snapshot(ct::dl_version(3)); + + auto overlay2 = make_overlay(kafka::offset(6), kafka::offset(20)); + state.push_overlay(ct::dl_version(4), overlay2); + + auto snapshot_id3 = state.start_snapshot(ct::dl_version(5)); + + // Test that operation is idempotent. + for (auto i = 0; i < 3; ++i) { + state.remove_snapshots_before(ct::dl_version(3)); + + ASSERT_FALSE(state.snapshot_exists(snapshot_id0)); + ASSERT_FALSE(state.snapshot_exists(snapshot_id1)); + ASSERT_TRUE(state.snapshot_exists(snapshot_id2)); + ASSERT_TRUE(state.snapshot_exists(snapshot_id3)); + + // Retrying the an out-of-date version is an idempotent operation too. + state.remove_snapshots_before(ct::dl_version(2)); + } + + // It should be impossible to make a call like this because the contract + // with the callers is that they should first call `start_snapshot` and can + // call remove_snapshots_before only with the result of the `start_snapshot` + // call. + // In case this bug is introduced we want to throw an exception instead of + // failing silently. + EXPECT_THAT( + [&]() { state.remove_snapshots_before(ct::dl_version::max()); }, + ThrowsMessage(testing::HasSubstr( + "Trying to remove snapshots before an non-existent snapshot"))); +} diff --git a/src/v/cloud_topics/dl_stm/tests/dl_stm_test.cc b/src/v/cloud_topics/dl_stm/tests/dl_stm_test.cc index 8cd93dafcbde0..2fb819b50e734 100644 --- a/src/v/cloud_topics/dl_stm/tests/dl_stm_test.cc +++ b/src/v/cloud_topics/dl_stm/tests/dl_stm_test.cc @@ -24,6 +24,12 @@ class dl_stm_fixture : public raft::raft_fixture { public: static constexpr auto node_count = 3; + ~dl_stm_fixture() override { + for (auto& entry : api_by_vnode) { + entry.second->stop().get(); + } + }; + ss::future<> start() { for (auto i = 0; i < node_count; ++i) { add_node(model::node_id(i), model::revision_id(0)); @@ -87,4 +93,32 @@ TEST_F_CORO(dl_stm_fixture, test_basic) { ASSERT_TRUE_CORO( api(node(*get_leader())).lower_bound(kafka::offset(0)).has_value()); + + auto snapshot_res = co_await api(node(*get_leader())).start_snapshot(); + ASSERT_FALSE_CORO(res.has_error()); + + ASSERT_TRUE_CORO( + api(node(*get_leader())).read_snapshot(snapshot_res.value()).has_value()); + ASSERT_EQ_CORO( + api(node(*get_leader())) + .read_snapshot(snapshot_res.value()) + ->overlays.size(), + 1); + + auto snapshot_res2 = co_await api(node(*get_leader())).start_snapshot(); + + ASSERT_TRUE_CORO(api(node(*get_leader())) + .read_snapshot(snapshot_res2.value()) + .has_value()); + + auto remove_res = co_await api(node(*get_leader())) + .remove_snapshots_before(snapshot_res2.value().version); + ASSERT_FALSE_CORO(remove_res.has_error()); + + ASSERT_FALSE_CORO( + api(node(*get_leader())).read_snapshot(snapshot_res.value()).has_value()); + + ASSERT_TRUE_CORO(api(node(*get_leader())) + .read_snapshot(snapshot_res2.value()) + .has_value()); } diff --git a/src/v/cloud_topics/types.cc b/src/v/cloud_topics/types.cc index 3f6e50d7b1133..785a1b6a2d194 100644 --- a/src/v/cloud_topics/types.cc +++ b/src/v/cloud_topics/types.cc @@ -17,6 +17,11 @@ auto fmt::formatter::format( switch (key) { case experimental::cloud_topics::dl_stm_key::push_overlay: return fmt::format_to(ctx.out(), "push_overlay"); + case experimental::cloud_topics::dl_stm_key::start_snapshot: + return fmt::format_to(ctx.out(), "start_snapshot"); + case experimental::cloud_topics::dl_stm_key:: + remove_snapshots_before_version: + return fmt::format_to(ctx.out(), "remove_snapshots_before_version"); } return fmt::format_to( ctx.out(), "unknown dl_stm_key({})", static_cast(key)); diff --git a/src/v/cloud_topics/types.h b/src/v/cloud_topics/types.h index 7ef75f7adc220..bd1e16bf9d107 100644 --- a/src/v/cloud_topics/types.h +++ b/src/v/cloud_topics/types.h @@ -21,6 +21,8 @@ namespace experimental::cloud_topics { enum class dl_stm_key { push_overlay = 0, + start_snapshot = 1, + remove_snapshots_before_version = 2, // TODO: add all commands }; diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 8febc0d334397..d8337ad611f9b 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -585,6 +585,14 @@ ss::future<> partition::stop() { co_await _cloud_storage_manifest_view->stop(); } + if (_dl_stm_api) { + vlog( + clusterlog.debug, + "Stopping dl_stm_api on partition: {}", + partition_ntp); + co_await _dl_stm_api->stop(); + } + _probe.clear_metrics(); vlog(clusterlog.debug, "Stopped partition {}", partition_ntp); }