From d2b3d1abe72e3231819459342af0cae3f892b888 Mon Sep 17 00:00:00 2001 From: Teingi Date: Mon, 23 Sep 2024 17:33:45 +0800 Subject: [PATCH 1/6] gather scene run add --skip_type option --- diag_cmd.py | 1 + handler/gather/gather_scenes.py | 3 +++ handler/gather/scenes/base.py | 11 ++++++++--- handler/gather/scenes/sql_problem.py | 11 ++++++++--- handler/gather/step/base.py | 17 +++++++++++------ 5 files changed, 31 insertions(+), 12 deletions(-) diff --git a/diag_cmd.py b/diag_cmd.py index f605c10f..377b84cc 100644 --- a/diag_cmd.py +++ b/diag_cmd.py @@ -678,6 +678,7 @@ def __init__(self): self.parser.add_option('--env', type='string', help='env, eg: "{env1=xxx, env2=xxx}"') self.parser.add_option('--store_dir', type='string', help='the dir to store gather result, current dir by default.', default='./') self.parser.add_option('--temp_dir', type='string', help='the dir for temporarily storing files on nodes', default='/tmp') + self.parser.add_option('--skip_type', type='string', help='The types of gather to be skipped, choices=[ssh, sql]') self.parser.add_option('-c', type='string', help='obdiag custom config', default=os.path.expanduser('~/.obdiag/config.yml')) self.parser.add_option('--config', action="append", type="string", help='config options Format: --config key=value') diff --git a/handler/gather/gather_scenes.py b/handler/gather/gather_scenes.py index 81b798ef..a0ad5ee0 100644 --- a/handler/gather/gather_scenes.py +++ b/handler/gather/gather_scenes.py @@ -188,6 +188,7 @@ def init_option(self): env_option = Util.get_option(options, 'env') scene_option = Util.get_option(options, 'scene') temp_dir_option = Util.get_option(options, 'temp_dir') + skip_type_option = Util.get_option(options, 'skip_type') if from_option is not None and to_option is not None: try: from_timestamp = TimeUtils.parse_time_str(from_option) @@ -226,6 +227,8 @@ def init_option(self): self.env = env_dict if temp_dir_option: self.temp_dir = temp_dir_option + if skip_type_option: + self.context.set_variable('gather_skip_type', skip_type_option) return True def __get_sql_result(self): diff --git a/handler/gather/scenes/base.py b/handler/gather/scenes/base.py index abc4daaf..69bc7ea2 100644 --- a/handler/gather/scenes/base.py +++ b/handler/gather/scenes/base.py @@ -90,14 +90,19 @@ def __execute_yaml_mode(self, nodes): self.stdio.verbose("run scene excute yaml mode in node") def __execute_code_mode(self): + skip_type = self.context.get_variable("gather_skip_type", None) + if skip_type: + self.stdio.verbose("needs to be filtered out and not gather type is {0}".format(skip_type)) if self.scene["name"] == "observer.perf_sql" or self.scene["name"] == "observer.sql_err": scene = SQLProblemScene(self.context, self.scene["name"], self.report_dir, self.scene_variable_dict, self.env) - elif self.scene["name"] == "observer.cpu_high": + elif self.scene["name"] == "observer.cpu_high" and (skip_type != "ssh"): scene = CPUHighScene(self.context, self.report_dir, self.scene_variable_dict, self.env) - elif self.scene["name"] == "observer.px_collect_log": + elif self.scene["name"] == "observer.px_collect_log" and (skip_type != "ssh"): scene = SQLPXCollectLogScene(self.context, self.scene["name"], self.report_dir, self.scene_variable_dict, self.env) else: - self.stdio.error("unsupported hard code scene {0}".format(self.scene["name"])) + scene_names = ["observer.perf_sql", "observer.cpu_high", "observer.px_collect_log"] + if self.scene["name"] not in scene_names: + self.stdio.error("unsupported hard code scene {0}".format(self.scene["name"])) return try: self.stdio.verbose("hard code scene {0} execute start".format(self.scene["name"])) diff --git a/handler/gather/scenes/sql_problem.py b/handler/gather/scenes/sql_problem.py index 95cbd81d..c6f8631a 100644 --- a/handler/gather/scenes/sql_problem.py +++ b/handler/gather/scenes/sql_problem.py @@ -42,10 +42,15 @@ def __init__(self, context, scene_name, report_path, task_variable_dict=None, en self.trace_id = "FAKE_TRACE_ID" def execute(self): + skip_type = self.context.get_variable("gather_skip_type", None) + if skip_type: + self.stdio.verbose("needs to be filtered out and not gather type is {0}".format(skip_type)) if self.__parse_env(): - self.__gather_log() - self.__gather_obproxy_log() - self.__gather_sql_info() + if skip_type != "ssh": + self.__gather_log() + self.__gather_obproxy_log() + if skip_type != "sql": + self.__gather_sql_info() def __gather_log(self): try: diff --git a/handler/gather/step/base.py b/handler/gather/step/base.py index 8729e35e..ee1bb9c2 100644 --- a/handler/gather/step/base.py +++ b/handler/gather/step/base.py @@ -53,16 +53,19 @@ def execute(self): if "type" not in self.step: self.stdio.error("Missing field :type") + skip_type = self.context.get_variable("gather_skip_type", None) + if skip_type: + self.stdio.verbose("needs to be filtered out and not gather type is {0}".format(skip_type)) if (self.node_number > 1) and self.step.get("global") and (self.step.get("global") is True): self.stdio.verbose("step sets the value of the global is true and it is processing the {0} node, skipping gather".format(self.node_number)) else: - if self.step["type"] == "ssh": + if self.step["type"] == "ssh" and (skip_type != "ssh"): handler = SshHandler(self.context, self.step, self.node, self.report_path, self.task_variable_dict) handler.execute() - elif self.step["type"] == "sql": + elif self.step["type"] == "sql" and (skip_type != "sql"): handler = StepSQLHandler(self.context, self.step, self.cluster, self.report_path, self.task_variable_dict) handler.execute() - elif self.step["type"] == "log": + elif self.step["type"] == "log" and (skip_type != "ssh"): if self.node.get("host_type") and self.node.get("host_type") == "OBSERVER": handler = GatherLogHandler(self.context, gather_pack_dir=self.report_path, is_scene=True) self.context.set_variable('filter_nodes_list', [self.node]) @@ -70,7 +73,7 @@ def execute(self): handler.handle() else: self.stdio.verbose("node host_type is {0} not OBSERVER, skipping gather log".format(self.node.get("host_type"))) - elif self.step["type"] == "obproxy_log": + elif self.step["type"] == "obproxy_log" and (skip_type != "ssh"): if self.node.get("host_type") and self.node.get("host_type") == "OBPROXY": handler = GatherObProxyLogHandler(self.context, gather_pack_dir=self.report_path, is_scene=True) self.context.set_variable('filter_nodes_list', [self.node]) @@ -78,12 +81,14 @@ def execute(self): handler.handle() else: self.stdio.verbose("node host_type is {0} not OBPROXY, skipping gather log".format(self.node.get("host_type"))) - elif self.step["type"] == "sysstat": + elif self.step["type"] == "sysstat" and (skip_type != "ssh"): handler = GatherOsInfoHandler(self.context, gather_pack_dir=self.report_path, is_scene=True) self.context.set_variable('filter_nodes_list', [self.node]) handler.handle() else: - self.stdio.error("the type not support: {0}".format(self.step["type"])) + support_types = ["ssh", "sql", "log", "obproxy_log", "sysstat"] + if self.step["type"] not in support_types: + self.stdio.error("{0} is an unsupported type. The currently supported types are {1}. {0}".format(self.step["type"], support_types)) except Exception as e: self.stdio.error("StepBase handler.execute fail, error: {0}".format(e)) if self.step["type"] == "sql": From 9f9f4ac9a68c588378febc06d463b163f6ed8a4b Mon Sep 17 00:00:00 2001 From: Teingi Date: Mon, 23 Sep 2024 20:56:02 +0800 Subject: [PATCH 2/6] close #279 --- common/tool.py | 20 ++++++++++++++------ handler/gather/scenes/sql_problem.py | 22 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/common/tool.py b/common/tool.py index 283f5679..cabf03e0 100644 --- a/common/tool.py +++ b/common/tool.py @@ -1168,15 +1168,23 @@ def parse_env(env_string, stdio=None): return env_dict @staticmethod - def get_observer_ip_from_trace_id(content, stdio=None): - if content[0] == 'Y' and len(content) >= 12: - sep = content.find('-') - uval = int(content[1:sep], 16) + def get_observer_ip_port_from_trace_id(trace_id): + if len(trace_id) >= 50: + raise ValueError(f"Trace_id({trace_id}) is invalid due to its length.") + + if trace_id[0] == 'Y': + id_ = trace_id.split('-')[0].split('Y')[1] + uval = int(id_, 16) ip = uval & 0xFFFFFFFF port = (uval >> 32) & 0xFFFF - return "%d.%d.%d.%d:%d" % ((ip >> 24 & 0xFF), (ip >> 16 & 0xFF), (ip >> 8 & 0xFF), (ip >> 0 & 0xFF), port) + ip_str = f"{(ip >> 24) & 0xFF}.{(ip >> 16) & 0xFF}.{(ip >> 8) & 0xFF}.{ip & 0xFF}" + origin_ip_port = f"{ip_str}:{port}" else: - return "" + parts = trace_id.split('-') + processed_parts = [hex(int(t))[2:].upper().zfill(16 if idx == 1 else 0) for idx, t in enumerate(parts)] + s = 'Y' + '-'.join(processed_parts) + origin_ip_port = StringUtils.get_observer_ip_port_from_trace_id(s) + return origin_ip_port @staticmethod def parse_range_string(range_str, nu, stdio=None): diff --git a/handler/gather/scenes/sql_problem.py b/handler/gather/scenes/sql_problem.py index c6f8631a..c121ac03 100644 --- a/handler/gather/scenes/sql_problem.py +++ b/handler/gather/scenes/sql_problem.py @@ -21,6 +21,8 @@ from handler.gather.gather_obproxy_log import GatherObProxyLogHandler from handler.gather.gather_plan_monitor import GatherPlanMonitorHandler from common.tool import StringUtils +from common.ssh_client.ssh import SshClient +from common.command import find_home_path_by_port class SQLProblemScene(SafeStdio): @@ -40,6 +42,7 @@ def __init__(self, context, scene_name, report_path, task_variable_dict=None, en self.scene_name = scene_name self.db_conn = {} self.trace_id = "FAKE_TRACE_ID" + self.task_nodes = [] def execute(self): skip_type = self.context.get_variable("gather_skip_type", None) @@ -52,10 +55,29 @@ def execute(self): if skip_type != "sql": self.__gather_sql_info() + def __find_home_path_by_port(self, ip_str, internal_port_str): + for node in self.ob_nodes: + if node["ip"] == ip_str: + remote_ip = node.get("ip") + remote_user = node.get("ssh_username") + try: + ssh_client = SshClient(self.context, node) + return find_home_path_by_port(ssh_client, internal_port_str, self.stdio) + except Exception as e: + self.stdio.error("ssh {0}@{1}: failed, Please check the config".format(remote_user, remote_ip)) + def __gather_log(self): try: + ip_port_str = StringUtils.get_observer_ip_port_from_trace_id(self.trace_id) + ip_str, internal_port_str = ip_port_str.split(':') + home_path_str = self.__find_home_path_by_port(ip_str, internal_port_str) + for node in self.ob_nodes: + if node["ip"] == ip_str and node["home_path"] == home_path_str: + self.task_nodes.append(node) + break self.stdio.verbose("gather observer log start") handler = GatherLogHandler(self.context, self.report_path, is_scene=True) + self.context.set_variable('filter_nodes_list', self.task_nodes) self.context.set_variable('gather_grep', self.trace_id) handler.handle() self.stdio.verbose("gather observer log end") From 9dad9a045aba4559167c730ff6c389ff08559cf9 Mon Sep 17 00:00:00 2001 From: Teingi Date: Tue, 24 Sep 2024 15:17:20 +0800 Subject: [PATCH 3/6] fix --- common/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/command.py b/common/command.py index e132f755..8e08df6b 100644 --- a/common/command.py +++ b/common/command.py @@ -449,7 +449,7 @@ def find_home_path_by_port(ssh_client, internal_port_str, stdio): for original_str in str_list: original_str = str(original_str) if original_str.endswith("/bin/observer") and not original_str.startswith('/[^\s]*'): - home_path = original_str.rstrip("/bin/observer") + home_path = original_str[: -len("/bin/observer")] break stdio.verbose("home_path:{0}".format(home_path)) return home_path From c9542dd79e9a2ea920328f065393208e4365ad8a Mon Sep 17 00:00:00 2001 From: Teingi Date: Tue, 24 Sep 2024 15:33:12 +0800 Subject: [PATCH 4/6] fix --- handler/gather/scenes/sql_problem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler/gather/scenes/sql_problem.py b/handler/gather/scenes/sql_problem.py index c121ac03..01218620 100644 --- a/handler/gather/scenes/sql_problem.py +++ b/handler/gather/scenes/sql_problem.py @@ -57,7 +57,7 @@ def execute(self): def __find_home_path_by_port(self, ip_str, internal_port_str): for node in self.ob_nodes: - if node["ip"] == ip_str: + if node.get("ip") and node["ip"] == ip_str: remote_ip = node.get("ip") remote_user = node.get("ssh_username") try: From c3720b9c7b338aaaa62850f250ea59b9f27f0717 Mon Sep 17 00:00:00 2001 From: Teingi Date: Tue, 24 Sep 2024 15:35:52 +0800 Subject: [PATCH 5/6] fix --- handler/gather/scenes/sql_problem.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/handler/gather/scenes/sql_problem.py b/handler/gather/scenes/sql_problem.py index 01218620..8d9c43b2 100644 --- a/handler/gather/scenes/sql_problem.py +++ b/handler/gather/scenes/sql_problem.py @@ -57,7 +57,7 @@ def execute(self): def __find_home_path_by_port(self, ip_str, internal_port_str): for node in self.ob_nodes: - if node.get("ip") and node["ip"] == ip_str: + if node.get("ip") == ip_str: remote_ip = node.get("ip") remote_user = node.get("ssh_username") try: @@ -72,7 +72,7 @@ def __gather_log(self): ip_str, internal_port_str = ip_port_str.split(':') home_path_str = self.__find_home_path_by_port(ip_str, internal_port_str) for node in self.ob_nodes: - if node["ip"] == ip_str and node["home_path"] == home_path_str: + if node.get("ip") == ip_str and node.get("home_path") == home_path_str: self.task_nodes.append(node) break self.stdio.verbose("gather observer log start") From 6107a420af69349cddc3bf6fd63064dfe83b64de Mon Sep 17 00:00:00 2001 From: Teingi Date: Tue, 24 Sep 2024 17:42:19 +0800 Subject: [PATCH 6/6] fix #382 --- handler/analyzer/analyze_flt_trace.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/handler/analyzer/analyze_flt_trace.py b/handler/analyzer/analyze_flt_trace.py index f71c4779..c8eb25d3 100644 --- a/handler/analyzer/analyze_flt_trace.py +++ b/handler/analyzer/analyze_flt_trace.py @@ -42,6 +42,7 @@ def __init__(self, context, gather_pack_dir=None): self.gather_pack_dir = gather_pack_dir self.flt_trace_id = '' self.nodes = [] + self.obproxy_nodes = [] self.workers = const.FLT_TRACE_WORKER self.max_recursion = const.FLT_TRACE_TREE_MAX_RECURSION self.config_path = const.DEFAULT_CONFIG_PATH @@ -51,6 +52,9 @@ def __init__(self, context, gather_pack_dir=None): def init_config(self): self.nodes = self.context.cluster_config['servers'] + self.obproxy_nodes = self.context.obproxy_config['servers'] + if len(self.obproxy_nodes) > 0: + self.nodes.extend(self.obproxy_nodes) self.inner_config = self.context.inner_config return True