Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] authored and blarghmatey committed Aug 27, 2024
1 parent 175e6b3 commit 556a96d
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions src/ol_orchestrate/assets/openedx.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
from flatten_dict import flatten
from flatten_dict.reducers import make_reducer
from pydantic import Field
from upath import UPath

from ol_orchestrate.lib.openedx import un_nest_course_structure

course_list_asset_key = AssetKey(("openedx", "raw_data", "course_list")),


@asset(
key=AssetKey(("openedx", "raw_data", "course_list_config")),
# partitions_def=,
group_name="openedx",
)
class CourseListConfig(Config):
edx_course_api_page_size: int = Field(
default=100,
Expand All @@ -56,13 +61,14 @@ def course_list(context: AssetExecutionContext, config: CourseListConfig):
course_ids.extend([course["id"] for course in result_set])
yield Output(course_ids)


@multi_asset(
outs={
"course_structure": AssetOut(
key=AssetKey(("openedx", "raw_data", "course_structure")),
io_manager_key="s3file_io_manager",
description=("A json file with the course structure information."),
description=(
"A json file with the course structure information."
),
auto_materialize_policy=AutoMaterializePolicy.eager(
max_materializations_per_minute=None
),
Expand All @@ -79,17 +85,20 @@ def course_list(context: AssetExecutionContext, config: CourseListConfig):
),
),
},
ins={"course_list": AssetIn(key=AssetKey(("openedx", "raw_data", "course_list")))},
ins={
"course_list": AssetIn(
key=AssetKey(("openedx", "raw_data", "course_list"))
)
},
# partitions_def=,
group_name="openedx",
)
def course_structure(
context: AssetExecutionContext,
course_list: list[str],
context: AssetExecutionContext, course_list: list[str],
):
dagster_instance = context.instance
## TODO: Is this correctly iterating over the list of course_ids?
input_asset_materialization_event = dagster_instance.get_event_records(
input_asset_materialization_event = dagster_instance.get_event_records(
event_records_filter=EventRecordsFilter(
asset_key=context.asset_key_for_input("course_list"),
event_type=DagsterEventType.ASSET_MATERIALIZATION,
Expand Down Expand Up @@ -131,9 +140,9 @@ def course_structure(
course_id, course_structure_document, data_retrieval_timestamp
):
blocks.write(block)
# TODO: How do i get the open_edx_deployment from the context? instance?
structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json" # noqa: E501
blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json" # noqa: E501
# todo: How do i get the open_edx_deployment from the context? instance?
structure_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_structure/{context.partition_key}/course_structures_{data_version}.json" # noqa: E501
blocks_object_key = f"{context.open_edx_deployment}_openedx_extracts/course_blocks/{context.partition_key}/course_blocks_{data_version}.json" # noqa: E501
yield Output(
(structures_file, structure_object_key),
output_name="flattened_course_structure",
Expand All @@ -145,7 +154,6 @@ def course_structure(
output_name="course_blocks",
data_version=DataVersion(data_version),
metadata={
"course_id": course_id,
"object_key": blocks_object_key,
"course_id": course_id, "object_key": blocks_object_key,
},
)

0 comments on commit 556a96d

Please sign in to comment.