1
1
from typing import Literal
2
2
3
- from dagster import (
4
- AssetSelection ,
5
- Definitions ,
6
- ScheduleDefinition ,
7
- define_asset_job
8
- )
3
+ from dagster import AssetSelection , Definitions , ScheduleDefinition , define_asset_job
9
4
from dagster_aws .s3 import S3Resource
10
5
11
- from ol_orchestrate .assets .edxorg_archive import (
12
- CourseListConfig ,
6
+ from ol_orchestrate .assets .openedx import (
13
7
course_list ,
14
8
course_structure ,
15
9
)
16
-
17
10
from ol_orchestrate .io_managers .filepath import (
18
11
S3FileObjectIOManager ,
19
12
)
20
- from ol_orchestrate .jobs .open_edx import extract_open_edx_data_to_ol_data_platform
21
13
from ol_orchestrate .lib .constants import DAGSTER_ENV , VAULT_ADDRESS
22
14
from ol_orchestrate .resources .openedx import OpenEdxApiClient
23
- from ol_orchestrate .resources .outputs import DailyResultsDir
24
15
from ol_orchestrate .resources .secrets .vault import Vault
25
16
26
17
if DAGSTER_ENV == "dev" :
@@ -64,30 +55,6 @@ def open_edx_extract_job_config(
64
55
}
65
56
66
57
67
- ol_extract_jobs = [
68
- extract_open_edx_data_to_ol_data_platform .to_job (
69
- resource_defs = {
70
- "results_dir" : DailyResultsDir (),
71
- "s3_upload" : S3Resource (),
72
- "s3" : S3Resource (),
73
- "openedx" : OpenEdxApiClient .configure_at_launch (),
74
- },
75
- name = f"extract_{ deployment } _open_edx_data_to_data_platform" ,
76
- config = open_edx_extract_job_config (deployment , DAGSTER_ENV ), # type: ignore[arg-type]
77
- )
78
- for deployment in ("residential" , "xpro" , "mitxonline" )
79
- ]
80
-
81
- openedx_data_extracts = Definitions (
82
- jobs = ol_extract_jobs ,
83
- schedules = [
84
- ScheduleDefinition (
85
- name = f"{ job .name } _nightly" , cron_schedule = "0 0 * * *" , job = job
86
- )
87
- for job in ol_extract_jobs
88
- ],
89
- )
90
-
91
58
def s3_uploads_bucket (
92
59
dagster_env : Literal ["dev" , "qa" , "production" ],
93
60
) -> dict [str , Any ]:
@@ -101,6 +68,7 @@ def s3_uploads_bucket(
101
68
}
102
69
return bucket_map [dagster_env ]
103
70
71
+
104
72
def edxorg_data_archive_config (dagster_env ):
105
73
return {
106
74
"ops" : {
@@ -114,26 +82,35 @@ def edxorg_data_archive_config(dagster_env):
114
82
}
115
83
116
84
117
- openedx_course_structures_job = extract_open_edx_data_to_ol_data_platform . to_job (
85
+ openedx_course_structures_job = define_asset_job (
118
86
name = "extract_open_edx_data_to_ol_data_platform" ,
119
- config = edxorg_data_archive_config (DAGSTER_ENV ),
120
- selection = AssetSelection .assets (CourseListConfig , course_list , course_structure ),
87
+ config = open_edx_extract_job_config (DAGSTER_ENV ),
88
+ selection = AssetSelection .assets (course_list ).downstream (),
89
+ # only include direct descendants
90
+ depth = 1 ,
91
+ include_self = True ,
121
92
)
122
93
94
+
123
95
retrieve_openedx_course_data = Definitions (
124
96
resources = {
125
- "s3" : S3Resource (),
126
- "exports_dir" : DailyResultsDir .configure_at_launch (),
127
- "s3file_io_manager" : S3FileObjectIOManager (
97
+ "io_manager" : S3FileObjectIOManager (
128
98
bucket = s3_uploads_bucket (DAGSTER_ENV )["bucket" ],
129
99
path_prefix = s3_uploads_bucket (DAGSTER_ENV )["prefix" ],
130
100
),
131
101
"vault" : vault ,
102
+ "openedx" : OpenEdxApiClient .configure_at_launch (),
132
103
},
133
104
jobs = [openedx_course_structures_job ],
134
105
assets = [
135
- CourseListConfig ,
136
106
course_list ,
137
107
course_structure ,
138
108
],
109
+ schedules = [
110
+ ScheduleDefinition (
111
+ name = f"{ openedx_course_structures_job .name } _nightly" ,
112
+ cron_schedule = "0 0 * * *" ,
113
+ job = openedx_course_structures_job ,
114
+ )
115
+ ],
139
116
)
0 commit comments