From d6636f4227c496d48f9fc007525de2e26e18973a Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 19 Dec 2024 20:09:10 +0530 Subject: [PATCH] AIP-72: Logging updated state in execution API server (#45074) --- airflow/api_fastapi/execution_api/routes/task_instances.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py index 2184c2946b8ef..016f5222c79d8 100644 --- a/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -180,6 +180,8 @@ def ti_update_state( Not all state transitions are valid, and transitioning to some states requires extra information to be passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately) """ + updated_state: str = "" + # We only use UUID above for validation purposes ti_id_str = str(task_instance_id) @@ -207,6 +209,7 @@ def ti_update_state( if ti_patch_payload.state == State.FAILED: # clear the next_method and next_kwargs query = query.values(next_method=None, next_kwargs=None) + updated_state = State.FAILED elif isinstance(ti_patch_payload, TIDeferredStatePayload): # Calculate timeout if it was passed timeout = None @@ -231,6 +234,7 @@ def ti_update_state( next_kwargs=ti_patch_payload.trigger_kwargs, trigger_timeout=timeout, ) + updated_state = State.DEFERRED elif isinstance(ti_patch_payload, TIRescheduleStatePayload): task_instance = session.get(TI, ti_id_str) actual_start_date = timezone.utcnow() @@ -252,11 +256,12 @@ def ti_update_state( query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind) # clear the next_method and next_kwargs so that none of the retries pick them up query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None, next_kwargs=None) + updated_state = State.UP_FOR_RESCHEDULE # TODO: Replace this with FastAPI's Custom Exception handling: # https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers try: result = session.execute(query) - log.info("TI %s state updated: %s row(s) affected", ti_id_str, result.rowcount) + log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, updated_state, result.rowcount) except SQLAlchemyError as e: log.error("Error updating Task Instance state: %s", e) raise HTTPException(