Skip to content

Commit

Permalink
bugfix: 同步进程适配GSE 2.0接口 (closed #305)
Browse files Browse the repository at this point in the history
wyyalt authored and wyyalt committed Nov 17, 2023
1 parent 31b7a10 commit e37dc9b
Showing 5 changed files with 77 additions and 26 deletions.
9 changes: 9 additions & 0 deletions apps/adapters/api/gse/base.py
Original file line number Diff line number Diff line change
@@ -89,3 +89,12 @@ def get_gse_proc_key(
:return:
"""
return f"{self.get_agent_id(mixed_types_of_host_info)}:{namespace}:{proc_name}"

@abc.abstractmethod
def sync_proc_status(self, params: InfoDict) -> InfoDict:
    """
    Fetch process status information.

    Abstract hook: this base implementation always raises, so a concrete
    helper has to supply its own version.

    :param params: query parameters for the interface
    :return:
    :raises NotImplementedError: always, when the base implementation is called
    """
    raise NotImplementedError()
3 changes: 3 additions & 0 deletions apps/adapters/api/gse/v1.py
Original file line number Diff line number Diff line change
@@ -33,3 +33,6 @@ def _operate_proc_multi(self, proc_operate_req: base.InfoDictList, **options) ->

def get_proc_operate_result(self, task_id: str) -> base.InfoDict:
    """
    Query the result of a process operation task.

    Delegates to ``self.gse_api_obj.get_proc_operate_result`` with
    ``no_request`` set to True and ``raw=True``.

    :param task_id: identifier of the task to query
    :return: whatever the underlying API object returns for the raw call
    """
    query_params = {"task_id": task_id, "no_request": True}
    return self.gse_api_obj.get_proc_operate_result(query_params, raw=True)

def sync_proc_status(self, params: base.InfoDict) -> base.InfoDict:
    """
    Fetch process status information through the wrapped API object.

    The parameters are forwarded unchanged to
    ``self.gse_api_obj.sync_proc_status``.

    :param params: query parameters for the interface
    :return: the value returned by the underlying API call
    """
    proc_status = self.gse_api_obj.sync_proc_status(params)
    return proc_status
6 changes: 6 additions & 0 deletions apps/api/modules/gse_v2.py
Original file line number Diff line number Diff line change
@@ -37,3 +37,9 @@ def __init__(self):
module=self.MODULE,
description="查询进程状态信息",
)
self.sync_proc_status = DataAPI(
method="POST",
url=GSE_APIGATEWAY_ROOT_V2 + "api/v2/proc/sync_proc_status/",
module=self.MODULE,
description="同步查询进程状态信息",
)
5 changes: 5 additions & 0 deletions apps/core/gray/tools.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
from apps.gsekit import constants
from apps.gsekit.meta import models
from apps.utils.cache import func_cache_decorator
from env.constants import GseVersion


class GrayTools:
@@ -42,3 +43,7 @@ def is_gse2_gray(self, bk_biz_id: typing.Any) -> bool:
:return:
"""
return int(bk_biz_id) in self.gse2_gray_scope_set

def get_gse_version_by_biz_id(self, bk_biz_id: typing.Any) -> str:
    """
    Resolve the GSE version string for a business.

    :param bk_biz_id: business ID, passed to ``self.is_gse2_gray``
    :return: ``GseVersion.V2.value`` when ``is_gse2_gray`` reports true for
        the business, otherwise ``GseVersion.V1.value``
    """
    if self.is_gse2_gray(bk_biz_id=bk_biz_id):
        return GseVersion.V2.value
    return GseVersion.V1.value
80 changes: 54 additions & 26 deletions apps/gsekit/process/handlers/process.py
Original file line number Diff line number Diff line change
@@ -19,8 +19,9 @@

from django.db import transaction
from django.db.models import Q, QuerySet
from apps.adapters.api.gse.base import GseApiBaseHelper

from apps.api import CCApi, GseApi
from apps.api import CCApi
from apps.gsekit import constants
from apps.gsekit.cmdb.handlers.cmdb import CMDBHandler
from apps.gsekit.configfile.models import ConfigTemplateBindingRelationship, ConfigTemplate
@@ -39,6 +40,8 @@
from apps.utils.batch_request import batch_request, request_multi_thread
from apps.utils.mako_utils.render import mako_render
from common.log import logger
from apps.adapters.api.gse import get_gse_api_helper
from apps.core.gray.tools import GrayTools


class ProcessHandler(APIModel):
@@ -773,15 +776,15 @@ def create_process_inst(self, process_list: List):
ProcessInst.objects.filter(reduce(operator.or_, to_be_deleted_inst_condition)).delete()
ProcessInst.objects.bulk_create(to_be_created_inst, batch_size=constants.ORM_BATCH_SIZE)

def sync_proc_status_to_db(self, proc_status_infos=None):
def sync_proc_status_to_db(self, proc_status_infos=None, gse_api_helper: GseApiBaseHelper = None):
"""
同步业务进程状态
:return:
"""
# 不传proc_status_infos默认拉取sync_proc_status接口数据,该接口有5min状态延迟
if not proc_status_infos:
proc_status_infos = batch_request(
GseApi.sync_proc_status,
gse_api_helper.sync_proc_status,
{"meta": {"namespace": NAMESPACE.format(bk_biz_id=self.bk_biz_id)}},
get_data=lambda x: x["proc_infos"],
)
@@ -791,6 +794,7 @@ def sync_proc_status_to_db(self, proc_status_infos=None):
proc_status_info["status"] = Process.ProcessStatus.TERMINATED
# 统一is_auto字段
proc_status_info["is_auto"] = proc_status_info["isauto"]
# TODO 这段代码是有问题的 通过agentid查询未返回ip,(这段代码暂未用到)
proc_status_info["inst_uniq_key"] = (
f"{proc_status_info['host']['ip']}-{proc_status_info['host']['bk_cloud_id']}-"
f"{'-'.join(proc_status_info['meta']['name'].rsplit('_', 1))}"
@@ -930,8 +934,9 @@ def sync_proc_status_to_db(self, proc_status_infos=None):
Process.objects.filter(bk_process_id__in=to_be_updated_auto_proc_ids).update(is_auto=True)

@staticmethod
def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]:
base_params = {"_request": _request} if _request else {}
def get_proc_inst_status_infos(
proc_inst_infos, _request=None, gse_api_helper: GseApiBaseHelper = None
) -> List[Dict]:
proc_operate_req_slice = []
meta_key_uniq_key_map = {}
for proc_inst_info in proc_inst_infos:
@@ -965,9 +970,8 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]:
bk_process_name=process_info["bk_process_name"],
local_inst_id=local_inst_id,
)
meta_key = (
f"{host_info['bk_cloud_id']}:{host_info['bk_host_innerip']}:"
f"{namespace}:{process_info['bk_process_name']}_{local_inst_id}"
meta_key: str = gse_api_helper.get_gse_proc_key(
host_info, namespace=namespace, proc_name=f"{process_info['bk_process_name']}_{local_inst_id}"
)

meta_key_uniq_key_map[meta_key] = uniq_key
@@ -982,7 +986,13 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]:
},
},
"op_type": GseOpType.CHECK,
"hosts": [{"ip": host_info["bk_host_innerip"], "bk_cloud_id": host_info["bk_cloud_id"]}],
"hosts": [
{
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}
],
"spec": {
"identity": {
"index_key": "",
@@ -1008,14 +1018,14 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]:
}
)

gse_task_id = GseApi.operate_proc_multi({"proc_operate_req": proc_operate_req_slice, **base_params})["task_id"]
gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_slice)

proc_inst_status_infos = []
uniq_keys_recorded = set()
poll_time, interval, timeout = 0, 1.5, 60
while True:
try:
gse_api_result = GseApi.get_proc_operate_result({"task_id": gse_task_id, **base_params})
gse_api_result = gse_api_helper.get_proc_operate_result(gse_task_id)["data"]
except Exception as error:
logger.error(
"[sync_biz_process_status | get_proc_inst_status_infos] "
@@ -1032,18 +1042,25 @@ def get_proc_inst_status_infos(proc_inst_infos, _request=None) -> List[Dict]:
continue
if task_result.get("error_code") == GseDataErrorCode.SUCCESS:
gse_ip_proc_info = json.loads(task_result["content"])
proc_inst_status_infos.append(
{
"is_auto": gse_ip_proc_info["process"][0]["instance"][0].get("isAuto", False),
"status": (
Process.ProcessStatus.RUNNING
if gse_ip_proc_info["process"][0]["instance"][0].get("pid", -1) > 0
else Process.ProcessStatus.TERMINATED
),
"inst_uniq_key": meta_key_uniq_key_map[meta_key],
}
)
uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key])
try:
proc_inst_status_infos.append(
{
"is_auto": gse_ip_proc_info["process"][0]["instance"][0].get("isAuto", False),
"status": (
Process.ProcessStatus.RUNNING
if gse_ip_proc_info["process"][0]["instance"][0].get("pid", -1) > 0
else Process.ProcessStatus.TERMINATED
),
"inst_uniq_key": meta_key_uniq_key_map[meta_key],
}
)
uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key])
except IndexError:
logger.error(
f"[sync_biz_process_status] | task_result: {task_result} "
"gse_task_id: {gse_task_id}, process_list is None: process"
)
continue
elif task_result.get("error_code") != GseDataErrorCode.RUNNING:
uniq_keys_recorded.add(meta_key_uniq_key_map[meta_key])

@@ -1104,18 +1121,29 @@ def sync_biz_process_status(self):
}
)

# 获取业务gse version
gse_version: str = GrayTools().get_gse_version_by_biz_id(self.bk_biz_id)
gse_api_helper: GseApiBaseHelper = get_gse_api_helper(gse_version=gse_version)

# 片起始位置,分片大小
limit = 1000
params_list = [
dict(proc_inst_infos=proc_inst_infos[start : start + limit])
dict(
proc_inst_infos=proc_inst_infos[start : start + limit],
gse_api_helper=gse_api_helper,
)
for start in range(0, len(proc_inst_infos), limit)
]

proc_status_infos = request_multi_thread(
self.get_proc_inst_status_infos, params_list, get_data=lambda x: x, get_request_target=lambda x: x
self.get_proc_inst_status_infos,
params_list,
get_data=lambda x: x,
get_request_target=lambda x: x,
)

# 更新进程实例状态并汇总到Process
self.sync_proc_status_to_db(proc_status_infos)
self.sync_proc_status_to_db(proc_status_infos, gse_api_helper=gse_api_helper)

cost_time = time.time() - begin_time
logger.info("[sync_proc_status] cost: {cost_time}s".format(cost_time=cost_time))

0 comments on commit e37dc9b

Please sign in to comment.