From 378e3581f31dfcc6dfa4661d07643ea07b2406bb Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 10 Jan 2025 11:19:12 +0100 Subject: [PATCH 01/24] Pin pnpm action to hash commit following best GH Action practices (#45547) This was found by the CodeQL Actions scanning --- .github/workflows/basic-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index 353f65d9a6c9c..da803aee31904 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -98,7 +98,7 @@ jobs: - name: "Cleanup docker" run: ./scripts/ci/cleanup_docker.sh - name: Setup pnpm - uses: pnpm/action-setup@v4.0.0 + uses: pnpm/action-setup@fe02b34f77f8bc703788d5817da081398fad5dd2 # v4.0.0 with: version: 9 run_install: false From a149dd72b31cf428e3a87b2ab3e3197cadd9a014 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 10 Jan 2025 11:19:30 +0100 Subject: [PATCH 02/24] CodeQL scanning can run always on all code (#45541) The CodeQL scannig is fast and having custom configuration to select which scanning to run should be run makes it unnecessarily complex We can just run all CodeQL scans always. This has been suggested by actions codeql scan itself. --- .github/workflows/codeql-analysis.yml | 57 --------------------------- 1 file changed, 57 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index be0d690799550..1fcf81a84fd5b 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -33,46 +33,12 @@ concurrency: cancel-in-progress: true jobs: - selective-checks: - name: Selective checks - runs-on: ["ubuntu-22.04"] - env: - GITHUB_CONTEXT: ${{ toJson(github) }} - outputs: - needs-python-scans: ${{ steps.selective-checks.outputs.needs-python-scans }} - needs-javascript-scans: ${{ steps.selective-checks.outputs.needs-javascript-scans }} - steps: - - name: Checkout repository - uses: actions/checkout@v4 - with: - fetch-depth: 2 - persist-credentials: false - - name: "Install Breeze" - uses: ./.github/actions/breeze - with: - use-uv: "true" - - name: "Get information about the Workflow" - id: source-run-info - run: breeze ci get-workflow-info 2>> ${GITHUB_OUTPUT} - env: - SKIP_BREEZE_SELF_UPGRADE_CHECK: "true" - - name: Selective checks - id: selective-checks - env: - PR_LABELS: "${{ steps.source-run-info.outputs.pr-labels }}" - COMMIT_REF: "${{ github.sha }}" - VERBOSE: "false" - run: breeze ci selective-check 2>> ${GITHUB_OUTPUT} - analyze: name: Analyze runs-on: ["ubuntu-22.04"] - needs: [selective-checks] strategy: fail-fast: false matrix: - # Override automatic language detection by changing the below list - # Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python'] language: ['python', 'javascript', 'actions'] permissions: actions: read @@ -84,37 +50,14 @@ jobs: uses: actions/checkout@v4 with: persist-credentials: false - if: | - matrix.language == 'actions' || - matrix.language == 'python' && needs.selective-checks.outputs.needs-python-scans == 'true' || - matrix.language == 'javascript' && needs.selective-checks.outputs.needs-javascript-scans == 'true' - # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} - # If you wish to specify custom queries, you can do so here or in a config file. - # By default, queries listed here will override any specified in a config file. 
- # Prefix the list here with "+" to use these queries and those in the config file. - # queries: ./path/to/local/query, your-org/your-repo/queries@main - if: | - matrix.language == 'actions' || - matrix.language == 'python' && needs.selective-checks.outputs.needs-python-scans == 'true' || - matrix.language == 'javascript' && needs.selective-checks.outputs.needs-javascript-scans == 'true' - # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). - # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild uses: github/codeql-action/autobuild@v3 - if: | - matrix.language == 'actions' || - matrix.language == 'python' && needs.selective-checks.outputs.needs-python-scans == 'true' || - matrix.language == 'javascript' && needs.selective-checks.outputs.needs-javascript-scans == 'true' - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v3 - if: | - matrix.language == 'actions' || - matrix.language == 'python' && needs.selective-checks.outputs.needs-python-scans == 'true' || - matrix.language == 'javascript' && needs.selective-checks.outputs.needs-javascript-scans == 'true' From 52f89dd5bc471b1702e7e5e67b90420e27d51d20 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Fri, 10 Jan 2025 18:48:22 +0800 Subject: [PATCH 03/24] AIP-84 Clear Task Instance improve response (#45514) --- .../core_api/datamodels/task_instances.py | 15 ------- .../core_api/openapi/v1-generated.yaml | 36 +--------------- .../core_api/routes/public/task_instances.py | 9 ++-- airflow/ui/openapi-gen/queries/queries.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 41 ------------------- .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 21 +--------- .../routes/public/test_task_instances.py | 41 ++++++++++++++++--- 8 files changed, 45 insertions(+), 122 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py index 07a1f77ad5421..3d191b96828a8 100644 --- a/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -220,18 +220,3 @@ def validate_new_state(cls, ns: str | None) -> str: if ns not in valid_states: raise ValueError(f"'{ns}' is not one of {valid_states}") return ns - - -class TaskInstanceReferenceResponse(BaseModel): - """Task Instance Reference serializer for responses.""" - - task_id: str - dag_run_id: str = Field(validation_alias="run_id") - dag_id: str - - -class TaskInstanceReferenceCollectionResponse(BaseModel): - """Task Instance Reference collection serializer for responses.""" - - task_instances: list[TaskInstanceReferenceResponse] - total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index bb4861a5fc3c7..5d06e7ab28e1d 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -5506,7 +5506,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TaskInstanceReferenceCollectionResponse' + $ref: '#/components/schemas/TaskInstanceCollectionResponse' '401': content: application/json: @@ -9063,40 +9063,6 @@ components: - executor_config title: TaskInstanceHistoryResponse description: TaskInstanceHistory serializer for responses. 
- TaskInstanceReferenceCollectionResponse: - properties: - task_instances: - items: - $ref: '#/components/schemas/TaskInstanceReferenceResponse' - type: array - title: Task Instances - total_entries: - type: integer - title: Total Entries - type: object - required: - - task_instances - - total_entries - title: TaskInstanceReferenceCollectionResponse - description: Task Instance Reference collection serializer for responses. - TaskInstanceReferenceResponse: - properties: - task_id: - type: string - title: Task Id - dag_run_id: - type: string - title: Dag Run Id - dag_id: - type: string - title: Dag Id - type: object - required: - - task_id - - dag_run_id - - dag_id - title: TaskInstanceReferenceResponse - description: Task Instance Reference serializer for responses. TaskInstanceResponse: properties: id: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index 9eaf191374746..91e4cb0ddcc2a 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -55,8 +55,6 @@ TaskInstanceCollectionResponse, TaskInstanceHistoryCollectionResponse, TaskInstanceHistoryResponse, - TaskInstanceReferenceCollectionResponse, - TaskInstanceReferenceResponse, TaskInstanceResponse, TaskInstancesBatchBody, ) @@ -550,7 +548,7 @@ def post_clear_task_instances( request: Request, body: ClearTaskInstancesBody, session: SessionDep, -) -> TaskInstanceReferenceCollectionResponse: +) -> TaskInstanceCollectionResponse: """Clear task instances.""" dag = request.app.state.dag_bag.get_dag(dag_id) if not dag: @@ -597,6 +595,7 @@ def post_clear_task_instances( dry_run=True, task_ids=task_ids, dag_bag=request.app.state.dag_bag, + session=session, **body.model_dump( include={ "start_date", @@ -615,9 +614,9 @@ def post_clear_task_instances( DagRunState.QUEUED if reset_dag_runs else False, ) - return TaskInstanceReferenceCollectionResponse( + return TaskInstanceCollectionResponse( task_instances=[ - TaskInstanceReferenceResponse.model_validate( + TaskInstanceResponse.model_validate( ti, from_attributes=True, ) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e5d6d08240f02..9a816460704d5 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2980,7 +2980,7 @@ export const useTaskInstanceServiceGetTaskInstancesBatch = < * @param data The data for the request. 
* @param data.dagId * @param data.requestBody - * @returns TaskInstanceReferenceCollectionResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ export const useTaskInstanceServicePostClearTaskInstances = < diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 9d59d4fd59a61..64a8fcc9e812c 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4303,47 +4303,6 @@ export const $TaskInstanceHistoryResponse = { description: "TaskInstanceHistory serializer for responses.", } as const; -export const $TaskInstanceReferenceCollectionResponse = { - properties: { - task_instances: { - items: { - $ref: "#/components/schemas/TaskInstanceReferenceResponse", - }, - type: "array", - title: "Task Instances", - }, - total_entries: { - type: "integer", - title: "Total Entries", - }, - }, - type: "object", - required: ["task_instances", "total_entries"], - title: "TaskInstanceReferenceCollectionResponse", - description: "Task Instance Reference collection serializer for responses.", -} as const; - -export const $TaskInstanceReferenceResponse = { - properties: { - task_id: { - type: "string", - title: "Task Id", - }, - dag_run_id: { - type: "string", - title: "Dag Run Id", - }, - dag_id: { - type: "string", - title: "Dag Id", - }, - }, - type: "object", - required: ["task_id", "dag_run_id", "dag_id"], - title: "TaskInstanceReferenceResponse", - description: "Task Instance Reference serializer for responses.", -} as const; - export const $TaskInstanceResponse = { properties: { id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index ee8b5ab70c202..01666ae090423 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2394,7 +2394,7 @@ export class TaskInstanceService { * @param data The data for the request. * @param data.dagId * @param data.requestBody - * @returns TaskInstanceReferenceCollectionResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ public static postClearTaskInstances( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index f9dad9f4edad5..232e66d246446 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1050,23 +1050,6 @@ export type TaskInstanceHistoryResponse = { executor_config: string; }; -/** - * Task Instance Reference collection serializer for responses. - */ -export type TaskInstanceReferenceCollectionResponse = { - task_instances: Array; - total_entries: number; -}; - -/** - * Task Instance Reference serializer for responses. - */ -export type TaskInstanceReferenceResponse = { - task_id: string; - dag_run_id: string; - dag_id: string; -}; - /** * TaskInstance serializer for responses. 
*/ @@ -1984,7 +1967,7 @@ export type PostClearTaskInstancesData = { requestBody: ClearTaskInstancesBody; }; -export type PostClearTaskInstancesResponse = TaskInstanceReferenceCollectionResponse; +export type PostClearTaskInstancesResponse = TaskInstanceCollectionResponse; export type GetLogData = { accept?: "application/json" | "text/plain" | "*/*"; @@ -4021,7 +4004,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: TaskInstanceReferenceCollectionResponse; + 200: TaskInstanceCollectionResponse; /** * Unauthorized */ diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index d62d37944348c..eb07ae8bab6f5 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1878,7 +1878,7 @@ def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, t ) assert response.status_code == 200 - # dag_id (3rd argument) is a different session object. Manually asserting that the dag_id + # dag (3rd argument) is a different session object. Manually asserting that the dag_id # is the same. mock_clearti.assert_called_once_with([], mock.ANY, mock.ANY, DagRunState.QUEUED) assert mock_clearti.call_args[0][2].dag_id == dag_id @@ -1982,7 +1982,9 @@ def test_should_respond_200_with_reset_dag_run(self, test_client, session): }, ] for task_instance in expected_response: - assert task_instance in response.json()["task_instances"] + assert task_instance in [ + {key: ti[key] for key in task_instance.keys()} for ti in response.json()["task_instances"] + ] assert response.json()["total_entries"] == 6 assert failed_dag_runs == 0 @@ -2037,6 +2039,32 @@ def test_should_respond_200_with_dag_run_id(self, test_client, session): "dag_id": "example_python_operator", "dag_run_id": "TEST_DAG_RUN_ID_0", "task_id": "print_the_context", + "duration": mock.ANY, + "end_date": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": mock.ANY, + "logical_date": "2020-01-01T00:00:00Z", + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "rendered_fields": {}, + "rendered_map_index": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "restarting", + "task_display_name": "print_the_context", + "trigger": None, + "triggerer_job": None, + "try_number": 0, + "unixname": mock.ANY, }, ] assert response.json()["task_instances"] == expected_response @@ -2121,7 +2149,9 @@ def test_should_respond_200_with_include_past(self, test_client, session): }, ] for task_instance in expected_response: - assert task_instance in response.json()["task_instances"] + assert task_instance in [ + {key: ti[key] for key in task_instance.keys()} for ti in response.json()["task_instances"] + ] assert response.json()["total_entries"] == 6 def test_should_respond_200_with_include_future(self, test_client, session): @@ -2204,7 +2234,9 @@ def test_should_respond_200_with_include_future(self, test_client, session): }, ] for task_instance in expected_response: - assert task_instance in response.json()["task_instances"] + assert task_instance in [ + {key: ti[key] for key in task_instance.keys()} for ti in response.json()["task_instances"] + ] assert response.json()["total_entries"] == 6 def test_should_respond_404_for_nonexistent_dagrun_id(self, test_client, 
session): @@ -2339,7 +2371,6 @@ def test_should_respond_200(self, test_client, session): self.create_task_instances( session=session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True ) - print("here") response = test_client.get( "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries" ) From c86d120369c43b67c6e78b56474df7e80ff722b5 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 10 Jan 2025 12:41:53 +0100 Subject: [PATCH 04/24] Add explicit permissions for all workflow-run workflows (#45548) Those workflows inherit permissions from the calling workflows but it's good to add explicit permissions to indicate what is needed and in case we will also use the workflows for other purposes in the future - default permissions for older repos might be write so it's best to be explicit about the permissions. Found by CodeQL scanning --- .github/workflows/additional-ci-image-checks.yml | 2 ++ .github/workflows/additional-prod-image-tests.yml | 2 ++ .github/workflows/automatic-backport.yml | 3 ++- .github/workflows/backport-cli.yml | 3 +++ .github/workflows/basic-tests.yml | 2 ++ .github/workflows/ci-image-build.yml | 2 ++ .github/workflows/ci-image-checks.yml | 3 ++- .github/workflows/finalize-tests.yml | 2 ++ .github/workflows/generate-constraints.yml | 6 ++++++ .github/workflows/helm-tests.yml | 2 ++ .github/workflows/integration-system-tests.yml | 2 ++ .github/workflows/k8s-tests.yml | 2 ++ .github/workflows/news-fragment.yml | 3 ++- .github/workflows/prod-image-build.yml | 3 ++- .github/workflows/prod-image-extra-checks.yml | 2 ++ .github/workflows/push-image-cache.yml | 2 ++ .github/workflows/run-unit-tests.yml | 2 ++ .github/workflows/special-tests.yml | 3 ++- .github/workflows/task-sdk-tests.yml | 3 ++- .github/workflows/test-provider-packages.yml | 2 ++ 20 files changed, 45 insertions(+), 6 deletions(-) diff --git a/.github/workflows/additional-ci-image-checks.yml b/.github/workflows/additional-ci-image-checks.yml index 56cee1697620c..a6b7bdafcb5af 100644 --- a/.github/workflows/additional-ci-image-checks.yml +++ b/.github/workflows/additional-ci-image-checks.yml @@ -84,6 +84,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv to build the image (true/false)" required: true type: string +permissions: + contents: read jobs: # Push early BuildX cache to GitHub Registry in Apache repository, This cache does not wait for all the # tests to complete - it is run very early in the build process for "main" merges in order to refresh diff --git a/.github/workflows/additional-prod-image-tests.yml b/.github/workflows/additional-prod-image-tests.yml index bca5e3a592713..7b55121571471 100644 --- a/.github/workflows/additional-prod-image-tests.yml +++ b/.github/workflows/additional-prod-image-tests.yml @@ -60,6 +60,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv" required: true type: string +permissions: + contents: read jobs: prod-image-extra-checks-main: name: PROD image extra checks (main) diff --git a/.github/workflows/automatic-backport.yml b/.github/workflows/automatic-backport.yml index b5b22b7491a9c..4c72401a5d317 100644 --- a/.github/workflows/automatic-backport.yml +++ b/.github/workflows/automatic-backport.yml @@ -21,7 +21,8 @@ on: # yamllint disable-line rule:truthy push: branches: - main - +permissions: + contents: read jobs: get-pr-info: name: "Get PR information" diff --git a/.github/workflows/backport-cli.yml b/.github/workflows/backport-cli.yml index 
3706cd65bb01e..53243006137a6 100644 --- a/.github/workflows/backport-cli.yml +++ b/.github/workflows/backport-cli.yml @@ -41,6 +41,9 @@ on: # yamllint disable-line rule:truthy type: string permissions: + # Those permissions are only active for workflow dispatch (only committers can trigger it) and workflow call + # Which is triggered automatically by "automatic-backport" push workflow (only when merging by committer) + # Branch protection prevents from pushing to the "code" branches contents: write pull-requests: write jobs: diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index da803aee31904..847eec3b4ee59 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -60,6 +60,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv in the image" required: true type: string +permissions: + contents: read jobs: run-breeze-tests: timeout-minutes: 10 diff --git a/.github/workflows/ci-image-build.yml b/.github/workflows/ci-image-build.yml index d15c297d82a00..55bf4e046e23f 100644 --- a/.github/workflows/ci-image-build.yml +++ b/.github/workflows/ci-image-build.yml @@ -96,6 +96,8 @@ on: # yamllint disable-line rule:truthy description: "Disable airflow repo cache read from main." required: true type: string +permissions: + contents: read jobs: build-ci-images: strategy: diff --git a/.github/workflows/ci-image-checks.yml b/.github/workflows/ci-image-checks.yml index 21c857e7bd710..c6784042cec2c 100644 --- a/.github/workflows/ci-image-checks.yml +++ b/.github/workflows/ci-image-checks.yml @@ -108,7 +108,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv to build the image (true/false)" required: true type: string - +permissions: + contents: read jobs: install-pre-commit: timeout-minutes: 5 diff --git a/.github/workflows/finalize-tests.yml b/.github/workflows/finalize-tests.yml index 1d0ac8a600c1d..ac13089caf656 100644 --- a/.github/workflows/finalize-tests.yml +++ b/.github/workflows/finalize-tests.yml @@ -76,6 +76,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to debug resources or not (true/false)" required: true type: string +permissions: + contents: read jobs: update-constraints: runs-on: ${{ fromJSON(inputs.runs-on-as-json-public) }} diff --git a/.github/workflows/generate-constraints.yml b/.github/workflows/generate-constraints.yml index 740310e1cc09b..19592dae295c5 100644 --- a/.github/workflows/generate-constraints.yml +++ b/.github/workflows/generate-constraints.yml @@ -44,6 +44,12 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uvloop (true/false)" required: true type: string +permissions: + # This permission is only active for "canary" builds and PRs from the main repo + # All fork PRs are not allowed to have write permissions and this one is automatically downgraded to read + # Branch protection also prevents from pushing to the "code" branches so we can safely use this one to + # Push constraints to "constraints" branches which are non-code branches and are not protected + contents: write jobs: generate-constraints: permissions: diff --git a/.github/workflows/helm-tests.yml b/.github/workflows/helm-tests.yml index 9dc300c61c0a1..1b4aa19cbe595 100644 --- a/.github/workflows/helm-tests.yml +++ b/.github/workflows/helm-tests.yml @@ -40,6 +40,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uvloop (true/false)" required: true type: string +permissions: + contents: read jobs: tests-helm: timeout-minutes: 80 diff 
--git a/.github/workflows/integration-system-tests.yml b/.github/workflows/integration-system-tests.yml index f992b726e30df..7c3916d9d19c9 100644 --- a/.github/workflows/integration-system-tests.yml +++ b/.github/workflows/integration-system-tests.yml @@ -64,6 +64,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv" required: true type: string +permissions: + contents: read jobs: tests-core-integration: timeout-minutes: 130 diff --git a/.github/workflows/k8s-tests.yml b/.github/workflows/k8s-tests.yml index 6f867af65e9cd..40f73e3c59c66 100644 --- a/.github/workflows/k8s-tests.yml +++ b/.github/workflows/k8s-tests.yml @@ -48,6 +48,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to debug resources" required: true type: string +permissions: + contents: read jobs: tests-kubernetes: timeout-minutes: 60 diff --git a/.github/workflows/news-fragment.yml b/.github/workflows/news-fragment.yml index 73e58a0193711..46cb294d7a5b9 100644 --- a/.github/workflows/news-fragment.yml +++ b/.github/workflows/news-fragment.yml @@ -21,7 +21,8 @@ name: CI on: # yamllint disable-line rule:truthy pull_request: types: [labeled, unlabeled, opened, reopened, synchronize] - +permissions: + contents: read jobs: check-news-fragment: name: Check News Fragment diff --git a/.github/workflows/prod-image-build.yml b/.github/workflows/prod-image-build.yml index d90d1910f9336..85b421cade447 100644 --- a/.github/workflows/prod-image-build.yml +++ b/.github/workflows/prod-image-build.yml @@ -116,8 +116,9 @@ on: # yamllint disable-line rule:truthy description: "Whether this is a prod-image build (true/false)" required: true type: string +permissions: + contents: read jobs: - build-prod-packages: name: "Build Airflow and provider packages" timeout-minutes: 10 diff --git a/.github/workflows/prod-image-extra-checks.yml b/.github/workflows/prod-image-extra-checks.yml index f5a4b771436a7..56fa4b2b1a28d 100644 --- a/.github/workflows/prod-image-extra-checks.yml +++ b/.github/workflows/prod-image-extra-checks.yml @@ -64,6 +64,8 @@ on: # yamllint disable-line rule:truthy description: "Disable airflow repo cache read from main." required: true type: string +permissions: + contents: read jobs: myssql-client-image: uses: ./.github/workflows/prod-image-build.yml diff --git a/.github/workflows/push-image-cache.yml b/.github/workflows/push-image-cache.yml index b1c9d12754206..86ec3b2a85a86 100644 --- a/.github/workflows/push-image-cache.yml +++ b/.github/workflows/push-image-cache.yml @@ -80,6 +80,8 @@ on: # yamllint disable-line rule:truthy description: "Disable airflow repo cache read from main." 
required: true type: string +permissions: + contents: read jobs: push-ci-image-cache: name: "Push CI ${{ inputs.cache-type }}:${{ matrix.python }} image cache " diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 1c24e659d0979..e67d59ee08d37 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -116,6 +116,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv" required: true type: string +permissions: + contents: read jobs: tests: timeout-minutes: 120 diff --git a/.github/workflows/special-tests.yml b/.github/workflows/special-tests.yml index 36ccbf871cca9..8507294e535c6 100644 --- a/.github/workflows/special-tests.yml +++ b/.github/workflows/special-tests.yml @@ -80,7 +80,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv or not (true/false)" required: true type: string - +permissions: + contents: read jobs: tests-min-sqlalchemy: name: "Min SQLAlchemy test" diff --git a/.github/workflows/task-sdk-tests.yml b/.github/workflows/task-sdk-tests.yml index 501e880fd3be0..b8ecf0eb798c6 100644 --- a/.github/workflows/task-sdk-tests.yml +++ b/.github/workflows/task-sdk-tests.yml @@ -44,7 +44,8 @@ on: # yamllint disable-line rule:truthy description: "Whether this is a canary run (true/false)" required: true type: string - +permissions: + contents: read jobs: task-sdk-tests: timeout-minutes: 80 diff --git a/.github/workflows/test-provider-packages.yml b/.github/workflows/test-provider-packages.yml index 877ff1f1b23c9..b0912fa6dfe37 100644 --- a/.github/workflows/test-provider-packages.yml +++ b/.github/workflows/test-provider-packages.yml @@ -62,6 +62,8 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uv" required: true type: string +permissions: + contents: read jobs: prepare-install-verify-provider-packages: timeout-minutes: 80 From e8f3be8f23cda7873d1988f3e445a4abe6fd65ca Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 10 Jan 2025 18:27:34 +0530 Subject: [PATCH 05/24] AIP-72: Supporting Pulling multiple XCOM values (#45509) Co-Authored-By: Kaxil Naik closes: https://github.com/apache/airflow/issues/45243 --- .../airflow/sdk/execution_time/task_runner.py | 56 +++++++++++-------- .../tests/execution_time/test_task_runner.py | 38 ++++++++----- 2 files changed, 56 insertions(+), 38 deletions(-) diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py index 4b7b50c5ed621..e63252efa9552 100644 --- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py @@ -165,7 +165,7 @@ def render_templates( def xcom_pull( self, - task_ids: str | Iterable[str] | None = None, # TODO: Simplify to a single task_id? 
(breaking change) + task_ids: str | Iterable[str] | None = None, dag_id: str | None = None, key: str = "return_value", # TODO: Make this a constant (``XCOM_RETURN_KEY``) include_prior_dates: bool = False, # TODO: Add support for this @@ -213,11 +213,10 @@ def xcom_pull( run_id = self.run_id if task_ids is None: + # default to the current task if not provided task_ids = self.task_id - elif not isinstance(task_ids, str) and isinstance(task_ids, Iterable): - # TODO: Handle multiple task_ids or remove support - raise NotImplementedError("Multiple task_ids are not supported yet") - + elif isinstance(task_ids, str): + task_ids = [task_ids] if map_indexes is None: map_indexes = self.map_index elif isinstance(map_indexes, Iterable): @@ -225,28 +224,37 @@ def xcom_pull( raise NotImplementedError("Multiple map_indexes are not supported yet") log = structlog.get_logger(logger_name="task") - SUPERVISOR_COMMS.send_request( - log=log, - msg=GetXCom( - key=key, - dag_id=dag_id, - task_id=task_ids, - run_id=run_id, - map_index=map_indexes, - ), - ) - msg = SUPERVISOR_COMMS.get_message() - if TYPE_CHECKING: - assert isinstance(msg, XComResult) + xcoms = [] + for t in task_ids: + SUPERVISOR_COMMS.send_request( + log=log, + msg=GetXCom( + key=key, + dag_id=dag_id, + task_id=t, + run_id=run_id, + map_index=map_indexes, + ), + ) + + msg = SUPERVISOR_COMMS.get_message() + if not isinstance(msg, XComResult): + raise TypeError(f"Expected XComResult, received: {type(msg)} {msg}") + + if msg.value is not None: + from airflow.models.xcom import XCom - if msg.value is not None: - from airflow.models.xcom import XCom + # TODO: Move XCom serialization & deserialization to Task SDK + # https://github.com/apache/airflow/issues/45231 + xcom = XCom.deserialize_value(msg) # type: ignore[arg-type] + xcoms.append(xcom) + else: + xcoms.append(default) - # TODO: Move XCom serialization & deserialization to Task SDK - # https://github.com/apache/airflow/issues/45231 - return XCom.deserialize_value(msg) # type: ignore[arg-type] - return default + if len(xcoms) == 1: + return xcoms[0] + return xcoms def xcom_push(self, key: str, value: Any): """ diff --git a/task_sdk/tests/execution_time/test_task_runner.py b/task_sdk/tests/execution_time/test_task_runner.py index e93287064f30d..3dc06379cdffa 100644 --- a/task_sdk/tests/execution_time/test_task_runner.py +++ b/task_sdk/tests/execution_time/test_task_runner.py @@ -735,14 +735,20 @@ def test_get_variable_from_context( assert var_from_context == Variable(key="test_key", value=expected_value) - def test_xcom_pull(self, create_runtime_ti, mock_supervisor_comms, spy_agency): + @pytest.mark.parametrize( + "task_ids", + [ + "push_task", + ["push_task1", "push_task2"], + {"push_task1", "push_task2"}, + ], + ) + def test_xcom_pull(self, create_runtime_ti, mock_supervisor_comms, spy_agency, task_ids): """Test that a task pulls the expected XCom value if it exists.""" - task_id = "push_task" - class CustomOperator(BaseOperator): def execute(self, context): - value = context["ti"].xcom_pull(task_ids=task_id, key="key") + value = context["ti"].xcom_pull(task_ids=task_ids, key="key") print(f"Pulled XCom Value: {value}") task = CustomOperator(task_id="pull_task") @@ -755,16 +761,20 @@ def execute(self, context): run(runtime_ti, log=mock.MagicMock()) - mock_supervisor_comms.send_request.assert_any_call( - log=mock.ANY, - msg=GetXCom( - key="key", - dag_id="test_dag", - run_id="test_run", - task_id=task_id, - map_index=None, - ), - ) + if isinstance(task_ids, str): + task_ids = [task_ids] + + for 
task_id in task_ids: + mock_supervisor_comms.send_request.assert_any_call( + log=mock.ANY, + msg=GetXCom( + key="key", + dag_id="test_dag", + run_id="test_run", + task_id=task_id, + map_index=None, + ), + ) class TestXComAfterTaskExecution: From fcef600a7e60bcd625977cbc4e4546a7c4f799b0 Mon Sep 17 00:00:00 2001 From: Park Jiwon <57484954+david-parkk@users.noreply.github.com> Date: Fri, 10 Jan 2025 22:11:22 +0900 Subject: [PATCH 06/24] fix code indent in modified docker-compose.yaml for PyCharm (#45545) --- docs/apache-airflow/howto/docker-compose/index.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/apache-airflow/howto/docker-compose/index.rst b/docs/apache-airflow/howto/docker-compose/index.rst index 96ec9b53e3a9f..65151c22e0656 100644 --- a/docs/apache-airflow/howto/docker-compose/index.rst +++ b/docs/apache-airflow/howto/docker-compose/index.rst @@ -373,13 +373,13 @@ Steps: .. code-block:: yaml airflow-python: - <<: *airflow-common - profiles: - - debug - environment: - <<: *airflow-common-env - user: "50000:0" - entrypoint: [ "/bin/bash", "-c" ] + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + user: "50000:0" + entrypoint: [ "/bin/bash", "-c" ] .. note:: From 7dec7e8fa04ef663a21b39b030fd1e025e401652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Salih=20G=C3=B6ktu=C4=9F=20K=C3=B6se?= <34631089+goktugkose@users.noreply.github.com> Date: Fri, 10 Jan 2025 16:12:46 +0300 Subject: [PATCH 07/24] Upgrade pgBouncer version to 1.24.0 (#45542) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As of version 1.23.0, pgBouncer introduced support for rolling restarts. This feature allows users to seamlessly restart PgBouncer instances without disrupting active connections. For Airflow users managing PgBouncer on Kubernetes, this enhancement simplifies maintenance and upgrades, ensuring high availability and minimal downtime for critical workflows. 
Co-authored-by: Salih Göktuğ Köse --- chart/dockerfiles/pgbouncer/build_and_push.sh | 6 +++--- chart/values.schema.json | 2 +- chart/values.yaml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/chart/dockerfiles/pgbouncer/build_and_push.sh b/chart/dockerfiles/pgbouncer/build_and_push.sh index cede5ab8b0ab3..e4345cc44d018 100755 --- a/chart/dockerfiles/pgbouncer/build_and_push.sh +++ b/chart/dockerfiles/pgbouncer/build_and_push.sh @@ -22,13 +22,13 @@ readonly DOCKERHUB_USER DOCKERHUB_REPO=${DOCKERHUB_REPO:="airflow"} readonly DOCKERHUB_REPO -PGBOUNCER_VERSION="1.22.1" +PGBOUNCER_VERSION="1.24.0" readonly PGBOUNCER_VERSION -PGBOUNCER_SHA256="2b018aa6ce7f592c9892bb9e0fd90262484eb73937fd2af929770a45373ba215" +PGBOUNCER_SHA256="e76adf941a3191a416e223c0b2cdbf73159eef80a2a32314af6fbd82e41a1d41" readonly PGBOUNCER_SHA256 -AIRFLOW_PGBOUNCER_VERSION="2024.09.19" +AIRFLOW_PGBOUNCER_VERSION="2025.01.10" readonly AIRFLOW_PGBOUNCER_VERSION COMMIT_SHA=$(git rev-parse HEAD) diff --git a/chart/values.schema.json b/chart/values.schema.json index fedae1557ab6b..e8c1d1f2cfe54 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -887,7 +887,7 @@ "tag": { "description": "The PgBouncer image tag.", "type": "string", - "default": "airflow-pgbouncer-2024.09.19-1.22.1" + "default": "airflow-pgbouncer-2025.01.10-1.24.0" }, "pullPolicy": { "description": "The PgBouncer image pull policy.", diff --git a/chart/values.yaml b/chart/values.yaml index c0764fb54a0d7..73e00ed4281d8 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -115,7 +115,7 @@ images: pullPolicy: IfNotPresent pgbouncer: repository: apache/airflow - tag: airflow-pgbouncer-2024.09.19-1.22.1 + tag: airflow-pgbouncer-2025.01.10-1.24.0 pullPolicy: IfNotPresent pgbouncerExporter: repository: apache/airflow From 32787abd5d1d08e033c38a2cd29a7c200f86d5a6 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 10 Jan 2025 16:24:21 +0100 Subject: [PATCH 08/24] Revert "Pin pnpm action to hash commit following best GH Action practices (#45547)" (#45557) This reverts commit 378e3581f31dfcc6dfa4661d07643ea07b2406bb. --- .github/workflows/basic-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index 847eec3b4ee59..5cb71cb7f5c1f 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -100,7 +100,7 @@ jobs: - name: "Cleanup docker" run: ./scripts/ci/cleanup_docker.sh - name: Setup pnpm - uses: pnpm/action-setup@fe02b34f77f8bc703788d5817da081398fad5dd2 # v4.0.0 + uses: pnpm/action-setup@v4.0.0 with: version: 9 run_install: false From ae32ebcc3c637902b8e62d549a02d537be76343c Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 10 Jan 2025 16:24:33 +0100 Subject: [PATCH 09/24] Remove contents: write permission from generate-constraints (#45558) The write permission cannot be set for PRs from forks in the call workflow - so we have to come back to implicit permissions and make explicit permissions passing a bit differently. 
--- .github/workflows/generate-constraints.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.github/workflows/generate-constraints.yml b/.github/workflows/generate-constraints.yml index 19592dae295c5..740310e1cc09b 100644 --- a/.github/workflows/generate-constraints.yml +++ b/.github/workflows/generate-constraints.yml @@ -44,12 +44,6 @@ on: # yamllint disable-line rule:truthy description: "Whether to use uvloop (true/false)" required: true type: string -permissions: - # This permission is only active for "canary" builds and PRs from the main repo - # All fork PRs are not allowed to have write permissions and this one is automatically downgraded to read - # Branch protection also prevents from pushing to the "code" branches so we can safely use this one to - # Push constraints to "constraints" branches which are non-code branches and are not protected - contents: write jobs: generate-constraints: permissions: From db739d911437d2bf62ef321b9f43cd2921194e0b Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Fri, 10 Jan 2025 21:11:27 +0530 Subject: [PATCH 10/24] Don't display table/card header and pagination when no rows are present. (#45501) * Don't display table/card and pagination when no rows are present. * Refactor hasRows check to front. --- .../ui/src/components/DataTable/DataTable.tsx | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/airflow/ui/src/components/DataTable/DataTable.tsx b/airflow/ui/src/components/DataTable/DataTable.tsx index 3a2086a040e96..4f02b57cb8a6e 100644 --- a/airflow/ui/src/components/DataTable/DataTable.tsx +++ b/airflow/ui/src/components/DataTable/DataTable.tsx @@ -115,33 +115,34 @@ export const DataTable = ({ const { rows } = table.getRowModel(); const display = displayMode === "card" && Boolean(cardDef) ? "card" : "table"; + const hasRows = rows.length > 0; return ( <> {errorMessage} - {display === "table" && } - {display === "card" && cardDef !== undefined && ( + {hasRows && display === "table" ? : undefined} + {hasRows && display === "card" && cardDef !== undefined ? ( - )} - {!Boolean(isLoading) && !rows.length && ( - {noRowsMessage ?? `No ${modelName}s found.`} - )} - table.setPageIndex(page.page - 1)} - page={table.getState().pagination.pageIndex + 1} - pageSize={table.getState().pagination.pageSize} - siblingCount={1} - > - - - - - - + ) : undefined} + {!hasRows && !Boolean(isLoading) && {noRowsMessage ?? `No ${modelName}s found.`}} + {hasRows ? 
( + table.setPageIndex(page.page - 1)} + page={table.getState().pagination.pageIndex + 1} + pageSize={table.getState().pagination.pageSize} + siblingCount={1} + > + + + + + + + ) : undefined} ); }; From bae4bb1d549f20a54a2e8c27c57377a0207f393b Mon Sep 17 00:00:00 2001 From: luoyuliuyin Date: Fri, 10 Jan 2025 23:52:11 +0800 Subject: [PATCH 11/24] fix: log action get the correct request body (#45546) --- airflow/www/decorators.py | 2 +- .../endpoints/test_dag_run_endpoint.py | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py index 0651820ce78d7..f373990a4c919 100644 --- a/airflow/www/decorators.py +++ b/airflow/www/decorators.py @@ -95,7 +95,7 @@ def wrapper(*args, **kwargs): user_display = get_auth_manager().get_user_display_name() isAPIRequest = request.blueprint == "/api/v1" - hasJsonBody = request.headers.get("content-type") == "application/json" and request.json + hasJsonBody = "application/json" in request.headers.get("content-type") and request.json fields_skip_logging = { "csrf_token", diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 4f322bb8f0a6c..4a0d18ff48cf9 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import json import urllib from datetime import timedelta from unittest import mock @@ -24,6 +25,7 @@ import time_machine from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP +from airflow.models import Log from airflow.models.asset import AssetEvent, AssetModel from airflow.models.dag import DAG, DagModel from airflow.models.dagrun import DagRun @@ -1522,6 +1524,60 @@ def test_should_respond_200(self, state, run_type, dag_maker, session): assert response.status_code == 200 assert response.json == expected_response_json + @pytest.mark.parametrize("state", ["failed", "success", "queued"]) + @pytest.mark.parametrize("run_type", [state.value for state in DagRunType]) + def test_action_logging(self, state, run_type, dag_maker, session): + dag_id = "TEST_DAG_ID" + dag_run_id = "TEST_DAG_RUN_ID" + with dag_maker(dag_id) as dag: + task = EmptyOperator(task_id="task_id", dag=dag) + self.app.dag_bag.bag_dag(dag, root_dag=dag) + dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type) + ti = dr.get_task_instance(task_id="task_id") + ti.task = task + ti.state = State.RUNNING + session.merge(ti) + session.commit() + + request_json = {"state": state} + + self.client.patch( + f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}", + json=request_json, + environ_overrides={"REMOTE_USER": "test"}, + ) + + log = ( + session.query(Log) + .filter( + Log.dag_id == dag_id, + Log.run_id == dag_run_id, + Log.event == "api.update_dag_run_state", + ) + .order_by(Log.id.desc()) + .first() + ) + assert log.extra == json.dumps(request_json) + + self.client.patch( + f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}", + json=request_json, + environ_overrides={"REMOTE_USER": "test"}, + headers={"content-type": "application/json; charset=utf-8"}, + ) + + log = ( + session.query(Log) + .filter( + Log.dag_id == dag_id, + Log.run_id == dag_run_id, + Log.event == "api.update_dag_run_state", + ) + .order_by(Log.id.desc()) + .first() + ) + assert log.extra == json.dumps(request_json) + def test_schema_validation_error_raises(self, dag_maker, session): dag_id = "TEST_DAG_ID" dag_run_id = 
"TEST_DAG_RUN_ID" From f3fd262de274b4fd1e26d36f9cead0a56775a309 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 10 Jan 2025 18:18:00 +0100 Subject: [PATCH 12/24] Bump trove-classifiers from 2025.1.7.14 to 2025.1.10.15 (#45561) Bumps [trove-classifiers](https://github.com/pypa/trove-classifiers) from 2025.1.7.14 to 2025.1.10.15. - [Release notes](https://github.com/pypa/trove-classifiers/releases) - [Commits](https://github.com/pypa/trove-classifiers/compare/2025.1.7.14...2025.1.10.15) --- updated-dependencies: - dependency-name: trove-classifiers dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d09cd1aa927ee..206923faae963 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ requires = [ "pluggy==1.5.0", "smmap==5.0.2", "tomli==2.2.1; python_version < '3.11'", - "trove-classifiers==2025.1.7.14", + "trove-classifiers==2025.1.10.15", ] build-backend = "hatchling.build" From ef79854f10fd5b0de95379efd78fa648dd589c3f Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:43:20 -0700 Subject: [PATCH 13/24] Add a bundle for example dags when enabled (#45533) Once we start parsing from bundles, we will have a separate bundle to represent the example dags, instead of simply adding them to the list of files from the dags folder like we do today. --- airflow/dag_processing/bundles/manager.py | 17 +++++++++++++++++ .../bundles/test_dag_bundle_manager.py | 16 +++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 0a17ab3f8c5f5..49cccc02849e8 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -64,6 +64,23 @@ def parse_config(self) -> None: "Bundle config is not a list. Check config value" " for section `dag_bundles` and key `backends`." ) + + # example dags! 
+ if conf.getboolean("core", "LOAD_EXAMPLES"): + from airflow import example_dags + + example_dag_folder = next(iter(example_dags.__path__)) + backends.append( + { + "name": "example_dags", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": { + "local_folder": example_dag_folder, + "refresh_interval": conf.getint("scheduler", "dag_dir_list_interval"), + }, + } + ) + seen = set() for cfg in backends: name = cfg["name"] diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index 47b192efc198f..eee7dee211472 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -68,7 +68,9 @@ ) def test_parse_bundle_config(value, expected): """Test that bundle_configs are read from configuration.""" - envs = {"AIRFLOW__DAG_BUNDLES__BACKENDS": value} if value else {} + envs = {"AIRFLOW__CORE__LOAD_EXAMPLES": "False"} + if value: + envs["AIRFLOW__DAG_BUNDLES__BACKENDS"] = value cm = nullcontext() exp_fail = False if isinstance(expected, str): @@ -133,6 +135,7 @@ def clear_db(): @pytest.mark.db_test +@conf_vars({("core", "LOAD_EXAMPLES"): "False"}) def test_sync_bundles_to_db(clear_db): def _get_bundle_names_and_active(): with create_session() as session: @@ -167,3 +170,14 @@ def test_view_url(version): with patch.object(BaseDagBundle, "view_url") as view_url_mock: bundle_manager.view_url("my-test-bundle", version=version) view_url_mock.assert_called_once_with(version=version) + + +def test_example_dags_bundle_added(): + manager = DagBundlesManager() + manager.parse_config() + assert "example_dags" in manager._bundle_config + + with conf_vars({("core", "LOAD_EXAMPLES"): "False"}): + manager = DagBundlesManager() + manager.parse_config() + assert "example_dags" not in manager._bundle_config From 2f64780ca58d3f23c22d7ef24ab8520bebdd01b1 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 10 Jan 2025 14:37:48 -0700 Subject: [PATCH 14/24] Reserve the 'example_dags' bundle name (#45566) Users can enable example dags with config, so we will reserve the bundle name, as it's added automatically based on that config. --- airflow/dag_processing/bundles/manager.py | 6 ++++++ tests/dag_processing/bundles/test_dag_bundle_manager.py | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 49cccc02849e8..1ae751f8d3304 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -65,6 +65,12 @@ def parse_config(self) -> None: " for section `dag_bundles` and key `backends`." ) + if any(b["name"] == "example_dags" for b in backends): + raise AirflowConfigException( + "Bundle name 'example_dags' is a reserved name. Please choose another name for your bundle." + " Example DAGs can be enabled with the '[core] load_examples' config." + ) + # example dags! 
if conf.getboolean("core", "LOAD_EXAMPLES"): from airflow import example_dags diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index eee7dee211472..35cecdace6c1f 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -181,3 +181,10 @@ def test_example_dags_bundle_added(): manager = DagBundlesManager() manager.parse_config() assert "example_dags" not in manager._bundle_config + + +def test_example_dags_name_is_reserved(): + reserved_name_config = [{"name": "example_dags"}] + with conf_vars({("dag_bundles", "backends"): json.dumps(reserved_name_config)}): + with pytest.raises(AirflowConfigException, match="Bundle name 'example_dags' is a reserved name."): + DagBundlesManager().parse_config() From a467fc386d1e39af3210eed7cdc53b3d142fbde1 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Fri, 10 Jan 2025 23:38:13 +0200 Subject: [PATCH 15/24] Fix typo in README_RELEASE_AIRFLOW.md (#45568) --- dev/README_RELEASE_AIRFLOW.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/README_RELEASE_AIRFLOW.md b/dev/README_RELEASE_AIRFLOW.md index 261e463589c05..3fa033b4a5952 100644 --- a/dev/README_RELEASE_AIRFLOW.md +++ b/dev/README_RELEASE_AIRFLOW.md @@ -892,7 +892,7 @@ Documentation for providers can be found in the ``/docs/apache-airflow`` directo # and finally open a PR ``` -The `--run-in-parallell` switch allows to speed up SBOM generation significantly, but it might take a lot +The `--run-in-parallel` switch allows to speed up SBOM generation significantly, but it might take a lot of memory - if you are running into memory issues you can limit parallelism by setting `--parallelism N` where N is a number of parallel `cdxgen` servers that should be started. From 46304d8d7e5e0d47e3829ae51401e8a4b9bfc4ae Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 10 Jan 2025 13:42:00 -0800 Subject: [PATCH 16/24] Fix log_action decorator when content type is None (#45567) --- airflow/www/decorators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py index f373990a4c919..9a916da184c13 100644 --- a/airflow/www/decorators.py +++ b/airflow/www/decorators.py @@ -95,7 +95,7 @@ def wrapper(*args, **kwargs): user_display = get_auth_manager().get_user_display_name() isAPIRequest = request.blueprint == "/api/v1" - hasJsonBody = "application/json" in request.headers.get("content-type") and request.json + hasJsonBody = "application/json" in request.headers.get("content-type", "") and request.json fields_skip_logging = { "csrf_token", From a2d5a2543b7026a1e743ea825cd66b6b1ebc8e93 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 10 Jan 2025 22:51:43 +0000 Subject: [PATCH 17/24] Improve subclassing of TaskSDK's WatchedSubprocess (#45570) When I added the DagFileProcessorProcess I did some naughty things with subclassing and lying to the type checker, and this is now stopping us easily adding DAG bundles (because as it is structured right now we would have to change both parsing and execution at the same time, or make the type checker _even more unhappy_.) This more correctly separates the two classes -- essentially anything that used `self.client` couldn't have been called from a DagFileProcessorProcess (as client was always None for those instances). 
This PR fixes it by adding a new `ActivitySubprocess` which is the type used at Execution time (the one that always has the client) and the base behaviour kept in WatchedSubprocess. --- airflow/dag_processing/processor.py | 39 +-- .../airflow/sdk/execution_time/supervisor.py | 291 ++++++++++-------- .../tests/execution_time/test_supervisor.py | 30 +- tests/dag_processing/test_manager.py | 10 +- 4 files changed, 191 insertions(+), 179 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 8d48b5ab6aeb3..583b858b585a6 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -19,7 +19,6 @@ import os import sys import traceback -from collections.abc import Generator from typing import TYPE_CHECKING, Annotated, Callable, Literal, Union import attrs @@ -181,7 +180,7 @@ class DagFileParsingResult(BaseModel): ] -@attrs.define() +@attrs.define(kw_only=True) class DagFileProcessorProcess(WatchedSubprocess): """ Parses dags with Task SDK API. @@ -194,6 +193,7 @@ class DagFileProcessorProcess(WatchedSubprocess): """ parsing_result: DagFileParsingResult | None = None + decoder: TypeAdapter[ToParent] = TypeAdapter[ToParent](ToParent) @classmethod def start( # type: ignore[override] @@ -203,39 +203,30 @@ def start( # type: ignore[override] target: Callable[[], None] = _parse_file_entrypoint, **kwargs, ) -> Self: - return super().start(path, callbacks, target=target, client=None, **kwargs) # type:ignore[arg-type] + proc = super().start(target=target, **kwargs) + proc._on_child_started(callbacks, path) + return proc - def _on_child_started( # type: ignore[override] - self, callbacks: list[CallbackRequest], path: str | os.PathLike[str], child_comms_fd: int - ) -> None: + def _on_child_started(self, callbacks: list[CallbackRequest], path: str | os.PathLike[str]) -> None: msg = DagFileParseRequest( file=os.fspath(path), - requests_fd=child_comms_fd, + requests_fd=self._requests_fd, callback_requests=callbacks, ) self.stdin.write(msg.model_dump_json().encode() + b"\n") - def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, bytes, None]: - # TODO: Make decoder an instance variable, then this can live in the base class - decoder = TypeAdapter[ToParent](ToParent) - - while True: - line = yield - - try: - msg = decoder.validate_json(line) - except Exception: - log.exception("Unable to decode message", line=line) - continue - - self._handle_request(msg, log) # type: ignore[arg-type] - def _handle_request(self, msg: ToParent, log: FilteringBoundLogger) -> None: # type: ignore[override] + # TODO: GetVariable etc -- parsing a dag can run top level code that asks for an Airflow Variable + resp = None if isinstance(msg, DagFileParsingResult): self.parsing_result = msg return - # GetVariable etc -- parsing a dag can run top level code that asks for an Airflow Variable - super()._handle_request(msg, log) + else: + log.error("Unhandled request", msg=msg) + return + + if resp: + self.stdin.write(resp + b"\n") @property def is_ready(self) -> bool: diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py index c9cec771ea081..b4415e5728f33 100644 --- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py @@ -83,7 +83,7 @@ from airflow.typing_compat import Self -__all__ = ["WatchedSubprocess", "supervise"] +__all__ = ["ActivitySubprocess", "WatchedSubprocess", "supervise"] log: FilteringBoundLogger 
= structlog.get_logger(logger_name="supervisor") @@ -284,44 +284,26 @@ def exit(n: int) -> NoReturn: exit(125) -@attrs.define() +@attrs.define(kw_only=True) class WatchedSubprocess: id: UUID - pid: int + pid: int stdin: BinaryIO """The handle connected to stdin of the child process""" - client: Client + decoder: TypeAdapter _process: psutil.Process + _requests_fd: int _num_open_sockets: int = 4 _exit_code: int | None = attrs.field(default=None, init=False) - _terminal_state: str | None = attrs.field(default=None, init=False) - _final_state: str | None = attrs.field(default=None, init=False) - - _last_successful_heartbeat: float = attrs.field(default=0, init=False) - _last_heartbeat_attempt: float = attrs.field(default=0, init=False) - - # After the failure of a heartbeat, we'll increment this counter. If it reaches `MAX_FAILED_HEARTBEATS`, we - # will kill the process. This is to handle temporary network issues etc. ensuring that the process - # does not hang around forever. - failed_heartbeats: int = attrs.field(default=0, init=False) - - # Maximum possible time (in seconds) that task will have for execution of auxiliary processes - # like listeners after task is complete. - # TODO: This should come from airflow.cfg: [core] task_success_overtime - TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0 - _task_end_time_monotonic: float | None = attrs.field(default=None, init=False) selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector) @classmethod def start( cls, - path: str | os.PathLike[str], - what: TaskInstance, - client: Client, target: Callable[[], None] = _subprocess_main, logger: FilteringBoundLogger | None = None, **constructor_kwargs, @@ -342,9 +324,7 @@ def start( # Python GC should delete these for us, but lets make double sure that we don't keep anything # around in the forked processes, especially things that might involve open files or sockets! - del path - del client - del what + del constructor_kwargs del logger # Run the child entrypoint @@ -357,11 +337,10 @@ def start( cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, child_comms, child_logs) proc = cls( - id=constructor_kwargs.pop("id", None) or getattr(what, "id"), pid=pid, stdin=feed_stdin, process=psutil.Process(pid), - client=client, + requests_fd=requests_fd, **constructor_kwargs, ) @@ -374,9 +353,6 @@ def start( logs=read_logs, ) - # Tell the task process what it needs to do! - proc._on_child_started(what, path, requests_fd) - return proc def _register_pipe_readers( @@ -417,42 +393,28 @@ def _on_socket_closed(self): # We want to keep servicing this process until we've read up to EOF from all the sockets. 
self._num_open_sockets -= 1 + def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, bytes, None]: + """Handle incoming requests from the task process, respond with the appropriate data.""" + while True: + line = yield + + try: + msg = self.decoder.validate_json(line) + except Exception: + log.exception("Unable to decode message", line=line) + continue + + self._handle_request(msg, log) + + def _handle_request(self, msg, log: FilteringBoundLogger) -> None: + raise NotImplementedError() + @staticmethod def _close_unused_sockets(*sockets): """Close unused ends of sockets after fork.""" for sock in sockets: sock.close() - def _on_child_started(self, ti: TaskInstance, path: str | os.PathLike[str], requests_fd: int): - """Send startup message to the subprocess.""" - try: - # We've forked, but the task won't start doing anything until we send it the StartupDetails - # message. But before we do that, we need to tell the server it's started (so it has the chance to - # tell us "no, stop!" for any reason) - ti_context = self.client.task_instances.start(ti.id, self.pid, datetime.now(tz=timezone.utc)) - self._last_successful_heartbeat = time.monotonic() - except Exception: - # On any error kill that subprocess! - self.kill(signal.SIGKILL) - raise - - msg = StartupDetails.model_construct( - ti=ti, - file=os.fspath(path), - requests_fd=requests_fd, - ti_context=ti_context, - ) - - # Send the message to tell the process what it needs to execute - log.debug("Sending", msg=msg) - - try: - self.stdin.write(msg.model_dump_json().encode()) - self.stdin.write(b"\n") - except BrokenPipeError: - # Debug is fine, the process will have shown _something_ in it's last_chance exception handler - log.debug("Couldn't send startup message to Subprocess - it died very early", pid=self.pid) - def kill( self, signal_to_send: signal.Signals = signal.SIGINT, @@ -510,6 +472,137 @@ def kill( log.error("Failed to terminate process after full escalation", pid=self.pid) + def wait(self) -> int: + raise NotImplementedError() + + def __rich_repr__(self): + yield "id", self.id + yield "pid", self.pid + # only include this if it's not the default (third argument) + yield "exit_code", self._exit_code, None + + __rich_repr__.angular = True # type: ignore[attr-defined] + + def __repr__(self) -> str: + rep = f"<{type(self).__name__} id={self.id} pid={self.pid}" + if self._exit_code is not None: + rep += f" exit_code={self._exit_code}" + return rep + " >" + + def _service_subprocess(self, max_wait_time: float, raise_on_timeout: bool = False): + """ + Service subprocess events by processing socket activity and checking for process exit. + + This method: + - Waits for activity on the registered file objects (via `self.selector.select`). + - Processes any events triggered on these file objects. + - Checks if the subprocess has exited during the wait. + + :param max_wait_time: Maximum time to block while waiting for events, in seconds. + :param raise_on_timeout: If True, raise an exception if the subprocess does not exit within the timeout. + :returns: The process exit code, or None if it's still alive + """ + events = self.selector.select(timeout=max_wait_time) + for key, _ in events: + # Retrieve the handler responsible for processing this file object (e.g., stdout, stderr) + socket_handler = key.data + + # Example of handler behavior: + # If the subprocess writes "Hello, World!" to stdout: + # - `socket_handler` reads and processes the message. 
+ # - If EOF is reached, the handler returns False to signal no more reads are expected. + need_more = socket_handler(key.fileobj) + + # If the handler signals that the file object is no longer needed (EOF, closed, etc.) + # unregister it from the selector to stop monitoring; `wait()` blocks until all selectors + # are removed. + if not need_more: + self.selector.unregister(key.fileobj) + key.fileobj.close() # type: ignore[union-attr] + + # Check if the subprocess has exited + return self._check_subprocess_exit(raise_on_timeout=raise_on_timeout) + + def _check_subprocess_exit(self, raise_on_timeout: bool = False) -> int | None: + """Check if the subprocess has exited.""" + if self._exit_code is None: + try: + self._exit_code = self._process.wait(timeout=0) + log.debug("Workload process exited", exit_code=self._exit_code) + except psutil.TimeoutExpired: + if raise_on_timeout: + raise + return self._exit_code + + +@attrs.define(kw_only=True) +class ActivitySubprocess(WatchedSubprocess): + client: Client + + _terminal_state: str | None = attrs.field(default=None, init=False) + _final_state: str | None = attrs.field(default=None, init=False) + + _last_successful_heartbeat: float = attrs.field(default=0, init=False) + _last_heartbeat_attempt: float = attrs.field(default=0, init=False) + + # After the failure of a heartbeat, we'll increment this counter. If it reaches `MAX_FAILED_HEARTBEATS`, we + # will kill the process. This is to handle temporary network issues etc. ensuring that the process + # does not hang around forever. + failed_heartbeats: int = attrs.field(default=0, init=False) + + # Maximum possible time (in seconds) that task will have for execution of auxiliary processes + # like listeners after task is complete. + # TODO: This should come from airflow.cfg: [core] task_success_overtime + TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0 + _task_end_time_monotonic: float | None = attrs.field(default=None, init=False) + + decoder: TypeAdapter[ToSupervisor] = TypeAdapter(ToSupervisor) + + @classmethod + def start( # type: ignore[override] + cls, + path: str | os.PathLike[str], + what: TaskInstance, + client: Client, + target: Callable[[], None] = _subprocess_main, + logger: FilteringBoundLogger | None = None, + **kwargs, + ) -> Self: + proc = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs) + # Tell the task process what it needs to do! + proc._on_child_started(what, path) + return proc + + def _on_child_started(self, ti: TaskInstance, path: str | os.PathLike[str]): + """Send startup message to the subprocess.""" + try: + # We've forked, but the task won't start doing anything until we send it the StartupDetails + # message. But before we do that, we need to tell the server it's started (so it has the chance to + # tell us "no, stop!" for any reason) + ti_context = self.client.task_instances.start(ti.id, self.pid, datetime.now(tz=timezone.utc)) + self._last_successful_heartbeat = time.monotonic() + except Exception: + # On any error kill that subprocess! 
+ self.kill(signal.SIGKILL) + raise + + msg = StartupDetails.model_construct( + ti=ti, + file=os.fspath(path), + requests_fd=self._requests_fd, + ti_context=ti_context, + ) + + # Send the message to tell the process what it needs to execute + log.debug("Sending", msg=msg) + + try: + self.stdin.write(msg.model_dump_json().encode()) + self.stdin.write(b"\n") + except BrokenPipeError: + # Debug is fine, the process will have shown _something_ in it's last_chance exception handler + log.debug("Couldn't send startup message to Subprocess - it died very early", pid=self.pid) + def wait(self) -> int: if self._exit_code is not None: return self._exit_code @@ -579,51 +672,6 @@ def _handle_process_overtime_if_needed(self): log.warning("Workload success overtime reached; terminating process", ti_id=self.id) self.kill(signal.SIGTERM, force=True) - def _service_subprocess(self, max_wait_time: float, raise_on_timeout: bool = False): - """ - Service subprocess events by processing socket activity and checking for process exit. - - This method: - - Waits for activity on the registered file objects (via `self.selector.select`). - - Processes any events triggered on these file objects. - - Checks if the subprocess has exited during the wait. - - :param max_wait_time: Maximum time to block while waiting for events, in seconds. - :param raise_on_timeout: If True, raise an exception if the subprocess does not exit within the timeout. - :returns: The process exit code, or None if it's still alive - """ - events = self.selector.select(timeout=max_wait_time) - for key, _ in events: - # Retrieve the handler responsible for processing this file object (e.g., stdout, stderr) - socket_handler = key.data - - # Example of handler behavior: - # If the subprocess writes "Hello, World!" to stdout: - # - `socket_handler` reads and processes the message. - # - If EOF is reached, the handler returns False to signal no more reads are expected. - need_more = socket_handler(key.fileobj) - - # If the handler signals that the file object is no longer needed (EOF, closed, etc.) - # unregister it from the selector to stop monitoring; `wait()` blocks until all selectors - # are removed. 
- if not need_more: - self.selector.unregister(key.fileobj) - key.fileobj.close() # type: ignore[union-attr] - - # Check if the subprocess has exited - return self._check_subprocess_exit(raise_on_timeout=raise_on_timeout) - - def _check_subprocess_exit(self, raise_on_timeout: bool = False) -> int | None: - """Check if the subprocess has exited.""" - if self._exit_code is None: - try: - self._exit_code = self._process.wait(timeout=0) - log.debug("Workload process exited", exit_code=self._exit_code) - except psutil.TimeoutExpired: - if raise_on_timeout: - raise - return self._exit_code - def _send_heartbeat_if_needed(self): """Send a heartbeat to the client if heartbeat interval has passed.""" # Respect the minimum interval between heartbeat attempts @@ -684,35 +732,6 @@ def final_state(self): return self._terminal_state or TerminalTIState.SUCCESS return TerminalTIState.FAILED - def __rich_repr__(self): - yield "id", self.id - yield "pid", self.pid - # only include this if it's not the default (third argument) - yield "exit_code", self._exit_code, None - - __rich_repr__.angular = True # type: ignore[attr-defined] - - def __repr__(self) -> str: - rep = f"" - - def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, bytes, None]: - """Handle incoming requests from the task process, respond with the appropriate data.""" - decoder = TypeAdapter[ToSupervisor](ToSupervisor) - - while True: - line = yield - - try: - msg = decoder.validate_json(line) - except Exception: - log.exception("Unable to decode message", line=line) - continue - - self._handle_request(msg, log) - def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger): log.debug("Received message from task runner", msg=msg) resp = None @@ -902,7 +921,7 @@ def supervise( processors = logging_processors(enable_pretty_log=pretty_logs)[0] logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() - process = WatchedSubprocess.start(dag_path, ti, client=client, logger=logger) + process = ActivitySubprocess.start(dag_path, ti, client=client, logger=logger) exit_code = process.wait() end = time.monotonic() diff --git a/task_sdk/tests/execution_time/test_supervisor.py b/task_sdk/tests/execution_time/test_supervisor.py index 3ced432b2eea3..c5879971a63d4 100644 --- a/task_sdk/tests/execution_time/test_supervisor.py +++ b/task_sdk/tests/execution_time/test_supervisor.py @@ -52,7 +52,7 @@ VariableResult, XComResult, ) -from airflow.sdk.execution_time.supervisor import WatchedSubprocess, supervise +from airflow.sdk.execution_time.supervisor import ActivitySubprocess, supervise from airflow.sdk.execution_time.task_runner import CommsDecoder from airflow.utils import timezone, timezone as tz @@ -99,7 +99,7 @@ def subprocess_main(): instant = tz.datetime(2024, 11, 7, 12, 34, 56, 78901) time_machine.move_to(instant, tick=False) - proc = WatchedSubprocess.start( + proc = ActivitySubprocess.start( path=os.devnull, what=TaskInstance( id="4d828a62-a417-4936-a7a6-2b3fabacecab", @@ -166,7 +166,7 @@ def subprocess_main(): assert os.getpid() != main_pid os.kill(os.getpid(), signal.SIGKILL) - proc = WatchedSubprocess.start( + proc = ActivitySubprocess.start( path=os.devnull, what=TaskInstance( id="4d828a62-a417-4936-a7a6-2b3fabacecab", @@ -189,7 +189,7 @@ def subprocess_main(): # or import error for instance - a very early exception raise RuntimeError("Fake syntax error") - proc = WatchedSubprocess.start( + proc = ActivitySubprocess.start( path=os.devnull, what=TaskInstance( id=uuid7(), @@ 
-225,7 +225,7 @@ def subprocess_main(): ti_id = uuid7() spy = spy_agency.spy_on(sdk_client.TaskInstanceOperations.heartbeat) - proc = WatchedSubprocess.start( + proc = ActivitySubprocess.start( path=os.devnull, what=TaskInstance( id=ti_id, @@ -341,7 +341,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_client(transport=httpx.MockTransport(handle_request)) with pytest.raises(ServerResponseError, match="Server returned error") as err: - WatchedSubprocess.start(path=os.devnull, what=ti, client=client) + ActivitySubprocess.start(path=os.devnull, what=ti, client=client) assert err.value.response.status_code == 409 assert err.value.detail == { @@ -394,7 +394,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: # Return a 204 for all other requests return httpx.Response(status_code=204) - proc = WatchedSubprocess.start( + proc = ActivitySubprocess.start( path=os.devnull, what=TaskInstance(id=ti_id, task_id="b", dag_id="c", run_id="d", try_number=1), client=make_client(transport=httpx.MockTransport(handle_request)), @@ -447,12 +447,13 @@ def test_heartbeat_failures_handling(self, monkeypatch, mocker, captured_logs, t # Patch the kill method at the class level so we can assert it was called with the correct signal mock_kill = mocker.patch("airflow.sdk.execution_time.supervisor.WatchedSubprocess.kill") - proc = WatchedSubprocess( + proc = ActivitySubprocess( id=TI_ID, pid=mock_process.pid, stdin=mocker.MagicMock(), client=client, process=mock_process, + requests_fd=-1, ) time_now = tz.datetime(2024, 11, 28, 12, 0, 0) @@ -533,14 +534,15 @@ def test_overtime_handling( mocker.patch("time.monotonic", return_value=20.0) # Patch the task overtime threshold - monkeypatch.setattr(WatchedSubprocess, "TASK_OVERTIME_THRESHOLD", overtime_threshold) + monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", overtime_threshold) - mock_watched_subprocess = WatchedSubprocess( + mock_watched_subprocess = ActivitySubprocess( id=TI_ID, pid=12345, stdin=mocker.Mock(), process=mocker.Mock(), client=mocker.Mock(), + requests_fd=-1, ) # Set the terminal state and task end datetime @@ -572,12 +574,13 @@ def mock_process(self, mocker): @pytest.fixture def watched_subprocess(self, mocker, mock_process): - proc = WatchedSubprocess( + proc = ActivitySubprocess( id=TI_ID, pid=12345, stdin=mocker.Mock(), client=mocker.Mock(), process=mock_process, + requests_fd=-1, ) # Mock the selector mock_selector = mocker.Mock(spec=selectors.DefaultSelector) @@ -662,7 +665,7 @@ def _handler(sig, frame): ti_id = uuid7() - proc = WatchedSubprocess.start( + proc = ActivitySubprocess.start( path=os.devnull, what=TaskInstance(id=ti_id, task_id="b", dag_id="c", run_id="d", try_number=1), client=MagicMock(spec=sdk_client.Client), @@ -753,12 +756,13 @@ class TestHandleRequest: @pytest.fixture def watched_subprocess(self, mocker): """Fixture to provide a WatchedSubprocess instance.""" - return WatchedSubprocess( + return ActivitySubprocess( id=TI_ID, pid=12345, stdin=BytesIO(), client=mocker.Mock(), process=mocker.Mock(), + requests_fd=-1, ) @pytest.mark.parametrize( diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 2cc8a43e05450..12b9cd52013bc 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -112,8 +112,8 @@ def mock_processor(self) -> DagFileProcessorProcess: id=uuid7(), pid=1234, process=proc, - client=Mock(), stdin=io.BytesIO(), + requests_fd=123, ) ret._num_open_sockets = 0 return ret @@ -435,12 
+435,11 @@ def test_kill_timed_out_processors_no_kill(self): mock_kill.assert_not_called() @pytest.mark.parametrize( - ["callbacks", "path", "child_comms_fd", "expected_buffer"], + ["callbacks", "path", "expected_buffer"], [ pytest.param( [], "/opt/airflow/dags/test_dag.py", - 123, b'{"file":"/opt/airflow/dags/test_dag.py","requests_fd":123,"callback_requests":[],' b'"type":"DagFileParseRequest"}\n', ), @@ -454,7 +453,6 @@ def test_kill_timed_out_processors_no_kill(self): ) ], "/opt/airflow/dags/dag_callback_dag.py", - 123, b'{"file":"/opt/airflow/dags/dag_callback_dag.py","requests_fd":123,"callback_requests":' b'[{"full_filepath":"/opt/airflow/dags/dag_callback_dag.py","msg":null,"dag_id":"dag_id",' b'"run_id":"run_id","is_failure_callback":false,"type":"DagCallbackRequest"}],' @@ -462,9 +460,9 @@ def test_kill_timed_out_processors_no_kill(self): ), ], ) - def test_serialize_callback_requests(self, callbacks, path, child_comms_fd, expected_buffer): + def test_serialize_callback_requests(self, callbacks, path, expected_buffer): processor = self.mock_processor() - processor._on_child_started(callbacks, path, child_comms_fd) + processor._on_child_started(callbacks, path) # Verify the response was added to the buffer val = processor.stdin.getvalue() From 4c01f7d16be11eafc0df9b1acf3c0af4af569611 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 10 Jan 2025 23:16:33 +0000 Subject: [PATCH 18/24] Fix tests introduced in #45546 (#45572) This was merged without letting tests pass somehow. We have removed the `root_dag` argument in Airflow 3. --- tests/api_connexion/endpoints/test_dag_run_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 4a0d18ff48cf9..caa061158f577 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -1531,7 +1531,7 @@ def test_action_logging(self, state, run_type, dag_maker, session): dag_run_id = "TEST_DAG_RUN_ID" with dag_maker(dag_id) as dag: task = EmptyOperator(task_id="task_id", dag=dag) - self.app.dag_bag.bag_dag(dag, root_dag=dag) + self.app.dag_bag.bag_dag(dag) dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type) ti = dr.get_task_instance(task_id="task_id") ti.task = task From 662804921feb2c9539c882914a4134bf773c0e3e Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sat, 11 Jan 2025 07:50:30 +0100 Subject: [PATCH 19/24] Add optional --image-file-dir to store loaded files elsewhere (#45564) While backorting the "pull_request_target" removal to v2-10-test branches it turned out that there is not enough disk space on Public runner to load all 5 images and keep the file dump at the same time in the same filesystem. This PR allows to choose where the load/save files will be stored and in the github runner environment we store the files in "/mnt" wnich is a separate folder with 40GB free. 
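
An illustrative sketch of the intended usage on a runner (the platform and
Python version below are example values, not taken verbatim from this change;
the actual workflows pass them via PLATFORM / PYTHON_MAJOR_MINOR_VERSION):

    # make the larger /mnt volume writeable and check free space first
    sudo chown -R "${USER}" /mnt
    df -H
    # dump the CI image tarball under /mnt instead of /tmp
    breeze ci-image save --platform "linux/amd64" --image-file-dir "/mnt"
    # on the consuming job, restore the stashed tarball into /mnt and load from there
    breeze ci-image load --platform "linux/amd64" --python "3.9" --image-file-dir "/mnt"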
--- .../prepare_breeze_and_image/action.yml | 11 ++- .../prepare_single_ci_image/action.yml | 10 ++- .github/workflows/ci-image-build.yml | 10 ++- .github/workflows/prod-image-build.yml | 10 ++- .../doc/images/output_ci-image_load.svg | 60 ++++++++++------ .../doc/images/output_ci-image_load.txt | 2 +- .../doc/images/output_ci-image_save.svg | 26 ++++--- .../doc/images/output_ci-image_save.txt | 2 +- .../doc/images/output_prod-image_load.svg | 50 ++++++++------ .../doc/images/output_prod-image_load.txt | 2 +- .../doc/images/output_prod-image_save.svg | 26 ++++--- .../doc/images/output_prod-image_save.txt | 2 +- .../commands/ci_image_commands.py | 68 ++++++++++++------- .../commands/ci_image_commands_config.py | 2 + .../commands/common_image_options.py | 19 ++++++ .../commands/production_image_commands.py | 60 ++++++++++------ .../production_image_commands_config.py | 2 + dev/breeze/src/airflow_breeze/utils/github.py | 4 +- 18 files changed, 246 insertions(+), 120 deletions(-) diff --git a/.github/actions/prepare_breeze_and_image/action.yml b/.github/actions/prepare_breeze_and_image/action.yml index e6755444b2f4f..26be0b76315ff 100644 --- a/.github/actions/prepare_breeze_and_image/action.yml +++ b/.github/actions/prepare_breeze_and_image/action.yml @@ -46,17 +46,22 @@ runs: with: use-uv: ${{ inputs.use-uv }} id: breeze + - name: Check free space + run: df -H + shell: bash + - name: Make /mnt/ directory writeable + run: sudo chown -R ${USER} /mnt + shell: bash - name: "Restore ${{ inputs.image-type }} docker image ${{ inputs.platform }}:${{ inputs.python }}" uses: apache/infrastructure-actions/stash/restore@c94b890bbedc2fc61466d28e6bd9966bc6c6643c with: key: ${{ inputs.image-type }}-image-save-${{ inputs.platform }}-${{ inputs.python }} - path: "/tmp/" + path: "/mnt/" - name: "Load ${{ inputs.image-type }} image ${{ inputs.platform }}:${{ inputs.python }}" env: PLATFORM: ${{ inputs.platform }} PYTHON: ${{ inputs.python }} IMAGE_TYPE: ${{ inputs.image-type }} run: > - breeze ${IMAGE_TYPE}-image load - --platform ${PLATFORM} --python ${PYTHON} + breeze ${IMAGE_TYPE}-image load --platform "${PLATFORM}" --python "${PYTHON}" --image-file-dir "/mnt" shell: bash diff --git a/.github/actions/prepare_single_ci_image/action.yml b/.github/actions/prepare_single_ci_image/action.yml index 3dde30033aa15..ecae9f802c966 100644 --- a/.github/actions/prepare_single_ci_image/action.yml +++ b/.github/actions/prepare_single_ci_image/action.yml @@ -35,16 +35,22 @@ inputs: runs: using: "composite" steps: + - name: Check free space + run: df -H + shell: bash + - name: Make /mnt/ directory writeable + run: sudo chown -R ${USER} /mnt + shell: bash - name: "Restore CI docker images ${{ inputs.platform }}:${{ inputs.python }}" uses: apache/infrastructure-actions/stash/restore@c94b890bbedc2fc61466d28e6bd9966bc6c6643c with: key: ci-image-save-${{ inputs.platform }}-${{ inputs.python }} - path: "/tmp/" + path: "/mnt/" if: contains(inputs.python-versions-list-as-string, inputs.python) - name: "Load CI image ${{ inputs.platform }}:${{ inputs.python }}" env: PLATFORM: ${{ inputs.platform }} PYTHON: ${{ inputs.python }} - run: breeze ci-image load --platform "${PLATFORM}" --python "${PYTHON}" + run: breeze ci-image load --platform "${PLATFORM}" --python "${PYTHON}" --image-file-dir "/mnt/" shell: bash if: contains(inputs.python-versions-list-as-string, inputs.python) diff --git a/.github/workflows/ci-image-build.yml b/.github/workflows/ci-image-build.yml index 55bf4e046e23f..9283dc06b936f 100644 --- 
a/.github/workflows/ci-image-build.yml +++ b/.github/workflows/ci-image-build.yml @@ -175,16 +175,22 @@ jobs: PUSH: ${{ inputs.push-image }} VERBOSE: "true" PLATFORM: ${{ inputs.platform }} + - name: Check free space + run: df -H + shell: bash + - name: Make /mnt/ directory writeable + run: sudo chown -R ${USER} /mnt + shell: bash - name: "Export CI docker image ${{ env.PYTHON_MAJOR_MINOR_VERSION }}" env: PLATFORM: ${{ inputs.platform }} - run: breeze ci-image save --platform "${PLATFORM}" + run: breeze ci-image save --platform "${PLATFORM}" --image-file-dir "/mnt" if: inputs.upload-image-artifact == 'true' - name: "Stash CI docker image ${{ env.PYTHON_MAJOR_MINOR_VERSION }}" uses: apache/infrastructure-actions/stash/save@c94b890bbedc2fc61466d28e6bd9966bc6c6643c with: key: ci-image-save-${{ inputs.platform }}-${{ env.PYTHON_MAJOR_MINOR_VERSION }} - path: "/tmp/ci-image-save-*-${{ env.PYTHON_MAJOR_MINOR_VERSION }}.tar" + path: "/mnt/ci-image-save-*-${{ env.PYTHON_MAJOR_MINOR_VERSION }}.tar" if-no-files-found: 'error' retention-days: '2' if: inputs.upload-image-artifact == 'true' diff --git a/.github/workflows/prod-image-build.yml b/.github/workflows/prod-image-build.yml index 85b421cade447..5784c7c58ba60 100644 --- a/.github/workflows/prod-image-build.yml +++ b/.github/workflows/prod-image-build.yml @@ -283,17 +283,23 @@ jobs: if: inputs.build-provider-packages != 'true' - name: "Verify PROD image ${{ env.PYTHON_MAJOR_MINOR_VERSION }}" run: breeze prod-image verify + - name: Check free space + run: df -H + shell: bash + - name: Make /mnt/ directory writeable + run: sudo chown -R ${USER} /mnt + shell: bash - name: "Export PROD docker image ${{ env.PYTHON_MAJOR_MINOR_VERSION }}" env: PLATFORM: ${{ inputs.platform }} run: > - breeze prod-image save --platform "${PLATFORM}" + breeze prod-image save --platform "${PLATFORM}" --image-file-dir "/mnt" if: inputs.upload-image-artifact == 'true' - name: "Stash PROD docker image ${{ env.PYTHON_MAJOR_MINOR_VERSION }}" uses: apache/infrastructure-actions/stash/save@c94b890bbedc2fc61466d28e6bd9966bc6c6643c with: key: prod-image-save-${{ inputs.platform }}-${{ env.PYTHON_MAJOR_MINOR_VERSION }} - path: "/tmp/prod-image-save-*-${{ env.PYTHON_MAJOR_MINOR_VERSION }}.tar" + path: "/mnt/prod-image-save-*-${{ env.PYTHON_MAJOR_MINOR_VERSION }}.tar" if-no-files-found: 'error' retention-days: '2' if: inputs.upload-image-artifact == 'true' diff --git a/dev/breeze/doc/images/output_ci-image_load.svg b/dev/breeze/doc/images/output_ci-image_load.svg index 962b9cb52c9ed..cb7036af1c946 100644 --- a/dev/breeze/doc/images/output_ci-image_load.svg +++ b/dev/breeze/doc/images/output_ci-image_load.svg @@ -1,4 +1,4 @@ - +