From 175e6b3c1ccb0d8fbc8175ebf14e76da229aaaf4 Mon Sep 17 00:00:00 2001
From: Quazi
Date: Tue, 6 Aug 2024 15:52:25 -0400
Subject: [PATCH] Updated the open_edx job to use the new asset-oriented
 methods. Pulled in the new assets into the openedx_data_extract definitions.
 Slightly refactored the assets code.

---
 src/ol_orchestrate/assets/openedx.py                | 45 +++++--------
 .../definitions/edx/openedx_data_extract.py         | 66 ++++++++++++++++++-
 src/ol_orchestrate/jobs/open_edx.py                 |  9 ++-
 3 files changed, 86 insertions(+), 34 deletions(-)

diff --git a/src/ol_orchestrate/assets/openedx.py b/src/ol_orchestrate/assets/openedx.py
index b42e87cc2..5f18c683f 100644
--- a/src/ol_orchestrate/assets/openedx.py
+++ b/src/ol_orchestrate/assets/openedx.py
@@ -24,16 +24,12 @@
 from flatten_dict import flatten
 from flatten_dict.reducers import make_reducer
 from pydantic import Field
-from upath import UPath
 
 from ol_orchestrate.lib.openedx import un_nest_course_structure
 
+course_list_asset_key = AssetKey(("openedx", "raw_data", "course_list"))
+
 
-@asset(
-    key=AssetKey(("openedx", "raw_data", "course_list_config")),
-    # partitions_def=,
-    group_name="openedx",
-)
 class CourseListConfig(Config):
     edx_course_api_page_size: int = Field(
         default=100,
@@ -44,22 +40,14 @@ class CourseListConfig(Config):
 
 @asset(
     # partitions_def=,
-    key=AssetKey(("openedx", "raw_data", "course_list")),
+    key=course_list_asset_key,
     group_name="openedx",
     io_manager_key="s3file_io_manager",
-    # todo: best practice for passing in the config for page_size?
-    ins={
-        "course_list_config": AssetIn(
-            key=AssetKey(("openedx", "raw_data", "course_list_config"))
-        )
-    },
     auto_materialize_policy=AutoMaterializePolicy.eager(
        max_materializations_per_minute=None
     ),
 )
-def course_list(
-    context: AssetExecutionContext, config: Config
-):
+def course_list(context: AssetExecutionContext, config: CourseListConfig):
     course_ids = []
     course_id_generator = context.resources.openedx.get_edx_course_ids(
         page_size=config.edx_course_api_page_size,
@@ -68,14 +56,13 @@ def course_list(
         course_ids.extend([course["id"] for course in result_set])
     yield Output(course_ids)
 
+
 @multi_asset(
     outs={
         "course_structure": AssetOut(
             key=AssetKey(("openedx", "raw_data", "course_structure")),
             io_manager_key="s3file_io_manager",
-            description=(
-                "A json file with the course structure information."
-            ),
+            description=("A json file with the course structure information."),
             auto_materialize_policy=AutoMaterializePolicy.eager(
                 max_materializations_per_minute=None
             ),
@@ -92,20 +79,17 @@ def course_list(
             ),
         ),
     },
-    ins={
-        "course_list": AssetIn(
-            key=AssetKey(("openedx", "raw_data", "course_list"))
-        )
-    },
+    ins={"course_list": AssetIn(key=AssetKey(("openedx", "raw_data", "course_list")))},
     # partitions_def=,
     group_name="openedx",
 )
 def course_structure(
-    context: AssetExecutionContext, course_list: list[str],
+    context: AssetExecutionContext,
+    course_list: list[str],
 ):
     dagster_instance = context.instance
     ## TODO: Is this correctly iterating over the list of course_ids?
-    input_asset_materialization_event = dagster_instance.get_event_records( 
+    input_asset_materialization_event = dagster_instance.get_event_records(
         event_records_filter=EventRecordsFilter(
             asset_key=context.asset_key_for_input("course_list"),
             event_type=DagsterEventType.ASSET_MATERIALIZATION,
@@ -147,9 +131,9 @@ def course_structure(
             course_id, course_structure_document, data_retrieval_timestamp
         ):
             blocks.write(block)
-        # todo: How do i get the open_edx_deployment from the context? instance? 
-        structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json"  # noqa: E501
-        blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json"  # noqa: E501
+        # TODO: How do I get the open_edx_deployment from the context? instance?
+        structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json"  # noqa: E501
+        blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json"  # noqa: E501
         yield Output(
             (structures_file, structure_object_key),
             output_name="flattened_course_structure",
@@ -161,6 +145,7 @@ def course_structure(
             output_name="course_blocks",
             data_version=DataVersion(data_version),
             metadata={
-                "course_id": course_id, "object_key": blocks_object_key,
+                "course_id": course_id,
+                "object_key": blocks_object_key,
             },
         )
diff --git a/src/ol_orchestrate/definitions/edx/openedx_data_extract.py b/src/ol_orchestrate/definitions/edx/openedx_data_extract.py
index 645dd7103..74e9decd2 100644
--- a/src/ol_orchestrate/definitions/edx/openedx_data_extract.py
+++ b/src/ol_orchestrate/definitions/edx/openedx_data_extract.py
@@ -1,8 +1,22 @@
 from typing import Literal
 
-from dagster import Definitions, ScheduleDefinition
+from dagster import (
+    AssetSelection,
+    Definitions,
+    ScheduleDefinition,
+    define_asset_job,
+)
 from dagster_aws.s3 import S3Resource
 
+from ol_orchestrate.assets.openedx import (
+    CourseListConfig,
+    course_list,
+    course_structure,
+)
+
+from ol_orchestrate.io_managers.filepath import (
+    S3FileObjectIOManager,
+)
 from ol_orchestrate.jobs.open_edx import extract_open_edx_data_to_ol_data_platform
 from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
 from ol_orchestrate.resources.openedx import OpenEdxApiClient
@@ -73,3 +87,53 @@ def open_edx_extract_job_config(
         for job in ol_extract_jobs
     ],
 )
+
+
+def s3_uploads_bucket(
+    dagster_env: Literal["dev", "qa", "production"],
+) -> dict[str, str]:
+    bucket_map = {
+        "dev": {"bucket": "ol-devops-sandbox", "prefix": "pipeline-storage"},
+        "qa": {"bucket": "ol-data-lake-landing-zone-qa", "prefix": "edxorg-raw-data"},
+        "production": {
+            "bucket": "ol-data-lake-landing-zone-production",
+            "prefix": "edxorg-raw-data",
+        },
+    }
+    return bucket_map[dagster_env]
+
+
+def edxorg_data_archive_config(dagster_env):
+    return {
+        "ops": {
+            "process_edxorg_archive_bundle": {
+                "config": {
+                    "s3_bucket": s3_uploads_bucket(dagster_env)["bucket"],
+                    "s3_prefix": s3_uploads_bucket(dagster_env)["prefix"],
+                }
+            },
+        }
+    }
+
+
+openedx_course_structures_job = extract_open_edx_data_to_ol_data_platform.to_job(
+    name="extract_open_edx_data_to_ol_data_platform",
+    config=edxorg_data_archive_config(DAGSTER_ENV),
+    selection=AssetSelection.assets(CourseListConfig, course_list, course_structure),
+)
+
+retrieve_openedx_course_data = Definitions(
+    resources={
+        "s3": S3Resource(),
+        "exports_dir": DailyResultsDir.configure_at_launch(),
+        "s3file_io_manager": S3FileObjectIOManager(
+            bucket=s3_uploads_bucket(DAGSTER_ENV)["bucket"],
+            path_prefix=s3_uploads_bucket(DAGSTER_ENV)["prefix"],
+        ),
+        "vault": vault,
+    },
+    jobs=[openedx_course_structures_job],
+    assets=[
+        CourseListConfig,
+        course_list,
+        course_structure,
+    ],
+)
diff --git a/src/ol_orchestrate/jobs/open_edx.py b/src/ol_orchestrate/jobs/open_edx.py
index bb934b1a8..5af183553 100644
--- a/src/ol_orchestrate/jobs/open_edx.py
+++ b/src/ol_orchestrate/jobs/open_edx.py
@@ -18,6 +18,11 @@
     user_roles,
     write_course_list_csv,
 )
+from ol_orchestrate.assets.openedx import (
+    course_list,
+    course_structure,
+    course_list_asset_key,
+)
 
 
 @graph(
@@ -69,6 +74,4 @@ def edx_course_pipeline():
     },
 )
 def extract_open_edx_data_to_ol_data_platform():
-    fetch_edx_course_structure_from_api(list_courses()).map(
-        lambda fpath: upload_files_to_s3(fpath)
-    )
+    course_structure(course_list())
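
Note (not part of the patch): as far as I can tell, GraphDefinition.to_job() does not
take an asset selection= argument, and CourseListConfig is a Dagster Config schema
rather than an asset, so the openedx_course_structures_job and Definitions wiring in
openedx_data_extract.py may not load as written. Since define_asset_job is already
imported in this diff, a minimal sketch of a purely asset-oriented wiring, assuming
the asset module path used above (ol_orchestrate.assets.openedx) and the resources
and config helpers this patch already defines, could look like:

    # Sketch only, not applied by this patch: a possible replacement for the
    # `.to_job(...)` call in openedx_data_extract.py, using define_asset_job.
    from dagster import AssetSelection, define_asset_job

    from ol_orchestrate.assets.openedx import course_list, course_structure

    openedx_course_structures_job = define_asset_job(
        name="extract_open_edx_data_to_ol_data_platform",
        # CourseListConfig is a run-config schema, not an asset; Dagster derives
        # the job's config schema from course_list's `config: CourseListConfig`
        # parameter, so only the asset definitions are selected here.
        selection=AssetSelection.assets(course_list, course_structure),
    )

With that wiring, the Definitions block could keep its resources mapping unchanged,
list only course_list and course_structure under assets=, and pass this job in
jobs=; the graph-body change in jobs/open_edx.py would then not be needed in order
to materialize the assets.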