Merge pull request #24423 from WillemKauf/storage_chunked_compaction
[CORE-8160] `storage`: add chunked compaction routine
dotnwat authored Jan 22, 2025
2 parents 39840ad + 8a0299e commit 88fb0d2
Showing 23 changed files with 723 additions and 217 deletions.
14 changes: 13 additions & 1 deletion src/v/model/record.h
@@ -25,6 +25,7 @@
#include "serde/rw/iobuf.h"
#include "serde/rw/rw.h"

#include <seastar/core/loop.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/optimized_optional.hh>

@@ -871,7 +872,18 @@ class record_batch
     ss::future<> for_each_record_async(Func f) const {
         auto it = record_batch_iterator::create(*this);
         while (it.has_next()) {
-            co_await ss::futurize_invoke(f, it.next());
+            if constexpr (std::is_same_v<
+                            ss::futurize_t<
+                              std::invoke_result_t<Func, model::record>>,
+                            ss::future<ss::stop_iteration>>) {
+                ss::stop_iteration s = co_await ss::futurize_invoke(
+                  f, it.next());
+                if (s == ss::stop_iteration::yes) {
+                    co_return;
+                }
+            } else {
+                co_await ss::futurize_invoke(f, it.next());
+            }
         }
     }
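
With this change, the functor passed to for_each_record_async may return ss::future<ss::stop_iteration> to end iteration early; any other return type keeps the old visit-every-record behavior. A minimal caller-side sketch of the early-exit form (scan_prefix and max_delta are illustrative names, not part of this PR):

// Sketch: stop visiting records once a hypothetical cutoff is reached.
ss::future<> scan_prefix(const model::record_batch& batch, int32_t max_delta) {
    co_await batch.for_each_record_async(
      [max_delta](const model::record& r) -> ss::future<ss::stop_iteration> {
          if (r.offset_delta() > max_delta) {
              // The remaining records in the batch are never visited.
              co_return ss::stop_iteration::yes;
          }
          // ... process r here ...
          co_return ss::stop_iteration::no;
      });
}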

54 changes: 54 additions & 0 deletions src/v/storage/compaction_reducers.cc
@@ -15,13 +15,16 @@
#include "model/record_batch_types.h"
#include "model/record_utils.h"
#include "random/generators.h"
#include "storage/compacted_index.h"
#include "storage/compaction.h"
#include "storage/index_state.h"
#include "storage/logger.h"
#include "storage/parser_utils.h"
#include "storage/record_batch_utils.h"
#include "storage/segment_utils.h"

#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

#include <absl/algorithm/container.h>
#include <boost/range/irange.hpp>
@@ -443,4 +446,55 @@ ss::future<ss::stop_iteration> tx_reducer::operator()(model::record_batch&& b) {
    co_return co_await _delegate(std::move(b));
}

ss::future<ss::stop_iteration> map_building_reducer::maybe_index_record_in_map(
  const model::record& r,
  model::offset base_offset,
  model::record_batch_type type,
  bool is_control,
  bool& fully_indexed_batch) {
    auto offset = base_offset + model::offset_delta(r.offset_delta());
    if (offset < _start_offset) {
        co_return ss::stop_iteration::no;
    }

    auto key_view = iobuf_to_bytes(r.key());
    auto key = enhance_key(type, is_control, key_view);
    bool success = co_await _map->put(key, offset);

    if (success) {
        co_return ss::stop_iteration::no;
    }

    fully_indexed_batch = false;
    co_return ss::stop_iteration::yes;
}

ss::future<ss::stop_iteration>
map_building_reducer::operator()(model::record_batch batch) {
    bool fully_indexed_batch = true;
    // There is no point in indexing records from uncompactible batches, since
    // their inclusion in the post-compaction segment is independent of the
    // map state (see copy_data_segment_reducer::filter()).
    if (!is_compactible(batch)) {
        co_return ss::stop_iteration::no;
    }
    auto b = co_await decompress_batch(std::move(batch));
    co_await b.for_each_record_async(
      [this,
       &fully_indexed_batch,
       base_offset = b.base_offset(),
       type = b.header().type,
       is_control = b.header().attrs.is_control()](
        const model::record& r) -> ss::future<ss::stop_iteration> {
          return maybe_index_record_in_map(
            r, base_offset, type, is_control, fully_indexed_batch);
      });

    if (fully_indexed_batch) {
        co_return ss::stop_iteration::no;
    }
    _fully_indexed_segment = false;
    co_return ss::stop_iteration::yes;
}

} // namespace storage::internal
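
The map key built in maybe_index_record_in_map is not the raw record key: enhance_key (from segment_utils) folds the batch type and control-batch flag into it, so identical raw keys coming from different batch types cannot collide in the map. The real encoding is whatever enhance_key produces; the sketch below only illustrates the idea using standard types (make_compaction_key is a hypothetical name):

#include <cstdint>
#include <vector>

// Illustrative only: tag the raw key with the batch type and control flag so
// that, e.g., a data-batch key and a control-batch key get distinct entries.
std::vector<uint8_t> make_compaction_key(
  int8_t batch_type, bool is_control, const std::vector<uint8_t>& raw_key) {
    std::vector<uint8_t> key;
    key.reserve(raw_key.size() + 2);
    key.push_back(static_cast<uint8_t>(batch_type)); // batch type tag
    key.push_back(is_control ? 1 : 0);               // control-batch flag
    key.insert(key.end(), raw_key.begin(), raw_key.end());
    return key;
}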
35 changes: 35 additions & 0 deletions src/v/storage/compaction_reducers.h
@@ -15,6 +15,7 @@
#include "bytes/bytes.h"
#include "container/fragmented_vector.h"
#include "hashing/xx.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"
#include "storage/compacted_index.h"
#include "storage/compacted_index_writer.h"
@@ -24,6 +25,7 @@
#include "storage/logger.h"
#include "utils/tracking_allocator.h"

#include <seastar/core/loop.hh>
#include <seastar/util/noncopyable_function.hh>

#include <absl/container/btree_map.h>
@@ -309,4 +311,37 @@ class tx_reducer : public compaction_reducer {
    std::optional<storage::stm_type> _transactional_stm_type;
};

// Builds up a key_offset_map for a segment, starting from
// start_offset_inclusive. Intended for chunked compaction, in which the map
// is not expected to fit the segment's entire key set at once due to memory
// constraints. Keys are added to the map from start_offset_inclusive onwards
// until either the map's capacity limit or the end of the segment is reached.
//
// end_of_stream() returns a bool indicating whether the segment was fully
// indexed.
class map_building_reducer : public compaction_reducer {
public:
    explicit map_building_reducer(
      key_offset_map* map, model::offset start_offset_inclusive)
      : _map(map)
      , _start_offset(start_offset_inclusive) {}

    ss::future<ss::stop_iteration> operator()(model::record_batch);
    bool end_of_stream() { return _fully_indexed_segment; }

private:
    ss::future<ss::stop_iteration> maybe_index_record_in_map(
      const model::record& r,
      model::offset base_offset,
      model::record_batch_type type,
      bool is_control,
      bool& fully_indexed_batch);

    key_offset_map* _map;
    model::offset _start_offset;
    bool _fully_indexed_segment = true;
};

} // namespace storage::internal
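
The reducer's contract suggests a simple driver loop for chunked compaction: index as much of the segment as fits in the map, compact using that partial map, then resume indexing from the first unindexed offset. The sketch below is hypothetical: index_chunk, compact_chunk, and the map's reset()/max_offset() calls are assumed names standing in for the actual orchestration this PR adds elsewhere.

// Hypothetical driver loop for chunked compaction. index_chunk() would run a
// map_building_reducer over the segment starting at `next` and return its
// end_of_stream() result; compact_chunk() would rewrite the segment using the
// offsets recorded so far.
ss::future<> chunked_compact(
  segment& seg, key_offset_map& map, model::offset base) {
    auto next = base;
    while (true) {
        map.reset(); // assumed: clears the map between rounds
        bool fully_indexed = co_await index_chunk(seg, map, next);
        co_await compact_chunk(seg, map);
        if (fully_indexed) {
            co_return; // every key from `base` onward has been indexed
        }
        // Resume from the first offset past the highest one indexed.
        next = model::next_offset(map.max_offset());
    }
}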