Commit 24f9625
fix: resolve code conflicts --story=121721246
guohelu committed Jan 22, 2025
2 parents 9449e53 + 52662b7 commit 24f9625
Showing 4 changed files with 42 additions and 27 deletions.
1 change: 1 addition & 0 deletions config/default.py
@@ -781,6 +781,7 @@ def monitor_report_config():
 CLEAN_EXPIRED_V2_TASK_CRON = env.CLEAN_EXPIRED_V2_TASK_CRON
 V2_TASK_VALIDITY_DAY = env.V2_TASK_VALIDITY_DAY
 CLEAN_EXPIRED_V2_TASK_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_BATCH_NUM
+CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
 CLEAN_EXPIRED_V2_TASK_INSTANCE = env.CLEAN_EXPIRED_V2_TASK_INSTANCE
 CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = env.CLEAN_EXPIRED_V2_TASK_CREATE_METHODS
 CLEAN_EXPIRED_V2_TASK_PROJECTS = env.CLEAN_EXPIRED_V2_TASK_PROJECTS
1 change: 1 addition & 0 deletions env.py
@@ -92,6 +92,7 @@
 CLEAN_EXPIRED_V2_TASK_CRON = tuple(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CRON", "30 0 * * *").split())
 V2_TASK_VALIDITY_DAY = int(os.getenv("BKAPP_V2_TASK_VALIDITY_DAY", 730))
 CLEAN_EXPIRED_V2_TASK_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_BATCH_NUM", 100))
+CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM", 5000))
 CLEAN_EXPIRED_V2_TASK_INSTANCE = bool(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_INSTANCE", False))
 CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CREATE_METHODS", "periodic").split(",")
 # If not configured, all projects are cleaned by default
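For scale, the new BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM knob bounds how many node ids a single queryset filter may carry, independent of the existing per-task batch size of 100. A rough illustration in Python (the numbers are simply the defaults above; the id values are made up):

    import math

    # With the default node batch size of 5000, a cleanup round that collects
    # 48,000 expired node ids hits each node-level table in
    # ceil(48000 / 5000) = 10 bounded chunks instead of one giant IN (...) clause.
    node_ids = [f"node-{i}" for i in range(48000)]
    chunk_size = 5000  # default for BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
    assert math.ceil(len(node_ids) / chunk_size) == 10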
59 changes: 34 additions & 25 deletions gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py
@@ -10,30 +10,34 @@
 an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 specific language governing permissions and limitations under the License.
 """
+import logging
 import json
-from typing import List, Dict
+from typing import Dict, List
 
+from django.conf import settings
 from django.db.models import QuerySet
+from gcloud.contrib.cleaner.models import ExpiredTaskArchive
+from gcloud.taskflow3.models import TaskFlowInstance
 
 from pipeline.contrib.periodic_task.models import PeriodicTaskHistory
 from pipeline.eri.models import (
-    ContextValue,
+    CallbackData,
     ContextOutputs,
-    Process,
-    Node,
+    ContextValue,
     Data,
-    State,
-    ExecutionHistory,
     ExecutionData,
-    CallbackData,
+    ExecutionHistory,
+    Node,
+    Process,
     Schedule,
+    State,
 )
+from pipeline.models import PipelineInstance, Snapshot, TreeInfo
 
-from pipeline.models import PipelineInstance, TreeInfo, Snapshot
 from gcloud.utils.data_handler import chunk_data
-from gcloud.contrib.cleaner.models import ExpiredTaskArchive
-from gcloud.taskflow3.models import TaskFlowInstance
 from pipeline_web.core.models import NodeInInstance
 
+logger = logging.getLogger("root")
 
 
 def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
     """
@@ -51,19 +51,24 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
     execution_snapshot = Snapshot.objects.filter(id__in=list(execution_snapshot_ids))
 
     pipeline_ids = instance_ids
+    logger.info(
+        f"[get_clean_pipeline_instance_data] fetching pipeline_ids number: {len(pipeline_ids)}, e.g.: {pipeline_ids[:3]}..."
+    )
     context_value = ContextValue.objects.filter(pipeline_id__in=pipeline_ids)
     context_outputs = ContextOutputs.objects.filter(pipeline_id__in=pipeline_ids)
     process = Process.objects.filter(root_pipeline_id__in=pipeline_ids)
     periodic_task_history = PeriodicTaskHistory.objects.filter(pipeline_instance_id__in=pipeline_ids)
 
     node_ids = list(nodes_in_pipeline.values_list("node_id", flat=True)) + instance_ids
-    nodes = Node.objects.filter(node_id__in=node_ids)
-    data = Data.objects.filter(node_id__in=node_ids)
-    states = State.objects.filter(node_id__in=node_ids)
-    execution_history = ExecutionHistory.objects.filter(node_id__in=node_ids)
-    execution_data = ExecutionData.objects.filter(node_id__in=node_ids)
-    callback_data = CallbackData.objects.filter(node_id__in=node_ids)
-    schedules = Schedule.objects.filter(node_id__in=node_ids)
+    logger.info(f"[get_clean_pipeline_instance_data] fetching node_ids number: {len(node_ids)}, e.g.: {node_ids[:3]}...")
+    chunk_size = settings.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
+    nodes_list = chunk_data(node_ids, chunk_size, lambda x: Node.objects.filter(node_id__in=x))
+    data_list = chunk_data(node_ids, chunk_size, lambda x: Data.objects.filter(node_id__in=x))
+    states_list = chunk_data(node_ids, chunk_size, lambda x: State.objects.filter(node_id__in=x))
+    execution_history_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionHistory.objects.filter(node_id__in=x))
+    execution_data_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionData.objects.filter(node_id__in=x))
+    callback_data_list = chunk_data(node_ids, chunk_size, lambda x: CallbackData.objects.filter(node_id__in=x))
+    schedules_list = chunk_data(node_ids, chunk_size, lambda x: Schedule.objects.filter(node_id__in=x))
 
     return {
         "tree_info": tree_info,
@@ -72,15 +81,15 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
         "context_value": context_value,
         "context_outputs": context_outputs,
         "process": process,
-        "node": nodes,
-        "data": data,
-        "state": states,
-        "execution_history": execution_history,
-        "execution_data": execution_data,
-        "callback_data": callback_data,
-        "schedules": schedules,
         "periodic_task_history": periodic_task_history,
         "pipeline_instances": pipeline_instances,
+        "node_list": nodes_list,
+        "data_list": data_list,
+        "state_list": states_list,
+        "execution_history_list": execution_history_list,
+        "execution_data_list": execution_data_list,
+        "callback_data_list": callback_data_list,
+        "schedules_list": schedules_list,
     }


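chunk_data is imported from gcloud.utils.data_handler, but its body is not part of this diff. A minimal sketch consistent with how it is called above (slice the id list into chunk_size pieces and apply the callback to each slice) could look like this; the real helper may differ in details:

    from typing import Any, Callable, List

    def chunk_data(data: List[Any], chunk_size: int, func: Callable[[List[Any]], Any]) -> List[Any]:
        # Apply func to each chunk_size-sized slice of data and collect the results.
        # Sketch only: the actual implementation lives in gcloud.utils.data_handler.
        return [func(data[i : i + chunk_size]) for i in range(0, len(data), chunk_size)]

Since each callback returns a lazy Django QuerySet, building these lists issues no SQL; the bounded node_id__in filters only run when the cleanup task below iterates the batches and deletes them.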
8 changes: 6 additions & 2 deletions gcloud/contrib/cleaner/tasks.py
@@ -66,9 +66,13 @@ def clean_expired_v2_task_data():
     instance_fields = ["tasks", "pipeline_instances"]
     with transaction.atomic():
         for field, qs in data_to_clean.items():
-            if field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
+            if field.endswith("_list") and isinstance(qs, list):
+                logger.info(f"[clean_expired_v2_task_data] clean field: {field}, {len(qs)} batch data")
+                [q.delete() for q in qs]
+            elif field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
                 logger.info(
-                    f"[clean_expired_v2_task_data] clean field: {field}, qs ids: {qs.values_list('id', flat=True)}"
+                    f"[clean_expired_v2_task_data] clean field: {field}, "
+                    f"qs ids: {qs.values_list('id', flat=True)[:10]}..."
                 )
                 qs.delete()
             elif field == "pipeline_instances":
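The _list suffix on the keys returned by get_clean_pipeline_instance_data is what routes the chunked fields into the new branch: each value is a list of bounded querysets, deleted one batch at a time, so no single DELETE statement carries more than CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM ids in its IN clause. The committed [q.delete() for q in qs] builds a throwaway list of results; an equivalent, more conventional spelling (a sketch, not the committed code) would be:

    from typing import List

    from django.db.models import QuerySet

    def delete_in_batches(batches: List[QuerySet]) -> int:
        # Delete each bounded queryset in turn; QuerySet.delete() returns
        # (rows_deleted, per_model_counts), so sum the first element.
        total = 0
        for batch_qs in batches:
            deleted, _ = batch_qs.delete()
            total += deleted
        return total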
