From c9242cedfd480d38f4558d6806548c45d2c67cf3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 14 Aug 2024 21:15:41 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../definitions/edx/openedx_data_extract.py | 61 ++++++------------- src/ol_orchestrate/jobs/open_edx.py | 4 -- 2 files changed, 19 insertions(+), 46 deletions(-) diff --git a/src/ol_orchestrate/definitions/edx/openedx_data_extract.py b/src/ol_orchestrate/definitions/edx/openedx_data_extract.py index 74e9decd2..aa4c5a9a7 100644 --- a/src/ol_orchestrate/definitions/edx/openedx_data_extract.py +++ b/src/ol_orchestrate/definitions/edx/openedx_data_extract.py @@ -1,26 +1,17 @@ from typing import Literal -from dagster import ( -AssetSelection, - Definitions, - ScheduleDefinition, -define_asset_job -) +from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job from dagster_aws.s3 import S3Resource -from ol_orchestrate.assets.edxorg_archive import ( - CourseListConfig, +from ol_orchestrate.assets.openedx import ( course_list, course_structure, ) - from ol_orchestrate.io_managers.filepath import ( S3FileObjectIOManager, ) -from ol_orchestrate.jobs.open_edx import extract_open_edx_data_to_ol_data_platform from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS from ol_orchestrate.resources.openedx import OpenEdxApiClient -from ol_orchestrate.resources.outputs import DailyResultsDir from ol_orchestrate.resources.secrets.vault import Vault if DAGSTER_ENV == "dev": @@ -64,30 +55,6 @@ def open_edx_extract_job_config( } -ol_extract_jobs = [ - extract_open_edx_data_to_ol_data_platform.to_job( - resource_defs={ - "results_dir": DailyResultsDir(), - "s3_upload": S3Resource(), - "s3": S3Resource(), - "openedx": OpenEdxApiClient.configure_at_launch(), - }, - name=f"extract_{deployment}_open_edx_data_to_data_platform", - config=open_edx_extract_job_config(deployment, DAGSTER_ENV), # type: ignore[arg-type] - ) - for deployment in ("residential", "xpro", "mitxonline") -] - -openedx_data_extracts = Definitions( - jobs=ol_extract_jobs, - schedules=[ - ScheduleDefinition( - name=f"{job.name}_nightly", cron_schedule="0 0 * * *", job=job - ) - for job in ol_extract_jobs - ], -) - def s3_uploads_bucket( dagster_env: Literal["dev", "qa", "production"], ) -> dict[str, Any]: @@ -101,6 +68,7 @@ def s3_uploads_bucket( } return bucket_map[dagster_env] + def edxorg_data_archive_config(dagster_env): return { "ops": { @@ -114,26 +82,35 @@ def edxorg_data_archive_config(dagster_env): } -openedx_course_structures_job = extract_open_edx_data_to_ol_data_platform.to_job( +openedx_course_structures_job = define_asset_job( name="extract_open_edx_data_to_ol_data_platform", - config=edxorg_data_archive_config(DAGSTER_ENV), - selection=AssetSelection.assets(CourseListConfig, course_list, course_structure), + config=open_edx_extract_job_config(DAGSTER_ENV), + selection=AssetSelection.assets(course_list).downstream(), + # only include direct descendants + depth=1, + include_self=True, ) + retrieve_openedx_course_data = Definitions( resources={ - "s3": S3Resource(), - "exports_dir": DailyResultsDir.configure_at_launch(), - "s3file_io_manager": S3FileObjectIOManager( + "io_manager": S3FileObjectIOManager( bucket=s3_uploads_bucket(DAGSTER_ENV)["bucket"], path_prefix=s3_uploads_bucket(DAGSTER_ENV)["prefix"], ), "vault": vault, + "openedx": OpenEdxApiClient.configure_at_launch(), }, jobs=[openedx_course_structures_job], assets=[ - CourseListConfig, course_list, course_structure, ], + schedules=[ + ScheduleDefinition( + name=f"{openedx_course_structures_job.name}_nightly", + cron_schedule="0 0 * * *", + job=openedx_course_structures_job, + ) + ], ) diff --git a/src/ol_orchestrate/jobs/open_edx.py b/src/ol_orchestrate/jobs/open_edx.py index f5530dc63..06a4e8fd2 100644 --- a/src/ol_orchestrate/jobs/open_edx.py +++ b/src/ol_orchestrate/jobs/open_edx.py @@ -1,9 +1,5 @@ from dagster import graph -from ol_orchestrate.assets.open_edx import ( - course_list, - course_structure, -) from ol_orchestrate.lib.hooks import ( notify_healthchecks_io_on_failure, notify_healthchecks_io_on_success,