fix qa output for PNP #417

Merged · 5 commits · Feb 5, 2025
10 changes: 7 additions & 3 deletions src/imputation/MoR.py
@@ -176,11 +176,15 @@ def filter_for_links(df: pd.DataFrame, is_current: bool) -> pd.DataFrame:
# Filter out imputation classes that are missing either "200" or "201"
nan_mask = df["imp_class"].str.contains("nan").apply(lambda x: not x)
# Select only clear, or equivalently, imp_marker R.
# Exclude PRN cells in the current period.
clear_mask = df["imp_marker"] == "R"
# Exclude instance 0
ins_mask = df["instance"] > 0
if is_current:
mask = (df["imp_marker"] == "R") & (df["selectiontype"] != "P") & nan_mask
# Exclude PRN cells in the current period.
prn_mask = df["selectiontype"] != "P"
mask = clear_mask & nan_mask & prn_mask & ins_mask
else:
mask = (df["imp_marker"] == "R") & nan_mask
mask = clear_mask & nan_mask & ins_mask

return df.loc[mask, :]
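
For illustration, here is a minimal sketch of how the new combined mask behaves, using a toy frame with made-up values (assumes only pandas):

```python
import pandas as pd

# Toy rows: clear census, clear PRN, instance 0, and a "nan" imputation class.
df = pd.DataFrame({
    "imp_class": ["100_A", "100_A", "100_A", "nan_A"],
    "imp_marker": ["R", "R", "R", "R"],
    "selectiontype": ["C", "P", "C", "C"],
    "instance": [1, 1, 0, 1],
})

nan_mask = df["imp_class"].str.contains("nan").apply(lambda x: not x)
clear_mask = df["imp_marker"] == "R"
ins_mask = df["instance"] > 0
prn_mask = df["selectiontype"] != "P"

# Current period: the PRN, instance-0 and "nan"-class rows are all dropped.
print(df.loc[clear_mask & nan_mask & prn_mask & ins_mask])
```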

91 changes: 63 additions & 28 deletions src/imputation/imputation_helpers.py
@@ -82,44 +82,76 @@ def create_notnull_mask(df: pd.DataFrame, col: str) -> pd.Series:
return df[col].str.len() > 0


def create_mask(df: pd.DataFrame, options: List) -> pd.Series:
"""Create a dataframe mask based on listed options - retrun Bool column.
def create_mask(df: pd.DataFrame, options: List[str]) -> pd.Series:
"""Create a dataframe mask based on listed options - return Bool column.

Options include:
- 'clear_status': rows with one of the clear statuses
- 'instance_zero': rows with instance = 0
- 'instance_nonzero': rows with instance != 0
- 'no_r_and_d' : rows where q604 = 'No'
- 'no_r_and_d': rows where q604 = 'No'
- 'postcode_only': rows in which there are no numeric values, only postcodes.
"""
clear_mask = df["status"].isin(["Clear", "Clear - overridden"])
instance_mask = df.instance == 0
no_r_and_d_mask = df["604"] == "No"
postcode_only_mask = df["211"].isnull() & ~df["601"].isnull()

# Set initial values for the mask series as a column in the dataframe
df = df.copy()
df["mask_col"] = False

if "clear_status" in options:
df["mask_col"] = df["mask_col"] & clear_mask
- 'excl_postcode_only': rows excluding those with only postcodes.
- 'exclude_nan_classes': rows excluding those with "nan" in the imp_class col.
- 'prn_only': PRN rows, i.e., rows with selectiontype = 'P'
- 'census_only': Census rows, i.e., rows with selectiontype = 'C'
- 'longform_only': Longform rows, i.e., rows with formtype = '0001'
- 'shortform_only': Shortform rows, i.e., rows with formtype = '0006'
- 'bad_status': rows with status 'Check needed' or 'Form sent out'
- 'mor_imputed': rows with an imp_marker of 'MoR' or 'CF'
- 'not_mor_imputed': rows without an imp_marker of 'MoR' or 'CF'

if "instance_zero" in options:
df["mask_col"] = df["mask_col"] & instance_mask

elif "instance_nonzero" in options:
df["mask_col"] = df["mask_col"] & ~instance_mask

if "no_r_and_d" in options:
df["mask_col"] = df["mask_col"] & no_r_and_d_mask
Args:
df (pd.DataFrame): The input dataframe.
options (List[str]): List of options to create the mask.

if "postcode_only" in options:
df["mask_col"] = df["mask_col"] & postcode_only_mask
Returns:
pd.Series: Boolean mask based on the options.
"""
df = df.copy() # Ensure the original DataFrame is not modified

# Define masks for each option
masks = {
"clear_status": df["status"].isin(["Clear", "Clear - overridden"]),
"instance_zero": df.instance == 0,
"instance_nonzero": df.instance > 0,
"no_r_and_d": df["604"] == "No",
"postcode_only": df["211"].isnull() & df["601"].notnull(),
"excl_postcode_only": ~(df["211"].isnull() & df["601"].notnull()),
"exclude_nan_classes": ~df["imp_class"].str.contains("nan", na=True),
"prn_only": df["selectiontype"] == "P",
"census_only": df["selectiontype"] == "C",
"longform_only": df["formtype"] == "0001",
"shortform_only": df["formtype"] == "0006",
"bad_status": df["status"].isin(["Check needed", "Form sent out"]),
"mor_imputed": df["imp_marker"].isin(["MoR", "CF"]),
"not_mor_imputed": ~df["imp_marker"].isin(["MoR", "CF"]),
}

# Initialize the mask to True
mask = pd.Series(True, index=df.index)

# Apply the masks based on the options
for option in options:
if option in masks:
mask &= masks[option]

return mask
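
A usage sketch for the rewritten helper (assuming `create_mask` is imported from this module). Note that the `masks` dict is built eagerly, so the frame must carry every referenced column even for options you don't request; all values below are illustrative:

```python
import pandas as pd

df = pd.DataFrame({
    "status": ["Clear", "Form sent out", "Clear - overridden"],
    "instance": [1, 1, 0],
    "imp_class": ["100_A", "100_A", "nan_A"],
    "selectiontype": ["C", "P", "C"],
    "formtype": ["0001", "0001", "0006"],
    "imp_marker": ["R", "TMI", "MoR"],
    "604": ["Yes", "No", "Yes"],
    "211": [100.0, None, None],
    "601": [None, "AB1 2CD", "EF3 4GH"],
})

mask = create_mask(df, ["clear_status", "instance_nonzero", "exclude_nan_classes"])
print(df.loc[mask])  # only the first row passes all three conditions
```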


def special_filter(df: pd.DataFrame, options: List[str]) -> pd.DataFrame:
"""Filter the dataframe based on a list of options commonly used in the pipeline.

if "excl_postcode_only" in options:
df["mask_col"] = df["mask_col"] & ~postcode_only_mask
Args:
df (pd.DataFrame): The input dataframe.
options (List[str]): List of options to filter the dataframe.

return df["mask_col"]
Returns:
pd.DataFrame: The filtered dataframe.
"""
mask = create_mask(df, options)
df = df.copy().loc[mask]
return df
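
The wrapper gives the same result in one step, returning the reduced frame rather than a mask (reusing the toy frame from the `create_mask` sketch above):

```python
opts = ["clear_status", "instance_nonzero", "exclude_nan_classes"]
clear_df = special_filter(df, opts)
assert clear_df.equals(df.loc[create_mask(df, opts)])
```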


def instance_fix(df: pd.DataFrame):
@@ -281,6 +313,9 @@ def create_r_and_d_instance(
# Ensure that in the case of longforms with "no R&D" we only have one row
df, mult_604_qa_df = fix_604_error(df)

# In the case where there is "no R&D", we create a copy of instance 0
# and update to instance = 1. In this way we create an "instance 1" which we can
# populate with zeros for imputation purposes (see docstring above).
no_rd_mask = (df.formtype == "0001") & (df["604"] == "No")
filtered_df = df.copy().loc[no_rd_mask]
filtered_df["instance"] = 1
17 changes: 8 additions & 9 deletions src/imputation/imputation_main.py
@@ -127,15 +127,14 @@ def run_imputation( # noqa: C901
links_filename = filename_amender("links_qa", config)
trimmed_counts_filename = filename_amender("tmi_trim_count_qa", config)

if config["survey"]["survey_type"] == "BERD":
# create trimming qa dataframe with required columns from schema
schema_path = config["schema_paths"]["manual_trimming_schema"]
schema_dict = load_schema(schema_path)
trimming_qa_output = create_output_df(qa_df, schema_dict)

write_csv(os.path.join(qa_path, trim_qa_filename), trimming_qa_output)
write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa)
write_csv(os.path.join(qa_path, wrong_604_filename), wrong_604_qa_df)
# create trimming qa dataframe with required columns from schema
schema_path = config["schema_paths"]["manual_trimming_schema"]
schema_dict = load_schema(schema_path)
trimming_qa_output = create_output_df(qa_df, schema_dict)

write_csv(os.path.join(qa_path, trim_qa_filename), trimming_qa_output)
write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa)
write_csv(os.path.join(qa_path, wrong_604_filename), wrong_604_qa_df)

write_csv(os.path.join(qa_path, full_imp_filename), imputed_df)
if backdata is not None:
104 changes: 54 additions & 50 deletions src/imputation/tmi_imputation.py
@@ -199,13 +199,13 @@ def create_mean_dict(
# Create an empty dict to store means
mean_dict = dict.fromkeys(target_variable_list)

# Filter for clear statuses
clear_statuses = ["Clear", "Clear - overridden"]

filtered_df = df.loc[df["status"].isin(clear_statuses)]

# Filter out imputation classes that are missing either "200" or "201"
filtered_df = filtered_df[~(filtered_df["imp_class"].str.contains("nan"))]
filter_conditions_list = [
"clear_status",
"instance_nonzero",
"exclude_nan_classes",
"excl_postcode_only",
]
filtered_df = hlp.special_filter(df, filter_conditions_list)

# Group by imp_class
grp = filtered_df.groupby("imp_class")
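
Note the helper call is not a pure refactor of the removed inline filters: with the extra options it also drops instance-0 and postcode-only rows, which the old code kept. A comparison sketch, reusing the toy frame from the `create_mask` example in imputation_helpers.py (with that module imported as `hlp`):

```python
# Old inline filtering, as removed above:
old = df.loc[df["status"].isin(["Clear", "Clear - overridden"])]
old = old[~old["imp_class"].str.contains("nan")]

# New helper-based filtering; two additional conditions are applied.
new = hlp.special_filter(
    df,
    ["clear_status", "instance_nonzero", "exclude_nan_classes", "excl_postcode_only"],
)
```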
@@ -264,14 +264,8 @@ def apply_tmi(
Returns:
pd.DataFrame: The passed dataframe with TMI imputation applied.
"""
df = df.copy()

filtered_df = df.loc[df["status"].isin(["Form sent out", "Check needed"])]

# Filter out any cases where 200 or 201 are missing from the imputation class
# This ensures that means are calculated using only valid imputation classes
# Since imp_class is string type, any entry containing "nan" is excluded.
filtered_df = filtered_df[~(filtered_df["imp_class"].str.contains("nan"))]
conditions_mask_list = ["bad_status", "instance_nonzero", "exclude_nan_classes"]
filtered_df = hlp.special_filter(df, conditions_mask_list)

grp = filtered_df.groupby("imp_class")
class_keys = list(grp.groups.keys())
@@ -343,7 +337,7 @@ def run_longform_tmi(


def run_shortform_tmi(
shortform_df: pd.DataFrame,
to_impute_df: pd.DataFrame,
config: Dict[str, Any],
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Function to run shortform TMI imputation.
@@ -361,11 +355,6 @@

sf_target_variables = list(config["breakdowns"])

# logic to identify Census rows, only these will be used for shortform TMI
census_mask = shortform_df["selectiontype"] == "C"
to_impute_df = shortform_df.copy().loc[census_mask]
not_imputed_df = shortform_df.copy().loc[~census_mask]

mean_dict, qa_df, trim_counts_qa = create_mean_dict(
to_impute_df, sf_target_variables, config
)
@@ -381,13 +370,43 @@
tmi_df.loc[qa_df.index, "211_trim"] = qa_df["211_trim"]
tmi_df.loc[qa_df.index, "305_trim"] = qa_df["305_trim"]

# create imputation classes for shortform entries not imputed (selectiontype 'P')
not_imputed_df = hlp.create_imp_class_col(not_imputed_df, ["200", "201"])
# concatinate qa dataframes from short forms and long forms
shortforms_updated_df = hlp.concat_with_bool([tmi_df, not_imputed_df])

TMILogger.info("TMI imputation completed.")
return shortforms_updated_df, qa_df, trim_counts_qa
return tmi_df, qa_df, trim_counts_qa


def tmi_prep(
full_df: pd.DataFrame,
config: Dict[str, Any],
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Return dataframes for longform and shortform imputation and for excluded rows.

Args:
full_df (pd.DataFrame): The full responses dataframe.
config (Dict): the configuration settings.

Returns:
Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
longform_df: A dataframe with longform rows to be imputed.
shortform_df: A dataframe with shortform rows to be imputed.
excluded_df: A dataframe with rows that do not need to be imputed.
"""
# logic to identify rows that do not need to be imputed
mor_mask = hlp.create_mask(full_df, ["mor_imputed"])
prn_mask = hlp.create_mask(full_df, ["prn_only"])
excluded_df = full_df.copy().loc[mor_mask | prn_mask]

# create a dataframe for longform rows to be imputed
longform_df = hlp.special_filter(full_df, ["longform_only", "not_mor_imputed"])

# create a dataframe for shortform rows to be imputed if the survey is BERD
if config["survey"]["survey_type"] == "BERD":
shortform_df = hlp.special_filter(
full_df, ["shortform_only", "not_mor_imputed", "census_only"]
)
else:
shortform_df = pd.DataFrame()

return longform_df, shortform_df, excluded_df
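
A sketch of the resulting three-way split (assuming `tmi_prep` and the helpers are importable; the toy frame carries every column `create_mask` touches, since its masks dict is built eagerly):

```python
import pandas as pd

full_df = pd.DataFrame({
    "formtype": ["0001", "0006", "0006", "0006"],
    "imp_marker": ["R", "R", "MoR", "R"],
    "selectiontype": ["C", "C", "C", "P"],
    "status": ["Clear"] * 4,
    "instance": [1, 1, 1, 1],
    "imp_class": ["100_A"] * 4,
    "604": ["Yes"] * 4,
    "211": [1.0, 2.0, 3.0, 4.0],
    "601": [None] * 4,
})

config = {"survey": {"survey_type": "BERD"}}  # minimal illustrative config
longform_df, shortform_df, excluded_df = tmi_prep(full_df, config)
# row 0 -> longform_df; row 1 -> shortform_df (census short form);
# rows 2 and 3 -> excluded_df (MoR-imputed and PRN respectively)
```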


def run_tmi(
@@ -406,17 +425,8 @@
qa_df: QA dataframe.
trim_counts (pd.DataFrame): The qa dataframe for trim counts.
"""
# logic to identify rows that have had MoR or CF applied,
# these should be excluded from TMI
mor_mask = full_df["imp_marker"].isin(["CF", "MoR"])
# create dataframe for all the rows excluded from TMI
excluded_df = full_df.copy().loc[mor_mask]

# create logic to select rows for longform and shortform TMI
long_tmi_mask = (full_df["formtype"] == formtype_long) & ~mor_mask

# create dataframes to be used for longform TMI
longform_df = full_df.copy().loc[long_tmi_mask]
TMILogger.info("Starting TMI imputation.")
longform_df, shortform_df, excluded_df = tmi_prep(full_df, config)

# apply TMI imputation to short forms for the BERD survey (but not PNP)
if config["survey"]["survey_type"] == "BERD":
@@ -431,9 +441,6 @@
longform_df, config
)

short_tmi_mask = (full_df["formtype"] == formtype_short) & ~mor_mask
shortform_df = full_df.copy().loc[short_tmi_mask]

shortform_tmi_df, qa_df_short, s_trim_counts = run_shortform_tmi(
shortform_df, config
)
@@ -443,13 +450,14 @@
# concatenate qa dataframes from short forms and long forms
full_qa_df = hlp.concat_with_bool([qa_df_long, qa_df_short])

else:
trim_counts = hlp.concat_with_bool([l_trim_counts, s_trim_counts])

elif config["survey"]["survey_type"] == "PNP":
# apply TMI imputation to PNP long forms
longform_tmi_df, qa_df_long, l_trim_counts = run_longform_tmi(
longform_df, config
)
longform_tmi_df, full_qa_df, trim_counts = run_longform_tmi(longform_df, config)
full_df = hlp.concat_with_bool([longform_tmi_df, excluded_df])
full_qa_df = qa_df_long
# add extra cols to compensate for the missing short form columns in PNP
full_qa_df[["emp_total_trim", "headcount_total_trim"]] = False

full_df = full_df.sort_values(
["reference", "instance"], ascending=[True, True]
@@ -479,10 +487,6 @@ def run_tmi(
]
full_qa_df = full_qa_df[qa_cols]

if config["survey"]["survey_type"] == "BERD":
trim_counts = hlp.concat_with_bool([l_trim_counts, s_trim_counts])
else:
trim_counts = l_trim_counts
# group by imputation class and format data
trim_counts = (
trim_counts.groupby(["imp_class", "formtype", "clear_class_size"])
8 changes: 4 additions & 4 deletions src/user_config.yaml
@@ -2,7 +2,7 @@ config_validation:
validate: True
path: src/user_config_schema.yaml
survey:
survey_type: "PNP"
survey_type: "BERD"
survey_year: 2023
global:
# Staging and validation settings
@@ -37,7 +37,7 @@ global:
# Final output settings
output_long_form: False
output_short_form: False
output_gb_sas: False
output_gb_sas: True
output_ni_sas: False
output_tau: False
output_intram_by_pg_gb: False
@@ -48,8 +48,8 @@ global:
output_intram_by_sic: False
output_fte_total_qa: False
output_status_filtered: False
output_frozen_group: False
output_intram_totals: False
output_frozen_group: True
output_intram_totals: True
s3_paths:
root: "/bat/res_dev/project_data/"
# staging input paths