From a7a1efd507c393e8baec0dfd6b4642d1b663af97 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Thu, 19 Dec 2024 03:58:37 +0530 Subject: [PATCH 01/13] Add dry run for backfill --- .../core_api/datamodels/backfills.py | 13 ++++ .../core_api/openapi/v1-generated.yaml | 32 +++++++- .../core_api/routes/public/backfills.py | 75 +++++++++++++++---- airflow/ui/openapi-gen/queries/queries.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 36 +++++++++ .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 19 ++++- .../core_api/routes/public/test_backfills.py | 1 + 8 files changed, 161 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py index be04063907a9d..c5f6bd86ea242 100644 --- a/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -33,6 +33,7 @@ class BackfillPostBody(BaseModel): dag_run_conf: dict = {} reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE max_active_runs: int = 10 + dry_run: bool = False class BackfillResponse(BaseModel): @@ -56,3 +57,15 @@ class BackfillCollectionResponse(BaseModel): backfills: list[BackfillResponse] total_entries: int + + +class BackfillRunInfo(BaseModel): + """Data model for run information during a backfill operation.""" + + logical_date: datetime + + +class BackfillDryRunResponse(BaseModel): + """Serializer for responses in dry-run mode for backfill operations.""" + + run_info_list: list[BackfillRunInfo] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 6e8beb4fc6af1..0f6b9dca06e82 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1143,7 +1143,10 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BackfillResponse' + anyOf: + - $ref: '#/components/schemas/BackfillResponse' + - $ref: '#/components/schemas/BackfillDryRunResponse' + title: Response Create Backfill '401': content: application/json: @@ -6210,6 +6213,18 @@ components: - total_entries title: BackfillCollectionResponse description: Backfill Collection serializer for responses. + BackfillDryRunResponse: + properties: + run_info_list: + items: + $ref: '#/components/schemas/BackfillRunInfo' + type: array + title: Run Info List + type: object + required: + - run_info_list + title: BackfillDryRunResponse + description: Serializer for responses in dry-run mode for backfill operations. BackfillPostBody: properties: dag_id: @@ -6238,6 +6253,10 @@ components: type: integer title: Max Active Runs default: 10 + dry_run: + type: boolean + title: Dry Run + default: false type: object required: - dag_id @@ -6301,6 +6320,17 @@ components: - updated_at title: BackfillResponse description: Base serializer for Backfill. + BackfillRunInfo: + properties: + logical_date: + type: string + format: date-time + title: Logical Date + type: object + required: + - logical_date + title: BackfillRunInfo + description: Data model for run information during a backfill operation. BaseInfoResponse: properties: status: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 61d25597cdd15..4d8a0b1d33daf 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -19,7 +19,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status -from sqlalchemy import select, update +from sqlalchemy import desc, select, update from airflow.api_fastapi.common.db.common import ( AsyncSessionDep, @@ -30,8 +30,10 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.backfills import ( BackfillCollectionResponse, + BackfillDryRunResponse, BackfillPostBody, BackfillResponse, + BackfillRunInfo, ) from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, @@ -41,9 +43,14 @@ AlreadyRunningBackfill, Backfill, BackfillDagRun, + BackfillDagRunExceptionReason, + ReprocessBehavior, _create_backfill, + _get_info_list, ) +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import timezone +from airflow.utils.sqlalchemy import nulls_first from airflow.utils.state import DagRunState backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") @@ -187,22 +194,62 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse: ) def create_backfill( backfill_request: BackfillPostBody, -) -> BackfillResponse: + session: SessionDep, +) -> BackfillResponse | BackfillDryRunResponse: from_date = timezone.coerce_datetime(backfill_request.from_date) to_date = timezone.coerce_datetime(backfill_request.to_date) - try: - backfill_obj = _create_backfill( - dag_id=backfill_request.dag_id, + if not backfill_request.dry_run: + try: + backfill_obj = _create_backfill( + dag_id=backfill_request.dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=backfill_request.max_active_runs, + reverse=backfill_request.run_backwards, + dag_run_conf=backfill_request.dag_run_conf, + reprocess_behavior=backfill_request.reprocess_behavior, + ) + return BackfillResponse.model_validate(backfill_obj) + except AlreadyRunningBackfill: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"There is already a running backfill for dag {backfill_request.dag_id}", + ) + else: + serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id)) + if not serdag: + raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}") + + info_list = _get_info_list( + dag=serdag.dag, from_date=from_date, to_date=to_date, - max_active_runs=backfill_request.max_active_runs, reverse=backfill_request.run_backwards, - dag_run_conf=backfill_request.dag_run_conf, - reprocess_behavior=backfill_request.reprocess_behavior, - ) - return BackfillResponse.model_validate(backfill_obj) - except AlreadyRunningBackfill: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"There is already a running backfill for dag {backfill_request.dag_id}", ) + backfill_response_item = [] + print(info_list) + for info in info_list: + print(info.logical_date) + dr = session.scalar( + select(DagRun) + .where(DagRun.logical_date == info.logical_date) + .order_by(nulls_first(desc(DagRun.start_date), session)) + .limit(1) + ) + + if dr: + non_create_reason = None + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + if not non_create_reason: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + else: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + return BackfillDryRunResponse(run_info_list=backfill_response_item) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 216c165ae330f..a41499117674a 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2779,7 +2779,7 @@ export const useAssetServiceCreateAssetEvent = < * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns BackfillResponse Successful Response + * @returns unknown Successful Response * @throws ApiError */ export const useBackfillServiceCreateBackfill = < diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index cd9c2e5897dbf..60b735e335142 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -350,6 +350,23 @@ export const $BackfillCollectionResponse = { description: "Backfill Collection serializer for responses.", } as const; +export const $BackfillDryRunResponse = { + properties: { + run_info_list: { + items: { + $ref: "#/components/schemas/BackfillRunInfo", + }, + type: "array", + title: "Run Info List", + }, + }, + type: "object", + required: ["run_info_list"], + title: "BackfillDryRunResponse", + description: + "Serializer for responses in dry-run mode for backfill operations.", +} as const; + export const $BackfillPostBody = { properties: { dag_id: { @@ -385,6 +402,11 @@ export const $BackfillPostBody = { title: "Max Active Runs", default: 10, }, + dry_run: { + type: "boolean", + title: "Dry Run", + default: false, + }, }, type: "object", required: ["dag_id", "from_date", "to_date"], @@ -468,6 +490,20 @@ export const $BackfillResponse = { description: "Base serializer for Backfill.", } as const; +export const $BackfillRunInfo = { + properties: { + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + }, + type: "object", + required: ["logical_date"], + title: "BackfillRunInfo", + description: "Data model for run information during a backfill operation.", +} as const; + export const $BaseInfoResponse = { properties: { status: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c048ddb023053..b967f19258e32 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -837,7 +837,7 @@ export class BackfillService { * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns BackfillResponse Successful Response + * @returns unknown Successful Response * @throws ApiError */ public static createBackfill( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 620ac86b815a7..5b652e9a46b6a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -97,6 +97,13 @@ export type BackfillCollectionResponse = { total_entries: number; }; +/** + * Serializer for responses in dry-run mode for backfill operations. + */ +export type BackfillDryRunResponse = { + run_info_list: Array; +}; + /** * Object used for create backfill request. */ @@ -110,6 +117,7 @@ export type BackfillPostBody = { }; reprocess_behavior?: ReprocessBehavior; max_active_runs?: number; + dry_run?: boolean; }; /** @@ -131,6 +139,13 @@ export type BackfillResponse = { updated_at: string; }; +/** + * Data model for run information during a backfill operation. + */ +export type BackfillRunInfo = { + logical_date: string; +}; + /** * Base info serializer for responses. */ @@ -1497,7 +1512,7 @@ export type CreateBackfillData = { requestBody: BackfillPostBody; }; -export type CreateBackfillResponse = BackfillResponse; +export type CreateBackfillResponse = BackfillResponse | BackfillDryRunResponse; export type GetBackfillData = { backfillId: string; @@ -2629,7 +2644,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: BackfillResponse; + 200: BackfillResponse | BackfillDryRunResponse; /** * Unauthorized */ diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 1c64b10848f4b..a711c13ff7010 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -192,6 +192,7 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl "max_active_runs": max_active_runs, "run_backwards": False, "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": False, } if repro_act is not None: data["reprocess_behavior"] = repro_act From d28893413149085cc8a9278378619c39fda5efc0 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Thu, 19 Dec 2024 03:58:37 +0530 Subject: [PATCH 02/13] Add dry run for backfill --- .../core_api/datamodels/backfills.py | 13 ++++ .../core_api/openapi/v1-generated.yaml | 32 +++++++- .../core_api/routes/public/backfills.py | 75 +++++++++++++++---- airflow/ui/openapi-gen/queries/queries.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 36 +++++++++ .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 19 ++++- .../core_api/routes/public/test_backfills.py | 1 + 8 files changed, 161 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py index be04063907a9d..c5f6bd86ea242 100644 --- a/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -33,6 +33,7 @@ class BackfillPostBody(BaseModel): dag_run_conf: dict = {} reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE max_active_runs: int = 10 + dry_run: bool = False class BackfillResponse(BaseModel): @@ -56,3 +57,15 @@ class BackfillCollectionResponse(BaseModel): backfills: list[BackfillResponse] total_entries: int + + +class BackfillRunInfo(BaseModel): + """Data model for run information during a backfill operation.""" + + logical_date: datetime + + +class BackfillDryRunResponse(BaseModel): + """Serializer for responses in dry-run mode for backfill operations.""" + + run_info_list: list[BackfillRunInfo] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 980b045e34d06..bdc6ae842bc60 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1143,7 +1143,10 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BackfillResponse' + anyOf: + - $ref: '#/components/schemas/BackfillResponse' + - $ref: '#/components/schemas/BackfillDryRunResponse' + title: Response Create Backfill '401': content: application/json: @@ -6210,6 +6213,18 @@ components: - total_entries title: BackfillCollectionResponse description: Backfill Collection serializer for responses. + BackfillDryRunResponse: + properties: + run_info_list: + items: + $ref: '#/components/schemas/BackfillRunInfo' + type: array + title: Run Info List + type: object + required: + - run_info_list + title: BackfillDryRunResponse + description: Serializer for responses in dry-run mode for backfill operations. BackfillPostBody: properties: dag_id: @@ -6238,6 +6253,10 @@ components: type: integer title: Max Active Runs default: 10 + dry_run: + type: boolean + title: Dry Run + default: false type: object required: - dag_id @@ -6301,6 +6320,17 @@ components: - updated_at title: BackfillResponse description: Base serializer for Backfill. + BackfillRunInfo: + properties: + logical_date: + type: string + format: date-time + title: Logical Date + type: object + required: + - logical_date + title: BackfillRunInfo + description: Data model for run information during a backfill operation. BaseInfoResponse: properties: status: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 61d25597cdd15..4d8a0b1d33daf 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -19,7 +19,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status -from sqlalchemy import select, update +from sqlalchemy import desc, select, update from airflow.api_fastapi.common.db.common import ( AsyncSessionDep, @@ -30,8 +30,10 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.backfills import ( BackfillCollectionResponse, + BackfillDryRunResponse, BackfillPostBody, BackfillResponse, + BackfillRunInfo, ) from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, @@ -41,9 +43,14 @@ AlreadyRunningBackfill, Backfill, BackfillDagRun, + BackfillDagRunExceptionReason, + ReprocessBehavior, _create_backfill, + _get_info_list, ) +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import timezone +from airflow.utils.sqlalchemy import nulls_first from airflow.utils.state import DagRunState backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") @@ -187,22 +194,62 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse: ) def create_backfill( backfill_request: BackfillPostBody, -) -> BackfillResponse: + session: SessionDep, +) -> BackfillResponse | BackfillDryRunResponse: from_date = timezone.coerce_datetime(backfill_request.from_date) to_date = timezone.coerce_datetime(backfill_request.to_date) - try: - backfill_obj = _create_backfill( - dag_id=backfill_request.dag_id, + if not backfill_request.dry_run: + try: + backfill_obj = _create_backfill( + dag_id=backfill_request.dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=backfill_request.max_active_runs, + reverse=backfill_request.run_backwards, + dag_run_conf=backfill_request.dag_run_conf, + reprocess_behavior=backfill_request.reprocess_behavior, + ) + return BackfillResponse.model_validate(backfill_obj) + except AlreadyRunningBackfill: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"There is already a running backfill for dag {backfill_request.dag_id}", + ) + else: + serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id)) + if not serdag: + raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}") + + info_list = _get_info_list( + dag=serdag.dag, from_date=from_date, to_date=to_date, - max_active_runs=backfill_request.max_active_runs, reverse=backfill_request.run_backwards, - dag_run_conf=backfill_request.dag_run_conf, - reprocess_behavior=backfill_request.reprocess_behavior, - ) - return BackfillResponse.model_validate(backfill_obj) - except AlreadyRunningBackfill: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"There is already a running backfill for dag {backfill_request.dag_id}", ) + backfill_response_item = [] + print(info_list) + for info in info_list: + print(info.logical_date) + dr = session.scalar( + select(DagRun) + .where(DagRun.logical_date == info.logical_date) + .order_by(nulls_first(desc(DagRun.start_date), session)) + .limit(1) + ) + + if dr: + non_create_reason = None + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + if not non_create_reason: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + else: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + return BackfillDryRunResponse(run_info_list=backfill_response_item) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 216c165ae330f..a41499117674a 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2779,7 +2779,7 @@ export const useAssetServiceCreateAssetEvent = < * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns BackfillResponse Successful Response + * @returns unknown Successful Response * @throws ApiError */ export const useBackfillServiceCreateBackfill = < diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index defc76ec7345f..f842c5c7c8352 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -350,6 +350,23 @@ export const $BackfillCollectionResponse = { description: "Backfill Collection serializer for responses.", } as const; +export const $BackfillDryRunResponse = { + properties: { + run_info_list: { + items: { + $ref: "#/components/schemas/BackfillRunInfo", + }, + type: "array", + title: "Run Info List", + }, + }, + type: "object", + required: ["run_info_list"], + title: "BackfillDryRunResponse", + description: + "Serializer for responses in dry-run mode for backfill operations.", +} as const; + export const $BackfillPostBody = { properties: { dag_id: { @@ -385,6 +402,11 @@ export const $BackfillPostBody = { title: "Max Active Runs", default: 10, }, + dry_run: { + type: "boolean", + title: "Dry Run", + default: false, + }, }, type: "object", required: ["dag_id", "from_date", "to_date"], @@ -468,6 +490,20 @@ export const $BackfillResponse = { description: "Base serializer for Backfill.", } as const; +export const $BackfillRunInfo = { + properties: { + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + }, + type: "object", + required: ["logical_date"], + title: "BackfillRunInfo", + description: "Data model for run information during a backfill operation.", +} as const; + export const $BaseInfoResponse = { properties: { status: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c048ddb023053..b967f19258e32 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -837,7 +837,7 @@ export class BackfillService { * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns BackfillResponse Successful Response + * @returns unknown Successful Response * @throws ApiError */ public static createBackfill( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 9aeda695c3554..b1f17c39c86e7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -97,6 +97,13 @@ export type BackfillCollectionResponse = { total_entries: number; }; +/** + * Serializer for responses in dry-run mode for backfill operations. + */ +export type BackfillDryRunResponse = { + run_info_list: Array; +}; + /** * Object used for create backfill request. */ @@ -110,6 +117,7 @@ export type BackfillPostBody = { }; reprocess_behavior?: ReprocessBehavior; max_active_runs?: number; + dry_run?: boolean; }; /** @@ -131,6 +139,13 @@ export type BackfillResponse = { updated_at: string; }; +/** + * Data model for run information during a backfill operation. + */ +export type BackfillRunInfo = { + logical_date: string; +}; + /** * Base info serializer for responses. */ @@ -1498,7 +1513,7 @@ export type CreateBackfillData = { requestBody: BackfillPostBody; }; -export type CreateBackfillResponse = BackfillResponse; +export type CreateBackfillResponse = BackfillResponse | BackfillDryRunResponse; export type GetBackfillData = { backfillId: string; @@ -2630,7 +2645,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: BackfillResponse; + 200: BackfillResponse | BackfillDryRunResponse; /** * Unauthorized */ diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 1c64b10848f4b..a711c13ff7010 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -192,6 +192,7 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl "max_active_runs": max_active_runs, "run_backwards": False, "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": False, } if repro_act is not None: data["reprocess_behavior"] = repro_act From 5bd604e69dd741b1d1afdafdf5339d27b2e57828 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Thu, 19 Dec 2024 18:25:58 +0530 Subject: [PATCH 03/13] Add tests for backfill dry runs --- .../core_api/routes/public/backfills.py | 3 + .../core_api/routes/public/test_backfills.py | 87 +++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 4d8a0b1d33daf..3bba1f115b760 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -243,6 +243,9 @@ def create_backfill( non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + elif backfill_request.reprocess_behavior is ReprocessBehavior.COMPLETED: + if dr.state == DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.UNKNOWN elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: if dr.state != DagRunState.FAILED: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index a711c13ff7010..f653489efedc3 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -216,6 +216,93 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl } +class TestCreateBackfillDryRun(TestBackfillEndpoint): + @pytest.mark.parametrize( + "reprocess_behavior, expected_dates", + [ + ( + "none", + [ + {"logical_date": "2024-01-01T00:00:00Z"}, + {"logical_date": "2024-01-04T00:00:00Z"}, + {"logical_date": "2024-01-05T00:00:00Z"}, + ], + ), + ( + "failed", + [ + {"logical_date": "2024-01-01T00:00:00Z"}, + {"logical_date": "2024-01-03T00:00:00Z"}, # Reprocess failed + {"logical_date": "2024-01-04T00:00:00Z"}, + {"logical_date": "2024-01-05T00:00:00Z"}, + ], + ), + ( + "completed", + [ + {"logical_date": "2024-01-01T00:00:00Z"}, + {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess completed + {"logical_date": "2024-01-04T00:00:00Z"}, + {"logical_date": "2024-01-05T00:00:00Z"}, + ], + ), + ], + ) + def test_create_backfill_dry_run( + self, session, dag_maker, test_client, reprocess_behavior, expected_dates + ): + with dag_maker( + session=session, + dag_id="TEST_DAG_2", + schedule="0 0 * * *", + start_date=pendulum.parse("2024-01-01"), + ) as dag: + EmptyOperator(task_id="mytask") + + session.commit() + + existing_dagruns = [ + {"logical_date": pendulum.parse("2024-01-02"), "state": DagRunState.SUCCESS}, # Completed dag run + {"logical_date": pendulum.parse("2024-01-03"), "state": DagRunState.FAILED}, # Failed dag run + ] + for dagrun in existing_dagruns: + session.add( + DagRun( + dag_id=dag.dag_id, + run_id=f"manual__{dagrun['logical_date'].isoformat()}", + logical_date=dagrun["logical_date"], + state=dagrun["state"], + run_type="scheduled", + ) + ) + session.commit() + + from_date = pendulum.parse("2024-01-01") + from_date_iso = to_iso(from_date) + to_date = pendulum.parse("2024-01-05") + to_date_iso = to_iso(to_date) + + data = { + "dag_id": dag.dag_id, + "from_date": from_date_iso, + "to_date": to_date_iso, + "max_active_runs": 5, + "run_backwards": False, + "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": True, + "reprocess_behavior": reprocess_behavior, + } + + response = test_client.post( + url="/public/backfills", + json=data, + ) + + assert response.status_code == 200 + response_json = response.json() + assert response_json["run_info_list"] == expected_dates + + class TestCancelBackfill(TestBackfillEndpoint): def test_cancel_backfill(self, session, test_client): (dag,) = self._create_dag_models() From 23b7b9b7e48f3f939bff31adaf90778b1c54e1d4 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Thu, 19 Dec 2024 19:14:28 +0530 Subject: [PATCH 04/13] Fix tests --- airflow/api_fastapi/core_api/routes/public/backfills.py | 3 --- tests/api_fastapi/core_api/routes/public/test_backfills.py | 3 ++- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 3bba1f115b760..4d8a0b1d33daf 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -243,9 +243,6 @@ def create_backfill( non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif backfill_request.reprocess_behavior is ReprocessBehavior.COMPLETED: - if dr.state == DagRunState.FAILED: - non_create_reason = BackfillDagRunExceptionReason.UNKNOWN elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: if dr.state != DagRunState.FAILED: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index f653489efedc3..c24b2b6cc8a8e 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -241,7 +241,8 @@ class TestCreateBackfillDryRun(TestBackfillEndpoint): "completed", [ {"logical_date": "2024-01-01T00:00:00Z"}, - {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess completed + {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess all + {"logical_date": "2024-01-03T00:00:00Z"}, {"logical_date": "2024-01-04T00:00:00Z"}, {"logical_date": "2024-01-05T00:00:00Z"}, ], From ab17ebf5cb5477eb4804add11b857df9e4c81288 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Mon, 23 Dec 2024 10:35:06 +0530 Subject: [PATCH 05/13] create a separate endpoint --- .../core_api/openapi/v1-generated.yaml | 54 ++++++++- .../core_api/routes/public/backfills.py | 112 ++++++++++-------- airflow/ui/openapi-gen/queries/common.ts | 3 + airflow/ui/openapi-gen/queries/queries.ts | 40 ++++++- .../ui/openapi-gen/requests/services.gen.ts | 29 ++++- airflow/ui/openapi-gen/requests/types.gen.ts | 41 ++++++- .../core_api/routes/public/test_backfills.py | 2 +- 7 files changed, 222 insertions(+), 59 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index bdc6ae842bc60..35e681381cb5f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1143,10 +1143,7 @@ paths: content: application/json: schema: - anyOf: - - $ref: '#/components/schemas/BackfillResponse' - - $ref: '#/components/schemas/BackfillDryRunResponse' - title: Response Create Backfill + $ref: '#/components/schemas/BackfillResponse' '401': content: application/json: @@ -1368,6 +1365,55 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/backfills/dry_run: + post: + tags: + - Backfill + summary: Create Backfill Dry Run + operationId: create_backfill_dry_run + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/BackfillPostBody' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/BackfillDryRunResponse' + '401': + description: Unauthorized + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '403': + description: Forbidden + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '404': + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '409': + description: Conflict + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/connections/{connection_id}: delete: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 4d8a0b1d33daf..be4c20957ed03 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -194,62 +194,74 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse: ) def create_backfill( backfill_request: BackfillPostBody, - session: SessionDep, -) -> BackfillResponse | BackfillDryRunResponse: +) -> BackfillResponse: from_date = timezone.coerce_datetime(backfill_request.from_date) to_date = timezone.coerce_datetime(backfill_request.to_date) - if not backfill_request.dry_run: - try: - backfill_obj = _create_backfill( - dag_id=backfill_request.dag_id, - from_date=from_date, - to_date=to_date, - max_active_runs=backfill_request.max_active_runs, - reverse=backfill_request.run_backwards, - dag_run_conf=backfill_request.dag_run_conf, - reprocess_behavior=backfill_request.reprocess_behavior, - ) - return BackfillResponse.model_validate(backfill_obj) - except AlreadyRunningBackfill: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"There is already a running backfill for dag {backfill_request.dag_id}", - ) - else: - serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id)) - if not serdag: - raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}") - - info_list = _get_info_list( - dag=serdag.dag, + try: + backfill_obj = _create_backfill( + dag_id=backfill_request.dag_id, from_date=from_date, to_date=to_date, + max_active_runs=backfill_request.max_active_runs, reverse=backfill_request.run_backwards, + dag_run_conf=backfill_request.dag_run_conf, + reprocess_behavior=backfill_request.reprocess_behavior, ) - backfill_response_item = [] - print(info_list) - for info in info_list: - print(info.logical_date) - dr = session.scalar( - select(DagRun) - .where(DagRun.logical_date == info.logical_date) - .order_by(nulls_first(desc(DagRun.start_date), session)) - .limit(1) - ) - - if dr: - non_create_reason = None - if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: - if dr.state != DagRunState.FAILED: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - if not non_create_reason: - backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + return BackfillResponse.model_validate(backfill_obj) + except AlreadyRunningBackfill: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"There is already a running backfill for dag {backfill_request.dag_id}", + ) + - else: +@backfills_router.post( + path="/dry_run", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + ] + ), +) +def create_backfill_dry_run( + backfill_request: BackfillPostBody, + session: SessionDep, +) -> BackfillDryRunResponse: + from_date = timezone.coerce_datetime(backfill_request.from_date) + to_date = timezone.coerce_datetime(backfill_request.to_date) + serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id)) + if not serdag: + raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}") + + info_list = _get_info_list( + dag=serdag.dag, + from_date=from_date, + to_date=to_date, + reverse=backfill_request.run_backwards, + ) + backfill_response_item = [] + for info in info_list: + dr = session.scalar( + select(DagRun) + .where(DagRun.logical_date == info.logical_date) + .order_by(nulls_first(desc(DagRun.start_date), session)) + .limit(1) + ) + + if dr: + non_create_reason = None + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + if not non_create_reason: backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) - return BackfillDryRunResponse(run_info_list=backfill_response_item) + else: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + return BackfillDryRunResponse(run_info_list=backfill_response_item) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index ea18796932159..f06c2c91a14bb 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1686,6 +1686,9 @@ export type AssetServiceCreateAssetEventMutationResult = Awaited< export type BackfillServiceCreateBackfillMutationResult = Awaited< ReturnType >; +export type BackfillServiceCreateBackfillDryRunMutationResult = Awaited< + ReturnType +>; export type ConnectionServicePostConnectionMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a41499117674a..b0bef485a23ee 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2779,7 +2779,7 @@ export const useAssetServiceCreateAssetEvent = < * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns unknown Successful Response + * @returns BackfillResponse Successful Response * @throws ApiError */ export const useBackfillServiceCreateBackfill = < @@ -2813,6 +2813,44 @@ export const useBackfillServiceCreateBackfill = < }) as unknown as Promise, ...options, }); +/** + * Create Backfill Dry Run + * @param data The data for the request. + * @param data.requestBody + * @returns BackfillDryRunResponse Successful Response + * @throws ApiError + */ +export const useBackfillServiceCreateBackfillDryRun = < + TData = Common.BackfillServiceCreateBackfillDryRunMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + requestBody: BackfillPostBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + requestBody: BackfillPostBody; + }, + TContext + >({ + mutationFn: ({ requestBody }) => + BackfillService.createBackfillDryRun({ + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Post Connection * Create connection entry. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index b967f19258e32..99967dfc0903e 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -54,6 +54,8 @@ import type { UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, + CreateBackfillDryRunData, + CreateBackfillDryRunResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, @@ -837,7 +839,7 @@ export class BackfillService { * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns unknown Successful Response + * @returns BackfillResponse Successful Response * @throws ApiError */ public static createBackfill( @@ -960,6 +962,31 @@ export class BackfillService { }, }); } + + /** + * Create Backfill Dry Run + * @param data The data for the request. + * @param data.requestBody + * @returns BackfillDryRunResponse Successful Response + * @throws ApiError + */ + public static createBackfillDryRun( + data: CreateBackfillDryRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "POST", + url: "/public/backfills/dry_run", + body: data.requestBody, + mediaType: "application/json", + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 409: "Conflict", + 422: "Validation Error", + }, + }); + } } export class ConnectionService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b1f17c39c86e7..f92d2fd23a56f 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1513,7 +1513,7 @@ export type CreateBackfillData = { requestBody: BackfillPostBody; }; -export type CreateBackfillResponse = BackfillResponse | BackfillDryRunResponse; +export type CreateBackfillResponse = BackfillResponse; export type GetBackfillData = { backfillId: string; @@ -1539,6 +1539,12 @@ export type CancelBackfillData = { export type CancelBackfillResponse = BackfillResponse; +export type CreateBackfillDryRunData = { + requestBody: BackfillPostBody; +}; + +export type CreateBackfillDryRunResponse = BackfillDryRunResponse; + export type DeleteConnectionData = { connectionId: string; }; @@ -2645,7 +2651,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: BackfillResponse | BackfillDryRunResponse; + 200: BackfillResponse; /** * Unauthorized */ @@ -2789,6 +2795,37 @@ export type $OpenApiTs = { }; }; }; + "/public/backfills/dry_run": { + post: { + req: CreateBackfillDryRunData; + res: { + /** + * Successful Response + */ + 200: BackfillDryRunResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Conflict + */ + 409: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/connections/{connection_id}": { delete: { req: DeleteConnectionData; diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index c24b2b6cc8a8e..b4eae7010cf0f 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -295,7 +295,7 @@ def test_create_backfill_dry_run( } response = test_client.post( - url="/public/backfills", + url="/public/backfills/dry_run", json=data, ) From dcf0e2dbd44062a4dfdff07f8fe6abed7f881970 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Sun, 29 Dec 2024 14:24:03 +0530 Subject: [PATCH 06/13] Refactor _create_backfill function --- .../core_api/datamodels/backfills.py | 8 +- .../core_api/openapi/v1-generated.yaml | 56 ++--- .../core_api/routes/public/backfills.py | 68 ++---- airflow/models/backfill.py | 231 +++++++++++------- airflow/ui/openapi-gen/queries/queries.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 71 +++--- .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 34 +-- .../core_api/routes/public/test_backfills.py | 2 +- 9 files changed, 254 insertions(+), 220 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py index c5f6bd86ea242..4926f28693e8e 100644 --- a/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -33,7 +33,6 @@ class BackfillPostBody(BaseModel): dag_run_conf: dict = {} reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE max_active_runs: int = 10 - dry_run: bool = False class BackfillResponse(BaseModel): @@ -59,13 +58,14 @@ class BackfillCollectionResponse(BaseModel): total_entries: int -class BackfillRunInfo(BaseModel): +class DryRunBackfillResponse(BaseModel): """Data model for run information during a backfill operation.""" logical_date: datetime -class BackfillDryRunResponse(BaseModel): +class DryRunBackfillCollectionResponse(BaseModel): """Serializer for responses in dry-run mode for backfill operations.""" - run_info_list: list[BackfillRunInfo] + backfills: list[DryRunBackfillResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 41081fc97c603..a6560e7babba7 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1383,7 +1383,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BackfillDryRunResponse' + $ref: '#/components/schemas/DryRunBackfillCollectionResponse' '401': description: Unauthorized content: @@ -6265,18 +6265,6 @@ components: - total_entries title: BackfillCollectionResponse description: Backfill Collection serializer for responses. - BackfillDryRunResponse: - properties: - run_info_list: - items: - $ref: '#/components/schemas/BackfillRunInfo' - type: array - title: Run Info List - type: object - required: - - run_info_list - title: BackfillDryRunResponse - description: Serializer for responses in dry-run mode for backfill operations. BackfillPostBody: properties: dag_id: @@ -6305,10 +6293,6 @@ components: type: integer title: Max Active Runs default: 10 - dry_run: - type: boolean - title: Dry Run - default: false type: object required: - dag_id @@ -6372,17 +6356,6 @@ components: - updated_at title: BackfillResponse description: Base serializer for Backfill. - BackfillRunInfo: - properties: - logical_date: - type: string - format: date-time - title: Logical Date - type: object - required: - - logical_date - title: BackfillRunInfo - description: Data model for run information during a backfill operation. BaseInfoResponse: properties: status: @@ -7800,6 +7773,33 @@ components: This is the set of allowable values for the ``warning_type`` field in the DagWarning model.' + DryRunBackfillCollectionResponse: + properties: + backfills: + items: + $ref: '#/components/schemas/DryRunBackfillResponse' + type: array + title: Backfills + total_entries: + type: integer + title: Total Entries + type: object + required: + - backfills + - total_entries + title: DryRunBackfillCollectionResponse + description: Serializer for responses in dry-run mode for backfill operations. + DryRunBackfillResponse: + properties: + logical_date: + type: string + format: date-time + title: Logical Date + type: object + required: + - logical_date + title: DryRunBackfillResponse + description: Data model for run information during a backfill operation. EdgeResponse: properties: is_setup_teardown: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index be4c20957ed03..931ad167e9e10 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -19,7 +19,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status -from sqlalchemy import desc, select, update +from sqlalchemy import select, update from airflow.api_fastapi.common.db.common import ( AsyncSessionDep, @@ -30,10 +30,10 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.backfills import ( BackfillCollectionResponse, - BackfillDryRunResponse, BackfillPostBody, BackfillResponse, - BackfillRunInfo, + DryRunBackfillCollectionResponse, + DryRunBackfillResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, @@ -43,14 +43,9 @@ AlreadyRunningBackfill, Backfill, BackfillDagRun, - BackfillDagRunExceptionReason, - ReprocessBehavior, _create_backfill, - _get_info_list, ) -from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import timezone -from airflow.utils.sqlalchemy import nulls_first from airflow.utils.state import DagRunState backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") @@ -225,43 +220,28 @@ def create_backfill( ), ) def create_backfill_dry_run( - backfill_request: BackfillPostBody, - session: SessionDep, -) -> BackfillDryRunResponse: - from_date = timezone.coerce_datetime(backfill_request.from_date) - to_date = timezone.coerce_datetime(backfill_request.to_date) - serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id)) - if not serdag: - raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}") + body: BackfillPostBody, +) -> DryRunBackfillCollectionResponse: + from_date = timezone.coerce_datetime(body.from_date) + to_date = timezone.coerce_datetime(body.to_date) - info_list = _get_info_list( - dag=serdag.dag, - from_date=from_date, - to_date=to_date, - reverse=backfill_request.run_backwards, - ) - backfill_response_item = [] - for info in info_list: - dr = session.scalar( - select(DagRun) - .where(DagRun.logical_date == info.logical_date) - .order_by(nulls_first(desc(DagRun.start_date), session)) - .limit(1) + try: + backfills_dry_run = _create_backfill( + dag_id=body.dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=body.max_active_runs, + reverse=body.run_backwards, + dag_run_conf=body.dag_run_conf, + reprocess_behavior=body.reprocess_behavior, + dry_run=True, ) + backfills = [DryRunBackfillResponse(logical_date=logical_date) for logical_date in backfills_dry_run] - if dr: - non_create_reason = None - if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: - if dr.state != DagRunState.FAILED: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - if not non_create_reason: - backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) - - else: - backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run)) - return BackfillDryRunResponse(run_info_list=backfill_response_item) + except AlreadyRunningBackfill: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="There is already a running backfill for the dag", + ) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 0e88fa15bb04f..32d1f99072687 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -25,7 +25,7 @@ import logging from enum import Enum -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, overload from sqlalchemy import ( Boolean, @@ -33,7 +33,6 @@ ForeignKeyConstraint, Integer, UniqueConstraint, - desc, func, select, ) @@ -47,13 +46,15 @@ from airflow.settings import json from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks +from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType if TYPE_CHECKING: from datetime import datetime + from typing_extensions import Literal + log = logging.getLogger(__name__) @@ -158,72 +159,98 @@ def validate_sort_ordinal(self, key, val): def _create_backfill_dag_run( *, dag, - info, - reprocess_behavior: ReprocessBehavior, + dagrun_info_list, + reprocess_behavior: ReprocessBehavior | None = None, backfill_id, - dag_run_conf, - backfill_sort_ordinal, + dag_run_conf: dict | None, session, -): + dry_run, +) -> list[datetime]: from airflow.models import DagRun - with session.begin_nested() as nested: - dr = session.scalar( - with_row_locks( - select(DagRun) - .where(DagRun.logical_date == info.logical_date) - .order_by(nulls_first(desc(DagRun.start_date), session=session)) - .limit(1), - session=session, - ) - ) - if dr: - non_create_reason = None - if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - elif reprocess_behavior is ReprocessBehavior.NONE: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif reprocess_behavior is ReprocessBehavior.FAILED: - if dr.state != DagRunState.FAILED: + backfill_sort_ordinal = 0 + logical_dates = [] + dagrun_infos = list(dagrun_info_list) + + if reprocess_behavior is None: + reprocess_behavior = ReprocessBehavior.NONE + if dag_run_conf is None: + dag_run_conf = {} + + existing_dag_runs = { + dr.logical_date: dr + for dr in session.scalars( + select(DagRun) + .where(DagRun.dag_id == dag.dag_id) + .where(DagRun.logical_date.in_([info.logical_date for info in dagrun_infos])) + ).all() + } + + for info in dagrun_infos: + backfill_sort_ordinal += 1 + dr = existing_dag_runs.get(info.logical_date) + non_create_reason = None + + with session.begin_nested() as nested: + if dr: + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif reprocess_behavior is ReprocessBehavior.NONE: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - if non_create_reason: - # rolling back here restores to start of this nested tran - # which releases the lock on the latest dag run, since we - # are not creating a new one - nested.rollback() + elif reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + + if non_create_reason: + if not dry_run: + nested.rollback() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=dr.logical_date, + exception_reason=non_create_reason, + sort_ordinal=backfill_sort_ordinal, + ) + ) + else: + logical_dates.append(dr.logical_date) + else: + logical_dates.append(info.logical_date) + + if not non_create_reason and not dry_run: + dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + dr = dag.create_dagrun( + triggered_by=DagRunTriggeredByType.BACKFILL, + logical_date=info.logical_date, + data_interval=info.data_interval, + start_date=timezone.utcnow(), + state=DagRunState.QUEUED, + external_trigger=False, + conf=dag_run_conf, + run_type=DagRunType.BACKFILL_JOB, + creating_job_id=None, + session=session, + backfill_id=backfill_id, + dag_version=dag_version, + ) session.add( BackfillDagRun( backfill_id=backfill_id, - dag_run_id=None, - logical_date=info.logical_date, - exception_reason=non_create_reason, + dag_run_id=dr.id, sort_ordinal=backfill_sort_ordinal, + logical_date=info.logical_date, ) ) - return - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) - dr = dag.create_dagrun( - triggered_by=DagRunTriggeredByType.BACKFILL, - logical_date=info.logical_date, - data_interval=info.data_interval, - start_date=timezone.utcnow(), - state=DagRunState.QUEUED, - external_trigger=False, - conf=dag_run_conf, - run_type=DagRunType.BACKFILL_JOB, - creating_job_id=None, - session=session, - backfill_id=backfill_id, - dag_version=dag_version, - ) - session.add( - BackfillDagRun( - backfill_id=backfill_id, - dag_run_id=dr.id, - sort_ordinal=backfill_sort_ordinal, - logical_date=info.logical_date, - ) - ) + + log.info( + "created backfill dag run dag_id=%s backfill_id=%s, info=%s", + dag.dag_id, + backfill_id, + info, + ) + + return logical_dates def _get_info_list( @@ -241,6 +268,34 @@ def _get_info_list( return dagrun_info_list +@overload +def _create_backfill( + *, + dag_id: str, + from_date: datetime, + to_date: datetime, + max_active_runs: int, + reverse: bool, + dag_run_conf: dict | None, + reprocess_behavior: ReprocessBehavior | None = ..., + dry_run: Literal[True], +) -> list[datetime]: ... + + +@overload +def _create_backfill( + *, + dag_id: str, + from_date: datetime, + to_date: datetime, + max_active_runs: int, + reverse: bool, + dag_run_conf: dict | None, + reprocess_behavior: ReprocessBehavior | None = ..., + dry_run: Literal[False] | None = ..., +) -> Backfill | None: ... + + def _create_backfill( *, dag_id: str, @@ -250,7 +305,8 @@ def _create_backfill( reverse: bool, dag_run_conf: dict | None, reprocess_behavior: ReprocessBehavior | None = None, -) -> Backfill | None: + dry_run: bool | None = False, +) -> Backfill | list[datetime] | None: from airflow.models import DagModel from airflow.models.serialized_dag import SerializedDagModel @@ -284,18 +340,23 @@ def _create_backfill( "You must set reprocess behavior to reprocess completed or " "reprocess failed" ) - br = Backfill( - dag_id=dag_id, - from_date=from_date, - to_date=to_date, - max_active_runs=max_active_runs, - dag_run_conf=dag_run_conf, - reprocess_behavior=reprocess_behavior, - ) - session.add(br) - session.commit() - backfill_sort_ordinal = 0 + backfill_id = None + + if not dry_run: + br = Backfill( + dag_id=dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=max_active_runs, + dag_run_conf=dag_run_conf, + reprocess_behavior=reprocess_behavior, + ) + + session.add(br) + session.commit() + + backfill_id = br.id dagrun_info_list = _get_info_list( from_date=from_date, @@ -316,21 +377,15 @@ def _create_backfill( ) if not dag_model: raise RuntimeError(f"Dag {dag_id} not found") - for info in dagrun_info_list: - backfill_sort_ordinal += 1 - _create_backfill_dag_run( - dag=dag, - info=info, - backfill_id=br.id, - dag_run_conf=br.dag_run_conf, - reprocess_behavior=br.reprocess_behavior, - backfill_sort_ordinal=backfill_sort_ordinal, - session=session, - ) - log.info( - "created backfill dag run dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - br.id, - info, - ) - return br + + backfill_response = _create_backfill_dag_run( + dag=dag, + dagrun_info_list=dagrun_info_list, + backfill_id=backfill_id, + dag_run_conf=dag_run_conf, + reprocess_behavior=reprocess_behavior, + session=session, + dry_run=dry_run, + ) + + return br if not dry_run else backfill_response diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index b0bef485a23ee..9de4e5e9275d1 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2817,7 +2817,7 @@ export const useBackfillServiceCreateBackfill = < * Create Backfill Dry Run * @param data The data for the request. * @param data.requestBody - * @returns BackfillDryRunResponse Successful Response + * @returns DryRunBackfillCollectionResponse Successful Response * @throws ApiError */ export const useBackfillServiceCreateBackfillDryRun = < diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 995bb3e71c4bb..a1c232465b3ac 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -350,23 +350,6 @@ export const $BackfillCollectionResponse = { description: "Backfill Collection serializer for responses.", } as const; -export const $BackfillDryRunResponse = { - properties: { - run_info_list: { - items: { - $ref: "#/components/schemas/BackfillRunInfo", - }, - type: "array", - title: "Run Info List", - }, - }, - type: "object", - required: ["run_info_list"], - title: "BackfillDryRunResponse", - description: - "Serializer for responses in dry-run mode for backfill operations.", -} as const; - export const $BackfillPostBody = { properties: { dag_id: { @@ -402,11 +385,6 @@ export const $BackfillPostBody = { title: "Max Active Runs", default: 10, }, - dry_run: { - type: "boolean", - title: "Dry Run", - default: false, - }, }, type: "object", required: ["dag_id", "from_date", "to_date"], @@ -490,20 +468,6 @@ export const $BackfillResponse = { description: "Base serializer for Backfill.", } as const; -export const $BackfillRunInfo = { - properties: { - logical_date: { - type: "string", - format: "date-time", - title: "Logical Date", - }, - }, - type: "object", - required: ["logical_date"], - title: "BackfillRunInfo", - description: "Data model for run information during a backfill operation.", -} as const; - export const $BaseInfoResponse = { properties: { status: { @@ -2666,6 +2630,41 @@ This is the set of allowable values for the \`\`warning_type\`\` field in the DagWarning model.`, } as const; +export const $DryRunBackfillCollectionResponse = { + properties: { + backfills: { + items: { + $ref: "#/components/schemas/DryRunBackfillResponse", + }, + type: "array", + title: "Backfills", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["backfills", "total_entries"], + title: "DryRunBackfillCollectionResponse", + description: + "Serializer for responses in dry-run mode for backfill operations.", +} as const; + +export const $DryRunBackfillResponse = { + properties: { + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + }, + type: "object", + required: ["logical_date"], + title: "DryRunBackfillResponse", + description: "Data model for run information during a backfill operation.", +} as const; + export const $EdgeResponse = { properties: { is_setup_teardown: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index be8a0cda9660d..f25fb39a9639e 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -967,7 +967,7 @@ export class BackfillService { * Create Backfill Dry Run * @param data The data for the request. * @param data.requestBody - * @returns BackfillDryRunResponse Successful Response + * @returns DryRunBackfillCollectionResponse Successful Response * @throws ApiError */ public static createBackfillDryRun( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 6494219e2479e..97016b99ace07 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -97,13 +97,6 @@ export type BackfillCollectionResponse = { total_entries: number; }; -/** - * Serializer for responses in dry-run mode for backfill operations. - */ -export type BackfillDryRunResponse = { - run_info_list: Array; -}; - /** * Object used for create backfill request. */ @@ -117,7 +110,6 @@ export type BackfillPostBody = { }; reprocess_behavior?: ReprocessBehavior; max_active_runs?: number; - dry_run?: boolean; }; /** @@ -139,13 +131,6 @@ export type BackfillResponse = { updated_at: string; }; -/** - * Data model for run information during a backfill operation. - */ -export type BackfillRunInfo = { - logical_date: string; -}; - /** * Base info serializer for responses. */ @@ -651,6 +636,21 @@ export type DagTagResponse = { */ export type DagWarningType = "asset conflict" | "non-existent pool"; +/** + * Serializer for responses in dry-run mode for backfill operations. + */ +export type DryRunBackfillCollectionResponse = { + backfills: Array; + total_entries: number; +}; + +/** + * Data model for run information during a backfill operation. + */ +export type DryRunBackfillResponse = { + logical_date: string; +}; + /** * Edge serializer for responses. */ @@ -1543,7 +1543,7 @@ export type CreateBackfillDryRunData = { requestBody: BackfillPostBody; }; -export type CreateBackfillDryRunResponse = BackfillDryRunResponse; +export type CreateBackfillDryRunResponse = DryRunBackfillCollectionResponse; export type DeleteConnectionData = { connectionId: string; @@ -2802,7 +2802,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: BackfillDryRunResponse; + 200: DryRunBackfillCollectionResponse; /** * Unauthorized */ diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index b4eae7010cf0f..e98540c16ff98 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -301,7 +301,7 @@ def test_create_backfill_dry_run( assert response.status_code == 200 response_json = response.json() - assert response_json["run_info_list"] == expected_dates + assert response_json["backfills"] == expected_dates class TestCancelBackfill(TestBackfillEndpoint): From 828907c32b17e9c861488d500996c01062d2e6c0 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Sun, 29 Dec 2024 17:40:37 +0530 Subject: [PATCH 07/13] Fix static checks --- airflow/models/backfill.py | 28 +++++++++++++++++-- .../core_api/routes/public/test_backfills.py | 1 - 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 32d1f99072687..01b23086354ef 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -177,15 +177,39 @@ def _create_backfill_dag_run( if dag_run_conf is None: dag_run_conf = {} + dag_run_ranked = ( + select( + DagRun.logical_date, + DagRun.start_date, + DagRun.run_id, + DagRun.dag_id, + func.row_number() + .over( + partition_by=DagRun.logical_date, + order_by=(DagRun.start_date.desc().nullsfirst()), + ) + .label("row_number"), + ) + .where(DagRun.dag_id == dag.dag_id) + .where(DagRun.logical_date.in_([info.logical_date for info in dagrun_infos])) + .subquery() + ) + existing_dag_runs = { dr.logical_date: dr for dr in session.scalars( select(DagRun) - .where(DagRun.dag_id == dag.dag_id) - .where(DagRun.logical_date.in_([info.logical_date for info in dagrun_infos])) + .join( + dag_run_ranked, + (DagRun.logical_date == dag_run_ranked.c.logical_date) + & (DagRun.dag_id == dag_run_ranked.c.dag_id), + ) + .where(dag_run_ranked.c.row_number == 1) ).all() } + print(existing_dag_runs) + for info in dagrun_infos: backfill_sort_ordinal += 1 dr = existing_dag_runs.get(info.logical_date) diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index e98540c16ff98..4f0ae7918e41d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -290,7 +290,6 @@ def test_create_backfill_dry_run( "max_active_runs": 5, "run_backwards": False, "dag_run_conf": {"param1": "val1", "param2": True}, - "dry_run": True, "reprocess_behavior": reprocess_behavior, } From b130be48530672261fb324ad0d2ab6d6567d7bf3 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Sun, 29 Dec 2024 19:28:00 +0530 Subject: [PATCH 08/13] Fix static checks --- airflow/models/backfill.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 01b23086354ef..cf55fc300cf2c 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -181,7 +181,6 @@ def _create_backfill_dag_run( select( DagRun.logical_date, DagRun.start_date, - DagRun.run_id, DagRun.dag_id, func.row_number() .over( @@ -202,6 +201,10 @@ def _create_backfill_dag_run( .join( dag_run_ranked, (DagRun.logical_date == dag_run_ranked.c.logical_date) + & ( + (DagRun.start_date == dag_run_ranked.c.start_date) + | ((DagRun.start_date.is_(None)) & (dag_run_ranked.c.start_date.is_(None))) + ) & (DagRun.dag_id == dag_run_ranked.c.dag_id), ) .where(dag_run_ranked.c.row_number == 1) From f4b7602a2094a03330f2b99ec54776c30aa69282 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Sun, 29 Dec 2024 23:55:02 +0530 Subject: [PATCH 09/13] Fix mysql static checks --- airflow/models/backfill.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index cf55fc300cf2c..c9158bbad240d 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -27,15 +27,7 @@ from enum import Enum from typing import TYPE_CHECKING, overload -from sqlalchemy import ( - Boolean, - Column, - ForeignKeyConstraint, - Integer, - UniqueConstraint, - func, - select, -) +from sqlalchemy import Boolean, Column, ForeignKeyConstraint, Integer, UniqueConstraint, case, func, select from sqlalchemy.orm import relationship, validates from sqlalchemy_jsonfield import JSONField @@ -185,7 +177,7 @@ def _create_backfill_dag_run( func.row_number() .over( partition_by=DagRun.logical_date, - order_by=(DagRun.start_date.desc().nullsfirst()), + order_by=(case([(DagRun.start_date.is_(None), 0)], else_=1), DagRun.start_date.desc()), ) .label("row_number"), ) From 178e664136b934eaf8dbe018707cf01be9411aa2 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Wed, 8 Jan 2025 17:15:43 +0530 Subject: [PATCH 10/13] Address PR comments --- .../core_api/datamodels/backfills.py | 4 ++-- .../core_api/openapi/v1-generated.yaml | 4 ++-- .../core_api/routes/public/backfills.py | 2 +- airflow/models/backfill.py | 19 +++++++------------ airflow/ui/openapi-gen/queries/queries.ts | 4 +--- .../ui/openapi-gen/requests/schemas.gen.ts | 5 ++--- airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++-- 7 files changed, 17 insertions(+), 25 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py index 4926f28693e8e..e36e50ea3b8d7 100644 --- a/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -59,13 +59,13 @@ class BackfillCollectionResponse(BaseModel): class DryRunBackfillResponse(BaseModel): - """Data model for run information during a backfill operation.""" + """Backfill serializer for responses in dry-run mode.""" logical_date: datetime class DryRunBackfillCollectionResponse(BaseModel): - """Serializer for responses in dry-run mode for backfill operations.""" + """Backfill collection serializer for responses in dry-run mode.""" backfills: list[DryRunBackfillResponse] total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index c07570d93392d..fffd52bc1d23a 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -8006,7 +8006,7 @@ components: - backfills - total_entries title: DryRunBackfillCollectionResponse - description: Serializer for responses in dry-run mode for backfill operations. + description: Backfill collection serializer for responses in dry-run mode. DryRunBackfillResponse: properties: logical_date: @@ -8017,7 +8017,7 @@ components: required: - logical_date title: DryRunBackfillResponse - description: Data model for run information during a backfill operation. + description: Backfill serializer for responses in dry-run mode. EdgeResponse: properties: is_setup_teardown: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 931ad167e9e10..f576abba2474a 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -236,7 +236,7 @@ def create_backfill_dry_run( reprocess_behavior=body.reprocess_behavior, dry_run=True, ) - backfills = [DryRunBackfillResponse(logical_date=logical_date) for logical_date in backfills_dry_run] + backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run] return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run)) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index c9158bbad240d..a6394e7667b37 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -44,8 +44,7 @@ if TYPE_CHECKING: from datetime import datetime - - from typing_extensions import Literal + from typing import Literal log = logging.getLogger(__name__) @@ -192,19 +191,15 @@ def _create_backfill_dag_run( select(DagRun) .join( dag_run_ranked, - (DagRun.logical_date == dag_run_ranked.c.logical_date) - & ( - (DagRun.start_date == dag_run_ranked.c.start_date) - | ((DagRun.start_date.is_(None)) & (dag_run_ranked.c.start_date.is_(None))) - ) - & (DagRun.dag_id == dag_run_ranked.c.dag_id), + DagRun.logical_date == dag_run_ranked.c.logical_date + and DagRun.start_date == dag_run_ranked.c.start_date + and DagRun.dag_id == dag_run_ranked.c.dag_id, + isouter=True, ) .where(dag_run_ranked.c.row_number == 1) ).all() } - print(existing_dag_runs) - for info in dagrun_infos: backfill_sort_ordinal += 1 dr = existing_dag_runs.get(info.logical_date) @@ -397,7 +392,7 @@ def _create_backfill( if not dag_model: raise RuntimeError(f"Dag {dag_id} not found") - backfill_response = _create_backfill_dag_run( + logical_dates = _create_backfill_dag_run( dag=dag, dagrun_info_list=dagrun_info_list, backfill_id=backfill_id, @@ -407,4 +402,4 @@ def _create_backfill( dry_run=dry_run, ) - return br if not dry_run else backfill_response + return br if not dry_run else logical_dates diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a4863575acdc5..d3fa3b0d7660e 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2743,9 +2743,7 @@ export const useBackfillServiceCreateBackfillDryRun = < TContext >({ mutationFn: ({ requestBody }) => - BackfillService.createBackfillDryRun({ - requestBody, - }) as unknown as Promise, + BackfillService.createBackfillDryRun({ requestBody }) as unknown as Promise, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4d4da246af02e..184382ea6c897 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2657,8 +2657,7 @@ export const $DryRunBackfillCollectionResponse = { type: "object", required: ["backfills", "total_entries"], title: "DryRunBackfillCollectionResponse", - description: - "Serializer for responses in dry-run mode for backfill operations.", + description: "Backfill collection serializer for responses in dry-run mode.", } as const; export const $DryRunBackfillResponse = { @@ -2672,7 +2671,7 @@ export const $DryRunBackfillResponse = { type: "object", required: ["logical_date"], title: "DryRunBackfillResponse", - description: "Data model for run information during a backfill operation.", + description: "Backfill serializer for responses in dry-run mode.", } as const; export const $EdgeResponse = { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index fdecf9b02780e..039e4d0c1f332 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -638,7 +638,7 @@ export type DagTagResponse = { export type DagWarningType = "asset conflict" | "non-existent pool"; /** - * Serializer for responses in dry-run mode for backfill operations. + * Backfill collection serializer for responses in dry-run mode. */ export type DryRunBackfillCollectionResponse = { backfills: Array; @@ -646,7 +646,7 @@ export type DryRunBackfillCollectionResponse = { }; /** - * Data model for run information during a backfill operation. + * Backfill serializer for responses in dry-run mode. */ export type DryRunBackfillResponse = { logical_date: string; From e65fe7f0a021fa969a06ab432a73b0914ef0005a Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Thu, 9 Jan 2025 17:14:08 +0530 Subject: [PATCH 11/13] refactor code --- .../core_api/routes/public/backfills.py | 29 +- .../remote_commands/backfill_command.py | 52 ++- airflow/models/backfill.py | 320 ++++++++---------- 3 files changed, 177 insertions(+), 224 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index f576abba2474a..6b06be22d80bc 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -44,6 +44,7 @@ Backfill, BackfillDagRun, _create_backfill, + _do_dry_run, ) from airflow.utils import timezone from airflow.utils.state import DagRunState @@ -225,23 +226,13 @@ def create_backfill_dry_run( from_date = timezone.coerce_datetime(body.from_date) to_date = timezone.coerce_datetime(body.to_date) - try: - backfills_dry_run = _create_backfill( - dag_id=body.dag_id, - from_date=from_date, - to_date=to_date, - max_active_runs=body.max_active_runs, - reverse=body.run_backwards, - dag_run_conf=body.dag_run_conf, - reprocess_behavior=body.reprocess_behavior, - dry_run=True, - ) - backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run] - - return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run)) + backfills_dry_run = _do_dry_run( + dag_id=body.dag_id, + from_date=from_date, + to_date=to_date, + reverse=body.run_backwards, + reprocess_behavior=body.reprocess_behavior, + ) + backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run] - except AlreadyRunningBackfill: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail="There is already a running backfill for the dag", - ) + return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run)) diff --git a/airflow/cli/commands/remote_commands/backfill_command.py b/airflow/cli/commands/remote_commands/backfill_command.py index 63a8573ab7379..277f0e7719400 100644 --- a/airflow/cli/commands/remote_commands/backfill_command.py +++ b/airflow/cli/commands/remote_commands/backfill_command.py @@ -21,31 +21,10 @@ import signal from airflow import settings -from airflow.models.backfill import ReprocessBehavior, _create_backfill, _get_info_list -from airflow.models.serialized_dag import SerializedDagModel +from airflow.models.backfill import ReprocessBehavior, _create_backfill, _do_dry_run from airflow.utils import cli as cli_utils from airflow.utils.cli import sigint_handler from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.utils.session import create_session - - -def _do_dry_run(*, params, dag_id, from_date, to_date, reverse): - print("Performing dry run of backfill.") - print("Printing params:") - for k, v in params.items(): - print(f" - {k} = {v}") - with create_session() as session: - serdag = session.get(SerializedDagModel, dag_id) - - info_list = _get_info_list( - dag=serdag.dag, - from_date=from_date, - to_date=to_date, - reverse=reverse, - ) - print("Logical dates to be attempted:") - for info in info_list: - print(f" - {info.logical_date}") @cli_utils.action_cli @@ -61,22 +40,31 @@ def create_backfill(args) -> None: reprocess_behavior = None if args.dry_run: - _do_dry_run( - params=dict( - dag_id=args.dag_id, - from_date=args.from_date, - to_date=args.to_date, - max_active_runs=args.max_active_runs, - reverse=args.run_backwards, - dag_run_conf=args.dag_run_conf, - reprocess_behavior=reprocess_behavior, - ), + print("Performing dry run of backfill.") + print("Printing params:") + params = dict( dag_id=args.dag_id, from_date=args.from_date, to_date=args.to_date, + max_active_runs=args.max_active_runs, reverse=args.run_backwards, + dag_run_conf=args.dag_run_conf, + reprocess_behavior=reprocess_behavior, ) + for k, v in params.items(): + print(f" - {k} = {v}") + logical_dates = _do_dry_run( + dag_id=args.dag_id, + from_date=args.from_date, + to_date=args.to_date, + reverse=args.reverse, + reprocess_behavior=args.reprocess_behavior, + ) + print("Logical dates to be attempted:") + for d in logical_dates: + print(f" - {d}") return + _create_backfill( dag_id=args.dag_id, from_date=args.from_date, diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index a6394e7667b37..e0dec0d90f7fe 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -25,9 +25,18 @@ import logging from enum import Enum -from typing import TYPE_CHECKING, overload - -from sqlalchemy import Boolean, Column, ForeignKeyConstraint, Integer, UniqueConstraint, case, func, select +from typing import TYPE_CHECKING + +from sqlalchemy import ( + Boolean, + Column, + ForeignKeyConstraint, + Integer, + UniqueConstraint, + desc, + func, + select, +) from sqlalchemy.orm import relationship, validates from sqlalchemy_jsonfield import JSONField @@ -38,13 +47,12 @@ from airflow.settings import json from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks +from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType if TYPE_CHECKING: from datetime import datetime - from typing import Literal log = logging.getLogger(__name__) @@ -147,124 +155,127 @@ def validate_sort_ordinal(self, key, val): return val -def _create_backfill_dag_run( - *, - dag, - dagrun_info_list, - reprocess_behavior: ReprocessBehavior | None = None, - backfill_id, - dag_run_conf: dict | None, - session, - dry_run, -) -> list[datetime]: +def _get_latest_dag_run_row(info, session): from airflow.models import DagRun - backfill_sort_ordinal = 0 - logical_dates = [] - dagrun_infos = list(dagrun_info_list) - - if reprocess_behavior is None: - reprocess_behavior = ReprocessBehavior.NONE - if dag_run_conf is None: - dag_run_conf = {} - - dag_run_ranked = ( - select( - DagRun.logical_date, - DagRun.start_date, - DagRun.dag_id, - func.row_number() - .over( - partition_by=DagRun.logical_date, - order_by=(case([(DagRun.start_date.is_(None), 0)], else_=1), DagRun.start_date.desc()), - ) - .label("row_number"), + return session.scalar( + with_row_locks( + select(DagRun) + .where(DagRun.logical_date == info.logical_date) + .order_by(nulls_first(desc(DagRun.start_date), session=session)) + .limit(1), + session=session, ) - .where(DagRun.dag_id == dag.dag_id) - .where(DagRun.logical_date.in_([info.logical_date for info in dagrun_infos])) - .subquery() ) - existing_dag_runs = { - dr.logical_date: dr - for dr in session.scalars( - select(DagRun) - .join( - dag_run_ranked, - DagRun.logical_date == dag_run_ranked.c.logical_date - and DagRun.start_date == dag_run_ranked.c.start_date - and DagRun.dag_id == dag_run_ranked.c.dag_id, - isouter=True, + +def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None: + non_create_reason = None + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif reprocess_behavior is ReprocessBehavior.NONE: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + elif reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + return non_create_reason + + +def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavior | None): + depends_on_past = None + depends_on_past = any(x.depends_on_past for x in dag.tasks) + if depends_on_past: + if reverse is True: + raise ValueError( + "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True" + ) + if reprocess_behavior in (None, ReprocessBehavior.NONE): + raise ValueError( + "Dag has task for which depends_on_past is true. " + "You must set reprocess behavior to reprocess completed or " + "reprocess failed" ) - .where(dag_run_ranked.c.row_number == 1) - ).all() - } - for info in dagrun_infos: - backfill_sort_ordinal += 1 - dr = existing_dag_runs.get(info.logical_date) - non_create_reason = None - with session.begin_nested() as nested: +def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior) -> list[datetime]: + from airflow.models.serialized_dag import SerializedDagModel + + with create_session() as session: + serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id)) + dag = serdag.dag + _validate_backfill_params(dag, reverse, reprocess_behavior) + + dagrun_info_list = _get_info_list( + dag=dag, + from_date=from_date, + to_date=to_date, + reverse=reverse, + ) + logical_dates = [] + for info in dagrun_info_list: + dr = _get_latest_dag_run_row(info, session) if dr: - if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - elif reprocess_behavior is ReprocessBehavior.NONE: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif reprocess_behavior is ReprocessBehavior.FAILED: - if dr.state != DagRunState.FAILED: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - - if non_create_reason: - if not dry_run: - nested.rollback() - session.add( - BackfillDagRun( - backfill_id=backfill_id, - dag_run_id=None, - logical_date=dr.logical_date, - exception_reason=non_create_reason, - sort_ordinal=backfill_sort_ordinal, - ) - ) - else: - logical_dates.append(dr.logical_date) + non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) + if not non_create_reason: + logical_dates.append(info.logical_date) else: logical_dates.append(info.logical_date) + return logical_dates - if not non_create_reason and not dry_run: - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) - dr = dag.create_dagrun( - triggered_by=DagRunTriggeredByType.BACKFILL, - logical_date=info.logical_date, - data_interval=info.data_interval, - start_date=timezone.utcnow(), - state=DagRunState.QUEUED, - external_trigger=False, - conf=dag_run_conf, - run_type=DagRunType.BACKFILL_JOB, - creating_job_id=None, - session=session, - backfill_id=backfill_id, - dag_version=dag_version, - ) + +def _create_backfill_dag_run( + *, + dag, + info, + reprocess_behavior: ReprocessBehavior, + backfill_id, + dag_run_conf, + backfill_sort_ordinal, + session, +): + with session.begin_nested() as nested: + dr = _get_latest_dag_run_row(info, session) + + if dr: + non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) + if non_create_reason: + # rolling back here restores to start of this nested tran + # which releases the lock on the latest dag run, since we + # are not creating a new one + nested.rollback() session.add( BackfillDagRun( backfill_id=backfill_id, - dag_run_id=dr.id, - sort_ordinal=backfill_sort_ordinal, + dag_run_id=None, logical_date=info.logical_date, + exception_reason=non_create_reason, + sort_ordinal=backfill_sort_ordinal, ) ) - - log.info( - "created backfill dag run dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - backfill_id, - info, - ) - - return logical_dates + return + dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + dr = dag.create_dagrun( + triggered_by=DagRunTriggeredByType.BACKFILL, + logical_date=info.logical_date, + data_interval=info.data_interval, + start_date=timezone.utcnow(), + state=DagRunState.QUEUED, + external_trigger=False, + conf=dag_run_conf, + run_type=DagRunType.BACKFILL_JOB, + creating_job_id=None, + session=session, + backfill_id=backfill_id, + dag_version=dag_version, + ) + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + sort_ordinal=backfill_sort_ordinal, + logical_date=info.logical_date, + ) + ) def _get_info_list( @@ -282,34 +293,6 @@ def _get_info_list( return dagrun_info_list -@overload -def _create_backfill( - *, - dag_id: str, - from_date: datetime, - to_date: datetime, - max_active_runs: int, - reverse: bool, - dag_run_conf: dict | None, - reprocess_behavior: ReprocessBehavior | None = ..., - dry_run: Literal[True], -) -> list[datetime]: ... - - -@overload -def _create_backfill( - *, - dag_id: str, - from_date: datetime, - to_date: datetime, - max_active_runs: int, - reverse: bool, - dag_run_conf: dict | None, - reprocess_behavior: ReprocessBehavior | None = ..., - dry_run: Literal[False] | None = ..., -) -> Backfill | None: ... - - def _create_backfill( *, dag_id: str, @@ -319,8 +302,7 @@ def _create_backfill( reverse: bool, dag_run_conf: dict | None, reprocess_behavior: ReprocessBehavior | None = None, - dry_run: bool | None = False, -) -> Backfill | list[datetime] | None: +) -> Backfill | None: from airflow.models import DagModel from airflow.models.serialized_dag import SerializedDagModel @@ -342,35 +324,20 @@ def _create_backfill( ) dag = serdag.dag - depends_on_past = any(x.depends_on_past for x in dag.tasks) - if depends_on_past: - if reverse is True: - raise ValueError( - "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True" - ) - if reprocess_behavior in (None, ReprocessBehavior.NONE): - raise ValueError( - "Dag has task for which depends_on_past is true. " - "You must set reprocess behavior to reprocess completed or " - "reprocess failed" - ) - - backfill_id = None - - if not dry_run: - br = Backfill( - dag_id=dag_id, - from_date=from_date, - to_date=to_date, - max_active_runs=max_active_runs, - dag_run_conf=dag_run_conf, - reprocess_behavior=reprocess_behavior, - ) + _validate_backfill_params(dag, reverse, reprocess_behavior) - session.add(br) - session.commit() + br = Backfill( + dag_id=dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=max_active_runs, + dag_run_conf=dag_run_conf, + reprocess_behavior=reprocess_behavior, + ) + session.add(br) + session.commit() - backfill_id = br.id + backfill_sort_ordinal = 0 dagrun_info_list = _get_info_list( from_date=from_date, @@ -392,14 +359,21 @@ def _create_backfill( if not dag_model: raise RuntimeError(f"Dag {dag_id} not found") - logical_dates = _create_backfill_dag_run( - dag=dag, - dagrun_info_list=dagrun_info_list, - backfill_id=backfill_id, - dag_run_conf=dag_run_conf, - reprocess_behavior=reprocess_behavior, - session=session, - dry_run=dry_run, - ) - - return br if not dry_run else logical_dates + for info in dagrun_info_list: + backfill_sort_ordinal += 1 + _create_backfill_dag_run( + dag=dag, + info=info, + backfill_id=br.id, + dag_run_conf=br.dag_run_conf, + reprocess_behavior=br.reprocess_behavior, + backfill_sort_ordinal=backfill_sort_ordinal, + session=session, + ) + log.info( + "created backfill dag run dag_id=%s backfill_id=%s, info=%s", + dag.dag_id, + br.id, + info, + ) + return br From 81083e077a7fcbe6fa3902a291745a661e099c77 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 9 Jan 2025 23:03:03 -0800 Subject: [PATCH 12/13] don't lock for dry run --- airflow/models/backfill.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index e0dec0d90f7fe..c924bb2163d00 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -155,17 +155,14 @@ def validate_sort_ordinal(self, key, val): return val -def _get_latest_dag_run_row(info, session): +def _get_latest_dag_run_row_query(info, session): from airflow.models import DagRun - return session.scalar( - with_row_locks( - select(DagRun) - .where(DagRun.logical_date == info.logical_date) - .order_by(nulls_first(desc(DagRun.start_date), session=session)) - .limit(1), - session=session, - ) + return ( + select(DagRun) + .where(DagRun.logical_date == info.logical_date) + .order_by(nulls_first(desc(DagRun.start_date), session=session)) + .limit(1) ) @@ -213,7 +210,9 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior) -> l ) logical_dates = [] for info in dagrun_info_list: - dr = _get_latest_dag_run_row(info, session) + dr = session.scalar( + statement=_get_latest_dag_run_row_query(info, session), + ) if dr: non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) if not non_create_reason: @@ -234,8 +233,12 @@ def _create_backfill_dag_run( session, ): with session.begin_nested() as nested: - dr = _get_latest_dag_run_row(info, session) - + dr = session.scalar( + with_row_locks( + query=_get_latest_dag_run_row_query(info, session), + session=session, + ), + ) if dr: non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) if non_create_reason: From 4b877c9a7ac81d4c5044f67d4116b3a926f60943 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 10 Jan 2025 09:14:21 -0800 Subject: [PATCH 13/13] take a session --- .../core_api/routes/public/backfills.py | 2 + .../remote_commands/backfill_command.py | 17 ++++---- airflow/models/backfill.py | 41 +++++++++---------- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 6b06be22d80bc..4dae9ce3b4761 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -222,6 +222,7 @@ def create_backfill( ) def create_backfill_dry_run( body: BackfillPostBody, + session: SessionDep, ) -> DryRunBackfillCollectionResponse: from_date = timezone.coerce_datetime(body.from_date) to_date = timezone.coerce_datetime(body.to_date) @@ -232,6 +233,7 @@ def create_backfill_dry_run( to_date=to_date, reverse=body.run_backwards, reprocess_behavior=body.reprocess_behavior, + session=session, ) backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run] diff --git a/airflow/cli/commands/remote_commands/backfill_command.py b/airflow/cli/commands/remote_commands/backfill_command.py index 277f0e7719400..4256e6a8b3677 100644 --- a/airflow/cli/commands/remote_commands/backfill_command.py +++ b/airflow/cli/commands/remote_commands/backfill_command.py @@ -25,6 +25,7 @@ from airflow.utils import cli as cli_utils from airflow.utils.cli import sigint_handler from airflow.utils.providers_configuration_loader import providers_configuration_loaded +from airflow.utils.session import create_session @cli_utils.action_cli @@ -53,13 +54,15 @@ def create_backfill(args) -> None: ) for k, v in params.items(): print(f" - {k} = {v}") - logical_dates = _do_dry_run( - dag_id=args.dag_id, - from_date=args.from_date, - to_date=args.to_date, - reverse=args.reverse, - reprocess_behavior=args.reprocess_behavior, - ) + with create_session() as session: + logical_dates = _do_dry_run( + dag_id=args.dag_id, + from_date=args.from_date, + to_date=args.to_date, + reverse=args.reverse, + reprocess_behavior=args.reprocess_behavior, + session=session, + ) print("Logical dates to be attempted:") for d in logical_dates: print(f" - {d}") diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index c924bb2163d00..39a28379a6526 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -194,32 +194,31 @@ def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavio ) -def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior) -> list[datetime]: +def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, session) -> list[datetime]: from airflow.models.serialized_dag import SerializedDagModel - with create_session() as session: - serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id)) - dag = serdag.dag - _validate_backfill_params(dag, reverse, reprocess_behavior) + serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id)) + dag = serdag.dag + _validate_backfill_params(dag, reverse, reprocess_behavior) - dagrun_info_list = _get_info_list( - dag=dag, - from_date=from_date, - to_date=to_date, - reverse=reverse, + dagrun_info_list = _get_info_list( + dag=dag, + from_date=from_date, + to_date=to_date, + reverse=reverse, + ) + logical_dates = [] + for info in dagrun_info_list: + dr = session.scalar( + statement=_get_latest_dag_run_row_query(info, session), ) - logical_dates = [] - for info in dagrun_info_list: - dr = session.scalar( - statement=_get_latest_dag_run_row_query(info, session), - ) - if dr: - non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) - if not non_create_reason: - logical_dates.append(info.logical_date) - else: + if dr: + non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) + if not non_create_reason: logical_dates.append(info.logical_date) - return logical_dates + else: + logical_dates.append(info.logical_date) + return logical_dates def _create_backfill_dag_run(