Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swap Dag Parsing to use the TaskSDK machinery. #44972

Merged
merged 2 commits into from
Dec 19, 2024

Conversation

ashb
Copy link
Member

@ashb ashb commented Dec 16, 2024

As part of Airflow 3 DAG definition files will have to use the Task SDK for
all their classes, and anything involving running user code will need to be
de-coupled from the database in the user-code process.

This change moves all of the "serialization" change up to the
DagFileProcessorManager, using the new function introduced in #44898 and the
"subprocess" machinery introduced in #44874.

Important Note: this change does not remove the ability for dag processes
to access the DB for Variables etc. That will come in a future change.

Some key parts of this change:

  • It builds upon the WatchedSubprocess from the TaskSDK. Right now this puts a
    nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK.
    This will be addressed before release (we have talked about introducing a
    new "apache-airflow-base-executor" dist where this subprocess+supervisor
    could live, as the "execution_time" folder in the Task SDK is more a feature
    of the executor, not of the TaskSDK itself)
  • A number of classes that we need to send between processes have been
    converted to Pydantic for ease of serialization.
  • In order to not have to serialize everything in the subprocess and deserialize everything
    in the parent Manager process, we have created a LazyDeserializedDAG class
    that provides lazy access to much of the properties needed to create update
    the DAG related DB objects, without needing to fully deserialize the entire
    DAG structure.
  • Classes switched to attrs based for less boilerplate in constructors.
  • Internal timers convert to time.monotonic where possible, and time.time
    where not, we only need second diff between two points, not datetime objects
  • With the earlier removal of "sync mode" for SQLite in Remove "single process" restrictions on SQLite in favour of using WAL mode #44839 the need for
    separate TERMIANTE and END messages over the control socket can go

Co-authored-by: Jed Cunningham [email protected]
Co-authored-by: Daniel Imberman [email protected]


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@ashb ashb added area:Scheduler including HA (high availability) scheduler area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk labels Dec 16, 2024
@ashb
Copy link
Member Author

ashb commented Dec 16, 2024

The tests aren't 100% finished yet.

And this change is larger than I would have liked, but at least it's a net-negative change

@ashb ashb force-pushed the dag-parsing-uses-task-sdk branch from 32176b9 to 1215213 Compare December 16, 2024 23:29
@ashb ashb force-pushed the dag-parsing-uses-task-sdk branch from 1215213 to 9f04c14 Compare December 16, 2024 23:46
@ashb
Copy link
Member Author

ashb commented Dec 16, 2024

I don't expect tests to pass yet, but I want to give people the chance to see this PR, and I know @jedcunningham is waiting on this for some of his DAG versioning work.

@kaxil
Copy link
Member

kaxil commented Dec 17, 2024

Right now this puts a
nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK.

I think that is unavoidable, as user code will come from Task SDK

This will be addressed before release (we have talked about introducing a
new "apache-airflow-base-executor" dist where this subprocess+supervisor
could live, as the "execution_time" folder in the Task SDK is more a feature
of the executor, not of the TaskSDK itself)

Right, I think processor will have to depend on both Task SDK (user-facing code) + Base Executor dist -- after that separation

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review, will be coming back to it in an hour

airflow/callbacks/callback_requests.py Outdated Show resolved Hide resolved
airflow/dag_processing/collection.py Show resolved Hide resolved
airflow/models/dagcode.py Show resolved Hide resolved
airflow/dag_processing/collection.py Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Show resolved Hide resolved
airflow/dag_processing/manager.py Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Show resolved Hide resolved
airflow/dag_processing/collection.py Show resolved Hide resolved
airflow/dag_processing/manager.py Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Show resolved Hide resolved
airflow/callbacks/callback_requests.py Outdated Show resolved Hide resolved

class DagFileProcessor(LoggingMixin):
@attrs.define()
class DagFileProcessorProcess(WatchedSubprocess):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class DagFileProcessorProcess(WatchedSubprocess):
class FileParserProcess(WatchedSubprocess):

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sounds good

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to do these rename as a separate PR right after this one -- the diff is big enough as it is.

@ashb ashb force-pushed the dag-parsing-uses-task-sdk branch 2 times, most recently from 93b8d53 to 553049e Compare December 17, 2024 15:56
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/serialization/serialized_objects.py Outdated Show resolved Hide resolved
airflow/serialization/serialized_objects.py Outdated Show resolved Hide resolved
airflow/serialization/serialized_objects.py Outdated Show resolved Hide resolved
task_sdk/src/airflow/sdk/log.py Show resolved Hide resolved
Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly reviewed the processor and manager files. The other changes seem mostly reactive. Overall, I like the improvements, especially the reuse of the execution time machinery here. Few initial comments, nothing serious but mostly nits.


class DagFileProcessor(LoggingMixin):
@attrs.define()
class DagFileProcessorProcess(WatchedSubprocess):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sounds good

airflow/dag_processing/processor.py Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/serialization/serialized_objects.py Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
tests/dag_processing/test_processor.py Show resolved Hide resolved
@ashb ashb force-pushed the dag-parsing-uses-task-sdk branch 3 times, most recently from e61c595 to f6487c9 Compare December 18, 2024 23:08
@ashb
Copy link
Member Author

ashb commented Dec 18, 2024

Right, I think this should now pass the tests, the only thing I'm not sure about this is the xfail I've put for the "simple ti roundtrip exec config tests" -- Either we should remove it or make it work, but I'm not sure if we need to pass down executor config via TI anymore

@kaxil Any ideas the best plan for that one?

@ashb ashb marked this pull request as ready for review December 18, 2024 23:09
@ashb
Copy link
Member Author

ashb commented Dec 18, 2024

(I still need to rename a class and file, but that is a non-meaningful/non-review-impacting change.

@kaxil kaxil added the full tests needed We need to run full set of tests for this PR to merge label Dec 19, 2024
@kaxil
Copy link
Member

kaxil commented Dec 19, 2024

Right, I think this should now pass the tests, the only thing I'm not sure about this is the xfail I've put for the "simple ti roundtrip exec config tests" -- Either we should remove it or make it work, but I'm not sure if we need to pass down executor config via TI anymore

@kaxil Any ideas the best plan for that one?

Since we are planning to handle callbacks via Executor/worker interface too -- don't think we need to pass it explicitly from TI, instead just handle it on the server side before sending TI/request to the worker.

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments and the renaming of file & classes can happen in a separate PR too

airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
@ashb
Copy link
Member Author

ashb commented Dec 19, 2024

I'm going to run this with full tests, I want the kube tests to see if there is something broken not covered by unit tests

ashb and others added 2 commits December 19, 2024 12:02
As part of Airflow 3 DAG definition files will have to use the Task SDK for
all their classes, and anything involving running user code will need to be
de-coupled from the database in the user-code process.

This change moves all of the "serialization" change up to the
DagFileProcessorManager, using the new function introduced in apache#44898 and the
"subprocess" machinery introduced in apache#44874.

**Important Note**: this change does not remove the ability for dag processes
to access the DB for Variables etc. That will come in a future change.

Some key parts of this change:

- It builds upon the WatchedSubprocess from the TaskSDK. Right now this puts a
  nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK.
  This will be addressed before release (we have talked about introducing a
  new "apache-airflow-base-executor" dist where this subprocess+supervisor
  could live, as the "execution_time" folder in the Task SDK is more a feature
  of the executor, not of the TaskSDK itself.)
- A number of classes that we need to send between processes have been
  converted to Pydantic for ease of serialization.
- In order to not have to serialize everything in the subprocess and deserialize everything
  in the parent Manager process, we have created a `LazyDeserializedDAG` class
  that provides lazy access to much of the properties needed to create update
  the DAG related DB objects, without needing to fully deserialize the entire
  DAG structure.
- Classes switched to attrs based for less boilerplate in constructors.
- Internal timers convert to `time.monotonic` where possible, and `time.time`
  where not, we only need second diff between two points, not datetime
  objects.
- With the earlier removal of "sync mode" for SQLite in apache#44839 the need for
  separate TERMIANTE and END messages over the control socket can go.

Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Daniel Imberman <[email protected]>
@ashb ashb force-pushed the dag-parsing-uses-task-sdk branch from f6487c9 to b22af18 Compare December 19, 2024 12:04
assert "a.py" in resp.import_errors


# @conf_vars({("logging", "dag_processor_log_target"): "stdout"})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving these comments out for now as I want to overhaul the logging in #45072

@ashb ashb merged commit 8774f28 into apache:main Dec 19, 2024
91 of 94 checks passed
@ashb ashb deleted the dag-parsing-uses-task-sdk branch December 19, 2024 14:19
LefterisXefteris pushed a commit to LefterisXefteris/airflow that referenced this pull request Jan 5, 2025
As part of Airflow 3 DAG definition files will have to use the Task SDK for
all their classes, and anything involving running user code will need to be
de-coupled from the database in the user-code process.

This change moves all of the "serialization" change up to the
DagFileProcessorManager, using the new function introduced in apache#44898 and the
"subprocess" machinery introduced in apache#44874.

**Important Note**: this change does not remove the ability for dag processes
to access the DB for Variables etc. That will come in a future change.

Some key parts of this change:

- It builds upon the WatchedSubprocess from the TaskSDK. Right now this puts a
  nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK.
  This will be addressed before release (we have talked about introducing a
  new "apache-airflow-base-executor" dist where this subprocess+supervisor
  could live, as the "execution_time" folder in the Task SDK is more a feature
  of the executor, not of the TaskSDK itself.)
- A number of classes that we need to send between processes have been
  converted to Pydantic for ease of serialization.
- In order to not have to serialize everything in the subprocess and deserialize everything
  in the parent Manager process, we have created a `LazyDeserializedDAG` class
  that provides lazy access to much of the properties needed to create update
  the DAG related DB objects, without needing to fully deserialize the entire
  DAG structure.
- Classes switched to attrs based for less boilerplate in constructors.
- Internal timers convert to `time.monotonic` where possible, and `time.time`
  where not, we only need second diff between two points, not datetime
  objects.
- With the earlier removal of "sync mode" for SQLite in apache#44839 the need for
  separate TERMINATE and END messages over the control socket can go.

---------

Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Daniel Imberman <[email protected]>
agupta01 pushed a commit to agupta01/airflow that referenced this pull request Jan 6, 2025
As part of Airflow 3 DAG definition files will have to use the Task SDK for
all their classes, and anything involving running user code will need to be
de-coupled from the database in the user-code process.

This change moves all of the "serialization" change up to the
DagFileProcessorManager, using the new function introduced in apache#44898 and the
"subprocess" machinery introduced in apache#44874.

**Important Note**: this change does not remove the ability for dag processes
to access the DB for Variables etc. That will come in a future change.

Some key parts of this change:

- It builds upon the WatchedSubprocess from the TaskSDK. Right now this puts a
  nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK.
  This will be addressed before release (we have talked about introducing a
  new "apache-airflow-base-executor" dist where this subprocess+supervisor
  could live, as the "execution_time" folder in the Task SDK is more a feature
  of the executor, not of the TaskSDK itself.)
- A number of classes that we need to send between processes have been
  converted to Pydantic for ease of serialization.
- In order to not have to serialize everything in the subprocess and deserialize everything
  in the parent Manager process, we have created a `LazyDeserializedDAG` class
  that provides lazy access to much of the properties needed to create update
  the DAG related DB objects, without needing to fully deserialize the entire
  DAG structure.
- Classes switched to attrs based for less boilerplate in constructors.
- Internal timers convert to `time.monotonic` where possible, and `time.time`
  where not, we only need second diff between two points, not datetime
  objects.
- With the earlier removal of "sync mode" for SQLite in apache#44839 the need for
  separate TERMINATE and END messages over the control socket can go.

---------

Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Daniel Imberman <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:CLI area:Scheduler including HA (high availability) scheduler area:serialization area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk full tests needed We need to run full set of tests for this PR to merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants