Skip to content

fix(composer): fix version check logic for 'airflow_db_cleanup.py' #13295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 53 additions & 28 deletions composer/workflows/airflow_db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Note: This sample is designed for Airflow 1 and 2.

# [START composer_metadb_cleanup]
"""
A maintenance workflow that you can deploy into Airflow to periodically clean
"""A maintenance workflow that you can deploy into Airflow to periodically clean
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
having too much data in your Airflow MetaStore.

Expand Down Expand Up @@ -68,33 +69,60 @@
from sqlalchemy import desc, sql, text
from sqlalchemy.exc import ProgrammingError


def parse_airflow_version(version: str) -> tuple[int, ...]:
    """Parse an Airflow version string into a comparable tuple of integers.

    For example, "2.9.2+composer" and "2.9.2" both parse to (2, 9, 2).
    Integer tuples compare element-wise, which makes version checks such as
    ``AIRFLOW_VERSION < (2, 2, 5)`` correct (unlike comparing lists of
    strings, where "10" < "9" lexicographically).

    Args:
        version: Version string, e.g. "2.9.2" or "2.9.2+composer".

    Returns:
        Tuple of integer version components, e.g. (2, 9, 2).

    Raises:
        ValueError: If any dot-separated component is not an integer.
    """
    # TODO(developer): Update this function if you are using a version
    # with non-numerical characters such as "2.9.3rc1".
    COMPOSER_SUFFIX = "+composer"
    # Drop the Cloud Composer build suffix, if present, before splitting.
    airflow_version_without_suffix = version.removesuffix(COMPOSER_SUFFIX)
    return tuple(int(part) for part in airflow_version_without_suffix.split("."))


now = timezone.utcnow

# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")

START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)

# How often to Run. @daily - Once a day at Midnight (UTC).
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server

# Who is listed as the owner of this DAG in the Airflow Web Server.
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails

# List of email address to send email alerts to if this job fails.
ALERT_EMAIL_ADDRESSES = []
# Airflow version used by the environment in list form, value stored in
# airflow_version is in format e.g "2.3.4+composer"
AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".")
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that arE 30 days old or older.

# Airflow version used by the environment as a tuple of integers.
# For example: (2, 9, 2)
#
# Value in `airflow_version` is in format e.g. "2.9.2+composer"
# It's converted to facilitate version comparison.
AIRFLOW_VERSION = parse_airflow_version(airflow_version)

# Length to retain the log files if not already provided in the configuration.
# If this is set to 30, the job will remove those files
# that are 30 days old or older.
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30)
)
# Prints the database entries which will be getting deleted; set to False
# to avoid printing large lists and slowdown process

# Prints the database entries which will be getting deleted;
# set to False to avoid printing large lists and slowdown the process.
PRINT_DELETES = False
# Whether the job should delete the db entries or not. Included if you want to
# temporarily avoid deleting the db entries.

# Whether the job should delete the DB entries or not.
# Included if you want to temporarily avoid deleting the DB entries.
ENABLE_DELETE = True
# List of all the objects that will be deleted. Comment out the DB objects you
# want to skip.

# List of all the objects that will be deleted.
# Comment out the DB objects you want to skip.
DATABASE_OBJECTS = [
{
"airflow_db_model": DagRun,
Expand All @@ -105,9 +133,7 @@
},
{
"airflow_db_model": TaskInstance,
"age_check_column": TaskInstance.start_date
if AIRFLOW_VERSION < ["2", "2", "0"]
else TaskInstance.start_date,
"age_check_column": TaskInstance.start_date,
"keep_last": False,
"keep_last_filters": None,
"keep_last_group_by": None,
Expand All @@ -122,7 +148,7 @@
{
"airflow_db_model": XCom,
"age_check_column": XCom.execution_date
if AIRFLOW_VERSION < ["2", "2", "5"]
if AIRFLOW_VERSION < (2, 2, 5)
else XCom.timestamp,
"keep_last": False,
"keep_last_filters": None,
Expand All @@ -144,15 +170,15 @@
},
]

# Check for TaskReschedule model
# Check for TaskReschedule model.
try:
from airflow.models import TaskReschedule

DATABASE_OBJECTS.append(
{
"airflow_db_model": TaskReschedule,
"age_check_column": TaskReschedule.execution_date
if AIRFLOW_VERSION < ["2", "2", "0"]
if AIRFLOW_VERSION < (2, 2, 0)
else TaskReschedule.start_date,
"keep_last": False,
"keep_last_filters": None,
Expand All @@ -163,7 +189,7 @@
except Exception as e:
logging.error(e)

# Check for TaskFail model
# Check for TaskFail model.
try:
from airflow.models import TaskFail

Expand All @@ -180,8 +206,8 @@
except Exception as e:
logging.error(e)

# Check for RenderedTaskInstanceFields model
if AIRFLOW_VERSION < ["2", "4", "0"]:
# Check for RenderedTaskInstanceFields model.
if AIRFLOW_VERSION < (2, 4, 0):
try:
from airflow.models import RenderedTaskInstanceFields

Expand All @@ -198,7 +224,7 @@
except Exception as e:
logging.error(e)

# Check for ImportError model
# Check for ImportError model.
try:
from airflow.models import ImportError

Expand All @@ -216,7 +242,7 @@
except Exception as e:
logging.error(e)

if AIRFLOW_VERSION < ["2", "6", "0"]:
if AIRFLOW_VERSION < (2, 6, 0):
try:
from airflow.jobs.base_job import BaseJob

Expand Down Expand Up @@ -530,5 +556,4 @@ def analyze_db():

print_configuration.set_downstream(cleanup_op)
cleanup_op.set_downstream(analyze_op)

# [END composer_metadb_cleanup]
17 changes: 16 additions & 1 deletion composer/workflows/airflow_db_cleanup_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,23 @@

import internal_unit_testing

from . import airflow_db_cleanup

def test_dag_import(airflow_database):

def test_version_comparison():
    # b/408307862 - Validate version check logic used in the sample.
    # A Composer-style version string ("+composer" suffix) must parse to an
    # integer tuple that compares correctly against other version tuples.
    composer_version = airflow_db_cleanup.parse_airflow_version("2.10.5+composer")
    assert composer_version == (2, 10, 5)
    assert composer_version > (2, 9, 1)

    # A plain upstream version string (no suffix) must parse the same way.
    plain_version = airflow_db_cleanup.parse_airflow_version("2.9.2")
    assert plain_version == (2, 9, 2)
    assert plain_version < (2, 9, 3)


def test_dag_import():
"""Test that the DAG file can be successfully imported.

This tests that the DAG can be parsed, but does not run it in an Airflow
Expand Down
2 changes: 1 addition & 1 deletion composer/workflows/noxfile_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"3.10",
"3.12",
"3.13",
], # Composer w/ Airflow 2 only supports Python 3.8
],
# Old samples are opted out of enforcing Python type hints
# All new samples should feature them
"enforce_type_hints": False,
Expand Down
2 changes: 1 addition & 1 deletion composer/workflows/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
# https://github.com/apache/airflow/blob/main/pyproject.toml

apache-airflow[amazon,apache.beam,cncf.kubernetes,google,microsoft.azure,openlineage,postgres]==2.9.2
google-cloud-dataform==0.5.9 # used in Dataform operators
google-cloud-dataform==0.5.9 # Used in Dataform operators
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewers: Please let me know whether pep-0008/#inline-comments applies to requirements.txt as well.

I couldn't find a style guide for this file other than https://pip.pypa.io/en/stable/reference/requirements-file-format/

scipy==1.14.1