diff --git a/synapse_data_warehouse/synapse_raw/tables/V2.25.0__add_version_history_to_nodesnapshot.sql b/synapse_data_warehouse/synapse_raw/tables/V2.25.0__add_version_history_to_nodesnapshot.sql new file mode 100644 index 00000000..da8d6d59 --- /dev/null +++ b/synapse_data_warehouse/synapse_raw/tables/V2.25.0__add_version_history_to_nodesnapshot.sql @@ -0,0 +1,8 @@ +-- Configure environment +USE SCHEMA {{database_name}}.synapse_raw; --noqa: JJ01,PRS,TMP,CP02 +USE WAREHOUSE compute_xsmall; + +-- Add `version_history` column +ALTER TABLE nodesnapshots + ADD COLUMN version_history VARIANT + COMMENT 'The list of entity versions, at the time of the snapshot.'; diff --git a/synapse_data_warehouse/synapse_raw/tables/V2.25.2__refresh_node_snapshot.sql b/synapse_data_warehouse/synapse_raw/tables/V2.25.2__refresh_node_snapshot.sql new file mode 100644 index 00000000..3da91943 --- /dev/null +++ b/synapse_data_warehouse/synapse_raw/tables/V2.25.2__refresh_node_snapshot.sql @@ -0,0 +1,79 @@ +-- Configure environment +USE SCHEMA {{database_name}}.synapse_raw; --noqa: JJ01,PRS,TMP,CP02 +USE WAREHOUSE compute_medium; + +SET cutoff_date = '2024-11-01'; +SET num_records_to_be_reloaded = ( + SELECT count(*) + FROM nodesnapshots + WHERE snapshot_date >= $cutoff_date +); + +-- This scripting block is an attempt to avoid any `COPY INTO` action in +-- the unlikely case of a rollback/replay of our database state. For example, +-- if we are deploying our database from scratch -- or more specifically, if +-- we don't have any table data requiring a `version_history` backfill -- we don't +-- want to load data (that's a job for the associated data loading task). +EXECUTE IMMEDIATE $$ +BEGIN + LET count := $num_records_to_be_reloaded; + IF (count > 0) THEN + -- Drop all records added since the month wherein the `version_history` field + -- was included in dev or prod. The field was added on 2024-11-12, but to simplify + -- the regex in the COPY INTO statement we drop/load all data from November 2024 onward. + -- See https://github.com/Sage-Bionetworks/Synapse-Repository-Services/pull/5199 + DELETE FROM nodesnapshots + WHERE snapshot_date >= $cutoff_date; + + -- Run the same query as nodesnapshots_task except with FORCE=TRUE + -- and PATTERN = all snapshot_date from 2024-11-.* onward (until 2099) + COPY INTO nodesnapshots FROM ( + SELECT + $1:change_type AS change_type, + $1:change_timestamp AS change_timestamp, + $1:change_user_id AS change_user_id, + $1:snapshot_timestamp AS snapshot_timestamp, + $1:id AS id, + $1:benefactor_id AS benefactor_id, + $1:project_id AS project_id, + $1:parent_id AS parent_id, + $1:node_type AS node_type, + $1:created_on AS created_on, + $1:created_by AS created_by, + $1:modified_on AS modified_on, + $1:modified_by AS modified_by, + $1:version_number AS version_number, + $1:file_handle_id AS file_handle_id, + $1:name AS name, + $1:is_public AS is_public, + $1:is_controlled AS is_controlled, + $1:is_restricted AS is_restricted, + NULLIF( + REGEXP_REPLACE( + metadata$filename, + '.*nodesnapshots\/snapshot_date\=(.*)\/.*', '\\1' + ), + '__HIVE_DEFAULT_PARTITION__' + ) AS snapshot_date, + $1:effective_ars AS effective_ars, + PARSE_JSON(REPLACE(REPLACE($1:annotations, '\n', '\\n'), '\r', '\\r')) AS annotations, + PARSE_JSON(REPLACE(REPLACE($1:derived_annotations, '\n', '\\n'), '\r', '\\r')) AS derived_annotations, + $1:version_comment AS version_comment, + $1:version_label AS version_label, + $1:alias AS alias, + $1:activity_id AS activity_id, + PARSE_JSON($1:column_model_ids) AS column_model_ids, + PARSE_JSON($1:scope_ids) AS scope_ids, + PARSE_JSON($1:items) AS items, + PARSE_JSON($1:reference) AS reference, + $1:is_search_enabled AS is_search_enabled, + $1:defining_sql AS defining_sql, + PARSE_JSON(REPLACE(REPLACE($1:internal_annotations, '\n', '\\n'), '\r', '\\r')) AS internal_annotations, + PARSE_JSON(REPLACE(REPLACE($1:version_history, '\n', '\\n'), '\r', '\\r')) AS version_history + FROM @{{stage_storage_integration}}_stage/nodesnapshots/ --noqa: TMP + ) + PATTERN = '.*nodesnapshots/snapshot_date=(2024-(11|12)-.*|202[5-9]-.*|20[3-9]\d-.*)/.*' + FORCE = TRUE; + END IF; +END; +$$; diff --git a/synapse_data_warehouse/synapse_raw/tasks/V2.25.1__update_node_tasks.sql b/synapse_data_warehouse/synapse_raw/tasks/V2.25.1__update_node_tasks.sql new file mode 100644 index 00000000..8c02e26d --- /dev/null +++ b/synapse_data_warehouse/synapse_raw/tasks/V2.25.1__update_node_tasks.sql @@ -0,0 +1,62 @@ +-- Configure environment +USE SCHEMA {{database_name}}.synapse_raw; --noqa: JJ01,PRS,TMP,CP02 +USE WAREHOUSE compute_xsmall; + +-- suspend nodesnapshot task and its dependents +ALTER TASK nodesnapshot_task SUSPEND; +ALTER TASK upsert_to_node_latest_task SUSPEND; +ALTER TASK remove_delete_nodes_task SUSPEND; + +-- Add `version_history` column to task definition +ALTER TASK nodesnapshot_task MODIFY AS + COPY INTO + nodesnapshots + FROM ( + SELECT + $1:change_type AS change_type, + $1:change_timestamp AS change_timestamp, + $1:change_user_id AS change_user_id, + $1:snapshot_timestamp AS snapshot_timestamp, + $1:id AS id, + $1:benefactor_id AS benefactor_id, + $1:project_id AS project_id, + $1:parent_id AS parent_id, + $1:node_type AS node_type, + $1:created_on AS created_on, + $1:created_by AS created_by, + $1:modified_on AS modified_on, + $1:modified_by AS modified_by, + $1:version_number AS version_number, + $1:file_handle_id AS file_handle_id, + $1:name AS name, + $1:is_public AS is_public, + $1:is_controlled AS is_controlled, + $1:is_restricted AS is_restricted, + NULLIF( + REGEXP_REPLACE( + metadata$filename, + '.*nodesnapshots\/snapshot_date\=(.*)\/.*', '\\1' + ), + '__HIVE_DEFAULT_PARTITION__' + ) AS snapshot_date, + $1:effective_ars AS effective_ars, + PARSE_JSON(REPLACE(REPLACE($1:annotations, '\n', '\\n'), '\r', '\\r')) AS annotations, + PARSE_JSON(REPLACE(REPLACE($1:derived_annotations, '\n', '\\n'), '\r', '\\r')) AS derived_annotations, + $1:version_comment AS version_comment, + $1:version_label AS version_label, + $1:alias AS alias, + $1:activity_id AS activity_id, + PARSE_JSON($1:column_model_ids) AS column_model_ids, + PARSE_JSON($1:scope_ids) AS scope_ids, + PARSE_JSON($1:items) AS items, + PARSE_JSON($1:reference) AS reference, + $1:is_search_enabled AS is_search_enabled, + $1:defining_sql AS defining_sql, + PARSE_JSON(REPLACE(REPLACE($1:internal_annotations, '\n', '\\n'), '\r', '\\r')) AS internal_annotations, + PARSE_JSON(REPLACE(REPLACE($1:version_history, '\n', '\\n'), '\r', '\\r')) AS version_history + FROM @{{stage_storage_integration}}_stage/nodesnapshots/ --noqa: TMP + ) + PATTERN = '.*nodesnapshots/snapshot_date=.*/.*'; + +-- resume all tasks which we suspended earlier +SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('refresh_synapse_warehouse_s3_stage_task'); \ No newline at end of file