Skip to content

Commit

Permalink
fix(TopologyOpsMonitor): Monitor decommission topology operation
Browse files Browse the repository at this point in the history
Decommission operation could be terminated for different reason, but
decommission process will be run on node and could successfully be
decommissioned. But because of nemesis is terminated by exception
cluster health validator could abort whole test run because
node status is Decommissioning. This start happened with nemesis
DecommissionStreamingErr, when log message could not be found and
process is aborted by timeout, while decommission continue to run.

To Catch such case FailedDecommissionOperationMonitoring is presented
It is ContextManager which could be used to safely run decommission
operation and check node status and wait decommission will be finished
if command aborted or terminated.

Fix: scylladb#8144
  • Loading branch information
aleksbykov committed Sep 25, 2024
1 parent e53cfd6 commit b0788d4
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 3 deletions.
9 changes: 7 additions & 2 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
get_gc_mode, GcMode
from test_lib.cql_types import CQLTypeBuilder
from test_lib.sla import ServiceLevel, MAX_ALLOWED_SERVICE_LEVELS

from sdcm.utils.topology_ops import FailedDecommissionOperationMonitoring

LOGGER = logging.getLogger(__name__)
# NOTE: following lock is needed in the K8S multitenant case
Expand Down Expand Up @@ -3917,7 +3917,12 @@ def decommission_post_action():
for expected_start_failed_context in self.target_node.raft.get_severity_change_filters_scylla_start_failed(
terminate_pattern.timeout):
stack.enter_context(expected_start_failed_context)
with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing():
with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \
self.run_nemesis(node_list=self.cluster.nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=self.target_node,
verification_node=verification_node,
timeout=full_operations_timeout, expected_exception=(KillNemesis, )):

ParallelObject(objects=[trigger, watcher], timeout=full_operations_timeout).call_objects()
if new_node := decommission_post_action():
new_node.wait_node_fully_start()
Expand Down
18 changes: 18 additions & 0 deletions sdcm/sct_events/group_common_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,24 @@ def ignore_raft_topology_cmd_failing():
regex=r".*raft_topology - send_raft_topology_cmd\(stream_ranges\) failed with exception \(node state is replacing\)",
extra_time_to_expiration=30
))
stack.enter_context(EventsSeverityChangerFilter(
new_severity=Severity.WARNING,
event_class=DatabaseLogEvent,
regex=r".*raft_topology - drain rpc failed, proceed to fence old writes.*connection is closed",
extra_time_to_expiration=30
))
stack.enter_context(EventsSeverityChangerFilter(
new_severity=Severity.WARNING,
event_class=DatabaseLogEvent,
regex=r".*raft_topology - topology change coordinator fiber got error std::runtime_error.*connection is closed",
extra_time_to_expiration=30
))
stack.enter_context(EventsSeverityChangerFilter(
new_severity=Severity.WARNING,
event_class=DatabaseLogEvent,
regex=r".*raft_topology - transition_state::write_both_read_new, global_token_metadata_barrier failed.*connection is closed",
extra_time_to_expiration=30
))
yield


Expand Down
5 changes: 4 additions & 1 deletion sdcm/utils/raft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ class MessageTimeout(NamedTuple):
}

ABORT_DECOMMISSION_LOG_PATTERNS: Iterable[MessagePosition] = [
MessagePosition("api - decommission", LogPosition.BEGIN),
MessagePosition("DECOMMISSIONING: unbootstrap starts", LogPosition.BEGIN),
MessagePosition("DECOMMISSIONING: unbootstrap done", LogPosition.END),
MessagePosition("becoming a group 0 non-voter", LogPosition.END),
MessagePosition("became a group 0 non-voter", LogPosition.END),
MessagePosition("leaving token ring", LogPosition.END),
MessagePosition("left token ring", LogPosition.END),
MessagePosition("Finished token ring movement", LogPosition.END)
MessagePosition("Finished token ring movement", LogPosition.END),
MessagePosition("raft_topology - decommission: waiting for completion", LogPosition.BEGIN),
MessagePosition("repair - decommission_with_repair", LogPosition.END)
]

ABORT_BOOTSTRAP_LOG_PATTERNS: Iterable[MessagePosition] = [
Expand Down
91 changes: 91 additions & 0 deletions sdcm/utils/topology_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import logging
import traceback
from dataclasses import dataclass
from enum import Enum

LOGGER = logging.getLogger(__name__)


class OperationState(Enum):
decommission = "DECOMMISSIONING"
shutdown = "shutdown"


@dataclass
class NodeStatus:
ip_address: str
host_id: str
status: str
up: bool


class FailedDecommissionOperationMonitoring:
"""Monitor status of decommission operation after fail
If decommission command was failed by any reason but decommission
process still running, wait while it be finished and check operation status
"""

def __init__(self, target_node: "BaseNode", verification_node: "BaseNode",
timeout: int | float = 7200, expected_exception: tuple[Exception, ...] | None = None):
self.timeout = timeout
self.target_node = target_node
self.db_cluster: BaseScyllaCluster = target_node.parent_cluster
self.target_node_ip = target_node.ip_address
expected_exception = expected_exception or set()
self.expected_exception = set(expected_exception)
self.verification_node = verification_node

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
LOGGER.warning("Decommission failed with error: %s", traceback.format_exception(exc_type, exc_val, exc_tb))
decommission_in_progress = self.is_node_decommissioning()
if not decommission_in_progress:
self.db_cluster.verify_decommission(self.target_node)
return True
if exc_type not in self.expected_exception and decommission_in_progress:
LOGGER.debug("Wait decommission to be done...")
wait_for(func=lambda: not self.is_node_decommissioning(), step=15,
timeout=self.timeout,
text=f"Waiting decommission is finished for {self.target_node.name}...")
self.db_cluster.verify_decommission(self.target_node)
return True

def is_node_decommissioning(self):
with self.db_cluster.cql_connection_patient(node=self.verification_node) as session:
node_status = get_node_state_by_ip(session, ip_address=self.target_node_ip)
LOGGER.debug("The node %s state after decommission failed: %s", self.target_node.name, node_status)
return OperationState(node_status.status) == OperationState.decommission if node_status else False


def get_node_state_by_ip(session: "Session", ip_address: str) -> NodeStatus | None:
nodes_states = get_nodes_states_from_system_table(session)
for node_state in nodes_states:
if node_state.ip_address == ip_address:
return node_state


def get_node_state_by_hostid(session: "Session", host_id: str) -> NodeStatus | None:
nodes_states = get_nodes_states_from_system_table(session)
for node_state in nodes_states:
if node_state.ip_address == host_id:
return node_state


def get_nodes_states_from_system_table(session) -> list[NodeStatus]:
"""Get cluster status from system.cluster_status table
The table contains current cluster status for each node and updated
by raft accordingly
"""
query = "select peer, host_id, status, up from system.cluster_status"
session.default_timeout = 300
results = session.execute(query)
nodes_status = []
for row in results:
nodes_status.append(NodeStatus(row.peer, str(row.host_id), row.status, row.up))
LOGGER.debug("All nodes status: %s", nodes_status)
return nodes_status

0 comments on commit b0788d4

Please sign in to comment.