
Conditional Task Execution Based on Model Metadata #1416

Open
itestyoy opened this issue Dec 20, 2024 · 5 comments

@itestyoy

Hello!
I would like to know if it's possible to run a task based on model metadata, for example meta: { "airflow_schedule": "@daily" } or something like this, without post-rendering the DAG.


dosubot bot commented Dec 20, 2024

Hello @itestyoy! I'm here to help you with any bugs, questions, or contributions you have. Let's tackle this together!

I wasn't able to find specific information in the Cosmos repository about using metadata fields like "airflow_schedule" for conditional task execution without post-rendering the DAG. Cosmos does handle task scheduling and execution based on model metadata by leveraging Apache Airflow's scheduling capabilities, but it primarily uses Airflow's time-based scheduling options and data-aware scheduling through dataset URIs [1]. If you need more detailed functionality, you might need to implement custom logic to interpret such metadata within your DAG definition.


@pankajastro
Contributor

Hi @itestyoy, could you please explain your use case a bit more so I can understand it better? Cosmos does support setting some parameters via model meta. See the docs: https://astronomer.github.io/astronomer-cosmos/getting_started/custom-airflow-properties.html

@itestyoy
Author

@pankajastro Hi!

I am considering the option of using metadata (e.g., "airflow_schedule": "@daily") directly in the model definition to optimize DAG execution time. Specifically, certain models need to be updated only once a day, while others can be refreshed more frequently.

To achieve this optimization, metadata like airflow_schedule can be analyzed for each task, and executions that do not align with the current run can be selectively skipped. For instance, if the current execution time does not match the airflow_schedule specified in the metadata, the task should raise an AirflowSkipException to avoid unnecessary work.
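
For illustration, a condition callback along these lines could implement that check (the meta key airflow_schedule and the hour/weekday rules below are assumptions for the sketch, not an existing Cosmos API):

def schedule_matches(context, airflow_schedule=None):
    """Return True when the current run aligns with the model's meta schedule."""
    logical_date = context["logical_date"]
    if airflow_schedule == "@daily":
        # Only run on the first hourly tick of the day.
        return logical_date.hour == 0
    if airflow_schedule == "@weekly":
        # Only run on the first tick of Monday.
        return logical_date.weekday() == 0 and logical_date.hour == 0
    # No schedule restriction in the meta: always run.
    return True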

@tatiana
Collaborator

tatiana commented Dec 27, 2024

This is a very valid use case, and we've also considered it. Unfortunately, Airflow 2.x does not have built-in support for different schedules in the same DAG.

If Cosmos 1.8 users want to handle this use case, they need to:

  1. Use DbtDag's dbt selectors (--select or --exclude) to choose a subset of dbt nodes for each of those specific schedules (https://astronomer.github.io/astronomer-cosmos/configuration/selecting-excluding.html)
  2. "Manually" set the task dependencies between the DAGs that represent those subsets of the dbt project (a sketch of this two-DAG approach follows below)

We logged a task that could simplify step (2) by allowing Cosmos users to add Dataset-based schedules to DbtDags automatically:
#1321

The implementation of this task is relatively near-term on our roadmap.

Moving forward, a few things could be done to improve this experience.

One of them would be if Airflow had built-in support for different schedules within the same DAG. I believe there was an intention to accomplish this as part of https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-73+Expanded+Data+Awareness. @cmarteepants and @uranusjr may be able to give additional context and information on this.

Another possibility would be for Cosmos to work around this Airflow 2.x limitation by adding conditional operators and deciding when subparts of the DAG should run only during a specific subset of a more granular schedule.

As an example, assuming we had the dbt models a, b, c, d with the following schedules:

[a - hourly] -> [b] -> [d - daily]
                    -> [c - weekly]

If all these models were in the same DAG, the DAG could be run hourly:

  • [a] would always be run
  • [b] would always be run
  • [d] would be run only once a day, in a subset of the hours the DAG is scheduled (at a particular hour)
  • [c] would be run weekly (at a specific hour on a specific day of the week)
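
As a minimal sketch of that idea using a stock Airflow operator (the task names are illustrative; this is not something Cosmos generates today):

from airflow.operators.python import ShortCircuitOperator

# A guard in front of [d] that short-circuits the daily branch on every run
# except the first hourly tick of the day.
guard_d = ShortCircuitOperator(
    task_id="guard_d_daily",
    python_callable=lambda **context: context["logical_date"].hour == 0,
)
# guard_d >> run_d  # where run_d is the task that materializes model [d]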

@itestyoy @pankajastro any thoughts on these ideas?

@itestyoy
Author

@tatiana Hi!

Airflow provides @skip_if and @run_if callback decorators in newer versions. More details here:
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/decorators/condition/index.html
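
A minimal sketch with run_if (Airflow 2.10+; the hour check is an illustrative stand-in for "only refresh the daily model on the first tick of the day"):

from airflow.decorators import task
from airflow.decorators.condition import run_if


@run_if(lambda context: context["logical_date"].hour == 0)
@task
def refresh_daily_model():
    ...  # work that should happen at most once a day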

Another approach is to wrap any operator with a callback function and raise an AirflowSkipException for tasks that need to be skipped.

Here, condition_callback is any function that uses the task context and, optionally, op_kwargs, which can carry additional metadata about the model.

In general, you don't need to set a schedule for tasks in the graph (Airflow 2.x does not have built-in support for different schedules in the same DAG). Instead, tasks can be conditionally skipped based on a function that implements the desired schedule logic. The DAG itself should be triggered according to your specific needs, in line with the scheduling requirements. Using the built-in task option trigger_rule, you can define how downstream tasks behave when a task is skipped.

Here’s an example implementation:

from airflow.exceptions import AirflowSkipException
from airflow.utils.session import provide_session


# Template: substitute the operator you want to wrap for <Name>Operator.
class <Name>OperatorWithCondition(<Name>Operator):
    def __init__(
        self,
        condition_callback,  # callable(context, **op_kwargs) -> bool
        op_kwargs=None,  # extra metadata (e.g. the model's meta) for the callback
        ignore_downstream_trigger_rules: bool = True,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules
        self.condition_callback = condition_callback
        self.op_kwargs = op_kwargs or {}

    @provide_session
    def execute(self, context, session=None):
        # Run the wrapped operator only when the condition holds; otherwise skip.
        if self.condition_callback(context, **self.op_kwargs):
            super().execute(context)
        else:
            raise AirflowSkipException("Condition not met; skipping task.")
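
Hypothetical usage, assuming the template has been filled in and the resulting class is named MyOperatorWithCondition:

daily_refresh = MyOperatorWithCondition(
    task_id="refresh_daily_model",
    condition_callback=lambda context, **kw: context["logical_date"].hour == 0,
    trigger_rule="none_failed",  # downstream tasks still run when this one skips
    # ...plus whatever arguments the wrapped operator itself requires
)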

@tatiana tatiana added this to the Cosmos 1.9.0 milestone Dec 30, 2024