Skip to content

Commit

Permalink
feat: 超大订阅方案 (closed #2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Dec 26, 2024
1 parent b5814a6 commit ce37fab
Show file tree
Hide file tree
Showing 31 changed files with 937 additions and 198 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
name: black
language: python
- repo: https://github.com/pycqa/isort
rev: 5.5.4
rev: 5.12.0
hooks:
- id: isort
args: [ "--profile", "black" ]
Expand Down
1 change: 1 addition & 0 deletions apps/backend/agent/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
class AgentServiceActivity(ServiceActivity):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.is_multi_paralle_gateway: bool = False
self.component.inputs.meta = Var(type=Var.SPLICE, value="${meta}")
self.component.inputs.description = Var(type=Var.SPLICE, value="${description}")
self.component.inputs.blueking_language = Var(type=Var.SPLICE, value="${blueking_language}")
Expand Down
29 changes: 27 additions & 2 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from apps.prometheus.helper import SetupObserve

from .. import job
from ..base import BaseService, CommonData
from ..base import BaseService, CommonData, RedisCommonData

logger = logging.getLogger("celery")

Expand All @@ -49,6 +49,29 @@ class AgentCommonData(CommonData):
injected_ap_id: int


class RedisAgentCommonData(RedisCommonData):

# 默认接入点
@property
def default_ap(self) -> models.AccessPoint:
return self._get_attr_from_redis("default_ap")

# 主机ID - 接入点 映射关系
@property
def host_id__ap_map(self) -> Dict[int, models.AccessPoint]:
return self._get_attr_from_redis("host_id__ap_map")

# AgentStep 适配器
@property
def agent_step_adapter(self) -> AgentStepAdapter:
return self._get_attr_from_redis("agent_step_adapter")

# 注入AP_ID
@property
def injected_ap_id(self) -> int:
return self._get_attr_from_redis("injected_ap_id")


class AgentBaseService(BaseService, metaclass=abc.ABCMeta):
"""
AGENT安装基类
Expand Down Expand Up @@ -91,7 +114,9 @@ def get_common_data(cls, data):
common_data.subscription_step, gse_version=data.get_one_of_inputs("meta", {}).get("GSE_VERSION")
)

return AgentCommonData(
agent_common_data_cls = AgentCommonData if isinstance(common_data, CommonData) else RedisAgentCommonData

return agent_common_data_cls(
bk_host_ids=common_data.bk_host_ids,
host_id_obj_map=common_data.host_id_obj_map,
ap_id_obj_map=common_data.ap_id_obj_map,
Expand Down
80 changes: 79 additions & 1 deletion apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import ast
import logging
import os
import pickle
import traceback
import typing
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import (
Expand All @@ -32,6 +35,7 @@
from django.db.models.functions import Concat
from django.utils import timezone
from django.utils.translation import ugettext as _
from django_redis import get_redis_connection

from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper
from apps.backend.api.constants import POLLING_TIMEOUT
Expand All @@ -45,6 +49,7 @@
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, time_handler, translation
from apps.utils.exc import ExceptionHandler
from apps.utils.redis import REDIS_CACHE_DATA_TIMEOUT
from pipeline.core.flow import Service

logger = logging.getLogger("celery")
Expand Down Expand Up @@ -263,6 +268,73 @@ class CommonData:
subscription_instance_ids: Set[int]


class RedisCommonData:
def __init__(self, *args, **kwargs):
self.uuid_key = uuid.uuid4().hex
self.client = get_redis_connection()

for k, v in dict(*args, **kwargs).items():
self.client.hset(self.uuid_key, k, str(pickle.dumps(v)))

self.client.expire(self.uuid_key, REDIS_CACHE_DATA_TIMEOUT)

def _get_attr_from_redis(self, key):
return pickle.loads(ast.literal_eval(self.client.hget(self.uuid_key, key)))

def __del__(self):
self.client.delete(self.uuid_key)

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.__del__()

@property
def bk_host_ids(self) -> Set[int]:
return self._get_attr_from_redis("bk_host_ids")

@property
def host_id_obj_map(self) -> Dict[int, models.Host]:
return self._get_attr_from_redis("host_id_obj_map")

@property
def sub_inst_id__host_id_map(self) -> Dict[int, int]:
return self._get_attr_from_redis("sub_inst_id__host_id_map")

@property
def host_id__sub_inst_id_map(self) -> Dict[int, int]:
return self._get_attr_from_redis("host_id__sub_inst_id_map")

@property
def ap_id_obj_map(self) -> Dict[int, models.AccessPoint]:
return self._get_attr_from_redis("ap_id_obj_map")

@property
def sub_inst_id__sub_inst_obj_map(self) -> Dict[int, models.SubscriptionInstanceRecord]:
return self._get_attr_from_redis("sub_inst_id__sub_inst_obj_map")

@property
def gse_api_helper(self) -> GseApiBaseHelper:
return self._get_attr_from_redis("gse_api_helper")

@property
def subscription(self) -> models.Subscription:
return self._get_attr_from_redis("subscription")

@property
def subscription_step(self) -> models.SubscriptionStep:
return self._get_attr_from_redis("subscription_step")

@property
def subscription_instances(self) -> List[models.SubscriptionInstanceRecord]:
return self._get_attr_from_redis("subscription_instances")

@property
def subscription_instance_ids(self) -> Set[int]:
return self._get_attr_from_redis("subscription_instance_ids")


class BaseService(Service, LogMixin, DBHelperMixin, PollingTimeoutMixin):

# 失败订阅实例ID - 失败原因 映射关系
Expand Down Expand Up @@ -447,7 +519,10 @@ def get_common_data(cls, data):
break

ap_id_obj_map = models.AccessPoint.ap_id_obj_map()
return CommonData(

common_data_cls = RedisCommonData if data.get_one_of_inputs("is_multi_paralle_gateway") else CommonData

return common_data_cls(
bk_host_ids=bk_host_ids,
host_id_obj_map=host_id_obj_map,
sub_inst_id__host_id_map=sub_inst_id__host_id_map,
Expand Down Expand Up @@ -610,6 +685,9 @@ def inputs_format(self):
),
Service.InputItem(name="subscription_step_id", key="subscription_step_id", type="int", required=True),
Service.InputItem(name="blueking_language", key="blueking_language", type="str", required=True),
Service.InputItem(
name="is_multi_paralle_gateway", key="is_multi_paralle_gateway", type="bool", required=True
),
]

def outputs_format(self):
Expand Down
37 changes: 35 additions & 2 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
GseDataErrCode,
)
from apps.backend.api.job import process_parms
from apps.backend.components.collections.base import BaseService, CommonData
from apps.backend.components.collections.base import (
BaseService,
CommonData,
RedisCommonData,
)
from apps.backend.components.collections.common.script_content import INITIALIZE_SCRIPT
from apps.backend.components.collections.job import (
JobExecuteScriptService,
Expand Down Expand Up @@ -86,6 +90,34 @@ def __post_init__(self):
self.plugin_name = self.policy_step_adapter.plugin_name


class RedisPluginCommonData(RedisCommonData):

# 进程状态列表
@property
def process_statuses(self) -> List[models.ProcessStatus]:
return self._get_attr_from_redis("process_statuses")

# 目标主机列表,用于远程采集场景
@property
def target_host_objs(self) -> Optional[List[models.Host]]:
return self._get_attr_from_redis("target_host_objs")

# PluginStep 适配器,用于屏蔽不同类型的插件操作类订阅差异
@property
def policy_step_adapter(self) -> PolicyStepAdapter:
return self._get_attr_from_redis("policy_step_adapter")

# group_id - 订阅实例记录映射关系
@property
def group_id_instance_map(self) -> Dict[str, models.SubscriptionInstanceRecord]:
return self._get_attr_from_redis("group_id_instance_map")

# 插件名称
@property
def plugin_name(self) -> str:
return self._get_attr_from_redis("policy_step_adapter").plugin_name


class PluginBaseService(BaseService, metaclass=abc.ABCMeta):
"""
插件原子基类,提供一些常用的数据获取方法
Expand Down Expand Up @@ -134,7 +166,8 @@ def get_common_data(cls, data):
process_statuses = models.ProcessStatus.objects.filter(
name=policy_step_adapter.plugin_name, group_id__in=group_id_instance_map.keys()
)
return PluginCommonData(
plugin_common_data_cls = PluginCommonData if isinstance(common_data, CommonData) else RedisPluginCommonData
return plugin_common_data_cls(
bk_host_ids=common_data.bk_host_ids,
host_id_obj_map=common_data.host_id_obj_map,
ap_id_obj_map=common_data.ap_id_obj_map,
Expand Down
4 changes: 3 additions & 1 deletion apps/backend/periodic_tasks/cache_scope_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from apps.backend.subscription import constants, tools
from apps.node_man import models
from apps.node_man.constants import DataBackend
from apps.utils.md5 import count_md5
from apps.utils.periodic_task import calculate_countdown

Expand All @@ -31,8 +32,9 @@ def get_instances_by_scope_task(subscription_id):
f" scope_md5: {scope_md5}, scope: {subscription.scope}"
)
# 查询后会进行缓存,详见 get_instances_by_scope 的装饰器 func_cache_decorator
data_backend = DataBackend.REDIS.value if subscription.is_multi_paralle_gateway else DataBackend.MEM.value
tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, source="get_instances_by_scope_task"
subscription.scope, subscription.steps, source="get_instances_by_scope_task", data_backend=data_backend
)
logger.info(f"[cache_subscription_scope_instances] (subscription: {subscription_id}) end.")

Expand Down
38 changes: 30 additions & 8 deletions apps/backend/subscription/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
specific language governing permissions and limitations under the License.
"""
import logging
from typing import Union

from django.core.cache import cache

Expand All @@ -18,6 +19,7 @@
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve, get_call_resource_labels_func
from apps.utils.batch_request import batch_request
from apps.utils.redis import DynamicContainer, RedisList

logger = logging.getLogger("app")

Expand All @@ -38,7 +40,7 @@ def get_host_object_attribute(bk_biz_id):


@SetupObserve(counter=metrics.app_common_method_requests_total, get_labels_func=get_call_resource_labels_func)
def list_biz_hosts(bk_biz_id, condition, func, split_params=False):
def list_biz_hosts(bk_biz_id, condition, func, split_params=False, data_backend: str = None):
biz_custom_property = []
kwargs = {
"fields": constants.CC_HOST_FIELDS,
Expand All @@ -50,14 +52,16 @@ def list_biz_hosts(bk_biz_id, condition, func, split_params=False):
kwargs["fields"] += list(set(biz_custom_property))
kwargs["fields"] = list(set(kwargs["fields"]))
kwargs.update(condition)

hosts = batch_request(getattr(client_v2.cc, func), kwargs, split_params=split_params)
hosts = batch_request(getattr(client_v2.cc, func), kwargs, split_params=split_params, data_backend=data_backend)
# 排除掉CMDB中内网IP为空的主机
cleaned_hosts = [host for host in hosts if host.get("bk_host_innerip") or host.get("bk_host_innerip_v6")]
cleaned_hosts: Union[RedisList, list] = DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container
cleaned_hosts.extend([host for host in hosts if host.get("bk_host_innerip") or host.get("bk_host_innerip_v6")])
return cleaned_hosts


def get_host_by_inst(bk_biz_id, inst_list):
def get_host_by_inst(bk_biz_id, inst_list, data_backend: str = None):
"""
根据拓扑节点查询主机
:param inst_list: 实例列表
Expand All @@ -67,7 +71,9 @@ def get_host_by_inst(bk_biz_id, inst_list):
if not bk_biz_id:
raise BizNotExistError()

hosts = []
hosts: Union[RedisList, list] = DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container
bk_module_ids = []
bk_set_ids = []
bk_biz_ids = []
Expand All @@ -88,20 +94,35 @@ def get_host_by_inst(bk_biz_id, inst_list):
# 自定义层级
topo_cond = {"bk_obj_id": inst["bk_obj_id"], "bk_inst_id": inst["bk_inst_id"]}
hosts.extend(
list_biz_hosts(bk_biz_id, topo_cond, "find_host_by_topo", source="get_host_by_inst:find_host_by_topo")
list_biz_hosts(
bk_biz_id,
topo_cond,
"find_host_by_topo",
source="get_host_by_inst:find_host_by_topo",
data_backend=data_backend,
)
)

if bk_biz_ids:
# 业务查询
for bk_biz_id in bk_biz_ids:
hosts.extend(list_biz_hosts(bk_biz_id, {}, "list_biz_hosts", source="get_host_by_inst:list_biz_hosts:biz"))
hosts.extend(
list_biz_hosts(
bk_biz_id,
{},
"list_biz_hosts",
source="get_host_by_inst:list_biz_hosts:biz",
data_backend=data_backend,
)
)
if bk_set_ids:
# 集群查询
hosts.extend(
list_biz_hosts(
bk_biz_id,
{"set_cond": [{"field": "bk_set_id", "operator": "$in", "value": bk_set_ids}]},
"list_biz_hosts",
data_backend=data_backend,
)
)
if bk_module_ids:
Expand All @@ -113,6 +134,7 @@ def get_host_by_inst(bk_biz_id, inst_list):
"list_biz_hosts",
split_params=True,
source="get_host_by_inst:list_biz_hosts:module",
data_backend=data_backend,
)
)

Expand Down
3 changes: 3 additions & 0 deletions apps/backend/subscription/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@
# 单个任务主机数量
TASK_HOST_LIMIT = 500

# 单个并行网关的子进程数量 并行网关主机数量 1000*500
PARALLE_GATEWAY_PROCESS_LIMIT = 1000

# 订阅范围实例缓存时间,比自动下发周期多1小时
SUBSCRIPTION_SCOPE_CACHE_TIME = SUBSCRIPTION_UPDATE_INTERVAL + constants.TimeUnit.HOUR
Loading

0 comments on commit ce37fab

Please sign in to comment.