Skip to content

Commit

Permalink
AIP-84: Migrating DELETE queued asset events for DAG to fastAPI (#44129)
Browse files Browse the repository at this point in the history
* AIP-84: Migrating GET queued asset events for DAG to fastAPI

* fixing tests and server code

* fixing parameters

* fixing parameters

* AIP-84: Migrating delete queued asset events for DAG to fastAPI

* adding assert for count

* review comments from kalyan
  • Loading branch information
amoghrajesh authored Nov 18, 2024
1 parent f3bad1e commit 6c3caa6
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def get_dag_asset_queued_events(
)


@mark_fastapi_migration_done
@security.requires_access_asset("DELETE")
@security.requires_access_dag("GET")
@action_logging
Expand Down
53 changes: 53 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,59 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
delete:
tags:
- Asset
summary: Delete Dag Asset Queued Events
operationId: delete_dag_asset_queued_events
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'204':
description: Successful Response
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down
26 changes: 25 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Annotated

from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy import delete, select
from sqlalchemy.orm import Session, joinedload, subqueryload

from airflow.api_fastapi.common.db.common import get_session, paginated_select
Expand Down Expand Up @@ -252,3 +252,27 @@ def get_dag_asset_queued_events(
],
total_entries=total_entries,
)


@assets_router.delete(
"/dags/{dag_id}/assets/queuedEvent",
status_code=status.HTTP_204_NO_CONTENT,
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND,
]
),
)
def delete_dag_asset_queued_events(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
):
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)

delete_statement = delete(AssetDagRunQueue).where(*where_clause)
result = session.execute(delete_statement)

if result.rowcount == 0:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,9 @@ export type PoolServicePatchPoolMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>
>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
Expand Down
42 changes: 42 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2551,6 +2551,48 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Dag Asset Queued Events
* @param data The data for the request.
* @param data.dagId
* @param data.before
* @returns void Successful Response
* @throws ApiError
*/
export const useAssetServiceDeleteDagAssetQueuedEvents = <
TData = Common.AssetServiceDeleteDagAssetQueuedEventsMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
before?: string;
dagId: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
before?: string;
dagId: string;
},
TContext
>({
mutationFn: ({ before, dagId }) =>
AssetService.deleteDagAssetQueuedEvents({
before,
dagId,
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Connection
* Delete a connection entry.
Expand Down
32 changes: 32 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import type {
GetAssetResponse,
GetDagAssetQueuedEventsData,
GetDagAssetQueuedEventsResponse,
DeleteDagAssetQueuedEventsData,
DeleteDagAssetQueuedEventsResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
RecentDagRunsData,
Expand Down Expand Up @@ -306,6 +308,36 @@ export class AssetService {
},
});
}

/**
* Delete Dag Asset Queued Events
* @param data The data for the request.
* @param data.dagId
* @param data.before
* @returns void Successful Response
* @throws ApiError
*/
public static deleteDagAssetQueuedEvents(
data: DeleteDagAssetQueuedEventsData,
): CancelablePromise<DeleteDagAssetQueuedEventsResponse> {
return __request(OpenAPI, {
method: "DELETE",
url: "/public/dags/{dag_id}/assets/queuedEvent",
path: {
dag_id: data.dagId,
},
query: {
before: data.before,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class DashboardService {
Expand Down
36 changes: 36 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,13 @@ export type GetDagAssetQueuedEventsData = {

export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse;

export type DeleteDagAssetQueuedEventsData = {
before?: string | null;
dagId: string;
};

export type DeleteDagAssetQueuedEventsResponse = void;

export type HistoricalMetricsData = {
endDate: string;
startDate: string;
Expand Down Expand Up @@ -1691,6 +1698,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
delete: {
req: DeleteDagAssetQueuedEventsData;
res: {
/**
* Successful Response
*/
204: void;
/**
* Bad Request
*/
400: HTTPExceptionResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/ui/dashboard/historical_metrics_data": {
get: {
Expand Down
44 changes: 44 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,50 @@ def test_should_respond_404(self, test_client):
assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found"


class TestDeleteDagDatasetQueuedEvents(TestQueuedEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_204(self, test_client, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
self.create_assets(session=session, num=1)
asset_id = 1
self._create_asset_dag_run_queues(dag_id, asset_id, session)
adrqs = session.query(AssetDagRunQueue).all()
assert len(adrqs) == 1

response = test_client.delete(
f"/public/dags/{dag_id}/assets/queuedEvent",
)

assert response.status_code == 204
adrqs = session.query(AssetDagRunQueue).all()
assert len(adrqs) == 0

def test_should_respond_404_invalid_dag(self, test_client):
dag_id = "not_exists"

response = test_client.delete(
f"/public/dags/{dag_id}/assets/queuedEvent",
)

assert response.status_code == 404
assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found"

def test_should_respond_404_valid_dag_no_adrq(self, test_client, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
self.create_assets(session=session, num=1)
adrqs = session.query(AssetDagRunQueue).all()
assert len(adrqs) == 0

response = test_client.delete(
f"/public/dags/{dag_id}/assets/queuedEvent",
)

assert response.status_code == 404
assert response.json()["detail"] == "Queue event with dag_id: `dag` was not found"


class TestPostAssetEvents(TestAssets):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session):
Expand Down

0 comments on commit 6c3caa6

Please sign in to comment.