From 86c582393730f04126a09f6a78d9a0ebb3d84782 Mon Sep 17 00:00:00 2001 From: Nate Calvanese <92991816+ncalvanese1@users.noreply.github.com> Date: Wed, 20 Sep 2023 09:14:15 -0400 Subject: [PATCH] Add files via upload (#312) * Add files via upload * Update requirements.txt --------- Co-authored-by: dsp-fieldeng-bot --- .../hca_manage/deduplicate_staging_areas.py | 102 ++++++++++++++++++ .../hca_manage/pull_dcp_snapshots.py | 62 +++++++++++ orchestration/requirements.txt | 20 ++-- 3 files changed, 174 insertions(+), 10 deletions(-) create mode 100644 orchestration/hca_manage/deduplicate_staging_areas.py create mode 100644 orchestration/hca_manage/pull_dcp_snapshots.py diff --git a/orchestration/hca_manage/deduplicate_staging_areas.py b/orchestration/hca_manage/deduplicate_staging_areas.py new file mode 100644 index 00000000..885a108e --- /dev/null +++ b/orchestration/hca_manage/deduplicate_staging_areas.py @@ -0,0 +1,102 @@ +"""Outputs contents of up to two GCS paths, comparing if multiple paths are specified. + +Usage: + > python3 deduplicate_staging_areas.py -s STAGING_AREA_PATH [--print_files] [--skip_deletion]""" + +# Imports +import os +import re +import argparse +from google.cloud import storage +import pandas as pd + +# Function to return the objects in a staging area bucket +def get_staging_area_objects(bucket_name, prefix, delimiter=None): + record_list = [] + try: + # List blobs in specified bucket/prefix + storage_client = storage.Client() + blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter) + + # Parse blobs and return a list of records + for blob in blobs: + if prefix + "data/" not in blob.name: + obj = blob.name + path = os.path.split(blob.name)[0] + entity = os.path.split(blob.name)[1].split("_")[0] + version = os.path.split(blob.name)[1].split("_")[1] + record = [obj, path, entity, version] + record_list.append(record) + return record_list + except Exception as e: + print(f"Error retrieving objects from staging area: {str(e)}") + return record_list + +# Function to identify outdated entity files +def identify_outdated_files(record_list): + delete_list = [] + if record_list: + # Load records into dataframe, group by path and entity, and order by version descending + df = pd.DataFrame(record_list, columns = ["blob", "path", "entity", "version"]) + df["rn"] = df.groupby(["path", "entity"])["version"].rank(method="first", ascending=False) + + # Identify outdated records and return as a list + df_outdated = df[df["rn"] != 1] + for index, row in df_outdated.iterrows(): + delete_list.append(row["blob"]) + return delete_list + +# Function to batch delete files +def batch_delete_files(delete_list, bucket_name, prefix, delimiter=None): + if delete_list: + try: + # Loop through and submit batch deletion requests (max 1000) + deleted_list = [] + while True: + + # List blobs in specified bucket/prefix + storage_client = storage.Client() + blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter) + + # Loop through blobs and delete those found on the delete list + iterator = 0 + with storage_client.batch(): + for blob in blobs: + if blob.name in delete_list and blob.name not in deleted_list and iterator < 1000: + iterator += 1 + deleted_list.append(blob.name) + blob.delete() + + # If all objects deleted, exit loop + if len(deleted_list) == len(delete_list): + break + print("Objects deleted successfully.") + except Exception as e: + print(f"Error deleting objects: {str(e)}") + +# Main function +if __name__ == "__main__": + + # Set up argument parser + parser = argparse.ArgumentParser(description="Remove outdated entity files from HCA staging area.") + parser.add_argument("-s", "--staging_area", required=True, type=str, help="Full GCS path to the staging area of interest.") + parser.add_argument("-p", "--print_files", required=False, action="store_true", help="Add argument to print files to be removed.", default=False) + parser.add_argument("-n", "--skip_deletion", required=False, action="store_true", help="Add argument to skip file deltion.", default=False) + args = parser.parse_args() + + # Initialize variables + bucket_name = re.match("gs:\/\/([a-z0-9\-_]+)\/", args.staging_area).group(1) + prefix = re.match("gs:\/\/[a-z0-9\-_]+\/([A-Za-z0-9\-_\/\.]+)", args.staging_area).group(1) + if prefix[-1] != "/": + prefix += "/" + + # Call functions to identify and remove outdated entity files + print(f"Evaluating outdated files in staging area: {args.staging_area}") + objects_list = get_staging_area_objects(bucket_name, prefix) + print(f"\t- Total objects found: {len(objects_list)}") + delete_list = identify_outdated_files(objects_list) + print(f"\t- Outdated objects found: {len(delete_list)}") + if args.print_files: + print("\t- Outdated object list: \n\t\t- " + "\n\t\t- ".join(delete_list)) + if not args.skip_deletion: + batch_delete_files(delete_list, bucket_name, prefix) diff --git a/orchestration/hca_manage/pull_dcp_snapshots.py b/orchestration/hca_manage/pull_dcp_snapshots.py new file mode 100644 index 00000000..7be41a17 --- /dev/null +++ b/orchestration/hca_manage/pull_dcp_snapshots.py @@ -0,0 +1,62 @@ +"""Outputs the snapshots for a particular DCP release + +Usage: + > python3 pull_dcp_snapshots.py -r dcp_release""" + +# Imports +import argparse +import data_repo_client +import pandas as pd +import google.auth +import google.auth.transport.requests +import requests + +# Function to return the objects in a staging area bucket +def get_snapshots(release): + try: + # Establish TDR API client + creds, project = google.auth.default() + auth_req = google.auth.transport.requests.Request() + creds.refresh(auth_req) + config = data_repo_client.Configuration() + config.host = "https://data.terra.bio" + config.access_token = creds.token + api_client = data_repo_client.ApiClient(configuration=config) + api_client.client_side_validation = False + + # Enumerate snapshots + snapshot_filter = "_" + release + snapshots_api = data_repo_client.SnapshotsApi(api_client=api_client) + snapshots_list = snapshots_api.enumerate_snapshots(filter=snapshot_filter, limit=1000) + records_list = [] + for snapshot_entry in snapshots_list.items: + public_flag = "N" + public_response = requests.get( + url=f"https://sam.dsde-prod.broadinstitute.org/api/resources/v2/datasnapshot/{snapshot_entry.id}/policies/reader/public", + headers={"Authorization": f"Bearer {creds.token}"}, + ) + if public_response.text == "true": + public_flag = "Y" + record = [snapshot_entry.id, snapshot_entry.name, snapshot_entry.data_project, snapshot_entry.created_date[0:10], snapshot_entry.created_date, public_flag] + records_list.append(record) + df = pd.DataFrame(records_list, columns =["TDR Snapshot ID", "TDR Snapshot Name", "TDR Snapshot Google Project", "Created Date", "Created Datetime", "Public Flag"]) + df_sorted = df.sort_values(by=["TDR Snapshot Name"], ignore_index=True) + except Exception as e: + print(f"Error retrieving snapshots: {str(e)}") + df_sorted = pd.DataFrame() + return df_sorted + +# Main function +if __name__ == "__main__": + + # Set up argument parser + parser = argparse.ArgumentParser(description="Pull snapshots for a particular DCP release.") + parser.add_argument("-r", "--release", required=True, type=str, help="DCP release code (e.g., dcp25).") + args = parser.parse_args() + + # Call functions to identify and remove outdated entity files + print(f"Pulling snapshots for release: {args.release}") + df = get_snapshots(args.release) + file_name = f"dcp_snapshot_list_{args.release}.tsv" + df.to_csv(file_name, sep="\t") + print(f"Results outputed to {file_name}") \ No newline at end of file diff --git a/orchestration/requirements.txt b/orchestration/requirements.txt index 2242e6eb..4a60b907 100644 --- a/orchestration/requirements.txt +++ b/orchestration/requirements.txt @@ -20,13 +20,13 @@ dagster-pandas==0.12.14 dagster-postgres==0.12.14 dagster-slack==0.12.14 dagster==0.12.14 -data-repo-client==1.521.0 +data-repo-client==1.527.0 docstring-parser==0.15; python_version >= "3.9" and python_version < "3.10" frozenlist==1.4.0; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" google-api-core==2.11.1; python_version >= "3.9" and python_version < "3.10" and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10") and (python_version >= "3.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.7") google-api-python-client==1.12.11; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" -google-auth-httplib2==0.1.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" -google-auth==2.17.3; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10" +google-auth-httplib2==0.1.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" +google-auth==2.23.0; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10" google-cloud-bigquery==2.34.3; python_version >= "3.6" and python_version < "3.11" google-cloud-core==2.3.3; python_version >= "3.9" and python_version < "3.10" and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10") google-cloud-storage==1.44.0; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") @@ -39,7 +39,7 @@ graphql-ws==0.3.1 greenlet==2.0.2; python_version >= "3" and python_full_version < "3.0.0" and (platform_machine == "aarch64" or platform_machine == "ppc64le" or platform_machine == "x86_64" or platform_machine == "amd64" or platform_machine == "AMD64" or platform_machine == "win32" or platform_machine == "WIN32") and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0") or python_version >= "3" and (platform_machine == "aarch64" or platform_machine == "ppc64le" or platform_machine == "x86_64" or platform_machine == "amd64" or platform_machine == "AMD64" or platform_machine == "win32" or platform_machine == "WIN32") and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0") and python_full_version >= "3.5.0" grpcio-health-checking==1.48.2; python_version >= "3.9" and python_version < "3.10" grpcio-status==1.48.2; python_version >= "3.9" and python_version < "3.10" -grpcio==1.57.0; python_version >= "3.9" and python_version < "3.10" +grpcio==1.58.0; python_version >= "3.9" and python_version < "3.10" hca-import-validation==0.0.17; python_version >= "3.6" httplib2==0.22.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" humanfriendly==10.0; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0" @@ -47,12 +47,12 @@ idna==3.4; python_version >= "3.9" and python_version < "3.10" and python_full_v jinja2==2.11.3; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0" jsonschema-specifications==2023.7.1; python_version >= "3.8" jsonschema==4.19.0; python_version >= "3.8" -kubernetes==27.2.0; python_version >= "3.6" +kubernetes==28.1.0; python_version >= "3.6" mako==1.2.2; python_version >= "3.7" markupsafe==2.0.1; python_version >= "3.6" more-itertools==10.1.0; python_version >= "3.8" multidict==6.0.4; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" -numpy==1.25.2; python_version < "3.11" and python_version >= "3.9" +numpy==1.26.0; python_version >= "3.9" and python_version < "3.11" oauth2client==4.1.3 oauthlib==3.2.2; python_version >= "3.6" packaging==23.1; python_version >= "3.9" and python_version < "3.10" @@ -77,11 +77,11 @@ referencing==0.30.2; python_version >= "3.8" requests-oauthlib==1.3.1; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6" requests==2.31.0; python_version >= "3.9" and python_version < "3.10" and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0") and (python_version >= "3.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.7") rfc3339-validator==0.1.4; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.5.0") -rpds-py==0.10.2; python_version >= "3.8" +rpds-py==0.10.3; python_version >= "3.8" rsa==4.9; python_version >= "3.6" and python_version < "4" and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10") rx==1.6.3; python_version >= "3.9" and python_version < "3.10" six==1.16.0; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" -slack-sdk==3.21.3; python_full_version >= "3.6.0" +slack-sdk==3.22.0; python_full_version >= "3.6.0" slackclient==2.9.4; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" soupsieve==2.5; python_full_version >= "3.6.0" and python_version >= "3.8" sqlalchemy==1.4.49; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" @@ -93,7 +93,7 @@ typing-compat==0.1.0; python_version >= "3.9" and python_full_version < "3.0.0" typing-extensions==3.10.0.2 tzdata==2023.3; python_version >= "3.9" uritemplate==3.0.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" -urllib3==2.0.4; python_version >= "3.9" and python_version < "3.10" +urllib3==1.26.16; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0" watchdog==3.0.0; python_version >= "3.9" and python_version < "3.10" -websocket-client==1.6.2; python_version >= "3.8" +websocket-client==1.6.3; python_version >= "3.8" yarl==1.9.2; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0"