From e37dc9b6dbd20a59f43c21879321d5f60303187c Mon Sep 17 00:00:00 2001 From: wyyalt Date: Wed, 15 Nov 2023 16:04:59 +0800 Subject: [PATCH] =?UTF-8?q?bugfix:=20=20=E5=90=8C=E6=AD=A5=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E9=80=82=E9=85=8DGSE=202.0=E6=8E=A5=E5=8F=A3=20(close?= =?UTF-8?q?d=20#305)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/adapters/api/gse/base.py | 9 +++ apps/adapters/api/gse/v1.py | 3 + apps/api/modules/gse_v2.py | 6 ++ apps/core/gray/tools.py | 5 ++ apps/gsekit/process/handlers/process.py | 80 +++++++++++++++++-------- 5 files changed, 77 insertions(+), 26 deletions(-) diff --git a/apps/adapters/api/gse/base.py b/apps/adapters/api/gse/base.py index 6ad674c..4cda9d7 100644 --- a/apps/adapters/api/gse/base.py +++ b/apps/adapters/api/gse/base.py @@ -89,3 +89,12 @@ def get_gse_proc_key( :return: """ return f"{self.get_agent_id(mixed_types_of_host_info)}:{namespace}:{proc_name}" + + @abc.abstractmethod + def sync_proc_status(self, params: InfoDict) -> InfoDict: + """ + 获取进程状态信息 + :param params: 接口查询参数 + :return: + """ + raise NotImplementedError diff --git a/apps/adapters/api/gse/v1.py b/apps/adapters/api/gse/v1.py index 7c9237f..93c4ade 100644 --- a/apps/adapters/api/gse/v1.py +++ b/apps/adapters/api/gse/v1.py @@ -33,3 +33,6 @@ def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) -> def get_proc_operate_result(self, task_id: str) -> base.InfoDict: return self.gse_api_obj.get_proc_operate_result({"task_id": task_id, "no_request": True}, raw=True) + + def sync_proc_status(self, params: base.InfoDict) -> base.InfoDict: + return self.gse_api_obj.sync_proc_status(params) diff --git a/apps/api/modules/gse_v2.py b/apps/api/modules/gse_v2.py index bc3d201..91c5d4a 100644 --- a/apps/api/modules/gse_v2.py +++ b/apps/api/modules/gse_v2.py @@ -37,3 +37,9 @@ def __init__(self): module=self.MODULE, description="查询进程状态信息", ) + self.sync_proc_status = DataAPI( + method="POST", + 
url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/sync_proc_status/", + module=self.MODULE, + description="同步查询进程状态信息", + ) diff --git a/apps/core/gray/tools.py b/apps/core/gray/tools.py index 0ef85fa..ec79293 100644 --- a/apps/core/gray/tools.py +++ b/apps/core/gray/tools.py @@ -13,6 +13,7 @@ from apps.gsekit import constants from apps.gsekit.meta import models from apps.utils.cache import func_cache_decorator +from env.constants import GseVersion class GrayTools: @@ -42,3 +43,7 @@ def is_gse2_gray(self, bk_biz_id: typing.Any) -> bool: :return: """ return int(bk_biz_id) in self.gse2_gray_scope_set + + def get_gse_version_by_biz_id(self, bk_biz_id: typing.Any) -> str: + is_gse2_gray: bool = self.is_gse2_gray(bk_biz_id=bk_biz_id) + return GseVersion.V2.value if is_gse2_gray else GseVersion.V1.value diff --git a/apps/gsekit/process/handlers/process.py b/apps/gsekit/process/handlers/process.py index 2d7c2a6..2f8649e 100644 --- a/apps/gsekit/process/handlers/process.py +++ b/apps/gsekit/process/handlers/process.py @@ -19,8 +19,9 @@ from django.db import transaction from django.db.models import Q, QuerySet +from apps.adapters.api.gse.base import GseApiBaseHelper -from apps.api import CCApi, GseApi +from apps.api import CCApi from apps.gsekit import constants from apps.gsekit.cmdb.handlers.cmdb import CMDBHandler from apps.gsekit.configfile.models import ConfigTemplateBindingRelationship, ConfigTemplate @@ -39,6 +40,8 @@ from apps.utils.batch_request import batch_request, request_multi_thread from apps.utils.mako_utils.render import mako_render from common.log import logger +from apps.adapters.api.gse import get_gse_api_helper +from apps.core.gray.tools import GrayTools class ProcessHandler(APIModel): @@ -773,7 +776,7 @@ def create_process_inst(self, process_list: List): ProcessInst.objects.filter(reduce(operator.or_, to_be_deleted_inst_condition)).delete() ProcessInst.objects.bulk_create(to_be_created_inst, batch_size=constants.ORM_BATCH_SIZE) - def sync_proc_status_to_db(self, 
proc_status_infos=None): + def sync_proc_status_to_db(self, proc_status_infos=None, gse_api_helper: GseApiBaseHelper = None): """ 同步业务进程状态 :return: @@ -781,7 +784,7 @@ def sync_proc_status_to_db(self, proc_status_infos=None): # 不传proc_status_infos默认拉取sync_proc_status接口数据,该接口有5min状态延迟 if not proc_status_infos: proc_status_infos = batch_request( - GseApi.sync_proc_status, + gse_api_helper.sync_proc_status, {"meta": {"namespace": NAMESPACE.format(bk_biz_id=self.bk_biz_id)}}, get_data=lambda x: x["proc_infos"], ) @@ -791,6 +794,7 @@ def sync_proc_status_to_db(self, proc_status_infos=None): proc_status_info["status"] = Process.ProcessStatus.TERMINATED # 统一is_auto字段 proc_status_info["is_auto"] = proc_status_info["isauto"] + # TODO 这段代码是有问题的 通过agentid查询未返回ip,(这段代码暂未用到) proc_status_info["inst_uniq_key"] = ( f"{proc_status_info['host']['ip']}-{proc_status_info['host']['bk_cloud_id']}-" f"{'-'.join(proc_status_info['meta']['name'].rsplit('_', 1))}" @@ -930,8 +934,9 @@ def sync_proc_status_to_db(self, proc_status_infos=None): Process.objects.filter(bk_process_id__in=to_be_updated_auto_proc_ids).update(is_auto=True) @staticmethod - def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]: - base_params = {"_request": _request} if _request else {} + def get_proc_inst_status_infos( + proc_inst_infos, _request=None, gse_api_helper: GseApiBaseHelper = None + ) -> List[Dict]: proc_operate_req_slice = [] meta_key_uniq_key_map = {} for proc_inst_info in proc_inst_infos: @@ -965,9 +970,8 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]: bk_process_name=process_info["bk_process_name"], local_inst_id=local_inst_id, ) - meta_key = ( - f"{host_info['bk_cloud_id']}:{host_info['bk_host_innerip']}:" - f"{namespace}:{process_info['bk_process_name']}_{local_inst_id}" + meta_key: str = gse_api_helper.get_gse_proc_key( + host_info, namespace=namespace, proc_name=f"{process_info['bk_process_name']}_{local_inst_id}" ) 
meta_key_uniq_key_map[meta_key] = uniq_key @@ -982,7 +986,13 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]: }, }, "op_type": GseOpType.CHECK, - "hosts": [{"ip": host_info["bk_host_innerip"], "bk_cloud_id": host_info["bk_cloud_id"]}], + "hosts": [ + { + "bk_host_innerip": host_info["bk_host_innerip"], + "bk_cloud_id": host_info["bk_cloud_id"], + "bk_agent_id": host_info.get("bk_agent_id", ""), + } + ], "spec": { "identity": { "index_key": "", @@ -1008,14 +1018,14 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]: } ) - gse_task_id = GseApi.operate_proc_multi({"proc_operate_req": proc_operate_req_slice, **base_params})["task_id"] + gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_slice) proc_inst_status_infos = [] uniq_keys_recorded = set() poll_time, interval, timeout = 0, 1.5, 60 while True: try: - gse_api_result = GseApi.get_proc_operate_result({"task_id": gse_task_id, **base_params}) + gse_api_result = gse_api_helper.get_proc_operate_result(gse_task_id)["data"] except Exception as error: logger.error( "[sync_biz_process_status | get_proc_inst_status_infos] " @@ -1032,18 +1042,25 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]: continue if task_result.get("error_code") == GseDataErrorCode.SUCCESS: gse_ip_proc_info = json.loads(task_result["content"]) - proc_inst_status_infos.append( - { - "is_auto": gse_ip_proc_info["process"][0]["instance"][0].get("isAuto", False), - "status": ( - Process.ProcessStatus.RUNNING - if gse_ip_proc_info["process"][0]["instance"][0].get("pid", -1) > 0 - else Process.ProcessStatus.TERMINATED - ), - "inst_uniq_key": meta_key_uniq_key_map[meta_key], - } - ) - uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key]) + try: + proc_inst_status_infos.append( + { + "is_auto": gse_ip_proc_info["process"][0]["instance"][0].get("isAuto", False), + "status": ( + Process.ProcessStatus.RUNNING + if 
gse_ip_proc_info["process"][0]["instance"][0].get("pid", -1) > 0
+                            else Process.ProcessStatus.TERMINATED
+                        ),
+                        "inst_uniq_key": meta_key_uniq_key_map[meta_key],
+                    }
+                )
+                uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key])
+            except IndexError:
+                logger.error(
+                    f"[sync_biz_process_status] | task_result: {task_result} "
+                    f"gse_task_id: {gse_task_id}, process_list is None: process"
+                )
+                continue
 elif task_result.get("error_code") != GseDataErrorCode.RUNNING:
                 uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key])
@@ -1104,18 +1121,29 @@ def sync_biz_process_status(self):
             }
         )
 
+        # 获取业务gse version
+        gse_version: str = GrayTools().get_gse_version_by_biz_id(self.bk_biz_id)
+        gse_api_helper: GseApiBaseHelper = get_gse_api_helper(gse_version=gse_version)
+
         # 片起始位置,分片大小
         limit = 1000
         params_list = [
-            dict(proc_inst_infos=proc_inst_infos[start : start + limit])
+            dict(
+                proc_inst_infos=proc_inst_infos[start : start + limit],
+                gse_api_helper=gse_api_helper,
+            )
             for start in range(0, len(proc_inst_infos), limit)
         ]
+
         proc_status_infos = request_multi_thread(
-            self.get_proc_inst_status_infos, params_list, get_data=lambda x: x, get_request_target=lambda x: x
+            self.get_proc_inst_status_infos,
+            params_list,
+            get_data=lambda x: x,
+            get_request_target=lambda x: x,
+        )
 
         # 更新进程实例状态并汇总到Process
-        self.sync_proc_status_to_db(proc_status_infos)
+        self.sync_proc_status_to_db(proc_status_infos, gse_api_helper=gse_api_helper)
 
         cost_time = time.time() - begin_time
         logger.info("[sync_proc_status] cost: {cost_time}s".format(cost_time=cost_time))