diff --git a/longevity_sla_test.py b/longevity_sla_test.py
index 8187cab9fa..4d6abd5f92 100644
--- a/longevity_sla_test.py
+++ b/longevity_sla_test.py
@@ -49,7 +49,7 @@ def test_custom_time(self):
         for role in self.roles + [self.fullscan_role]:  # self.fullscan_role may be None if "run_fullscan" is not defined
             if role and is_enterprise:
-                with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=self.db_cluster.nodes[0], timeout=15,
+                with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=self.db_cluster.data_nodes[0], timeout=15,
                                       service_level_for_test_step="MAIN_SERVICE_LEVEL"):
                     SlaUtils().wait_for_service_level_propagated(cluster=self.db_cluster,
                                                                  service_level=role.attached_service_level)
diff --git a/longevity_test.py b/longevity_test.py
index 96e74d137d..1e2648213c 100644
--- a/longevity_test.py
+++ b/longevity_test.py
@@ -154,7 +154,7 @@ def test_custom_time(self):
         # Grow cluster to target size if requested
         if cluster_target_size := self.params.get('cluster_target_size'):
             add_node_cnt = self.params.get('add_node_cnt')
-            node_cnt = len(self.db_cluster.nodes)
+            node_cnt = len(self.db_cluster.data_nodes)

             InfoEvent(message=f"Starting to grow cluster from {node_cnt} to {cluster_target_size}").publish()

@@ -163,10 +163,10 @@ def test_custom_time(self):
                 new_nodes = self.db_cluster.add_nodes(count=add_node_cnt, enable_auto_bootstrap=True)
                 self.monitors.reconfigure_scylla_monitoring()
                 up_timeout = MAX_TIME_WAIT_FOR_NEW_NODE_UP
-                with adaptive_timeout(Operations.NEW_NODE, node=self.db_cluster.nodes[0], timeout=up_timeout):
+                with adaptive_timeout(Operations.NEW_NODE, node=self.db_cluster.data_nodes[0], timeout=up_timeout):
                     self.db_cluster.wait_for_init(node_list=new_nodes, timeout=up_timeout, check_node_health=False)
                 self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
-                node_cnt = len(self.db_cluster.nodes)
+                node_cnt = len(self.db_cluster.data_nodes)

             InfoEvent(message=f"Growing cluster finished, new cluster size is {node_cnt}").publish()

@@ -214,7 +214,7 @@ def test_custom_time(self):

         if (stress_read_cmd or stress_cmd) and self.validate_large_collections:
             with ignore_large_collection_warning():
-                for node in self.db_cluster.nodes:
+                for node in self.db_cluster.data_nodes:
                     self._run_validate_large_collections_in_system(node)
                     self._run_validate_large_collections_warning_in_logs(node)
@@ -369,7 +369,7 @@ def _run_stress_in_batches(self, total_stress, batch_size, stress_cmd):

     @property
     def all_node_ips_for_stress_command(self):
-        return f' -node {",".join([n.cql_address for n in self.db_cluster.nodes])}'
+        return f' -node {",".join([n.cql_address for n in self.db_cluster.data_nodes])}'

     @staticmethod
     def _get_columns_num_of_single_stress(single_stress_cmd):
diff --git a/longevity_tombstone_gc_test.py b/longevity_tombstone_gc_test.py
index 87a0cb1db7..d9788b5a96 100644
--- a/longevity_tombstone_gc_test.py
+++ b/longevity_tombstone_gc_test.py
@@ -24,12 +24,13 @@ def __init__(self, *args, **kwargs):

     def _run_repair_and_major_compaction(self, wait_propagation_delay: bool = False):
         self.log.info('Run a flush for %s on nodes', self.keyspace)
-        triggers = [partial(node.run_nodetool, sub_cmd=f"flush -- {self.keyspace}", ) for node in self.db_cluster.nodes]
+        triggers = [partial(node.run_nodetool, sub_cmd=f"flush -- {self.keyspace}", )
+                    for node in self.db_cluster.data_nodes]
         ParallelObject(objects=triggers, timeout=1200).call_objects()

         self.log.info('Run a repair for %s on nodes', self.ks_cf)
         triggers = [partial(node.run_nodetool, sub_cmd="repair", args=f"-pr {self.keyspace}  {self.table}", ) for node
-                    in self.db_cluster.nodes]
+                    in self.db_cluster.data_nodes]
         ParallelObject(objects=triggers, timeout=1200).call_objects()

         if wait_propagation_delay:
diff --git a/mgmt_cli_test.py b/mgmt_cli_test.py
index 654305d4ae..9979bad80d 100644
--- a/mgmt_cli_test.py
+++ b/mgmt_cli_test.py
@@ -692,7 +692,7 @@ def test_restore_backup_with_task(self, ks_names: list = None):
             f"Backup task ended in {backup_task_status} instead of {TaskStatus.DONE}"
         soft_timeout = 36 * 60
         hard_timeout = 50 * 60
-        with adaptive_timeout(Operations.MGMT_REPAIR, self.db_cluster.nodes[0], timeout=soft_timeout):
+        with adaptive_timeout(Operations.MGMT_REPAIR, self.db_cluster.data_nodes[0], timeout=soft_timeout):
             self.verify_backup_success(mgr_cluster=mgr_cluster, backup_task=backup_task, ks_names=ks_names,
                                        restore_data_with_task=True, timeout=hard_timeout)
         self.run_verification_read_stress(ks_names)
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
index 5461efeca9..5c7167207f 100644
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -3823,7 +3823,7 @@ def run_node_benchmarks(self):
         if "db-cluster" not in self.name:
             return
         try:
-            self.node_benchmark_manager.add_nodes(self.nodes)
+            self.node_benchmark_manager.add_nodes(self.data_nodes)
             self.node_benchmark_manager.install_benchmark_tools()
             self.node_benchmark_manager.run_benchmarks()
         except Exception as ex:  # pylint: disable=broad-except  # noqa: BLE001
@@ -4592,7 +4592,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
            try:
                nemesis_class = self.nemesis[0] if self.nemesis else getattr(
                    import_module('sdcm.nemesis'), "Nemesis")
-                with nemesis_class.run_nemesis(node_list=db_cluster.nodes, nemesis_label="KMS encryption check") as target_node:
+                with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
                    self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)

                    ks_cf_list = db_cluster.get_non_system_ks_cf_list(
diff --git a/sdcm/commit_log_check_thread.py b/sdcm/commit_log_check_thread.py
index 6a2ee64195..eb5f039de3 100644
--- a/sdcm/commit_log_check_thread.py
+++ b/sdcm/commit_log_check_thread.py
@@ -22,7 +22,7 @@ class CommitlogConfigParams:
     def __init__(self, db_cluster, ):
         logger = logging.getLogger(self.__class__.__name__)
         with db_cluster.cql_connection_patient(
-                node=db_cluster.nodes[0],
+                node=db_cluster.data_nodes[0],
                 connect_timeout=300,) as session:
             self.use_hard_size_limit = bool(strtobool(session.execute(
                 "SELECT value FROM system.config WHERE name='commitlog_use_hard_size_limit'").one().value))
@@ -33,7 +33,7 @@ def __init__(self, db_cluster, ):
                 method="GET", path="metrics/max_disk_size", params=None).stdout)
             self.smp = len(re.findall(
                 "shard",
-                db_cluster.nodes[0].remoter.run('sudo seastar-cpu-map.sh -n scylla').stdout))
+                db_cluster.data_nodes[0].remoter.run('sudo seastar-cpu-map.sh -n scylla').stdout))
             self.total_space = int(self.max_disk_size / self.smp)

             logger.debug("CommitlogConfigParams")
diff --git a/sdcm/nemesis.py b/sdcm/nemesis.py
index 339059a1be..3d3bb38516 100644
--- a/sdcm/nemesis.py
+++ b/sdcm/nemesis.py
@@ -1311,7 +1311,7 @@ def _replace_cluster_node(self, old_node_ip: str | None = None, host_id: str | N
         else:
             new_node.replacement_node_ip = old_node_ip
         try:
-            with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.nodes[0], timeout=timeout):
+            with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.data_nodes[0], timeout=timeout):
                 self.cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
             self.cluster.clean_replacement_node_options(new_node)
             self.cluster.set_seeds()
@@ -1346,7 +1346,7 @@ def _add_and_init_new_cluster_nodes(self, count, timeout=MAX_TIME_WAIT_FOR_NEW_N
         for new_node in new_nodes:
             self.set_current_running_nemesis(node=new_node)
         try:
-            with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.nodes[0], timeout=timeout):
+            with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.data_nodes[0], timeout=timeout):
                 self.cluster.wait_for_init(node_list=new_nodes, timeout=timeout, check_node_health=False)
             self.cluster.set_seeds()
             self.cluster.update_seed_provider()
@@ -1721,6 +1721,7 @@ def _major_compaction(self):
     def disrupt_major_compaction(self):
         self._major_compaction()

+    @target_data_nodes
     def disrupt_load_and_stream(self):
         # Checking the columns number of keyspace1.standard1
         self.log.debug('Prepare keyspace1.standard1 if it does not exist')
@@ -1736,7 +1737,7 @@ def disrupt_load_and_stream(self):
         result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace1.standard1")

         if result is not None and result.exit_status == 0:
-            map_files_to_node = SstableLoadUtils.distribute_test_files_to_cluster_nodes(nodes=self.cluster.nodes,
+            map_files_to_node = SstableLoadUtils.distribute_test_files_to_cluster_nodes(nodes=self.cluster.data_nodes,
                                                                                         test_data=test_data)
             for sstables_info, load_on_node in map_files_to_node:
                 SstableLoadUtils.upload_sstables(load_on_node, test_data=sstables_info, table_name="standard1")
@@ -1746,6 +1747,7 @@ def disrupt_load_and_stream(self):
                 SstableLoadUtils.run_load_and_stream(load_on_node, **kwargs)

     # pylint: disable=too-many-statements
+    @target_data_nodes
     def disrupt_nodetool_refresh(self, big_sstable: bool = False):
         # Checking the columns number of keyspace1.standard1
         self.log.debug('Prepare keyspace1.standard1 if it does not exist')
@@ -1775,15 +1777,15 @@ def disrupt_nodetool_refresh(self, big_sstable: bool = False):
                 self.log.debug('Key %s already exists before refresh', key)

         # Executing rolling refresh one by one
-        shards_num = self.cluster.nodes[0].scylla_shards
-        for node in self.cluster.nodes:
+        shards_num = self.cluster.data_nodes[0].scylla_shards
+        for node in self.cluster.data_nodes:
             SstableLoadUtils.upload_sstables(node, test_data=test_data[0], table_name="standard1",
                                              is_cloud_cluster=self.cluster.params.get("db_type") == 'cloud_scylla')
             system_log_follower = SstableLoadUtils.run_refresh(node, test_data=test_data[0])
             # NOTE: resharding happens only if we have more than 1 core.
             #       We may have 1 core in a K8S multitenant setup.
             # If tablets in use, skipping resharding validation since it doesn't work the same as vnodes
-            with self.cluster.cql_connection_patient(self.cluster.nodes[0]) as session:
+            with self.cluster.cql_connection_patient(self.cluster.data_nodes[0]) as session:
                 if shards_num > 1 and not is_tablets_feature_enabled(session=session):
                     SstableLoadUtils.validate_resharding_after_refresh(
                         node=node, system_log_follower=system_log_follower)
@@ -1815,10 +1817,11 @@ def _k8s_fake_enospc_error(self, node):  # pylint: disable=no-self-use
             node.restart_scylla_server(verify_up_after=True)
         assert no_space_errors, "There are no 'No space left on device' errors in db log during enospc disruption."

+    @target_data_nodes
     def disrupt_nodetool_enospc(self, sleep_time=30, all_nodes=False):
         if all_nodes:
-            nodes = self.cluster.nodes
+            nodes = self.cluster.data_nodes
             InfoEvent('Enospc test on {}'.format([n.name for n in nodes])).publish()
         else:
             nodes = [self.target_node]
@@ -1838,6 +1841,7 @@ def disrupt_nodetool_enospc(self, sleep_time=30, all_nodes=False):
         finally:
             clean_enospc_on_node(target_node=node, sleep_time=sleep_time)

+    @target_data_nodes
     def disrupt_end_of_quota_nemesis(self, sleep_time=30):
         """
         Nemesis flow
@@ -3225,7 +3229,7 @@ def disrupt_validate_hh_short_downtime(self):  # pylint: disable=invalid-name
         # Wait until all other nodes see the target node as UN
         # Only then we can expect that hint sending started on all nodes
         def target_node_reported_un_by_others():
-            for node in self.cluster.nodes:
+            for node in self.cluster.data_nodes:
                 if node is not self.target_node:
                     self.cluster.check_nodes_up_and_normal(nodes=[self.target_node], verification_node=node)
             return True
@@ -4291,6 +4295,7 @@ def disrupt_enable_disable_table_encryption_aws_kms_provider_with_rotation(self)

     @decorate_with_context(ignore_ycsb_connection_refused)
     @scylla_versions(("2023.1.1-dev", None))
+    @target_data_nodes
     def _enable_disable_table_encryption(self, enable_kms_key_rotation, additional_scylla_encryption_options=None):  # noqa: PLR0914
         if self.cluster.params.get("cluster_backend") != "aws":
             raise UnsupportedNemesis("This nemesis is supported only on the AWS cluster backend")
@@ -4375,7 +4380,7 @@ def run_write_scylla_bench_load(write_cmd):
                      " -partition-count=50 -clustering-row-count=100 -clustering-row-size=uniform:75..125"
                      f" -keyspace {keyspace_name} -table {table_name} -timeout=120s -validate-data")
         run_write_scylla_bench_load(write_cmd)
-        upgrade_sstables(self.cluster.nodes)
+        upgrade_sstables(self.cluster.data_nodes)

         # Read data
         read_cmd = (
@@ -4693,8 +4698,8 @@ def _verify_multi_dc_keyspace_data(self, consistency_level: str = "ALL"):

     def _switch_to_network_replication_strategy(self, keyspaces: List[str]) -> None:
         """Switches replication strategy to NetworkTopology for given keyspaces.
         """
-        node = self.cluster.nodes[0]
-        nodes_by_region = self.tester.db_cluster.nodes_by_region()
+        node = self.cluster.data_nodes[0]
+        nodes_by_region = self.tester.db_cluster.nodes_by_region(nodes=self.tester.db_cluster.data_nodes)
         region = list(nodes_by_region.keys())[0]
         dc_name = self.tester.db_cluster.get_nodetool_info(nodes_by_region[region][0])['Data Center']
         for keyspace in keyspaces:
@@ -4717,14 +4722,14 @@ def disrupt_add_remove_dc(self) -> None:
             raise UnsupportedNemesis(
                 "add_remove_dc skipped for multi-dc scenario (https://github.com/scylladb/scylla-cluster-tests/issues/5369)")
         InfoEvent(message='Starting New DC Nemesis').publish()
-        node = self.cluster.nodes[0]
+        node = self.cluster.data_nodes[0]
         system_keyspaces = ["system_distributed", "system_traces"]
         if not node.raft.is_consistent_topology_changes_enabled:  # auth-v2 is used when consistent topology is enabled
             system_keyspaces.insert(0, "system_auth")
         self._switch_to_network_replication_strategy(self.cluster.get_test_keyspaces() + system_keyspaces)
         datacenters = list(self.tester.db_cluster.get_nodetool_status().keys())
         self.tester.create_keyspace("keyspace_new_dc", replication_factor={
-            datacenters[0]: min(3, len(self.cluster.nodes))})
+            datacenters[0]: min(3, len(self.cluster.data_nodes))})
         node_added = False
         with ExitStack() as context_manager:
             def finalizer(exc_type, *_):
@@ -4754,8 +4759,8 @@ def finalizer(exc_type, *_):
                                                    end_line_patterns=["rebuild.*finished with keyspaces=", "Rebuild succeeded"],
                                                    start_timeout=60, end_timeout=600):
                     new_node.run_nodetool(sub_cmd=f"rebuild -- {datacenters[0]}", long_running=True, retry=0)
-            InfoEvent(message='Running full cluster repair on each node').publish()
-            for cluster_node in self.cluster.nodes:
+            InfoEvent(message='Running full cluster repair on each data node').publish()
+            for cluster_node in self.cluster.data_nodes:
                 cluster_node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
             datacenters = list(self.tester.db_cluster.get_nodetool_status().keys())
             self._write_read_data_to_multi_dc_keyspace(datacenters)
@@ -4993,6 +4998,7 @@ def disrupt_create_index(self):
             finally:
                 drop_index(session, ks, index_name)

+    @target_data_nodes
     def disrupt_add_remove_mv(self):
         """
         Create a Materialized view on an existing table while a node is down.
@@ -5001,7 +5007,7 @@
         Finally, drop the MV.
         """

-        free_nodes = [node for node in self.cluster.nodes if not node.running_nemesis]
+        free_nodes = [node for node in self.cluster.data_nodes if not node.running_nemesis]
         if not free_nodes:
             raise UnsupportedNemesis("Not enough free nodes for nemesis. Skipping.")
         cql_query_executor_node = random.choice(free_nodes)
@@ -5305,7 +5311,8 @@ def wrapper(*args, **kwargs):  # pylint: disable=too-many-statements  # noqa: PL
             args[0].set_target_node(current_disruption=current_disruption)

             args[0].cluster.check_cluster_health()
-            num_nodes_before = len(args[0].cluster.nodes)
+            num_data_nodes_before = len(args[0].cluster.data_nodes)
+            num_zero_nodes_before = len(args[0].cluster.zero_nodes)
             start_time = time.time()
             args[0].log.debug('Start disruption at `%s`', datetime.datetime.fromtimestamp(start_time))
             class_name = args[0].get_class_name()
@@ -5390,10 +5397,14 @@ def wrapper(*args, **kwargs):  # pylint: disable=too-many-statements  # noqa: PL
                                                              start_time), nemesis_event=nemesis_event)
                 args[0].cluster.check_cluster_health()
-                num_nodes_after = len(args[0].cluster.nodes)
-                if num_nodes_before != num_nodes_after:
-                    args[0].log.error('num nodes before %s and nodes after %s does not match' %
-                                      (num_nodes_before, num_nodes_after))
+                num_data_nodes_after = len(args[0].cluster.data_nodes)
+                num_zero_nodes_after = len(args[0].cluster.zero_nodes)
+                if num_data_nodes_before != num_data_nodes_after:
+                    args[0].log.error('num data nodes before %s and data nodes after %s does not match' %
+                                      (num_data_nodes_before, num_data_nodes_after))
+                if args[0].cluster.params.get("use_zero_nodes") and num_zero_nodes_before != num_zero_nodes_after:
+                    args[0].log.error('num zero nodes before %s and zero nodes after %s does not match' %
+                                      (num_zero_nodes_before, num_zero_nodes_after))
                 # TODO: Temporary print. Will be removed later
                 data_validation_prints(args=args)
             finally:
diff --git a/sdcm/scan_operation_thread.py b/sdcm/scan_operation_thread.py
index cc2203f6bb..fa046a1500 100644
--- a/sdcm/scan_operation_thread.py
+++ b/sdcm/scan_operation_thread.py
@@ -109,7 +109,7 @@ def __init__(self, generator: random.Random, thread_params: ThreadParams, thread
         self.log.info("FullscanOperationBase init finished")

     def _get_random_node(self) -> BaseNode:
-        return self.generator.choice(self.fullscan_params.db_cluster.nodes)
+        return self.generator.choice(self.fullscan_params.db_cluster.data_nodes)

     @abstractmethod
     def randomly_form_cql_statement(self) -> str:
diff --git a/sdcm/sla/sla_tests.py b/sdcm/sla/sla_tests.py
index 58aba4bd3c..d3e4a5b755 100644
--- a/sdcm/sla/sla_tests.py
+++ b/sdcm/sla/sla_tests.py
@@ -52,7 +52,7 @@ def alter_sl_and_validate_scheduler_runtime(self, tester, service_level, new_sha
         try:
             service_level.alter(new_shares=new_shares)
             # Wait for SL update is propagated to all nodes
-            with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=tester.db_cluster.nodes[0], timeout=15,
+            with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=tester.db_cluster.data_nodes[0], timeout=15,
                                   service_level_for_test_step="ALTER_SERVICE_LEVEL"):
                 self.wait_for_service_level_propagated(cluster=tester.db_cluster, service_level=service_level)
             start_time = time.time() + 60
@@ -178,7 +178,7 @@ def _create_sla_auth(self, session, db_cluster, shares: int, index: str, superus
         role = None
         try:
             role = create_sla_auth(session=session, shares=shares, index=index, superuser=superuser)
-            with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=db_cluster.nodes[0], timeout=15,
+            with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=db_cluster.data_nodes[0], timeout=15,
                                   service_level_for_test_step="INITIAL_FOR_TEST"):
                 self.wait_for_service_level_propagated(cluster=db_cluster, service_level=role.attached_service_level)
             return role
@@ -194,7 +194,7 @@ def _create_new_service_level(self, session, auth_entity_name_index, shares, db_
         new_sl = ServiceLevel(session=session,
                               name=SERVICE_LEVEL_NAME_TEMPLATE % (shares, auth_entity_name_index),
                               shares=shares).create()
-        with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=db_cluster.nodes[0], timeout=15,
+        with adaptive_timeout(Operations.SERVICE_LEVEL_PROPAGATION, node=db_cluster.data_nodes[0], timeout=15,
                               service_level_for_test_step=service_level_for_test_step):
             self.wait_for_service_level_propagated(cluster=db_cluster, service_level=new_sl)
         return new_sl
diff --git a/sdcm/tombstone_gc_verification_thread.py b/sdcm/tombstone_gc_verification_thread.py
index 1aa1191170..d8158d86fe 100644
--- a/sdcm/tombstone_gc_verification_thread.py
+++ b/sdcm/tombstone_gc_verification_thread.py
@@ -87,7 +87,7 @@ def _run_tombstone_gc_verification(self):
     def run(self):
         end_time = time.time() + self.duration
         while time.time() < end_time and not self.termination_event.is_set():
-            self._sstable_utils.db_node = random.choice(self.db_cluster.nodes)
+            self._sstable_utils.db_node = random.choice(self.db_cluster.data_nodes)
             self._run_tombstone_gc_verification()
             self.log.debug('Executed %s', TombstoneGcVerificationEvent.__name__)
             time.sleep(self.interval)
diff --git a/sdcm/utils/compaction_ops.py b/sdcm/utils/compaction_ops.py
index 2a85c24e61..38a0ac1c7c 100644
--- a/sdcm/utils/compaction_ops.py
+++ b/sdcm/utils/compaction_ops.py
@@ -40,7 +40,7 @@ class CompactionOps:

     def __init__(self, cluster: Union[BaseCluster, BaseScyllaCluster], node: Optional[BaseNode] = None):
         self.cluster = cluster
-        self.node = node if node else self.cluster.nodes[0]
+        self.node = node if node and not node._is_zero_token_node else self.cluster.data_nodes[0]
         self.storage_service_client = StorageServiceClient(node=self.node)

     def trigger_major_compaction(self, keyspace: str = "keyspace1", cf: str = "standard1") -> Result:
diff --git a/sdcm/utils/sstable/sstable_utils.py b/sdcm/utils/sstable/sstable_utils.py
index 9f801d0af8..c0fcc75ab6 100644
--- a/sdcm/utils/sstable/sstable_utils.py
+++ b/sdcm/utils/sstable/sstable_utils.py
@@ -26,7 +26,7 @@ def __init__(self,
                  propagation_delay_in_seconds: int = 0, ks_cf: str = None, **kwargs):
         self.db_node = db_node
         self.db_cluster = self.db_node.parent_cluster if self.db_node else None
-        self.ks_cf = ks_cf or random.choice(self.db_cluster.get_non_system_ks_cf_list(self.db_cluster.nodes[0]))
+        self.ks_cf = ks_cf or random.choice(self.db_cluster.get_non_system_ks_cf_list(self.db_cluster.data_nodes[0]))
        self.keyspace, self.table = self.ks_cf.split('.')
        self.propagation_delay_in_seconds = propagation_delay_in_seconds
        self.log = logging.getLogger(self.__class__.__name__)
diff --git a/unit_tests/test_nemesis_sisyphus.py b/unit_tests/test_nemesis_sisyphus.py
index b4010ac5c9..22db260f30 100644
--- a/unit_tests/test_nemesis_sisyphus.py
+++ b/unit_tests/test_nemesis_sisyphus.py
@@ -27,6 +27,14 @@ class Cluster:
     def check_cluster_health(self):
         pass

+    @property
+    def data_nodes(self):
+        return self.nodes
+
+    @property
+    def zero_nodes(self):
+        return self.nodes
+
 @dataclass
 class FakeTester:
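
Note: the hunks above rely on data_nodes/zero_nodes cluster properties and a _is_zero_token_node node flag that are defined outside this diff. A minimal sketch, assuming a plain nodes list and that flag, of how such properties could be derived; the actual implementation in sdcm/cluster.py may differ:

# Sketch only -- illustrates the assumed data_nodes/zero_nodes split, not the real sdcm/cluster.py code.
class ClusterTopologySketch:
    def __init__(self, nodes=None):
        self.nodes = nodes or []

    @property
    def data_nodes(self):
        # Token-owning nodes that hold user data; zero-token nodes are excluded.
        return [node for node in self.nodes if not getattr(node, "_is_zero_token_node", False)]

    @property
    def zero_nodes(self):
        # Zero-token nodes, which join the cluster but own no data.
        return [node for node in self.nodes if getattr(node, "_is_zero_token_node", False)]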
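
Similarly, the @target_data_nodes decorator applied to several disrupt_* methods is not defined in this diff. A hedged sketch of the behaviour its usage suggests, re-pointing the nemesis at a token-owning node before the disruption runs, with target selection simplified to random.choice (the real decorator in sdcm/nemesis.py may choose targets differently):

import functools
import random


def target_data_nodes(func):
    """Sketch of the assumed decorator behaviour; the real implementation may differ."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # If the currently selected target is not a data node (e.g. a zero-token node),
        # pick a data node so the disruption acts on a node that owns data.
        if self.target_node not in self.cluster.data_nodes:
            self.target_node = random.choice(self.cluster.data_nodes)
        return func(self, *args, **kwargs)
    return wrapper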