fix(nemesis.py): abort nemesis on stress command failure #9585

Open
wants to merge 1 commit into
base: master
4 changes: 4 additions & 0 deletions sdcm/exceptions.py
@@ -87,3 +87,7 @@ class CapacityReservationError(Exception):

class RaftTopologyCoordinatorNotFound(Exception):
"""Raise exception if no host id for raft topology was not found in group0 history"""


class NemesisStressFailure(Exception):
"""Exception to be raised to stop Nemesis flow, if stress command failed"""
26 changes: 26 additions & 0 deletions sdcm/nemesis.py
@@ -98,6 +98,7 @@
from sdcm.sct_events.nemesis import DisruptionEvent
from sdcm.sct_events.system import InfoEvent, CoreDumpEvent
from sdcm.sla.sla_tests import SlaTests
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.utils.aws_kms import AwsKms
from sdcm.utils import cdc
from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations
@@ -155,6 +156,7 @@
AuditLogTestFailure,
BootstrapStreamErrorFailure,
QuotaConfigurationFailure,
NemesisStressFailure,
)
from test_lib.compaction import CompactionStrategy, get_compaction_strategy, get_compaction_random_additional_params, \
get_gc_mode, GcMode, calculate_allowed_twcs_ttl_borders, get_table_compaction_info
@@ -2075,6 +2077,20 @@ def _prepare_test_table(self, ks='keyspace1', table=None):
cs_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, keyspace_name=ks, stop_test_on_failure=False, round_robin=True)
cs_thread.verify_results()
self.stop_nemesis_on_stress_errors(cs_thread)

def stop_nemesis_on_stress_errors(self, stress_thread: DockerBasedStressThread) -> None:
stress_results = super(stress_thread.__class__, stress_thread).get_results()
Contributor

Why is the super call needed here? What are we trying to skip, and why?

Contributor Author

This calls the base get_results method directly to get all results from the threads. Some command-specific get_results implementations filter out failed commands. For example, in c-s, get_results does:

    def get_results(self) -> list[dict | None]:
        ret = []
        results = super().get_results()

        for _, result, event in results:
            if not result:
                # Silently skip if stress command threw error, since it was already reported in _run_stress
                continue
...
        return ret

Contributor

This is quite specific to c-s. We should at least add a comment explaining it: skipping up the MRO like that is unusual, and I barely understand why c-s does this filtering. Let's at least point to the reason we bypass it.
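
For readers unfamiliar with the pattern under discussion, here is a minimal sketch of how `super(type(obj), obj).method()` bypasses a subclass override. The class names `BaseStressThread` and `FilteringStressThread` are hypothetical stand-ins, not the actual SCT classes, and the result tuples are simplified placeholders.

```python
class BaseStressThread:
    """Hypothetical stand-in for the shared base class: returns every result tuple."""

    def __init__(self, raw_results):
        self._raw_results = raw_results

    def get_results(self):
        return list(self._raw_results)


class FilteringStressThread(BaseStressThread):
    """Hypothetical stand-in for a tool-specific thread whose override drops failed runs."""

    def get_results(self):
        # Failed runs carry a falsy result and are silently skipped here.
        return [result for _, result, _ in super().get_results() if result]


raw = [("loader-1", {"op rate": "100"}, "event-1"), ("loader-2", None, "event-2")]
thread = FilteringStressThread(raw)

# Normal call: goes through the override, so the failed run is filtered out.
filtered = thread.get_results()

# Two-argument super(): method lookup starts after type(thread) in the MRO,
# so the base implementation runs and the failed run is still present.
unfiltered = super(type(thread), thread).get_results()
```

One caveat with this pattern: `super(type(obj), obj)` only skips the runtime class itself. If `obj` were an instance of a further subclass of the filtering class, the lookup would land on the filtering override again. Calling the base method explicitly, as in `BaseStressThread.get_results(thread)` in this sketch, avoids that dependency on the runtime type.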

node_errors = {}
for node, result, event in stress_results:
if event.errors:
node_errors.setdefault(node.name, []).extend(event.errors)

if len(node_errors) == len(stress_results): # stop only if stress command failed on all loaders
errors_str = ''.join(f" on node '{node_name}': {errors}\n" for node_name, errors in node_errors.items())
raise NemesisStressFailure(
f"Aborting '{self.__class__.__name__}' nemesis as '{stress_thread.stress_cmd}' stress command failed "
f"with the following errors:\n{errors_str}")

@scylla_versions(("5.2.rc0", None), ("2023.1.rc0", None))
def _truncate_cmd_timeout_suffix(self, truncate_timeout): # pylint: disable=no-self-use
@@ -2121,6 +2137,7 @@ def disrupt_truncate_large_partition(self):
bench_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(bench_thread)
self.stop_nemesis_on_stress_errors(bench_thread)

# In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate.
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2):
@@ -4248,6 +4265,7 @@ def _double_cluster_load(self, duration: int) -> None:
stress_queue = self.tester.run_stress_thread(
stress_cmd=self.tester.stress_cmd, stress_num=1, stats_aggregate_cmds=False, duration=duration)
results = self.tester.get_stress_results(queue=stress_queue, store_results=False)
self.stop_nemesis_on_stress_errors(stress_queue)
self.log.info(f"Double load results: {results}")

@target_data_nodes
@@ -4422,6 +4440,7 @@ def run_write_scylla_bench_load(write_cmd):
regex=".*sstable - Error while linking SSTable.*filesystem error: stat failed: No such file or directory.*"):
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
Contributor

@vponomaryov Dec 19, 2024

Here we run the same command on each of the existing loaders.
In this case we are totally OK with getting n_loaders - 1 failures.
We need a result from at least some of them.

This is probably the case for some other nemeses too.

A single command runs on multiple loaders just because of the default parameter round_robin=False defined in self.tester.run_stress_thread.

So we should probably check for a mix of passes and failures.

Contributor Author

Changed the check in the new method so that a nemesis is aborted only if all `n_loaders` stress commands failed. Checked this for both cases: `round_robin` enabled and disabled.
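
The "abort only if every loader failed" rule described above can be sketched in isolation as follows. `Node` and `Event` are hypothetical stand-ins for the loader and stress-event objects, and the guard against an empty result list is an assumption added for this sketch; it is not part of the diff.

```python
from collections import namedtuple

# Hypothetical stand-ins for the loader node and stress event objects.
Node = namedtuple("Node", "name")
Event = namedtuple("Event", "errors")


def collect_node_errors(stress_results):
    """Map loader name -> list of errors, for loaders that reported any."""
    node_errors = {}
    for node, _result, event in stress_results:
        if event.errors:
            node_errors.setdefault(node.name, []).extend(event.errors)
    return node_errors


def all_loaders_failed(stress_results):
    """True only when there is at least one result and every loader has errors."""
    node_errors = collect_node_errors(stress_results)
    # Assumption for this sketch: an empty result list is not treated as a failure.
    return bool(stress_results) and len(node_errors) == len(stress_results)


partial = [(Node("l1"), None, Event(["boom"])), (Node("l2"), {"ok": 1}, Event([]))]
total = [(Node("l1"), None, Event(["boom"])), (Node("l2"), None, Event(["timeout"]))]
```

With these inputs, `all_loaders_failed(partial)` is false and `all_loaders_failed(total)` is true. Note that because errors are keyed by loader name, the comparison assumes each loader appears at most once in the results; if one loader contributed several entries, the two lengths could differ even when every run failed.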


try:
for i in range(2 if (aws_kms and kms_key_alias_name and enable_kms_key_rotation) else 1):
@@ -4441,6 +4460,7 @@ def run_write_scylla_bench_load(write_cmd):
" -iterations=1 -concurrency=10 -connection-count=10 -rows-per-request=10")
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread)
self.stop_nemesis_on_stress_errors(read_thread)

# Rotate KMS key
if enable_kms_key_rotation and aws_kms and kms_key_alias_name and i == 0:
@@ -4466,6 +4486,7 @@ def run_write_scylla_bench_load(write_cmd):
# ReRead data
read_thread2 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread2)
self.stop_nemesis_on_stress_errors(read_thread2)

# ReWrite data making the sstables be rewritten
run_write_scylla_bench_load(write_cmd)
@@ -4474,6 +4495,7 @@ def run_write_scylla_bench_load(write_cmd):
# ReRead data
read_thread3 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread3)
self.stop_nemesis_on_stress_errors(read_thread3)

# Check that sstables of that table are not encrypted anymore
check_encryption_fact(sstable_util, False)
@@ -4740,6 +4762,7 @@ def _write_read_data_to_multi_dc_keyspace(self, datacenters: List[str]) -> None:
f"-mode cql3 native compression=lz4 -rate threads=5 -pop seq=1..10000 -log interval=5"
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
self._verify_multi_dc_keyspace_data(consistency_level="ALL")

def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
@@ -4748,6 +4771,7 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
f"-pop seq=1..10000 -log interval=5"
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=read_thread)
self.stop_nemesis_on_stress_errors(read_thread)

def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None:
"""Switches replication strategy to NetworkTopology for given keyspaces.
@@ -5161,13 +5185,15 @@ def _disrupt_toggle_audit(self, store: AuditStore):
write_thread = self.tester.run_stress_thread(
stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
read_cmd = f"cassandra-stress read no-warmup cl=ONE n=1000 " \
f" -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)" \
f" keyspace={audit_keyspace}' -mode cql3 native -rate 'threads=1 throttle=1000/s'" \
f" -pop seq=1..1000 -col 'n=FIXED(1) size=FIXED(128)' -log interval=5"
read_thread = self.tester.run_stress_thread(
stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=read_thread)
self.stop_nemesis_on_stress_errors(read_thread)
InfoEvent(message='Verifying Audit table contents').publish()
rows = audit.get_audit_log(from_datetime=audit_start, category="DML", limit_rows=1500)
# filter out USE keyspace rows due to https://github.com/scylladb/scylla-enterprise/issues/3169
4 changes: 2 additions & 2 deletions sdcm/scylla_bench_thread.py
@@ -144,7 +144,7 @@ def verify_results(self):

results = self.get_results()

-for _, result in results:
+for _, result, _ in results:
if not result:
# Silently skip if stress command threw an error, since it was already reported in _run_stress
continue
@@ -251,7 +251,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
self.configure_event_on_failure(stress_event=scylla_bench_event, exc=exc)

-return loader, result
+return loader, result, scylla_bench_event
Contributor

Maybe we need to align this across all stress commands?

Contributor Author

The new stop_nemesis_on_stress_errors method is now applied to the results of stress commands that are triggered from nemesis.py. Three commands are executed this way in the nemesis code: c-s, s-b, and cql-stress-cassandra-stress.
With this change included, all of them return a tuple of (loader, result, event).

I'm not sure whether any other stress commands are also triggered from nemesis.py, or which ones.

Contributor

I think regardless of what is being used right now, it would be good to align them to return identically from this function.

Contributor

Agreed, it would be less confusing if we had a single return type for all cases.

Contributor

I've just sat down with @dimakr, and we agreed that there will be a follow-up to this change that incorporates stop_nemesis_on_stress_errors() into verify_results() and aligns the signatures of get_results() and verify_results().
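
One possible shape for the aligned return type discussed in this thread is a named tuple shared by all stress tools. This is only a sketch: `StressRunResult` and `run_stress_stub` are hypothetical names, and the event is reduced to a plain dict for illustration.

```python
from typing import Any, NamedTuple, Optional


class StressRunResult(NamedTuple):
    """Hypothetical uniform return value for every tool's _run_stress()."""
    loader: Any
    result: Optional[dict]
    event: Any


def run_stress_stub(loader: str, fail: bool = False) -> StressRunResult:
    """Stand-in for a tool-specific _run_stress(); not the real implementation."""
    errors = ["stress command failed"] if fail else []
    result = None if fail else {"op rate": "100"}
    return StressRunResult(loader=loader, result=result, event={"errors": errors})


results = [run_stress_stub("loader-1"), run_stress_stub("loader-2", fail=True)]

# A NamedTuple still unpacks like a plain 3-tuple, so loops written as
# "for _, result, _ in results" would keep working unchanged.
failed_loaders = [loader for loader, result, _event in results if not result]
```

The design choice here is that callers can migrate gradually: positional unpacking stays valid, while new code can use `item.event` or `item.result` by name.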


@classmethod
def _parse_bench_summary(cls, lines):