Skip to content

Commit

Permalink
fix(nemesis.py): abort nemesis on stress command failure
Browse files Browse the repository at this point in the history
Abort the nemesis flow early when a stress command fails, as continuing it is
often invalid due to the cluster being in an unexpected state. For example, if
the _prepare_test_table routine fails, subsequent steps that depend on the test
table (or attempt disruptions on top of it) will also fail.

This change adds a check of the results of stress commands triggered within the nemesis
code, ensuring that the nemesis halts early if a stress command is unsuccessful.

Fixes: #8722
  • Loading branch information
dimakr committed Dec 20, 2024
1 parent 78c864c commit 6de0904
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
4 changes: 4 additions & 0 deletions sdcm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,7 @@ class CapacityReservationError(Exception):

class RaftTopologyCoordinatorNotFound(Exception):
    """Raised if no host id for raft topology was found in group0 history."""


class NemesisStressFailure(Exception):
    """Raised to stop the nemesis flow when a stress command has failed."""
25 changes: 25 additions & 0 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
from sdcm.sct_events.nemesis import DisruptionEvent
from sdcm.sct_events.system import InfoEvent, CoreDumpEvent
from sdcm.sla.sla_tests import SlaTests
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.utils.aws_kms import AwsKms
from sdcm.utils import cdc
from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations
Expand Down Expand Up @@ -155,6 +156,7 @@
AuditLogTestFailure,
BootstrapStreamErrorFailure,
QuotaConfigurationFailure,
NemesisStressFailure,
)
from test_lib.compaction import CompactionStrategy, get_compaction_strategy, get_compaction_random_additional_params, \
get_gc_mode, GcMode, calculate_allowed_twcs_ttl_borders, get_table_compaction_info
Expand Down Expand Up @@ -2075,6 +2077,19 @@ def _prepare_test_table(self, ks='keyspace1', table=None):
cs_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, keyspace_name=ks, stop_test_on_failure=False, round_robin=True)
cs_thread.verify_results()
self.stop_nemesis_on_stress_errors(cs_thread)

def stop_nemesis_on_stress_errors(self, stress_thread: DockerBasedStressThread) -> None:
    """Abort the current nemesis if the stress command failed on every loader.

    Inspects the errors recorded on the stress event of each result and raises only
    when there is at least one result and all of them carry errors. A partial
    failure (some results without errors) does not stop the nemesis.

    :param stress_thread: stress thread whose results are inspected.
    :raises NemesisStressFailure: if every result of the stress command has errors.
    """
    # Call `get_results()` as implemented by the parent of the thread's class rather than
    # the thread's own override; the results are unpacked as (node, result, event) triples.
    stress_results = super(stress_thread.__class__, stress_thread).get_results()

    failed_results = 0
    node_errors = {}
    for node, _, event in stress_results:
        if not event.errors:
            # A result without errors must not be registered as a failure; otherwise
            # every run would look like "failed on all loaders".
            continue
        failed_results += 1
        node_errors.setdefault(node.name, []).extend(event.errors)

    # Stop only if the stress command failed on all loaders. Failed results are counted
    # (not distinct node names) so that several results from one node cannot mask a
    # complete failure, and an empty result set is never treated as a failure.
    if failed_results and failed_results == len(stress_results):
        errors_str = ''.join(f"  on node '{node_name}': {errors}\n" for node_name, errors in node_errors.items())
        raise NemesisStressFailure(
            f"Aborting '{self.__class__.__name__}' nemesis as '{stress_thread.stress_cmd}' stress command failed "
            f"with the following errors:\n{errors_str}")

@scylla_versions(("5.2.rc0", None), ("2023.1.rc0", None))
def _truncate_cmd_timeout_suffix(self, truncate_timeout): # pylint: disable=no-self-use
Expand Down Expand Up @@ -2121,6 +2136,7 @@ def disrupt_truncate_large_partition(self):
bench_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(bench_thread)
self.stop_nemesis_on_stress_errors(bench_thread)

# In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate.
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2):
Expand Down Expand Up @@ -4248,6 +4264,7 @@ def _double_cluster_load(self, duration: int) -> None:
stress_queue = self.tester.run_stress_thread(
stress_cmd=self.tester.stress_cmd, stress_num=1, stats_aggregate_cmds=False, duration=duration)
results = self.tester.get_stress_results(queue=stress_queue, store_results=False)
self.stop_nemesis_on_stress_errors(stress_queue)
self.log.info(f"Double load results: {results}")

@target_data_nodes
Expand Down Expand Up @@ -4422,6 +4439,7 @@ def run_write_scylla_bench_load(write_cmd):
regex=".*sstable - Error while linking SSTable.*filesystem error: stat failed: No such file or directory.*"):
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(write_thread)
self.stop_nemesis_on_stress_errors(write_thread)

try:
for i in range(2 if (aws_kms and kms_key_alias_name and enable_kms_key_rotation) else 1):
Expand All @@ -4441,6 +4459,7 @@ def run_write_scylla_bench_load(write_cmd):
" -iterations=1 -concurrency=10 -connection-count=10 -rows-per-request=10")
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread)
self.stop_nemesis_on_stress_errors(read_thread)

# Rotate KMS key
if enable_kms_key_rotation and aws_kms and kms_key_alias_name and i == 0:
Expand All @@ -4466,6 +4485,7 @@ def run_write_scylla_bench_load(write_cmd):
# ReRead data
read_thread2 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread2)
self.stop_nemesis_on_stress_errors(read_thread2)

# ReWrite data making the sstables be rewritten
run_write_scylla_bench_load(write_cmd)
Expand All @@ -4474,6 +4494,7 @@ def run_write_scylla_bench_load(write_cmd):
# ReRead data
read_thread3 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread3)
self.stop_nemesis_on_stress_errors(read_thread3)

# Check that sstables of that table are not encrypted anymore
check_encryption_fact(sstable_util, False)
Expand Down Expand Up @@ -4740,6 +4761,7 @@ def _write_read_data_to_multi_dc_keyspace(self, datacenters: List[str]) -> None:
f"-mode cql3 native compression=lz4 -rate threads=5 -pop seq=1..10000 -log interval=5"
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
self._verify_multi_dc_keyspace_data(consistency_level="ALL")

def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
Expand All @@ -4748,6 +4770,7 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
f"-pop seq=1..10000 -log interval=5"
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=read_thread)
self.stop_nemesis_on_stress_errors(read_thread)

def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None:
"""Switches replication strategy to NetworkTopology for given keyspaces.
Expand Down Expand Up @@ -5161,13 +5184,15 @@ def _disrupt_toggle_audit(self, store: AuditStore):
write_thread = self.tester.run_stress_thread(
stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
read_cmd = f"cassandra-stress read no-warmup cl=ONE n=1000 " \
f" -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)" \
f" keyspace={audit_keyspace}' -mode cql3 native -rate 'threads=1 throttle=1000/s'" \
f" -pop seq=1..1000 -col 'n=FIXED(1) size=FIXED(128)' -log interval=5"
read_thread = self.tester.run_stress_thread(
stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=read_thread)
self.stop_nemesis_on_stress_errors(read_thread)
InfoEvent(message='Verifying Audit table contents').publish()
rows = audit.get_audit_log(from_datetime=audit_start, category="DML", limit_rows=1500)
# filter out USE keyspace rows due to https://github.com/scylladb/scylla-enterprise/issues/3169
Expand Down
4 changes: 2 additions & 2 deletions sdcm/scylla_bench_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def verify_results(self):

results = self.get_results()

for _, result in results:
for _, result, _ in results:
if not result:
# Silently skip if stress command threw an error, since it was already reported in _run_stress
continue
Expand Down Expand Up @@ -251,7 +251,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
self.configure_event_on_failure(stress_event=scylla_bench_event, exc=exc)

return loader, result
return loader, result, scylla_bench_event

@classmethod
def _parse_bench_summary(cls, lines):
Expand Down

0 comments on commit 6de0904

Please sign in to comment.