Skip to content

Commit

Permalink
take a session
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Jan 10, 2025
1 parent 52a4e0b commit 4b877c9
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 28 deletions.
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def create_backfill(
)
def create_backfill_dry_run(
body: BackfillPostBody,
session: SessionDep,
) -> DryRunBackfillCollectionResponse:
from_date = timezone.coerce_datetime(body.from_date)
to_date = timezone.coerce_datetime(body.to_date)
Expand All @@ -232,6 +233,7 @@ def create_backfill_dry_run(
to_date=to_date,
reverse=body.run_backwards,
reprocess_behavior=body.reprocess_behavior,
session=session,
)
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]

Expand Down
17 changes: 10 additions & 7 deletions airflow/cli/commands/remote_commands/backfill_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.utils import cli as cli_utils
from airflow.utils.cli import sigint_handler
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import create_session


@cli_utils.action_cli
Expand Down Expand Up @@ -53,13 +54,15 @@ def create_backfill(args) -> None:
)
for k, v in params.items():
print(f" - {k} = {v}")
logical_dates = _do_dry_run(
dag_id=args.dag_id,
from_date=args.from_date,
to_date=args.to_date,
reverse=args.reverse,
reprocess_behavior=args.reprocess_behavior,
)
with create_session() as session:
logical_dates = _do_dry_run(
dag_id=args.dag_id,
from_date=args.from_date,
to_date=args.to_date,
reverse=args.reverse,
reprocess_behavior=args.reprocess_behavior,
session=session,
)
print("Logical dates to be attempted:")
for d in logical_dates:
print(f" - {d}")
Expand Down
41 changes: 20 additions & 21 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,32 +194,31 @@ def _validate_backfill_params(dag, reverse, reprocess_behavior: ReprocessBehavio
)


def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior) -> list[datetime]:
def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, session) -> list[datetime]:
from airflow.models.serialized_dag import SerializedDagModel

with create_session() as session:
serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
dag = serdag.dag
_validate_backfill_params(dag, reverse, reprocess_behavior)
serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
dag = serdag.dag
_validate_backfill_params(dag, reverse, reprocess_behavior)

dagrun_info_list = _get_info_list(
dag=dag,
from_date=from_date,
to_date=to_date,
reverse=reverse,
dagrun_info_list = _get_info_list(
dag=dag,
from_date=from_date,
to_date=to_date,
reverse=reverse,
)
logical_dates = []
for info in dagrun_info_list:
dr = session.scalar(
statement=_get_latest_dag_run_row_query(info, session),
)
logical_dates = []
for info in dagrun_info_list:
dr = session.scalar(
statement=_get_latest_dag_run_row_query(info, session),
)
if dr:
non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
if not non_create_reason:
logical_dates.append(info.logical_date)
else:
if dr:
non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
if not non_create_reason:
logical_dates.append(info.logical_date)
return logical_dates
else:
logical_dates.append(info.logical_date)
return logical_dates


def _create_backfill_dag_run(
Expand Down

0 comments on commit 4b877c9

Please sign in to comment.