Updated the open_edx job to use the new asset-oriented methods.
Pulled the new assets into the openedx_data_extract definitions.
Slightly refactored the assets code.
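
For context, a minimal, hypothetical sketch of the asset-oriented pattern this commit moves toward — Dagster software-defined assets wired into a job via define_asset_job — with purely illustrative asset names, not taken from this repo:

from dagster import AssetSelection, Definitions, asset, define_asset_job


@asset(group_name="example")
def raw_records() -> list[dict]:
    # Upstream asset: fetch or load the raw records.
    return [{"id": 1}, {"id": 2}]


@asset(group_name="example")
def record_count(raw_records: list[dict]) -> int:
    # Downstream asset: depends on raw_records by parameter name.
    return len(raw_records)


example_assets_job = define_asset_job(
    name="materialize_example_assets",
    selection=AssetSelection.assets(raw_records, record_count),
)

defs = Definitions(assets=[raw_records, record_count], jobs=[example_assets_job])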
quazi-h authored and blarghmatey committed Aug 27, 2024
1 parent 1b24db1 commit 175e6b3
Showing 3 changed files with 86 additions and 34 deletions.
45 changes: 15 additions & 30 deletions src/ol_orchestrate/assets/openedx.py
@@ -24,16 +24,12 @@
from flatten_dict import flatten
from flatten_dict.reducers import make_reducer
from pydantic import Field
from upath import UPath

from ol_orchestrate.lib.openedx import un_nest_course_structure

course_list_asset_key = AssetKey(("openedx", "raw_data", "course_list"))


@asset(
key=AssetKey(("openedx", "raw_data", "course_list_config")),
# partitions_def=,
group_name="openedx",
)
class CourseListConfig(Config):
edx_course_api_page_size: int = Field(
default=100,
@@ -44,22 +40,14 @@ class CourseListConfig(Config):

@asset(
# partitions_def=,
key=AssetKey(("openedx", "raw_data", "course_list")),
key=course_list_asset_key,
group_name="openedx",
io_manager_key="s3file_io_manager",
# todo: best practice for passing in the config for page_size?
ins={
"course_list_config": AssetIn(
key=AssetKey(("openedx", "raw_data", "course_list_config"))
)
},
auto_materialize_policy=AutoMaterializePolicy.eager(
max_materializations_per_minute=None
),
)
def course_list(
context: AssetExecutionContext, config: Config
):
def course_list(context: AssetExecutionContext, config: CourseListConfig):
course_ids = []
course_id_generator = context.resources.openedx.get_edx_course_ids(
page_size=config.edx_course_api_page_size,
@@ -68,14 +56,13 @@ def course_list(
course_ids.extend([course["id"] for course in result_set])
yield Output(course_ids)


@multi_asset(
outs={
"course_structure": AssetOut(
key=AssetKey(("openedx", "raw_data", "course_structure")),
io_manager_key="s3file_io_manager",
description=(
"A json file with the course structure information."
),
description=("A json file with the course structure information."),
auto_materialize_policy=AutoMaterializePolicy.eager(
max_materializations_per_minute=None
),
@@ -92,20 +79,17 @@ def course_list(
),
),
},
ins={
"course_list": AssetIn(
key=AssetKey(("openedx", "raw_data", "course_list"))
)
},
ins={"course_list": AssetIn(key=AssetKey(("openedx", "raw_data", "course_list")))},
# partitions_def=,
group_name="openedx",
)
def course_structure(
context: AssetExecutionContext, course_list: list[str],
context: AssetExecutionContext,
course_list: list[str],
):
dagster_instance = context.instance
## TODO: Is this correctly iterating over the list of course_ids?
input_asset_materialization_event = dagster_instance.get_event_records(
input_asset_materialization_event = dagster_instance.get_event_records(
event_records_filter=EventRecordsFilter(
asset_key=context.asset_key_for_input("course_list"),
event_type=DagsterEventType.ASSET_MATERIALIZATION,
@@ -147,9 +131,9 @@ def course_structure(
course_id, course_structure_document, data_retrieval_timestamp
):
blocks.write(block)
# todo: How do i get the open_edx_deployment from the context? instance?
structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json" # noqa: E501
blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json" # noqa: E501
# TODO: How do i get the open_edx_deployment from the context? instance?
structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json" # noqa: E501
blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json" # noqa: E501
yield Output(
(structures_file, structure_object_key),
output_name="flattened_course_structure",
@@ -161,6 +145,7 @@ def course_structure(
output_name="course_blocks",
data_version=DataVersion(data_version),
metadata={
"course_id": course_id, "object_key": blocks_object_key,
"course_id": course_id,
"object_key": blocks_object_key,
},
)
66 changes: 65 additions & 1 deletion src/ol_orchestrate/definitions/edx/openedx_data_extract.py
@@ -1,8 +1,22 @@
from typing import Any, Literal

from dagster import Definitions, ScheduleDefinition
from dagster import (
AssetSelection,
Definitions,
ScheduleDefinition,
define_asset_job
)
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.openedx import (
CourseListConfig,
course_list,
course_structure,
)

from ol_orchestrate.io_managers.filepath import (
S3FileObjectIOManager,
)
from ol_orchestrate.jobs.open_edx import extract_open_edx_data_to_ol_data_platform
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.resources.openedx import OpenEdxApiClient
@@ -73,3 +87,53 @@ def open_edx_extract_job_config(
for job in ol_extract_jobs
],
)

def s3_uploads_bucket(
dagster_env: Literal["dev", "qa", "production"],
) -> dict[str, Any]:
bucket_map = {
"dev": {"bucket": "ol-devops-sandbox", "prefix": "pipeline-storage"},
"qa": {"bucket": "ol-data-lake-landing-zone-qa", "prefix": "edxorg-raw-data"},
"production": {
"bucket": "ol-data-lake-landing-zone-production",
"prefix": "edxorg-raw-data",
},
}
return bucket_map[dagster_env]

def edxorg_data_archive_config(dagster_env):
return {
"ops": {
"process_edxorg_archive_bundle": {
"config": {
"s3_bucket": s3_uploads_bucket(dagster_env)["bucket"],
"s3_prefix": s3_uploads_bucket(dagster_env)["prefix"],
}
},
}
}


openedx_course_structures_job = define_asset_job(
name="extract_open_edx_data_to_ol_data_platform",
config=edxorg_data_archive_config(DAGSTER_ENV),
selection=AssetSelection.assets(course_list, course_structure),
)

retrieve_openedx_course_data = Definitions(
resources={
"s3": S3Resource(),
"exports_dir": DailyResultsDir.configure_at_launch(),
"s3file_io_manager": S3FileObjectIOManager(
bucket=s3_uploads_bucket(DAGSTER_ENV)["bucket"],
path_prefix=s3_uploads_bucket(DAGSTER_ENV)["prefix"],
),
"vault": vault,
},
jobs=[openedx_course_structures_job],
assets=[
course_list,
course_structure,
],
)
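
A quick usage sketch of the environment-keyed helpers added above. The expected values are copied from the bucket_map in this diff; the snippet itself is illustrative, not part of the commit, and assumes the module imports cleanly in a configured environment:

# Illustrative check of the helpers defined in openedx_data_extract.py.
from ol_orchestrate.definitions.edx.openedx_data_extract import (
    edxorg_data_archive_config,
    s3_uploads_bucket,
)

assert s3_uploads_bucket("qa") == {
    "bucket": "ol-data-lake-landing-zone-qa",
    "prefix": "edxorg-raw-data",
}

qa_run_config = edxorg_data_archive_config("qa")
assert qa_run_config["ops"]["process_edxorg_archive_bundle"]["config"] == {
    "s3_bucket": "ol-data-lake-landing-zone-qa",
    "s3_prefix": "edxorg-raw-data",
}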
9 changes: 6 additions & 3 deletions src/ol_orchestrate/jobs/open_edx.py
@@ -18,6 +18,11 @@
user_roles,
write_course_list_csv,
)
from ol_orchestrate.assets.openedx import (
course_list,
course_structure,
course_list_asset_key,
)


@graph(
@@ -69,6 +74,4 @@ def edx_course_pipeline():
},
)
def extract_open_edx_data_to_ol_data_platform():
fetch_edx_course_structure_from_api(list_courses()).map(
lambda fpath: upload_files_to_s3(fpath)
)
course_structure(course_list())
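
A hypothetical follow-up, not part of this commit: the new asset job could be attached to a schedule alongside the existing ones. The cron cadence below is an assumption, purely for illustration.

from dagster import ScheduleDefinition

# Assumes openedx_course_structures_job from openedx_data_extract.py is importable.
from ol_orchestrate.definitions.edx.openedx_data_extract import (
    openedx_course_structures_job,
)

openedx_course_structures_schedule = ScheduleDefinition(
    job=openedx_course_structures_job,
    cron_schedule="0 4 * * *",  # illustrative daily cadence, not from the repo
)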
