diff --git a/src/imputation/MoR.py b/src/imputation/MoR.py index c9e4b6763..e31360cb0 100644 --- a/src/imputation/MoR.py +++ b/src/imputation/MoR.py @@ -176,11 +176,15 @@ def filter_for_links(df: pd.DataFrame, is_current: bool) -> pd.DataFrame: # Filter out imputation classes that are missing either "200" or "201" nan_mask = df["imp_class"].str.contains("nan").apply(lambda x: not x) # Select only clear, or equivalently, imp_marker R. - # Exclude PRN cells in the current period. + clear_mask = df["imp_marker"] == "R" + # Exclude instance 0 + ins_mask = df["instance"] > 0 if is_current: - mask = (df["imp_marker"] == "R") & (df["selectiontype"] != "P") & nan_mask + # Exclude PRN cells in the current period. + prn_mask = df["selectiontype"] != "P" + mask = clear_mask & nan_mask & prn_mask & ins_mask else: - mask = (df["imp_marker"] == "R") & nan_mask + mask = clear_mask & nan_mask & ins_mask return df.loc[mask, :] diff --git a/src/imputation/imputation_helpers.py b/src/imputation/imputation_helpers.py index 35fe98178..5a3fdd69e 100644 --- a/src/imputation/imputation_helpers.py +++ b/src/imputation/imputation_helpers.py @@ -82,44 +82,76 @@ def create_notnull_mask(df: pd.DataFrame, col: str) -> pd.Series: return df[col].str.len() > 0 -def create_mask(df: pd.DataFrame, options: List) -> pd.Series: - """Create a dataframe mask based on listed options - retrun Bool column. +def create_mask(df: pd.DataFrame, options: List[str]) -> pd.Series: + """Create a dataframe mask based on listed options - return Bool column. Options include: - 'clear_status': rows with one of the clear statuses - 'instance_zero': rows with instance = 0 - 'instance_nonzero': rows with instance != 0 - - 'no_r_and_d' : rows where q604 = 'No' + - 'no_r_and_d': rows where q604 = 'No' - 'postcode_only': rows in which there are no numeric values, only postcodes. 
- """ - clear_mask = df["status"].isin(["Clear", "Clear - overridden"]) - instance_mask = df.instance == 0 - no_r_and_d_mask = df["604"] == "No" - postcode_only_mask = df["211"].isnull() & ~df["601"].isnull() - - # Set initial values for the mask series as a column in the dataframe - df = df.copy() - df["mask_col"] = False - - if "clear_status" in options: - df["mask_col"] = df["mask_col"] & clear_mask + - 'excl_postcode_only': rows excluding those with only postcodes. + - 'exclude_nan_classes': rows excluding those with "nan" in the imp_class col. + - 'prn_only': PRN rows, ie, rows with selectiontype = 'P' + - 'census_only': Census rows, ie, rows with selectiontype 'C' + - 'longform_only': Longform rows, ie, rows with formtype = '0001' + - 'shortform_only': Shortform rows, ie, rows with formtype = '0006' + - 'bad_status': rows with a status that is not in the clear statuses + - 'mor_imputed' : rows with an imp_marker of 'MoR' or 'CF' + - 'not_mor_imputed' : rows without an imp_marker of 'MoR' or 'CF' - if "instance_zero" in options: - df["mask_col"] = df["mask_col"] & instance_mask - - elif "instance_nonzero" in options: - df["mask_col"] = df["mask_col"] & ~instance_mask - - if "no_r_and_d" in options: - df["mask_col"] = df["mask_col"] & no_r_and_d_mask + Args: + df (pd.DataFrame): The input dataframe. + options (List[str]): List of options to create the mask. - if "postcode_only" in options: - df["mask_col"] = df["mask_col"] & postcode_only_mask + Returns: + pd.Series: Boolean mask based on the options. 
+ """ + df = df.copy() # Ensure the original DataFrame is not modified + + # Define masks for each option + masks = { + "clear_status": df["status"].isin(["Clear", "Clear - overridden"]), + "instance_zero": df.instance == 0, + "instance_nonzero": df.instance > 0, + "no_r_and_d": df["604"] == "No", + "postcode_only": df["211"].isnull() & df["601"].notnull(), + "excl_postcode_only": ~(df["211"].isnull() & df["601"].notnull()), + "exclude_nan_classes": ~df["imp_class"].str.contains("nan", na=True), + "prn_only": df["selectiontype"] == "P", + "census_only": df["selectiontype"] == "C", + "longform_only": df["formtype"] == "0001", + "shortform_only": df["formtype"] == "0006", + "bad_status": df["status"].isin(["Check needed", "Form sent out"]), + "mor_imputed": df["imp_marker"].isin(["MoR", "CF"]), + "not_mor_imputed": ~df["imp_marker"].isin(["MoR", "CF"]), + } + + # Initialize the mask to True + mask = pd.Series(True, index=df.index) + + # Apply the masks based on the options + for option in options: + if option in masks: + mask &= masks[option] + + return mask + + +def special_filter(df: pd.DataFrame, options: List[str]) -> pd.DataFrame: + """Filter the dataframe based on a list of options commonly used in the pipeline. - if "excl_postcode_only" in options: - df["mask_col"] = df["mask_col"] & ~postcode_only_mask + Args: + df (pd.DataFrame): The input dataframe. + options (List[str]): List of options to filter the dataframe. - return df["mask_col"] + Returns: + pd.DataFrame: The filtered dataframe. + """ + mask = create_mask(df, options) + df = df.copy().loc[mask] + return df def instance_fix(df: pd.DataFrame): @@ -281,6 +313,9 @@ def create_r_and_d_instance( # Ensure that in the case longforms with "no R&D" we only have one row df, mult_604_qa_df = fix_604_error(df) + # In the case where there is "no R&D", we create a copy of instance 0 + # and update to instance = 1. 
In this way we create an "instance 1" which we can + # populate with zeros for imputation purposes (see docstring above). no_rd_mask = (df.formtype == "0001") & (df["604"] == "No") filtered_df = df.copy().loc[no_rd_mask] filtered_df["instance"] = 1 diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py index 1997eaca7..4da062436 100644 --- a/src/imputation/imputation_main.py +++ b/src/imputation/imputation_main.py @@ -127,15 +127,14 @@ def run_imputation( # noqa: C901 links_filename = filename_amender("links_qa", config) trimmed_counts_filename = filename_amender("tmi_trim_count_qa", config) - if config["survey"]["survey_type"] == "BERD": - # create trimming qa dataframe with required columns from schema - schema_path = config["schema_paths"]["manual_trimming_schema"] - schema_dict = load_schema(schema_path) - trimming_qa_output = create_output_df(qa_df, schema_dict) - - write_csv(os.path.join(qa_path, trim_qa_filename), trimming_qa_output) - write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa) - write_csv(os.path.join(qa_path, wrong_604_filename), wrong_604_qa_df) + # create trimming qa dataframe with required columns from schema + schema_path = config["schema_paths"]["manual_trimming_schema"] + schema_dict = load_schema(schema_path) + trimming_qa_output = create_output_df(qa_df, schema_dict) + + write_csv(os.path.join(qa_path, trim_qa_filename), trimming_qa_output) + write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa) + write_csv(os.path.join(qa_path, wrong_604_filename), wrong_604_qa_df) write_csv(os.path.join(qa_path, full_imp_filename), imputed_df) if backdata is not None: diff --git a/src/imputation/tmi_imputation.py b/src/imputation/tmi_imputation.py index ba484fad9..5be37dff5 100644 --- a/src/imputation/tmi_imputation.py +++ b/src/imputation/tmi_imputation.py @@ -199,13 +199,13 @@ def create_mean_dict( # Create an empty dict to store means mean_dict = dict.fromkeys(target_variable_list) 
- # Filter for clear statuses - clear_statuses = ["Clear", "Clear - overridden"] - - filtered_df = df.loc[df["status"].isin(clear_statuses)] - - # Filter out imputation classes that are missing either "200" or "201" - filtered_df = filtered_df[~(filtered_df["imp_class"].str.contains("nan"))] + filter_conditions_list = [ + "clear_status", + "instance_nonzero", + "exclude_nan_classes", + "excl_postcode_only", + ] + filtered_df = hlp.special_filter(df, filter_conditions_list) # Group by imp_class grp = filtered_df.groupby("imp_class") @@ -264,14 +264,8 @@ def apply_tmi( Returns: pd.DataFrame: The passed dataframe with TMI imputation applied. """ - df = df.copy() - - filtered_df = df.loc[df["status"].isin(["Form sent out", "Check needed"])] - - # Filter out any cases where 200 or 201 are missing from the imputation class - # This ensures that means are calculated using only valid imputation classes - # Since imp_class is string type, any entry containing "nan" is excluded. - filtered_df = filtered_df[~(filtered_df["imp_class"].str.contains("nan"))] + conditions_mask_list = ["bad_status", "instance_nonzero", "exclude_nan_classes"] + filtered_df = hlp.special_filter(df, conditions_mask_list) grp = filtered_df.groupby("imp_class") class_keys = list(grp.groups.keys()) @@ -343,7 +337,7 @@ def run_longform_tmi( def run_shortform_tmi( - shortform_df: pd.DataFrame, + to_impute_df: pd.DataFrame, config: Dict[str, Any], ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """Function to run shortform TMI imputation. 
@@ -361,11 +355,6 @@ def run_shortform_tmi( sf_target_variables = list(config["breakdowns"]) - # logic to identify Census rows, only these will be used for shortform TMI - census_mask = shortform_df["selectiontype"] == "C" - to_impute_df = shortform_df.copy().loc[census_mask] - not_imputed_df = shortform_df.copy().loc[~census_mask] - mean_dict, qa_df, trim_counts_qa = create_mean_dict( to_impute_df, sf_target_variables, config ) @@ -381,13 +370,43 @@ def run_shortform_tmi( tmi_df.loc[qa_df.index, "211_trim"] = qa_df["211_trim"] tmi_df.loc[qa_df.index, "305_trim"] = qa_df["305_trim"] - # create imputation classes for shortform entries not imputed (selectiontype 'P') - not_imputed_df = hlp.create_imp_class_col(not_imputed_df, ["200", "201"]) - # concatinate qa dataframes from short forms and long forms - shortforms_updated_df = hlp.concat_with_bool([tmi_df, not_imputed_df]) - TMILogger.info("TMI imputation completed.") - return shortforms_updated_df, qa_df, trim_counts_qa + return tmi_df, qa_df, trim_counts_qa + + +def tmi_prep( + full_df: pd.DataFrame, + config: Dict[str, Any], +) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """Return dataframes for longform and shortform imputation and for excluded rows. + + Args: + full_df (pd.DataFrame): The full responses dataframe. + config (Dict): the configuration settings. + + Returns: + Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + longform_df: A dataframe with longform rows to be imputed. + shortform_df: A dataframe with shortform rows to be imputed. + excluded_df: A dataframe with rows that do not need to be imputed. 
+ """ + # logic to identify rows that do not need to be imputed + mor_mask = hlp.create_mask(full_df, ["mor_imputed"]) + prn_mask = hlp.create_mask(full_df, ["prn_only"]) + excluded_df = full_df.copy().loc[mor_mask | prn_mask] + + # create a dataframe for longform rows to be imputed + longform_df = hlp.special_filter(full_df, ["longform_only", "not_mor_imputed"]) + + # create a dataframe for shortform rows to be imputed if the survey is BERD + if config["survey"]["survey_type"] == "BERD": + shortform_df = hlp.special_filter( + full_df, ["shortform_only", "not_mor_imputed", "census_only"] + ) + else: + shortform_df = pd.DataFrame() + + return longform_df, shortform_df, excluded_df def run_tmi( @@ -406,17 +425,8 @@ def run_tmi( qa_df: QA dataframe. trim_counts (pd.DataFrame): The qa dataframe for trim counts. """ - # logic to identify rows that have had MoR or CF applied, - # these should be excluded from TMI - mor_mask = full_df["imp_marker"].isin(["CF", "MoR"]) - # create dataframe for all the rows excluded from TMI - excluded_df = full_df.copy().loc[mor_mask] - - # create logic to select rows for longform and shortform TMI - long_tmi_mask = (full_df["formtype"] == formtype_long) & ~mor_mask - - # create dataframes to be used for longform TMI - longform_df = full_df.copy().loc[long_tmi_mask] + TMILogger.info("Starting TMI imputation.") + longform_df, shortform_df, excluded_df = tmi_prep(full_df, config) # apply TMI imputation to short forms for the BERD survey (but not PNP) if config["survey"]["survey_type"] == "BERD": @@ -431,9 +441,6 @@ def run_tmi( longform_df, config ) - short_tmi_mask = (full_df["formtype"] == formtype_short) & ~mor_mask - shortform_df = full_df.copy().loc[short_tmi_mask] - shortform_tmi_df, qa_df_short, s_trim_counts = run_shortform_tmi( shortform_df, config ) @@ -443,13 +450,14 @@ def run_tmi( # concatinate qa dataframes from short forms and long forms full_qa_df = hlp.concat_with_bool([qa_df_long, qa_df_short]) - else: + trim_counts = 
hlp.concat_with_bool([l_trim_counts, s_trim_counts]) + + elif config["survey"]["survey_type"] == "PNP": # apply TMI imputation to PNP long forms - longform_tmi_df, qa_df_long, l_trim_counts = run_longform_tmi( - longform_df, config - ) + longform_tmi_df, full_qa_df, trim_counts = run_longform_tmi(longform_df, config) full_df = hlp.concat_with_bool([longform_tmi_df, excluded_df]) - full_qa_df = qa_df_long + # add extra cols to compensate for the missing short form columns in PNP + full_qa_df[["emp_total_trim", "headcount_total_trim"]] = False full_df = full_df.sort_values( ["reference", "instance"], ascending=[True, True] @@ -479,10 +487,6 @@ def run_tmi( ] full_qa_df = full_qa_df[qa_cols] - if config["survey"]["survey_type"] == "BERD": - trim_counts = hlp.concat_with_bool([l_trim_counts, s_trim_counts]) - else: - trim_counts = l_trim_counts # group by imputation class and format data trim_counts = ( trim_counts.groupby(["imp_class", "formtype", "clear_class_size"]) diff --git a/src/user_config.yaml b/src/user_config.yaml index 5a5d4ced2..ec9a89bb0 100644 --- a/src/user_config.yaml +++ b/src/user_config.yaml @@ -2,7 +2,7 @@ config_validation: validate: True path: src/user_config_schema.yaml survey: - survey_type: "PNP" + survey_type: "BERD" survey_year: 2023 global: # Staging and validation settings @@ -37,7 +37,7 @@ global: # Final output settings output_long_form: False output_short_form: False - output_gb_sas: False + output_gb_sas: True output_ni_sas: False output_tau: False output_intram_by_pg_gb: False @@ -48,8 +48,8 @@ global: output_intram_by_sic: False output_fte_total_qa: False output_status_filtered: False - output_frozen_group: False - output_intram_totals: False + output_frozen_group: True + output_intram_totals: True s3_paths: root: "/bat/res_dev/project_data/" # staging input paths diff --git a/tests/test_imputation/test_imputation_helpers.py b/tests/test_imputation/test_imputation_helpers.py index 15a027fd9..2627c5058 100644 --- 
a/tests/test_imputation/test_imputation_helpers.py +++ b/tests/test_imputation/test_imputation_helpers.py @@ -12,6 +12,8 @@ create_imp_class_col, imputation_marker, concat_with_bool, + create_mask, + special_filter, ) @@ -508,3 +510,156 @@ def test_concat_with_bool(self): result_df = concat_with_bool([df1, df2, df3]) # ignore the order of the columns assert_frame_equal(result_df.reset_index(drop=True), expected_df, check_like=True) + + +class TestCreateMask: + """Unit tests for create_mask function.""" + + def create_input_df(self): + """Create an input dataframe for the test.""" + input_cols = [ + "reference", + "instance", + "imp_class", + "imp_marker", + "211", + "601", + "604", + "status", + "formtype", + "selectiontype", + ] + + data = [ + [111, 0, "nan_A", "CF", np.nan, None, "Yes", "Check needed", "0001", "C"], + [111, 1, "C_A", "MoR", 1, None, None, "Check needed", "0001", "C"], + [222, 0, "nan_A", "R", np.nan, None, "No", "Clear", "0001", "C"], + [222, 1, "C_A", "R", 1, "CB1 2NF", "No", "Clear", "0001", "C"], + [222, 2, "C_A", "R", np.nan, "BA1 5DA", "No", "Clear", "0001", "C"], + [333, np.nan, None, "R", np.nan, None, "No", "Form sent out", "0006", "P"], + ] + + input_df = pd.DataFrame(data=data, columns=input_cols) + return input_df + + def test_clear_status(self): + df = self.create_input_df() + options = ["clear_status"] + expected_mask = pd.Series([False, False, True, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_bad_status(self): + df = self.create_input_df() + options = ["bad_status"] + expected_mask = pd.Series([True, True, False, False, False, True]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_instance_zero(self): + df = self.create_input_df() + options = ["instance_zero"] + expected_mask = pd.Series([True, False, True, False, False, False]) + result_mask = create_mask(df, options) + 
assert_series_equal(result_mask, expected_mask) + + def test_instance_nonzero(self): + df = self.create_input_df() + options = ["instance_nonzero"] + expected_mask = pd.Series([False, True, False, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_no_r_and_d(self): + df = self.create_input_df() + options = ["no_r_and_d"] + expected_mask = pd.Series([False, False, True, True, True, True]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_postcode_only(self): + df = self.create_input_df() + options = ["postcode_only"] + expected_mask = pd.Series([False, False, False, False, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_excl_postcode_only(self): + df = self.create_input_df() + options = ["excl_postcode_only"] + expected_mask = pd.Series([True, True, True, True, False, True]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_clear_instance_zero(self): + df = self.create_input_df() + options = ["clear_status", "instance_zero"] + expected_mask = pd.Series([False, False, True, False, False, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_exclude_nan_classes(self): + df = self.create_input_df() + options = ["exclude_nan_classes"] + expected_mask = pd.Series([False, True, False, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_clear_instance_nonzero(self): + df = self.create_input_df() + options = ["clear_status", "instance_nonzero"] + expected_mask = pd.Series([False, False, False, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_clear_instance_nonzero_exclude_nan_classes(self): + df = self.create_input_df() + 
options = ["clear_status", "instance_nonzero", "exclude_nan_classes"] + expected_mask = pd.Series([False, False, False, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_clear_longform_instance_nonzero(self): + df = self.create_input_df() + options = ["clear_status", "instance_nonzero", "longform_only"] + expected_mask = pd.Series([False, False, False, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + + def test_not_mor_imputed_longform(self): + df = self.create_input_df() + options = ["not_mor_imputed", "longform_only"] + expected_mask = pd.Series([False, False, True, True, True, False]) + result_mask = create_mask(df, options) + assert_series_equal(result_mask, expected_mask) + +class TestSpecialFilter: + """Tests for the special_filter function.""" + def create_input_df(self): + """Create an input dataframe for the test.""" + input_cols = [ + "reference", + "instance", + "imp_class", + "211", + "601", + "604", + "status", + "formtype", + "selectiontype", + ] + + data = [ + [111, 0, "nan_A", np.nan, None, "Yes", "Clear", "0001", "C"], + [111, 1, "C_A", 1, None, None, "Clear - overridden", "0001", "C"], + [222, 0, "nan_A", np.nan, None, None, "Clear", "0001", "C"], + [222, 1, "C_A", 1, "CB1 2NF", "No", "Clear", "0001", "C"], + [222, 2, "C_A", np.nan, "BA1 5DA", "No", "Clear", "0001", "C"], + [333, np.nan, "nan_A", np.nan, None, "No", "Form sent out", "0006", "P"], + ] + + input_df = pd.DataFrame(data=data, columns=input_cols) + return input_df + + def test_special_filter_create_mean_case(self): + filter_conditions_list = ["clear_status", "instance_nonzero", "exclude_nan_classes"]