
Commit

FE-350 updated manifest.py to accept a csv with a new value for public (No/Yes), so that we filter out Managed Access (Yes) data from the final partition set, which will make snapshots public.
bahill committed Nov 20, 2024
1 parent 641cba8 commit 8ce0a31
Showing 1 changed file with 21 additions and 10 deletions.
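For orientation before the diff: a minimal sketch, using an invented file name and made-up institutions and project IDs, of the three-column CSV (institution, project_id, Yes/No) this change accepts and of the two partition lists _parse_csv now returns. Only the column layout, the lower-cased "no" check, and the tuple return are taken from the diff below; everything else is illustrative.

import csv

# Illustrative rows only -- the institutions and project IDs are made up.
rows = [
    ["EMBL-EBI", "11111111-2222-3333-4444-555555555555", "No"],
    ["BROAD", "66666666-7777-8888-9999-000000000000", "Yes"],
]
with open("example_manifest.csv", "w", newline="") as f:
    csv.writer(f).writerows(rows)

# _parse_csv now returns a tuple of two chunked lists instead of one:
#   index 0 -- chunks built from every key in the CSV
#   index 1 -- chunks built only from keys whose third column is "no" (case-insensitive)
# Hypothetical call; the env value is illustrative:
# all_chunks, no_ma_chunks = _parse_csv("example_manifest.csv", env="dev")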
31 changes: 21 additions & 10 deletions orchestration/hca_manage/manifest.py
@@ -92,21 +92,22 @@ def _sanitize_gs_path(path: str) -> str:
 
 
 def _parse_csv(csv_path: str, env: str, project_id_only: bool = False,
-               include_release_tag: bool = False, release_tag: str = "") -> list[list[str]]:
+               include_release_tag: bool = False, release_tag: str = "") -> tuple[list[list[str]], list[list[str]]]:
     keys = set()
+    public_projects = set()
     with open(csv_path, "r") as f:
         reader = csv.reader(f)
         for row in reader:
             if not row:
                 logging.debug("Empty path detected, skipping")
                 continue
 
-            assert len(row) == 2
+            assert len(row) == 3, "CSV must have 3 columns: institution, project_id, and Yes or No for Public"
             row = [x.strip() for x in row]
             institution = row[0].upper()
             project_id = find_project_id_in_str(row[1])
+            public = row[2].lower()
 
             key = None
             if project_id_only:
                 project_id = row[1]
                 key = project_id
@@ -126,18 +127,25 @@ def _parse_csv(csv_path: str, env: str, project_id_only: bool = False,
                 key = key + f",{release_tag}"
             keys.add(key)
 
+            # make a separate set of public projects
+            if public == "no":
+                public_projects.add(key)
+
     chunked_paths = chunked(keys, MAX_STAGING_AREAS_PER_PARTITION_SET)
-    return [chunk for chunk in chunked_paths]
+    chunked_paths_no_ma = chunked(public_projects, MAX_STAGING_AREAS_PER_PARTITION_SET)
+    return [chunk for chunk in chunked_paths], [chunk for chunk in chunked_paths_no_ma]
 
 
 def parse_and_load_manifest(env: str, csv_path: str, release_tag: str,
                             pipeline_name: str, project_id_only: bool = False,
-                            include_release_tag: bool = False) -> None:
+                            include_release_tag: bool = False, no_ma: bool = False) -> None:
     chunked_paths = _parse_csv(csv_path, env, project_id_only, include_release_tag, release_tag)
+    paths_to_use = chunked_paths[1] if no_ma else chunked_paths[0]
+    print(f"paths_to_use: {paths_to_use}")
     storage_client = Client()
     bucket: Bucket = storage_client.bucket(bucket_name=ETL_PARTITION_BUCKETS[env])
 
-    for pos, chunk in enumerate(chunked_paths):
+    for pos, chunk in enumerate(paths_to_use):
         assert len(chunk), "At least one import path is required"
         qualifier = chr(pos + 97)  # dcp11_a, dcp11_b, etc.
         blob_name = f"{pipeline_name}/{release_tag}_{qualifier}_manifest.csv"
@@ -146,8 +154,9 @@ def parse_and_load_manifest(env: str, csv_path: str, release_tag: str,
             if not query_yes_no(f"Manifest {blob.name} already exists for pipeline {pipeline_name}, overwrite?"):
                 return
 
-        logging.info(f"Uploading manifest [bucket={bucket.name}, name={blob_name}]")
-        blob.upload_from_string(data="\n".join(chunk))
+        # TODO turn back on
+        # logging.info(f"Uploading manifest [bucket={bucket.name}, name={blob_name}]")
+        # blob.upload_from_string(data="\n".join(chunk))
 
 
 def _get_dagster_client() -> DagsterGraphQLClient:
@@ -196,9 +205,11 @@ def load(args: argparse.Namespace) -> None:
         args.release_tag,
         f"make_snapshot_public_job_{ENV_PIPELINE_ENDINGS[args.env]}",
         project_id_only=True,
-        include_release_tag=True
+        include_release_tag=True,
+        no_ma=True
     )
-    _reload_repository(_get_dagster_client())
+    # TODO turn back on
+    # _reload_repository(_get_dagster_client())
 
 
 def enumerate_manifests(args: argparse.Namespace) -> None:
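To make the new control flow concrete, here is a minimal sketch, not the repository's code, of how parse_and_load_manifest now picks between the two partition sets returned by _parse_csv and how each chunk maps to a manifest blob name. The chunk size, release tag, pipeline name, and project IDs are invented for illustration, and chunk() stands in for the chunked() helper whose import is not shown in the diff.

MAX_STAGING_AREAS_PER_PARTITION_SET = 2  # illustrative value, not the repo's constant


def chunk(seq, size):
    """Stand-in for the chunked() helper used by manifest.py (assumed behavior)."""
    seq = list(seq)
    return [seq[i:i + size] for i in range(0, len(seq), size)]


def pick_paths(all_keys, no_ma_keys, no_ma):
    """Mirrors the selection added in parse_and_load_manifest:
    paths_to_use = chunked_paths[1] if no_ma else chunked_paths[0]."""
    chunked_paths = (
        chunk(all_keys, MAX_STAGING_AREAS_PER_PARTITION_SET),
        chunk(no_ma_keys, MAX_STAGING_AREAS_PER_PARTITION_SET),
    )
    return chunked_paths[1] if no_ma else chunked_paths[0]


# Each chunk becomes one manifest, suffixed a, b, c, ... via chr(pos + 97),
# i.e. <pipeline_name>/<release_tag>_a_manifest.csv. The values below are made up.
for pos, chunk_keys in enumerate(pick_paths(["p1", "p2", "p3"], ["p1"], no_ma=True)):
    qualifier = chr(pos + 97)
    print(f"make_snapshot_public_job/dcp99_{qualifier}_manifest.csv", chunk_keys)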
