Skip to content

Commit

Permalink
AIP-72: Logging updated state in execution API server (#45074)
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh authored Dec 19, 2024
1 parent 8774f28 commit d6636f4
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/api_fastapi/execution_api/routes/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def ti_update_state(
Not all state transitions are valid, and transitioning to some states requires extra information to be
passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately)
"""
updated_state: str = ""

# We only use UUID above for validation purposes
ti_id_str = str(task_instance_id)

Expand Down Expand Up @@ -207,6 +209,7 @@ def ti_update_state(
if ti_patch_payload.state == State.FAILED:
# clear the next_method and next_kwargs
query = query.values(next_method=None, next_kwargs=None)
updated_state = State.FAILED
elif isinstance(ti_patch_payload, TIDeferredStatePayload):
# Calculate timeout if it was passed
timeout = None
Expand All @@ -231,6 +234,7 @@ def ti_update_state(
next_kwargs=ti_patch_payload.trigger_kwargs,
trigger_timeout=timeout,
)
updated_state = State.DEFERRED
elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
task_instance = session.get(TI, ti_id_str)
actual_start_date = timezone.utcnow()
Expand All @@ -252,11 +256,12 @@ def ti_update_state(
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
# clear the next_method and next_kwargs so that none of the retries pick them up
query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None, next_kwargs=None)
updated_state = State.UP_FOR_RESCHEDULE
# TODO: Replace this with FastAPI's Custom Exception handling:
# https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
try:
result = session.execute(query)
log.info("TI %s state updated: %s row(s) affected", ti_id_str, result.rowcount)
log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, updated_state, result.rowcount)
except SQLAlchemyError as e:
log.error("Error updating Task Instance state: %s", e)
raise HTTPException(
Expand Down

0 comments on commit d6636f4

Please sign in to comment.