Commit 28b7fee

Update deduplicate_staging_areas to check that a specific file exists, take in extra args, and update requirements.txt (#314)

* Update deduplicate_staging_areas to check that a specific file exists, take in extra args, and update requirements.txt

---------

Co-authored-by: dsp-fieldeng-bot <[email protected]>
snovod and dsp-fieldeng-bot authored Oct 2, 2023
1 parent 4dd3ede commit 28b7fee
Showing 3 changed files with 131 additions and 29 deletions.
113 changes: 97 additions & 16 deletions orchestration/hca_manage/deduplicate_staging_areas.py
@@ -1,17 +1,35 @@
"""Outputs contents of up to two GCS paths, comparing if multiple paths are specified.
"""Outputs contents of up to two GCS paths, comparing if multiple paths are specified.
Usage:
> python3 deduplicate_staging_areas.py -s STAGING_AREA_PATH [--print_files] [--skip_deletion]"""

# Imports
import argparse
import os
import re
import argparse
from google.cloud import storage
import pandas as pd
import sys
from typing import Optional

# Library stubs not installed so ignoring linting failures
import pandas as pd # type: ignore[import]
from google.cloud import storage # type: ignore[import]

STAGING_AREA_BUCKETS = {
    "prod": {
        "EBI": "gs://broad-dsp-monster-hca-prod-ebi-storage/prod",
        "UCSC": "gs://broad-dsp-monster-hca-prod-ebi-storage/prod",
        "LANTERN": "gs://broad-dsp-monster-hca-prod-lantern",
        "LATTICE": "gs://broad-dsp-monster-hca-prod-lattice/staging"
    },
    "dev": {
        "EBI": "gs://broad-dsp-monster-hca-dev-ebi-staging/dev",
        "UCSC": "gs://broad-dsp-monster-hca-dev-ebi-staging/dev",
    }
}


# Function to return the objects in a staging area bucket
-def get_staging_area_objects(bucket_name, prefix, delimiter=None):
+def get_staging_area_objects(bucket_name: str, prefix: str, delimiter: Optional[str] = None) -> list[list[str]]:
    record_list = []
    try:
        # List blobs in specified bucket/prefix
@@ -32,12 +50,13 @@ def get_staging_area_objects(bucket_name, prefix, delimiter=None):
print(f"Error retrieving objects from staging area: {str(e)}")
return record_list


# Function to identify outdated entity files
-def identify_outdated_files(record_list):
+def identify_outdated_files(record_list: Optional[list[list[str]]]) -> list[storage.Blob]:
    delete_list = []
    if record_list:
        # Load records into dataframe, group by path and entity, and order by version descending
-        df = pd.DataFrame(record_list, columns = ["blob", "path", "entity", "version"])
+        df = pd.DataFrame(record_list, columns=["blob", "path", "entity", "version"])
        df["rn"] = df.groupby(["path", "entity"])["version"].rank(method="first", ascending=False)

        # Identify outdated records and return as a list
@@ -46,8 +65,14 @@ def identify_outdated_files(record_list):
            delete_list.append(row["blob"])
    return delete_list
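
To make the ranking rule concrete, here is a toy illustration (data invented for the example) of how the newest version of each path/entity pair gets rank 1 and older copies are flagged:

import pandas as pd

records = [
    ["blob_old", "proj/metadata", "cell_suspension_0.json", "2023-01-01T00:00:00"],
    ["blob_new", "proj/metadata", "cell_suspension_0.json", "2023-06-01T00:00:00"],
]
df = pd.DataFrame(records, columns=["blob", "path", "entity", "version"])
# Newest version per (path, entity) ranks 1; anything ranked below is outdated
df["rn"] = df.groupby(["path", "entity"])["version"].rank(method="first", ascending=False)
print(df[df["rn"] > 1]["blob"].tolist())  # ['blob_old']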


# Function to batch delete files
-def batch_delete_files(delete_list, bucket_name, prefix, delimiter=None):
+def batch_delete_files(
+    delete_list: list[storage.Blob],
+    bucket_name: str,
+    prefix: str,
+    delimiter: Optional[str] = None
+) -> None:
    if delete_list:
        try:
            # Loop through and submit batch deletion requests (max 1000)
@@ -74,24 +99,80 @@ def batch_delete_files(delete_list, bucket_name, prefix, delimiter=None):
        except Exception as e:
            print(f"Error deleting objects: {str(e)}")


# Creates the staging area json
def create_staging_area_json(bucket_name: str, staging_area_json_prefix: str) -> None:
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(staging_area_json_prefix)
    print(f"Creating {os.path.join('gs://', bucket_name, staging_area_json_prefix)}")
    with blob.open("w") as f:
        f.write('{"is_delta": false}')
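
# Note: blob.open("w") uploads the content and finalizes the GCS object when the
# with-block closes, so staging_area.json exists as soon as this function returns.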


# Get staging area
def get_staging_area(
    staging_area: Optional[str],
    institution: Optional[str],
    environment: str,
    uuid: Optional[str]
) -> str:
    # Staging area and institution are mutually exclusive so the return will never be optional
    # If institution is provided then infer staging area from that and uuid
    if institution:
        # Confirm uuid is passed in if --institution is used
        if not uuid:
            print("Must provide --uuid if using --institution")
            sys.exit(1)
        return os.path.join(STAGING_AREA_BUCKETS[environment][institution], uuid)
    else:
        return staging_area  # type: ignore[return-value]
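
A quick illustration of the two resolution modes (the path and uuid below are invented for the example):

# Explicit path wins when --staging_area is passed
explicit = get_staging_area("gs://example-bucket/staging/area-1", None, "prod", None)

# With --institution, the path is built from the bucket map plus the uuid
inferred = get_staging_area(None, "LATTICE", "prod", "123e4567-e89b-12d3-a456-426614174000")
# -> "gs://broad-dsp-monster-hca-prod-lattice/staging/123e4567-e89b-12d3-a456-426614174000"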


def check_staging_area_json_exists(bucket: str, prefix: str) -> bool:
    storage_client = storage.Client()
    gcs_bucket = storage_client.bucket(bucket)
    # get_blob() returns None when the object is absent
    if gcs_bucket.get_blob(prefix):
        return True
    return False


# Main function
if __name__ == "__main__":

    # Set up argument parser
-    parser = argparse.ArgumentParser(description="Remove outdated entity files from HCA staging area.")
-    parser.add_argument("-s", "--staging_area", required=True, type=str, help="Full GCS path to the staging area of interest.")
-    parser.add_argument("-p", "--print_files", required=False, action="store_true", help="Add argument to print files to be removed.", default=False)
-    parser.add_argument("-n", "--skip_deletion", required=False, action="store_true", help="Add argument to skip file deltion.", default=False)
+    parser = argparse.ArgumentParser(
+        description="Remove outdated entity files from HCA staging area and check if staging_area.json exists.")
+    path_parser = parser.add_mutually_exclusive_group(required=True)
+    path_parser.add_argument("-s", "--staging_area", type=str, help="Full GCS path to the staging area of interest.")
+    path_parser.add_argument("-i", "--institution", choices=['EBI', 'LATTICE', 'UCSC'], type=str,
+                             help="Must provide uuid if using institution")
+    parser.add_argument('-e', '--env', choices=['prod', 'dev'], default='prod')
+    parser.add_argument("-u", "--uuid", help="Only used if using --institution instead of --staging_area")
+    parser.add_argument("-p", "--print_files", required=False, action="store_true",
+                        help="Add argument to print files to be removed.", default=False)
+    parser.add_argument("-n", "--skip_deletion", required=False, action="store_true",
+                        help="Add argument to skip file deletion.", default=False)
    args = parser.parse_args()

    staging_area = get_staging_area(args.staging_area, args.institution, args.env, args.uuid)

    # Initialize variables
-    bucket_name = re.match("gs:\/\/([a-z0-9\-_]+)\/", args.staging_area).group(1)
-    prefix = re.match("gs:\/\/[a-z0-9\-_]+\/([A-Za-z0-9\-_\/\.]+)", args.staging_area).group(1)
+    bucket_name = re.match(r"gs:\/\/([a-z0-9\-_]+)\/", staging_area).group(1)  # type: ignore[union-attr]
+    prefix = re.match(r"gs:\/\/[a-z0-9\-_]+\/([A-Za-z0-9\-_\/\.]+)", staging_area).group(1)  # type: ignore[union-attr]
    # staging_area is never None here, but get_staging_area's return typing can't prove it
    if prefix[-1] != "/":
        prefix += "/"
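    # Worked example (path invented): staging_area = "gs://my-bucket/areas/abcd"
    # gives bucket_name = "my-bucket" and prefix = "areas/abcd", normalized to "areas/abcd/"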

    # Check if staging_area.json exists
    staging_area_prefix = os.path.join(prefix, 'staging_area.json')
    if not check_staging_area_json_exists(bucket_name, staging_area_prefix):
        print(f"staging_area.json does not exist in {staging_area}")
        # Upload if it does not exist
        create_staging_area_json(bucket_name, staging_area_prefix)

    # Call functions to identify and remove outdated entity files
-    print(f"Evaluating outdated files in staging area: {args.staging_area}")
+    print(f"Evaluating outdated files in staging area: {staging_area}")
    objects_list = get_staging_area_objects(bucket_name, prefix)
    print(f"\t- Total objects found: {len(objects_list)}")
    delete_list = identify_outdated_files(objects_list)
41 changes: 31 additions & 10 deletions orchestration/hca_manage/pull_dcp_snapshots.py
@@ -5,14 +5,16 @@

# Imports
import argparse
-import data_repo_client
-import pandas as pd
-import google.auth
-import google.auth.transport.requests
-import requests

+import data_repo_client  # type: ignore[import]
+import google.auth  # type: ignore[import]
+import google.auth.transport.requests  # type: ignore[import]
+import pandas as pd  # type: ignore[import]
+import requests  # type: ignore[import]


# Function to return the snapshots for a given DCP release
-def get_snapshots(release):
+def get_snapshots(release: str) -> pd.DataFrame:
    try:
        # Establish TDR API client
        creds, project = google.auth.default()
@@ -32,20 +34,39 @@ def get_snapshots(release):
        for snapshot_entry in snapshots_list.items:
            public_flag = "N"
            public_response = requests.get(
-                url=f"https://sam.dsde-prod.broadinstitute.org/api/resources/v2/datasnapshot/{snapshot_entry.id}/policies/reader/public",
+                url="https://sam.dsde-prod.broadinstitute.org/api/resources/v2/datasnapshot/" +
+                    f"{snapshot_entry.id}/policies/reader/public",
                headers={"Authorization": f"Bearer {creds.token}"},
            )
            if public_response.text == "true":
                public_flag = "Y"
-            record = [snapshot_entry.id, snapshot_entry.name, snapshot_entry.data_project, snapshot_entry.created_date[0:10], snapshot_entry.created_date, public_flag]
+            record = [
+                snapshot_entry.id,
+                snapshot_entry.name,
+                snapshot_entry.data_project,
+                snapshot_entry.created_date[0:10],
+                snapshot_entry.created_date,
+                public_flag
+            ]
            records_list.append(record)
-        df = pd.DataFrame(records_list, columns =["TDR Snapshot ID", "TDR Snapshot Name", "TDR Snapshot Google Project", "Created Date", "Created Datetime", "Public Flag"])
+        df = pd.DataFrame(
+            records_list,
+            columns=[
+                "TDR Snapshot ID",
+                "TDR Snapshot Name",
+                "TDR Snapshot Google Project",
+                "Created Date",
+                "Created Datetime",
+                "Public Flag"
+            ]
+        )
        df_sorted = df.sort_values(by=["TDR Snapshot Name"], ignore_index=True)
    except Exception as e:
        print(f"Error retrieving snapshots: {str(e)}")
        df_sorted = pd.DataFrame()
    return df_sorted
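
As a usage sketch (the release label below is invented for the example), the returned frame can be filtered down to just the public snapshots:

df = get_snapshots("2023-09-27")  # hypothetical release label
public_only = df[df["Public Flag"] == "Y"]
print(public_only[["TDR Snapshot Name", "Created Date"]].to_string(index=False))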


# Main function
if __name__ == "__main__":

@@ -59,4 +80,4 @@ def get_snapshots(release):
    df = get_snapshots(args.release)
    file_name = f"dcp_snapshot_list_{args.release}.tsv"
    df.to_csv(file_name, sep="\t")
    print(f"Results outputted to {file_name}")
6 changes: 3 additions & 3 deletions orchestration/requirements.txt
@@ -9,7 +9,7 @@ broad-dagster-utils==0.6.7; python_version >= "3.9" and python_version < "3.10"
cached-property==1.5.2
cachetools==5.3.1; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10"
certifi==2023.7.22; python_version >= "3.9" and python_version < "3.10"
-charset-normalizer==3.2.0; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.7.0"
+charset-normalizer==3.3.0; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.7.0"
click==7.1.2; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0"
colorama==0.4.6; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" and platform_system == "Windows" or python_version >= "3.9" and python_version < "3.10" and platform_system == "Windows" and python_full_version >= "3.7.0"
coloredlogs==14.0; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0"
@@ -20,7 +20,7 @@ dagster-pandas==0.12.14
dagster-postgres==0.12.14
dagster-slack==0.12.14
dagster==0.12.14
-data-repo-client==1.527.0
+data-repo-client==1.528.0
docstring-parser==0.15; python_version >= "3.9" and python_version < "3.10"
frozenlist==1.4.0; python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.6.0"
google-api-core==2.12.0; python_version >= "3.9" and python_version < "3.10" and (python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_full_version >= "3.6.0" and python_version >= "3.9" and python_version < "3.10") and (python_version >= "3.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.7")
@@ -55,7 +55,7 @@ multidict==6.0.4; python_version >= "3.9" and python_version < "3.10"
numpy==1.26.0; python_version >= "3.9" and python_version < "3.11"
oauth2client==4.1.3
oauthlib==3.2.2; python_version >= "3.6"
-packaging==23.1; python_version >= "3.9" and python_version < "3.10"
+packaging==23.2; python_version >= "3.9" and python_version < "3.10"
pandas==2.1.1; python_version >= "3.9"
pendulum==2.1.2; python_version >= "3.9" and python_full_version < "3.0.0" and python_version < "3.10" or python_version >= "3.9" and python_version < "3.10" and python_full_version >= "3.5.0"
promise==2.3
