Skip to content

Commit

Permalink
ct: use replicated_apply for push_overlay
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Nov 28, 2024
1 parent 4bf9b74 commit b711212
Showing 1 changed file with 4 additions and 15 deletions.
19 changes: 4 additions & 15 deletions src/v/cloud_topics/dl_stm/dl_stm_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ ss::future<> dl_stm_api::stop() { co_await _gate.close(); }

ss::future<result<bool, dl_stm_api_errc>>
dl_stm_api::push_overlay(dl_overlay overlay) {
vlog(_logger.debug, "Replicating dl_stm_cmd::push_overlay_cmd");
auto h = _gate.hold();

// TODO: Sync state and consider whether we need to encode invariants in the
// command.
model::term_id term = _stm->_raft->term();

vlog(_logger.debug, "Replicating dl_stm_cmd::push_overlay_cmd");

storage::record_batch_builder builder(
model::record_batch_type::dl_stm_command, model::offset(0));
Expand All @@ -53,20 +51,11 @@ dl_stm_api::push_overlay(dl_overlay overlay) {
serde::to_iobuf(push_overlay_cmd(std::move(overlay))));

auto batch = std::move(builder).build();
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);

if (res.has_error()) {
throw std::runtime_error(
fmt::format("Failed to replicate overlay: {}", res.error()));
auto apply_result = co_await replicated_apply(std::move(batch));
if (apply_result.has_failure()) {
co_return apply_result.error();
}

co_await _stm->wait_no_throw(
res.value().last_offset, model::timeout_clock::now() + 30s);

co_return outcome::success(true);
}

Expand Down

0 comments on commit b711212

Please sign in to comment.