Skip to content

Commit

Permalink
Update DAG parsing tests, add tests for ensuring new DAGs are added (#4797)
Browse files Browse the repository at this point in the history

* Add a check to make sure new DAG files get added to the parsing tests

* Update tests with new DAGs and expected DAG counts

* Fix deprecation warning
  • Loading branch information
AetherUnbound authored Aug 23, 2024
1 parent 1a86e40 commit 6e58c85
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions catalog/tests/dags/test_dag_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,52 @@

# DAG paths to test
DAG_PATHS = [
"providers/provider_workflow_dag_factory.py",
"data_refresh/dag_factory.py",
"database/batched_update/batched_update_dag.py",
"database/catalog_cleaner/catalog_cleaner.py",
"database/delete_records/delete_records_dag.py",
"database/report_pending_reported_media.py",
"database/staging_database_restore/staging_database_restore_dag.py",
"elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py",
"elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py", # noqa: E501
"elasticsearch_cluster/healthcheck_dag.py",
"elasticsearch_cluster/point_es_alias/point_es_alias_dag.py",
"elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py",
"legacy_data_refresh/create_filtered_index_dag.py",
"legacy_data_refresh/dag_factory.py",
"maintenance/add_license_url.py",
"maintenance/airflow_log_cleanup_workflow.py",
"maintenance/check_silenced_dags.py",
"maintenance/decode_and_deduplicate_image_tags.py",
"maintenance/flickr_audit_sub_provider_workflow.py",
"maintenance/pr_review_reminders/pr_review_reminders_dag.py",
"maintenance/rotate_db_snapshots.py",
"popularity/recreate_popularity_calculation_dag_factory.py",
"popularity/popularity_refresh_dag_factory.py",
"legacy_data_refresh/dag_factory.py",
"legacy_data_refresh/create_filtered_index_dag.py",
"elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py",
"elasticsearch_cluster/healthcheck_dag.py",
"oauth2/authorize_dag.py",
"oauth2/token_refresh_dag.py",
"database/delete_records/delete_records_dag.py",
"popularity/popularity_refresh_dag_factory.py",
"popularity/recreate_popularity_calculation_dag_factory.py",
"providers/provider_reingestion_workflow_dag_factory.py",
"providers/provider_workflow_dag_factory.py",
]

# Expected count from the DagBag once a file has been parsed
# (this will likely not need to be edited for new providers)
EXPECTED_COUNT = {
"providers/provider_workflow_dag_factory.py": len(PROVIDER_WORKFLOW_CONFIGS),
"providers/provider_ingestion_workflow_dag_factory.py": len(
"providers/provider_reingestion_workflow_dag_factory.py": len(
REINGESTION_WORKFLOW_CONFIGS
),
"popularity/recreate_popularity_calculation_dag_factory.py": len(MEDIA_TYPES),
"popularity/popularity_refresh_dag_factory.py": len(MEDIA_TYPES),
"legacy_data_refresh/dag_factory.py": len(MEDIA_TYPES),
"legacy_data_refresh/create_filtered_index_dag.py": len(MEDIA_TYPES),
"elasticsearch_cluster/healthcheck_dag.py": len(ENVIRONMENTS),
"data_refresh/dag_factory.py": len(MEDIA_TYPES) * len(ENVIRONMENTS),
"database/batched_update/batched_update_dag.py": 2,
"elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py": len(
ENVIRONMENTS
),
"elasticsearch_cluster/point_es_alias/point_es_alias_dag.py": len(ENVIRONMENTS),
}


Expand All @@ -60,6 +79,13 @@ def test_dag_import_errors():
len(dagbag.import_errors) == 0
), f"Errors found during DAG import for files: {error_string}"

all_paths = {str(dag.relative_fileloc) for dag in dagbag.dags.values()}
missing_paths = all_paths - set(DAG_PATHS)
assert len(missing_paths) == 0, (
f"The following DAG files are unaccounted for in the DAG parse testing, "
f"please add them to `DAG_PATHS` in `test_dag_parsing.py`: {missing_paths}"
)


# relative_path represents the path from the DAG folder to the file
@pytest.mark.parametrize("relative_path", DAG_PATHS)
Expand All @@ -69,8 +95,10 @@ def test_dags_loads_correct_number_with_no_errors(relative_path, tmpdir):
expected_count = EXPECTED_COUNT.get(relative_path, 1)
dag_bag = DagBag(dag_folder=tmpdir, include_examples=False)
dag_bag.process_file(str(DAG_FOLDER / relative_path))
assert len(dag_bag.dags), "No DAGs found in file"
assert len(dag_bag.import_errors) == 0, "Errors found during DAG import"
assert len(dag_bag.dags) == expected_count, "An unexpected # of DAGs was found"
found = len(dag_bag.dags)
assert found == expected_count, f"An unexpected # of DAGs ({found}) were found"


def test_dag_uses_default_args():
Expand Down

0 comments on commit 6e58c85

Please sign in to comment.