From d741403a9b17beef18a92767716dd2d84e6964b5 Mon Sep 17 00:00:00 2001 From: Ben Wu <12437227+BenWu@users.noreply.github.com> Date: Thu, 3 Oct 2024 18:34:40 +0100 Subject: [PATCH] [DENG-4256] Search for shredder targets using table lineage (#6289) --- dags.yaml | 20 + requirements.in | 1 + requirements.txt | 25 +- .../shredder_targets_v1/metadata.yaml | 17 + .../shredder_targets_v1/query.py | 350 ++++++++++++++++++ .../shredder_targets_v1/schema.yaml | 42 +++ 6 files changed, 445 insertions(+), 10 deletions(-) create mode 100644 sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml create mode 100644 sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py create mode 100644 sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml diff --git a/dags.yaml b/dags.yaml index 04229cf8c27..77164377404 100644 --- a/dags.yaml +++ b/dags.yaml @@ -1810,3 +1810,23 @@ bqetl_dynamic_dau: tags: - repo/bigquery-etl - impact/tier_2 + +bqetl_shredder_monitoring: + default_args: + depends_on_past: false + email: + - bewu@mozilla.com + email_on_failure: true + email_on_retry: False + end_date: null + owner: bewu@mozilla.com + retries: 2 + retry_delay: 30m + start_date: '2024-10-01' + description: '[EXPERIMENTAL] Monitoring queries for shredder operation' + repo: bigquery-etl + schedule_interval: 0 12 * * * + tags: + - repo/bigquery-etl + - impact/tier_3 + - triage/no_triage diff --git a/requirements.in b/requirements.in index 6d4c5059047..af45966bb1f 100644 --- a/requirements.in +++ b/requirements.in @@ -12,6 +12,7 @@ gitpython==3.1.43 google-auth>=2.30.0 # To try to fix "Compute Engine Metadata server call to universe/universe_domain returned 404" errors. google-cloud-bigquery==3.25.0 google-cloud-bigquery-storage[fastavro]==2.24.0 +google-cloud-datacatalog-lineage==0.3.8 google-cloud-storage==2.18.2 Jinja2==3.1.3 jsonschema==4.23.0 diff --git a/requirements.txt b/requirements.txt index e23e845e90f..4e7866a9f57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -514,6 +514,7 @@ google-api-core[grpc]==2.17.1 \ # google-cloud-bigquery # google-cloud-bigquery-storage # google-cloud-core + # google-cloud-datacatalog-lineage # google-cloud-storage google-auth==2.35.0 \ --hash=sha256:25df55f327ef021de8be50bad0dfd4a916ad0de96da86cd05661c9297723ad3f \ @@ -525,6 +526,7 @@ google-auth==2.35.0 \ # google-auth-oauthlib # google-cloud-bigquery # google-cloud-core + # google-cloud-datacatalog-lineage # google-cloud-storage google-auth-oauthlib==0.8.0 \ --hash=sha256:40cc612a13c3336d5433e94e2adb42a0c88f6feb6c55769e44500fc70043a576 \ @@ -544,6 +546,10 @@ google-cloud-core==2.3.2 \ # via # google-cloud-bigquery # google-cloud-storage +google-cloud-datacatalog-lineage==0.3.8 \ + --hash=sha256:8a17b11ba647fda33b36a8680658ce684d75c3d633c67e626001bbb19d8f3ca6 \ + --hash=sha256:e22151121cbe7bfb07ae8c8e28319b7a03cc8d17a3407fa9ace0db8581701cfa + # via -r requirements.in google-cloud-storage==2.18.2 \ --hash=sha256:97a4d45c368b7d401ed48c4fdfe86e1e1cb96401c9e199e419d289e2c0370166 \ --hash=sha256:aaf7acd70cdad9f274d29332673fcab98708d0e1f4dceb5a5356aaef06af4d99 @@ -742,9 +748,7 @@ jaraco-classes==3.4.0 \ jeepney==0.8.0 \ --hash=sha256:5efe48d255973902f6badc3ce55e2aa6c5c3b3bc642059ef3a91247bcfcc5806 \ --hash=sha256:c0a454ad016ca575060802ee4d590dd912e35c122fa04e70306de3d076cce755 - # via - # keyring - # secretstorage + # via secretstorage jinja2==3.1.3 \ --hash=sha256:7d6d50dd97d52cbc355597bd845fabfbac3f551e1f99619e39a35ce8c370b5fa \ 
     --hash=sha256:ac8bd6544d4bb2c9792bf3a159e80bba8fda7f07e81bc3aed565432d5925ba90
@@ -1199,10 +1203,12 @@ pre-commit==3.8.0 \
     --hash=sha256:8bb6494d4a20423842e198980c9ecf9f96607a07ea29549e180eef9ae80fe7af \
     --hash=sha256:9a90a53bf82fdd8778d58085faf8d83df56e40dfe18f45b19446e26bf1b3a63f
     # via -r requirements.in
-proto-plus==1.22.2 \
-    --hash=sha256:0e8cda3d5a634d9895b75c573c9352c16486cb75deb0e078b5fda34db4243165 \
-    --hash=sha256:de34e52d6c9c6fcd704192f09767cb561bb4ee64e70eede20b0834d841f0be4d
-    # via google-cloud-bigquery-storage
+proto-plus==1.24.0 \
+    --hash=sha256:30b72a5ecafe4406b0d339db35b56c4059064e69227b8c3bda7462397f966445 \
+    --hash=sha256:402576830425e5f6ce4c2a6702400ac79897dab0b4343821aa5188b0fab81a12
+    # via
+    #   google-cloud-bigquery-storage
+    #   google-cloud-datacatalog-lineage
 protobuf==4.21.12 \
     --hash=sha256:1f22ac0ca65bb70a876060d96d914dae09ac98d114294f77584b0d2644fa9c30 \
     --hash=sha256:237216c3326d46808a9f7c26fd1bd4b20015fb6867dc5d263a493ef9a539293b \
@@ -1223,6 +1229,7 @@ protobuf==4.21.12 \
     #   gcloud
     #   google-api-core
     #   google-cloud-bigquery-storage
+    #   google-cloud-datacatalog-lineage
     #   googleapis-common-protos
     #   grpcio-status
     #   proto-plus
@@ -1970,9 +1977,7 @@ s3transfer==0.10.2 \
 secretstorage==3.3.3 \
     --hash=sha256:2403533ef369eca6d2ba81718576c5e0f564d5cca1b58f73a8b23e7d4eeebd77 \
     --hash=sha256:f356e6628222568e3af06f2eba8df495efa13b3b63081dafd4f7d9a7b7bc9f99
-    # via
-    #   bigeye-sdk
-    #   keyring
+    # via bigeye-sdk
 shellingham==1.5.4 \
     --hash=sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686 \
     --hash=sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de
diff --git a/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml
new file mode 100644
index 00000000000..ea19d8fa490
--- /dev/null
+++ b/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/metadata.yaml
@@ -0,0 +1,17 @@
+friendly_name: Shredder Targets
+description: |-
+  Daily list of shredder deletion targets, comparing the configured deletion
+  sources with sources detected via the lineage of client ID tables in BigQuery.
+owners:
+  - bewu@mozilla.com
+labels:
+  incremental: true
+  owner1: benwu
+scheduling:
+  dag_name: bqetl_shredder_monitoring
+  arguments: ["--output-table", "moz-fx-data-shared-prod.monitoring_derived.shredder_targets_v1", "--run-date", "{{ ds }}"]
+bigquery:
+  time_partitioning:
+    type: day
+    field: run_date
+    require_partition_filter: true
diff --git a/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py b/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py
new file mode 100644
index 00000000000..3a6078e0d2d
--- /dev/null
+++ b/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/query.py
@@ -0,0 +1,350 @@
+#!/usr/bin/env python3
+
+"""Search for tables with client id columns."""
+import datetime
+from collections import defaultdict
+from multiprocessing.pool import ThreadPool
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Set
+
+import click
+from google.cloud import bigquery
+from google.cloud import datacatalog_lineage_v1 as datacatalog_lineage
+from google.cloud.bigquery import TableReference
+
+from bigquery_etl.schema import Schema
+from bigquery_etl.shredder.config import (
+    CLIENT_ID,
+    DELETE_TARGETS,
+    GLEAN_CLIENT_ID,
+    SHARED_PROD,
+    DeleteSource,
+    find_glean_targets,
+    get_glean_channel_to_app_name_mapping,
+)
+
+FIND_TABLES_QUERY_TEMPLATE = """
+WITH no_client_id_tables AS (
+  SELECT
+    table_catalog,
+    table_schema,
+    table_name,
+  FROM
+    `{project}.region-us.INFORMATION_SCHEMA.TABLE_OPTIONS`
+  WHERE
+    option_name = 'labels'
+    AND option_value LIKE '%(\"include_client_id\", \"false\")%'
+)
+
+SELECT
+  table_catalog,
+  table_schema,
+  table_name,
+FROM
+  `{project}.region-us.INFORMATION_SCHEMA.TABLES`
+LEFT JOIN
+  no_client_id_tables
+USING
+  (table_catalog, table_schema, table_name)
+WHERE
+  -- find tables with columns ending with client_id
+  (
+    (
+      ddl LIKE '%client_id STRING%' AND no_client_id_tables.table_name IS NULL
+    )
+    OR (
+      -- glean tables may have an all null client_info.client_id column but may still have a secondary client id
+      ddl LIKE '%client_id STRING%client_id STRING%' AND no_client_id_tables.table_name IS NOT NULL
+    )
+  )
+  AND table_type = 'BASE TABLE' -- exclude views
+  AND (
+    table_schema LIKE '%_derived'
+    OR table_schema LIKE '%_stable'
+  )
+  -- TODO: can't get lineage for most opmon tables, need to figure this out separately
+  AND table_schema != 'operational_monitoring_derived'
+  AND table_schema != 'backfills_staging_derived'
+  AND table_name != 'deletion_request_v1'
+  AND table_name != 'deletion_request_v4'
+"""
+
+
+def find_client_id_tables(project: str) -> List[str]:
+    """Return a list of tables that have columns ending with 'client_id'."""
+    client = bigquery.Client()
+    row_results = client.query_and_wait(
+        query=FIND_TABLES_QUERY_TEMPLATE.format(project=project)
+    )
+
+    return [f"{project}.{row.table_schema}.{row.table_name}" for row in row_results]
+
+
+def get_upstream_stable_tables(id_tables: List[str]) -> Dict[str, Set[str]]:
+    """Build a map of tables to their upstream stable tables using GCP data catalog lineage.
+
+    Note that the data catalog only uses the information from the last 30 days of query jobs.
+    """
+    client = datacatalog_lineage.LineageClient()
+
+    upstream_stable_tables = defaultdict(set)
+
+    def traverse_upstream(base_table: str):
+        """Recursively traverse lineage to find stable tables."""
+        table_ref = TableReference.from_string(base_table)
+
+        if table_ref.dataset_id.endswith("_stable"):  # stable tables are terminal nodes
+            upstream_stable_tables[base_table] = {base_table}
+        elif base_table not in upstream_stable_tables:
+            upstream_links_result = client.search_links(
+                request={
+                    "parent": "projects/moz-fx-data-shared-prod/locations/us",
+                    "target": datacatalog_lineage.EntityReference(
+                        fully_qualified_name=f"bigquery:{base_table}"
+                    ),
+                }
+            )
+            # recursively add upstream tables
+            for upstream_link in upstream_links_result:
+                # remove "bigquery:" and "sharded:" prefixes
+                parent_table = upstream_link.source.fully_qualified_name.split(":")[-1]
+
+                upstream_stable_tables[base_table] = upstream_stable_tables[
+                    base_table
+                ].union(traverse_upstream(parent_table))
+
+        return upstream_stable_tables[base_table]
+
+    upstream_stable_table_map = {}
+
+    print("Upstream stable tables:")
+    for table_name in id_tables:
+        upstream_stable_table_map[table_name] = set(traverse_upstream(table_name))
+        print(f"{table_name} upstream: {upstream_stable_table_map[table_name]}")
+
+    return upstream_stable_table_map
+
+
+def get_associated_deletions(
+    upstream_stable_tables: Dict[str, Set[str]]
+) -> Dict[str, Set[DeleteSource]]:
+    """Get the associated deletion request tables per table based on the upstream stable tables."""
+    # deletion targets for stable tables defined in the shredder config
+    known_stable_table_sources: Dict[str, Set[DeleteSource]] = {
+        f"{target.project}.{target.dataset_id}.{target.table_id}": (
+            set(src) if isinstance(src, Iterable) else {src}
+        )
+        for target, src in DELETE_TARGETS.items()
+        if target.dataset_id.endswith("_stable")
+    }
+
+    table_to_deletions: Dict[str, Set[DeleteSource]] = {}
+
+    datasets_with_additional_deletion_requests = {}
+
+    for base_table in upstream_stable_tables:
+        if base_table.endswith(".additional_deletion_requests_v1"):
+            dataset_name = TableReference.from_string(base_table).dataset_id
+            datasets_with_additional_deletion_requests[dataset_name] = (
+                f"{dataset_name}.additional_deletion_requests_v1"
+            )
+            datasets_with_additional_deletion_requests[
+                dataset_name.replace("_derived", "_stable")
+            ] = f"{dataset_name}.additional_deletion_requests_v1"
+
+    glean_channel_names = get_glean_channel_to_app_name_mapping()
+
+    for table_name, stable_tables in upstream_stable_tables.items():
+        deletion_tables: Set[DeleteSource] = set()
+
+        for stable_table in stable_tables:
+            if stable_table in known_stable_table_sources:
+                table_to_deletions[stable_table] = known_stable_table_sources[
+                    stable_table
+                ]
+            elif stable_table not in table_to_deletions:
+                stable_table_ref = TableReference.from_string(stable_table)
+
+                # glean table
+                if (
+                    stable_table_ref.dataset_id[: -len("_stable")]
+                    in glean_channel_names
+                ):
+                    table_to_deletions[stable_table] = {
+                        DeleteSource(
+                            table=f"{stable_table_ref.dataset_id}.deletion_request_v1",
+                            field=GLEAN_CLIENT_ID,
+                            project=SHARED_PROD,
+                        )
+                    }
+                    if (
+                        stable_table_ref.dataset_id
+                        in datasets_with_additional_deletion_requests
+                    ):
+                        table_to_deletions[stable_table].add(
+                            DeleteSource(
+                                table=datasets_with_additional_deletion_requests[
+                                    stable_table_ref.dataset_id
+                                ],
+                                field=CLIENT_ID,
+                                project=SHARED_PROD,
+                            )
+                        )
+                # unknown legacy telemetry or non-glean structured
+                else:
+                    table_to_deletions[stable_table] = set()
+
+            deletion_tables = deletion_tables.union(table_to_deletions[stable_table])
+
+        table_to_deletions[table_name] = deletion_tables
+
+    return {
+        table: deletions
+        for table, deletions in table_to_deletions.items()
+        if table in upstream_stable_tables
+    }
+
+
+def delete_source_to_dict(source: DeleteSource):
+    """Convert a DeleteSource to a dict, removing the conditions field."""
+    d = source.__dict__.copy()
+    d.pop("conditions")
+    return d
+
+
+def get_missing_deletions(
+    associated_deletions: Dict[str, Set[DeleteSource]]
+) -> List[Dict[str, Any]]:
+    """Get all tables with their currently configured deletion sources and the sources detected from lineage."""
+    # get the generated glean deletion list
+    with ThreadPool(processes=12) as pool:
+        bigquery_client = bigquery.Client()
+        glean_delete_targets = find_glean_targets(pool, client=bigquery_client)
+
+    glean_channel_names = get_glean_channel_to_app_name_mapping()
+    glean_app_name_to_channels = defaultdict(list)
+    for channel, app_name in glean_channel_names.items():
+        glean_app_name_to_channels[app_name].append(channel)
+
+    table_deletions = []
+
+    for target, sources in (*glean_delete_targets.items(), *DELETE_TARGETS.items()):
+        target_table = f"{target.project}.{target.table}"
+
+        # expand per-app deletion request views into per-channel tables
+        unnested_sources = set()
+        if isinstance(sources, Iterable):
+            for source in sources:
+                if (
+                    source.dataset_id in glean_app_name_to_channels
+                    and source.table_id == "deletion_request"
+                ):
+                    for channel in glean_app_name_to_channels[source.dataset_id]:
+                        unnested_sources.add(
+                            DeleteSource(
+                                table=f"{channel}_stable.deletion_request_v1",
+                                field=source.field,
+                                project=source.project,
+                                conditions=source.conditions,
+                            )
+                        )
+                else:
+                    unnested_sources.add(source)
+        else:
+            unnested_sources.add(sources)
+
+        # tables not in associated_deletions likely use another id column, e.g. user_id
+        detected_deletions = associated_deletions.pop(target_table, set())
+
+        table_deletions.append(
+            {
+                "project_id": target.project,
+                "dataset_id": target.dataset_id,
+                "table_id": target.table_id,
+                "current_sources": [
+                    delete_source_to_dict(source) for source in unnested_sources
+                ],
+                "detected_sources": [
+                    delete_source_to_dict(detected) for detected in detected_deletions
+                ],
+                "matching_sources": set(unnested_sources) == detected_deletions,
+            }
+        )
+
+    # delete target not in shredder config
+    for table_name, srcs in associated_deletions.items():
+        project, dataset, table = table_name.split(".")
+        table_deletions.append(
+            {
+                "project_id": project,
+                "dataset_id": dataset,
+                "table_id": table,
+                "current_sources": [],
+                "detected_sources": [
+                    delete_source_to_dict(detected) for detected in srcs
+                ],
+                "matching_sources": False,
+            }
+        )
+
+    return table_deletions
+
+
+def write_to_bigquery(
+    run_date: datetime.datetime,
+    target_table: TableReference,
+    deletions: List[Dict[str, Any]],
+):
+    client = bigquery.Client()
+
+    result = client.load_table_from_json(
+        json_rows=[
+            {"run_date": run_date.date().isoformat(), **deletion}
+            for deletion in deletions
+        ],
+        destination=f"{str(target_table)}${run_date.date().strftime('%Y%m%d')}",
+        job_config=bigquery.LoadJobConfig(
+            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
+            schema=Schema.from_schema_file(
+                Path(__file__).parent / "schema.yaml"
+            ).to_bigquery_schema(),
+            time_partitioning=bigquery.TimePartitioning(field="run_date"),
+        ),
+    ).result()
+
+    print(f"Wrote {result.output_rows} rows to {result.destination}")
+
+
+@click.command
+@click.option(
+    "--run-date", type=click.DateTime(), help="The date to write in the output."
+)
+@click.option(
+    "--output-table",
+    type=TableReference.from_string,
+    metavar="PROJECT.DATASET.TABLE",
+    help="Table to write results to in the form of PROJECT.DATASET.TABLE.",
+)
+@click.option(
+    "--project-id",
+    default=SHARED_PROD,
+    help="BigQuery project to search for client id tables.",
+)
+def main(run_date, output_table, project_id):
+    """Find tables in the given project that could be added to shredder."""
+    # TODO: handle other id columns
+    client_id_tables = find_client_id_tables(project_id)
+
+    print(f"Found {len(client_id_tables)} client id tables.")
+
+    upstream_stable_tables = get_upstream_stable_tables(client_id_tables)
+
+    associated_deletions = get_associated_deletions(upstream_stable_tables)
+
+    table_deletions = get_missing_deletions(associated_deletions)
+
+    write_to_bigquery(run_date, output_table, table_deletions)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml b/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml
new file mode 100644
index 00000000000..5acefaba77c
--- /dev/null
+++ b/sql/moz-fx-data-shared-prod/monitoring_derived/shredder_targets_v1/schema.yaml
@@ -0,0 +1,42 @@
+fields:
+- name: run_date
+  type: DATE
+  mode: NULLABLE
+- name: project_id
+  type: STRING
+  mode: NULLABLE
+- name: dataset_id
+  type: STRING
+  mode: NULLABLE
+- name: table_id
+  type: STRING
+  mode: NULLABLE
+- name: current_sources
+  type: RECORD
+  mode: REPEATED
+  fields:
+  - name: table
+    type: STRING
+    mode: NULLABLE
+  - name: field
+    type: STRING
+    mode: NULLABLE
+  - name: project
+    type: STRING
+    mode: NULLABLE
+- name: detected_sources
+  type: RECORD
+  mode: REPEATED
+  fields:
+  - name: table
+    type: STRING
+    mode: NULLABLE
+  - name: field
+    type: STRING
+    mode: NULLABLE
+  - name: project
+    type: STRING
+    mode: NULLABLE
+- name: matching_sources
+  type: BOOLEAN
+  mode: NULLABLE
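
The lineage traversal in get_upstream_stable_tables reduces to repeated search_links calls against the Data Catalog Lineage API added to requirements. A minimal standalone sketch of a single lookup, assuming application default credentials with lineage read access on moz-fx-data-shared-prod; the example table name is only an illustrative placeholder:

"""Sketch: print the direct upstream BigQuery tables of a single table."""
from google.cloud import datacatalog_lineage_v1 as datacatalog_lineage

# Illustrative placeholder; any table with query activity in the last 30 days works.
TABLE = "moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6"

client = datacatalog_lineage.LineageClient()
links = client.search_links(
    request={
        "parent": "projects/moz-fx-data-shared-prod/locations/us",
        "target": datacatalog_lineage.EntityReference(
            fully_qualified_name=f"bigquery:{TABLE}"
        ),
    }
)
for link in links:
    # fully qualified names look like "bigquery:project.dataset.table"
    print(link.source.fully_qualified_name.split(":")[-1])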
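
The output table is meant to be scanned for rows where the configured and detected deletion sources disagree. A sketch of one way to pull those rows, assuming results land in moz-fx-data-shared-prod.monitoring_derived.shredder_targets_v1 as configured in metadata.yaml; the run date below is a placeholder:

"""Sketch: list tables whose configured shredder sources disagree with lineage."""
from google.cloud import bigquery

# run_date is a placeholder; the table requires a partition filter on run_date.
MISMATCH_QUERY = """
SELECT
  project_id,
  dataset_id,
  table_id,
  ARRAY_LENGTH(current_sources) AS n_current,
  ARRAY_LENGTH(detected_sources) AS n_detected,
FROM
  `moz-fx-data-shared-prod.monitoring_derived.shredder_targets_v1`
WHERE
  run_date = '2024-10-01'
  AND NOT matching_sources
ORDER BY
  dataset_id,
  table_id
"""

client = bigquery.Client()
for row in client.query_and_wait(MISMATCH_QUERY):
    print(
        f"{row.project_id}.{row.dataset_id}.{row.table_id}: "
        f"{row.n_current} configured, {row.n_detected} detected"
    )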