diff --git a/tests/rptest/services/failure_injector.py b/tests/rptest/services/failure_injector.py index bdf7854d8770f..5a4416159a9aa 100644 --- a/tests/rptest/services/failure_injector.py +++ b/tests/rptest/services/failure_injector.py @@ -9,6 +9,7 @@ import random import signal +import time import threading from ducktape.utils.util import wait_until from ducktape.errors import TimeoutError @@ -63,7 +64,7 @@ def as_tuple(self): class FailureInjectorBase: def __init__(self, redpanda): self.redpanda = redpanda - self._in_flight = set() + self._in_flight = {} def __enter__(self): return self @@ -99,7 +100,13 @@ def inject_failure(self, spec): def cleanup(): try: - self._in_flight.remove(spec) + self.redpanda.logger.debug( + f"Timed cleanup started, _in_flight={self._in_flight}" + ) + run_time = self._in_flight.pop(spec) + self.redpanda.logger.debug( + f"Timed cleanup dequeued, spec={spec}, would be run time={run_time}, _in_flight={self._in_flight}" + ) except KeyError: # The stop timers may outlive the test, handle case # where they run after we already had a heal_all call. @@ -108,13 +115,29 @@ def cleanup(): ) else: self._stop_func(spec.type)(spec.node) + self.redpanda.logger.debug( + f"Timed cleanup complete spec={spec}, _in_flight={self._in_flight}" + ) stop_timer = threading.Timer(function=cleanup, args=[], interval=spec.length) - self._in_flight.add(spec) + self._in_flight[spec] = self.now() + spec.length + self.redpanda.logger.debug( + f"Timed cleanup scheduled spec={spec}, run_time={self._in_flight[spec]}, _in_flight={self._in_flight}" + ) stop_timer.start() + def cnt_in_flight(self): + return len(self._in_flight) + + def time_till_next_recovery(self): + return min(run_time for spec, run_time in self._in_flight) - self.now() + + @classmethod + def now(cls): + return time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID) + def _start_func(self, tp): if tp == FailureSpec.FAILURE_KILL: return self._kill @@ -255,31 +278,45 @@ def _heal_all(self): # Cleanups can fail, e.g. rule does not exist self.redpanda.logger.warn(f"_heal_all: {e}") + self.redpanda.logger.debug( + f"before _heal_all _in_flight={self._in_flight}") self._in_flight = { - spec - for spec in self._in_flight + spec: run_time + for spec, run_time in self._in_flight.items() if spec.type != FailureSpec.FAILURE_ISOLATE } + self.redpanda.logger.debug( + f"after _heal_all _in_flight={self._in_flight}") def _continue_all(self): self.redpanda.logger.info(f"continuing execution on all nodes") for n in self.redpanda.nodes: if self.redpanda.check_node(n): self._continue(n) + self.redpanda.logger.debug( + f"before _continue_all _in_flight={self._in_flight}") self._in_flight = { - spec - for spec in self._in_flight + spec: run_time + for spec, run_time in self._in_flight.items() if spec.type != FailureSpec.FAILURE_SUSPEND } + self.redpanda.logger.debug( + f"after _continue_all _in_flight={self._in_flight}") def _undo_all(self): self.redpanda.logger.info(f"running scheduled undos earlier") + self.redpanda.logger.debug( + f"before _undo_all _in_flight={self._in_flight}") while self._in_flight: try: - spec = self._in_flight.pop() + spec, run_time = self._in_flight.popitem() + self.redpanda.logger.debug( + f"_undo_all popped spec={spec}, planned run time={run_time} _in_flight={self._in_flight}" + ) except KeyError: pass # timer just emptied the set: GIL off??? else: + self.redpanda.logger.debug(f"_undo_all stopping spec={spec}") self._stop_func(spec.type)(spec.node) def _suspend(self, node): diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py index c4ff9f32bd4f3..39c9f7751df5a 100644 --- a/tests/rptest/tests/data_migrations_api_test.py +++ b/tests/rptest/tests/data_migrations_api_test.py @@ -305,7 +305,8 @@ def test_mount_inexistent(self): topic = TopicSpec(partition_count=3) admin = Admin(self.redpanda) - with Finjector(self.redpanda, self.scale).finj_thread(): + with Finjector(self.redpanda, self.scale, + max_concurrent_failures=1).finj_thread(): in_migration = InboundDataMigration(topics=[ InboundTopic(make_namespaced_topic(topic.name), alias=None) ], @@ -352,7 +353,8 @@ def test_creating_and_listing_migrations(self): assert len(migrations_map) == 0, "There should be no data migrations" - with Finjector(self.redpanda, self.scale).finj_thread(): + with Finjector(self.redpanda, self.scale, + max_concurrent_failures=1).finj_thread(): # out outbound_topics = [make_namespaced_topic(t.name) for t in topics] out_migration = OutboundDataMigration(outbound_topics, @@ -463,7 +465,8 @@ def test_higher_level_migration_api(self): producer.begin_transaction() producer.produce(topics[0].name, key="key2", value="value2") - with Finjector(self.redpanda, self.scale).finj_thread(): + with Finjector(self.redpanda, self.scale, + max_concurrent_failures=1).finj_thread(): # out outbound_topics = [make_namespaced_topic(t.name) for t in topics] reply = self.admin.unmount_topics(outbound_topics).json() diff --git a/tests/rptest/tests/e2e_finjector.py b/tests/rptest/tests/e2e_finjector.py index b89ce85b766f1..cc12933286296 100644 --- a/tests/rptest/tests/e2e_finjector.py +++ b/tests/rptest/tests/e2e_finjector.py @@ -9,6 +9,7 @@ from contextlib import contextmanager import random +import sys import time import threading @@ -35,7 +36,7 @@ class Finjector: # we cannot guarantee start idempotency LOG_ALLOW_LIST = ["failed to lock pidfile. already locked"] - def __init__(self, redpanda, scale): + def __init__(self, redpanda, scale, **kwargs): self.redpanda = redpanda self.enable_manual = False self.enable_loop = False @@ -45,6 +46,8 @@ def __init__(self, redpanda, scale): self.allowed_nodes_provider = lambda f_type: self.redpanda.nodes self.allowed_failures = FailureSpec.FAILURE_TYPES self.custom_failures = [] + self.max_concurrent_failures = sys.maxsize + self.configure_finjector(**kwargs) def add_failure_spec(self, fspec): self.custom_failures.append(fspec) @@ -52,13 +55,16 @@ def add_failure_spec(self, fspec): def configure_finjector(self, allowed_failures=None, length_provider=None, - delay_provider=None): + delay_provider=None, + max_concurrent_failures=None): if allowed_failures: allowed_failures = allowed_failures if length_provider: self.failure_length_provider = length_provider if delay_provider: self.failure_delay_provier = delay_provider + if max_concurrent_failures is not None: + self.max_concurrent_failures = max_concurrent_failures @contextmanager def finj_thread(self): @@ -72,15 +78,16 @@ def finj_thread(self): try: assert not self.enable_manual and not self.enable_loop self.enable_loop = True + f_injector = make_failure_injector(self.redpanda) self.finjector_thread = threading.Thread( - target=self._failure_injector_loop, args=()) + target=self._failure_injector_loop, args=(f_injector)) self.finjector_thread.start() yield finally: self.enable_loop = False if self.finjector_thread: self.finjector_thread.join() - self._cleanup() + self._cleanup(f_injector) @contextmanager def finj_manual(self): @@ -92,17 +99,18 @@ def finj_manual(self): :return: a callable with a single failure spec argument """ + f_injector = make_failure_injector(self.redpanda) try: assert not self.enable_manual and not self.enable_loop self.enable_manual = True def callable(spec): - return self.inject_failure(spec) + return self.inject_failure(f_injector, spec) yield callable finally: self.enable_manual = False - self._cleanup() + self._cleanup(f_injector) def random_failure_spec(self): f_type = random.choice(self.allowed_failures) @@ -111,9 +119,8 @@ def random_failure_spec(self): return FailureSpec(node=node, type=f_type, length=length) - def inject_failure(self, spec): + def inject_failure(self, f_injector, spec): assert self.enable_manual or self.enable_loop - f_injector = make_failure_injector(self.redpanda) f_injector.inject_failure(spec) def _next_failure(self): @@ -122,23 +129,19 @@ def _next_failure(self): else: return self.random_failure_spec() - def _failure_injector_loop(self): + def _failure_injector_loop(self, f_injector): while self.enable_loop: - f_injector = make_failure_injector(self.redpanda) - f_injector.inject_failure(self._next_failure()) + failure = self._next_failure() + f_injector.inject_failure(failure) delay = self.failure_delay_provier() + if f_injector.cnt_in_flight() >= self.max_concurrent_failures: + delay = max(delay, f_injector.time_till_next_recovery) self.redpanda.logger.info( f"waiting {delay} seconds before next failure") time.sleep(delay) - def _cleanup(self): - make_failure_injector(self.redpanda)._heal_all() - make_failure_injector(self.redpanda)._continue_all() - make_failure_injector(self.redpanda)._undo_all() - - -class EndToEndFinjectorTest(EndToEndTest): - def __init__(self, test_context): - super(EndToEndFinjectorTest, self).__init__(test_context=test_context) - self.finjector = Finjector(self.redpanda, test_context) + def _cleanup(self, f_injector): + f_injector._heal_all() + f_injector._continue_all() + f_injector._undo_all()