fix(data_nodes): use data nodes for sct operations
Fix code to use data nodes in different places
aleksbykov authored and fruch committed Nov 3, 2024
1 parent ecb1126 commit a58de1b
Showing 13 changed files with 61 additions and 41 deletions.
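For context: the change consistently replaces cluster.nodes with cluster.data_nodes so that SCT operations (repair, refresh, compaction, CQL connections, stress targets) only touch nodes that actually own data, and the nemesis wrapper now counts data nodes and zero-token nodes separately. A minimal sketch of the assumed property split — the real properties live in sdcm/cluster.py and may be implemented differently; the is_zero_token_node flag below is only illustrative:

class BaseCluster:
    def __init__(self):
        self.nodes = []  # every node in the cluster, including zero-token ones

    @property
    def data_nodes(self):
        # Nodes that own token ranges and store user data; the only nodes that
        # operations such as repair, refresh or compaction should target.
        return [node for node in self.nodes if not getattr(node, "is_zero_token_node", False)]

    @property
    def zero_nodes(self):
        # Zero-token nodes that hold no user data (counted separately by the
        # nemesis wrapper when the use_zero_nodes parameter is enabled).
        return [node for node in self.nodes if getattr(node, "is_zero_token_node", False)]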
2 changes: 1 addition & 1 deletion longevity_sla_test.py
@@ -49,7 +49,7 @@ def test_custom_time(self):
for role in self.roles + [self.fullscan_role]:
# self.fullscan_role may be None if "run_fullscan" is not defined
if role and is_enterprise:
- with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=self.db_cluster.nodes[0], timeout=15,
+ with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=self.db_cluster.data_nodes[0], timeout=15,
service_level_for_test_step="MAIN_SERVICE_LEVEL"):
SlaUtils().wait_for_service_level_propagated(cluster=self.db_cluster, service_level=role.attached_service_level)

10 changes: 5 additions & 5 deletions longevity_test.py
@@ -154,7 +154,7 @@ def test_custom_time(self):
# Grow cluster to target size if requested
if cluster_target_size := self.params.get('cluster_target_size'):
add_node_cnt = self.params.get('add_node_cnt')
- node_cnt = len(self.db_cluster.nodes)
+ node_cnt = len(self.db_cluster.data_nodes)

InfoEvent(message=f"Starting to grow cluster from {node_cnt} to {cluster_target_size}").publish()

@@ -163,10 +163,10 @@ def test_custom_time(self):
new_nodes = self.db_cluster.add_nodes(count=add_node_cnt, enable_auto_bootstrap=True)
self.monitors.reconfigure_scylla_monitoring()
up_timeout = MAX_TIME_WAIT_FOR_NEW_NODE_UP
- with adaptive_timeout(Operations.NEW_NODE, node=self.db_cluster.nodes[0], timeout=up_timeout):
+ with adaptive_timeout(Operations.NEW_NODE, node=self.db_cluster.data_nodes[0], timeout=up_timeout):
self.db_cluster.wait_for_init(node_list=new_nodes, timeout=up_timeout, check_node_health=False)
self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
- node_cnt = len(self.db_cluster.nodes)
+ node_cnt = len(self.db_cluster.data_nodes)

InfoEvent(message=f"Growing cluster finished, new cluster size is {node_cnt}").publish()

@@ -214,7 +214,7 @@ def test_custom_time(self):

if (stress_read_cmd or stress_cmd) and self.validate_large_collections:
with ignore_large_collection_warning():
- for node in self.db_cluster.nodes:
+ for node in self.db_cluster.data_nodes:
self._run_validate_large_collections_in_system(node)
self._run_validate_large_collections_warning_in_logs(node)

@@ -369,7 +369,7 @@ def _run_stress_in_batches(self, total_stress, batch_size, stress_cmd):

@property
def all_node_ips_for_stress_command(self):
- return f' -node {",".join([n.cql_address for n in self.db_cluster.nodes])}'
+ return f' -node {",".join([n.cql_address for n in self.db_cluster.data_nodes])}'

@staticmethod
def _get_columns_num_of_single_stress(single_stress_cmd):
5 changes: 3 additions & 2 deletions longevity_tombstone_gc_test.py
@@ -24,12 +24,13 @@ def __init__(self, *args, **kwargs):

def _run_repair_and_major_compaction(self, wait_propagation_delay: bool = False):
self.log.info('Run a flush for %s on nodes', self.keyspace)
- triggers = [partial(node.run_nodetool, sub_cmd=f"flush -- {self.keyspace}", ) for node in self.db_cluster.nodes]
+ triggers = [partial(node.run_nodetool, sub_cmd=f"flush -- {self.keyspace}", )
+ for node in self.db_cluster.data_nodes]
ParallelObject(objects=triggers, timeout=1200).call_objects()

self.log.info('Run a repair for %s on nodes', self.ks_cf)
triggers = [partial(node.run_nodetool, sub_cmd="repair", args=f"-pr {self.keyspace} {self.table}", ) for node
- in self.db_cluster.nodes]
+ in self.db_cluster.data_nodes]
ParallelObject(objects=triggers, timeout=1200).call_objects()

if wait_propagation_delay:
2 changes: 1 addition & 1 deletion mgmt_cli_test.py
@@ -692,7 +692,7 @@ def test_restore_backup_with_task(self, ks_names: list = None):
f"Backup task ended in {backup_task_status} instead of {TaskStatus.DONE}"
soft_timeout = 36 * 60
hard_timeout = 50 * 60
- with adaptive_timeout(Operations.MGMT_REPAIR, self.db_cluster.nodes[0], timeout=soft_timeout):
+ with adaptive_timeout(Operations.MGMT_REPAIR, self.db_cluster.data_nodes[0], timeout=soft_timeout):
self.verify_backup_success(mgr_cluster=mgr_cluster, backup_task=backup_task, ks_names=ks_names,
restore_data_with_task=True, timeout=hard_timeout)
self.run_verification_read_stress(ks_names)
4 changes: 2 additions & 2 deletions sdcm/cluster.py
@@ -3823,7 +3823,7 @@ def run_node_benchmarks(self):
if "db-cluster" not in self.name:
return
try:
- self.node_benchmark_manager.add_nodes(self.nodes)
+ self.node_benchmark_manager.add_nodes(self.data_nodes)
self.node_benchmark_manager.install_benchmark_tools()
self.node_benchmark_manager.run_benchmarks()
except Exception as ex: # pylint: disable=broad-except # noqa: BLE001
@@ -4592,7 +4592,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
try:
nemesis_class = self.nemesis[0] if self.nemesis else getattr(
import_module('sdcm.nemesis'), "Nemesis")
- with nemesis_class.run_nemesis(node_list=db_cluster.nodes, nemesis_label="KMS encryption check") as target_node:
+ with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)

ks_cf_list = db_cluster.get_non_system_ks_cf_list(
4 changes: 2 additions & 2 deletions sdcm/commit_log_check_thread.py
@@ -22,7 +22,7 @@ class CommitlogConfigParams:
def __init__(self, db_cluster, ):
logger = logging.getLogger(self.__class__.__name__)
with db_cluster.cql_connection_patient(
- node=db_cluster.nodes[0],
+ node=db_cluster.data_nodes[0],
connect_timeout=300,) as session:
self.use_hard_size_limit = bool(strtobool(session.execute(
"SELECT value FROM system.config WHERE name='commitlog_use_hard_size_limit'").one().value))
@@ -33,7 +33,7 @@ def __init__(self, db_cluster, ):
method="GET", path="metrics/max_disk_size", params=None).stdout)
self.smp = len(re.findall(
"shard",
- db_cluster.nodes[0].remoter.run('sudo seastar-cpu-map.sh -n scylla').stdout))
+ db_cluster.data_nodes[0].remoter.run('sudo seastar-cpu-map.sh -n scylla').stdout))
self.total_space = int(self.max_disk_size / self.smp)

logger.debug("CommitlogConfigParams")
53 changes: 32 additions & 21 deletions sdcm/nemesis.py
@@ -1311,7 +1311,7 @@ def _replace_cluster_node(self, old_node_ip: str | None = None, host_id: str | N
else:
new_node.replacement_node_ip = old_node_ip
try:
- with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.nodes[0], timeout=timeout):
+ with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.data_nodes[0], timeout=timeout):
self.cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
self.cluster.clean_replacement_node_options(new_node)
self.cluster.set_seeds()
@@ -1346,7 +1346,7 @@ def _add_and_init_new_cluster_nodes(self, count, timeout=MAX_TIME_WAIT_FOR_NEW_N
for new_node in new_nodes:
self.set_current_running_nemesis(node=new_node)
try:
- with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.nodes[0], timeout=timeout):
+ with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.data_nodes[0], timeout=timeout):
self.cluster.wait_for_init(node_list=new_nodes, timeout=timeout, check_node_health=False)
self.cluster.set_seeds()
self.cluster.update_seed_provider()
@@ -1721,6 +1721,7 @@ def _major_compaction(self):
def disrupt_major_compaction(self):
self._major_compaction()

+ @target_data_nodes
def disrupt_load_and_stream(self):
# Checking the columns number of keyspace1.standard1
self.log.debug('Prepare keyspace1.standard1 if it does not exist')
@@ -1736,7 +1737,7 @@ def disrupt_load_and_stream(self):
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace1.standard1")

if result is not None and result.exit_status == 0:
- map_files_to_node = SstableLoadUtils.distribute_test_files_to_cluster_nodes(nodes=self.cluster.nodes,
+ map_files_to_node = SstableLoadUtils.distribute_test_files_to_cluster_nodes(nodes=self.cluster.data_nodes,
test_data=test_data)
for sstables_info, load_on_node in map_files_to_node:
SstableLoadUtils.upload_sstables(load_on_node, test_data=sstables_info, table_name="standard1")
@@ -1746,6 +1747,7 @@ def disrupt_load_and_stream(self):
SstableLoadUtils.run_load_and_stream(load_on_node, **kwargs)

# pylint: disable=too-many-statements
+ @target_data_nodes
def disrupt_nodetool_refresh(self, big_sstable: bool = False):
# Checking the columns number of keyspace1.standard1
self.log.debug('Prepare keyspace1.standard1 if it does not exist')
@@ -1775,15 +1777,15 @@ def disrupt_nodetool_refresh(self, big_sstable: bool = False):
self.log.debug('Key %s already exists before refresh', key)

# Executing rolling refresh one by one
- shards_num = self.cluster.nodes[0].scylla_shards
- for node in self.cluster.nodes:
+ shards_num = self.cluster.data_nodes[0].scylla_shards
+ for node in self.cluster.data_nodes:
SstableLoadUtils.upload_sstables(node, test_data=test_data[0], table_name="standard1",
is_cloud_cluster=self.cluster.params.get("db_type") == 'cloud_scylla')
system_log_follower = SstableLoadUtils.run_refresh(node, test_data=test_data[0])
# NOTE: resharding happens only if we have more than 1 core.
# We may have 1 core in a K8S multitenant setup.
# If tablets in use, skipping resharding validation since it doesn't work the same as vnodes
- with self.cluster.cql_connection_patient(self.cluster.nodes[0]) as session:
+ with self.cluster.cql_connection_patient(self.cluster.data_nodes[0]) as session:
if shards_num > 1 and not is_tablets_feature_enabled(session=session):
SstableLoadUtils.validate_resharding_after_refresh(
node=node, system_log_follower=system_log_follower)
@@ -1815,10 +1817,11 @@ def _k8s_fake_enospc_error(self, node): # pylint: disable=no-self-use
node.restart_scylla_server(verify_up_after=True)
assert no_space_errors, "There are no 'No space left on device' errors in db log during enospc disruption."

+ @target_data_nodes
def disrupt_nodetool_enospc(self, sleep_time=30, all_nodes=False):

if all_nodes:
- nodes = self.cluster.nodes
+ nodes = self.cluster.data_nodes
InfoEvent('Enospc test on {}'.format([n.name for n in nodes])).publish()
else:
nodes = [self.target_node]
@@ -1838,6 +1841,7 @@ def disrupt_nodetool_enospc(self, sleep_time=30, all_nodes=False):
finally:
clean_enospc_on_node(target_node=node, sleep_time=sleep_time)

+ @target_data_nodes
def disrupt_end_of_quota_nemesis(self, sleep_time=30):
"""
Nemesis flow
@@ -3225,7 +3229,7 @@ def disrupt_validate_hh_short_downtime(self): # pylint: disable=invalid-name
# Wait until all other nodes see the target node as UN
# Only then we can expect that hint sending started on all nodes
def target_node_reported_un_by_others():
- for node in self.cluster.nodes:
+ for node in self.cluster.data_nodes:
if node is not self.target_node:
self.cluster.check_nodes_up_and_normal(nodes=[self.target_node], verification_node=node)
return True
@@ -4291,6 +4295,7 @@ def disrupt_enable_disable_table_encryption_aws_kms_provider_with_rotation(self)

@decorate_with_context(ignore_ycsb_connection_refused)
@scylla_versions(("2023.1.1-dev", None))
+ @target_data_nodes
def _enable_disable_table_encryption(self, enable_kms_key_rotation, additional_scylla_encryption_options=None): # noqa: PLR0914
if self.cluster.params.get("cluster_backend") != "aws":
raise UnsupportedNemesis("This nemesis is supported only on the AWS cluster backend")
@@ -4375,7 +4380,7 @@ def run_write_scylla_bench_load(write_cmd):
" -partition-count=50 -clustering-row-count=100 -clustering-row-size=uniform:75..125"
f" -keyspace {keyspace_name} -table {table_name} -timeout=120s -validate-data")
run_write_scylla_bench_load(write_cmd)
- upgrade_sstables(self.cluster.nodes)
+ upgrade_sstables(self.cluster.data_nodes)

# Read data
read_cmd = (
@@ -4693,8 +4698,8 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):
def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None:
"""Switches replication strategy to NetworkTopology for given keyspaces.
"""
- node = self.cluster.nodes[0]
- nodes_by_region = self.tester.db_cluster.nodes_by_region()
+ node = self.cluster.data_nodes[0]
+ nodes_by_region = self.tester.db_cluster.nodes_by_region(nodes=self.tester.db_cluster.data_nodes)
region = list(nodes_by_region.keys())[0]
dc_name = self.tester.db_cluster.get_nodetool_info(nodes_by_region[region][0])['Data Center']
for keyspace in keyspaces:
@@ -4717,14 +4722,14 @@ def disrupt_add_remove_dc(self) -> None:
raise UnsupportedNemesis(
"add_remove_dc skipped for multi-dc scenario (https://github.com/scylladb/scylla-cluster-tests/issues/5369)")
InfoEvent(message='Starting New DC Nemesis').publish()
- node = self.cluster.nodes[0]
+ node = self.cluster.data_nodes[0]
system_keyspaces = ["system_distributed", "system_traces"]
if not node.raft.is_consistent_topology_changes_enabled: # auth-v2 is used when consistent topology is enabled
system_keyspaces.insert(0, "system_auth")
self._switch_to_network_replication_strategy(self.cluster.get_test_keyspaces() + system_keyspaces)
datacenters = list(self.tester.db_cluster.get_nodetool_status().keys())
self.tester.create_keyspace("keyspace_new_dc", replication_factor={
- datacenters[0]: min(3, len(self.cluster.nodes))})
+ datacenters[0]: min(3, len(self.cluster.data_nodes))})
node_added = False
with ExitStack() as context_manager:
def finalizer(exc_type, *_):
@@ -4754,8 +4759,8 @@ def finalizer(exc_type, *_):
end_line_patterns=["rebuild.*finished with keyspaces=", "Rebuild succeeded"],
start_timeout=60, end_timeout=600):
new_node.run_nodetool(sub_cmd=f"rebuild -- {datacenters[0]}", long_running=True, retry=0)
- InfoEvent(message='Running full cluster repair on each node').publish()
- for cluster_node in self.cluster.nodes:
+ InfoEvent(message='Running full cluster repair on each data node').publish()
+ for cluster_node in self.cluster.data_nodes:
cluster_node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
datacenters = list(self.tester.db_cluster.get_nodetool_status().keys())
self._write_read_data_to_multi_dc_keyspace(datacenters)
@@ -4993,6 +4998,7 @@ def disrupt_create_index(self):
finally:
drop_index(session, ks, index_name)

+ @target_data_nodes
def disrupt_add_remove_mv(self):
"""
Create a Materialized view on an existing table while a node is down.
@@ -5001,7 +5007,7 @@ def disrupt_add_remove_mv(self):
Finally, drop the MV.
"""

- free_nodes = [node for node in self.cluster.nodes if not node.running_nemesis]
+ free_nodes = [node for node in self.cluster.data_nodes if not node.running_nemesis]
if not free_nodes:
raise UnsupportedNemesis("Not enough free nodes for nemesis. Skipping.")
cql_query_executor_node = random.choice(free_nodes)
@@ -5305,7 +5311,8 @@ def wrapper(*args, **kwargs): # pylint: disable=too-many-statements # noqa: PL
args[0].set_target_node(current_disruption=current_disruption)

args[0].cluster.check_cluster_health()
- num_nodes_before = len(args[0].cluster.nodes)
+ num_data_nodes_before = len(args[0].cluster.data_nodes)
+ num_zero_nodes_before = len(args[0].cluster.zero_nodes)
start_time = time.time()
args[0].log.debug('Start disruption at `%s`', datetime.datetime.fromtimestamp(start_time))
class_name = args[0].get_class_name()
@@ -5390,10 +5397,14 @@ def wrapper(*args, **kwargs): # pylint: disable=too-many-statements # noqa: PL
start_time), nemesis_event=nemesis_event)

args[0].cluster.check_cluster_health()
- num_nodes_after = len(args[0].cluster.nodes)
- if num_nodes_before != num_nodes_after:
- args[0].log.error('num nodes before %s and nodes after %s does not match' %
- (num_nodes_before, num_nodes_after))
+ num_data_nodes_after = len(args[0].cluster.data_nodes)
+ num_zero_nodes_after = len(args[0].cluster.zero_nodes)
+ if num_data_nodes_before != num_data_nodes_after:
+ args[0].log.error('num data nodes before %s and data nodes after %s does not match' %
+ (num_data_nodes_before, num_data_nodes_after))
+ if args[0].cluster.params.get("use_zero_nodes") and num_zero_nodes_before != num_zero_nodes_after:
+ args[0].log.error('num zero nodes before %s and zero nodes after %s does not match' %
+ (num_zero_nodes_before, num_zero_nodes_after))
# TODO: Temporary print. Will be removed later
data_validation_prints(args=args)
finally:
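The @target_data_nodes decorator applied to several disruptions above is not itself shown in this diff. A rough sketch of what such a decorator is presumably doing (re-picking the nemesis target among data nodes before the disruption runs) could look like the following; the attribute names target_node, cluster and running_nemesis come from the surrounding code, while the re-selection logic is an assumption:

import random
from functools import wraps


def target_data_nodes(func):
    # Assumed behaviour: ensure the disruption only runs against a data node.
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.target_node not in self.cluster.data_nodes:
            # Re-pick the target among data nodes not already claimed by another
            # nemesis thread (running_nemesis is used the same way in
            # disrupt_add_remove_mv above).
            free_data_nodes = [node for node in self.cluster.data_nodes if not node.running_nemesis]
            self.target_node = random.choice(free_data_nodes)
        return func(self, *args, **kwargs)
    return wrapper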
2 changes: 1 addition & 1 deletion sdcm/scan_operation_thread.py
@@ -109,7 +109,7 @@ def __init__(self, generator: random.Random, thread_params: ThreadParams, thread
self.log.info("FullscanOperationBase init finished")

def _get_random_node(self) -> BaseNode:
- return self.generator.choice(self.fullscan_params.db_cluster.nodes)
+ return self.generator.choice(self.fullscan_params.db_cluster.data_nodes)

@abstractmethod
def randomly_form_cql_statement(self) -> str:
(Diffs for the remaining 5 changed files are not included in this view.)