Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] authored and blarghmatey committed Aug 27, 2024
1 parent 379a84a commit 5095c91
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 19 deletions.
62 changes: 43 additions & 19 deletions src/ol_orchestrate/definitions/edx/openedx_data_extract.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
from typing import Literal

from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job
from dagster import (
AssetSelection,
Definitions,
ScheduleDefinition,
define_asset_job
)
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.openedx import (
from ol_orchestrate.assets.edxorg_archive import (
CourseListConfig,
course_list,
course_structure,
)

from ol_orchestrate.io_managers.filepath import (
S3FileObjectIOManager,
)
from ol_orchestrate.jobs.open_edx import extract_open_edx_data_to_ol_data_platform
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.resources.openedx import OpenEdxApiClient
from ol_orchestrate.resources.outputs import DailyResultsDir
from ol_orchestrate.resources.secrets.vault import Vault

if DAGSTER_ENV == "dev":
Expand Down Expand Up @@ -54,6 +64,30 @@ def open_edx_extract_job_config(
}


ol_extract_jobs = [
extract_open_edx_data_to_ol_data_platform.to_job(
resource_defs={
"results_dir": DailyResultsDir(),
"s3_upload": S3Resource(),
"s3": S3Resource(),
"openedx": OpenEdxApiClient.configure_at_launch(),
},
name=f"extract_{deployment}_open_edx_data_to_data_platform",
config=open_edx_extract_job_config(deployment, DAGSTER_ENV), # type: ignore[arg-type]
)
for deployment in ("residential", "xpro", "mitxonline")
]

openedx_data_extracts = Definitions(
jobs=ol_extract_jobs,
schedules=[
ScheduleDefinition(
name=f"{job.name}_nightly", cron_schedule="0 0 * * *", job=job
)
for job in ol_extract_jobs
],
)

def s3_uploads_bucket(
dagster_env: Literal["dev", "qa", "production"],
) -> dict[str, Any]:
Expand All @@ -67,7 +101,6 @@ def s3_uploads_bucket(
}
return bucket_map[dagster_env]


def edxorg_data_archive_config(dagster_env):
return {
"ops": {
Expand All @@ -81,35 +114,26 @@ def edxorg_data_archive_config(dagster_env):
}


openedx_course_structures_job = define_asset_job(
openedx_course_structures_job = extract_open_edx_data_to_ol_data_platform.to_job(
name="extract_open_edx_data_to_ol_data_platform",
config=open_edx_extract_job_config(DAGSTER_ENV),
selection=AssetSelection.assets(course_list).downstream(),
# only include direct descendants
depth=1,
include_self=True,
config=edxorg_data_archive_config(DAGSTER_ENV),
selection=AssetSelection.assets(CourseListConfig, course_list, course_structure),
)


retrieve_openedx_course_data = Definitions(
resources={
"io_manager": S3FileObjectIOManager(
"s3": S3Resource(),
"exports_dir": DailyResultsDir.configure_at_launch(),
"s3file_io_manager": S3FileObjectIOManager(
bucket=s3_uploads_bucket(DAGSTER_ENV)["bucket"],
path_prefix=s3_uploads_bucket(DAGSTER_ENV)["prefix"],
),
"vault": vault,
"openedx": OpenEdxApiClient.configure_at_launch(),
},
jobs=[openedx_course_structures_job],
assets=[
CourseListConfig,
course_list,
course_structure,
],
schedules=[
ScheduleDefinition(
name=f"{openedx_course_structures_job.name}_nightly",
cron_schedule="0 0 * * *",
job=openedx_course_structures_job,
)
],
)
4 changes: 4 additions & 0 deletions src/ol_orchestrate/jobs/open_edx.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from dagster import graph

from ol_orchestrate.assets.open_edx import (
course_list,
course_structure,
)
from ol_orchestrate.lib.hooks import (
notify_healthchecks_io_on_failure,
notify_healthchecks_io_on_success,
Expand Down

0 comments on commit 5095c91

Please sign in to comment.