feat: add batch chunking logic to node-related queries in task cleanup #7668
normal-wls committed Jan 21, 2025
1 parent aaeafc4 commit 52662b7
Showing 4 changed files with 40 additions and 25 deletions.
1 change: 1 addition & 0 deletions config/default.py
@@ -781,6 +781,7 @@ def monitor_report_config():
CLEAN_EXPIRED_V2_TASK_CRON = env.CLEAN_EXPIRED_V2_TASK_CRON
V2_TASK_VALIDITY_DAY = env.V2_TASK_VALIDITY_DAY
CLEAN_EXPIRED_V2_TASK_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_BATCH_NUM
CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
CLEAN_EXPIRED_V2_TASK_INSTANCE = env.CLEAN_EXPIRED_V2_TASK_INSTANCE
CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = env.CLEAN_EXPIRED_V2_TASK_CREATE_METHODS
CLEAN_EXPIRED_V2_TASK_PROJECTS = env.CLEAN_EXPIRED_V2_TASK_PROJECTS
1 change: 1 addition & 0 deletions env.py
@@ -92,6 +92,7 @@
CLEAN_EXPIRED_V2_TASK_CRON = tuple(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CRON", "30 0 * * *").split())
V2_TASK_VALIDITY_DAY = int(os.getenv("BKAPP_V2_TASK_VALIDITY_DAY", 730))
CLEAN_EXPIRED_V2_TASK_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_BATCH_NUM", 100))
CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM", 5000))
CLEAN_EXPIRED_V2_TASK_INSTANCE = bool(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_INSTANCE", False))
CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CREATE_METHODS", "periodic").split(",")
# If not configured, all projects are cleaned by default
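The new knob follows the existing pattern in env.py: a `BKAPP_`-prefixed environment variable read once at import time, defaulting to a batch size of 5000. A minimal sketch of how a deployment might override it (the variable name comes from the diff above; the value is hypothetical):

```python
import os

# Hypothetical override: cap node-related cleanup batches at 1000 ids
os.environ["BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM"] = "1000"

# Mirrors the parsing in env.py above
batch_num = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM", 5000))
assert batch_num == 1000
```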
55 changes: 32 additions & 23 deletions gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py
@@ -10,27 +10,31 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from typing import List, Dict
import logging
from typing import Dict, List

from django.conf import settings
from django.db.models import QuerySet

from pipeline.contrib.periodic_task.models import PeriodicTaskHistory
from pipeline.eri.models import (
ContextValue,
CallbackData,
ContextOutputs,
Process,
Node,
ContextValue,
Data,
State,
ExecutionHistory,
ExecutionData,
CallbackData,
ExecutionHistory,
Node,
Process,
Schedule,
State,
)
from pipeline.models import PipelineInstance, Snapshot, TreeInfo

from pipeline.models import PipelineInstance, TreeInfo, Snapshot
from gcloud.utils.data_handler import chunk_data
from pipeline_web.core.models import NodeInInstance

logger = logging.getLogger("root")


def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
"""
@@ -48,19 +52,24 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
execution_snapshot = Snapshot.objects.filter(id__in=list(execution_snapshot_ids))

pipeline_ids = instance_ids
logger.info(
f"[get_clean_pipeline_instance_data] fetching pipeline_ids number: {pipeline_ids}, e.x.:{pipeline_ids[:3]}..."
)
context_value = ContextValue.objects.filter(pipeline_id__in=pipeline_ids)
context_outputs = ContextOutputs.objects.filter(pipeline_id__in=pipeline_ids)
process = Process.objects.filter(root_pipeline_id__in=pipeline_ids)
periodic_task_history = PeriodicTaskHistory.objects.filter(pipeline_instance_id__in=pipeline_ids)

node_ids = list(nodes_in_pipeline.values_list("node_id", flat=True)) + instance_ids
nodes = Node.objects.filter(node_id__in=node_ids)
data = Data.objects.filter(node_id__in=node_ids)
states = State.objects.filter(node_id__in=node_ids)
execution_history = ExecutionHistory.objects.filter(node_id__in=node_ids)
execution_data = ExecutionData.objects.filter(node_id__in=node_ids)
callback_data = CallbackData.objects.filter(node_id__in=node_ids)
schedules = Schedule.objects.filter(node_id__in=node_ids)
logger.info(f"[get_clean_pipeline_instance_data] fetching node_ids number: {node_ids}, e.x.:{node_ids[:3]}...")
chunk_size = settings.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
nodes_list = chunk_data(node_ids, chunk_size, lambda x: Node.objects.filter(node_id__in=x))
data_list = chunk_data(node_ids, chunk_size, lambda x: Data.objects.filter(node_id__in=x))
states_list = chunk_data(node_ids, chunk_size, lambda x: State.objects.filter(node_id__in=x))
execution_history_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionHistory.objects.filter(node_id__in=x))
execution_data_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionData.objects.filter(node_id__in=x))
callback_data_list = chunk_data(node_ids, chunk_size, lambda x: CallbackData.objects.filter(node_id__in=x))
schedules_list = chunk_data(node_ids, chunk_size, lambda x: Schedule.objects.filter(node_id__in=x))

return {
"tree_info": tree_info,
@@ -69,13 +78,13 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
"context_value": context_value,
"context_outputs": context_outputs,
"process": process,
"node": nodes,
"data": data,
"state": states,
"execution_history": execution_history,
"execution_data": execution_data,
"callback_data": callback_data,
"schedules": schedules,
"periodic_task_history": periodic_task_history,
"pipeline_instances": pipeline_instances,
"node_list": nodes_list,
"data_list": data_list,
"state_list": states_list,
"execution_history_list": execution_history_list,
"execution_data_list": execution_data_list,
"callback_data_list": callback_data_list,
"schedules_list": schedules_list,
}
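The batching is delegated to `chunk_data` from `gcloud.utils.data_handler`, whose implementation is not part of this diff. Judging from the call sites above (a list of ids, a chunk size, and a callable applied per chunk, with the results collected into a list), a minimal sketch could look like this:

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunk_data(data: List[T], chunk_size: int, func: Callable[[List[T]], R]) -> List[R]:
    """Split `data` into chunks of at most `chunk_size` items and apply `func` to each.

    A sketch inferred from the call sites; the real helper lives in
    gcloud/utils/data_handler.py.
    """
    return [func(data[i : i + chunk_size]) for i in range(0, len(data), chunk_size)]
```

Each element of the returned list is then a QuerySet whose `IN` clause covers at most `CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM` node ids, instead of one QuerySet over the full id list.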
8 changes: 6 additions & 2 deletions gcloud/contrib/cleaner/tasks.py
@@ -64,9 +64,13 @@ def clean_expired_v2_task_data():
instance_fields = ["tasks", "pipeline_instances"]
with transaction.atomic():
for field, qs in data_to_clean.items():
if field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
if field.endswith("_list") and isinstance(qs, list):
logger.info(f"[clean_expired_v2_task_data] clean field: {field}, {len(qs)} batch data")
[q.delete() for q in qs]
elif field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
logger.info(
f"[clean_expired_v2_task_data] clean field: {field}, qs ids: {qs.values_list('id', flat=True)}"
f"[clean_expired_v2_task_data] clean field: {field}, "
f"qs ids: {qs.values_list('id', flat=True)[:10]}..."
)
qs.delete()
elif field == "pipeline_instances":
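For illustration, here is what the new `_list` branch amounts to at cleanup time (the ids and counts are hypothetical): each batch is deleted with a bounded `IN` clause rather than one statement spanning every node id.

```python
from pipeline.eri.models import Node

from gcloud.utils.data_handler import chunk_data

# Hypothetical example: 12,000 node ids with a batch size of 5,000
node_ids = [f"node_{i}" for i in range(12000)]
batches = chunk_data(node_ids, 5000, lambda x: Node.objects.filter(node_id__in=x))
assert len(batches) == 3  # 5000 + 5000 + 2000 ids

for qs in batches:
    qs.delete()  # each DELETE matches at most 5,000 node ids
```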
