Backend: Adds Phrma cancer screening pipeline (#3567)
# Description and Motivation

- part of #3547 
- more shared phrma utils (`BREAKDOWN_TO_STANDARD_BY_COL` and `load_phrma_df_from_data_dir()` now live in `ingestion/phrma_utils.py`; see the loader sketch after this list)
- generates all breakdowns without crashing: age / race / education / income / insurance status, each at the national and state levels
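
For orientation only, a minimal sketch of the shared loader after this refactor: `load_phrma_df_from_data_dir()` and `TMP_ALL` now come from `ingestion.phrma_utils`, and the loader takes a third dataset-type argument; `'standard'` is what the existing medication pipeline passes (see the phrma.py diff below). The `'state'` / `'age'` arguments are just example values, and the type string the new cancer screening pipeline passes is not shown in this file.

```python
# Orientation sketch only (not code from this PR): calling the shared
# loader that now lives in ingestion/phrma_utils.py. The third argument
# is the dataset-type string; 'standard' is what the existing Phrma
# medication pipeline passes in the phrma.py diff below.
from ingestion.phrma_utils import TMP_ALL, load_phrma_df_from_data_dir

# "All" rows plus one demographic breakdown, both at the state level
alls_df = load_phrma_df_from_data_dir('state', TMP_ALL, 'standard')
age_df = load_phrma_df_from_data_dir('state', 'age', 'standard')
```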

## Has this been tested? How?

- initial tests scaffolded out; each breakdown runs and properly creates .csv files for checks and for later conversion into golden_data (a rough sketch of this test pattern is included below)
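
A minimal pytest sketch of the per-breakdown smoke check described above, written against the refactored `PhrmaData` shown in this PR's phrma.py diff (the new cancer screening class isn't visible in this file, so its demographics aren't used here). It assumes the Phrma source .csvs are available in the local data dir; the CSV written at the end is only for manual checks and later conversion into golden_data.

```python
# Sketch, not the PR's actual test code: smoke-test each breakdown x geo
# combo and dump a CSV of the resulting df for inspection / golden_data.
import pytest
from datasources.phrma import PhrmaData
from ingestion.phrma_utils import TMP_ALL, load_phrma_df_from_data_dir

DEMOGRAPHICS = ['age', 'race_and_ethnicity', 'sex', 'lis', 'eligibility']
GEO_LEVELS = ['national', 'state', 'county']


@pytest.mark.parametrize('geo_level', GEO_LEVELS)
@pytest.mark.parametrize('demo_breakdown', DEMOGRAPHICS)
def test_each_breakdown_builds_a_df(demo_breakdown, geo_level, tmp_path):
    # build the breakdown df the same way PhrmaData.write_to_bq() does
    alls_df = load_phrma_df_from_data_dir(geo_level, TMP_ALL, 'standard')
    df = PhrmaData().generate_breakdown_df(demo_breakdown, geo_level, alls_df)
    assert not df.empty

    # one CSV per combo, for eyeballing and for seeding golden_data later
    df.to_csv(tmp_path / f'{demo_breakdown}_{geo_level}.csv', index=False)
```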

## Screenshots

Pandas printout from the tests running, showing the df for each breakdown:
<img width="1454" alt="Screenshot 2024-08-20 at 5 19 53 AM"
src="https://github.com/user-attachments/assets/8e15177e-ad2e-4460-abb9-c05d742bab52">


## Types of changes

(leave all that apply)

- New content or feature

## New frontend preview link is below in the Netlify comment 😎
benhammondmusic authored Aug 20, 2024
1 parent 23dcd12 commit 1ee298c
Showing 6 changed files with 458 additions and 313 deletions.
151 changes: 7 additions & 144 deletions python/datasources/phrma.py
@@ -3,46 +3,35 @@
from datasources.data_source import DataSource
from ingestion.constants import (
    COUNTY_LEVEL,
    STATE_LEVEL,
    NATIONAL_LEVEL,
    ALL_VALUE,
    US_FIPS,
    US_NAME,
    UNKNOWN,
)
from ingestion.dataset_utils import (
    ensure_leading_zeros,
    generate_pct_share_col_with_unknowns,
    generate_pct_share_col_without_unknowns,
)
from ingestion import gcs_to_bq_util, standardized_columns as std_col
from ingestion.merge_utils import merge_county_names, merge_state_ids, merge_dfs_list
from ingestion.merge_utils import merge_county_names, merge_state_ids
from ingestion.het_types import (
    GEO_TYPE,
    SEX_RACE_ETH_AGE_TYPE,
    PHRMA_BREAKDOWN_TYPE_OR_ALL,
    PHRMA_BREAKDOWN_TYPE,
)
from ingestion.phrma_utils import (
    TMP_ALL,
    PHRMA_DIR,
    get_sheet_name,
    ADHERENCE_RATE,
    PER_100K,
    MEDICARE_DISEASE_COUNT,
    COUNT_TOTAL,
    COUNT_YES,
    RACE_NAME,
    AGE_GROUP,
    MEDICARE_POP_COUNT,
    SEX_NAME,
    LIS,
    ENTLMT_RSN_CURR,
    STATE_FIPS,
    COUNTY_FIPS,
    PHRMA_PCT_CONDITIONS,
    PHRMA_100K_CONDITIONS,
    rename_cols,
    ADHERENCE,
    BENEFICIARIES,
    BREAKDOWN_TO_STANDARD_BY_COL,
    load_phrma_df_from_data_dir,
)


@@ -56,43 +45,6 @@

# constants
ELIGIBILITY = "eligibility"
ADHERENCE = 'adherence'
BENEFICIARIES = 'beneficiaries'
DTYPE = {'COUNTY_FIPS': str, 'STATE_FIPS': str}


# a nested dictionary that contains values swaps per column name
BREAKDOWN_TO_STANDARD_BY_COL = {
    std_col.LIS_COL: {
        "Yes": "Receiving low income subsidy (LIS)",
        "No": "Not receiving low income subsidy (LIS)",
    },
    std_col.ELIGIBILITY_COL: {
        "Aged": "Eligible due to age",
        "Disabled": "Eligible due to disability",
        "ESRD": "Eligible due to end-stage renal disease (ESRD)",
        "Disabled and ESRD": "Eligible due to disability and end-stage renal disease (ESRD)",
    },
    std_col.AGE_COL: {
        "_18-39": "18-39",
        "_40-64": "40-64",
        "_65-69": "65-69",
        "_70-74": "70-74",
        "_75-79": "75-79",
        "_80-84": "80-84",
        "_85+": "85+",
    },
    std_col.RACE_CATEGORY_ID_COL: {
        'Unknown': std_col.Race.UNKNOWN.value,
        'American Indian / Alaska Native': std_col.Race.AIAN_NH.value,
        'Asian/Pacific Islander': std_col.Race.API_NH.value,
        'Black or African-American': std_col.Race.BLACK_NH.value,
        'Hispanic': std_col.Race.HISP.value,
        'Other': std_col.Race.OTHER_NONSTANDARD_NH.value,
        'Non-Hispanic White': std_col.Race.WHITE_NH.value,
    },
    # SEX source groups already match needed HET groups
}


class PhrmaData(DataSource):
@@ -111,7 +63,7 @@ def write_to_bq(self, dataset, gcs_bucket, **attrs):
        demo_type = self.get_attr(attrs, 'demographic')
        geo_level = self.get_attr(attrs, 'geographic')

        alls_df = load_phrma_df_from_data_dir(geo_level, TMP_ALL)
        alls_df = load_phrma_df_from_data_dir(geo_level, TMP_ALL, 'standard')

        table_name = f'{demo_type}_{geo_level}'
        df = self.generate_breakdown_df(demo_type, geo_level, alls_df)
@@ -169,7 +121,7 @@ def generate_breakdown_df(

        fips_to_use = std_col.COUNTY_FIPS_COL if geo_level == COUNTY_LEVEL else std_col.STATE_FIPS_COL

        breakdown_group_df = load_phrma_df_from_data_dir(geo_level, demo_breakdown)
        breakdown_group_df = load_phrma_df_from_data_dir(geo_level, demo_breakdown, 'standard')

        df = pd.concat([breakdown_group_df, alls_df], axis=0)
        df = df.replace(to_replace=BREAKDOWN_TO_STANDARD_BY_COL)
@@ -243,92 +195,3 @@ def generate_breakdown_df(
        df = df.sort_values(by=[fips_to_use, demo_col]).reset_index(drop=True)

        return df


def load_phrma_df_from_data_dir(geo_level: GEO_TYPE, breakdown: PHRMA_BREAKDOWN_TYPE_OR_ALL) -> pd.DataFrame:
    """Generates Phrma data by breakdown and geo_level
    geo_level: string equal to `county`, `national`, or `state`
    breakdown: string equal to `age`, `race_and_ethnicity`, `sex`, `lis`, `eligibility`, or `all`
    return: a single data frame of data by demographic breakdown and
    geo_level with data columns loaded from multiple Phrma source tables"""

    sheet_name = get_sheet_name(geo_level, breakdown)
    merge_cols = []

    if geo_level == COUNTY_LEVEL:
        merge_cols.append(std_col.COUNTY_FIPS_COL)
    else:
        merge_cols.append(std_col.STATE_FIPS_COL)

    if breakdown != TMP_ALL:
        breakdown_col = std_col.RACE_CATEGORY_ID_COL if breakdown == std_col.RACE_OR_HISPANIC_COL else breakdown
        merge_cols.append(breakdown_col)
    fips_col = std_col.COUNTY_FIPS_COL if geo_level == COUNTY_LEVEL else std_col.STATE_FIPS_COL

    breakdown_het_to_source_type = {
        "age": AGE_GROUP,
        "race_and_ethnicity": RACE_NAME,
        "sex": SEX_NAME,
        "lis": LIS,
        "eligibility": ENTLMT_RSN_CURR,
    }

    # only read certain columns from source data
    keep_cols = []
    fips_length = 0

    if breakdown != TMP_ALL:
        keep_cols.append(breakdown_het_to_source_type[breakdown])

    if geo_level == COUNTY_LEVEL:
        fips_length = 5
        keep_cols.append(COUNTY_FIPS)
    if geo_level == STATE_LEVEL:
        fips_length = 2
        keep_cols.append(STATE_FIPS)
    if geo_level == NATIONAL_LEVEL:
        fips_length = 2

    topic_dfs = []
    condition_keep_cols = []

    for condition in [*PHRMA_PCT_CONDITIONS, *PHRMA_100K_CONDITIONS]:
        if condition in PHRMA_PCT_CONDITIONS:
            condition_keep_cols = [*keep_cols, COUNT_YES, COUNT_TOTAL, ADHERENCE_RATE]

        if condition in PHRMA_100K_CONDITIONS:
            condition_keep_cols = [
                *keep_cols,
                MEDICARE_DISEASE_COUNT,
                MEDICARE_POP_COUNT,
                PER_100K,
            ]

        topic_df = gcs_to_bq_util.load_csv_as_df_from_data_dir(
            PHRMA_DIR,
            f'{condition}-{sheet_name}.csv',
            subdirectory=condition,
            dtype=DTYPE,
            na_values=["."],
            usecols=condition_keep_cols,
        )

        if geo_level == NATIONAL_LEVEL:
            topic_df[STATE_FIPS] = US_FIPS

        topic_df = rename_cols(
            topic_df,
            cast(GEO_TYPE, geo_level),
            cast(SEX_RACE_ETH_AGE_TYPE, breakdown),
            condition,
        )

        topic_dfs.append(topic_df)

    df_merged = merge_dfs_list(topic_dfs, merge_cols)

    # drop rows that dont include FIPS and DEMO values
    df_merged = df_merged[df_merged[fips_col].notna()]
    df_merged = ensure_leading_zeros(df_merged, fips_col, fips_length)

    return df_merged