-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(nemesis.py): abort nemesis on stress command failure #9585
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,6 +98,7 @@ | |
from sdcm.sct_events.nemesis import DisruptionEvent | ||
from sdcm.sct_events.system import InfoEvent, CoreDumpEvent | ||
from sdcm.sla.sla_tests import SlaTests | ||
from sdcm.stress_thread import DockerBasedStressThread | ||
from sdcm.utils.aws_kms import AwsKms | ||
from sdcm.utils import cdc | ||
from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations | ||
|
@@ -155,6 +156,7 @@ | |
AuditLogTestFailure, | ||
BootstrapStreamErrorFailure, | ||
QuotaConfigurationFailure, | ||
NemesisStressFailure, | ||
) | ||
from test_lib.compaction import CompactionStrategy, get_compaction_strategy, get_compaction_random_additional_params, \ | ||
get_gc_mode, GcMode, calculate_allowed_twcs_ttl_borders, get_table_compaction_info | ||
|
@@ -2075,6 +2077,20 @@ def _prepare_test_table(self, ks='keyspace1', table=None): | |
cs_thread = self.tester.run_stress_thread( | ||
stress_cmd=stress_cmd, keyspace_name=ks, stop_test_on_failure=False, round_robin=True) | ||
cs_thread.verify_results() | ||
self.stop_nemesis_on_stress_errors(cs_thread) | ||
|
||
def stop_nemesis_on_stress_errors(self, stress_thread: DockerBasedStressThread) -> None:
    """Abort the running nemesis when the stress command failed on all loaders.

    Collects the errors attached to the stress event of every result and, if
    every single result carries errors, raises so that the nemesis stops
    instead of continuing on top of a workload that never ran.

    Args:
        stress_thread: the stress thread whose results should be inspected.

    Raises:
        NemesisStressFailure: if every result returned by the stress thread
            reported errors.
    """
    # Deliberately bypass the thread's own ``get_results`` and call the parent
    # implementation: command-specific overrides may filter out failed commands,
    # while here the results of *all* threads (including failed ones) are needed.
    stress_results = super(type(stress_thread), stress_thread).get_results()
    if not stress_results:
        # Nothing was reported at all, so there is no evidence of a failure;
        # without this guard the "all failed" check below is vacuously true.
        return

    node_errors = {}
    failed_runs = 0
    for node, _result, event in stress_results:
        if event.errors:
            failed_runs += 1
            node_errors.setdefault(node.name, []).extend(event.errors)

    # Stop only if the stress command failed on all loaders. Failed runs are counted
    # rather than distinct node names, so several results coming from the same
    # loader cannot make the two counts diverge.
    if failed_runs < len(stress_results):
        return

    errors_str = ''.join(f"  on node '{node_name}': {errors}\n" for node_name, errors in node_errors.items())
    raise NemesisStressFailure(
        f"Aborting '{self.__class__.__name__}' nemesis as '{stress_thread.stress_cmd}' stress command failed "
        f"with the following errors:\n{errors_str}")
|
||
@scylla_versions(("5.2.rc0", None), ("2023.1.rc0", None)) | ||
def _truncate_cmd_timeout_suffix(self, truncate_timeout): # pylint: disable=no-self-use | ||
|
@@ -2121,6 +2137,7 @@ def disrupt_truncate_large_partition(self): | |
bench_thread = self.tester.run_stress_thread( | ||
stress_cmd=stress_cmd, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(bench_thread) | ||
self.stop_nemesis_on_stress_errors(bench_thread) | ||
|
||
# In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate. | ||
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2): | ||
|
@@ -4248,6 +4265,7 @@ def _double_cluster_load(self, duration: int) -> None: | |
stress_queue = self.tester.run_stress_thread( | ||
stress_cmd=self.tester.stress_cmd, stress_num=1, stats_aggregate_cmds=False, duration=duration) | ||
results = self.tester.get_stress_results(queue=stress_queue, store_results=False) | ||
self.stop_nemesis_on_stress_errors(stress_queue) | ||
self.log.info(f"Double load results: {results}") | ||
|
||
@target_data_nodes | ||
|
@@ -4422,6 +4440,7 @@ def run_write_scylla_bench_load(write_cmd): | |
regex=".*sstable - Error while linking SSTable.*filesystem error: stat failed: No such file or directory.*"): | ||
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(write_thread) | ||
self.stop_nemesis_on_stress_errors(write_thread) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we run the same command on each of the existing loaders. Probably it is the case for some other nemesis. It runs a single command on multiple loaders just because of the default parameter So, probably we should have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed the check in the new method so that a nemesis would be aborted if all 'n_loaders |
||
|
||
try: | ||
for i in range(2 if (aws_kms and kms_key_alias_name and enable_kms_key_rotation) else 1): | ||
|
@@ -4441,6 +4460,7 @@ def run_write_scylla_bench_load(write_cmd): | |
" -iterations=1 -concurrency=10 -connection-count=10 -rows-per-request=10") | ||
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(read_thread) | ||
self.stop_nemesis_on_stress_errors(read_thread) | ||
|
||
# Rotate KMS key | ||
if enable_kms_key_rotation and aws_kms and kms_key_alias_name and i == 0: | ||
|
@@ -4466,6 +4486,7 @@ def run_write_scylla_bench_load(write_cmd): | |
# ReRead data | ||
read_thread2 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(read_thread2) | ||
self.stop_nemesis_on_stress_errors(read_thread2) | ||
|
||
# ReWrite data making the sstables be rewritten | ||
run_write_scylla_bench_load(write_cmd) | ||
|
@@ -4474,6 +4495,7 @@ def run_write_scylla_bench_load(write_cmd): | |
# ReRead data | ||
read_thread3 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(read_thread3) | ||
self.stop_nemesis_on_stress_errors(read_thread3) | ||
|
||
# Check that sstables of that table are not encrypted anymore | ||
check_encryption_fact(sstable_util, False) | ||
|
@@ -4740,6 +4762,7 @@ def _write_read_data_to_multi_dc_keyspace(self, datacenters: List[str]) -> None: | |
f"-mode cql3 native compression=lz4 -rate threads=5 -pop seq=1..10000 -log interval=5" | ||
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(cs_thread_pool=write_thread) | ||
self.stop_nemesis_on_stress_errors(write_thread) | ||
self._verify_multi_dc_keyspace_data(consistency_level="ALL") | ||
|
||
def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"): | ||
|
@@ -4748,6 +4771,7 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"): | |
f"-pop seq=1..10000 -log interval=5" | ||
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(cs_thread_pool=read_thread) | ||
self.stop_nemesis_on_stress_errors(read_thread) | ||
|
||
def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None: | ||
"""Switches replication strategy to NetworkTopology for given keyspaces. | ||
|
@@ -5161,13 +5185,15 @@ def _disrupt_toggle_audit(self, store: AuditStore): | |
write_thread = self.tester.run_stress_thread( | ||
stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(cs_thread_pool=write_thread) | ||
self.stop_nemesis_on_stress_errors(write_thread) | ||
read_cmd = f"cassandra-stress read no-warmup cl=ONE n=1000 " \ | ||
f" -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)" \ | ||
f" keyspace={audit_keyspace}' -mode cql3 native -rate 'threads=1 throttle=1000/s'" \ | ||
f" -pop seq=1..1000 -col 'n=FIXED(1) size=FIXED(128)' -log interval=5" | ||
read_thread = self.tester.run_stress_thread( | ||
stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False) | ||
self.tester.verify_stress_thread(cs_thread_pool=read_thread) | ||
self.stop_nemesis_on_stress_errors(read_thread) | ||
InfoEvent(message='Verifying Audit table contents').publish() | ||
rows = audit.get_audit_log(from_datetime=audit_start, category="DML", limit_rows=1500) | ||
# filter out USE keyspace rows due to https://github.com/scylladb/scylla-enterprise/issues/3169 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -144,7 +144,7 @@ def verify_results(self): | |
|
||
results = self.get_results() | ||
|
||
for _, result in results: | ||
for _, result, _ in results: | ||
if not result: | ||
# Silently skip if stress command threw an error, since it was already reported in _run_stress | ||
continue | ||
|
@@ -251,7 +251,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many- | |
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001 | ||
self.configure_event_on_failure(stress_event=scylla_bench_event, exc=exc) | ||
|
||
return loader, result | ||
return loader, result, scylla_bench_event | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we need to align this on all stress commands ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new I'm not sure, if/which stress commands are also triggered from nemesis.py. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think regardless of what is being used right now, it would be good to align them to return identically from this function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree, would be less confusion if we had single return type for all cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've just set down with @dimakr, and we agreed that there would be a follow up for this change that would incorporate |
||
|
||
@classmethod | ||
def _parse_bench_summary(cls, lines): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the super call needed here ? What are we trying to skip ? And why ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to directly call the `get_results` method to get all results from the threads. Some command-specific `get_results` implementations filter out failed commands. E.g. in c-s, `get_results` performs:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit c-s specific; we should at least add a comment explaining it. It's quite unusual to skip up the MRO like that, and I barely understand why c-s is doing that, so let's at least point to the reason we skip it.