diff --git a/src/v/cloud_topics/dl_stm/dl_stm_api.cc b/src/v/cloud_topics/dl_stm/dl_stm_api.cc index 7febb26e160f4..3cd1fcb8f25e6 100644 --- a/src/v/cloud_topics/dl_stm/dl_stm_api.cc +++ b/src/v/cloud_topics/dl_stm/dl_stm_api.cc @@ -38,13 +38,11 @@ ss::future<> dl_stm_api::stop() { co_await _gate.close(); } ss::future> dl_stm_api::push_overlay(dl_overlay overlay) { + vlog(_logger.debug, "Replicating dl_stm_cmd::push_overlay_cmd"); auto h = _gate.hold(); // TODO: Sync state and consider whether we need to encode invariants in the // command. - model::term_id term = _stm->_raft->term(); - - vlog(_logger.debug, "Replicating dl_stm_cmd::push_overlay_cmd"); storage::record_batch_builder builder( model::record_batch_type::dl_stm_command, model::offset(0)); @@ -53,20 +51,11 @@ dl_stm_api::push_overlay(dl_overlay overlay) { serde::to_iobuf(push_overlay_cmd(std::move(overlay)))); auto batch = std::move(builder).build(); - auto reader = model::make_memory_record_batch_reader(std::move(batch)); - - auto opts = raft::replicate_options(raft::consistency_level::quorum_ack); - opts.set_force_flush(); - auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts); - - if (res.has_error()) { - throw std::runtime_error( - fmt::format("Failed to replicate overlay: {}", res.error())); + auto apply_result = co_await replicated_apply(std::move(batch)); + if (apply_result.has_failure()) { + co_return apply_result.error(); } - co_await _stm->wait_no_throw( - res.value().last_offset, model::timeout_clock::now() + 30s); - co_return outcome::success(true); }