From d4177d8dbe41a76e7efa68a8c3cfc8aa083fdb5b Mon Sep 17 00:00:00 2001 From: Gil Bregman Date: Mon, 6 Jan 2025 14:38:34 +0200 Subject: [PATCH] Add a CLI command to show SPDK ANA states. Fixes #973 Signed-off-by: Gil Bregman --- control/cli.py | 83 ++++++++++++++++++++++++++ control/grpc.py | 114 ++++++++++++++++++++++++++++++++++-- control/proto/gateway.proto | 18 ++++++ tests/ha/demo_test.sh | 83 ++++++++++++++++++++++++++ tests/test_grpc.py | 4 +- 5 files changed, 295 insertions(+), 7 deletions(-) diff --git a/control/cli.py b/control/cli.py index cf651a97..91afe36d 100644 --- a/control/cli.py +++ b/control/cli.py @@ -511,10 +511,88 @@ def gw_set_log_level(self, args): else: assert False + def gw_listener_info(self, args): + """Show gateway's listeners info""" + + out_func, err_func = self.get_output_functions(args) + listeners_info = None + try: + list_req = pb2.show_gateway_listeners_info_req(subsystem_nqn=args.subsystem) + listeners_info = self.stub.show_gateway_listeners_info(list_req) + except Exception as ex: + listeners_info = pb2.gateway_listeners_info(status=errno.EINVAL, + error_message=f"Failure listing gateway " + f"listeners info:\n{ex}", + gw_listeners=[]) + + if args.format == "text" or args.format == "plain": + if listeners_info.status == 0: + listeners_list = [] + for lstnr in listeners_info.gw_listeners: + ana_states = "" + for ana in lstnr.lb_states: + if not args.verbose and ana.state != pb2.ana_state.OPTIMIZED: + continue + state_str = GatewayEnumUtils.get_key_from_value(pb2.ana_state, ana.state) + if state_str is None: + ana_states += str(ana.grp_id) + "/" + str(ana.state) + "\n" + else: + ana_states += str(ana.grp_id) + "/" + state_str.title() + "\n" + adrfam = GatewayEnumUtils.get_key_from_value(pb2.AddressFamily, + lstnr.listener.adrfam) + adrfam = self.format_adrfam(adrfam) + secure = "Yes" if lstnr.listener.secure else "No" + ana_states = ana_states.removesuffix("\n") + listeners_list.append([lstnr.listener.host_name, + lstnr.listener.trtype, + adrfam, + f"{lstnr.listener.traddr}:{lstnr.listener.trsvcid}", + secure, + ana_states]) + if len(listeners_list) > 0: + if args.format == "text": + table_format = "fancy_grid" + else: + table_format = "plain" + listeners_out = tabulate(listeners_list, + headers=["Host", + "Transport", + "Address Family", + "Address", + "Secure", + "Load Balancing Group ID/State"], + tablefmt=table_format) + out_func(f"Gateway listeners for {args.subsystem}:\n{listeners_out}") + else: + out_func(f"No gateway listeners for {args.subsystem}") + else: + err_func(f"{listeners_info.error_message}") + elif args.format == "json" or args.format == "yaml": + ret_str = json_format.MessageToJson(listeners_info, indent=4, + including_default_value_fields=True, + preserving_proto_field_name=True) + if args.format == "json": + out_func(ret_str) + elif args.format == "yaml": + obj = json.loads(ret_str) + out_func(yaml.dump(obj)) + elif args.format == "python": + return listeners_info + else: + assert False + + return listeners_info.status + gw_set_log_level_args = [ argument("--level", "-l", help="Gateway log level", required=True, type=str, choices=get_enum_keys_list(pb2.GwLogLevel, False)), ] + gw_listener_info_args = [ + argument("--subsystem", + "-n", + help="Subsystem NQN", + required=True), + ] gw_actions = [] gw_actions.append({"name": "version", "args": [], @@ -528,6 +606,9 @@ def gw_set_log_level(self, args): gw_actions.append({"name": "set_log_level", "args": gw_set_log_level_args, "help": "Set gateway's log level"}) + gw_actions.append({"name": "listener_info", + "args": gw_listener_info_args, + "help": "Show listeners information for the gateway"}) gw_choices = get_actions(gw_actions) @cli.cmd(gw_actions) @@ -542,6 +623,8 @@ def gw(self, args): return self.gw_get_log_level(args) elif args.action == "set_log_level": return self.gw_set_log_level(args) + elif args.action == "listener_info": + return self.gw_listener_info(args) if not args.action: self.cli.parser.error(f"missing action for gw command (choose from " f"{GatewayClient.gw_choices})") diff --git a/control/grpc.py b/control/grpc.py index 6bccef00..3498dea5 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -565,6 +565,17 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, self.ana_grp_ns_load = {} self.ana_grp_subs_load = defaultdict(dict) self.max_ana_grps = self.config.getint_with_default("gateway", "max_gws_in_grp", 16) + if self.max_ana_grps > self.max_namespaces: + self.logger.warning(f"Maximal number of load balancing groups can't be greather " + f"than the maximal number of namespaces, will truncate " + f"to {self.max_namespaces}") + self.max_ana_grps = self.max_namespaces + + if self.max_namespaces_per_subsystem > self.max_namespaces: + self.logger.warning(f"Maximal number of namespace per subsystem can't be greater " + f"than the global maximal number of namespaces, will truncate " + f"to {self.max_namespaces}") + self.max_namespaces_per_subsystem = self.max_namespaces for i in range(self.max_ana_grps + 1): self.ana_grp_ns_load[i] = 0 @@ -1500,8 +1511,8 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, add_namespace_error_prefix = f"Failure adding namespace{nsid_msg} to {subsystem_nqn}" peer_msg = self.get_peer_message(context) - self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group " - f"id {anagrpid}{nsid_msg}, auto_visible: {auto_visible}, " + self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with load " + f"balancing group id {anagrpid}{nsid_msg}, auto_visible: {auto_visible}, " f"context: {context}{peer_msg}") if subsystem_nqn not in self.subsys_max_ns: @@ -1746,7 +1757,7 @@ def namespace_add_safe(self, request, context): self.gateway_group) else: anagrp = self.choose_anagrpid_for_namespace(request.nsid) - assert anagrp != 0, "Chosen ANA group is 0" + assert anagrp != 0, "Chosen load balancing group is 0" if request.nsid: ns = self.subsystem_nsid_bdev_and_uuid.find_namespace(request.subsystem_nqn, @@ -1788,7 +1799,7 @@ def namespace_add_safe(self, request, context): # If an explicit load balancing group was passed, make sure it exists if request.anagrpid != 0: if request.anagrpid not in grps_list: - self.logger.debug(f"ANA groups: {grps_list}") + self.logger.debug(f"Load balancing groups: {grps_list}") errmsg = f"Failure adding namespace {nsid_msg}to " \ f"{request.subsystem_nqn}: Load balancing group " \ f"{request.anagrpid} doesn't exist" @@ -1897,7 +1908,7 @@ def namespace_change_load_balancing_group_safe(self, request, context): grps_list = self.ceph_utils.get_number_created_gateways( self.gateway_pool, self.gateway_group) if request.anagrpid not in grps_list: - self.logger.debug(f"ANA groups: {grps_list}") + self.logger.debug(f"Load balancing groups: {grps_list}") errmsg = f"{change_lb_group_failure_prefix}: Load balancing group " \ f"{request.anagrpid} doesn't exist" self.logger.error(errmsg) @@ -4168,6 +4179,99 @@ def list_listeners_safe(self, request, context): def list_listeners(self, request, context=None): return self.execute_grpc_function(self.list_listeners_safe, request, context) + def show_gateway_listeners_info_safe(self, request, context): + """Show gateway's listeners info.""" + + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to show gateway listeners info for " + f"{request.subsystem_nqn}, context: {context}{peer_msg}") + + if self.ana_grp_state[0] != pb2.ana_state.INACCESSIBLE: + errmsg = "Internal error, we shouldn't have a real state for load balancing group 0" + self.logger.error(errmsg) + return pb2.gateway_listeners_info(status=errno.EINVAL, + error_message=errmsg, + gw_listeners=[]) + + try: + ret = rpc_nvmf.nvmf_subsystem_get_listeners(self.spdk_rpc_client, + nqn=request.subsystem_nqn) + self.logger.debug(f"get_listeners: {ret}") + except Exception as ex: + errmsg = "Failure listing gateway listeners" + self.logger.exception(errmsg) + errmsg = f"{errmsg}:\n{ex}" + resp = self.parse_json_exeption(ex) + status = errno.ENODEV + if resp: + status = resp["code"] + errmsg = f"Failure listing gateway listeners: {resp['message']}" + return pb2.gateway_listeners_info(status=status, + error_message=errmsg, + gw_listeners=[]) + + gw_listeners = [] + for lstnr in ret: + try: + secure = False + if request.subsystem_nqn in self.subsystem_listeners: + local_lstnr = (lstnr["address"]["adrfam"].lower(), + lstnr["address"]["traddr"], + int(lstnr["address"]["trsvcid"]), + True) + if local_lstnr in self.subsystem_listeners[request.subsystem_nqn]: + secure = True + lstnr_part = pb2.listener_info(host_name=self.host_name, + trtype=lstnr["address"]["trtype"].upper(), + adrfam=lstnr["address"]["adrfam"].lower(), + traddr=lstnr["address"]["traddr"], + trsvcid=int(lstnr["address"]["trsvcid"]), + secure=secure) + except Exception: + self.logger.exception(f"Error getting address from {lstnr}") + continue + + ana_states = [] + try: + for ana_state in lstnr["ana_states"]: + spdk_group = ana_state["ana_group"] + if spdk_group > self.max_ana_grps: + continue + spdk_state = ana_state["ana_state"] + spdk_state_enum_val = GatewayEnumUtils.get_value_from_key(pb2.ana_state, + spdk_state.upper()) + if spdk_state_enum_val is None: + self.logger.error(f"Unknown state \"{spdk_state}\" for " + f"load balancing group {spdk_group} in SPDK") + continue + + ana_states.append(pb2.ana_group_state(grp_id=spdk_group, + state=spdk_state_enum_val)) + if spdk_group in self.ana_grp_state: + if self.ana_grp_state[spdk_group] != spdk_state_enum_val: + gw_state_str = GatewayEnumUtils.get_key_from_value( + pb2.ana_state, self.ana_grp_state[spdk_group]) + if gw_state_str is None: + self.logger.error(f'State for load balancing group {spdk_group} ' + f'is "{self.ana_grp_state[spdk_group]}" ' + f'but is {spdk_state_enum_val} in SPDK') + else: + self.logger.error(f'State for load balancing group {spdk_group} ' + f'is "{gw_state_str}" ' + f'but is "{spdk_state}" in SPDK') + except Exception: + self.logger.exception(f"Error parsing load balancing state {ana_state}") + continue + + gw_lstnr = pb2.gateway_listener_info(listener=lstnr_part, lb_states=ana_states) + gw_listeners.append(gw_lstnr) + + return pb2.gateway_listeners_info(status=0, error_message=os.strerror(0), + gw_listeners=gw_listeners) + + def show_gateway_listeners_info(self, request, context=None): + return self.execute_grpc_function(self.show_gateway_listeners_info_safe, request, context) + def list_subsystems_safe(self, request, context): """List subsystems.""" diff --git a/control/proto/gateway.proto b/control/proto/gateway.proto index cee82a1d..521897fc 100644 --- a/control/proto/gateway.proto +++ b/control/proto/gateway.proto @@ -122,6 +122,9 @@ service Gateway { // Set gateway log level rpc set_gateway_log_level(set_gateway_log_level_req) returns(req_status) {} + + // Show gateway listeners info + rpc show_gateway_listeners_info(show_gateway_listeners_info_req) returns(gateway_listeners_info) {} } // Request messages @@ -299,6 +302,10 @@ message set_gateway_log_level_req { GwLogLevel log_level = 1; } +message show_gateway_listeners_info_req { + string subsystem_nqn = 1; +} + // From https://nvmexpress.org/wp-content/uploads/NVM-Express-1_4-2019.06.10-Ratified.pdf page 138 // Asymmetric Namespace Access state for all namespaces in this ANA // Group when accessed through this controller. @@ -455,6 +462,17 @@ message listeners_info { repeated listener_info listeners = 3; } +message gateway_listener_info { + listener_info listener = 1; + repeated ana_group_state lb_states = 2; +} + +message gateway_listeners_info { + int32 status = 1; + string error_message = 2; + repeated gateway_listener_info gw_listeners = 3; +} + message host { string nqn = 1; optional bool use_psk = 2; diff --git a/tests/ha/demo_test.sh b/tests/ha/demo_test.sh index e73cf71d..6a29d1e1 100755 --- a/tests/ha/demo_test.sh +++ b/tests/ha/demo_test.sh @@ -251,6 +251,89 @@ function demo_bdevperf_unsecured() [[ "$devs" == "Nvme0n1 Nvme0n2" ]] make exec SVC=bdevperf OPTS=-T CMD="$rpc -v -s $BDEVPERF_SOCKET bdev_nvme_detach_controller Nvme0" + echo "ℹ️ verify listeners info, both gateway and SPDK" + rm -f /tmp/gw_listeners.txt + rm -f /tmp/listeners2.txt + cephnvmf_func --output stdio --format json gw listener_info --subsystem $NQN > /tmp/gw_listeners.txt + cephnvmf_func --output stdio --format json listener list --subsystem $NQN > /tmp/listeners.txt + cat /tmp/gw_listeners.txt + [[ `cat /tmp/gw_listeners.txt | jq -r '.status'` == "0" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].listener.trtype'` == "TCP" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].listener.adrfam'` == "ipv6" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].listener.traddr'` == "2001:db8::3" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].listener.trsvcid'` == "4420" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].listener.secure'` == "false" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[1].listener.trtype'` == "TCP" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[1].listener.adrfam'` == "ipv4" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[1].listener.traddr'` == "0.0.0.0" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[1].listener.trsvcid'` == "4430" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[1].listener.secure'` == "false" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[2].listener.trtype'` == "TCP" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[2].listener.adrfam'` == "ipv4" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[2].listener.traddr'` == "192.168.13.3" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[2].listener.trsvcid'` == "4420" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[2].listener.secure'` == "false" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[3]'` == "null" ]] + hostname0=`cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].listener.host_name'` + hostname1=`cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[1].listener.host_name'` + hostname2=`cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[2].listener.host_name'` + [[ "$hostname0" == "$hostname1" ]] + [[ "$hostname0" == "$hostname2" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].lb_states[0].grp_id'` == "1" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r '.gw_listeners[0].lb_states[0].state'` == "OPTIMIZED" ]] + for lsnr in 0 1 2 + do + [[ `cat /tmp/gw_listeners.txt | jq -r ".gw_listeners[${lsnr}].lb_states[0].grp_id"` == "1" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r ".gw_listeners[${lsnr}].lb_states[0].state"` == "OPTIMIZED" ]] + for i in `seq 1 15` + do + grp=`expr ${i} + 1` + [[ `cat /tmp/gw_listeners.txt | jq -r ".gw_listeners[${lsnr}].lb_states[${i}].grp_id"` == "${grp}" ]] + [[ `cat /tmp/gw_listeners.txt | jq -r ".gw_listeners[${lsnr}].lb_states[${i}].state"` == "INACCESSIBLE" ]] + done + done + + cat /tmp/listeners.txt + [[ `cat /tmp/listeners.txt | jq -r '.status'` == "0" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[0].trtype'` == "TCP" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[0].adrfam'` == "ipv4" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[0].traddr'` == "192.168.13.3" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[0].trsvcid'` == "4420" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[0].secure'` == "false" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[1].trtype'` == "TCP" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[1].adrfam'` == "ipv4" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[1].traddr'` == "0.0.0.0" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[1].trsvcid'` == "4430" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[1].secure'` == "false" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[2].trtype'` == "TCP" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[2].adrfam'` == "ipv6" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[2].traddr'` == "2001:db8::3" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[2].trsvcid'` == "4420" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[2].secure'` == "false" ]] + [[ `cat /tmp/listeners.txt | jq -r '.listeners[3]'` == "null" ]] + hostname20=`cat /tmp/listeners.txt | jq -r '.listeners[0].host_name'` + hostname21=`cat /tmp/listeners.txt | jq -r '.listeners[1].host_name'` + hostname22=`cat /tmp/listeners.txt | jq -r '.listeners[2].host_name'` + [[ "$hostname20" == "$hostname21" ]] + [[ "$hostname20" == "$hostname22" ]] + [[ "$hostname20" == "$hostname0" ]] + + rm -f /tmp/gw_listeners.txt + rm -f /tmp/listeners.txt + cephnvmf_func --output stdio --format plain gw listener_info --subsystem $NQN > /tmp/gw_listeners.txt + cat /tmp/gw_listeners.txt + grep "TCP IPv6 2001:db8::3:4420 No 1/Optimized" /tmp/gw_listeners.txt + grep "TCP IPv4 0.0.0.0:4430 No 1/Optimized" /tmp/gw_listeners.txt + grep "TCP IPv4 192.168.13.3:4420 No 1/Optimized" /tmp/gw_listeners.txt + + set +e + tail -n +3 /tmp/gw_listeners.txt | grep -v "Optimized" + if [[ $? -eq 0 ]]; then + echo "Should only get optimized load balancing states" + exit 1 + fi + set -e + return $? } diff --git a/tests/test_grpc.py b/tests/test_grpc.py index d05700aa..882b4ab6 100644 --- a/tests/test_grpc.py +++ b/tests/test_grpc.py @@ -60,7 +60,7 @@ def test_create_get_subsys(caplog, config): assert "failed" not in caplog.text.lower().replace("failed to notify", "") assert "Failure" not in caplog.text - assert f"{subsystem_prefix}0 with ANA group id 1" in caplog.text + assert f"{subsystem_prefix}0 with load balancing group id 1" in caplog.text caplog.clear() # add a listener @@ -109,7 +109,7 @@ def test_create_get_subsys(caplog, config): time.sleep(0.1) time.sleep(20) # Make sure update() is over - assert f"{subsystem_prefix}0 with ANA group id 1" in caplog.text + assert f"{subsystem_prefix}0 with load balancing group id 1" in caplog.text assert f"Received request to set QOS limits for namespace 1 on " \ f"{subsystem_prefix}0, R/W IOs per second: 2000 " \ f"Read megabytes per second: 5" in caplog.text