Skip to content

Commit

Permalink
Revert "Rename flows"
Browse files (browse the repository at this point in the history)
This reverts commit 148ac51.
  • Loading branch information
VincentAntoine committed Feb 5, 2025
1 parent c8ade97 commit bfe12a0
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-"source_database","source_table","query_filepath","destination_database","destination_table","ddl_script_path","order_by","cron_string",flow_name
-"monitorfish_proxy","control_objectives",,"monitorfish","control_objectives",,"year","30 4 * * *",Fish - Control objectives
-"monitorfish_proxy","fleet_segments",,"monitorfish","fleet_segments",,"year","32 4 * * *",Fish - Fleet segments
-"monitorfish_proxy","ports",,"monitorfish","ports",,"locode","34 4 * * *",Fish - Ports
-"monitorfish_proxy","species",,"monitorfish","species",,"species_code","36 4 * * *",Fish - Species
-monitorfish_proxy,,monitorfish_proxy/vessels.sql,monitorfish,vessels,,id,40 4 * * *,Fish - Vessels
-"monitorfish_proxy","infractions",,"monitorfish","infractions",,"natinf_code","42 4 * * *",Fish - Infractions
+"source_database","source_table","query_filepath","destination_database","destination_table","ddl_script_path","order_by","cron_string"
+"monitorfish_proxy","control_objectives",,"monitorfish","control_objectives",,"year","30 4 * * *"
+"monitorfish_proxy","fleet_segments",,"monitorfish","fleet_segments",,"year","32 4 * * *"
+"monitorfish_proxy","ports",,"monitorfish","ports",,"locode","34 4 * * *"
+"monitorfish_proxy","species",,"monitorfish","species",,"species_code","36 4 * * *"
+monitorfish_proxy,,monitorfish_proxy/vessels.sql,monitorfish,vessels,,id,40 4 * * *
+"monitorfish_proxy","infractions",,"monitorfish","infractions",,"natinf_code","42 4 * * *"
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
-"source_database","query_filepath","schema","table_name","backend","geom_col","destination_database","destination_table","ddl_script_path","post_processing_script_path","final_table","cron_string",flow_name
-"monitorenv_remote",,"public","amp_cacem","geopandas","geom","monitorenv","amp_cacem_tmp","monitorenv/create_amp_cacem_tmp.sql","post_process_amp_cacem.sql","amp_cacem","31 1 * * *",Env - AMP
-"monitorenv_remote","monitorenv_remote/analytics_actions.sql",,,"pandas","geom","monitorenv","analytics_actions","monitorenv/create_analytics_actions.sql",,,"30 * * * *",Env - Analytics actions
-"monitorfish_remote","monitorfish_remote/analytics_controls_full_data.sql",,,"pandas","geom","monitorfish","analytics_controls_full_data","monitorfish/create_analytics_controls_full_data.sql",,,"32 1 * * *",Fish - Analytics controls full data
-"monitorfish_remote",,"public","fao_areas","geopandas","wkb_geometry","monitorfish","fao_areas_tmp","monitorfish/create_fao_areas_tmp.sql","post_process_fao_areas.sql","fao_areas","34 1 * * *",Fish - Fao areas
-"monitorfish_remote",,"public","rectangles_stat_areas","geopandas","wkb_geometry","monitorfish","rectangles_stat_areas_tmp","monitorfish/create_rectangles_stat_areas_tmp.sql","post_process_rectangles_stat_areas.sql","rectangles_stat_areas","36 1 * * *",Fish - Stat rectangles
-"monitorfish_remote","monitorfish_remote/facade_areas_subdivided.sql",,,"geopandas","geometry","monitorfish","facade_areas_subdivided_tmp","monitorfish/create_facade_areas_subdivided_tmp.sql","post_process_facade_areas_subdivided.sql","facade_areas_subdivided","38 1 * * *",Fish - Facade areas
-"monitorfish_remote","monitorfish_remote/eez_areas.sql",,,"geopandas","wkb_geometry","monitorfish","eez_areas_tmp","monitorfish/create_eez_areas_tmp.sql","post_process_eez_areas.sql","eez_areas","40 1 * * *",Fish - EEZ Areas
+"source_database","query_filepath","schema","table_name","backend","geom_col","destination_database","destination_table","ddl_script_path","post_processing_script_path","final_table","cron_string"
+"monitorenv_remote",,"public","amp_cacem","geopandas","geom","monitorenv","amp_cacem_tmp","monitorenv/create_amp_cacem_tmp.sql","post_process_amp_cacem.sql","amp_cacem","31 1 * * *"
+"monitorenv_remote","monitorenv_remote/analytics_actions.sql",,,"pandas","geom","monitorenv","analytics_actions","monitorenv/create_analytics_actions.sql",,,"30 * * * *"
+"monitorfish_remote","monitorfish_remote/analytics_controls_full_data.sql",,,"pandas","geom","monitorfish","analytics_controls_full_data","monitorfish/create_analytics_controls_full_data.sql",,,"32 1 * * *"
+"monitorfish_remote",,"public","fao_areas","geopandas","wkb_geometry","monitorfish","fao_areas_tmp","monitorfish/create_fao_areas_tmp.sql","post_process_fao_areas.sql","fao_areas","34 1 * * *"
+"monitorfish_remote",,"public","rectangles_stat_areas","geopandas","wkb_geometry","monitorfish","rectangles_stat_areas_tmp","monitorfish/create_rectangles_stat_areas_tmp.sql","post_process_rectangles_stat_areas.sql","rectangles_stat_areas","36 1 * * *"
+"monitorfish_remote","monitorfish_remote/facade_areas_subdivided.sql",,,"geopandas","geometry","monitorfish","facade_areas_subdivided_tmp","monitorfish/create_facade_areas_subdivided_tmp.sql","post_process_facade_areas_subdivided.sql","facade_areas_subdivided","38 1 * * *"
+"monitorfish_remote","monitorfish_remote/eez_areas.sql",,,"geopandas","wkb_geometry","monitorfish","eez_areas_tmp","monitorfish/create_eez_areas_tmp.sql","post_process_eez_areas.sql","eez_areas","40 1 * * *"
38 changes: 14 additions & 24 deletions forklift/forklift/pipeline/flows_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def make_cron_clock_from_run_param_series(s: pd.Series) -> clocks.CronClock:

################################ Define flow schedules ################################
def get_flows_to_register():
flows_to_register = []
clean_flow_runs_flow = deepcopy(clean_flow_runs.flow)
reset_proxy_pg_database_flow = deepcopy(reset_proxy_pg_database.flow)
sync_table_from_db_connection_flow = deepcopy(sync_table_from_db_connection.flow)
Expand All @@ -41,38 +40,29 @@ def get_flows_to_register():
scheduled_runs = pd.read_csv(
LIBRARY_LOCATION / "pipeline/flow_schedules/sync_table_from_db_connection.csv"
)
for scheduled_run in scheduled_runs.iterrows():
flow = sync_table_from_db_connection_flow.copy()
flow.name = scheduled_run[1].loc["flow_name"]
flow.schedule = Schedule(
clocks=[
make_cron_clock_from_run_param_series(
s=scheduled_run[1].drop("flow_name")
)
]
)
flows_to_register.append(flow)
sync_table_from_db_connection_flow.schedule = Schedule(
clocks=[
make_cron_clock_from_run_param_series(s=run[1])
for run in scheduled_runs.iterrows()
]
)

scheduled_runs = pd.read_csv(
LIBRARY_LOCATION / "pipeline/flow_schedules/sync_table_with_pandas.csv"
)
for scheduled_run in scheduled_runs.iterrows():
flow = sync_table_with_pandas_flow.copy()
flow.name = scheduled_run[1].loc["flow_name"]
flow.schedule = Schedule(
clocks=[
make_cron_clock_from_run_param_series(
s=scheduled_run[1].drop("flow_name")
)
]
)
flows_to_register.append(flow)
sync_table_with_pandas_flow.schedule = Schedule(
clocks=[
make_cron_clock_from_run_param_series(s=run[1])
for run in scheduled_runs.iterrows()
]
)

#################### List flows to register with prefect server ###################
flows_to_register += [
flows_to_register = [
clean_flow_runs_flow,
reset_proxy_pg_database_flow,
sync_table_from_db_connection_flow,
sync_table_with_pandas_flow,
]

############################## Define flows' storage ##############################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

scheduled_runs = pd.read_csv(
LIBRARY_LOCATION / "pipeline/flow_schedules/sync_table_from_db_connection.csv"
).drop(columns=["cron_string", "flow_name"])
).drop(columns=["cron_string"])
parameters = ",".join(scheduled_runs.columns)
try:
assert parameters == (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

scheduled_runs = pd.read_csv(
LIBRARY_LOCATION / "pipeline/flow_schedules/sync_table_with_pandas.csv"
).drop(columns=["cron_string", "flow_name"])
).drop(columns=["cron_string"])
parameters = ",".join(scheduled_runs.columns)
try:
assert parameters == (
Expand Down
2 changes: 0 additions & 2 deletions forklift/tests/test_pipeline/test_flows_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
def test_flows_registration():
for flow in get_flows_to_register():
# Check that the flow and its params can be serialized and deserialized
print(f"Testing { flow.name }")
serialized_flow = flow.serialize()
prefect.serialization.flow.FlowSchema().load(serialized_flow)

Expand All @@ -15,7 +14,6 @@ def test_flows_registration():
if flow.schedule is not None and required_parameters:
required_names = {p.name for p in required_parameters}
for c in flow.schedule.clocks:
print(c.cron)
try:
assert required_names <= set(c.parameter_defaults.keys())
except AssertionError:
Expand Down

0 comments on commit bfe12a0

Please sign in to comment.