From bf208544930b3b8a9204f66251d636f6dd80019a Mon Sep 17 00:00:00 2001 From: Dmitriy Kruglov Date: Wed, 18 Dec 2024 19:55:31 +0100 Subject: [PATCH] fix(nemesis.py): abort nemesis on stress command failure Abort the nemesis flow early when a stress command fails, as continuing it is often invalid due to the cluster being in an unexpected state. For example, if the _prepare_test_table routine fails, subsequent steps that depend on the test table (or attempt disruptions on top of it) will also fail. This change adds a check for results of stress command triggered within the nemesis code, ensuring the nemesis halts early if the stress command is unsuccessful. Fixes: https://github.com/scylladb/scylla-cluster-tests/issues/8722 --- sdcm/exceptions.py | 4 ++++ sdcm/nemesis.py | 26 ++++++++++++++++++++++++++ sdcm/scylla_bench_thread.py | 4 ++-- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/sdcm/exceptions.py b/sdcm/exceptions.py index eee8d58d22..6cfc15c189 100644 --- a/sdcm/exceptions.py +++ b/sdcm/exceptions.py @@ -87,3 +87,7 @@ class CapacityReservationError(Exception): class RaftTopologyCoordinatorNotFound(Exception): """Raise exception if no host id for raft topology was not found in group0 history""" + + +class NemesisStressFailure(Exception): + """Exception to be raised to stop Nemesis flow, if stress command failed""" diff --git a/sdcm/nemesis.py b/sdcm/nemesis.py index 6b813f7838..3eca47fb17 100644 --- a/sdcm/nemesis.py +++ b/sdcm/nemesis.py @@ -98,6 +98,7 @@ from sdcm.sct_events.nemesis import DisruptionEvent from sdcm.sct_events.system import InfoEvent, CoreDumpEvent from sdcm.sla.sla_tests import SlaTests +from sdcm.stress_thread import DockerBasedStressThread from sdcm.utils.aws_kms import AwsKms from sdcm.utils import cdc from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations @@ -155,6 +156,7 @@ AuditLogTestFailure, BootstrapStreamErrorFailure, QuotaConfigurationFailure, + NemesisStressFailure, ) from test_lib.compaction import 
CompactionStrategy, get_compaction_strategy, get_compaction_random_additional_params, \ get_gc_mode, GcMode, calculate_allowed_twcs_ttl_borders, get_table_compaction_info @@ -2075,6 +2077,20 @@ def _prepare_test_table(self, ks='keyspace1', table=None): cs_thread = self.tester.run_stress_thread( stress_cmd=stress_cmd, keyspace_name=ks, stop_test_on_failure=False, round_robin=True) cs_thread.verify_results() + self.stop_nemesis_on_stress_errors(cs_thread) + + def stop_nemesis_on_stress_errors(self, stress_thread: DockerBasedStressThread) -> None: + stress_results = super(stress_thread.__class__, stress_thread).get_results() + node_errors = {} + for node, _, event in stress_results: + if event.errors: + node_errors.setdefault(node.name, []).extend(event.errors) + + if stress_results and all(event.errors for _, _, event in stress_results): # abort only if every run failed + errors_str = ''.join(f" on node '{node_name}': {errors}\n" for node_name, errors in node_errors.items()) + raise NemesisStressFailure( + f"Aborting '{self.__class__.__name__}' nemesis as '{stress_thread.stress_cmd}' stress command failed " + f"with the following errors:\n{errors_str}") @scylla_versions(("5.2.rc0", None), ("2023.1.rc0", None)) def _truncate_cmd_timeout_suffix(self, truncate_timeout): # pylint: disable=no-self-use @@ -2121,6 +2137,7 @@ def disrupt_truncate_large_partition(self): bench_thread = self.tester.run_stress_thread( stress_cmd=stress_cmd, stop_test_on_failure=False) self.tester.verify_stress_thread(bench_thread) + self.stop_nemesis_on_stress_errors(bench_thread) # In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate. 
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2): @@ -4248,6 +4265,7 @@ def _double_cluster_load(self, duration: int) -> None: stress_queue = self.tester.run_stress_thread( stress_cmd=self.tester.stress_cmd, stress_num=1, stats_aggregate_cmds=False, duration=duration) results = self.tester.get_stress_results(queue=stress_queue, store_results=False) + self.stop_nemesis_on_stress_errors(stress_queue) self.log.info(f"Double load results: {results}") @target_data_nodes @@ -4422,6 +4440,7 @@ def run_write_scylla_bench_load(write_cmd): regex=".*sstable - Error while linking SSTable.*filesystem error: stat failed: No such file or directory.*"): write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, stop_test_on_failure=False) self.tester.verify_stress_thread(write_thread) + self.stop_nemesis_on_stress_errors(write_thread) try: for i in range(2 if (aws_kms and kms_key_alias_name and enable_kms_key_rotation) else 1): @@ -4441,6 +4460,7 @@ def run_write_scylla_bench_load(write_cmd): " -iterations=1 -concurrency=10 -connection-count=10 -rows-per-request=10") read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False) self.tester.verify_stress_thread(read_thread) + self.stop_nemesis_on_stress_errors(read_thread) # Rotate KMS key if enable_kms_key_rotation and aws_kms and kms_key_alias_name and i == 0: @@ -4466,6 +4486,7 @@ def run_write_scylla_bench_load(write_cmd): # ReRead data read_thread2 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False) self.tester.verify_stress_thread(read_thread2) + self.stop_nemesis_on_stress_errors(read_thread2) # ReWrite data making the sstables be rewritten run_write_scylla_bench_load(write_cmd) @@ -4474,6 +4495,7 @@ def run_write_scylla_bench_load(write_cmd): # ReRead data read_thread3 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False) self.tester.verify_stress_thread(read_thread3) + 
self.stop_nemesis_on_stress_errors(read_thread3) # Check that sstables of that table are not encrypted anymore check_encryption_fact(sstable_util, False) @@ -4740,6 +4762,7 @@ def _write_read_data_to_multi_dc_keyspace(self, datacenters: List[str]) -> None: f"-mode cql3 native compression=lz4 -rate threads=5 -pop seq=1..10000 -log interval=5" write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False) self.tester.verify_stress_thread(cs_thread_pool=write_thread) + self.stop_nemesis_on_stress_errors(write_thread) self._verify_multi_dc_keyspace_data(consistency_level="ALL") def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"): @@ -4748,6 +4771,7 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"): f"-pop seq=1..10000 -log interval=5" read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False) self.tester.verify_stress_thread(cs_thread_pool=read_thread) + self.stop_nemesis_on_stress_errors(read_thread) def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None: """Switches replication strategy to NetworkTopology for given keyspaces. 
@@ -5161,6 +5185,7 @@ def _disrupt_toggle_audit(self, store: AuditStore): write_thread = self.tester.run_stress_thread( stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False) self.tester.verify_stress_thread(cs_thread_pool=write_thread) + self.stop_nemesis_on_stress_errors(write_thread) read_cmd = f"cassandra-stress read no-warmup cl=ONE n=1000 " \ f" -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)" \ f" keyspace={audit_keyspace}' -mode cql3 native -rate 'threads=1 throttle=1000/s'" \ @@ -5168,6 +5193,7 @@ def _disrupt_toggle_audit(self, store: AuditStore): read_thread = self.tester.run_stress_thread( stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False) self.tester.verify_stress_thread(cs_thread_pool=read_thread) + self.stop_nemesis_on_stress_errors(read_thread) InfoEvent(message='Verifying Audit table contents').publish() rows = audit.get_audit_log(from_datetime=audit_start, category="DML", limit_rows=1500) # filter out USE keyspace rows due to https://github.com/scylladb/scylla-enterprise/issues/3169 diff --git a/sdcm/scylla_bench_thread.py b/sdcm/scylla_bench_thread.py index cad7e9b911..408c56e73d 100644 --- a/sdcm/scylla_bench_thread.py +++ b/sdcm/scylla_bench_thread.py @@ -144,7 +144,7 @@ def verify_results(self): results = self.get_results() - for _, result in results: + for _, result, _ in results: if not result: # Silently skip if stress command threw an error, since it was already reported in _run_stress continue @@ -251,7 +251,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many- except Exception as exc: # pylint: disable=broad-except # noqa: BLE001 self.configure_event_on_failure(stress_event=scylla_bench_event, exc=exc) - return loader, result + return loader, result, scylla_bench_event @classmethod def _parse_bench_summary(cls, lines):