Skip to content

Commit

Permalink
fix(FullScan): choose only non-disruptive node
Browse files Browse the repository at this point in the history
this commit has the following changes

1 introduce common targed_node_lock mechanism
that can be used in nemesis and Scan operations

2 FullScan operation now run only on free of nemeses node

3 change all node.running_nemesis settings to use common methods
set/unset_running_nemesis from common targed_node_lock file (except unit tests)

fixes: scylladb#9284
  • Loading branch information
temichus committed Nov 27, 2024
1 parent 57e5dd0 commit 33fd377
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 176 deletions.
6 changes: 2 additions & 4 deletions sdcm/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import itertools
import json
import ipaddress
from importlib import import_module
from typing import List, Optional, Dict, Union, Set, Iterable, ContextManager, Any, IO, AnyStr, Callable
from datetime import datetime, timezone
from textwrap import dedent
Expand Down Expand Up @@ -66,6 +65,7 @@
from sdcm.mgmt.common import get_manager_repo_from_defaults, get_manager_scylla_backend
from sdcm.prometheus import start_metrics_server, PrometheusAlertManagerListener, AlertSilencer
from sdcm.log import SDCMAdapter
from sdcm.target_node_lock import run_nemesis
from sdcm.provision.common.configuration_script import ConfigurationScriptBuilder
from sdcm.provision.common.utils import disable_daily_apt_triggers
from sdcm.provision.scylla_yaml import ScyllaYamlNodeAttrBuilder
Expand Down Expand Up @@ -4579,9 +4579,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
message=f"Failed to rotate AWS KMS key for the '{kms_key_alias_name}' alias",
traceback=traceback.format_exc()).publish()
try:
nemesis_class = self.nemesis[0] if self.nemesis else getattr(
import_module('sdcm.nemesis'), "Nemesis")
with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
with run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)

ks_cf_list = db_cluster.get_non_system_ks_cf_list(
Expand Down
4 changes: 2 additions & 2 deletions sdcm/kcl_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from functools import cached_property
from typing import Dict

from sdcm.nemesis import Nemesis
from sdcm.target_node_lock import run_nemesis
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.stress.base import format_stress_cmd_error
from sdcm.utils.docker_remote import RemoteDocker
Expand Down Expand Up @@ -132,7 +132,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
end_time = time.time() + self._timeout

while not self._stop_event.is_set():
with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
with run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
node.run_nodetool('flush')

dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
Expand Down
38 changes: 8 additions & 30 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import json
import itertools
from distutils.version import LooseVersion
from contextlib import ExitStack, contextmanager
from contextlib import ExitStack
from typing import Any, List, Optional, Type, Tuple, Callable, Dict, Set, Union, Iterable
from functools import wraps, partial
from collections import defaultdict, Counter, namedtuple
Expand Down Expand Up @@ -65,6 +65,7 @@
)
from sdcm.db_stats import PrometheusDBStats
from sdcm.log import SDCMAdapter
from sdcm.target_node_lock import run_nemesis, set_running_nemesis, unset_running_nemesis, NEMESIS_TARGET_SELECTION_LOCK
from sdcm.logcollector import save_kallsyms_map
from sdcm.mgmt.common import TaskStatus, ScyllaManagerError, get_persistent_snapshots
from sdcm.nemesis_publisher import NemesisElasticSearchPublisher
Expand Down Expand Up @@ -170,8 +171,6 @@
"disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node",
)

NEMESIS_TARGET_SELECTION_LOCK = Lock()


class DefaultValue: # pylint: disable=too-few-public-methods
"""
Expand Down Expand Up @@ -337,24 +336,6 @@ def wrapper(self, *args, **kwargs):
setattr(cls, func.__name__, wrapper) # bind it to Nemesis class
return func # returning func means func can still be used normally

@staticmethod
@contextmanager
def run_nemesis(node_list: list['BaseNode'], nemesis_label: str):
"""
pick a node out of a `node_list`, and mark is as running_nemesis
for the duration of this context
"""
with NEMESIS_TARGET_SELECTION_LOCK:
free_nodes = [node for node in node_list if not node.running_nemesis]
assert free_nodes, f"couldn't find nodes for running:`{nemesis_label}`, are all nodes running nemesis ?"
node = random.choice(free_nodes)
node.running_nemesis = nemesis_label
try:
yield node
finally:
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None

def use_nemesis_seed(self):
if nemesis_seed := self.tester.params.get("nemesis_seed"):
random.seed(nemesis_seed)
Expand All @@ -381,14 +362,11 @@ def publish_event(self, disrupt, status=True, data=None):
DisruptionEvent(nemesis_name=disrupt, severity=severity, **data).publish()

def set_current_running_nemesis(self, node):
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = self.current_disruption
set_running_nemesis(node, self.current_disruption)

@staticmethod
def unset_current_running_nemesis(node):
if node is not None:
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None
unset_running_nemesis(node)

def set_target_node_pool(self, nodelist: list[BaseNode] | None = None):
"""Set pool of nodes to choose target node """
Expand Down Expand Up @@ -453,10 +431,10 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = No
self.target_node = random.choice(nodes)

if current_disruption:
self.target_node.running_nemesis = current_disruption
set_running_nemesis(self.target_node, current_disruption)
self.set_current_disruption(current_disruption)
elif self.current_disruption:
self.target_node.running_nemesis = self.current_disruption
set_running_nemesis(self.target_node, self.current_disruption)
else:
raise ValueError("current_disruption is not set")
self.log.info('Current Target: %s with running nemesis: %s',
Expand Down Expand Up @@ -4046,7 +4024,7 @@ def decommission_post_action():
terminate_pattern.timeout):
stack.enter_context(expected_start_failed_context)
with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \
self.run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=self.target_node,
verification_node=verification_node,
timeout=full_operations_timeout):
Expand Down Expand Up @@ -5240,7 +5218,7 @@ def disrupt_bootstrap_streaming_error(self):
decommission_timeout = 7200
monitoring_decommission_timeout = decommission_timeout + 100
un_nodes = self.cluster.get_nodes_up_and_normal()
with Nemesis.run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
with run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=new_node, verification_node=verification_node,
timeout=monitoring_decommission_timeout):

Expand Down
Loading

0 comments on commit 33fd377

Please sign in to comment.