Skip to content

Commit

Permalink
Drop and recreate pop constants instead of refreshing
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Aug 17, 2023
1 parent d8574af commit f2d693e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
15 changes: 15 additions & 0 deletions catalog/dags/common/popularity/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def drop_media_matview(
postgres.run(f"DROP MATERIALIZED VIEW IF EXISTS public.{db_view} CASCADE;")


def drop_media_popularity_constants(
postgres_conn_id,
media_type=IMAGE,
constants=IMAGE_POPULARITY_CONSTANTS_VIEW,
pg_timeout: float = timedelta(minutes=10).total_seconds(),
):
if media_type == AUDIO:
constants = AUDIO_POPULARITY_CONSTANTS_VIEW

postgres = PostgresHook(
postgres_conn_id=postgres_conn_id, default_statement_timeout=pg_timeout
)
postgres.run(f"DROP MATERIALIZED VIEW IF EXISTS public.{constants} CASCADE;")


def drop_media_popularity_relations(
postgres_conn_id,
media_type=IMAGE,
Expand Down
30 changes: 21 additions & 9 deletions catalog/dags/popularity/refresh_popularity_metrics_task_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

GROUP_ID = "refresh_popularity_metrics_and_constants"
UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID = "update_media_popularity_metrics_table"
UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "update_media_popularity_constants_view"
DROP_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "drop_media_popularity_constants_view"
CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "create_media_popularity_constants_view"


def create_refresh_popularity_metrics_task_group(
Expand Down Expand Up @@ -67,22 +68,33 @@ def create_refresh_popularity_metrics_task_group(
},
)

update_constants = PythonOperator(
task_id=UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID,
python_callable=sql.update_media_popularity_constants,
drop_constants = PythonOperator(
task_id=DROP_MEDIA_POPULARITY_CONSTANTS_TASK_ID,
python_callable=sql.drop_media_popularity_constants,
op_kwargs={
"postgres_conn_id": POSTGRES_CONN_ID,
"media_type": media_type,
},
execution_timeout=execution_timeout,
doc=("Drops the popularity constants view."),
)

create_constants = PythonOperator(
task_id=CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID,
python_callable=sql.create_media_popularity_constants_view,
op_kwargs={
"postgres_conn_id": POSTGRES_CONN_ID,
"media_type": media_type,
},
execution_timeout=execution_timeout,
doc=(
"Updates the popularity constants view. This completely "
"Recreates the popularity constants view. This completely "
"recalculates the popularity constants for each provider."
),
)

update_constants_status = PythonOperator(
task_id=f"report_{UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID}_status",
recreate_constants_status = PythonOperator(
task_id=f"report_{CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID}_status",
python_callable=reporting.report_status,
op_kwargs={
"media_type": media_type,
Expand All @@ -92,7 +104,7 @@ def create_refresh_popularity_metrics_task_group(
},
)

update_metrics >> [update_constants, update_metrics_status]
update_constants >> update_constants_status
update_metrics >> [drop_constants, update_metrics_status]
drop_constants >> create_constants >> recreate_constants_status

return refresh_all_popularity_data

0 comments on commit f2d693e

Please sign in to comment.