Skip to content

Commit 1b24db1

Browse files
quazi-hblarghmatey
authored andcommitted
Refactoring the openedx course structures pipeline code to Dagster assets.
Added a course_list asset and a course_structure multi asset that will produce the course_blocks and course_structures files. Utilizing the existing s3 io_manager to handle the asset oriented files and using data versioning to better organize the uploaded files. Still need to update the actual job to call these new components. Leaving the existing openedx ops alone since the edx_course_pipeline graph job still requires them and we do not want to disrupt downstream consumers of that data like Jon.
1 parent 7aff85c commit 1b24db1

File tree

1 file changed

+166
-0
lines changed

1 file changed

+166
-0
lines changed

src/ol_orchestrate/assets/openedx.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# - Query the openedx api to get course structures and course blocks data
2+
# - Model the different asset objects according to their type
3+
4+
import hashlib
5+
import json
6+
from datetime import UTC, datetime
7+
from pathlib import Path
8+
9+
import jsonlines
10+
from dagster import (
11+
AssetExecutionContext,
12+
AssetIn,
13+
AssetKey,
14+
AssetOut,
15+
AutoMaterializePolicy,
16+
Config,
17+
DagsterEventType,
18+
DataVersion,
19+
EventRecordsFilter,
20+
Output,
21+
asset,
22+
multi_asset,
23+
)
24+
from flatten_dict import flatten
25+
from flatten_dict.reducers import make_reducer
26+
from pydantic import Field
27+
from upath import UPath
28+
29+
from ol_orchestrate.lib.openedx import un_nest_course_structure
30+
31+
32+
@asset(
33+
key=AssetKey(("openedx", "raw_data", "course_list_config")),
34+
# partitions_def=,
35+
group_name="openedx",
36+
)
37+
class CourseListConfig(Config):
38+
edx_course_api_page_size: int = Field(
39+
default=100,
40+
description="The number of records to return per API request. This can be "
41+
"modified to address issues with rate limiting.",
42+
)
43+
44+
45+
@asset(
46+
# partitions_def=,
47+
key=AssetKey(("openedx", "raw_data", "course_list")),
48+
group_name="openedx",
49+
io_manager_key="s3file_io_manager",
50+
# todo: best practice for passing in the config for page_size?
51+
ins={
52+
"course_list_config": AssetIn(
53+
key=AssetKey(("openedx", "raw_data", "course_list_config"))
54+
)
55+
},
56+
auto_materialize_policy=AutoMaterializePolicy.eager(
57+
max_materializations_per_minute=None
58+
),
59+
)
60+
def course_list(
61+
context: AssetExecutionContext, config: Config
62+
):
63+
course_ids = []
64+
course_id_generator = context.resources.openedx.get_edx_course_ids(
65+
page_size=config.edx_course_api_page_size,
66+
)
67+
for result_set in course_id_generator:
68+
course_ids.extend([course["id"] for course in result_set])
69+
yield Output(course_ids)
70+
71+
@multi_asset(
72+
outs={
73+
"course_structure": AssetOut(
74+
key=AssetKey(("openedx", "raw_data", "course_structure")),
75+
io_manager_key="s3file_io_manager",
76+
description=(
77+
"A json file with the course structure information."
78+
),
79+
auto_materialize_policy=AutoMaterializePolicy.eager(
80+
max_materializations_per_minute=None
81+
),
82+
),
83+
"course_blocks": AssetOut(
84+
key=AssetKey(("openedx", "raw_data", "course_blocks")),
85+
io_manager_key="s3file_io_manager",
86+
description=(
87+
"A json file containing the hierarchical representation"
88+
"of the course structure information with course blocks."
89+
),
90+
auto_materialize_policy=AutoMaterializePolicy.eager(
91+
max_materializations_per_minute=None
92+
),
93+
),
94+
},
95+
ins={
96+
"course_list": AssetIn(
97+
key=AssetKey(("openedx", "raw_data", "course_list"))
98+
)
99+
},
100+
# partitions_def=,
101+
group_name="openedx",
102+
)
103+
def course_structure(
104+
context: AssetExecutionContext, course_list: list[str],
105+
):
106+
dagster_instance = context.instance
107+
## TODO: Is this correctly iterating over the list of course_ids?
108+
input_asset_materialization_event = dagster_instance.get_event_records(
109+
event_records_filter=EventRecordsFilter(
110+
asset_key=context.asset_key_for_input("course_list"),
111+
event_type=DagsterEventType.ASSET_MATERIALIZATION,
112+
asset_partitions=[context.partiton_key],
113+
),
114+
limit=1,
115+
)[0]
116+
course_id = input_asset_materialization_event.asset_materialization.metadata[
117+
"course_id"
118+
]
119+
course_structure = context.resources.openedx.get_course_structure_document(
120+
course_id
121+
)
122+
course_structure_document = json.load(course_structure.open())
123+
data_version = hashlib.sha256(
124+
json.dumps(course_structure_document).encode("utf-8")
125+
).hexdigest()
126+
structures_file = Path(f"course_structures_{data_version}.json")
127+
blocks_file = Path(f"course_blocks_{data_version}.json")
128+
data_retrieval_timestamp = datetime.now(tz=UTC).isoformat()
129+
with (
130+
jsonlines.open(structures_file, mode="w") as structures,
131+
jsonlines.open(blocks_file, mode="w") as blocks,
132+
):
133+
table_row = {
134+
"content_hash": hashlib.sha256(
135+
json.dumps(course_structure_document).encode("utf-8")
136+
).hexdigest(),
137+
"course_id": context.partition_key,
138+
"course_structure": course_structure_document,
139+
"course_structure_flattened": flatten(
140+
course_structure_document,
141+
reducer=make_reducer("__"),
142+
),
143+
"retrieved_at": data_retrieval_timestamp,
144+
}
145+
structures.write(table_row)
146+
for block in un_nest_course_structure(
147+
course_id, course_structure_document, data_retrieval_timestamp
148+
):
149+
blocks.write(block)
150+
# todo: How do i get the open_edx_deployment from the context? instance?
151+
structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json" # noqa: E501
152+
blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json" # noqa: E501
153+
yield Output(
154+
(structures_file, structure_object_key),
155+
output_name="flattened_course_structure",
156+
data_version=DataVersion(data_version),
157+
metadata={"course_id": course_id, "object_key": structure_object_key},
158+
)
159+
yield Output(
160+
(blocks_file, blocks_object_key),
161+
output_name="course_blocks",
162+
data_version=DataVersion(data_version),
163+
metadata={
164+
"course_id": course_id, "object_key": blocks_object_key,
165+
},
166+
)

0 commit comments

Comments
 (0)