feature(new_dc): Add new node with zero node dc
Test checks that in a symmetric multi-DC configuration
(DC1: 3 nodes, DC2: 3 nodes) raft quorum is lost if all nodes
in either DC die (stopped, killed, etc.), and that adding a new
DC with only a zero-token node preserves raft quorum and allows
the failed DC to be restored with topology operations. With six
group0 voters a majority needs four, so losing a 3-node DC leaves
only three; the zero-token node raises the voter count to seven,
so the four surviving members still form a majority.
aleksbykov committed Nov 23, 2024
1 parent 72a67ce commit a8d2eae
Showing 8 changed files with 275 additions and 14 deletions.
157 changes: 150 additions & 7 deletions add_new_dc_test.py
@@ -2,14 +2,17 @@
from typing import Tuple, List

from cassandra import ConsistencyLevel
-from cassandra.query import SimpleStatement # pylint: disable=no-name-in-module
+from cassandra.query import SimpleStatement
+import pytest # pylint: disable=no-name-in-module

from longevity_test import LongevityTest
-from sdcm.cluster import BaseNode
+from sdcm.cluster import BaseNode, NodeSetupFailed, NodeSetupTimeout
from sdcm.stress_thread import CassandraStressThread
from sdcm.utils.common import skip_optional_stage
-from sdcm.utils.decorators import optional_stage
+from sdcm.utils.decorators import optional_stage, skip_on_capacity_issues
from sdcm.utils.replication_strategy_utils import NetworkTopologyReplicationStrategy
+from sdcm.exceptions import ReadBarrierErrorException
+from sdcm.utils.adaptive_timeouts import Operations, adaptive_timeout

warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)

@@ -63,6 +66,94 @@ def test_add_new_dc(self) -> None: # pylint: disable=too-many-locals
self.verify_data_can_be_read_from_new_dc(new_node)
self.log.info("Test completed.")

def test_add_new_dc_with_zero_nodes(self):
self.log.info("Starting add new DC with zero nodes test...")
assert self.params.get('n_db_nodes').endswith(" 0"), "n_db_nodes must define multiple DCs and the last DC must have 0 nodes"
system_keyspaces = ["system_distributed", "system_traces"]
# auth-v2 is used when consistent topology is enabled
if not self.db_cluster.nodes[0].raft.is_consistent_topology_changes_enabled:
system_keyspaces.insert(0, "system_auth")

self.prewrite_db_with_data()

status = self.db_cluster.get_nodetool_status()

self.reconfigure_keyspaces_to_use_network_topology_strategy(
keyspaces=system_keyspaces + ["keyspace1"],
replication_factors={dc: len(status[dc].keys()) for dc in status}
)

self.log.info("Running repair on all nodes")
for node in self.db_cluster.nodes:
node.run_nodetool(sub_cmd="repair -pr", publish_event=True)

# Stop all nodes in 1st dc and check that raft quorum is lost

status = self.db_cluster.get_nodetool_status()
nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
regions = list(nodes_to_region.keys())
target_dc_name = regions[0]
alive_dc_name = regions[1]
for node in nodes_to_region[target_dc_name]:
node.stop_scylla()

node = nodes_to_region[alive_dc_name][0]
with pytest.raises(ReadBarrierErrorException):
node.raft.call_read_barrier()

# Start all nodes in 1st dc
for node in nodes_to_region[target_dc_name]:
node.start_scylla()

self.db_cluster.wait_all_nodes_un()

# Add a new DC with only a zero-token node
new_node = self.add_zero_node_in_new_dc()

status = self.db_cluster.get_nodetool_status()
node_host_ids = []
node_for_termination = []

nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
regions = list(nodes_to_region.keys())
target_dc_name = regions[0]
for node in nodes_to_region[target_dc_name]:
node_host_ids.append(node.host_id)
node_for_termination.append(node)
node.stop_scylla()

# check that raft quorum is not lost
new_node.raft.call_read_barrier()
# restore dc1
new_node.run_nodetool(
sub_cmd=f"removenode {node_host_ids[0]} --ignore-dead-nodes {','.join(node_host_ids[1:])}")

self.replace_cluster_node(new_node,
node_host_ids[1],
nodes_to_region[target_dc_name][-1].dc_idx,
dead_node_hostids=node_host_ids[2])

self.replace_cluster_node(new_node,
node_host_ids[2],
nodes_to_region[target_dc_name][-1].dc_idx)

# bootstrap new node in 1st dc
new_data_node = self.add_node_in_new_dc(nodes_to_region[target_dc_name][-1].dc_idx, 3)
for node in node_for_termination:
self.db_cluster.terminate_node(node)

self.db_cluster.wait_all_nodes_un()
status = self.db_cluster.get_nodetool_status()
self.log.info("Running rebuild in restored DC")
new_data_node.run_nodetool(sub_cmd=f"rebuild -- {list(status.keys())[-1]}", publish_event=True)

self.log.info("Running repair on all nodes")
for node in self.db_cluster.nodes:
node.run_nodetool(sub_cmd="repair -pr", publish_event=True)

self.verify_data_can_be_read_from_new_dc(new_data_node)
self.log.info("Test completed.")

def reconfigure_keyspaces_to_use_network_topology_strategy(self, keyspaces: List[str], replication_factors: dict[str, int]) -> None:
node = self.db_cluster.nodes[0]
self.log.info("Reconfiguring keyspace Replication Strategy")
@@ -89,22 +180,37 @@ def start_stress_during_adding_new_dc(self) -> Tuple[CassandraStressThread, CassandraStressThread]:
self.log.info("Stress during adding DC started")
return read_thread, write_thread

-def add_node_in_new_dc(self) -> BaseNode:
+def add_node_in_new_dc(self, dc_idx: int = 0, num_of_dc: int = 2) -> BaseNode:
self.log.info("Adding new node")
-new_node = self.db_cluster.add_nodes(1, dc_idx=1, enable_auto_bootstrap=True)[0] # add node
+new_node = self.db_cluster.add_nodes(1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0] # add node
self.db_cluster.wait_for_init(node_list=[new_node], timeout=900,
check_node_health=False)
self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
self.monitors.reconfigure_scylla_monitoring()

status = self.db_cluster.get_nodetool_status()
-assert len(status.keys()) == 2, f"new datacenter was not registered. Cluster status: {status}"
+assert len(status.keys()) == num_of_dc, f"new datacenter was not registered. Cluster status: {status}"
self.log.info("New DC to cluster has been added")
return new_node

def add_zero_node_in_new_dc(self) -> BaseNode:
if not self.params.get("use_zero_nodes"):
raise Exception("Zero node support should be enabled")
self.log.info("Adding new node")
new_node = self.db_cluster.add_nodes(1, dc_idx=2, enable_auto_bootstrap=True, is_zero_node=True)[0] # add node
self.db_cluster.wait_for_init(node_list=[new_node], timeout=900,
check_node_health=True)
self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
self.monitors.reconfigure_scylla_monitoring()

status = self.db_cluster.get_nodetool_status()
assert len(status.keys()) == 3, f"new datacenter was not registered. Cluster status: {status}"
self.log.info("New DC to cluster has been added")
return new_node

@optional_stage('post_test_load')
def verify_data_can_be_read_from_new_dc(self, new_node: BaseNode) -> None:
self.log.info("Veryfing if data has been transferred successfully to the new DC")
self.log.info("Verifying if data has been transferred successfully to the new DC")
stress_cmd = self.params.get('verify_data_after_entire_test') + f" -node {new_node.ip_address}"
end_stress = self.run_stress_thread(stress_cmd=stress_cmd, stats_aggregate_cmds=False, round_robin=False)
self.verify_stress_thread(cs_thread_pool=end_stress)
@@ -117,3 +223,40 @@ def querying_new_node_should_return_no_data(self, new_node: BaseNode) -> None:
fetch_size=10)
data = session.execute(statement).one()
assert not data, f"no data should be returned when querying with CL=LOCAL_QUORUM and RF=0. {data}"

def replace_cluster_node(self, verification_node: BaseNode,
host_id: str | None = None,
dc_idx: int = 0,
dead_node_hostids: str = "",
timeout: int | float = 3600 * 8) -> BaseNode:
"""When old_node_ip or host_id are not None then replacement node procedure is initiated"""
self.log.info("Adding new node to cluster...")
new_node: BaseNode = skip_on_capacity_issues(self.db_cluster.add_nodes)(
count=1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0]
self.monitors.reconfigure_scylla_monitoring()
with new_node.remote_scylla_yaml() as scylla_yaml:
scylla_yaml.ignore_dead_nodes_for_replace = dead_node_hostids
# Since this logic runs before the node is started, and with `use_preinstalled_scylla: false`
# Scylla is not installed yet (or the target node was terminated), version checks should use
# an alive node without nemesis that is up and has the Scylla executable available.

new_node.replacement_host_id = host_id

try:
with adaptive_timeout(Operations.NEW_NODE, node=verification_node, timeout=timeout):
self.db_cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
self.db_cluster.clean_replacement_node_options(new_node)
self.db_cluster.set_seeds()
self.db_cluster.update_seed_provider()
except (NodeSetupFailed, NodeSetupTimeout):
self.log.warning("TestConfig of the '%s' failed, removing it from list of nodes" % new_node)
self.db_cluster.nodes.remove(new_node)
self.log.warning("Node will not be terminated. Please terminate manually!!!")
raise

self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
new_node.wait_node_fully_start()
with new_node.remote_scylla_yaml() as scylla_yaml:
scylla_yaml.ignore_dead_nodes_for_replace = ""

return new_node
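
For context, the reconfigure_keyspaces_to_use_network_topology_strategy() step above relies on sdcm's NetworkTopologyReplicationStrategy helper; what follows is a minimal sketch (not part of this commit) of the CQL shape it effectively issues, with DC names and replication factors illustrative, taken from nodetool status as in the test:

# Sketch only: the ALTER KEYSPACE shape behind the reconfiguration step.
replication_factors = {"eu-west-1": 3, "eu-west-2": 3}  # example values from nodetool status
dc_options = ", ".join(f"'{dc}': {rf}" for dc, rf in replication_factors.items())
statement = (
    "ALTER KEYSPACE keyspace1 WITH replication = "
    f"{{'class': 'NetworkTopologyStrategy', {dc_options}}}"
)
# -> ALTER KEYSPACE keyspace1 WITH replication =
#    {'class': 'NetworkTopologyStrategy', 'eu-west-1': 3, 'eu-west-2': 3}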
@@ -0,0 +1,13 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
backend: 'aws',
region: '''["eu-west-1", "eu-west-2", "eu-north-1"]''',
test_name: 'add_new_dc_test.TestAddNewDc.test_add_new_dc_with_zero_nodes',
test_config: 'test-cases/features/add-new-dc-with-zero-nodes.yaml',

timeout: [time: 120, unit: 'MINUTES']
)
5 changes: 5 additions & 0 deletions sdcm/exceptions.py
@@ -83,3 +83,8 @@ class SstablesNotFound(Exception):

class CapacityReservationError(Exception):
pass


class ReadBarrierErrorException(Exception):
"""Raised when a read barrier call fails"""
1 change: 1 addition & 0 deletions sdcm/provision/scylla_yaml/scylla_yaml.py
@@ -265,6 +265,7 @@ def set_authorizer(cls, authorizer: str):
replace_address: str = None # ""
replace_address_first_boot: str = None # ""
replace_node_first_boot: str = None # ""
ignore_dead_nodes_for_replace: str = None # ""
override_decommission: bool = None # False
enable_repair_based_node_ops: bool = None # True
# NOTE: example for disabling RBNO for 'bootstrap' and 'decommission' operations:
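
As a usage note, the new ignore_dead_nodes_for_replace option is what replace_cluster_node() in the test drives; a minimal sketch of that sequence (host IDs here are placeholders):

# Sketch only: configure a node replacement that tolerates other dead nodes.
with new_node.remote_scylla_yaml() as scylla_yaml:
    scylla_yaml.ignore_dead_nodes_for_replace = "hid-2,hid-3"  # comma-separated host IDs (placeholders)
new_node.replacement_host_id = "hid-1"  # host id of the node being replaced
# ... start the node and wait for init; once the replacement succeeds, clear the option:
with new_node.remote_scylla_yaml() as scylla_yaml:
    scylla_yaml.ignore_dead_nodes_for_replace = ""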
26 changes: 26 additions & 0 deletions sdcm/rest/raft_api.py
@@ -0,0 +1,26 @@
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2024 ScyllaDB
from sdcm.rest.remote_curl_client import RemoteCurlClient


class RaftApi(RemoteCurlClient):
"""Raft api commands"""

def __init__(self, node: "BaseNode"): # noqa: F821
super().__init__(host="localhost:10000", endpoint="raft", node=node)

def read_barrier(self, group_id: str, timeout: int = 60) -> str:
path = f"read_barrier?group_id={group_id}&timeout={timeout}"
return self.run_remoter_curl(method="POST",
path=path,
params={}, timeout=timeout + 30).stdout.strip()
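
A minimal usage sketch (mirroring call_read_barrier() in sdcm/utils/raft below): the client POSTs to the node-local REST API on localhost:10000, and an empty response body means the barrier succeeded:

# Sketch only: trigger a group0 read barrier on a node through the REST API.
with node.parent_cluster.cql_connection_patient_exclusive(node=node) as session:
    group0_id = node.raft.get_group0_id(session)  # group0 id from system.scylla_local
api = RaftApi(node)
result = api.read_barrier(group_id=group0_id, timeout=60)
# Empty string -> success; otherwise the response is a JSON error object
# with 'code' and 'message' fields.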
51 changes: 44 additions & 7 deletions sdcm/utils/raft/__init__.py
@@ -1,6 +1,7 @@
import contextlib
import logging
import random
import json

from enum import Enum
from abc import ABC, abstractmethod
@@ -13,6 +14,9 @@
from sdcm.utils.features import is_consistent_topology_changes_feature_enabled, is_consistent_cluster_management_feature_enabled
from sdcm.utils.health_checker import HealthEventsGenerator
from sdcm.wait import wait_for
from sdcm.rest.raft_api import RaftApi
from sdcm.exceptions import ReadBarrierErrorException


LOGGER = logging.getLogger(__name__)
RAFT_DEFAULT_SCYLLA_VERSION = "5.5.0-dev"
@@ -137,6 +141,9 @@ def get_all_messages_timeouts(self, operation: TopologyOperations):
log_patterns = self.TOPOLOGY_OPERATION_LOG_PATTERNS.get(operation)
return list(map(self.get_message_waiting_timeout, log_patterns))

def call_read_barrier(self):
...


class RaftFeature(RaftFeatureOperations):
TOPOLOGY_OPERATION_LOG_PATTERNS: dict[TopologyOperations, Iterable[MessagePosition]] = {
@@ -177,14 +184,10 @@ def get_group0_members(self) -> list[dict[str, str]]:
group0_members = []
try:
with self._node.parent_cluster.cql_connection_patient_exclusive(node=self._node) as session:
-row = session.execute("select value from system.scylla_local where key = 'raft_group0_id'").one()
-if not row:
-return []
-raft_group0_id = row.value

+raft_group0_id = self.get_group0_id(session)
+assert raft_group0_id, "Group0 id was not found"
rows = session.execute(f"select server_id, can_vote from system.raft_state \
where group_id = {raft_group0_id} and disposition = 'CURRENT'").all()

for row in rows:
group0_members.append({"host_id": str(row.server_id),
"voter": row.can_vote})
@@ -347,6 +350,37 @@ def check_group0_tokenring_consistency(
LOGGER.debug("Group0 and token-ring are consistent on node %s (host_id=%s)...",
self._node.name, self._node.host_id)

def get_group0_id(self, session) -> str:
try:
row = session.execute("select value from system.scylla_local where key = 'raft_group0_id'").one()
return row.value
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
err_msg = f"Get group0 members failed with error: {exc}"
LOGGER.error(err_msg)
return ""

def call_read_barrier(self, timeout: int = 60):
""" Wait until the node applies all previously committed Raft entries

Schema and topology changes are committed through Raft group0. Before a
new change is written on a node, group0 triggers a read barrier, and the
node first applies all previously committed changes. Once the read
barrier finishes, the node is guaranteed to have every schema/topology
change that was committed in the cluster before the barrier started.
"""
with self._node.parent_cluster.cql_connection_patient_exclusive(node=self._node) as session:
raft_group0_id = self.get_group0_id(session)
assert raft_group0_id, "Group0 id was not found"
api = RaftApi(self._node)
result = api.read_barrier(group_id=raft_group0_id, timeout=timeout)
LOGGER.debug("Api response %s", result)
if not result:
return
status = json.loads(result)
raise ReadBarrierErrorException(f"Error code: {status['code']}, Error message: {status['message']}")


class NoRaft(RaftFeatureOperations):
TOPOLOGY_OPERATION_LOG_PATTERNS = {
@@ -404,6 +438,9 @@ def check_group0_tokenring_consistency(
tokenring_members: list[dict[str, str]]) -> None:
LOGGER.debug("Raft feature is disabled on node %s (host_id=%s)", self._node.name, self._node.host_id)

def call_read_barrier(self):
...


def get_raft_mode(node) -> RaftFeature | NoRaft:
with node.parent_cluster.cql_connection_patient(node) as session:
@@ -440,5 +477,5 @@ def get_node_status_from_system_by(verification_node: "BaseNode", *, ip_address:

__all__ = ["get_raft_mode",
"get_node_status_from_system_by",
"Group0MembersNotConsistentWithTokenRingMembersException",
"Group0MembersNotConsistentWithTokenRingMembersException", "RestApiError",
]
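
Taken together with the test above, the read barrier doubles as a quorum probe: it can only complete while group0 still has a majority of voters. A minimal sketch:

# Sketch only: probing group0 quorum from a live node.
try:
    node.raft.call_read_barrier()  # returns None when the barrier succeeds
except ReadBarrierErrorException:
    # Quorum lost: the barrier could not be committed. The test asserts
    # exactly this with pytest.raises() after stopping all nodes in one DC.
    ...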
16 changes: 16 additions & 0 deletions test-cases/features/add-new-dc-with-zero-node-in-multidc.yaml
@@ -0,0 +1,16 @@
prepare_write_cmd: "cassandra-stress write cl=QUORUM n=20900 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"

stress_cmd: ["cassandra-stress read cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5",
"cassandra-stress write cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
]
verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_ONE n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"

n_db_nodes: '3 3 0' # configure n_db_nodes as multi-DC, with the last DC set to 0 nodes so a new node can easily be added there later
region_name: 'eu-west-1 eu-west-2 eu-north-1'
n_loaders: 1
n_monitor_nodes: 1

instance_type_db: 'i4i.xlarge'
seeds_num: 3

user_prefix: 'add-new-dc'
20 changes: 20 additions & 0 deletions test-cases/features/add-new-dc-with-zero-nodes.yaml
@@ -0,0 +1,20 @@
prepare_write_cmd: "cassandra-stress write cl=QUORUM n=20900 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3) compaction(strategy=SizeTieredCompactionStrategy)' -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"

stress_cmd: ["cassandra-stress read cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5",
"cassandra-stress write cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
]
verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_QUORUM n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"

n_db_nodes: '3 3 0' # configure n_db_nodes as multi-DC, with the last DC set to 0 nodes so a new node can easily be added there later
region_name: 'eu-west-1 eu-west-2 eu-north-1'
n_loaders: 1
n_monitor_nodes: 1

instance_type_db: 'i4i.xlarge'
seeds_num: 3

user_prefix: 'add-new-dc'

n_db_zero_token_nodes: '0 0 0'
zero_token_instance_type_db: 'i4i.large'
use_zero_nodes: true
