diff --git a/add_new_dc_test.py b/add_new_dc_test.py
index f6e357829c8..dbb2d1d510e 100644
--- a/add_new_dc_test.py
+++ b/add_new_dc_test.py
@@ -2,14 +2,17 @@ from typing import Tuple, List
 from cassandra import ConsistencyLevel
-from cassandra.query import SimpleStatement  # pylint: disable=no-name-in-module
+from cassandra.query import SimpleStatement
+import pytest  # pylint: disable=no-name-in-module
 
 from longevity_test import LongevityTest
-from sdcm.cluster import BaseNode
+from sdcm.cluster import BaseNode, NodeSetupFailed, NodeSetupTimeout
 from sdcm.stress_thread import CassandraStressThread
 from sdcm.utils.common import skip_optional_stage
-from sdcm.utils.decorators import optional_stage
+from sdcm.utils.decorators import optional_stage, skip_on_capacity_issues
 from sdcm.utils.replication_strategy_utils import NetworkTopologyReplicationStrategy
+from sdcm.exceptions import ReadBarrierErrorException
+from sdcm.utils.adaptive_timeouts import Operations, adaptive_timeout
 
 warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)
 
@@ -63,6 +66,94 @@ def test_add_new_dc(self) -> None:  # pylint: disable=too-many-locals
         self.verify_data_can_be_read_from_new_dc(new_node)
         self.log.info("Test completed.")
 
+    def test_add_new_dc_with_zero_nodes(self):
+        self.log.info("Starting add new DC with zero nodes test...")
+        assert self.params.get('n_db_nodes').endswith(" 0"), "n_db_nodes must be a list and last dc must equal 0"
+        system_keyspaces = ["system_distributed", "system_traces"]
+        # auth-v2 is used when consistent topology is enabled
+        if not self.db_cluster.nodes[0].raft.is_consistent_topology_changes_enabled:
+            system_keyspaces.insert(0, "system_auth")
+
+        self.prewrite_db_with_data()
+
+        status = self.db_cluster.get_nodetool_status()
+
+        self.reconfigure_keyspaces_to_use_network_topology_strategy(
+            keyspaces=system_keyspaces + ["keyspace1"],
+            replication_factors={dc: len(status[dc].keys()) for dc in status}
+        )
+
+        self.log.info("Running repair on all nodes")
+        for node in self.db_cluster.nodes:
+            node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
+
+        # Stop all nodes in 1st dc and check that raft quorum is lost
+
+        status = self.db_cluster.get_nodetool_status()
+        nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
+        regions = list(nodes_to_region.keys())
+        target_dc_name = regions[0]
+        alive_dc_name = regions[1]
+        for node in nodes_to_region[target_dc_name]:
+            node.stop_scylla()
+
+        node = nodes_to_region[alive_dc_name][0]
+        with pytest.raises(ReadBarrierErrorException):
+            node.raft.call_read_barrier()
+
+        # Start all nodes in 1st dc
+        for node in nodes_to_region[target_dc_name]:
+            node.start_scylla()
+
+        self.db_cluster.wait_all_nodes_un()
+
+        # Add new dc with zero node only
+        new_node = self.add_zero_node_in_new_dc()
+
+        status = self.db_cluster.get_nodetool_status()
+        node_host_ids = []
+        node_for_termination = []
+
+        nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
+        regions = list(nodes_to_region.keys())
+        target_dc_name = regions[0]
+        for node in nodes_to_region[target_dc_name]:
+            node_host_ids.append(node.host_id)
+            node_for_termination.append(node)
+            node.stop_scylla()
+
+        # check that raft quorum is not lost
+        new_node.raft.call_read_barrier()
+        # restore dc1
+        new_node.run_nodetool(
+            sub_cmd=f"removenode {node_host_ids[0]} --ignore-dead-nodes {','.join(node_host_ids[1:])}")
+
+        self.replace_cluster_node(new_node,
+                                  node_host_ids[1],
+                                  nodes_to_region[target_dc_name][-1].dc_idx,
+                                  dead_node_hostids=node_host_ids[2])
+
+        self.replace_cluster_node(new_node,
+                                  node_host_ids[2],
+                                  nodes_to_region[target_dc_name][-1].dc_idx)
+
+        # bootstrap new node in 1st dc
+        new_data_node = self.add_node_in_new_dc(nodes_to_region[target_dc_name][-1].dc_idx, 3)
+        for node in node_for_termination:
+            self.db_cluster.terminate_node(node)
+
+        self.db_cluster.wait_all_nodes_un()
+        status = self.db_cluster.get_nodetool_status()
+        self.log.info("Running rebuild in restored DC")
+        new_data_node.run_nodetool(sub_cmd=f"rebuild -- {list(status.keys())[-1]}", publish_event=True)
+
+        self.log.info("Running repair on all nodes")
+        for node in self.db_cluster.nodes:
+            node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
+
+        self.verify_data_can_be_read_from_new_dc(new_data_node)
+        self.log.info("Test completed.")
+
     def reconfigure_keyspaces_to_use_network_topology_strategy(self, keyspaces: List[str], replication_factors: dict[str, int]) -> None:
         node = self.db_cluster.nodes[0]
         self.log.info("Reconfiguring keyspace Replication Strategy")
@@ -89,22 +180,37 @@ def start_stress_during_adding_new_dc(self) -> Tuple[CassandraStressThread, Cass
         self.log.info("Stress during adding DC started")
         return read_thread, write_thread
 
-    def add_node_in_new_dc(self) -> BaseNode:
+    def add_node_in_new_dc(self, dc_idx: int = 0, num_of_dc: int = 2) -> BaseNode:
         self.log.info("Adding new node")
-        new_node = self.db_cluster.add_nodes(1, dc_idx=1, enable_auto_bootstrap=True)[0]  # add node
+        new_node = self.db_cluster.add_nodes(1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0]  # add node
         self.db_cluster.wait_for_init(node_list=[new_node], timeout=900,
                                       check_node_health=False)
         self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
         self.monitors.reconfigure_scylla_monitoring()
 
         status = self.db_cluster.get_nodetool_status()
-        assert len(status.keys()) == 2, f"new datacenter was not registered. Cluster status: {status}"
+        assert len(status.keys()) == num_of_dc, f"new datacenter was not registered. Cluster status: {status}"
+        self.log.info("New DC to cluster has been added")
+        return new_node
+
+    def add_zero_node_in_new_dc(self) -> BaseNode:
+        if not self.params.get("use_zero_nodes"):
+            raise Exception("Zero node support should be enabled")
+        self.log.info("Adding new node")
+        new_node = self.db_cluster.add_nodes(1, dc_idx=2, enable_auto_bootstrap=True, is_zero_node=True)[0]  # add node
+        self.db_cluster.wait_for_init(node_list=[new_node], timeout=900,
+                                      check_node_health=True)
+        self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
+        self.monitors.reconfigure_scylla_monitoring()
+
+        status = self.db_cluster.get_nodetool_status()
+        assert len(status.keys()) == 3, f"new datacenter was not registered. Cluster status: {status}"
+        self.log.info("New DC to cluster has been added")
+        return new_node
 
     @optional_stage('post_test_load')
     def verify_data_can_be_read_from_new_dc(self, new_node: BaseNode) -> None:
-        self.log.info("Veryfing if data has been transferred successfully to the new DC")
+        self.log.info("Verifying if data has been transferred successfully to the new DC")
         stress_cmd = self.params.get('verify_data_after_entire_test') + f" -node {new_node.ip_address}"
         end_stress = self.run_stress_thread(stress_cmd=stress_cmd, stats_aggregate_cmds=False, round_robin=False)
         self.verify_stress_thread(cs_thread_pool=end_stress)
@@ -117,3 +223,40 @@ def querying_new_node_should_return_no_data(self, new_node: BaseNode) -> None:
                                              fetch_size=10)
         data = session.execute(statement).one()
         assert not data, f"no data should be returned when querying with CL=LOCAL_QUORUM and RF=0. {data}"
+
+    def replace_cluster_node(self, verification_node: BaseNode,
+                             host_id: str | None = None,
+                             dc_idx: int = 0,
+                             dead_node_hostids: str = "",
+                             timeout: int | float = 3600 * 8) -> BaseNode:
+        """When host_id is not None, the node replacement procedure is initiated"""
+        self.log.info("Adding new node to cluster...")
+        new_node: BaseNode = skip_on_capacity_issues(self.db_cluster.add_nodes)(
+            count=1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0]
+        self.monitors.reconfigure_scylla_monitoring()
+        with new_node.remote_scylla_yaml() as scylla_yaml:
+            scylla_yaml.ignore_dead_nodes_for_replace = dead_node_hostids
+        # since we need this logic before starting the node, and in the `use_preinstalled_scylla: false` case
+        # scylla is not yet installed or the target node was terminated, we should use an alive node without nemesis
+        # to get the version; it should be up and have the scylla executable available
+
+        new_node.replacement_host_id = host_id
+
+        try:
+            with adaptive_timeout(Operations.NEW_NODE, node=verification_node, timeout=timeout):
+                self.db_cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
+            self.db_cluster.clean_replacement_node_options(new_node)
+            self.db_cluster.set_seeds()
+            self.db_cluster.update_seed_provider()
+        except (NodeSetupFailed, NodeSetupTimeout):
+            self.log.warning("TestConfig of the '%s' failed, removing it from list of nodes" % new_node)
+            self.db_cluster.nodes.remove(new_node)
+            self.log.warning("Node will not be terminated. Please terminate manually!!!")
+            raise
+
+        self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
+        new_node.wait_node_fully_start()
+        with new_node.remote_scylla_yaml() as scylla_yaml:
+            scylla_yaml.ignore_dead_nodes_for_replace = ""
+
+        return new_node
diff --git a/jenkins-pipelines/oss/features/features-add-new-dc-with-zero-nodes.jenkinsfile b/jenkins-pipelines/oss/features/features-add-new-dc-with-zero-nodes.jenkinsfile
new file mode 100644
index 00000000000..04abeada9fc
--- /dev/null
+++ b/jenkins-pipelines/oss/features/features-add-new-dc-with-zero-nodes.jenkinsfile
@@ -0,0 +1,13 @@
+#!groovy
+
+// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
+def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)
+
+longevityPipeline(
+    backend: 'aws',
+    region: '''["eu-west-1", "eu-west-2", "eu-north-1"]''',
+    test_name: 'add_new_dc_test.TestAddNewDc.test_add_new_dc_with_zero_nodes',
+    test_config: 'test-cases/features/add-new-dc-with-zero-nodes.yaml',
+
+    timeout: [time: 120, unit: 'MINUTES']
+)
diff --git a/sdcm/exceptions.py b/sdcm/exceptions.py
index 1c0475fb35b..0461b35a930 100644
--- a/sdcm/exceptions.py
+++ b/sdcm/exceptions.py
@@ -83,3 +83,8 @@ class SstablesNotFound(Exception):
 
 class CapacityReservationError(Exception):
     pass
+
+
+class ReadBarrierErrorException(Exception):
+    """Raised when a read barrier call fails"""
+    pass
diff --git a/sdcm/provision/scylla_yaml/scylla_yaml.py b/sdcm/provision/scylla_yaml/scylla_yaml.py
index 4d3845395e2..ede050a9797 100644
--- a/sdcm/provision/scylla_yaml/scylla_yaml.py
+++ b/sdcm/provision/scylla_yaml/scylla_yaml.py
@@ -265,6 +265,7 @@ def set_authorizer(cls, authorizer: str):
     replace_address: str = None  # ""
     replace_address_first_boot: str = None  # ""
     replace_node_first_boot: str = None  # ""
+    ignore_dead_nodes_for_replace: str = None  # ""
     override_decommission: bool = None  # False
     enable_repair_based_node_ops: bool = None  # True
     # NOTE: example for disabling RBNO for 'bootstrap' and 'decommission' operations:
diff --git a/sdcm/rest/raft_api.py b/sdcm/rest/raft_api.py
new file mode 100644
index 00000000000..9fd6ed8b2ac
--- /dev/null
+++ b/sdcm/rest/raft_api.py
@@ -0,0 +1,26 @@
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See LICENSE for more details.
+#
+# Copyright (c) 2024 ScyllaDB
+from sdcm.rest.remote_curl_client import RemoteCurlClient
+
+
+class RaftApi(RemoteCurlClient):
+    """Raft api commands"""
+
+    def __init__(self, node: "BaseNode"):  # noqa: F821
+        super().__init__(host="localhost:10000", endpoint="raft", node=node)
+
+    def read_barrier(self, group_id: str, timeout: int = 60) -> str:
+        path = f"read_barrier?group_id={group_id}&timeout={timeout}"
+        return self.run_remoter_curl(method="POST",
+                                     path=path,
+                                     params={}, timeout=timeout + 30).stdout.strip()
diff --git a/sdcm/utils/raft/__init__.py b/sdcm/utils/raft/__init__.py
index 8a60765684b..9b1de82bb5f 100644
--- a/sdcm/utils/raft/__init__.py
+++ b/sdcm/utils/raft/__init__.py
@@ -1,6 +1,7 @@
 import contextlib
 import logging
 import random
+import json
 
 from enum import Enum
 from abc import ABC, abstractmethod
@@ -13,6 +14,9 @@ from sdcm.utils.features import is_consistent_topology_changes_feature_enabled, is_consistent_cluster_management_feature_enabled
 from sdcm.utils.health_checker import HealthEventsGenerator
 from sdcm.wait import wait_for
+from sdcm.rest.raft_api import RaftApi
+from sdcm.exceptions import ReadBarrierErrorException
+
 
 LOGGER = logging.getLogger(__name__)
 RAFT_DEFAULT_SCYLLA_VERSION = "5.5.0-dev"
@@ -137,6 +141,9 @@ def get_all_messages_timeouts(self, operation: TopologyOperations):
         log_patterns = self.TOPOLOGY_OPERATION_LOG_PATTERNS.get(operation)
         return list(map(self.get_message_waiting_timeout, log_patterns))
 
+    def call_read_barrier(self):
+        ...
+
 
 class RaftFeature(RaftFeatureOperations):
     TOPOLOGY_OPERATION_LOG_PATTERNS: dict[TopologyOperations, Iterable[MessagePosition]] = {
@@ -177,14 +184,10 @@ def get_group0_members(self) -> list[dict[str, str]]:
         group0_members = []
         try:
             with self._node.parent_cluster.cql_connection_patient_exclusive(node=self._node) as session:
-                row = session.execute("select value from system.scylla_local where key = 'raft_group0_id'").one()
-                if not row:
-                    return []
-                raft_group0_id = row.value
-
+                raft_group0_id = self.get_group0_id(session)
+                assert raft_group0_id, "Group0 id was not found"
                 rows = session.execute(f"select server_id, can_vote from system.raft_state \
                                          where group_id = {raft_group0_id} and disposition = 'CURRENT'").all()
-
                 for row in rows:
                     group0_members.append({"host_id": str(row.server_id),
                                            "voter": row.can_vote})
@@ -347,6 +350,37 @@ def check_group0_tokenring_consistency(
         LOGGER.debug("Group0 and token-ring are consistent on node %s (host_id=%s)...",
                      self._node.name, self._node.host_id)
 
+    def get_group0_id(self, session) -> str:
+        try:
+            row = session.execute("select value from system.scylla_local where key = 'raft_group0_id'").one()
+            return row.value
+        except Exception as exc:  # pylint: disable=broad-except  # noqa: BLE001
+            err_msg = f"Get group0 id failed with error: {exc}"
+            LOGGER.error(err_msg)
+            return ""
+
+    def call_read_barrier(self, timeout: int = 60):
+        """Wait until the node has applied all previously committed Raft entries.
+
+        Any schema or topology change is committed through Raft group0. Before a
+        new change is written to a node, group0 triggers a read barrier on that
+        node, which forces it to apply all previously committed entries first.
+        Once the read barrier completes, the node is guaranteed to have every
+        schema/topology change that was committed in the cluster before the
+        barrier started.
+        """
+        with self._node.parent_cluster.cql_connection_patient_exclusive(node=self._node) as session:
+            raft_group0_id = self.get_group0_id(session)
+        assert raft_group0_id, "Group0 id was not found"
+        api = RaftApi(self._node)
+        result = api.read_barrier(group_id=raft_group0_id, timeout=timeout)
+        LOGGER.debug("Api response %s", result)
+        if not result:
+            return
+        status = json.loads(result)
+        raise ReadBarrierErrorException(f"Error code: {status['code']}, Error message: {status['message']}")
+
 
 class NoRaft(RaftFeatureOperations):
     TOPOLOGY_OPERATION_LOG_PATTERNS = {
@@ -404,6 +438,9 @@ def check_group0_tokenring_consistency(
                                            tokenring_members: list[dict[str, str]]) -> None:
         LOGGER.debug("Raft feature is disabled on node %s (host_id=%s)", self._node.name, self._node.host_id)
 
+    def call_read_barrier(self):
+        ...
+
 
 def get_raft_mode(node) -> RaftFeature | NoRaft:
     with node.parent_cluster.cql_connection_patient(node) as session:
@@ -440,5 +477,5 @@ def get_node_status_from_system_by(verification_node: "BaseNode", *, ip_address:
 
 __all__ = ["get_raft_mode",
            "get_node_status_from_system_by",
-           "Group0MembersNotConsistentWithTokenRingMembersException",
+           "Group0MembersNotConsistentWithTokenRingMembersException", "RestApiError",
            ]
diff --git a/test-cases/features/add-new-dc-with-zero-node-in-multidc.yaml b/test-cases/features/add-new-dc-with-zero-node-in-multidc.yaml
new file mode 100644
index 00000000000..c7285b2b34a
--- /dev/null
+++ b/test-cases/features/add-new-dc-with-zero-node-in-multidc.yaml
@@ -0,0 +1,16 @@
+prepare_write_cmd: "cassandra-stress write cl=QUORUM n=20900 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+
+stress_cmd: ["cassandra-stress read cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5",
+             "cassandra-stress write cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+             ]
+verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_ONE n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+
+n_db_nodes: '3 3 0'  # configure n_db_nodes as multi-DC, with the last DC set to 0 nodes (so a new node can easily be added later)
+region_name: 'eu-west-1 eu-west-2 eu-north-1'
+n_loaders: 1
+n_monitor_nodes: 1
+
+instance_type_db: 'i4i.xlarge'
+seeds_num: 3
+
+user_prefix: 'add-new-dc'
diff --git a/test-cases/features/add-new-dc-with-zero-nodes.yaml b/test-cases/features/add-new-dc-with-zero-nodes.yaml
new file mode 100644
index 00000000000..327c2f6d0b4
--- /dev/null
+++ b/test-cases/features/add-new-dc-with-zero-nodes.yaml
@@ -0,0 +1,20 @@
+prepare_write_cmd: "cassandra-stress write cl=QUORUM n=20900 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+
+stress_cmd: ["cassandra-stress read cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5",
+             "cassandra-stress write cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+             ]
+verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_QUORUM n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+
+n_db_nodes: '3 3 0'  # configure n_db_nodes as multi-DC, with the last DC set to 0 nodes (so a new node can easily be added later)
+region_name: 'eu-west-1 eu-west-2 eu-north-1'
+n_loaders: 1
+n_monitor_nodes: 1
+
+instance_type_db: 'i4i.xlarge'
+seeds_num: 3
+
+user_prefix: 'add-new-dc'
+
+n_db_zero_token_nodes: '0 0 0'
+zero_token_instance_type_db: 'i4i.large'
+use_zero_nodes: true