From 28b7fee6eb0ae02330fb6b68f0780107af58555d Mon Sep 17 00:00:00 2001 From: snovod Date: Mon, 2 Oct 2023 14:08:24 -0400 Subject: [PATCH] =?UTF-8?q?Update=20deduplicate=5Fstaging=5Fareas=20to=20t?= =?UTF-8?q?ake=20check=20specific=20file=20exists=20a=E2=80=A6=20(#314)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update deduplicate_staging_areas to take check specific file exists and take in extra args and update requirements.txt --------- Co-authored-by: dsp-fieldeng-bot --- .../hca_manage/deduplicate_staging_areas.py | 113 +++++++++++++++--- .../hca_manage/pull_dcp_snapshots.py | 41 +++++-- orchestration/requirements.txt | 6 +- 3 files changed, 131 insertions(+), 29 deletions(-) diff --git a/orchestration/hca_manage/deduplicate_staging_areas.py b/orchestration/hca_manage/deduplicate_staging_areas.py index 885a108e..3934d7a0 100644 --- a/orchestration/hca_manage/deduplicate_staging_areas.py +++ b/orchestration/hca_manage/deduplicate_staging_areas.py @@ -1,17 +1,35 @@ -"""Outputs contents of up to two GCS paths, comparing if multiple paths are specified. +"""Outputs contents of up to two GCS paths, comparing if multiple paths are specified. 
Usage: > python3 deduplicate_staging_areas.py -s STAGING_AREA_PATH [--print_files] [--skip_deletion]""" # Imports +import argparse import os import re -import argparse -from google.cloud import storage -import pandas as pd +import sys +from typing import Optional + +# Library stubs not installed so ignoring linting failures +import pandas as pd # type: ignore[import] +from google.cloud import storage # type: ignore[import] + +STAGING_AREA_BUCKETS = { + "prod": { + "EBI": "gs://broad-dsp-monster-hca-prod-ebi-storage/prod", + "UCSC": "gs://broad-dsp-monster-hca-prod-ebi-storage/prod", + "LANTERN": "gs://broad-dsp-monster-hca-prod-lantern", + "LATTICE": "gs://broad-dsp-monster-hca-prod-lattice/staging" + }, + "dev": { + "EBI": "gs://broad-dsp-monster-hca-dev-ebi-staging/dev", + "UCSC": "gs://broad-dsp-monster-hca-dev-ebi-staging/dev", + } +} + # Function to return the objects in a staging area bucket -def get_staging_area_objects(bucket_name, prefix, delimiter=None): +def get_staging_area_objects(bucket_name: str, prefix: str, delimiter: Optional[str] = None) -> list[list[str]]: record_list = [] try: # List blobs in specified bucket/prefix @@ -32,12 +50,13 @@ def get_staging_area_objects(bucket_name, prefix, delimiter=None): print(f"Error retrieving objects from staging area: {str(e)}") return record_list + # Function to identify outdated entity files -def identify_outdated_files(record_list): +def identify_outdated_files(record_list: Optional[list[list[str]]]) -> list[storage.blob]: delete_list = [] - if record_list: + if record_list: # Load records into dataframe, group by path and entity, and order by version descending - df = pd.DataFrame(record_list, columns = ["blob", "path", "entity", "version"]) + df = pd.DataFrame(record_list, columns=["blob", "path", "entity", "version"]) df["rn"] = df.groupby(["path", "entity"])["version"].rank(method="first", ascending=False) # Identify outdated records and return as a list @@ -46,8 +65,14 @@ def 
identify_outdated_files(record_list): delete_list.append(row["blob"]) return delete_list + + # Function to batch delete files -def batch_delete_files(delete_list, bucket_name, prefix, delimiter=None): +def batch_delete_files( + delete_list: list[storage.blob], + bucket_name: str, + prefix: str, + delimiter: Optional[str] = None +) -> None: if delete_list: try: # Loop through and submit batch deletion requests (max 1000) @@ -74,24 +99,80 @@ def batch_delete_files(delete_list, bucket_name, prefix, delimiter=None): except Exception as e: print(f"Error deleting objects: {str(e)}") + +# Creates the staging area json +def create_staging_area_json(bucket_name: str, staging_area_json_prefix: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(staging_area_json_prefix) + print(f"Creating {os.path.join('gs://', bucket_name, staging_area_json_prefix)}") + with blob.open("w") as f: + f.write('{"is_delta": false}') + + +# Get staging area +def get_staging_area( + staging_area: Optional[str], + institution: Optional[str], + environment: str, uuid: + Optional[str] +) -> str: + # Staging area and institution are mutually exclusive so the return will never be optional + # If institution is provided then infer staging area from that and uuid + if institution: + # Confirm uuid is passed in if --institution is used + if not uuid: + print("Must provide --uuid if using --institution") + sys.exit(1) + return os.path.join(STAGING_AREA_BUCKETS[environment][institution], uuid) + else: + return staging_area # type: ignore[return-value] + + +def check_staging_area_json_exists(bucket: str, prefix: str) -> bool: + storage_client = storage.Client() + gcs_bucket = storage_client.bucket(bucket) + if gcs_bucket.get_blob(prefix): + return True + return False + + # Main function if __name__ == "__main__": # Set up argument parser - parser = argparse.ArgumentParser(description="Remove outdated entity files from HCA staging area.") - 
parser.add_argument("-s", "--staging_area", required=True, type=str, help="Full GCS path to the staging area of interest.") - parser.add_argument("-p", "--print_files", required=False, action="store_true", help="Add argument to print files to be removed.", default=False) - parser.add_argument("-n", "--skip_deletion", required=False, action="store_true", help="Add argument to skip file deltion.", default=False) + parser = argparse.ArgumentParser( + description="Remove outdated entity files from HCA staging area and check if staging_area.json exists.") + path_parser = parser.add_mutually_exclusive_group(required=True) + path_parser.add_argument("-s", "--staging_area", type=str, help="Full GCS path to the staging area of interest.") + path_parser.add_argument("-i", "--institution", choices=['EBI', 'LATTICE', 'UCSC'], type=str, + help="Must provide uuid if using institution") + parser.add_argument('-e', '--env', choices=['prod', 'dev'], default='prod') + parser.add_argument("-u", "--uuid", help="Only used if using --institution instead of --staging_area") + parser.add_argument("-p", "--print_files", required=False, action="store_true", + help="Add argument to print files to be removed.", default=False) + parser.add_argument("-n", "--skip_deletion", required=False, action="store_true", + help="Add argument to skip file deletion.", default=False) args = parser.parse_args() + staging_area = get_staging_area(args.staging_area, args.institution, args.env, args.uuid) + # Initialize variables - bucket_name = re.match("gs:\/\/([a-z0-9\-_]+)\/", args.staging_area).group(1) - prefix = re.match("gs:\/\/[a-z0-9\-_]+\/([A-Za-z0-9\-_\/\.]+)", args.staging_area).group(1) + bucket_name = re.match(r"gs:\/\/([a-z0-9\-_]+)\/", staging_area).group(1) # type: ignore[union-attr] + prefix = re.match(r"gs:\/\/[a-z0-9\-_]+\/([A-Za-z0-9\-_\/\.]+)", staging_area).group(1) # type: ignore[union-attr] + # staging area is never optional but the function that returns it, get_staging_area, is confused 
if prefix[-1] != "/": prefix += "/" + # Check if staging_area.json exists + staging_area_prefix = os.path.join(prefix, 'staging_area.json') + if not check_staging_area_json_exists(bucket_name, staging_area_prefix): + print(f"staging_area.json does not exist in {staging_area}") + # Upload if it does not exist + create_staging_area_json(bucket_name, staging_area_prefix) + # Call functions to identify and remove outdated entity files - print(f"Evaluating outdated files in staging area: {args.staging_area}") + print(f"Evaluating outdated files in staging area: {staging_area}") objects_list = get_staging_area_objects(bucket_name, prefix) print(f"\t- Total objects found: {len(objects_list)}") delete_list = identify_outdated_files(objects_list) diff --git a/orchestration/hca_manage/pull_dcp_snapshots.py b/orchestration/hca_manage/pull_dcp_snapshots.py index 7be41a17..a284e8ec 100644 --- a/orchestration/hca_manage/pull_dcp_snapshots.py +++ b/orchestration/hca_manage/pull_dcp_snapshots.py @@ -5,14 +5,16 @@ # Imports import argparse -import data_repo_client -import pandas as pd -import google.auth -import google.auth.transport.requests -import requests + +import data_repo_client # type: ignore[import] +import google.auth # type: ignore[import] +import google.auth.transport.requests # type: ignore[import] +import pandas as pd # type: ignore[import] +import requests # type: ignore[import] + # Function to return the objects in a staging area bucket -def get_snapshots(release): +def get_snapshots(release: str) -> pd.DataFrame: try: # Establish TDR API client creds, project = google.auth.default() @@ -32,20 +34,39 @@ def get_snapshots(release): for snapshot_entry in snapshots_list.items: public_flag = "N" public_response = requests.get( - url=f"https://sam.dsde-prod.broadinstitute.org/api/resources/v2/datasnapshot/{snapshot_entry.id}/policies/reader/public", + url="https://sam.dsde-prod.broadinstitute.org/api/resources/v2/datasnapshot/" + + 
f"{snapshot_entry.id}/policies/reader/public", headers={"Authorization": f"Bearer {creds.token}"}, ) if public_response.text == "true": public_flag = "Y" - record = [snapshot_entry.id, snapshot_entry.name, snapshot_entry.data_project, snapshot_entry.created_date[0:10], snapshot_entry.created_date, public_flag] + record = [ + snapshot_entry.id, + snapshot_entry.name, + snapshot_entry.data_project, + snapshot_entry.created_date[0:10], + snapshot_entry.created_date, + public_flag + ] records_list.append(record) - df = pd.DataFrame(records_list, columns =["TDR Snapshot ID", "TDR Snapshot Name", "TDR Snapshot Google Project", "Created Date", "Created Datetime", "Public Flag"]) + df = pd.DataFrame( + records_list, + columns=[ + "TDR Snapshot ID", + "TDR Snapshot Name", + "TDR Snapshot Google Project", + "Created Date", + "Created Datetime", + "Public Flag" + ] + ) df_sorted = df.sort_values(by=["TDR Snapshot Name"], ignore_index=True) except Exception as e: print(f"Error retrieving snapshots: {str(e)}") df_sorted = pd.DataFrame() return df_sorted + # Main function if __name__ == "__main__": @@ -59,4 +80,4 @@ def get_snapshots(release): df = get_snapshots(args.release) file_name = f"dcp_snapshot_list_{args.release}.tsv" df.to_csv(file_name, sep="\t") - print(f"Results outputed to {file_name}") \ No newline at end of file + print(f"Results outputed to {file_name}") diff --git a/orchestration/requirements.txt b/orchestration/requirements.txt index 17f53d1c..fb67a619 100644 --- a/orchestration/requirements.txt +++ b/orchestration/requirements.txt @@ -9,7 +9,7 @@ broad-dagster-utils==0.6.7; python_version >= "3.9" and python_version < "3.10" cached-property==1.5.2 cachetools==5.3.1; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10" certifi==2023.7.22; python_version >= "3.9" and python_version < "3.10" -charset-normalizer==3.2.0; python_version >= 
"3.9" and python_version < "3.10" and python_full_version >= "3.7.0" +charset-normalizer==3.3.0; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.7.0" click==7.1.2; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0" colorama==0.4.6; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" and platform_system == "Windows" or python_version >= "3.9" and python_version < "3.10" and platform_system == "Windows" and python_full_version >= "3.7.0" coloredlogs==14.0; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0" @@ -20,7 +20,7 @@ dagster-pandas==0.12.14 dagster-postgres==0.12.14 dagster-slack==0.12.14 dagster==0.12.14 -data-repo-client==1.527.0 +data-repo-client==1.528.0 docstring-parser==0.15; python_version >= "3.9" and python_version < "3.10" frozenlist==1.4.0; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" google-api-core==2.12.0; python_version >= "3.9" and python_version < "3.10" and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10") and (python_version >= "3.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.7") @@ -55,7 +55,7 @@ multidict==6.0.4; python_version >= "3.9" and python_version < "3.10" and python numpy==1.26.0; python_version >= "3.9" and python_version < "3.11" oauth2client==4.1.3 oauthlib==3.2.2; python_version >= "3.6" -packaging==23.1; python_version >= "3.9" and python_version < "3.10" +packaging==23.2; python_version >= "3.9" and python_version < "3.10" pandas==2.1.1; python_version >= "3.9" pendulum==2.1.2; python_version >= 
"3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0" promise==2.3