From 4f66591cb4e118b787aef8f870524db95be855c9 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Sun, 9 Feb 2025 19:41:06 -0500 Subject: [PATCH] `storage`: add `compaction_scheduling` fixture test --- src/v/storage/tests/BUILD | 1 + src/v/storage/tests/storage_e2e_test.cc | 115 ++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/src/v/storage/tests/BUILD b/src/v/storage/tests/BUILD index a169ffe4ff24..323d27472fc9 100644 --- a/src/v/storage/tests/BUILD +++ b/src/v/storage/tests/BUILD @@ -564,6 +564,7 @@ redpanda_cc_btest( "//src/v/test_utils:seastar_boost", "//src/v/utils:directory_walker", "//src/v/utils:to_string", + "//src/v/utils:tristate", "@boost//:test", "@fmt", "@seastar", diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc index 3aaa6a558e58..feec5f4d9117 100644 --- a/src/v/storage/tests/storage_e2e_test.cc +++ b/src/v/storage/tests/storage_e2e_test.cc @@ -25,6 +25,7 @@ #include "reflection/adl.h" #include "storage/batch_cache.h" #include "storage/disk_log_impl.h" +#include "storage/log_housekeeping_meta.h" #include "storage/log_manager.h" #include "storage/log_reader.h" #include "storage/ntp_config.h" @@ -40,6 +41,7 @@ #include "test_utils/randoms.h" #include "test_utils/tmp_dir.h" #include "utils/directory_walker.h" +#include "utils/tristate.h" #include #include @@ -5521,3 +5523,116 @@ FIXTURE_TEST(dirty_and_closed_bytes_bookkeeping, storage_test_fixture) { check_dirty_and_closed_segment_bytes(log); } } + +FIXTURE_TEST(compaction_scheduling, storage_test_fixture) { + using log_manager_accessor = storage::testing_details::log_manager_accessor; + storage::log_manager mgr = make_log_manager(); + info("Configuration: {}", mgr.config()); + auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); }); + std::vector> logs; + + using overrides_t = storage::ntp_config::default_overrides; + overrides_t ov; + ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; + ov.min_cleanable_dirty_ratio = tristate{0.2}; + + for (const auto& topic : {"tapioca", "cassava", "kudzu"}) { + auto ntp = model::ntp("kafka", topic, 0); + auto log + = mgr + .manage(storage::ntp_config( + ntp, mgr.config().base_dir, std::make_unique(ov))) + .get(); + logs.push_back(log); + } + + auto& meta_list = log_manager_accessor::logs_list(mgr); + + using bflags = storage::log_housekeeping_meta::bitflags; + + static constexpr auto is_set = [](bflags var, auto flag) { + return (var & flag) == flag; + }; + + // Floating point comparison tolerance + static constexpr auto tol = 1.0e-6; + + auto append_and_force_roll = [this](auto& log, int num_batches = 10) { + auto headers = append_random_batches( + log, num_batches); + log->force_roll(ss::default_priority_class()).get(); + }; + + // Attempt a housekeeping scan with no partitions to compact + log_manager_accessor::housekeeping_scan(mgr).get(); + + for (const auto& meta : meta_list) { + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(!is_set(meta.flags, bflags::compacted)); + } + + // Append batches and force roll with first log- it should be the only one + // compacted + append_and_force_roll(logs[0], 30); + BOOST_REQUIRE_CLOSE(logs[0]->dirty_ratio(), 1.0, tol); + + log_manager_accessor::housekeeping_scan(mgr).get(); + + for (const auto& meta : meta_list) { + bool expect_compacted = meta.handle->config().ntp() + == logs[0]->config().ntp(); + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); + BOOST_REQUIRE_EQUAL( + expect_compacted, is_set(meta.flags, bflags::compacted)); + auto batches = read_and_validate_all_batches(logs[0]); + linear_int_kv_batch_generator::validate_post_compaction( + std::move(batches)); + } + + // Append fewer batches and force roll with second log- it should be the + // only one compacted + append_and_force_roll(logs[1], 20); + BOOST_REQUIRE_CLOSE(logs[1]->dirty_ratio(), 1.0, tol); + + log_manager_accessor::housekeeping_scan(mgr).get(); + + for (const auto& meta : meta_list) { + bool expect_compacted = meta.handle->config().ntp() + == logs[1]->config().ntp(); + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); + BOOST_REQUIRE_EQUAL( + expect_compacted, is_set(meta.flags, bflags::compacted)); + auto batches = read_and_validate_all_batches(logs[1]); + linear_int_kv_batch_generator::validate_post_compaction( + std::move(batches)); + } + + // Append batches and force roll all logs- all of them will be compacted + for (auto& log : logs) { + append_and_force_roll(log, 10); + } + + BOOST_REQUIRE_GE(logs[0]->dirty_ratio(), 1.0 / 3.0); + BOOST_REQUIRE_GE(logs[1]->dirty_ratio(), 1.0 / 2.0); + BOOST_REQUIRE_CLOSE(logs[2]->dirty_ratio(), 1.0, tol); + + log_manager_accessor::housekeeping_scan(mgr).get(); + + // Logs in the meta list will be ordered w/r/t their dirty ratio + // (descending) post compaction + auto log_it = logs.rbegin(); + for (const auto& meta : meta_list) { + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); + BOOST_REQUIRE(is_set(meta.flags, bflags::compacted)); + BOOST_REQUIRE_EQUAL( + meta.handle->config().ntp(), (*log_it)->config().ntp()); + ++log_it; + } + + for (auto& log : logs) { + auto batches = read_and_validate_all_batches(log); + } +}