Skip to content

Commit

Permalink
ct: add snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Nov 28, 2024
1 parent 9b42aaf commit 4bf9b74
Show file tree
Hide file tree
Showing 17 changed files with 571 additions and 11 deletions.
34 changes: 27 additions & 7 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "dl_version",
hdrs = ["dl_version.h"],
include_prefix = "cloud_topics",
deps = ["//src/v/utils:named_type"],
)

redpanda_cc_library(
name = "dl_overlay",
srcs = [
Expand Down Expand Up @@ -82,6 +75,33 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "dl_version",
hdrs = ["dl_version.h"],
include_prefix = "cloud_topics",
deps = ["//src/v/utils:named_type"],
)

redpanda_cc_library(
name = "dl_snapshot",
srcs = [
"dl_snapshot.cc",
],
hdrs = [
"dl_snapshot.h",
],
implementation_deps = [
"@fmt",
],
include_prefix = "cloud_topics",
deps = [
":dl_overlay",
":dl_version",
"//src/v/container:fragmented_vector",
"//src/v/serde",
],
)

redpanda_cc_library(
name = "app",
srcs = [
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ v_cc_library(
NAME cloud_topics_base
SRCS
dl_overlay.cc
dl_snapshot.cc
logger.cc
types.cc
DEPS
Expand Down
16 changes: 16 additions & 0 deletions src/v/cloud_topics/dl_snapshot.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "cloud_topics/dl_snapshot.h"

auto fmt::formatter<experimental::cloud_topics::dl_snapshot_id>::format(
const experimental::cloud_topics::dl_snapshot_id& id,
fmt::format_context& ctx) const -> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", id.version);
}
55 changes: 55 additions & 0 deletions src/v/cloud_topics/dl_snapshot.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_version.h"
#include "container/fragmented_vector.h"
#include "serde/envelope.h"

namespace experimental::cloud_topics {

struct dl_snapshot_id
: serde::
envelope<dl_snapshot_id, serde::version<0>, serde::compat_version<0>> {
dl_snapshot_id() noexcept = default;

explicit dl_snapshot_id(dl_version version) noexcept
: version(version) {}

auto serde_fields() { return std::tie(version); }

bool operator==(const dl_snapshot_id& other) const noexcept = default;

/// Version for which the snapshot is created.
dl_version version;
};

struct dl_snapshot_payload
: serde::checksum_envelope<
dl_snapshot_id,
serde::version<0>,
serde::compat_version<0>> {
/// Version for which the snapshot is created.
dl_snapshot_id id;

/// Overlays visible at the snapshot version.
fragmented_vector<dl_overlay> overlays;
};

}; // namespace experimental::cloud_topics

template<>
struct fmt::formatter<experimental::cloud_topics::dl_snapshot_id>
: fmt::formatter<std::string_view> {
auto format(
const experimental::cloud_topics::dl_snapshot_id&,
fmt::format_context& ctx) const -> decltype(ctx.out());
};
4 changes: 4 additions & 0 deletions src/v/cloud_topics/dl_stm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ redpanda_cc_library(
visibility = [":__subpackages__"],
deps = [
"//src/v/cloud_topics:dl_overlay",
"//src/v/cloud_topics:dl_version",
"//src/v/serde",
],
)
Expand All @@ -19,6 +20,7 @@ redpanda_cc_library(
visibility = [":__subpackages__"],
deps = [
"//src/v/cloud_topics:dl_overlay",
"//src/v/cloud_topics:dl_snapshot",
"//src/v/cloud_topics:dl_version",
"//src/v/container:fragmented_vector",
"//src/v/model",
Expand Down Expand Up @@ -58,6 +60,8 @@ redpanda_cc_library(
deps = [
"//src/v/base",
"//src/v/cloud_topics:dl_overlay",
"//src/v/cloud_topics:dl_snapshot",
"//src/v/cloud_topics:dl_version",
"//src/v/model",
"@seastar",
],
Expand Down
11 changes: 11 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ ss::future<> dl_stm::do_apply(const model::record_batch& batch) {
_state.push_overlay(new_dl_version, std::move(cmd.overlay));
break;
}
case dl_stm_key::start_snapshot: {
std::ignore = serde::from_iobuf<start_snapshot_cmd>(
r.release_value());
_state.start_snapshot(new_dl_version);
break;
}
case dl_stm_key::remove_snapshots_before_version:
auto cmd = serde::from_iobuf<remove_snapshots_before_version_cmd>(
r.release_value());
_state.remove_snapshots_before(cmd.last_version_to_keep);
break;
}
});

Expand Down
87 changes: 87 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "serde/rw/uuid.h"
#include "storage/record_batch_builder.h"

#include <stdexcept>

namespace experimental::cloud_topics {

std::ostream& operator<<(std::ostream& o, dl_stm_api_errc errc) {
Expand All @@ -32,8 +34,12 @@ dl_stm_api::dl_stm_api(ss::logger& logger, ss::shared_ptr<dl_stm> stm)
: _logger(logger)
, _stm(std::move(stm)) {}

ss::future<> dl_stm_api::stop() { co_await _gate.close(); }

ss::future<result<bool, dl_stm_api_errc>>
dl_stm_api::push_overlay(dl_overlay overlay) {
auto h = _gate.hold();

// TODO: Sync state and consider whether we need to encode invariants in the
// command.
model::term_id term = _stm->_raft->term();
Expand Down Expand Up @@ -68,4 +74,85 @@ std::optional<dl_overlay> dl_stm_api::lower_bound(kafka::offset offset) const {
return _stm->_state.lower_bound(offset);
}

ss::future<checked<dl_snapshot_id, dl_stm_api_errc>>
dl_stm_api::start_snapshot() {
vlog(_logger.debug, "Replicating dl_stm_cmd::start_snapshot_cmd");
auto h = _gate.hold();

storage::record_batch_builder builder(
model::record_batch_type::dl_stm_command, model::offset(0));
builder.add_raw_kv(
serde::to_iobuf(dl_stm_key::start_snapshot),
serde::to_iobuf(start_snapshot_cmd()));

auto batch = std::move(builder).build();

auto apply_result = co_await replicated_apply(std::move(batch));
if (apply_result.has_failure()) {
co_return apply_result.error();
}

// We abuse knowledge of implementation detail here to construct the
// dl_snapshot_id without having to setup listeners and notifiers of command
// apply.
auto expected_id = dl_snapshot_id(dl_version(apply_result.value()));

// Ensure that the expected snapshot was created.
if (!_stm->_state.snapshot_exists(expected_id)) {
throw std::runtime_error(fmt::format(
"Snapshot with expected id not found after waiting for command to be "
"applied: {}",
expected_id));
}

co_return outcome::success(expected_id);
}

std::optional<dl_snapshot_payload>
dl_stm_api::read_snapshot(dl_snapshot_id id) {
return _stm->_state.read_snapshot(id);
}

ss::future<checked<void, dl_stm_api_errc>>
dl_stm_api::remove_snapshots_before(dl_version last_version_to_keep) {
vlog(_logger.debug, "Replicating dl_stm_cmd::remove_snapshots_cmd");
auto h = _gate.hold();

storage::record_batch_builder builder(
model::record_batch_type::dl_stm_command, model::offset(0));
builder.add_raw_kv(
serde::to_iobuf(dl_stm_key::remove_snapshots_before_version),
serde::to_iobuf(
remove_snapshots_before_version_cmd(last_version_to_keep)));

auto batch = std::move(builder).build();
auto apply_result = co_await replicated_apply(std::move(batch));
if (apply_result.has_failure()) {
co_return apply_result.error();
}

co_return outcome::success();
}

ss::future<checked<model::offset, dl_stm_api_errc>>
dl_stm_api::replicated_apply(model::record_batch&& batch) {
model::term_id term = _stm->_raft->term();

auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);

if (res.has_error()) {
throw std::runtime_error(
fmt::format("Failed to replicate overlay: {}", res.error()));
}

co_await _stm->wait(
res.value().last_offset, model::timeout_clock::now() + 30s);

co_return res.value().last_offset;
}

}; // namespace experimental::cloud_topics
33 changes: 33 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

#include "base/outcome.h"
#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_snapshot.h"
#include "cloud_topics/dl_version.h"
#include "model/record.h"

#include <seastar/core/gate.hh>
#include <seastar/util/log.hh>

#include <ostream>
Expand All @@ -32,6 +36,13 @@ class dl_stm_api {
dl_stm_api(ss::logger& logger, ss::shared_ptr<dl_stm> stm);
dl_stm_api(dl_stm_api&&) noexcept = default;

~dl_stm_api() {
vassert(_gate.is_closed(), "object destroyed before calling stop()");
}

public:
ss::future<> stop();

public:
/// Attempt to add a new overlay.
ss::future<result<bool, dl_stm_api_errc>> push_overlay(dl_overlay overlay);
Expand All @@ -41,9 +52,31 @@ class dl_stm_api {
/// available offset.
std::optional<dl_overlay> lower_bound(kafka::offset offset) const;

/// Request a new snapshot to be created.
ss::future<checked<dl_snapshot_id, dl_stm_api_errc>> start_snapshot();

/// Read the payload of a snapshot.
std::optional<dl_snapshot_payload> read_snapshot(dl_snapshot_id id);

/// Remove all snapshots with version less than the given version.
/// This must be called periodically as new snapshots are being created
/// to avoid the state growing indefinitely.
ss::future<checked<void, dl_stm_api_errc>>
remove_snapshots_before(dl_version last_version_to_keep);

private:
/// Replicate a record batch and wait for it to be applied to the dl_stm.
/// Returns the offset at which the batch was applied.
ss::future<checked<model::offset, dl_stm_api_errc>>
replicated_apply(model::record_batch&& batch);

private:
ss::logger& _logger;

/// Gate held by async operations to ensure that the API is not destroyed
/// while an operation is in progress.
ss::gate _gate;

/// The API can only read the state of the stm. The state can be mutated
/// only via \ref consensus::replicate calls.
ss::shared_ptr<const dl_stm> _stm;
Expand Down
26 changes: 26 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once

#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_version.h"
#include "model/fundamental.h"
#include "model/timestamp.h"
#include "serde/envelope.h"
Expand All @@ -28,4 +29,29 @@ struct push_overlay_cmd
dl_overlay overlay;
};

struct start_snapshot_cmd
: public serde::envelope<
start_snapshot_cmd,
serde::version<0>,
serde::compat_version<0>> {
start_snapshot_cmd() noexcept = default;

auto serde_fields() { return std::tie(); }
};

struct remove_snapshots_before_version_cmd
: public serde::envelope<
remove_snapshots_before_version_cmd,
serde::version<0>,
serde::compat_version<0>> {
remove_snapshots_before_version_cmd() noexcept = default;
explicit remove_snapshots_before_version_cmd(
dl_version last_version_to_keep)
: last_version_to_keep(last_version_to_keep) {}

auto serde_fields() { return std::tie(last_version_to_keep); }

dl_version last_version_to_keep{};
};

} // namespace experimental::cloud_topics
Loading

0 comments on commit 4bf9b74

Please sign in to comment.