Skip to content

Commit

Permalink
Merge pull request redpanda-data#23739 from bashtanov/chasten-finjector2
Browse files Browse the repository at this point in the history
Chasten finjector to keep redpanda available
  • Loading branch information
bashtanov authored Oct 15, 2024
2 parents 073aadc + 27301c2 commit 9d098a9
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 32 deletions.
53 changes: 45 additions & 8 deletions tests/rptest/services/failure_injector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import random
import signal
import time
import threading
from ducktape.utils.util import wait_until
from ducktape.errors import TimeoutError
Expand Down Expand Up @@ -63,7 +64,7 @@ def as_tuple(self):
class FailureInjectorBase:
def __init__(self, redpanda):
self.redpanda = redpanda
self._in_flight = set()
self._in_flight = {}

def __enter__(self):
return self
Expand Down Expand Up @@ -99,7 +100,13 @@ def inject_failure(self, spec):

def cleanup():
try:
self._in_flight.remove(spec)
self.redpanda.logger.debug(
f"Timed cleanup started, _in_flight={self._in_flight}"
)
run_time = self._in_flight.pop(spec)
self.redpanda.logger.debug(
f"Timed cleanup dequeued, spec={spec}, would be run time={run_time}, _in_flight={self._in_flight}"
)
except KeyError:
# The stop timers may outlive the test, handle case
# where they run after we already had a heal_all call.
Expand All @@ -108,13 +115,29 @@ def cleanup():
)
else:
self._stop_func(spec.type)(spec.node)
self.redpanda.logger.debug(
f"Timed cleanup complete spec={spec}, _in_flight={self._in_flight}"
)

stop_timer = threading.Timer(function=cleanup,
args=[],
interval=spec.length)
self._in_flight.add(spec)
self._in_flight[spec] = self.now() + spec.length
self.redpanda.logger.debug(
f"Timed cleanup scheduled spec={spec}, run_time={self._in_flight[spec]}, _in_flight={self._in_flight}"
)
stop_timer.start()

def cnt_in_flight(self):
return len(self._in_flight)

def time_till_next_recovery(self):
return min(run_time for spec, run_time in self._in_flight) - self.now()

@classmethod
def now(cls):
return time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID)

def _start_func(self, tp):
if tp == FailureSpec.FAILURE_KILL:
return self._kill
Expand Down Expand Up @@ -255,31 +278,45 @@ def _heal_all(self):
# Cleanups can fail, e.g. rule does not exist
self.redpanda.logger.warn(f"_heal_all: {e}")

self.redpanda.logger.debug(
f"before _heal_all _in_flight={self._in_flight}")
self._in_flight = {
spec
for spec in self._in_flight
spec: run_time
for spec, run_time in self._in_flight.items()
if spec.type != FailureSpec.FAILURE_ISOLATE
}
self.redpanda.logger.debug(
f"after _heal_all _in_flight={self._in_flight}")

def _continue_all(self):
self.redpanda.logger.info(f"continuing execution on all nodes")
for n in self.redpanda.nodes:
if self.redpanda.check_node(n):
self._continue(n)
self.redpanda.logger.debug(
f"before _continue_all _in_flight={self._in_flight}")
self._in_flight = {
spec
for spec in self._in_flight
spec: run_time
for spec, run_time in self._in_flight.items()
if spec.type != FailureSpec.FAILURE_SUSPEND
}
self.redpanda.logger.debug(
f"after _continue_all _in_flight={self._in_flight}")

def _undo_all(self):
self.redpanda.logger.info(f"running scheduled undos earlier")
self.redpanda.logger.debug(
f"before _undo_all _in_flight={self._in_flight}")
while self._in_flight:
try:
spec = self._in_flight.pop()
spec, run_time = self._in_flight.popitem()
self.redpanda.logger.debug(
f"_undo_all popped spec={spec}, planned run time={run_time} _in_flight={self._in_flight}"
)
except KeyError:
pass # timer just emptied the set: GIL off???
else:
self.redpanda.logger.debug(f"_undo_all stopping spec={spec}")
self._stop_func(spec.type)(spec.node)

def _suspend(self, node):
Expand Down
9 changes: 6 additions & 3 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ def test_mount_inexistent(self):
topic = TopicSpec(partition_count=3)
admin = Admin(self.redpanda)

with Finjector(self.redpanda, self.scale).finj_thread():
with Finjector(self.redpanda, self.scale,
max_concurrent_failures=1).finj_thread():
in_migration = InboundDataMigration(topics=[
InboundTopic(make_namespaced_topic(topic.name), alias=None)
],
Expand Down Expand Up @@ -352,7 +353,8 @@ def test_creating_and_listing_migrations(self):

assert len(migrations_map) == 0, "There should be no data migrations"

with Finjector(self.redpanda, self.scale).finj_thread():
with Finjector(self.redpanda, self.scale,
max_concurrent_failures=1).finj_thread():
# out
outbound_topics = [make_namespaced_topic(t.name) for t in topics]
out_migration = OutboundDataMigration(outbound_topics,
Expand Down Expand Up @@ -463,7 +465,8 @@ def test_higher_level_migration_api(self):
producer.begin_transaction()
producer.produce(topics[0].name, key="key2", value="value2")

with Finjector(self.redpanda, self.scale).finj_thread():
with Finjector(self.redpanda, self.scale,
max_concurrent_failures=1).finj_thread():
# out
outbound_topics = [make_namespaced_topic(t.name) for t in topics]
reply = self.admin.unmount_topics(outbound_topics).json()
Expand Down
45 changes: 24 additions & 21 deletions tests/rptest/tests/e2e_finjector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from contextlib import contextmanager
import random
import sys
import time
import threading

Expand All @@ -35,7 +36,7 @@ class Finjector:
# we cannot guarantee start idempotency
LOG_ALLOW_LIST = ["failed to lock pidfile. already locked"]

def __init__(self, redpanda, scale):
def __init__(self, redpanda, scale, **kwargs):
self.redpanda = redpanda
self.enable_manual = False
self.enable_loop = False
Expand All @@ -45,20 +46,25 @@ def __init__(self, redpanda, scale):
self.allowed_nodes_provider = lambda f_type: self.redpanda.nodes
self.allowed_failures = FailureSpec.FAILURE_TYPES
self.custom_failures = []
self.max_concurrent_failures = sys.maxsize
self.configure_finjector(**kwargs)

def add_failure_spec(self, fspec):
self.custom_failures.append(fspec)

def configure_finjector(self,
allowed_failures=None,
length_provider=None,
delay_provider=None):
delay_provider=None,
max_concurrent_failures=None):
if allowed_failures:
allowed_failures = allowed_failures
if length_provider:
self.failure_length_provider = length_provider
if delay_provider:
self.failure_delay_provier = delay_provider
if max_concurrent_failures is not None:
self.max_concurrent_failures = max_concurrent_failures

@contextmanager
def finj_thread(self):
Expand All @@ -72,15 +78,16 @@ def finj_thread(self):
try:
assert not self.enable_manual and not self.enable_loop
self.enable_loop = True
f_injector = make_failure_injector(self.redpanda)
self.finjector_thread = threading.Thread(
target=self._failure_injector_loop, args=())
target=self._failure_injector_loop, args=(f_injector))
self.finjector_thread.start()
yield
finally:
self.enable_loop = False
if self.finjector_thread:
self.finjector_thread.join()
self._cleanup()
self._cleanup(f_injector)

@contextmanager
def finj_manual(self):
Expand All @@ -92,17 +99,18 @@ def finj_manual(self):
:return: a callable with a single failure spec argument
"""
f_injector = make_failure_injector(self.redpanda)
try:
assert not self.enable_manual and not self.enable_loop
self.enable_manual = True

def callable(spec):
return self.inject_failure(spec)
return self.inject_failure(f_injector, spec)

yield callable
finally:
self.enable_manual = False
self._cleanup()
self._cleanup(f_injector)

def random_failure_spec(self):
f_type = random.choice(self.allowed_failures)
Expand All @@ -111,9 +119,8 @@ def random_failure_spec(self):

return FailureSpec(node=node, type=f_type, length=length)

def inject_failure(self, spec):
def inject_failure(self, f_injector, spec):
assert self.enable_manual or self.enable_loop
f_injector = make_failure_injector(self.redpanda)
f_injector.inject_failure(spec)

def _next_failure(self):
Expand All @@ -122,23 +129,19 @@ def _next_failure(self):
else:
return self.random_failure_spec()

def _failure_injector_loop(self):
def _failure_injector_loop(self, f_injector):
while self.enable_loop:
f_injector = make_failure_injector(self.redpanda)
f_injector.inject_failure(self._next_failure())
failure = self._next_failure()
f_injector.inject_failure(failure)

delay = self.failure_delay_provier()
if f_injector.cnt_in_flight() >= self.max_concurrent_failures:
delay = max(delay, f_injector.time_till_next_recovery)
self.redpanda.logger.info(
f"waiting {delay} seconds before next failure")
time.sleep(delay)

def _cleanup(self):
make_failure_injector(self.redpanda)._heal_all()
make_failure_injector(self.redpanda)._continue_all()
make_failure_injector(self.redpanda)._undo_all()


class EndToEndFinjectorTest(EndToEndTest):
def __init__(self, test_context):
super(EndToEndFinjectorTest, self).__init__(test_context=test_context)
self.finjector = Finjector(self.redpanda, test_context)
def _cleanup(self, f_injector):
f_injector._heal_all()
f_injector._continue_all()
f_injector._undo_all()

0 comments on commit 9d098a9

Please sign in to comment.