From 68b857f68eeef67a56b495f220d5f95e1613cf5a Mon Sep 17 00:00:00 2001 From: Nora Gilbertson <148926375+n-gilbertson@users.noreply.github.com> Date: Fri, 6 Sep 2024 11:09:31 -0700 Subject: [PATCH] select bias covariates and add default plotting order --- src/bopforge/categorical_pipeline/__main__.py | 66 +++-- .../categorical_pipeline/functions.py | 251 ++++++++++++------ 2 files changed, 202 insertions(+), 115 deletions(-) diff --git a/src/bopforge/categorical_pipeline/__main__.py b/src/bopforge/categorical_pipeline/__main__.py index eaa3a41..1b63265 100644 --- a/src/bopforge/categorical_pipeline/__main__.py +++ b/src/bopforge/categorical_pipeline/__main__.py @@ -4,13 +4,15 @@ from argparse import ArgumentParser from pathlib import Path -import bopforge.categorical_pipeline.functions as functions import numpy as np -from bopforge.utils import fill_dict, ParseKwargs from pplkit.data.interface import DataInterface +import bopforge.categorical_pipeline.functions as functions +from bopforge.utils import ParseKwargs, fill_dict + warnings.filterwarnings("ignore") + def pre_processing(result_folder: Path) -> None: dataif = DataInterface(result=result_folder) name = dataif.result.name @@ -68,8 +70,8 @@ def fit_signal_model(result_folder: Path) -> None: signal_model = functions.get_signal_model(settings, df) signal_model.fit_model(outer_step_size=200, outer_max_iter=100) - df = functions.add_cols(df, signal_model) - df_coef = functions.get_coefs(all_settings, signal_model) + df = functions.convert_bc_to_em(df, signal_model) + df_coef = functions.get_coefs(df, all_settings, signal_model) summary = functions.get_signal_model_summary(name, df, df_coef) @@ -89,38 +91,40 @@ def fit_signal_model(result_folder: Path) -> None: fig.savefig(dataif.result / "signal_model.pdf", bbox_inches="tight") -# def select_bias_covs(result_folder: Path) -> None: -# """Select the bias covariates. In this step, we first fit a linear model to -# get the prior strength of the bias-covariates. And then we use `CovFinder` -# to select important bias-covariates. A summary of the result will be -# generated and store in file `cov_finder_result.yaml`. +def select_bias_covs(result_folder: Path) -> None: + """Select the bias covariates. In this step, we first fit a linear model to + get the prior strength of the bias-covariates. And then we use `CovFinder` + to select important bias-covariates. A summary of the result will be + generated and store in file `cov_finder_result.yaml`. -# Parameters -# ---------- -# dataif -# Data interface in charge of file reading and writing. + Parameters + ---------- + dataif + Data interface in charge of file reading and writing. -# """ -# dataif = DataInterface(result=result_folder) -# name = dataif.result.name + """ + dataif = DataInterface(result=result_folder) + name = dataif.result.name -# df = dataif.load_result(f"{name}.csv") -# df = df[df.is_outlier == 0].copy() + df = dataif.load_result(f"{name}.csv") + df = df[df.is_outlier == 0].copy() -# all_settings = dataif.load_result("settings.yaml") -# settings = all_settings["select_bias_covs"] + all_settings = dataif.load_result("settings.yaml") + settings = all_settings["select_bias_covs"] -# cov_finder_linear_model = dataif.load_result("signal_model.pkl") + cov_finder_linear_model = functions.get_cov_finder_linear_model(df) + cov_finder_linear_model.fit_model() -# cov_finder = functions.get_cov_finder(settings, cov_finder_linear_model) -# cov_finder.select_covs(verbose=True) + cov_finder = functions.get_cov_finder(settings, cov_finder_linear_model) + cov_finder.select_covs(verbose=True) -# cov_finder_result = functions.get_cov_finder_result( -# cov_finder_linear_model, cov_finder -# ) + cov_finder_result = functions.get_cov_finder_result( + cov_finder_linear_model, cov_finder + ) -# dataif.dump_result(cov_finder_result, "cov_finder_result.yaml") -# dataif.dump_result(cov_finder, "cov_finder.pkl") + dataif.dump_result(cov_finder_result, "cov_finder_result.yaml") + dataif.dump_result(cov_finder_linear_model, "cov_finder_linear_model.pkl") + dataif.dump_result(cov_finder, "cov_finder.pkl") # def fit_linear_model(result_folder: Path) -> None: @@ -222,7 +226,11 @@ def run( def main(args=None) -> None: parser = ArgumentParser(description="Categorical burden of proof pipeline.") parser.add_argument( - "-i", "--input", type=os.path.abspath, required=True, help="Input data folder" + "-i", + "--input", + type=os.path.abspath, + required=True, + help="Input data folder", ) parser.add_argument( "-o", diff --git a/src/bopforge/categorical_pipeline/functions.py b/src/bopforge/categorical_pipeline/functions.py index aec9085..7ccdf9e 100644 --- a/src/bopforge/categorical_pipeline/functions.py +++ b/src/bopforge/categorical_pipeline/functions.py @@ -1,12 +1,12 @@ import matplotlib.pyplot as plt import numpy as np import pandas as pd - -# from bopforge.utils import get_beta_info, get_gamma_info from matplotlib.pyplot import Axes, Figure -from mrtool import MRBRT, LinearCatCovModel, MRData +from mrtool import MRBRT, CovFinder, LinearCatCovModel, LinearCovModel, MRData from pandas import DataFrame +from bopforge.utils import get_beta_info, get_gamma_info + def get_signal_model(settings: dict, df: DataFrame) -> MRBRT: """Create signal model for outliers identification and covariate selection @@ -50,29 +50,41 @@ def get_signal_model(settings: dict, df: DataFrame) -> MRBRT: return signal_model -def add_cols(df: DataFrame, signal_model: MRBRT) -> DataFrame: - """Add columns of outlier indicator. +def convert_bc_to_em(df: DataFrame, signal_model: MRBRT) -> DataFrame: + """Convert bias covariate to effect modifier and add one column to indicate + if data points are inliers or outliers. Parameters ---------- df - Data frame that contains the training data. + Data frame contains the training data. signal_model Fitted signal model. Returns ------- DataFrame - DataFrame with additional columns of oulier indicator. + DataFrame with additional columns for effect modifiers and oulier + indicator. """ data = signal_model.data + signal = signal_model.predict(data) is_outlier = (signal_model.w_soln < 0.1).astype(int) df = df.merge( - pd.DataFrame({"seq": data.data_id, "is_outlier": is_outlier}), + pd.DataFrame( + { + "seq": data.data_id, + "signal": signal, + "is_outlier": is_outlier, + } + ), how="outer", on="seq", ) + for col in df.columns: + if col.startswith("cov_"): + df["em" + col[3:]] = df[col] * df["signal"] return df @@ -106,7 +118,130 @@ def get_signal_model_summary( return summary +def get_cov_finder_linear_model(df: DataFrame) -> MRBRT: + """Create the linear model for the CovFinder to determine the strength of + the prior on the bias covariates. + + Parameters + ---------- + df + Data frame that contains the training data, but without the outlier. + + Returns + ------- + MRBRT + The linear model. + + """ + col_covs = ["signal"] + [col for col in df.columns if col.startswith("em_")] + data = MRData() + data.load_df( + df, + col_obs="ln_rr", + col_obs_se="ln_rr_se", + col_covs=col_covs, + col_study_id="study_id", + col_data_id="seq", + ) + cov_models = [ + LinearCovModel("signal", use_re=True), + ] + cov_finder_linear_model = MRBRT(data, cov_models) + + return cov_finder_linear_model + + +def get_cov_finder(settings: dict, cov_finder_linear_model: MRBRT) -> CovFinder: + """Create the instance of CovFinder class. + + Parameters + ---------- + settings + Settings for bias covariate selection. + cov_finder_linear_model + Fitted cov finder linear model. + + Returns + ------- + CovFinder + The instance of the CovFinder class. + + """ + data = cov_finder_linear_model.data + beta_info = get_beta_info(cov_finder_linear_model, cov_name="signal") + + # covariate selection + pre_selected_covs = settings["cov_finder"]["pre_selected_covs"] + if isinstance(pre_selected_covs, str): + pre_selected_covs = [pre_selected_covs] + pre_selected_covs = [ + col.replace("cov_", "em_") for col in pre_selected_covs + ] + if "signal" not in pre_selected_covs: + pre_selected_covs.append("signal") + settings["cov_finder"]["pre_selected_covs"] = pre_selected_covs + candidate_covs = [ + cov_name + for cov_name in data.covs.keys() + if cov_name not in pre_selected_covs + ["intercept"] + ] + settings["cov_finder"] = { + **dict( + num_samples=1000, + power_range=[-4, 4], + power_step_size=0.05, + laplace_threshold=0.00001, + inlier_pct=1.0, + bias_zero=True, + ), + **settings["cov_finder"], + } + cov_finder = CovFinder( + data, + covs=candidate_covs, + beta_gprior_std=0.1 * beta_info[1], + **settings["cov_finder"], + ) + + return cov_finder + + +def get_cov_finder_result( + cov_finder_linear_model: MRBRT, cov_finder: CovFinder +) -> dict: + """Summarize result from bias covariate selection. + + Parameters + ---------- + cov_finder_linear_model + Fitted cov finder linear model. + cov_finder + Fitted cov finder model. + + Returns + ------- + dict + Result summary for bias covariate selection. + + """ + beta_info = get_beta_info(cov_finder_linear_model, cov_name="signal") + selected_covs = [ + cov_name + for cov_name in cov_finder.selected_covs + if cov_name != "signal" + ] + + # save results + cov_finder_result = { + "beta_sd": float(beta_info[1] * 0.1), + "selected_covs": selected_covs, + } + + return cov_finder_result + + def get_coefs( + df: DataFrame, settings: dict, signal_model: MRBRT, ) -> tuple[DataFrame]: @@ -114,6 +249,8 @@ def get_coefs( Parameters ---------- + df + Data frame contains the training data. settings The settings for category order signal_model @@ -130,12 +267,32 @@ def get_coefs( dict(cat=signal_model.cov_models[0].cats, coef=signal_model.beta_soln) ) cat_order = settings["figure"]["cat_order"] + ref_cat = settings["fit_signal_model"]["cat_cov_model"]["ref_cat"] + # Assign reference category from settings or by most common category + if ref_cat: + ref_cat = ref_cat + else: + unique_cats, counts = np.unique( + np.hstack([df.ref_risk_cat, df.alt_risk_cat]), return_counts=True + ) + ref_cat = unique_cats[counts.argmax()] # Order the categories if cat_order: df_coef["cat"] = pd.Categorical( df_coef["cat"], categories=cat_order, ordered=True ) df_coef = df_coef.sort_values("cat").reset_index(drop=True) + else: + # Default behavior: ref_cat first, then order by proximity to ref_cat's coef + ref_coef = df_coef.loc[df_coef["cat"] == ref_cat, "coef"].iloc[0] + df_coef["abs_diff"] = abs(df_coef["coef"] - ref_coef) + df_coef = df_coef.sort_values(by="abs_diff") + df_coef = pd.concat( + [ + df_coef[df_coef["cat"] == ref_cat], + df_coef[df_coef["cat"] != ref_cat], + ] + ).reset_index(drop=True) # Add x ranges num_cats = df_coef["cat"].nunique() df_coef["x_start"] = range(num_cats) @@ -331,84 +488,6 @@ def _plot_data( return ax -# def get_cov_finder(settings: dict, cov_finder_linear_model: MRBRT) -> CovFinder: -# """Create the instance of CovFinder class. - -# Parameters -# ---------- -# settings -# Settings for bias covariate selection. -# cov_finder_linear_model -# Fitted cov finder linear model. - -# Returns -# ------- -# CovFinder -# The instance of the CovFinder class. - -# """ -# data = cov_finder_linear_model.data -# beta_info = get_beta_info(cov_finder_linear_model, cov_name="intercept") - -# # covariate selection -# if "intercept" not in settings["cov_finder"]["pre_selected_covs"]: -# settings["cov_finder"]["pre_selected_covs"].append("intercept") -# candidate_covs = [ -# cov_name -# for cov_name in data.covs.keys() -# if cov_name not in settings["cov_finder"]["pre_selected_covs"] + ["intercept"] -# ] -# settings["cov_finder"] = { -# **dict( -# num_samples=1000, -# power_range=[-4, 4], -# power_step_size=0.05, -# laplace_threshold=0.00001, -# inlier_pct=1.0, -# bias_zero=True, -# ), -# **settings["cov_finder"], -# } -# cov_finder = CovFinder( -# data, -# covs=candidate_covs, -# beta_gprior_std=0.1 * beta_info[1], -# **settings["cov_finder"], -# ) - -# return cov_finder - - -# def get_cov_finder_result(cov_finder_linear_model: MRBRT, cov_finder: MRBRT) -> dict: -# """Summarize result from bias covariate selection. - -# Parameters -# ---------- -# cov_finder_linear_model -# Fitted cov finder linear model. -# cov_finder -# Fitted cov finder model. - -# Returns -# ------- -# dict -# Result summary for bias covariate selection. - -# """ -# beta_info = get_beta_info(cov_finder_linear_model, cov_name="intercept") -# selected_covs = [ -# cov_name for cov_name in cov_finder.selected_covs if cov_name != "intercept" -# ] - -# # save results -# cov_finder_result = { -# "beta_sd": float(beta_info[1] * 0.1), -# "selected_covs": selected_covs, -# } - -# return cov_finder_result - - # def get_linear_model(df: DataFrame, cov_finder_result: dict) -> MRBRT: # """Create linear model for effect.