Skip to content

Commit

Permalink
AIP-84 Fix: Allow Null Values for end_date Field in Dashboard Endpint…
Browse files Browse the repository at this point in the history
… in FastAPI (#44043)
  • Loading branch information
bugraoz93 authored Nov 19, 2024
1 parent 7747b7b commit 8d2e96f
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 60 deletions.
6 changes: 4 additions & 2 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ paths:
title: Start Date
- name: end_date
in: query
required: true
required: false
schema:
type: string
anyOf:
- type: string
- type: 'null'
title: End Date
responses:
'200':
Expand Down
13 changes: 7 additions & 6 deletions airflow/api_fastapi/core_api/routes/ui/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from airflow.api_fastapi.common.parameters import DateTimeQuery
from airflow.api_fastapi.common.parameters import DateTimeQuery, OptionalDateTimeQuery
from airflow.api_fastapi.core_api.datamodels.ui.dashboard import HistoricalMetricDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.dagrun import DagRun, DagRunType
Expand All @@ -44,17 +44,18 @@
responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]),
)
def historical_metrics(
start_date: DateTimeQuery,
end_date: DateTimeQuery,
session: Annotated[Session, Depends(get_session)],
start_date: DateTimeQuery,
end_date: OptionalDateTimeQuery = None,
) -> HistoricalMetricDataResponse:
"""Return cluster activity historical metrics."""
current_time = timezone.utcnow()
# DagRuns
dag_run_types = session.execute(
select(DagRun.run_type, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time),
)
.group_by(DagRun.run_type)
).all()
Expand All @@ -63,7 +64,7 @@ def historical_metrics(
select(DagRun.state, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time),
)
.group_by(DagRun.state)
).all()
Expand All @@ -74,7 +75,7 @@ def historical_metrics(
.join(TaskInstance.dag_run)
.where(
DagRun.start_date >= start_date,
func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time),
)
.group_by(TaskInstance.state)
).all()
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = (
endDate,
startDate,
}: {
endDate: string;
endDate?: string;
startDate: string;
},
queryKey?: Array<unknown>,
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ export const prefetchUseDashboardServiceHistoricalMetrics = (
endDate,
startDate,
}: {
endDate: string;
endDate?: string;
startDate: string;
},
) =>
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ export const useDashboardServiceHistoricalMetrics = <
endDate,
startDate,
}: {
endDate: string;
endDate?: string;
startDate: string;
},
queryKey?: TQueryKey,
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ export const useDashboardServiceHistoricalMetricsSuspense = <
endDate,
startDate,
}: {
endDate: string;
endDate?: string;
startDate: string;
},
queryKey?: TQueryKey,
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ export type DeleteDagAssetQueuedEventData = {
export type DeleteDagAssetQueuedEventResponse = void;

export type HistoricalMetricsData = {
endDate: string;
endDate?: string | null;
startDate: string;
};

Expand Down
120 changes: 73 additions & 47 deletions tests/api_fastapi/core_api/routes/ui/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,53 +100,79 @@ def make_dag_runs(dag_maker, session, time_machine):


class TestHistoricalMetricsDataEndpoint:
@pytest.mark.parametrize(
"params, expected",
[
(
{"start_date": "2023-01-01T00:00", "end_date": "2023-08-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 1},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 2,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
},
),
(
{"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 0},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 0,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
},
),
(
{"start_date": "2023-02-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 0},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 1},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
},
),
],
)
@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data(self, test_client, time_machine):
params = {"start_date": "2023-01-01T00:00", "end_date": "2023-08-02T00:00"}
response = test_client.get("/ui/dashboard/historical_metrics_data", params=params)

assert response.status_code == 200
assert response.json() == {
"dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 1},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 2,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
}

@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data_date_filters(self, test_client):
params = {"start_date": "2023-02-02T00:00", "end_date": "2023-06-02T00:00"}
def test_historical_metrics_data(self, test_client, params, expected):
response = test_client.get("/ui/dashboard/historical_metrics_data", params=params)
assert response.status_code == 200
assert response.json() == {
"dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 0},
"dag_run_types": {"backfill": 0, "asset_triggered": 1, "manual": 0, "scheduled": 0},
"task_instance_states": {
"deferred": 0,
"failed": 2,
"no_status": 0,
"queued": 0,
"removed": 0,
"restarting": 0,
"running": 0,
"scheduled": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
"up_for_retry": 0,
"upstream_failed": 0,
},
}
assert response.json() == expected

0 comments on commit 8d2e96f

Please sign in to comment.