Added flat file handling (#142)
* Added flat file handling

* Some coverage updates

* more coverage, PR feedback
dogversioning authored Dec 5, 2024
1 parent dc96834 commit 512074e
Showing 21 changed files with 899 additions and 252 deletions.
4 changes: 2 additions & 2 deletions scripts/credential_management.py
@@ -26,10 +26,10 @@ def _put_s3_data(name: str, bucket_name: str, client, data: dict, path: str = "a
    client.upload_fileobj(Bucket=bucket_name, Key=f"{path}/{name}", Fileobj=b_data)


-def create_auth(client, user: str, auth: str, site: str) -> str:
+def create_auth(user: str, auth: str, site: str) -> str:
    """Adds a new entry to the auth dict used to issue pre-signed URLs"""
    site_id = _basic_auth_str(user, auth).split(" ")[1]
-    return f'"{site_id}"": {{"user": {user}, "site":{site}}}'
+    return f'"{site_id}": {{"user": "{user}", "site":"{site}"}}'


def create_meta(client, bucket_name: str, site: str, folder: str) -> None:
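
As a side note (not part of the diff), the corrected return string now yields a fragment that parses as JSON once wrapped in braces; a minimal sketch, with _basic_auth_str stubbed out as an assumption standing in for the requests helper of the same name:

import base64
import json

def _basic_auth_str(user: str, auth: str) -> str:
    # Hypothetical stand-in for requests' basic-auth helper, for illustration only
    token = base64.b64encode(f"{user}:{auth}".encode()).decode()
    return f"Basic {token}"

def create_auth(user: str, auth: str, site: str) -> str:
    """Adds a new entry to the auth dict used to issue pre-signed URLs"""
    site_id = _basic_auth_str(user, auth).split(" ")[1]
    return f'"{site_id}": {{"user": "{user}", "site":"{site}"}}'

entry = create_auth("demo_user", "demo_pass", "demo_site")
print(json.loads("{" + entry + "}"))  # valid JSON; the old extra double quote broke parsing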
25 changes: 25 additions & 0 deletions src/shared/awswrangler_functions.py
@@ -1,6 +1,9 @@
"""functions specifically requiring AWSWranger, which requires a lambda layer"""

+import csv
+
import awswrangler
+import numpy

from .enums import BucketPath

@@ -35,3 +38,25 @@ def get_s3_study_meta_list(
        ),
        suffix=extension,
    )


+def generate_csv_from_parquet(
+    bucket_name: str, bucket_root: str, subbucket_path: str, to_path: str | None = None
+):
+    """Convenience function for generating csvs for dashboard upload
+    TODO: Remove on dashboard parquet/API support"""
+    if to_path is None:
+        to_path = f"s3://{bucket_name}/{bucket_root}/{subbucket_path}".replace(".parquet", ".csv")
+    last_valid_df = awswrangler.s3.read_parquet(
+        f"s3://{bucket_name}/{bucket_root}" f"/{subbucket_path}"
+    )
+    last_valid_df = last_valid_df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace(
+        '""', numpy.nan
+    )
+    awswrangler.s3.to_csv(
+        last_valid_df,
+        to_path,
+        index=False,
+        quoting=csv.QUOTE_MINIMAL,
+    )
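
For orientation (not part of the commit), a sketch of how the new helper would be called; the bucket and paths below are hypothetical placeholders, and the call requires awswrangler plus S3 access:

# Writes s3://my-agg-bucket/last_valid/study/site/data.csv next to the parquet source
generate_csv_from_parquet(
    bucket_name="my-agg-bucket",
    bucket_root="last_valid",
    subbucket_path="study/site/data.parquet",
)

# Or direct the CSV to an explicit destination
generate_csv_from_parquet(
    bucket_name="my-agg-bucket",
    bucket_root="last_valid",
    subbucket_path="study/site/data.parquet",
    to_path="s3://my-agg-bucket/csv_flat/study/site/data.csv",
)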
26 changes: 20 additions & 6 deletions src/shared/enums.py
@@ -11,7 +11,9 @@ class BucketPath(enum.Enum):
    ARCHIVE = "archive"
    CACHE = "cache"
    CSVAGGREGATE = "csv_aggregates"
+    CSVFLAT = "csv_flat"
    ERROR = "error"
+    FLAT = "flat"
    LAST_VALID = "last_valid"
    LATEST = "latest"
    META = "metadata"
@@ -33,9 +35,19 @@ class JsonFilename(enum.Enum):
    COLUMN_TYPES = "column_types"
    TRANSACTIONS = "transactions"
    DATA_PACKAGES = "data_packages"
+    FLAT_PACKAGES = "flat_packages"
    STUDY_PERIODS = "study_periods"


+class StudyPeriodMetadataKeys(enum.Enum):
+    """stores names of expected keys in the study period metadata dictionary"""
+
+    STUDY_PERIOD_FORMAT_VERSION = "study_period_format_version"
+    EARLIEST_DATE = "earliest_date"
+    LATEST_DATE = "latest_date"
+    LAST_DATA_UPDATE = "last_data_update"


class TransactionKeys(enum.Enum):
    """stores names of expected keys in the transaction dictionary"""

@@ -47,10 +59,12 @@ class TransactionKeys(enum.Enum):
    DELETED = "deleted"


-class StudyPeriodMetadataKeys(enum.Enum):
-    """stores names of expected keys in the study period metadata dictionary"""
+class UploadTypes(enum.Enum):
+    """stores names of different expected upload formats"""

-    STUDY_PERIOD_FORMAT_VERSION = "study_period_format_version"
-    EARLIEST_DATE = "earliest_date"
-    LATEST_DATE = "latest_date"
-    LAST_DATA_UPDATE = "last_data_update"
+    # archive is not expected to be uploaded, but is one of the generated file types
+    # in the library
+    ARCHIVE = "archive"
+    CUBE = "cube"
+    FLAT = "flat"
+    META = "meta"
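
Purely as an illustration (not from this commit), callers would typically branch on these members via .value; the helper below is hypothetical, assuming a relative import like the one used in awswrangler_functions.py:

from .enums import BucketPath, UploadTypes

def bucket_prefix_for(upload_type: str) -> str:
    # Hypothetical routing helper: map an upload type string to a bucket path prefix
    if upload_type == UploadTypes.FLAT.value:
        return BucketPath.FLAT.value
    return BucketPath.LATEST.value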
45 changes: 35 additions & 10 deletions src/shared/functions.py
@@ -140,7 +140,7 @@ def update_metadata(
        # Should only be hit if you add a new JSON dict and forget to add it
        # to this function
        case _:
-            raise OSError(f"{meta_type} does not have a handler for updates.")
+            raise ValueError(f"{meta_type} does not have a handler for updates.")
    data_version_metadata.update(extra_items)
    return metadata

@@ -158,7 +158,7 @@ def write_metadata(
    s3_client.put_object(
        Bucket=s3_bucket_name,
        Key=f"{enums.BucketPath.META.value}/{meta_type}.json",
-        Body=json.dumps(metadata, default=str),
+        Body=json.dumps(metadata, default=str, indent=2),
    )


@@ -182,6 +182,30 @@ def move_s3_file(s3_client, s3_bucket_name: str, old_key: str, new_key: str) ->
        raise S3UploadError


+def get_s3_keys(
+    s3_client,
+    s3_bucket_name: str,
+    prefix: str,
+    token: str | None = None,
+    max_keys: int | None = None,
+) -> list[str]:
+    """Gets the list of all keys in S3 starting with the prefix"""
+    if max_keys is None:
+        max_keys = 1000
+    if token:
+        res = s3_client.list_objects_v2(
+            Bucket=s3_bucket_name, Prefix=prefix, ContinuationToken=token, MaxKeys=max_keys
+        )
+    else:
+        res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=prefix, MaxKeys=max_keys)
+    if "Contents" not in res:
+        return []
+    contents = [record["Key"] for record in res["Contents"]]
+    if res["IsTruncated"]:
+        contents += get_s3_keys(s3_client, s3_bucket_name, prefix, res["NextContinuationToken"])
+    return contents


def get_s3_site_filename_suffix(s3_path: str):
    """Extracts site/filename data from s3 path"""
    # The expected s3 path for site data packages looks like:
@@ -209,14 +233,15 @@ def get_latest_data_package_version(bucket, prefix):
    prefix = prefix + "/"
    s3_res = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    highest_ver = None
-    for item in s3_res["Contents"]:
-        ver_str = item["Key"].replace(prefix, "").split("/")[0]
-        if ver_str.isdigit():
-            if highest_ver is None:
-                highest_ver = ver_str
-            else:
-                if int(highest_ver) < int(ver_str):
-                    highest_ver = ver_str
-    if highest_ver is None:
+    if "Contents" in s3_res:
+        for item in s3_res["Contents"]:
+            ver_str = item["Key"].replace(prefix, "").split("/")[1].split("__")[2]
+            if ver_str.isdigit():
+                if highest_ver is None:
+                    highest_ver = ver_str
+                else:
+                    if int(highest_ver) < int(ver_str):
+                        highest_ver = ver_str
+    if "Contents" not in s3_res or highest_ver is None:
        logging.error("No data package versions found for %s", prefix)
    return highest_ver
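
For orientation (not part of the commit), the new get_s3_keys recurses on S3 continuation tokens, so a single call returns every key under a prefix even past the 1,000-key page limit; a minimal sketch against a hypothetical bucket, assuming standard boto3:

import boto3

s3_client = boto3.client("s3")

# Hypothetical bucket and prefix; pagination is handled inside get_s3_keys
flat_keys = get_s3_keys(s3_client, "my-agg-bucket", "flat/")
for key in flat_keys:
    print(key)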
