Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(FullScan): choose non-running_nemesis node #9370

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions sdcm/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import itertools
import json
import ipaddress
from importlib import import_module
from typing import List, Optional, Dict, Union, Set, Iterable, ContextManager, Any, IO, AnyStr, Callable
from datetime import datetime, timezone
from textwrap import dedent
Expand Down Expand Up @@ -66,6 +65,7 @@
from sdcm.mgmt.common import get_manager_repo_from_defaults, get_manager_scylla_backend
from sdcm.prometheus import start_metrics_server, PrometheusAlertManagerListener, AlertSilencer
from sdcm.log import SDCMAdapter
from sdcm.target_node_lock import run_nemesis
from sdcm.provision.common.configuration_script import ConfigurationScriptBuilder
from sdcm.provision.common.utils import disable_daily_apt_triggers
from sdcm.provision.scylla_yaml import ScyllaYamlNodeAttrBuilder
Expand Down Expand Up @@ -4585,9 +4585,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
message=f"Failed to rotate AWS KMS key for the '{kms_key_alias_name}' alias",
traceback=traceback.format_exc()).publish()
try:
nemesis_class = self.nemesis[0] if self.nemesis else getattr(
import_module('sdcm.nemesis'), "Nemesis")
with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
with run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)

ks_cf_list = db_cluster.get_non_system_ks_cf_list(
Expand Down
4 changes: 2 additions & 2 deletions sdcm/kcl_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from functools import cached_property
from typing import Dict

from sdcm.nemesis import Nemesis
from sdcm.target_node_lock import run_nemesis
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.stress.base import format_stress_cmd_error
from sdcm.utils.docker_remote import RemoteDocker
Expand Down Expand Up @@ -132,7 +132,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
end_time = time.time() + self._timeout

while not self._stop_event.is_set():
with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
with run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
node.run_nodetool('flush')

dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
Expand Down
48 changes: 16 additions & 32 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import itertools
import enum
from distutils.version import LooseVersion
from contextlib import ExitStack, contextmanager
from contextlib import ExitStack
from typing import Any, List, Optional, Type, Tuple, Callable, Dict, Set, Union, Iterable
from functools import wraps, partial
from collections import defaultdict, Counter, namedtuple
Expand Down Expand Up @@ -66,6 +66,8 @@
)
from sdcm.db_stats import PrometheusDBStats
from sdcm.log import SDCMAdapter
from sdcm.target_node_lock import run_nemesis, set_running_nemesis, unset_running_nemesis, \
NEMESIS_TARGET_SELECTION_LOCK, CantAcquireLockException
from sdcm.logcollector import save_kallsyms_map
from sdcm.mgmt.common import TaskStatus, ScyllaManagerError, get_persistent_snapshots
from sdcm.nemesis_publisher import NemesisElasticSearchPublisher
Expand Down Expand Up @@ -106,6 +108,7 @@
update_authenticator, ParallelObject,
ParallelObjectResult, sleep_for_percent_of_duration, get_views_of_base_table)
from sdcm.utils.features import is_tablets_feature_enabled
from sdcm.utils.nemesis_thread_safe_operations import safe_cluster_start_stop, safe_node_restart
from sdcm.utils.quota import configure_quota_on_node_for_scylla_user_context, is_quota_enabled_on_node, enable_quota_on_node, \
write_data_to_reach_end_of_quota
from sdcm.utils.compaction_ops import CompactionOps, StartStopCompactionArgs
Expand Down Expand Up @@ -172,7 +175,6 @@
"disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node",
)

NEMESIS_TARGET_SELECTION_LOCK = Lock()
DISRUPT_POOL_PROPERTY_NAME = "target_pool"


Expand Down Expand Up @@ -328,24 +330,6 @@ def wrapper(self, *args, **kwargs):
setattr(cls, func.__name__, wrapper) # bind it to Nemesis class
return func # returning func means func can still be used normally

@staticmethod
@contextmanager
def run_nemesis(node_list: list['BaseNode'], nemesis_label: str):
    """
    Reserve one node from `node_list` for the duration of the context.

    While holding NEMESIS_TARGET_SELECTION_LOCK, a random node whose
    `running_nemesis` attribute is falsy is chosen and tagged with
    `nemesis_label`. The chosen node is yielded to the caller, and on exit
    (normal or via exception) its `running_nemesis` is reset to None,
    again under the lock.

    Fails an assertion when every node in `node_list` already has
    `running_nemesis` set.
    """
    with NEMESIS_TARGET_SELECTION_LOCK:
        # selection and tagging happen under one lock acquisition, so the
        # chosen node cannot be tagged by another lock holder in between
        candidates = list(filter(lambda candidate: not candidate.running_nemesis, node_list))
        assert candidates, f"couldn't find nodes for running:`{nemesis_label}`, are all nodes running nemesis ?"
        picked = random.choice(candidates)
        picked.running_nemesis = nemesis_label
    try:
        yield picked
    finally:
        # always release the node, even if the body of the `with` raised
        with NEMESIS_TARGET_SELECTION_LOCK:
            picked.running_nemesis = None

def use_nemesis_seed(self):
if nemesis_seed := self.tester.params.get("nemesis_seed"):
random.seed(nemesis_seed)
Expand Down Expand Up @@ -379,14 +363,11 @@ def switch_target_node(self, node: BaseNode):
self.target_node = node

# Marks `node` as running this nemesis' current disruption (`self.current_disruption`).
# NOTE(review): this view is a diff rendering with the +/- markers lost. The locked
# direct assignment and the `set_running_nemesis(...)` call below look like the
# before/after forms of the same operation - confirm against the real file that only
# one of them is present.
def set_current_running_nemesis(self, node):
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = self.current_disruption
set_running_nemesis(node, self.current_disruption)

# Clears the running-nemesis mark on `node`.
# NOTE(review): this view is a diff rendering with the +/- markers and indentation lost.
# The `None`-guarded locked assignment and the `unset_running_nemesis(node)` call look
# like the before/after forms of the same operation - confirm against the real file
# which one is live, and whether the `node is not None` guard still applies to it.
@staticmethod
def unset_current_running_nemesis(node):
if node is not None:
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None
unset_running_nemesis(node)

def set_target_node_pool_type(self, pool_type: NEMESIS_TARGET_POOLS = NEMESIS_TARGET_POOLS.data_nodes):
"""Set pool type to choose nodes for target node """
Expand Down Expand Up @@ -449,10 +430,10 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = No
self.target_node = random.choice(nodes)

if current_disruption:
self.target_node.running_nemesis = current_disruption
set_running_nemesis(self.target_node, current_disruption)
self.set_current_disruption(current_disruption)
elif self.current_disruption:
self.target_node.running_nemesis = self.current_disruption
set_running_nemesis(self.target_node, self.current_disruption)
else:
raise ValueError("current_disruption is not set")
self.log.info('Current Target: %s with running nemesis: %s',
Expand Down Expand Up @@ -980,7 +961,10 @@ def disrupt_soft_reboot_node(self):

@decorate_with_context(ignore_ycsb_connection_refused)
def disrupt_rolling_restart_cluster(self, random_order=False):
    """
    Restart the cluster as a disruption.

    :param random_order: forwarded to `self.cluster.restart_scylla`.
    :raises UnsupportedNemesis: when `safe_cluster_start_stop` raises
        CantAcquireLockException; the original exception is chained as the cause.
    """
    # NOTE(review): this view is a diff rendering with the +/- markers lost; the direct
    # `restart_scylla` call and the `safe_cluster_start_stop` call look like the
    # before/after forms of the same step - confirm only one is meant to remain.
    self.cluster.restart_scylla(random_order=random_order)
    try:
        safe_cluster_start_stop(self.target_node.running_nemesis, self.cluster)
    except CantAcquireLockException as e:
        # The exception must be raised, not merely constructed: otherwise the lock
        # failure is swallowed and the nemesis is reported as having run.
        raise UnsupportedNemesis(e) from e

def disrupt_switch_between_password_authenticator_and_saslauthd_authenticator_and_back(self):
"""
Expand Down Expand Up @@ -3076,7 +3060,7 @@ def execute_data_validation_thread(command_template, keyspace_name, number_of_ro
f'Schema restoration of {chosen_snapshot_tag} has failed!'

with ignore_ycsb_connection_refused():
self.cluster.restart_scylla() # After schema restoration, you should restart the nodes
safe_cluster_start_stop(self.target_node.running_nemesis, self.cluster)
self.tester.set_ks_strategy_to_network_and_rf_according_to_cluster(
keyspace=chosen_snapshot_info["keyspace_name"], repair_after_alter=False)

Expand Down Expand Up @@ -4055,7 +4039,7 @@ def decommission_post_action():
terminate_pattern.timeout):
stack.enter_context(expected_start_failed_context)
with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \
self.run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=self.target_node,
verification_node=verification_node,
timeout=full_operations_timeout):
Expand Down Expand Up @@ -4372,7 +4356,7 @@ def _enable_disable_table_encryption(self, enable_kms_key_rotation, additional_s
}
is_restart_needed = True
if is_restart_needed:
node.restart_scylla()
safe_node_restart(self.target_node.running_nemesis, node)

# Create table with encryption
keyspace_name, table_name = self.cluster.get_test_keyspaces()[0], 'tmp_encrypted_table'
Expand Down Expand Up @@ -5258,7 +5242,7 @@ def disrupt_bootstrap_streaming_error(self):
decommission_timeout = 7200
monitoring_decommission_timeout = decommission_timeout + 100
un_nodes = self.cluster.get_nodes_up_and_normal()
with Nemesis.run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
with run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=new_node, verification_node=verification_node,
timeout=monitoring_decommission_timeout):

Expand Down
Loading
Loading