Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(nemesis.py): abort nemesis on stress command failure #9585

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions sdcm/cassandra_harry_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def _run_stress(self, loader, loader_idx, cpu_idx):

CassandraHarryEvent.start(node=loader, stress_cmd=self.stress_cmd).publish()

result = {}
harry_failure_event = harry_finish_event = None
with CassandraHarryStressExporter(instance_name=loader.ip_address,
metrics=nemesis_metrics_obj(),
stress_operation='write',
Expand All @@ -106,22 +108,23 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
errors_str = format_stress_cmd_error(exc)
if "timeout" in errors_str:
event_type = CassandraHarryEvent.timeout
harry_failure_event = CassandraHarryEvent.timeout
elif self.stop_test_on_failure:
event_type = CassandraHarryEvent.failure
harry_failure_event = CassandraHarryEvent.failure
else:
event_type = CassandraHarryEvent.error
event_type(
harry_failure_event = CassandraHarryEvent.error
harry_failure_event(
node=loader,
stress_cmd=self.stress_cmd,
log_file_name=log_file_name,
errors=[errors_str, ],
).publish()
else:
CassandraHarryEvent.finish(node=loader, stress_cmd=self.stress_cmd,
log_file_name=log_file_name).publish()
harry_finish_event = CassandraHarryEvent.finish(node=loader, stress_cmd=self.stress_cmd,
log_file_name=log_file_name)
harry_finish_event.publish()

return result
return loader, result, harry_failure_event or harry_finish_event

@staticmethod
def _parse_harry_summary(lines): # pylint: disable=too-many-branches
Expand All @@ -137,3 +140,6 @@ def _parse_harry_summary(lines): # pylint: disable=too-many-branches
else:
results['status'] = 'failed'
return results

def get_results(self) -> list:
return [result for _, result, _ in super().get_results()]
4 changes: 4 additions & 0 deletions sdcm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,7 @@ class CapacityReservationError(Exception):

class RaftTopologyCoordinatorNotFound(Exception):
"""Raise exception if no host id for raft topology was not found in group0 history"""


class NemesisStressFailure(Exception):
"""Exception to be raised to stop Nemesis flow, if stress command failed"""
18 changes: 12 additions & 6 deletions sdcm/kcl_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,33 @@ def _run_stress(self, loader, loader_idx, cpu_idx):

KclStressEvent.start(node=loader, stress_cmd=stress_cmd).publish()

result = {}
kcl_finish_event = kcl_failure_event = None
try:
with cleanup_context:
result = docker.run(cmd=node_cmd,
timeout=self.timeout + self.shutdown_timeout,
log_file=log_file_name,
retry=0,
)

return result

except Exception as exc: # pylint: disable=broad-except
errors_str = format_stress_cmd_error(exc)
KclStressEvent.failure(
kcl_failure_event = KclStressEvent.failure(
node=loader,
stress_cmd=self.stress_cmd,
log_file_name=log_file_name,
errors=[errors_str, ],
).publish()
)
kcl_failure_event.publish()
raise
finally:
KclStressEvent.finish(node=loader, stress_cmd=stress_cmd, log_file_name=log_file_name).publish()
kcl_finish_event = KclStressEvent.finish(node=loader, stress_cmd=stress_cmd, log_file_name=log_file_name)
kcl_finish_event.publish()

return loader, result, kcl_failure_event or kcl_finish_event

def get_results(self) -> list:
return [result for _, result, _ in super().get_results()]


class CompareTablesSizesThread(DockerBasedStressThread): # pylint: disable=too-many-instance-attributes
Expand Down
40 changes: 24 additions & 16 deletions sdcm/ndbench_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,33 @@ def _run_stress(self, loader, loader_idx, cpu_idx):

NdBenchStressEvent.start(node=loader, stress_cmd=self.stress_cmd).publish()

result = {}
ndbench_failure_event = ndbench_finish_event = None
with NdBenchStatsPublisher(loader, loader_idx, ndbench_log_filename=log_file_name), \
NdBenchStressEventsPublisher(node=loader, ndbench_log_filename=log_file_name), \
cleanup_context:
try:
docker_run_result = docker.run(cmd=node_cmd,
timeout=self.timeout + self.shutdown_timeout,
ignore_status=True,
log_file=log_file_name,
verbose=True,
retry=0,
)
return docker_run_result
result = docker.run(cmd=node_cmd,
timeout=self.timeout + self.shutdown_timeout,
ignore_status=True,
log_file=log_file_name,
verbose=True,
retry=0)
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
NdBenchStressEvent.failure(node=str(loader),
stress_cmd=self.stress_cmd,
log_file_name=log_file_name,
errors=[format_stress_cmd_error(exc), ]).publish()
ndbench_failure_event = NdBenchStressEvent.failure(
node=str(loader),
stress_cmd=self.stress_cmd,
log_file_name=log_file_name,
errors=[format_stress_cmd_error(exc), ])
ndbench_failure_event.publish()
finally:
NdBenchStressEvent.finish(node=loader,
stress_cmd=self.stress_cmd,
log_file_name=log_file_name).publish()
return None
ndbench_finish_event = NdBenchStressEvent.finish(
node=loader,
stress_cmd=self.stress_cmd,
log_file_name=log_file_name)
ndbench_finish_event.publish()

return loader, result, ndbench_failure_event or ndbench_finish_event

def get_results(self) -> list:
return [result for _, result, _ in super().get_results()]
29 changes: 29 additions & 0 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
from sdcm.sct_events.nemesis import DisruptionEvent
from sdcm.sct_events.system import InfoEvent, CoreDumpEvent
from sdcm.sla.sla_tests import SlaTests
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.utils.aws_kms import AwsKms
from sdcm.utils import cdc
from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations
Expand Down Expand Up @@ -155,6 +156,7 @@
AuditLogTestFailure,
BootstrapStreamErrorFailure,
QuotaConfigurationFailure,
NemesisStressFailure,
)
from test_lib.compaction import CompactionStrategy, get_compaction_strategy, get_compaction_random_additional_params, \
get_gc_mode, GcMode, calculate_allowed_twcs_ttl_borders, get_table_compaction_info
Expand Down Expand Up @@ -2075,6 +2077,23 @@ def _prepare_test_table(self, ks='keyspace1', table=None):
cs_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, keyspace_name=ks, stop_test_on_failure=False, round_robin=True)
cs_thread.verify_results()
self.stop_nemesis_on_stress_errors(cs_thread)

def stop_nemesis_on_stress_errors(self, stress_thread: DockerBasedStressThread) -> None:
# Some implementations of stress threads override logic of the base class method
# DockerBasedStressThread.get_results() and filter out 'events' portion of a result (e.g. c-s stress thread).
# To retrieve all results of a thread we need to call the base class method directly
stress_results = super(stress_thread.__class__, stress_thread).get_results()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the super call needed here ? What are we trying to skip ? And why ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to call the base-class get_results method directly, so that we get all results from the threads. Some command-specific get_results implementations filter out failed commands. E.g. in c-s, get_results does:

    def get_results(self) -> list[dict | None]:
        ret = []
        results = super().get_results()

        for _, result, event in results:
            if not result:
                # Silently skip if stress command threw error, since it was already reported in _run_stress
                continue
...
        return ret

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit c-s specific, so we should at least put a comment explaining it. It's quite unusual to skip up the MRO like that, and I barely understand why c-s is doing that, so let's at least point to the reason we jump over it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the comment

node_errors = {}
for node, result, event in stress_results:
if event.get('errors'):
node_errors.setdefault(node.name, []).extend(event.errors)

if len(node_errors) == len(stress_results): # stop only if stress command failed on all loaders
errors_str = ''.join(f" on node '{node_name}': {errors}\n" for node_name, errors in node_errors.items())
raise NemesisStressFailure(
f"Aborting '{self.__class__.__name__}' nemesis as '{stress_thread.stress_cmd}' stress command failed "
f"with the following errors:\n{errors_str}")

@scylla_versions(("5.2.rc0", None), ("2023.1.rc0", None))
def _truncate_cmd_timeout_suffix(self, truncate_timeout): # pylint: disable=no-self-use
Expand Down Expand Up @@ -2121,6 +2140,7 @@ def disrupt_truncate_large_partition(self):
bench_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(bench_thread)
self.stop_nemesis_on_stress_errors(bench_thread)

# In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate.
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2):
Expand Down Expand Up @@ -4248,6 +4268,7 @@ def _double_cluster_load(self, duration: int) -> None:
stress_queue = self.tester.run_stress_thread(
stress_cmd=self.tester.stress_cmd, stress_num=1, stats_aggregate_cmds=False, duration=duration)
results = self.tester.get_stress_results(queue=stress_queue, store_results=False)
self.stop_nemesis_on_stress_errors(stress_queue)
self.log.info(f"Double load results: {results}")

@target_data_nodes
Expand Down Expand Up @@ -4422,6 +4443,7 @@ def run_write_scylla_bench_load(write_cmd):
regex=".*sstable - Error while linking SSTable.*filesystem error: stat failed: No such file or directory.*"):
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
Copy link
Contributor

@vponomaryov vponomaryov Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we run the same command on each of the existing loaders.
In this case we are totally ok to get n_loaders - 1 failures.
We need result at least from some of them.

Probably it is the case for some other nemesis.

It runs a single command on multiple loaders just because of the default parameter round_robin=False defined in the self.tester.run_stress_thread.

So we should probably check for a mix of passes and failures rather than treating any single failure as fatal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the check in the new method so that a nemesis would be aborted if all `n_loaders` stress commands failed. Checked this for both cases - `round_robin` enabled and disabled.


try:
for i in range(2 if (aws_kms and kms_key_alias_name and enable_kms_key_rotation) else 1):
Expand All @@ -4441,6 +4463,7 @@ def run_write_scylla_bench_load(write_cmd):
" -iterations=1 -concurrency=10 -connection-count=10 -rows-per-request=10")
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread)
self.stop_nemesis_on_stress_errors(read_thread)

# Rotate KMS key
if enable_kms_key_rotation and aws_kms and kms_key_alias_name and i == 0:
Expand All @@ -4466,6 +4489,7 @@ def run_write_scylla_bench_load(write_cmd):
# ReRead data
read_thread2 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread2)
self.stop_nemesis_on_stress_errors(read_thread2)

# ReWrite data making the sstables be rewritten
run_write_scylla_bench_load(write_cmd)
Expand All @@ -4474,6 +4498,7 @@ def run_write_scylla_bench_load(write_cmd):
# ReRead data
read_thread3 = self.tester.run_stress_thread(stress_cmd=read_cmd, stop_test_on_failure=False)
self.tester.verify_stress_thread(read_thread3)
self.stop_nemesis_on_stress_errors(read_thread3)

# Check that sstables of that table are not encrypted anymore
check_encryption_fact(sstable_util, False)
Expand Down Expand Up @@ -4740,6 +4765,7 @@ def _write_read_data_to_multi_dc_keyspace(self, datacenters: List[str]) -> None:
f"-mode cql3 native compression=lz4 -rate threads=5 -pop seq=1..10000 -log interval=5"
write_thread = self.tester.run_stress_thread(stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
self._verify_multi_dc_keyspace_data(consistency_level="ALL")

def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
Expand All @@ -4748,6 +4774,7 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
f"-pop seq=1..10000 -log interval=5"
read_thread = self.tester.run_stress_thread(stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=read_thread)
self.stop_nemesis_on_stress_errors(read_thread)

def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None:
"""Switches replication strategy to NetworkTopology for given keyspaces.
Expand Down Expand Up @@ -5161,13 +5188,15 @@ def _disrupt_toggle_audit(self, store: AuditStore):
write_thread = self.tester.run_stress_thread(
stress_cmd=write_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=write_thread)
self.stop_nemesis_on_stress_errors(write_thread)
read_cmd = f"cassandra-stress read no-warmup cl=ONE n=1000 " \
f" -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)" \
f" keyspace={audit_keyspace}' -mode cql3 native -rate 'threads=1 throttle=1000/s'" \
f" -pop seq=1..1000 -col 'n=FIXED(1) size=FIXED(128)' -log interval=5"
read_thread = self.tester.run_stress_thread(
stress_cmd=read_cmd, round_robin=True, stop_test_on_failure=False)
self.tester.verify_stress_thread(cs_thread_pool=read_thread)
self.stop_nemesis_on_stress_errors(read_thread)
InfoEvent(message='Verifying Audit table contents').publish()
rows = audit.get_audit_log(from_datetime=audit_start, category="DML", limit_rows=1500)
# filter out USE keyspace rows due to https://github.com/scylladb/scylla-enterprise/issues/3169
Expand Down
19 changes: 12 additions & 7 deletions sdcm/nosql_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
(loader_idx, cpu_idx, uuid.uuid4()))
LOGGER.debug('nosql-bench-stress local log: %s', log_file_name)
LOGGER.debug("'running: %s", stress_cmd)
result = {}
with NoSQLBenchStressEvent(node=loader, stress_cmd=stress_cmd, log_file_name=log_file_name) as stress_event, \
NoSQLBenchEventsPublisher(node=loader, log_filename=log_file_name):
try:
Expand All @@ -107,12 +108,16 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
log_file=log_file_name,
ignore_status=True)

return loader.remoter.run(cmd=f'docker run '
'--name=nb '
'--network=nosql '
f'{self.docker_image_name} '
f'{stress_cmd} --report-graphite-to graphite-exporter:9109',
timeout=self.timeout + self.shutdown_timeout, log_file=log_file_name)
result = loader.remoter.run(cmd=f'docker run '
'--name=nb '
'--network=nosql '
f'{self.docker_image_name} '
f'{stress_cmd} --report-graphite-to graphite-exporter:9109',
timeout=self.timeout + self.shutdown_timeout, log_file=log_file_name)
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
self.configure_event_on_failure(stress_event=stress_event, exc=exc)
return None

return loader, result, stress_event

def get_results(self) -> list:
return [result for _, result, _ in super().get_results()]
4 changes: 2 additions & 2 deletions sdcm/scylla_bench_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def verify_results(self):

results = self.get_results()

for _, result in results:
for _, result, _ in results:
if not result:
# Silently skip if stress command threw an error, since it was already reported in _run_stress
continue
Expand Down Expand Up @@ -251,7 +251,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
self.configure_event_on_failure(stress_event=scylla_bench_event, exc=exc)

return loader, result
return loader, result, scylla_bench_event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to align this on all stress commands ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new stop_nemesis_on_stress_errors method is now applied to results of stress commands that are triggered from nemesis.py. There are 3 commands that are executed this way in nemesis code - c-s, s-b, and cql-stress-cassandra-stress.
All of them (with this^ change included) are returning tuple of (loader, result, event).

I'm not sure if/which other stress commands are also triggered from nemesis.py.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think regardless of what is being used right now, it would be good to align them to return identically from this function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, would be less confusion if we had single return type for all cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just sat down with @dimakr, and we agreed that there would be a follow-up for this change

that would incorporate stop_nemesis_on_stress_errors() into verify_results() and align the signatures of get_results() and verify_results()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aligned the return value of the `_run_stress` API for the other stress commands.

I had to add a simple local implementation of get_results() for most of the stress commands, so that we don't have problems in places like this one:

def get_stress_results_bench(self, queue):

But this is a temporary mechanism. After the followup task https://github.com/scylladb/qa-tasks/issues/1826 is implemented, the (get_/verify_)results() API will be streamlined across stress commands.


@classmethod
def _parse_bench_summary(cls, lines):
Expand Down
9 changes: 6 additions & 3 deletions sdcm/stress/latte_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx):

operation = self.function_name(stress_cmd)

result = {}
with cleanup_context, \
LatteStatsPublisher(loader, loader_idx, latte_log_filename=log_file_name,
operation=operation), \
Expand All @@ -233,12 +234,11 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
log_file=log_file_name,
retry=0,
)
return self.parse_final_output(result)

result = self.parse_final_output(result)
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
self.configure_event_on_failure(stress_event=latte_stress_event, exc=exc)

return {}
return loader, result, latte_stress_event
# TODOs:
# 1) take back the report workload..3.0.8.p128.t1.c1.20231025.220812.json

Expand All @@ -253,6 +253,9 @@ def configure_event_on_failure(self, stress_event: LatteStressEvent, exc: Except
stress_event.severity = Severity.ERROR
stress_event.add_error(errors=[error_msg])

def get_results(self) -> list:
return [result for _, result, _ in super().get_results()]


def format_stress_cmd_error(exc: Exception) -> str:
"""Format nicely the exception from a stress command failure."""
Expand Down
Loading
Loading