diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py index 2184c2946b8ef..016f5222c79d8 100644 --- a/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -180,6 +180,8 @@ def ti_update_state( Not all state transitions are valid, and transitioning to some states requires extra information to be passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately) """ + updated_state: str = "" + # We only use UUID above for validation purposes ti_id_str = str(task_instance_id) @@ -207,6 +209,7 @@ def ti_update_state( if ti_patch_payload.state == State.FAILED: # clear the next_method and next_kwargs query = query.values(next_method=None, next_kwargs=None) + updated_state = State.FAILED elif isinstance(ti_patch_payload, TIDeferredStatePayload): # Calculate timeout if it was passed timeout = None @@ -231,6 +234,7 @@ def ti_update_state( next_kwargs=ti_patch_payload.trigger_kwargs, trigger_timeout=timeout, ) + updated_state = State.DEFERRED elif isinstance(ti_patch_payload, TIRescheduleStatePayload): task_instance = session.get(TI, ti_id_str) actual_start_date = timezone.utcnow() @@ -252,11 +256,12 @@ def ti_update_state( query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind) # clear the next_method and next_kwargs so that none of the retries pick them up query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None, next_kwargs=None) + updated_state = State.UP_FOR_RESCHEDULE # TODO: Replace this with FastAPI's Custom Exception handling: # https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers try: result = session.execute(query) - log.info("TI %s state updated: %s row(s) affected", ti_id_str, result.rowcount) + log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, updated_state, result.rowcount) except SQLAlchemyError as e: log.error("Error updating Task Instance state: %s", e) raise HTTPException(