diff --git a/catalog/dags/common/loader/sql.py b/catalog/dags/common/loader/sql.py index 9a5fd33dbf9..dab820167e0 100644 --- a/catalog/dags/common/loader/sql.py +++ b/catalog/dags/common/loader/sql.py @@ -7,7 +7,7 @@ from common.constants import IMAGE, MediaType, SQLInfo from common.loader import provider_details as prov from common.loader.paths import _extract_media_type -from common.sql import PostgresHook +from common.sql import RETURN_ROW_COUNT, PostgresHook from common.storage import columns as col from common.storage.columns import NULL, Column, UpsertStrategy from common.storage.db_columns import setup_db_columns_for_media_type @@ -40,7 +40,6 @@ } CURRENT_TSV_VERSION = "001" -RETURN_ROW_COUNT = lambda c: c.rowcount # noqa: E731 def create_column_definitions(table_columns: list[Column], is_loading=True): diff --git a/catalog/dags/common/sql.py b/catalog/dags/common/sql.py index f9f2f308bcf..2181957da5b 100644 --- a/catalog/dags/common/sql.py +++ b/catalog/dags/common/sql.py @@ -27,6 +27,9 @@ # https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/hooks/postgres/index.html#airflow.providers.postgres.hooks.postgres.PostgresHook.copy_expert # noqa +RETURN_ROW_COUNT = lambda c: c.rowcount # noqa: E731 + + def single_value(cursor): try: row = cursor.fetchone() diff --git a/catalog/dags/common/storage/columns.py b/catalog/dags/common/storage/columns.py index b134dd0df8a..5c096ba4fdf 100644 --- a/catalog/dags/common/storage/columns.py +++ b/catalog/dags/common/storage/columns.py @@ -767,3 +767,13 @@ def prepare_string(self, value): size=1000, truncate=False, ) + +# Columns used by the Deleted Media tables + +DELETED_ON = TimestampColumn( + name="deleted_on", required=True, upsert_strategy=UpsertStrategy.no_change +) + +DELETED_REASON = StringColumn( + name="deleted_reason", required=True, size=80, truncate=True +) diff --git a/catalog/dags/common/storage/db_columns.py b/catalog/dags/common/storage/db_columns.py index f96c26915d2..2ca7d800e90 100644 --- a/catalog/dags/common/storage/db_columns.py +++ b/catalog/dags/common/storage/db_columns.py @@ -82,9 +82,29 @@ col.AUDIO_SET_FOREIGN_IDENTIFIER, ] + DB_COLUMNS_BY_MEDIA_TYPE = {AUDIO: AUDIO_TABLE_COLUMNS, IMAGE: IMAGE_TABLE_COLUMNS} def setup_db_columns_for_media_type(func: callable) -> callable: """Provide media-type-specific DB columns as a kwarg to the decorated function.""" return setup_kwargs_for_media_type(DB_COLUMNS_BY_MEDIA_TYPE, "db_columns")(func) + + +DELETED_IMAGE_TABLE_COLUMNS = IMAGE_TABLE_COLUMNS + [col.DELETED_ON, col.DELETED_REASON] +DELETED_AUDIO_TABLE_COLUMNS = AUDIO_TABLE_COLUMNS + [col.DELETED_ON, col.DELETED_REASON] + +DELETED_MEDIA_DB_COLUMNS_BY_MEDIA_TYPE = { + AUDIO: DELETED_AUDIO_TABLE_COLUMNS, + IMAGE: DELETED_IMAGE_TABLE_COLUMNS, +} + + +def setup_deleted_db_columns_for_media_type(func: callable) -> callable: + """ + Provide media-type-specific deleted media DB columns as a kwarg to the decorated + function. 
+ """ + return setup_kwargs_for_media_type( + DELETED_MEDIA_DB_COLUMNS_BY_MEDIA_TYPE, "deleted_db_columns" + )(func) diff --git a/catalog/dags/database/batched_update/batched_update.py b/catalog/dags/database/batched_update/batched_update.py index d09c2df1d14..bbe22d8d21e 100644 --- a/catalog/dags/database/batched_update/batched_update.py +++ b/catalog/dags/database/batched_update/batched_update.py @@ -7,7 +7,7 @@ from common import slack from common.constants import POSTGRES_CONN_ID -from common.sql import PostgresHook, single_value +from common.sql import RETURN_ROW_COUNT, PostgresHook, single_value from database.batched_update import constants @@ -57,7 +57,7 @@ def run_sql( postgres_conn_id: str = POSTGRES_CONN_ID, task: AbstractOperator = None, timeout: timedelta = None, - handler: callable = constants.RETURN_ROW_COUNT, + handler: callable = RETURN_ROW_COUNT, **kwargs, ): query = sql_template.format( diff --git a/catalog/dags/database/batched_update/constants.py b/catalog/dags/database/batched_update/constants.py index 5bc6d960f18..45ebdc94597 100644 --- a/catalog/dags/database/batched_update/constants.py +++ b/catalog/dags/database/batched_update/constants.py @@ -40,4 +40,3 @@ ); """ DROP_TABLE_QUERY = "DROP TABLE IF EXISTS {temp_table_name} CASCADE;" -RETURN_ROW_COUNT = lambda c: c.rowcount # noqa: E731 diff --git a/catalog/dags/database/delete_records/constants.py b/catalog/dags/database/delete_records/constants.py new file mode 100644 index 00000000000..9cbd769c29e --- /dev/null +++ b/catalog/dags/database/delete_records/constants.py @@ -0,0 +1,21 @@ +from datetime import datetime, timedelta + + +DAG_ID = "delete_records" +SLACK_USERNAME = "Upstream Delete Records" +SLACK_ICON = ":database:" +START_DATE = datetime(2023, 10, 25) +DAGRUN_TIMEOUT = timedelta(days=31 * 3) +CREATE_TIMEOUT = timedelta(hours=6) +DELETE_TIMEOUT = timedelta(hours=1) + +CREATE_RECORDS_QUERY = """ + INSERT INTO {destination_table} ({destination_cols}) + SELECT {source_cols} + FROM {source_table} + {select_query} + """ +DELETE_RECORDS_QUERY = """ + DELETE FROM {table} + {select_query} + """ diff --git a/catalog/dags/database/delete_records/delete_records.py b/catalog/dags/database/delete_records/delete_records.py new file mode 100644 index 00000000000..d1bf20059b4 --- /dev/null +++ b/catalog/dags/database/delete_records/delete_records.py @@ -0,0 +1,103 @@ +import logging +from datetime import timedelta + +from airflow.decorators import task +from airflow.models.abstractoperator import AbstractOperator + +from common import slack +from common.constants import POSTGRES_CONN_ID +from common.sql import RETURN_ROW_COUNT, PostgresHook +from common.storage.columns import DELETED_ON, Column +from common.storage.db_columns import ( + setup_db_columns_for_media_type, + setup_deleted_db_columns_for_media_type, +) +from database.delete_records import constants + + +logger = logging.getLogger(__name__) + + +def run_sql( + sql_template: str, + postgres_conn_id: str = POSTGRES_CONN_ID, + task: AbstractOperator = None, + timeout: timedelta = None, + handler: callable = RETURN_ROW_COUNT, + **kwargs, +): + query = sql_template.format(**kwargs) + + postgres = PostgresHook( + postgres_conn_id=postgres_conn_id, + default_statement_timeout=( + timeout if timeout else PostgresHook.get_execution_timeout(task) + ), + ) + + return postgres.run(query, handler=handler) + + +@task +@setup_deleted_db_columns_for_media_type +@setup_db_columns_for_media_type +def create_deleted_records( + *, + select_query: str, + deleted_reason: str, + 
media_type: str,
+    db_columns: list[Column] = None,
+    deleted_db_columns: list[Column] = None,
+    task: AbstractOperator = None,
+    postgres_conn_id: str = POSTGRES_CONN_ID,
+):
+    """
+    Select records from the given media table using the select query, and then for each
+    record create a corresponding record in the Deleted Media table.
+    """
+
+    destination_cols = ", ".join([col.db_name for col in deleted_db_columns])
+
+    # To build the source columns, we first list all columns in the main media table
+    source_cols = ", ".join([col.db_name for col in db_columns])
+    # Then add the deleted-media specific columns.
+    # `deleted_on` is set to its insert value to get the current timestamp:
+    source_cols += f", {DELETED_ON.get_insert_value()}"
+    # `deleted_reason` is set to the given string
+    source_cols += f", '{deleted_reason}'"
+
+    return run_sql(
+        sql_template=constants.CREATE_RECORDS_QUERY,
+        postgres_conn_id=postgres_conn_id,
+        task=task,
+        destination_table=f"deleted_{media_type}",
+        destination_cols=destination_cols,
+        source_table=media_type,
+        source_cols=source_cols,
+        select_query=select_query,
+    )
+
+
+@task
+def delete_records_from_media_table(
+    table: str, select_query: str, postgres_conn_id: str = POSTGRES_CONN_ID
+):
+    """Delete records matching the select_query from the given media table."""
+    return run_sql(
+        sql_template=constants.DELETE_RECORDS_QUERY,
+        postgres_conn_id=postgres_conn_id,
+        table=table,
+        select_query=select_query,
+    )
+
+
+@task
+def notify_slack(text: str) -> str:
+    """Send a message to Slack."""
+    slack.send_message(
+        text,
+        username=constants.SLACK_USERNAME,
+        icon_emoji=constants.SLACK_ICON,
+        dag_id=constants.DAG_ID,
+    )
+
+    return text
diff --git a/catalog/dags/database/delete_records/delete_records_dag.py b/catalog/dags/database/delete_records/delete_records_dag.py
new file mode 100644
index 00000000000..95e9dccf93b
--- /dev/null
+++ b/catalog/dags/database/delete_records/delete_records_dag.py
@@ -0,0 +1,114 @@
+"""
+# Delete Records DAG
+
+This DAG is used to delete records from the Catalog media tables, after creating a
+corresponding record in the associated `deleted_<media_type>` table for each record
+to be deleted. It is important to note that records deleted by this DAG will still be
+available in the API until the next data refresh runs.
+
+Required Dagrun Configuration parameters:
+
+* table_name: the name of the table to delete from. Must be a valid media table
+* select_query: a SQL `WHERE` clause used to select the rows that will be deleted
+* reason: a string explaining the reason for deleting the records, e.g. 'deadlink'
+
+An example dag_run configuration used to delete all records for the "foo" image provider
+due to deadlinks would look like this:
+
+```
+{
+    "table_name": "image",
+    "select_query": "WHERE provider='foo'",
+    "reason": "deadlink"
+}
+```
+
+## Warnings
+
+Presently, there is no logic to prevent records that have an entry in a Deleted Media
+table from simply being reingested during provider ingestion. Therefore, in its current
+state, the DAG should _only_ be used to delete records that we can guarantee will not
+be reingested (for example, because the provider is archived).
+
+This DAG does not have automated handling for deadlocks, so you must be certain that
+records selected for deletion in this DAG are not also being written to by a provider
+DAG, for instance. The simplest way to do this is to ensure that any affected provider
+DAGs are not currently running.
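+
+Before triggering a run, it can also be useful to preview how many rows the configured
+`select_query` would match. This is only a suggested manual check, not a step the DAG
+performs; the table and provider below are taken from the example configuration above:
+
+```
+SELECT COUNT(*) FROM image WHERE provider='foo';
+```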
+"""
+
+
+import logging
+
+from airflow.decorators import dag
+from airflow.models.param import Param
+
+from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES
+from database.delete_records import constants
+from database.delete_records.delete_records import (
+    create_deleted_records,
+    delete_records_from_media_table,
+    notify_slack,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+@dag(
+    dag_id=constants.DAG_ID,
+    schedule=None,
+    start_date=constants.START_DATE,
+    tags=["database"],
+    dagrun_timeout=constants.DAGRUN_TIMEOUT,
+    doc_md=__doc__,
+    default_args={**DAG_DEFAULT_ARGS, "retries": 0},
+    render_template_as_native_obj=True,
+    params={
+        "table_name": Param(
+            default=AUDIO,
+            enum=MEDIA_TYPES,
+            description="The name of the media table from which to select records.",
+        ),
+        "select_query": Param(
+            default="WHERE...",
+            type="string",
+            description=(
+                "The `WHERE` clause of a query that selects all the rows to"
+                " be deleted."
+            ),
+            pattern="^WHERE",
+        ),
+        "reason": Param(
+            default="",
+            type="string",
+            description="Short descriptor of the reason for deleting the records.",
+        ),
+    },
+)
+def delete_records():
+    # Create the records in the Deleted Media table
+    insert_into_deleted_media_table = create_deleted_records.override(
+        task_id="update_deleted_media_table", execution_timeout=constants.CREATE_TIMEOUT
+    )(
+        select_query="{{ params.select_query }}",
+        deleted_reason="{{ params.reason }}",
+        media_type="{{ params.table_name }}",
+    )
+
+    # If successful, delete the records from the media table
+    delete_records = delete_records_from_media_table.override(
+        execution_timeout=constants.DELETE_TIMEOUT
+    )(table="{{ params.table_name }}", select_query="{{ params.select_query }}")
+
+    notify_complete = notify_slack(
+        text=(
+            f"Deleted {delete_records} records from the"
+            " {{ params.table_name }} table matching query: `{{ params.select_query }}`"
+        ),
+    )
+
+    insert_into_deleted_media_table >> delete_records >> notify_complete
+
+
+delete_records()
diff --git a/catalog/dags/maintenance/add_license_url.py b/catalog/dags/maintenance/add_license_url.py
index 5e3b3072f87..61a03012b3f 100644
--- a/catalog/dags/maintenance/add_license_url.py
+++ b/catalog/dags/maintenance/add_license_url.py
@@ -25,9 +25,8 @@
 from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID, XCOM_PULL_TEMPLATE
 from common.licenses import get_license_info_from_license_pair
-from common.loader.sql import RETURN_ROW_COUNT
 from common.slack import send_message
-from common.sql import PostgresHook
+from common.sql import RETURN_ROW_COUNT, PostgresHook
 from providers.provider_dag_factory import AWS_CONN_ID, OPENVERSE_BUCKET
diff --git a/catalog/dags/retired/database/terminate_long_queries_workflow.py b/catalog/dags/retired/database/terminate_long_queries_workflow.py
index 6639746212a..402232ad0b7 100644
--- a/catalog/dags/retired/database/terminate_long_queries_workflow.py
+++ b/catalog/dags/retired/database/terminate_long_queries_workflow.py
@@ -17,9 +17,8 @@
     OPENLEDGER_API_CONN_ID,
     XCOM_PULL_TEMPLATE,
 )
-from common.loader.sql import RETURN_ROW_COUNT
 from common.slack import send_message
-from common.sql import PostgresHook
+from common.sql import RETURN_ROW_COUNT, PostgresHook
 
 logger = logging.getLogger(__name__)
diff --git a/catalog/tests/dags/database/test_batched_update.py b/catalog/tests/dags/database/test_batched_update.py
index 4b08b3f0196..0e9263fc91f 100644
--- a/catalog/tests/dags/database/test_batched_update.py
+++ b/catalog/tests/dags/database/test_batched_update.py
@@ -1,5 
+1,4 @@ import logging -import uuid import psycopg2 import pytest @@ -7,7 +6,6 @@ from catalog.tests.test_utils import sql from common.storage import columns as col -from common.storage.db_columns import IMAGE_TABLE_COLUMNS from database.batched_update import constants from database.batched_update.batched_update import ( get_expected_update_count, @@ -76,15 +74,6 @@ def postgres_with_image_and_temp_table(batch_start_var, image_table, temp_table) conn.close() -def _get_insert_query(image_table, values: dict): - # Append the required identifier - values[col.IDENTIFIER.db_name] = uuid.uuid4() - - query_values = sql.create_query_values(values, columns=IMAGE_TABLE_COLUMNS) - - return f"INSERT INTO {image_table} VALUES({query_values});" - - def _load_sample_data_into_image_table(image_table, postgres): DEFAULT_COLS = { col.LICENSE.db_name: LICENSE, @@ -112,17 +101,9 @@ def _load_sample_data_into_image_table(image_table, postgres): }, ] - for record in sample_records: - load_data_query = _get_insert_query( - image_table, - { - **record, - **DEFAULT_COLS, - }, - ) - postgres.cursor.execute(load_data_query) - - postgres.connection.commit() + sql.load_sample_data_into_image_table( + image_table, postgres, [{**record, **DEFAULT_COLS} for record in sample_records] + ) @pytest.mark.parametrize( diff --git a/catalog/tests/dags/database/test_delete_records.py b/catalog/tests/dags/database/test_delete_records.py new file mode 100644 index 00000000000..2faf2c45e11 --- /dev/null +++ b/catalog/tests/dags/database/test_delete_records.py @@ -0,0 +1,248 @@ +import logging + +import psycopg2 +import pytest + +from catalog.tests.test_utils import sql +from common.storage import columns as col +from common.storage.db_columns import DELETED_IMAGE_TABLE_COLUMNS, IMAGE_TABLE_COLUMNS +from database.delete_records.delete_records import ( + create_deleted_records, + delete_records_from_media_table, +) + + +logger = logging.getLogger(__name__) + + +FID_A = "a" +FID_B = "b" +FID_C = "c" +MATCHING_PROVIDER = "foo" +NOT_MATCHING_PROVIDER = "bar" +TITLE = "test title" +LICENSE = "by" + + +@pytest.fixture +def identifier(request): + return f"{hash(request.node.name)}".replace("-", "_") + + +@pytest.fixture +def image_table(identifier): + # Parallelized tests need to use distinct database tables + return f"image_{identifier}" + + +@pytest.fixture +def deleted_image_table(identifier): + return f"deleted_image_{identifier}" + + +@pytest.fixture +def postgres_with_image_and_deleted_image_table(image_table, deleted_image_table): + conn = psycopg2.connect(sql.POSTGRES_TEST_URI) + cur = conn.cursor() + drop_table_query = f""" + DROP TABLE IF EXISTS {image_table} CASCADE; + DROP TABLE IF EXISTS {deleted_image_table} CASCADE; + """ + cur.execute(drop_table_query) + cur.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public;') + cur.execute(sql.CREATE_IMAGE_TABLE_QUERY.format(image_table)) + cur.execute(sql.CREATE_DELETED_IMAGE_TABLE_QUERY.format(deleted_image_table)) + + conn.commit() + + yield sql.PostgresRef(cursor=cur, connection=conn) + + cur.execute(drop_table_query) + + cur.close() + conn.commit() + conn.close() + + +def _load_sample_data_into_image_table(image_table, postgres): + DEFAULT_COLS = { + col.LICENSE.db_name: LICENSE, + col.UPDATED_ON.db_name: "NOW()", + col.CREATED_ON.db_name: "NOW()", + col.TITLE.db_name: TITLE, + } + + # Load sample data into the image table + sample_records = [ + { + col.FOREIGN_ID.db_name: FID_A, + col.DIRECT_URL.db_name: f"https://images.com/{FID_A}/img.jpg", + 
col.PROVIDER.db_name: MATCHING_PROVIDER,
+        },
+        {
+            col.FOREIGN_ID.db_name: FID_B,
+            col.DIRECT_URL.db_name: f"https://images.com/{FID_B}/img.jpg",
+            col.PROVIDER.db_name: MATCHING_PROVIDER,
+        },
+        {
+            col.FOREIGN_ID.db_name: FID_C,
+            col.DIRECT_URL.db_name: f"https://images.com/{FID_C}/img.jpg",
+            col.PROVIDER.db_name: NOT_MATCHING_PROVIDER,
+        },
+    ]
+
+    sql.load_sample_data_into_image_table(
+        image_table, postgres, [{**record, **DEFAULT_COLS} for record in sample_records]
+    )
+
+
+def test_create_deleted_records(
+    postgres_with_image_and_deleted_image_table,
+    image_table,
+    deleted_image_table,
+    identifier,
+):
+    # Load sample data into the image table
+    _load_sample_data_into_image_table(
+        image_table,
+        postgres_with_image_and_deleted_image_table,
+    )
+
+    # Create deleted records for the rows matching the MATCHING_PROVIDER
+    select_query = f"WHERE provider='{MATCHING_PROVIDER}'"
+    deleted_reason = "FOO"
+    deleted_count = create_deleted_records.function(
+        media_type=image_table,
+        select_query=select_query,
+        deleted_reason=deleted_reason,
+        db_columns=IMAGE_TABLE_COLUMNS,
+        deleted_db_columns=DELETED_IMAGE_TABLE_COLUMNS,
+        postgres_conn_id=sql.POSTGRES_CONN_ID,
+    )
+
+    # Both records A and B should have been added to the deleted image table
+    assert deleted_count == 2
+
+    postgres_with_image_and_deleted_image_table.cursor.execute(
+        f"SELECT * FROM {deleted_image_table};"
+    )
+    actual_rows = postgres_with_image_and_deleted_image_table.cursor.fetchall()
+
+    assert len(actual_rows) == 2
+
+    # They should both retain the fields of the original record, and have
+    # the deleted_reason set appropriately.
+    assert actual_rows[0][sql.fid_idx] == FID_A
+    assert actual_rows[0][sql.title_idx] == TITLE
+    assert actual_rows[0][sql.license_idx] == LICENSE
+    assert actual_rows[0][sql.deleted_reason_idx] == deleted_reason
+
+    assert actual_rows[1][sql.fid_idx] == FID_B
+    assert actual_rows[1][sql.title_idx] == TITLE
+    assert actual_rows[1][sql.license_idx] == LICENSE
+    assert actual_rows[1][sql.deleted_reason_idx] == deleted_reason
+
+
+def test_create_deleted_records_with_query_matching_no_rows(
+    postgres_with_image_and_deleted_image_table,
+    image_table,
+    deleted_image_table,
+    identifier,
+):
+    # Load sample data into the image table
+    _load_sample_data_into_image_table(
+        image_table,
+        postgres_with_image_and_deleted_image_table,
+    )
+
+    # Try to create deleted records using a query that matches no rows
+    select_query = "WHERE provider='NONEXISTENT_PROVIDER'"
+    deleted_reason = "FOO"
+    deleted_count = create_deleted_records.function(
+        media_type=image_table,
+        select_query=select_query,
+        deleted_reason=deleted_reason,
+        db_columns=IMAGE_TABLE_COLUMNS,
+        deleted_db_columns=DELETED_IMAGE_TABLE_COLUMNS,
+        postgres_conn_id=sql.POSTGRES_CONN_ID,
+    )
+
+    # No records should have been added to the deleted image table
+    assert deleted_count == 0
+
+    postgres_with_image_and_deleted_image_table.cursor.execute(
+        f"SELECT * FROM {deleted_image_table};"
+    )
+    actual_rows = postgres_with_image_and_deleted_image_table.cursor.fetchall()
+
+    assert len(actual_rows) == 0
+
+
+def test_delete_records_from_media_table(
+    postgres_with_image_and_deleted_image_table,
+    image_table,
+    deleted_image_table,
+    identifier,
+):
+    # Load sample data into the image table
+    _load_sample_data_into_image_table(
+        image_table,
+        postgres_with_image_and_deleted_image_table,
+    )
+
+    # Delete records matching the MATCHING_PROVIDER
+    select_query = f"WHERE provider='{MATCHING_PROVIDER}'"
+    deleted_count = delete_records_from_media_table.function(
+        table=image_table,
+        select_query=select_query,
+        postgres_conn_id=sql.POSTGRES_CONN_ID,
+    )
+
+    # Both records A and B should have been deleted
+    assert deleted_count == 2
+
+    postgres_with_image_and_deleted_image_table.cursor.execute(
+        f"SELECT * FROM {image_table};"
+    )
+    actual_rows = postgres_with_image_and_deleted_image_table.cursor.fetchall()
+
+    # There is only one record left in the image table
+    assert len(actual_rows) == 1
+
+    # The remaining record should be the one that did not match the select
+    # query, with all of its original fields intact.
+    assert actual_rows[0][sql.fid_idx] == FID_C
+    assert actual_rows[0][sql.title_idx] == TITLE
+    assert actual_rows[0][sql.license_idx] == LICENSE
+
+
+def test_delete_no_records_from_media_table(
+    postgres_with_image_and_deleted_image_table,
+    image_table,
+    deleted_image_table,
+    identifier,
+):
+    # Load sample data into the image table
+    _load_sample_data_into_image_table(
+        image_table,
+        postgres_with_image_and_deleted_image_table,
+    )
+
+    # Try to delete records using a query that matches nothing
+    select_query = "WHERE provider='NONEXISTENT_PROVIDER'"
+    deleted_count = delete_records_from_media_table.function(
+        table=image_table,
+        select_query=select_query,
+        postgres_conn_id=sql.POSTGRES_CONN_ID,
+    )
+
+    # No records should have been deleted
+    assert deleted_count == 0
+
+    postgres_with_image_and_deleted_image_table.cursor.execute(
+        f"SELECT * FROM {image_table};"
+    )
+    actual_rows = postgres_with_image_and_deleted_image_table.cursor.fetchall()
+
+    # All three records are left in the image table
+    assert len(actual_rows) == 3
diff --git a/catalog/tests/dags/test_dag_parsing.py b/catalog/tests/dags/test_dag_parsing.py
index eb90f9b8774..2b6f131c469 100644
--- a/catalog/tests/dags/test_dag_parsing.py
+++ b/catalog/tests/dags/test_dag_parsing.py
@@ -26,6 +26,7 @@
     "data_refresh/create_filtered_index_dag.py",
     "oauth2/authorize_dag.py",
     "oauth2/token_refresh_dag.py",
+    "database/delete_records/delete_records_dag.py",
 ]
 
 # Expected count from the DagBag once a file has been parsed
diff --git a/catalog/tests/test_utils/sql.py b/catalog/tests/test_utils/sql.py
index 2072c33f510..dcbaccd354f 100644
--- a/catalog/tests/test_utils/sql.py
+++ b/catalog/tests/test_utils/sql.py
@@ -1,4 +1,5 @@
 import os
+import uuid
 from collections import namedtuple
 from unittest import mock
 
@@ -6,7 +7,7 @@
 from common.loader.sql import create_column_definitions
 from common.storage import columns as col
-from common.storage.db_columns import IMAGE_TABLE_COLUMNS
+from common.storage.db_columns import DELETED_IMAGE_TABLE_COLUMNS, IMAGE_TABLE_COLUMNS
 from common.storage.tsv_columns import CURRENT_IMAGE_TSV_COLUMNS
 
 
@@ -43,6 +44,14 @@
     " USING btree (url);"
 )
 
+DELETED_IMAGE_TABLE_COLUMN_DEFINITIONS = create_column_definitions(
+    DELETED_IMAGE_TABLE_COLUMNS
+)
+
+CREATE_DELETED_IMAGE_TABLE_QUERY = f"""CREATE TABLE public.{{}} (
+  {DELETED_IMAGE_TABLE_COLUMN_DEFINITIONS}
+);"""
+
 PostgresRef = namedtuple("PostgresRef", ["cursor", "connection"])
 
 ti = mock.Mock(spec=TaskInstance)
@@ -74,6 +83,9 @@
 height_idx = COLUMN_NAMES.index(col.HEIGHT.db_name)
 standardized_popularity_idx = COLUMN_NAMES.index(col.STANDARDIZED_POPULARITY.db_name)
 
+DELETED_MEDIA_COLUMN_NAMES = [column.db_name for column in DELETED_IMAGE_TABLE_COLUMNS]
+deleted_reason_idx = DELETED_MEDIA_COLUMN_NAMES.index(col.DELETED_REASON.db_name)
+
 
 def create_query_values(
     column_values: dict,
@@ -90,3 +102,20 @@
             val = f"'{str(val)}'"
         result.append(val)
return ",".join(result)
+
+
+def _get_insert_query(image_table, values: dict):
+    # Append the required identifier
+    values[col.IDENTIFIER.db_name] = uuid.uuid4()
+
+    query_values = create_query_values(values, columns=IMAGE_TABLE_COLUMNS)
+
+    return f"INSERT INTO {image_table} VALUES({query_values});"
+
+
+def load_sample_data_into_image_table(image_table, postgres, records):
+    for record in records:
+        load_data_query = _get_insert_query(image_table, record)
+        postgres.cursor.execute(load_data_query)
+
+    postgres.connection.commit()
diff --git a/docker/upstream_db/0003_openledger_image_schema.sql b/docker/upstream_db/0003_openledger_image_schema.sql
index c62314cb953..11e38377f5c 100644
--- a/docker/upstream_db/0003_openledger_image_schema.sql
+++ b/docker/upstream_db/0003_openledger_image_schema.sql
@@ -53,3 +53,11 @@ CREATE UNIQUE INDEX image_identifier_key
 CREATE UNIQUE INDEX image_url_key
     ON public.image
     USING btree (url);
+
+
+CREATE TABLE public.deleted_image (
+    LIKE public.image,
+    deleted_on timestamp with time zone NOT NULL,
+    deleted_reason character varying(80) NOT NULL
+);
+ALTER TABLE public.deleted_image OWNER TO deploy;
diff --git a/docker/upstream_db/0006_openledger_audio_schema.sql b/docker/upstream_db/0006_openledger_audio_schema.sql
index e14a7b3161e..036991a4b06 100644
--- a/docker/upstream_db/0006_openledger_audio_schema.sql
+++ b/docker/upstream_db/0006_openledger_audio_schema.sql
@@ -60,3 +60,11 @@ CREATE UNIQUE INDEX audio_identifier_key
 CREATE UNIQUE INDEX audio_url_key
     ON public.audio
     USING btree (url);
+
+
+CREATE TABLE public.deleted_audio (
+    LIKE public.audio,
+    deleted_on timestamp with time zone NOT NULL,
+    deleted_reason character varying(80) NOT NULL
+);
+ALTER TABLE public.deleted_audio OWNER TO deploy;
diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md
index baefb4e4e1b..cef197962e0 100644
--- a/documentation/catalog/reference/DAGs.md
+++ b/documentation/catalog/reference/DAGs.md
@@ -44,6 +44,7 @@ The following are DAGs grouped by their primary tag:
 | DAG ID | Schedule Interval |
 | --------------------------------------------------------------------------------- | ----------------- |
 | [`batched_update`](#batched_update) | `None` |
+| [`delete_records`](#delete_records) | `None` |
 | [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` |
 | [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` |
 | [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` |
@@ -126,6 +127,7 @@ The following is documentation associated with each DAG (where available):
 1. [`check_silenced_dags`](#check_silenced_dags)
 1. [`create_filtered_audio_index`](#create_filtered_audio_index)
 1. [`create_filtered_image_index`](#create_filtered_image_index)
+1. [`delete_records`](#delete_records)
 1. [`europeana_workflow`](#europeana_workflow)
 1. [`finnish_museums_workflow`](#finnish_museums_workflow)
 1. [`flickr_audit_sub_provider_workflow`](#flickr_audit_sub_provider_workflow)
@@ -446,6 +448,47 @@ There are two mechanisms that prevent this from happening:
 This ensures that neither are depending on or modifying the origin indexes
 critical for the creation of the filtered indexes.
 
+### `delete_records`
+
+#### Delete Records DAG
+
+This DAG is used to delete records from the Catalog media tables, after creating
+a corresponding record in the associated `deleted_<media_type>` table for each
+record to be deleted. It is important to note that records deleted by this DAG
+will still be available in the API until the next data refresh runs.
+
+Required Dagrun Configuration parameters:
+
+- table_name: the name of the table to delete from. Must be a valid media table
+- select_query: a SQL `WHERE` clause used to select the rows that will be
+  deleted
+- reason: a string explaining the reason for deleting the records, e.g.
+  'deadlink'
+
+An example dag_run configuration used to delete all records for the "foo" image
+provider due to deadlinks would look like this:
+
+```
+{
+    "table_name": "image",
+    "select_query": "WHERE provider='foo'",
+    "reason": "deadlink"
+}
+```
+
+##### Warnings
+
+Presently, there is no logic to prevent records that have an entry in a Deleted
+Media table from simply being reingested during provider ingestion. Therefore,
+in its current state, the DAG should _only_ be used to delete records that we
+can guarantee will not be reingested (for example, because the provider is
+archived).
+
+This DAG does not have automated handling for deadlocks, so you must be certain
+that records selected for deletion in this DAG are not also being written to by
+a provider DAG, for instance. The simplest way to do this is to ensure that any
+affected provider DAGs are not currently running.
+
 ### `europeana_workflow`
 
 Content Provider: Europeana