diff --git a/sdcm/cluster.py b/sdcm/cluster.py
index 0a9694aa78..5461efeca9 100644
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -4492,7 +4492,7 @@ def cfstat_reached_threshold(self, key, threshold, keyspaces=None):
             keyspaces = self.get_test_keyspaces()
 
         self.log.debug("Waiting for threshold: %s" % (threshold))
-        node = self.nodes[0]
+        node = self.data_nodes[0]
         node_space = 0
         # Calculate space on the disk of all test keyspaces on the one node.
         # It's decided to check the threshold on one node only
@@ -5042,13 +5042,16 @@ def get_node_ip_list(verification_node):
         self.test_config.tester_obj().monitors.reconfigure_scylla_monitoring()
 
     def decommission(self, node: BaseNode, timeout: int | float = None) -> DataCenterTopologyRfControl | None:
-        with node.parent_cluster.cql_connection_patient(node) as session:
-            if tablets_enabled := is_tablets_feature_enabled(session):
-                dc_topology_rf_change = DataCenterTopologyRfControl(target_node=node)
-                dc_topology_rf_change.decrease_keyspaces_rf()
+        if not node._is_zero_token_node:
+            with node.parent_cluster.cql_connection_patient(node) as session:
+                if tablets_enabled := is_tablets_feature_enabled(session):
+                    dc_topology_rf_change = DataCenterTopologyRfControl(target_node=node)
+                    dc_topology_rf_change.decrease_keyspaces_rf()
         with adaptive_timeout(operation=Operations.DECOMMISSION, node=node):
             node.run_nodetool("decommission", timeout=timeout, long_running=True, retry=0)
         self.verify_decommission(node)
+        if node._is_zero_token_node:
+            return None
         return dc_topology_rf_change if tablets_enabled else None
 
     @property
diff --git a/sdcm/nemesis.py b/sdcm/nemesis.py
index ec58b82186..339059a1be 100644
--- a/sdcm/nemesis.py
+++ b/sdcm/nemesis.py
@@ -180,6 +180,39 @@ class DefaultValue:  # pylint: disable=too-few-public-methods
     ...
 
 
+def target_data_nodes(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            args[0].set_target_node_pool(args[0].cluster.data_nodes)
+            return func(*args, **kwargs)
+        finally:
+            args[0].set_target_node_pool(args[0].cluster.data_nodes)
+    return wrapper
+
+
+def target_zero_nodes(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            args[0].set_target_node_pool(args[0].cluster.zero_nodes)
+            return func(*args, **kwargs)
+        finally:
+            args[0].set_target_node_pool(args[0].cluster.data_nodes)
+    return wrapper
+
+
+def target_all_nodes(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            args[0].set_target_node_pool(args[0].cluster.nodes)
+            return func(*args, **kwargs)
+        finally:
+            args[0].set_target_node_pool(args[0].cluster.data_nodes)
+    return wrapper
+
+
 class Nemesis:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
     DISRUPT_NAME_PREF: str = "disrupt_"
@@ -199,6 +232,7 @@ class Nemesis:  # pylint: disable=too-many-instance-attributes,too-many-public-m
     free_tier_set: bool = False  # nemesis should be run in FreeTierNemesisSet
     manager_operation: bool = False  # flag that signals that the nemesis uses scylla manager
     delete_rows: bool = False  # A flag denotes a nemesis deletes partitions/rows, generating tombstones.
+    zero_node_changes: bool = False
 
     def __init__(self, tester_obj, termination_event, *args, nemesis_selector=None, **kwargs):  # pylint: disable=unused-argument
         for name, member in inspect.getmembers(self, lambda x: inspect.isfunction(x) or inspect.ismethod(x)):
@@ -253,6 +287,7 @@ def __init__(self, tester_obj, termination_event, *args, nemesis_selector=None,
         }
         self.es_publisher = NemesisElasticSearchPublisher(self.tester)
         self._init_num_deletions_factor()
+        self._target_node_pool = self.cluster.data_nodes
 
     def _init_num_deletions_factor(self):
         # num_deletions_factor is a numeric divisor. It's a factor by which the available-partitions-for-deletion
@@ -355,13 +390,22 @@ def unset_current_running_nemesis(node):
         with NEMESIS_TARGET_SELECTION_LOCK:
             node.running_nemesis = None
 
+    def set_target_node_pool(self, nodelist: list[BaseNode] | None = None):
+        """Set the pool of nodes from which the target node is chosen."""
+        if not nodelist:
+            self._target_node_pool = self.cluster.data_nodes
+        else:
+            self._target_node_pool = nodelist
+
     def _get_target_nodes(
             self,
             is_seed: Optional[Union[bool, DefaultValue]] = DefaultValue,
             dc_idx: Optional[int] = None,
             rack: Optional[int] = None) -> list:
         """
-        Filters and return nodes in the cluster that has no running nemesis on them
+        Filter and return nodes from the target node pool that have no running nemesis on them.
+        The target node pool can be set with the 'set_target_node_pool' method to use
+        only data nodes, zero-token nodes, or any other set of nodes.
         It can filter node by following criteria: is_seed, dc_idx, rack
         Same mechanism works for other parameters, if multiple criteria provided it will return nodes
         that match all of them.
@@ -373,7 +417,7 @@ def _get_target_nodes(
         """
         if is_seed is DefaultValue:
             is_seed = False if self.filter_seed else None
-        nodes = [node for node in self.cluster.nodes if not node.running_nemesis]
+        nodes = [node for node in self._target_node_pool if not node.running_nemesis]
         if is_seed is not None:
             nodes = [node for node in nodes if node.is_seed == is_seed]
         if dc_idx is not None:
@@ -471,6 +515,7 @@ def get_list_of_methods_compatible_with_backend(
             config_changes: Optional[bool] = None,
             free_tier_set: Optional[bool] = None,
             manager_operation: Optional[bool] = None,
+            zero_node_changes: Optional[bool] = None,
     ) -> List[str]:
         return self.get_list_of_methods_by_flags(
             disruptive=disruptive,
@@ -483,6 +528,7 @@ def get_list_of_methods_compatible_with_backend(
             config_changes=config_changes,
             free_tier_set=free_tier_set,
             manager_operation=manager_operation,
+            zero_node_changes=zero_node_changes
         )
 
     def _is_it_on_kubernetes(self) -> bool:
@@ -502,6 +548,7 @@ def get_list_of_methods_by_flags(  # pylint: disable=too-many-locals  # noqa: PL
             free_tier_set: Optional[bool] = None,
             sla: Optional[bool] = None,
             manager_operation: Optional[bool] = None,
+            zero_node_changes: Optional[bool] = None,
     ) -> List[str]:
         subclasses_list = self._get_subclasses(
             disruptive=disruptive,
@@ -629,6 +676,7 @@ def _kill_scylla_daemon(self):
         self.target_node.wait_jmx_up()
         self.cluster.wait_for_schema_agreement()
 
+    @target_all_nodes
     def disrupt_stop_wait_start_scylla_server(self, sleep_time=300):  # pylint: disable=invalid-name
         self.target_node.stop_scylla_server(verify_up=False, verify_down=True)
         self.log.info("Sleep for %s seconds", sleep_time)
@@ -1234,16 +1282,20 @@ def add_ldap_configuration_to_node(node):
         if not ContainerManager.is_running(self.tester.localhost, 'ldap'):
             raise LdapNotRunning("LDAP server was supposed to be running, but it is not")
 
-    def _replace_cluster_node(self, old_node_ip=None, host_id=None,
-                              timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=0):
+    def _replace_cluster_node(self, old_node_ip: str | None = None, host_id: str | None = None,
+                              timeout: int | float = MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=0, is_zero_node: bool = False) -> BaseNode:
         """When old_node_ip or host_id are not None then replacement node procedure is initiated"""
         # TODO: make it work on K8S when we have decommissioned (by nodetool) nodes.
         #       Now it will fail because pod which hosts decommissioned Scylla member is reported
         #       as 'NotReady' and will fail the pod waiter function.
         self.log.info("Adding new node to cluster...")
         InfoEvent(message='StartEvent - Adding new node to cluster').publish()
-        new_node = skip_on_capacity_issues(self.cluster.add_nodes)(
-            count=1, dc_idx=self.target_node.dc_idx, enable_auto_bootstrap=True, rack=rack)[0]
+        if is_zero_node:
+            new_node = skip_on_capacity_issues(self.cluster.add_nodes)(
+                count=1, dc_idx=self.target_node.dc_idx, enable_auto_bootstrap=True, rack=rack, is_zero_node=is_zero_node)[0]
+        else:
+            new_node = skip_on_capacity_issues(self.cluster.add_nodes)(
+                count=1, dc_idx=self.target_node.dc_idx, enable_auto_bootstrap=True, rack=rack)[0]
         self.monitoring_set.reconfigure_scylla_monitoring()
         self.set_current_running_nemesis(node=new_node)  # prevent to run nemesis on new node when running in parallel
@@ -1274,14 +1326,22 @@ def _replace_cluster_node(self, old_node_ip=None, host_id=None,
         InfoEvent(message="FinishEvent - New Node is up and normal").publish()
         return new_node
 
-    def _add_and_init_new_cluster_nodes(self, count, timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=None, instance_type: str = None) -> list[BaseNode]:
+    def _add_and_init_new_cluster_nodes(self, count, timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=None, instance_type: str = None, is_zero_node: bool = False) -> list[BaseNode]:
         if rack is None and self._is_it_on_kubernetes():
             rack = 0
         self.log.info("Adding %s new nodes to cluster...", count)
         InfoEvent(message=f'StartEvent - Adding {count} new nodes to cluster').publish()
-        new_nodes = skip_on_capacity_issues(self.cluster.add_nodes)(
-            count=count, dc_idx=self.target_node.dc_idx, enable_auto_bootstrap=True, rack=rack,
-            instance_type=instance_type)
+        add_node_func_args = {"count": count,
+                              "dc_idx": self.target_node.dc_idx,
+                              "enable_auto_bootstrap": True,
+                              "rack": rack,
+                              "instance_type": instance_type
+                              }
+        if is_zero_node:
+            instance_type = self.cluster.params.get("zero_token_instance_type_db") or instance_type
+            add_node_func_args.update({"is_zero_node": is_zero_node, "instance_type": instance_type})
+
+        new_nodes = skip_on_capacity_issues(self.cluster.add_nodes)(**add_node_func_args)
         self.monitoring_set.reconfigure_scylla_monitoring()
         for new_node in new_nodes:
             self.set_current_running_nemesis(node=new_node)
@@ -1358,8 +1418,8 @@ def _terminate_and_wait(self, target_node, sleep_time=300):
         time.sleep(sleep_time)  # Sleeping for 5 mins to let the cluster live with a missing node for a while
 
     @latency_calculator_decorator(legend="Replace a node in cluster with new one")
-    def replace_node(self, old_node_ip, host_id, rack=0):
-        return self._replace_cluster_node(old_node_ip, host_id, rack=rack)
+    def replace_node(self, old_node_ip: str, host_id: str, rack: int = 0, is_zero_node: bool = False) -> BaseNode:
+        return self._replace_cluster_node(old_node_ip, host_id, rack=rack, is_zero_node=is_zero_node)
 
     def _verify_resharding_on_k8s(self, cpus, dc_idx):
         nodes_data = []
@@ -1576,6 +1636,7 @@ def _kubernetes_wait_till_node_up_after_been_recreated(self, node, old_uid=None)
         self.log.info('Wait till %s is ready', node)
         node.wait_for_pod_readiness()
 
+    @target_all_nodes
     def disrupt_terminate_and_replace_node(self):  # pylint: disable=invalid-name
 
         def get_node_state(node_ip: str) -> List["str"] | None:
@@ -1596,7 +1657,8 @@ def get_node_state(node_ip: str) -> List["str"] | None:
         self._terminate_and_wait(target_node=self.target_node)
         assert get_node_state(old_node_ip) == "DN", "Removed node state should be DN"
         InfoEvent(message='FinishEvent - target_node was terminated').publish()
-        new_node = self.replace_node(old_node_ip, host_id, rack=self.target_node.rack)
+        new_node = self.replace_node(old_node_ip, host_id, rack=self.target_node.rack,
+                                     is_zero_node=self.target_node._is_zero_token_node)
         try:
             if new_node.get_scylla_config_param("enable_repair_based_node_ops") == 'false':
                 InfoEvent(message='StartEvent - Run repair on new node').publish()
@@ -1624,6 +1686,7 @@ def wait_for_old_node_to_removed():
         self.cluster.update_seed_provider()
 
     @decorate_with_context(ignore_ycsb_connection_refused)
+    @target_all_nodes
     def disrupt_kill_scylla(self):
         self._kill_scylla_daemon()
 
@@ -2868,7 +2931,7 @@ def set_new_twcs_settings(settings: Dict[str, Any]) -> Dict[str, Any]:
             self.log.error("Number of sstables after change settings larger than before")
         # run major compaction on all nodes
         # to reshape sstables on other nodes
-        for node in self.cluster.nodes:
+        for node in self.cluster.data_nodes:
             num_sstables_before_change = len(node.get_list_of_sstables(keyspace, table, suffix="-Data.db"))
             node.run_nodetool("compact", args=f"{keyspace} {table}")
             num_sstables_after_change = len(node.get_list_of_sstables(keyspace, table, suffix="-Data.db"))
@@ -2889,15 +2952,18 @@ def disrupt_modify_table(self):
             disrupt_func = getattr(self, disrupt_func_name)
             disrupt_func()
 
+    @target_data_nodes
     def disrupt_mgmt_backup_specific_keyspaces(self):
         self._mgmt_backup(backup_specific_tables=True)
 
+    @target_data_nodes
     def disrupt_mgmt_backup(self):
         self._mgmt_backup(backup_specific_tables=False)
 
+    @target_data_nodes
     def disrupt_mgmt_restore(self):
         def get_total_scylla_partition_size():
-            result = self.cluster.nodes[0].remoter.run("df -k | grep /var/lib/scylla")  # Size in KB
+            result = self.cluster.data_nodes[0].remoter.run("df -k | grep /var/lib/scylla")  # Size in KB
             free_space_size = int(result.stdout.split()[1]) / 1024 ** 2  # Converting to GB
             return free_space_size
@@ -3138,6 +3204,7 @@ def repair_streaming_exists():
         self.log.debug("Execute a complete repair for target node")
         self.repair_nodetool_repair()
 
+    @target_data_nodes
     def disrupt_validate_hh_short_downtime(self):  # pylint: disable=invalid-name
         """
         Validates that hinted handoff mechanism works: there were no drops and errors
@@ -3230,6 +3297,7 @@ def is_virtual_tables_get_snapshot():
                 f"Expected content: {sorted(keyspace_table)} \n "
                 f"Actual snapshot content: {sorted(snapshot_content_list)}")
 
+    @target_data_nodes
     def disrupt_snapshot_operations(self):  # pylint: disable=too-many-statements
         """
         Extend this nemesis to run 'nodetool snapshot' more options including multiple tables.
@@ -3518,6 +3586,7 @@ def disrupt_network_block(self):
             self.target_node.traffic_control(None)
             self.cluster.wait_all_nodes_un()
 
+    @target_data_nodes
     def disrupt_remove_node_then_add_node(self):  # pylint: disable=too-many-branches
         """
         https://docs.scylladb.com/operating-scylla/procedures/cluster-management/remove_node/
@@ -3937,7 +4006,7 @@ def decommission_post_action():
                                           terminate_pattern.timeout):
                 stack.enter_context(expected_start_failed_context)
             with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \
-                    self.run_nemesis(node_list=self.cluster.nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
+                    self.run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
                     FailedDecommissionOperationMonitoring(target_node=self.target_node,
                                                           verification_node=verification_node,
                                                           timeout=full_operations_timeout):
@@ -4101,6 +4170,7 @@ def _decommission_nodes(self, nodes_number, rack, is_seed: Optional[Union[bool,
             if self._is_it_on_kubernetes():
                 if rack is None and self._is_it_on_kubernetes():
                     rack = 0
+                self.set_target_node_pool(self.cluster.data_nodes)
                 self.set_target_node(rack=rack, is_seed=is_seed, allow_only_last_node_in_rack=True)
             else:
                 rack_idx = rack if rack is not None else idx % self.cluster.racks_count
@@ -4127,6 +4197,7 @@ def _double_cluster_load(self, duration: int) -> None:
         results = self.tester.get_stress_results(queue=stress_queue, store_results=False)
         self.log.info(f"Double load results: {results}")
 
+    @target_data_nodes
     def disrupt_grow_shrink_cluster(self):
         sleep_time_between_ops = self.cluster.params.get('nemesis_sequence_sleep_between_ops')
         if not self.has_steady_run and sleep_time_between_ops:
@@ -4155,7 +4226,7 @@ def _grow_cluster(self, rack=None):
         if rack is None and self._is_it_on_kubernetes():
             rack = 0
         add_nodes_number = self.tester.params.get('nemesis_add_node_cnt')
-        InfoEvent(message=f"Start grow cluster by {add_nodes_number} nodes").publish()
+        InfoEvent(message=f"Start grow cluster by {add_nodes_number} data nodes").publish()
         new_nodes = []
         if self.cluster.parallel_node_operations:
             new_nodes = self.add_new_nodes(count=add_nodes_number, rack=rack,
@@ -4174,7 +4245,7 @@ def _shrink_cluster(self, rack=None, new_nodes: list[BaseNode] | None = None):
         add_nodes_number = self.tester.params.get('nemesis_add_node_cnt')
         InfoEvent(message=f'Start shrink cluster by {add_nodes_number} nodes').publish()
         # Check that number of nodes is enough for decommission:
-        cur_num_nodes_in_dc = len([n for n in self.cluster.nodes if n.dc_idx == self.target_node.dc_idx])
+        cur_num_nodes_in_dc = len([n for n in self.cluster.data_nodes if n.dc_idx == self.target_node.dc_idx])
         initial_db_size = self.tester.params.get("n_db_nodes")
         if self._is_it_on_kubernetes():
             initial_db_size = self.tester.params.get("k8s_n_scylla_pods_per_cluster") or initial_db_size
@@ -4201,9 +4272,9 @@ def _shrink_cluster(self, rack=None, new_nodes: list[BaseNode] | None = None):
             dc_idx=self.target_node.dc_idx,
             exact_nodes=new_nodes,
         )
-        num_of_nodes = len(self.cluster.nodes)
-        self.log.info("Cluster shrink finished. Current number of nodes %s", num_of_nodes)
-        InfoEvent(message=f'Cluster shrink finished. Current number of nodes {num_of_nodes}').publish()
+        num_of_nodes = len(self.cluster.data_nodes)
+        self.log.info("Cluster shrink finished. Current number of data nodes %s", num_of_nodes)
+        InfoEvent(message=f'Cluster shrink finished. Current number of data nodes {num_of_nodes}').publish()
 
     # TODO: add support for the 'LocalFileSystemKeyProviderFactory' and 'KmipKeyProviderFactory' key providers
     # TODO: add encryption for a table with large partitions?
@@ -4578,9 +4649,13 @@ def _verify_cdc_feature_status(self, keyspace: str, table: str, cdc_settings: di
         assert actual_cdc_settings == cdc_settings, \
             f"CDC extension settings are differs. Current: {actual_cdc_settings} expected: {cdc_settings}"
 
-    def _add_new_node_in_new_dc(self) -> BaseNode:
-        new_node = skip_on_capacity_issues(self.cluster.add_nodes)(
-            1, dc_idx=0, enable_auto_bootstrap=True)[0]  # add node
+    def _add_new_node_in_new_dc(self, is_zero_node=False) -> BaseNode:
+        if is_zero_node:
+            new_node = skip_on_capacity_issues(self.cluster.add_nodes)(
+                1, dc_idx=0, enable_auto_bootstrap=True, is_zero_node=is_zero_node)[0]  # add node
+        else:
+            new_node = skip_on_capacity_issues(self.cluster.add_nodes)(
+                1, dc_idx=0, enable_auto_bootstrap=True)[0]  # add node
         with new_node.remote_scylla_yaml() as scylla_yml:
             scylla_yml.rpc_address = new_node.ip_address
             scylla_yml.seed_provider = [SeedProvider(class_name='org.apache.cassandra.locator.SimpleSeedProvider',
@@ -4664,7 +4739,7 @@ def finalizer(exc_type, *_):
         with temporary_replication_strategy_setter(node) as replication_strategy_setter:
             new_node = self._add_new_node_in_new_dc()
             node_added = True
-            status = self.tester.db_cluster.get_nodetool_status()
+            status = self.tester.db_cluster.get_nodetool_status()
             new_dc_list = [dc for dc in list(status.keys()) if dc.endswith("_nemesis_dc")]
             assert new_dc_list, "new datacenter was not registered"
             new_dc_name = new_dc_list[0]
@@ -5081,6 +5156,7 @@ def _disrupt_toggle_audit(self, store: AuditStore):
         if errors:
             raise AuditLogTestFailure("\n".join(errors))
 
+    @target_data_nodes
     def disrupt_bootstrap_streaming_error(self):
         """Abort bootstrap process at different point
 
@@ -5335,6 +5411,8 @@ def wrapper(*args, **kwargs):  # pylint: disable=too-many-statements  # noqa: PL
                 # gets killed/aborted. So, use safe 'pop' call with the default 'None' value.
                 NEMESIS_RUN_INFO.pop(nemesis_run_info_key, None)
 
+            args[0].set_target_node_pool(args[0].cluster.data_nodes)
+
             return result
 
         return wrapper
diff --git a/sdcm/utils/replication_strategy_utils.py b/sdcm/utils/replication_strategy_utils.py
index a463e69045..c98f4f21cc 100644
--- a/sdcm/utils/replication_strategy_utils.py
+++ b/sdcm/utils/replication_strategy_utils.py
@@ -133,6 +133,9 @@ class DataCenterTopologyRfControl:
     - In scenarios where a keyspace has an RF equal to the total number of nodes in a data center, decommissioning a node is not supported where tablets are used.
     - This class provides functionality to temporarily decrease the RF of such keyspaces before a node decommissioning operation and revert them back to their original RF after a new node is added.
 
+    **Notes**:
+    - Zero-token nodes are ignored when counting RF, because they do not take part in replication and do not store user data.
+
     **Usage**:
     1. **`decrease_keyspaces_rf`**: Identifies keyspaces with RF equal to the total number of nodes in the data center and decreases their RF by 1. This is necessary so decommissioning a node is allowed (with tablets).
     2. **`revert_to_original_keyspaces_rf`**: Reverts the RF of the keyspaces back to their original values after a new node is added to the data center.
@@ -153,7 +156,7 @@ def __init__(self, target_node: 'BaseNode') -> None:
 
     def _get_original_nodes_number(self, node: 'BaseNode') -> int:
         # Get the original number of nodes in the data center
-        return len([n for n in self.cluster.nodes if n.dc_idx == node.dc_idx])
+        return len([n for n in self.cluster.data_nodes if n.dc_idx == node.dc_idx])
 
     def _get_keyspaces_to_decrease_rf(self, session) -> list:
         """
@@ -207,7 +210,7 @@ def _alter_keyspace_rf(self, keyspace: str, replication_factor: int, session):
     def revert_to_original_keyspaces_rf(self, node_to_wait_for_balance: 'BaseNode' = None):
         if self.decreased_rf_keyspaces:
             LOGGER.debug(f"Reverting keyspaces replication factor to original value of {self.datacenter}..")
-            with self.cluster.cql_connection_patient(self.cluster.nodes[0]) as session:
+            with self.cluster.cql_connection_patient(self.cluster.data_nodes[0]) as session:
                 for keyspace in self.decreased_rf_keyspaces:
                     self._alter_keyspace_rf(keyspace=keyspace, replication_factor=self.original_nodes_number,
                                             session=session)
diff --git a/unit_tests/test_cluster.py b/unit_tests/test_cluster.py
index 42b0c27d89..91c2918c30 100644
--- a/unit_tests/test_cluster.py
+++ b/unit_tests/test_cluster.py
@@ -534,6 +534,7 @@ def test_datacenter_name_per_region(self):  # pylint: disable=no-self-use
         node2 = NodetoolDummyNode(resp=resp, myregion="east-us", myname="10.0.198.153")
         node3 = NodetoolDummyNode(resp=resp, myregion="west-us", myname='10.1.59.34')
         db_cluster = DummyScyllaCluster([node1, node2, node3])
+        setattr(db_cluster, "params", {"use_zero_nodes": False})
         node1.parent_cluster = node2.parent_cluster = node3.parent_cluster = db_cluster
         datacenter_name_per_region = db_cluster.get_datacenter_name_per_region()
         assert datacenter_name_per_region == {'east-us': 'eastus', 'west-us': 'westus'}
@@ -561,6 +562,7 @@ def test_get_rack_names_per_datacenter_and_rack_idx(self):  # pylint: disable=no
         node2 = NodetoolDummyNode(resp=resp, myregion="east-us", myname="10.0.198.153", myrack=1)
         node3 = NodetoolDummyNode(resp=resp, myregion="west-us", myname='10.1.59.34', myrack=2)
         db_cluster = DummyScyllaCluster([node1, node2, node3])
+        setattr(db_cluster, "params", {"use_zero_nodes": False})
         node1.parent_cluster = node2.parent_cluster = node3.parent_cluster = db_cluster
         datacenter_name_per_region = db_cluster.get_rack_names_per_datacenter_and_rack_idx()
         assert datacenter_name_per_region == {('east-us', '1'): '1a', ('west-us', '2'): '2a'}
diff --git a/unit_tests/test_nemesis.py b/unit_tests/test_nemesis.py
index dbcdab44f9..63ffaebfa9 100644
--- a/unit_tests/test_nemesis.py
+++ b/unit_tests/test_nemesis.py
@@ -38,6 +38,14 @@ class Cluster:
     def check_cluster_health(self):
         pass
 
+    @property
+    def data_nodes(self):
+        return self.nodes
+
+    @property
+    def zero_nodes(self):
+        return self.nodes
+
 
 @dataclass
 class FakeTester:
@@ -119,26 +127,32 @@ def test_is_it_on_kubernetes():
 class FakeLocalMinimalScyllaPodCluster(LocalMinimalScyllaPodCluster):
     def __init__(self, params: dict = None):
         self.params = params
+        self.nodes = []
 
 class FakeGkeScyllaPodCluster(GkeScyllaPodCluster):
     def __init__(self, params: dict = None):
         self.params = params
+        self.nodes = []
 
 class FakeEksScyllaPodCluster(EksScyllaPodCluster):
     def __init__(self, params: dict = None):
         self.params = params
+        self.nodes = []
 
 class FakeScyllaGCECluster(ScyllaGCECluster):
     def __init__(self, params: dict = None):
         self.params = params
+        self.nodes = []
 
 class FakeScyllaAWSCluster(ScyllaAWSCluster):
     def __init__(self, params: dict = None):
         self.params = params
+        self.nodes = []
 
 class FakeScyllaDockerCluster(ScyllaDockerCluster):
     def __init__(self, params: dict = None):
         self.params = params
+        self.nodes = []
 
 
 params = {'nemesis_interval': 10, 'nemesis_filter_seeds': 1}