Skip to content

Commit

Permalink
rptest: fix race-y checks in compaction_recovery_test
Browse files Browse the repository at this point in the history
This test can race with segment rolls and un-self-compacted segments,
since self-compaction may not occur before `redpanda` version changes
and node restarts occur, thus breaking the expectations around the
compacted index `mtime()` stats.

To fix the race conditions, return the `partition` data that was used to
determine that compaction had finished, and use that same data to perform the
equality/inequality checks between compacted index `mtime()` values.

Also ensure that all segments present in the `partition` data that are produced
during a version epoch are self-compacted in that same epoch, per the
expectations of the test.
  • Loading branch information
WillemKauf committed Feb 21, 2025
1 parent 5fe9fc9 commit 854b56a
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions tests/rptest/tests/compaction_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from rptest.services.cluster import cluster
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.services.redpanda import MetricsEndpoint
from ducktape.utils.util import wait_until
from rptest.util import wait_until_result

from rptest.clients.types import TopicSpec
from rptest.tests.redpanda_test import RedpandaTest
Expand Down Expand Up @@ -164,17 +166,26 @@ def produce_and_wait_for_compaction(node, count):
record_size=1024,
batch_size=2048)

def no_big_closed_segments():
def get_compacted_segments():
return self.redpanda.metric_sum(
metric_name=
"vectorized_storage_log_compacted_segment_total",
metrics_endpoint=MetricsEndpoint.METRICS,
topic=self.topic,
nodes=[node])

def finished_compaction():
partition = get_storage_partition(node)
return not any(
seg.base_index and seg.size > (self.SEGMENT_SIZE / 2)
for seg in partition.segments.values())

wait_until(no_big_closed_segments, timeout_sec=30, backoff_sec=2)
for seg in partition.segments.values(
)) and get_compacted_segments() >= count, partition

def get_closed_segment2mtime(node):
partition = get_storage_partition(node)
return wait_until_result(finished_compaction,
timeout_sec=30,
backoff_sec=2)

def get_closed_segment2mtime(partition):
seg2mtime = dict()
for sname, seg in partition.segments.items():
if seg.base_index:
Expand All @@ -191,8 +202,8 @@ def get_closed_segment2mtime(node):
to_restart = self.redpanda.nodes[0]
self.logger.info(f"will test node {to_restart.account.hostname}")

produce_and_wait_for_compaction(to_restart, 2)
seg2mtime_1 = get_closed_segment2mtime(to_restart)
partition = produce_and_wait_for_compaction(to_restart, 2)
seg2mtime_1 = get_closed_segment2mtime(partition)
assert len(seg2mtime_1) >= 2

self.installer.install([to_restart], next_version)
Expand All @@ -202,30 +213,33 @@ def get_closed_segment2mtime(node):
# After restart we produce and wait for the new segments to be compacted.
# Because redpanda compacts segments from the beginning, this means that
# all earlier segments have been either recovered or rebuilt after restart.
produce_and_wait_for_compaction(to_restart, 2)
seg2mtime_2 = get_closed_segment2mtime(to_restart)
partition = produce_and_wait_for_compaction(to_restart, 2)
seg2mtime_2 = get_closed_segment2mtime(partition)
assert len(seg2mtime_2) >= len(seg2mtime_1) + 2

for index in seg2mtime_1.keys():
# v1 compacted segments should be left intact
assert index in seg2mtime_2
assert seg2mtime_1[index] == seg2mtime_2[index]
assert seg2mtime_1[index] == seg2mtime_2[
index], f"Expected segment index {index} mtime {seg2mtime_2[index]} == {seg2mtime_1[index]}"

self.installer.install([to_restart], self.OLD_VERSION)
self.redpanda.restart_nodes([to_restart])
self.redpanda.wait_for_membership(first_start=False)

produce_and_wait_for_compaction(to_restart, 2)
seg2mtime_3 = get_closed_segment2mtime(to_restart)
partition = produce_and_wait_for_compaction(to_restart, 2)
seg2mtime_3 = get_closed_segment2mtime(partition)
assert len(seg2mtime_3) >= len(seg2mtime_2) + 2
for index in seg2mtime_2.keys():
assert index in seg2mtime_3
if index in seg2mtime_1:
# v1-compacted segments should not be rewritten
assert seg2mtime_3[index] == seg2mtime_2[index]
assert seg2mtime_3[index] == seg2mtime_2[
index], f"Expected segment index {index} mtime {seg2mtime_3[index]} == {seg2mtime_2[index]}"
else:
# old version should rebuild v2 indices and re-compact segments
assert seg2mtime_3[index] > seg2mtime_2[index]
assert seg2mtime_3[index] > seg2mtime_2[
index], f"Expected segment index {index} mtime {seg2mtime_3[index]} > {seg2mtime_2[index]}"

# Now that we are back at the old version, proceed to upgrade all the way to HEAD
remaining_versions = self.load_version_range(self.OLD_VERSION)[1:]
Expand Down

0 comments on commit 854b56a

Please sign in to comment.