diff --git a/gcloud/taskflow3/signals/handlers.py b/gcloud/taskflow3/signals/handlers.py
index faf34b87a0..7788dea646 100644
--- a/gcloud/taskflow3/signals/handlers.py
+++ b/gcloud/taskflow3/signals/handlers.py
@@ -12,14 +12,26 @@
 """
 
 import datetime
+import json
 import logging
 
+from bamboo_engine import states as bamboo_engine_states
 from bk_monitor_report.reporter import MonitorReporter
 from django.conf import settings
 from django.dispatch import receiver
+from pipeline.core.pipeline import Pipeline
+from pipeline.engine.signals import activity_failed, pipeline_end, pipeline_revoke
+from pipeline.eri.signals import (
+    execute_interrupt,
+    post_set_state,
+    pre_service_execute,
+    pre_service_schedule,
+    schedule_interrupt,
+)
+from pipeline.models import PipelineInstance
+from pipeline.signals import post_pipeline_finish, post_pipeline_revoke
 
 import env
-from bamboo_engine import states as bamboo_engine_states
 from gcloud.shortcuts.message import ATOM_FAILED, TASK_FINISHED
 from gcloud.taskflow3.celery.tasks import auto_retry_node, send_taskflow_message, task_callback
 from gcloud.taskflow3.models import (
@@ -30,17 +42,6 @@
     TimeoutNodeConfig,
 )
 from gcloud.taskflow3.signals import taskflow_finished, taskflow_revoked
-from pipeline.core.pipeline import Pipeline
-from pipeline.engine.signals import activity_failed, pipeline_end, pipeline_revoke
-from pipeline.eri.signals import (
-    execute_interrupt,
-    post_set_state,
-    pre_service_execute,
-    pre_service_schedule,
-    schedule_interrupt,
-)
-from pipeline.models import PipelineInstance
-from pipeline.signals import post_pipeline_finish, post_pipeline_revoke
 
 
 logger = logging.getLogger("celery")
@@ -57,14 +58,14 @@ def _finish_taskflow_and_send_signal(instance_id, sig, task_success=False):
     sig.send(TaskFlowInstance, task_id=task_id)
 
     if task_success:
-        _check_and_callback(task_id, task_success=task_success)
+        _check_and_callback(task_id, task_success=task_success, task=qs[0])
         try:
             send_taskflow_message.delay(task_id=task_id, msg_type=TASK_FINISHED)
         except Exception as e:
             logger.exception("send_taskflow_message[taskflow_id=%s] task delay error: %s" % (task_id, e))
 
     if sig is taskflow_revoked:
-        _check_and_callback(task_id, task_success=False)
+        _check_and_callback(task_id, task_success=False, task=qs[0])
 
 
 def _send_node_fail_message(node_id, pipeline_id):
@@ -73,8 +74,7 @@ def _send_node_fail_message(node_id, pipeline_id):
     except TaskFlowInstance.DoesNotExist:
         logger.error("pipeline finished handler get taskflow error, pipeline_instance_id=%s" % pipeline_id)
         return
-
-    _check_and_callback(taskflow.id, task_success=False)
+    _check_and_callback(taskflow.id, task_success=False, task=taskflow)
 
     if taskflow.is_child_taskflow is False:
         try:
@@ -89,8 +89,13 @@ def _check_and_callback(taskflow_id, *args, **kwargs):
     if not TaskCallBackRecord.objects.filter(task_id=taskflow_id).exists():
         return
     try:
+        if kwargs.get("task"):
+            task = kwargs.pop("task")
+            kwargs["task_outputs"] = json.dumps(task.get_task_detail()["outputs"])
         task_callback.apply_async(
-            kwargs=dict(task_id=taskflow_id, **kwargs), queue="task_callback", routing_key="task_callback",
+            kwargs=dict(task_id=taskflow_id, **kwargs),
+            queue="task_callback",
+            routing_key="task_callback",
         )
     except Exception as e:
         logger.exception(f"[_check_and_callback] task_callback delay error: {e}")
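
Note on the payload change (illustration, not part of the diff): when a `TaskCallBackRecord` exists for the finished task, `_check_and_callback` now serializes the task's outputs and forwards them to the `task_callback` celery task as a `task_outputs` kwarg. Below is a minimal sketch of that serialization step in isolation; `get_task_detail` is taken from the diff itself, while `StubTask`, the shape of its return value, and `build_callback_kwargs` are hypothetical names used only for this illustration.

```python
import json


class StubTask:
    """Hypothetical stand-in for a TaskFlowInstance (illustration only)."""

    def get_task_detail(self):
        # Assumed shape: a dict whose "outputs" key holds the task's output data.
        return {"outputs": [{"key": "${result}", "value": "ok", "name": "result"}]}


def build_callback_kwargs(taskflow_id, **kwargs):
    # Mirrors the new logic in _check_and_callback: if a task object was passed,
    # pop it and replace it with a JSON string of its outputs, so the celery
    # payload stays serializable.
    if kwargs.get("task"):
        task = kwargs.pop("task")
        kwargs["task_outputs"] = json.dumps(task.get_task_detail()["outputs"])
    return dict(task_id=taskflow_id, **kwargs)


if __name__ == "__main__":
    payload = build_callback_kwargs(42, task_success=True, task=StubTask())
    print(payload)
    # -> {'task_id': 42, 'task_success': True, 'task_outputs': '[{"key": ...}]'}
```

The design point this sketch highlights: the ORM object itself is never put on the broker, only the JSON-dumped outputs string, which keeps the celery message serializable regardless of broker serializer settings.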