diff --git a/.github/workflows/lint-format.yaml b/.github/workflows/lint-format.yaml
index d6b49c37..3c2ea9b7 100644
--- a/.github/workflows/lint-format.yaml
+++ b/.github/workflows/lint-format.yaml
@@ -25,9 +25,9 @@ jobs:
       - name: Archive production artifacts
         if: ${{ success() }} || ${{ failure() }}
-        uses: actions/upload-artifact@v2
+        uses: actions/upload-artifact@v3
         with:
           name: MegaLinter reports
           path: |
             megalinter-reports
-            mega-linter.log
\ No newline at end of file
+            mega-linter.log
diff --git a/.pylintrc b/.pylintrc
index f1e2a61c..4ab81c8c 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -371,8 +371,8 @@ min-public-methods=2
 [EXCEPTIONS]
 
 # Exceptions that will emit a warning when caught.
-overgeneral-exceptions=BaseException,
-                       Exception
+overgeneral-exceptions=builtins.BaseException,
+                       builtins.Exception
 
 [FORMAT]
diff --git a/causal_testing/estimation/abstract_estimator.py b/causal_testing/estimation/abstract_estimator.py
new file mode 100644
index 00000000..47ab1efe
--- /dev/null
+++ b/causal_testing/estimation/abstract_estimator.py
@@ -0,0 +1,73 @@
+"""This module contains the Estimator abstract class"""
+
+import logging
+from abc import ABC, abstractmethod
+from typing import Any
+
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+class Estimator(ABC):
+    # pylint: disable=too-many-instance-attributes
+    """An estimator contains all of the information necessary to compute a causal estimate for the effect of changing
+    a set of treatment variables to a set of values.
+
+    All estimators must implement the following two methods:
+
+    1) add_modelling_assumptions: The validity of a model-assisted causal inference result depends on whether
+    the modelling assumptions imposed by a model actually hold. Therefore, for each model, it is important to state
+    the modelling assumptions upon which the validity of the results depends. To achieve this, the estimator object
+    maintains a list of modelling assumptions (as strings). If a user wishes to implement their own estimator, they
+    must implement this method and add all assumptions to the list of modelling assumptions.
+
+    2) estimate_ate: All estimators must be capable of returning the average treatment effect as a minimum. That is, the
+    average effect of the intervention (changing treatment from control to treated value) on the outcome of interest
+    adjusted for all confounders.
+    """
+
+    def __init__(
+        # pylint: disable=too-many-arguments
+        self,
+        treatment: str,
+        treatment_value: float,
+        control_value: float,
+        adjustment_set: set,
+        outcome: str,
+        df: pd.DataFrame = None,
+        effect_modifiers: dict[str:Any] = None,
+        alpha: float = 0.05,
+        query: str = "",
+    ):
+        self.treatment = treatment
+        self.treatment_value = treatment_value
+        self.control_value = control_value
+        self.adjustment_set = adjustment_set
+        self.outcome = outcome
+        self.alpha = alpha
+        self.df = df.query(query) if query else df
+
+        if effect_modifiers is None:
+            self.effect_modifiers = {}
+        else:
+            self.effect_modifiers = effect_modifiers
+        self.modelling_assumptions = []
+        if query:
+            self.modelling_assumptions.append(query)
+        self.add_modelling_assumptions()
+        logger.debug("Effect Modifiers: %s", self.effect_modifiers)
+
+    @abstractmethod
+    def add_modelling_assumptions(self):
+        """
+        Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that
+        must hold if the resulting causal inference is to be considered valid.
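+
+        For example (illustrative), a concrete subclass might record an assumption as
+        self.modelling_assumptions.append("The outcome varies linearly with the treatment.")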
+ """ + + def compute_confidence_intervals(self) -> list[float, float]: + """ + Estimate the 95% Wald confidence intervals for the effect of changing the treatment from control values to + treatment values on the outcome. + :return: 95% Wald confidence intervals. + """ diff --git a/causal_testing/estimation/abstract_regression_estimator.py b/causal_testing/estimation/abstract_regression_estimator.py new file mode 100644 index 00000000..c6786d20 --- /dev/null +++ b/causal_testing/estimation/abstract_regression_estimator.py @@ -0,0 +1,119 @@ +"""This module contains the RegressionEstimator, which is an abstract class for concrete regression estimators.""" + +import logging +from typing import Any +from abc import abstractmethod + +import pandas as pd +from statsmodels.regression.linear_model import RegressionResultsWrapper +from patsy import dmatrix # pylint: disable = no-name-in-module + +from causal_testing.specification.variable import Variable +from causal_testing.estimation.abstract_estimator import Estimator + +logger = logging.getLogger(__name__) + + +class RegressionEstimator(Estimator): + """A Linear Regression Estimator is a parametric estimator which restricts the variables in the data to a linear + combination of parameters and functions of the variables (note these functions need not be linear). + """ + + def __init__( + # pylint: disable=too-many-arguments + self, + treatment: str, + treatment_value: float, + control_value: float, + adjustment_set: set, + outcome: str, + df: pd.DataFrame = None, + effect_modifiers: dict[Variable:Any] = None, + formula: str = None, + alpha: float = 0.05, + query: str = "", + ): + super().__init__( + treatment=treatment, + treatment_value=treatment_value, + control_value=control_value, + adjustment_set=adjustment_set, + outcome=outcome, + df=df, + effect_modifiers=effect_modifiers, + query=query, + ) + + self.model = None + if effect_modifiers is None: + effect_modifiers = [] + if adjustment_set is None: + adjustment_set = [] + if formula is not None: + self.formula = formula + else: + terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(effect_modifiers)) + self.formula = f"{outcome} ~ {'+'.join(terms)}" + + @property + @abstractmethod + def regressor(self): + """ + The regressor to use, e.g. ols or logit. + This should be a property accessible with self.regressor. + Define as `regressor = ...`` outside of __init__, not as `self.regressor = ...`, otherwise + you'll get an "cannot instantiate with abstract method" error. + """ + + def add_modelling_assumptions(self): + """ + Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that + must hold if the resulting causal inference is to be considered valid. + """ + self.modelling_assumptions.append( + "The variables in the data must fit a shape which can be expressed as a linear" + "combination of parameters and functions of variables. Note that these functions" + "do not need to be linear." + ) + + def _run_regression(self, data=None) -> RegressionResultsWrapper: + """Run logistic regression of the treatment and adjustment set against the outcome and return the model. + + :return: The model after fitting to data. + """ + if data is None: + data = self.df + model = self.regressor(formula=self.formula, data=data).fit(disp=0) + self.model = model + return model + + def _predict(self, data=None, adjustment_config: dict = None) -> pd.DataFrame: + """Estimate the outcomes under control and treatment. 
+
+        :param data: The data to use, defaults to `self.df`. Controllable for bootstrap sampling.
+        :param adjustment_config: The values of the adjustment variables to use.
+
+        :return: The estimated outcome under control and treatment, with confidence intervals in the form of a
+                 dataframe with columns "predicted", "se", "ci_lower", and "ci_upper".
+        """
+        if adjustment_config is None:
+            adjustment_config = {}
+
+        model = self._run_regression(data)
+
+        x = pd.DataFrame(columns=self.df.columns)
+        x["Intercept"] = 1  # self.intercept
+        x[self.treatment] = [self.treatment_value, self.control_value]
+
+        for k, v in adjustment_config.items():
+            x[k] = v
+        for k, v in self.effect_modifiers.items():
+            x[k] = v
+        x = dmatrix(self.formula.split("~")[1], x, return_type="dataframe")
+        for col in x:
+            if str(x.dtypes[col]) == "object":
+                x = pd.get_dummies(x, columns=[col], drop_first=True)
+
+        # This has to be here in case the treatment variable is in an I(...) block in the self.formula
+        x[self.treatment] = [self.treatment_value, self.control_value]
+        return model.get_prediction(x).summary_frame()
diff --git a/causal_testing/estimation/cubic_spline_estimator.py b/causal_testing/estimation/cubic_spline_estimator.py
new file mode 100644
index 00000000..b8ceb2fd
--- /dev/null
+++ b/causal_testing/estimation/cubic_spline_estimator.py
@@ -0,0 +1,75 @@
+"""This module contains the CubicSplineRegressionEstimator class, for estimating
+continuous outcomes with changes in behaviour"""
+
+import logging
+from typing import Any
+
+import pandas as pd
+
+from causal_testing.specification.variable import Variable
+from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator
+
+logger = logging.getLogger(__name__)
+
+
+class CubicSplineRegressionEstimator(LinearRegressionEstimator):
+    """A Cubic Spline Regression Estimator is a parametric estimator which restricts the variables in the data to a
+    combination of parameters and basis functions of the variables.
+    """
+
+    def __init__(
+        # pylint: disable=too-many-arguments
+        self,
+        treatment: str,
+        treatment_value: float,
+        control_value: float,
+        adjustment_set: set,
+        outcome: str,
+        basis: int,
+        df: pd.DataFrame = None,
+        effect_modifiers: dict[Variable:Any] = None,
+        formula: str = None,
+        alpha: float = 0.05,
+        expected_relationship=None,
+    ):
+        super().__init__(
+            treatment, treatment_value, control_value, adjustment_set, outcome, df, effect_modifiers, formula, alpha
+        )
+
+        self.expected_relationship = expected_relationship
+
+        if effect_modifiers is None:
+            effect_modifiers = []
+
+        if formula is None:
+            terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(effect_modifiers))
+            self.formula = f"{outcome} ~ cr({'+'.join(terms)}, df={basis})"
+
+    def estimate_ate_calculated(self, adjustment_config: dict = None) -> pd.Series:
+        """Estimate the ATE of the treatment on the outcome. That is, the change in outcome caused
+        by changing the treatment variable from the control value to the treatment value. Here, we actually
+        calculate the expected outcomes under control and treatment and take one from the other. This
+        allows for custom terms to be put in such as squares, inverses, products, etc.
+
+        :param adjustment_config: The configuration of the adjustment set as a dict mapping variable names to
+                                  their values. N.B. Every variable in the adjustment set MUST have a value in
+                                  order to estimate the outcome under control and treatment.
+
+        :return: The average treatment effect.
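+
+        Illustrative usage (assumes a dataframe `df` with numeric columns "x" and "y"):
+            >>> estimator = CubicSplineRegressionEstimator("x", 1, 0, set(), "y", 3, df=df)
+            >>> estimator.estimate_ate_calculated()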
+ """ + model = self._run_regression() + + x = {"Intercept": 1, self.treatment: self.treatment_value} + if adjustment_config is not None: + for k, v in adjustment_config.items(): + x[k] = v + if self.effect_modifiers is not None: + for k, v in self.effect_modifiers.items(): + x[k] = v + + treatment = model.predict(x).iloc[0] + + x[self.treatment] = self.control_value + control = model.predict(x).iloc[0] + + return pd.Series(treatment - control) diff --git a/causal_testing/estimation/genetic_programming_regression_fitter.py b/causal_testing/estimation/genetic_programming_regression_fitter.py new file mode 100644 index 00000000..49aa4dfc --- /dev/null +++ b/causal_testing/estimation/genetic_programming_regression_fitter.py @@ -0,0 +1,381 @@ +""" +This module contains a genetic programming implementation to infer the functional +form between the adjustment set and the outcome. +""" + +import copy +from inspect import isclass +from operator import add, mul +import random + +import patsy +import numpy as np +import pandas as pd +import statsmodels.formula.api as smf +import statsmodels +import sympy + +from deap import base, creator, tools, gp + +from numpy import power, log + + +def reciprocal(x: float) -> float: + """ + Return the reciprocal of the input. + :param x: Float to reciprocate. + :return: 1/x + """ + return power(x, -1) + + +def mut_insert(expression: gp.PrimitiveTree, pset: gp.PrimitiveSet): + """ + NOTE: This is a temporary workaround. This method is copied verbatim from + gp.mutInsert. It seems they forgot to import isclass from inspect, so their + method throws an error, saying that "isclass is not defined". A couple of + lines are not covered by tests, but since this is 1. a temporary workaround + until they release a new version of DEAP, and 2. not our code, I don't think + that matters. + + Inserts a new branch at a random position in *expression*. The subtree + at the chosen position is used as child node of the created subtree, in + that way, it is really an insertion rather than a replacement. Note that + the original subtree will become one of the children of the new primitive + inserted, but not perforce the first (its position is randomly selected if + the new primitive has more than one child). + + :param expression: The normal or typed tree to be mutated. + :param pset: The pset object defining the variables and constants. + + :return: A tuple of one tree. + """ + index = random.randrange(len(expression)) + node = expression[index] + expr_slice = expression.searchSubtree(index) + choice = random.choice + + # As we want to keep the current node as children of the new one, + # it must accept the return value of the current node + primitives = [p for p in pset.primitives[node.ret] if node.ret in p.args] + + if len(primitives) == 0: + return (expression,) + + new_node = choice(primitives) + new_subtree = [None] * len(new_node.args) + position = choice([i for i, a in enumerate(new_node.args) if a == node.ret]) + + for i, arg_type in enumerate(new_node.args): + if i != position: + term = choice(pset.terminals[arg_type]) + if isclass(term): + term = term() + new_subtree[i] = term + + new_subtree[position : position + 1] = expression[expr_slice] + new_subtree.insert(0, new_node) + expression[expr_slice] = new_subtree + return (expression,) + + +def create_power_function(order: int): + """ + Creates a power operator and its corresponding sympy conversion. + + :param order: The order of the power, e.g. `order=2` will give x^2. 
+
+    :return: A pair consisting of the power function and the sympy conversion
+    """
+
+    def power_func(x):
+        return power(x, order)
+
+    def sympy_conversion(x):
+        return f"Pow({x},{order})"
+
+    return power_func, sympy_conversion
+
+
+class GP:
+    """
+    Object to perform genetic programming.
+    """
+
+    # pylint: disable=too-many-instance-attributes
+
+    def __init__(
+        self,
+        df: pd.DataFrame,
+        features: list,
+        outcome: str,
+        max_order: int = 0,
+        extra_operators: list = None,
+        sympy_conversions: dict = None,
+        seed=0,
+    ):
+        # pylint: disable=too-many-arguments
+        random.seed(seed)
+        self.df = df
+        self.features = features
+        self.outcome = outcome
+        self.max_order = max_order
+        self.seed = seed
+        self.pset = gp.PrimitiveSet("MAIN", len(self.features))
+        self.pset.renameArguments(**{f"ARG{i}": f for i, f in enumerate(self.features)})
+
+        standard_operators = [(add, 2), (mul, 2)]
+        if extra_operators is None:
+            extra_operators = [(log, 1), (reciprocal, 1)]
+        if sympy_conversions is None:
+            sympy_conversions = {}
+        for operator, num_args in standard_operators + extra_operators:
+            self.pset.addPrimitive(operator, num_args)
+
+        self.sympy_conversions = {
+            "mul": lambda x1, x2: f"Mul({x1},{x2})",
+            "add": lambda x1, x2: f"Add({x1},{x2})",
+            "reciprocal": lambda x1: f"Pow({x1},-1)",
+        } | sympy_conversions
+
+        for i in range(self.max_order + 1):
+            name = f"power_{i}"
+            func, conversion = create_power_function(i)
+            self.pset.addPrimitive(func, 1, name=name)
+            if name in self.sympy_conversions:
+                raise ValueError(
+                    f"You have provided a function called {name}, which is reserved for raising to power "
+                    f"{i}. Please choose a different name for your function."
+                )
+            self.sympy_conversions[name] = conversion
+
+        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
+        creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
+
+        self.toolbox = base.Toolbox()
+        self.toolbox.register("expr", gp.genHalfAndHalf, pset=self.pset, min_=1, max_=2)
+        self.toolbox.register("individual", tools.initIterate, creator.Individual, self.toolbox.expr)
+        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
+        self.toolbox.register("compile", gp.compile, pset=self.pset)
+        self.toolbox.register("evaluate", self.fitness)
+        self.toolbox.register("repair", self.repair)
+        self.toolbox.register("select", tools.selBest)
+        self.toolbox.register("mate", gp.cxOnePoint)
+        self.toolbox.register("mutate", self.mutate)
+        self.toolbox.decorate("mate", gp.staticLimit(key=lambda x: x.height + 1, max_value=17))
+        self.toolbox.decorate("mutate", gp.staticLimit(key=lambda x: x.height + 1, max_value=17))
+
+    def split(self, individual: gp.PrimitiveTree) -> list:
+        """
+        Split an expression into its components, e.g. 2x + 4y - xy -> [2x, 4y, xy].
+
+        :param individual: The expression to be split.
+        :return: A list of the equation's components that are linearly combined into the full equation.
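+
+        For example (illustrative), the tree for add(mul(2, x), y) splits into the
+        terms mul(2, x) and y, which `repair` can then refit with ordinary least squares.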
+ """ + if len(individual) > 1: + terms = [] + # Recurse over children if add/sub + if individual[0].name in ["add", "sub"]: + terms.extend( + self.split( + creator.Individual( + gp.PrimitiveTree( + individual[individual.searchSubtree(1).start : individual.searchSubtree(1).stop] + ) + ) + ) + ) + terms.extend( + self.split(creator.Individual(gp.PrimitiveTree(individual[individual.searchSubtree(1).stop :]))) + ) + else: + terms.append(individual) + return terms + return [individual] + + def _convert_prim(self, prim: gp.Primitive, args: list) -> str: + """ + Convert primitives to sympy format. + + :param prim: A GP primitive, e.g. add + :param args: The list of arguments + + :return: A sympy compatible string representing the function, e.g. add(x, y) -> Add(x, y). + """ + prim = copy.copy(prim) + prim_formatter = self.sympy_conversions.get(prim.name, prim.format) + return prim_formatter(*args) + + def _stringify_for_sympy(self, expression: gp.PrimitiveTree) -> str: + """ + Return the expression in a sympy compatible string. + + :param expression: The expression to be simplified. + + :return: A sympy compatible string representing the equation. + """ + string = "" + stack = [] + for node in expression: + stack.append((node, [])) + while len(stack[-1][1]) == stack[-1][0].arity: + prim, args = stack.pop() + string = self._convert_prim(prim, args) + if len(stack) == 0: + break # If stack is empty, all nodes should have been seen + stack[-1][1].append(string) + return string + + def simplify(self, expression: gp.PrimitiveTree) -> sympy.core.Expr: + """ + Simplify an expression by appling mathematical equivalences. + + :param expression: The expression to simplify. + + :return: The simplified expression as a sympy Expr object. + """ + return sympy.simplify(self._stringify_for_sympy(expression)) + + def repair(self, expression: gp.PrimitiveTree) -> gp.PrimitiveTree: + """ + Use linear regression to infer the coefficients of the linear components of the expression. + Named "repair" since a "repair operator" is quite common in GP. + + :param expression: The expression to process. + + :return: The expression with constant coefficients, or the original expression if that fails. + """ + eq = f"{self.outcome} ~ {' + '.join(str(x) for x in self.split(expression))}" + try: + # Create model, fit (run) it, give estimates from it] + model = smf.ols(eq, self.df) + res = model.fit() + + eqn = f"{res.params['Intercept']}" + for term, coefficient in res.params.items(): + if term != "Intercept": + eqn = f"add({eqn}, mul({coefficient}, {term}))" + repaired = type(expression)(gp.PrimitiveTree.from_string(eqn, self.pset)) + return repaired + except ( + OverflowError, + ValueError, + ZeroDivisionError, + statsmodels.tools.sm_exceptions.MissingDataError, + patsy.PatsyError, + ): + return expression + + def fitness(self, expression: gp.PrimitiveTree) -> float: + """ + Evaluate the fitness of an candidate expression according to the error between the estimated and observed + values. Low values are better. + + :param expression: The candidate expression to evaluate. + + :return: The fitness of the individual. 
+ """ + old_settings = np.seterr(all="raise") + try: + # Create model, fit (run) it, give estimates from it] + func = gp.compile(expression, self.pset) + y_estimates = pd.Series([func(**x) for _, x in self.df[self.features].iterrows()]) + + # Calc errors using an improved normalised mean squared + sqerrors = (self.df[self.outcome] - y_estimates) ** 2 + mean_squared = sqerrors.sum() / len(self.df) + nmse = mean_squared / (self.df[self.outcome].sum() / len(self.df)) + + return (nmse,) + + # Fitness value of infinite if error - not return 1 + except ( + OverflowError, + ValueError, + ZeroDivisionError, + statsmodels.tools.sm_exceptions.MissingDataError, + patsy.PatsyError, + RuntimeWarning, + FloatingPointError, + ): + return (float("inf"),) + finally: + np.seterr(**old_settings) # Restore original settings + + def make_offspring(self, population: list, num_offspring: int) -> list: + """ + Create the next generation of individuals. + + :param population: The current population. + :param num_offspring: The number of new individuals to generate. + + :return: A list of num_offspring new individuals generated through crossover and mutation. + """ + offspring = [] + for _ in range(num_offspring): + parent1, parent2 = tools.selTournament(population, 2, 2) + child, _ = self.toolbox.mate(self.toolbox.clone(parent1), self.toolbox.clone(parent2)) + del child.fitness.values + (child,) = self.toolbox.mutate(child) + offspring.append(child) + return offspring + + def run_gp(self, ngen: int, pop_size: int = 20, num_offspring: int = 10, seeds: list = None) -> gp.PrimitiveTree: + """ + Execute Genetic Programming to find the best expression using a mu+lambda algorithm. + + :param ngen: The maximum number of generations. + :param pop_size: The population size. + :param num_offspring: The number of new individuals per generation. + :param seeds: Seed individuals for the initial population. + + :return: The best candididate expression. + """ + population = [self.toolbox.repair(ind) for ind in self.toolbox.population(n=pop_size)] + if seeds is not None: + for seed in seeds: + ind = creator.Individual(gp.PrimitiveTree.from_string(seed, self.pset)) + ind.fitness.values = self.toolbox.evaluate(ind) + population.append(ind) + + # Evaluate the individuals with an invalid fitness + for ind in population: + ind.fitness.values = self.toolbox.evaluate(ind) + population.sort(key=lambda x: (x.fitness.values, x.height)) + + # Begin the generational process + for _ in range(1, ngen + 1): + # Vary the population + offspring = self.make_offspring(population, num_offspring) + offspring = [self.toolbox.repair(ind) for ind in offspring] + + # Evaluate the individuals with an invalid fitness + for ind in offspring: + ind.fitness.values = self.toolbox.evaluate(ind) + + # Select the best pop_size individuals to continue to the next generation + population[:] = self.toolbox.select(population + offspring, pop_size) + + # Update the statistics with the new population + population.sort(key=lambda x: (x.fitness.values, x.height)) + + return population[0] + + def mutate(self, expression: gp.PrimitiveTree) -> gp.PrimitiveTree: + """ + mutate individuals to replicate the small changes in DNA that occur in natural reproduction. + A node will randomly be inserted, removed, or replaced. + + :param expression: The expression to mutate. + + :return: The mutated expression. 
+ """ + choice = random.randint(1, 3) + if choice == 1: + mutated = gp.mutNodeReplacement(self.toolbox.clone(expression), self.pset) + elif choice == 2: + mutated = mut_insert(self.toolbox.clone(expression), self.pset) + elif choice == 3: + mutated = gp.mutShrink(self.toolbox.clone(expression)) + return mutated diff --git a/causal_testing/estimation/instrumental_variable_estimator.py b/causal_testing/estimation/instrumental_variable_estimator.py new file mode 100644 index 00000000..38d0fc1b --- /dev/null +++ b/causal_testing/estimation/instrumental_variable_estimator.py @@ -0,0 +1,91 @@ +"""This module contains the InstrumentalVariableEstimator class, for estimating +continuous outcomes with unobservable confounding.""" + +import logging +from math import ceil +import pandas as pd +import statsmodels.api as sm + +from causal_testing.estimation.abstract_estimator import Estimator + +logger = logging.getLogger(__name__) + + +class InstrumentalVariableEstimator(Estimator): + """ + Carry out estimation using instrumental variable adjustment rather than conventional adjustment. This means we do + not need to observe all confounders in order to adjust for them. A key assumption here is linearity. + """ + + def __init__( + # pylint: disable=too-many-arguments + # pylint: disable=duplicate-code + self, + treatment: str, + treatment_value: float, + control_value: float, + adjustment_set: set, + outcome: str, + instrument: str, + df: pd.DataFrame = None, + alpha: float = 0.05, + query: str = "", + ): + super().__init__( + treatment=treatment, + treatment_value=treatment_value, + control_value=control_value, + adjustment_set=adjustment_set, + outcome=outcome, + df=df, + effect_modifiers=None, + alpha=alpha, + query=query, + ) + + self.instrument = instrument + + def add_modelling_assumptions(self): + """ + Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that + must hold if the resulting causal inference is to be considered valid. + """ + self.modelling_assumptions.append( + """The instrument and the treatment, and the treatment and the outcome must be + related linearly in the form Y = aX + b.""" + ) + self.modelling_assumptions.append( + """The three IV conditions must hold + (i) Instrument is associated with treatment + (ii) Instrument does not affect outcome except through its potential effect on treatment + (iii) Instrument and outcome do not share causes + """ + ) + + def estimate_iv_coefficient(self, df) -> float: + """ + Estimate the linear regression coefficient of the treatment on the + outcome. + """ + # Estimate the total effect of instrument I on outcome Y = abI + c1 + ab = sm.OLS(df[self.outcome], df[[self.instrument]]).fit().params[self.instrument] + + # Estimate the direct effect of instrument I on treatment X = aI + c1 + a = sm.OLS(df[self.treatment], df[[self.instrument]]).fit().params[self.instrument] + + # Estimate the coefficient of I on X by cancelling + return ab / a + + def estimate_coefficient(self, bootstrap_size=100) -> tuple[pd.Series, list[pd.Series, pd.Series]]: + """ + Estimate the unit ate (i.e. coefficient) of the treatment on the + outcome. 
+ """ + bootstraps = sorted( + [self.estimate_iv_coefficient(self.df.sample(len(self.df), replace=True)) for _ in range(bootstrap_size)] + ) + bound = ceil((bootstrap_size * self.alpha) / 2) + ci_low = pd.Series(bootstraps[bound]) + ci_high = pd.Series(bootstraps[bootstrap_size - bound]) + + return pd.Series(self.estimate_iv_coefficient(self.df)), [ci_low, ci_high] diff --git a/causal_testing/estimation/ipcw_estimator.py b/causal_testing/estimation/ipcw_estimator.py new file mode 100644 index 00000000..584182ff --- /dev/null +++ b/causal_testing/estimation/ipcw_estimator.py @@ -0,0 +1,249 @@ +"""This module contains the IPCWEstimator class, for estimating the time to a particular event""" + +import logging +from math import ceil + +import numpy as np +import pandas as pd +import statsmodels.formula.api as smf +from lifelines import CoxPHFitter + +from causal_testing.specification.capabilities import TreatmentSequence, Capability +from causal_testing.estimation.abstract_estimator import Estimator + +logger = logging.getLogger(__name__) + + +class IPCWEstimator(Estimator): + """ + Class to perform inverse probability of censoring weighting (IPCW) estimation + for sequences of treatments over time-varying data. + """ + + # pylint: disable=too-many-arguments + # pylint: disable=too-many-instance-attributes + def __init__( + self, + df: pd.DataFrame, + timesteps_per_intervention: int, + control_strategy: TreatmentSequence, + treatment_strategy: TreatmentSequence, + outcome: str, + fault_column: str, + fit_bl_switch_formula: str, + fit_bltd_switch_formula: str, + eligibility=None, + alpha: float = 0.05, + ): + super().__init__( + [c.variable for c in treatment_strategy.capabilities], + [c.value for c in treatment_strategy.capabilities], + [c.value for c in control_strategy.capabilities], + None, + outcome, + df, + None, + alpha=alpha, + query="", + ) + self.timesteps_per_intervention = timesteps_per_intervention + self.control_strategy = control_strategy + self.treatment_strategy = treatment_strategy + self.outcome = outcome + self.fault_column = fault_column + self.timesteps_per_intervention = timesteps_per_intervention + self.fit_bl_switch_formula = fit_bl_switch_formula + self.fit_bltd_switch_formula = fit_bltd_switch_formula + self.eligibility = eligibility + self.df = df + self.preprocess_data() + + def add_modelling_assumptions(self): + self.modelling_assumptions.append("The variables in the data vary over time.") + + def setup_xo_t_do(self, strategy_assigned: list, strategy_followed: list, eligible: pd.Series): + """ + Return a binary sequence with each bit representing whether the current + index is the time point at which the individual diverted from the + assigned treatment strategy (and thus should be censored). 
+
+        :param strategy_assigned: The assigned treatment strategy.
+        :param strategy_followed: The strategy followed by the individual.
+        :param eligible: Binary sequence representing the eligibility of the individual at each time step.
+        """
+        strategy_assigned = [1] + strategy_assigned + [1]
+        strategy_followed = [1] + strategy_followed + [1]
+
+        mask = (
+            pd.Series(strategy_assigned, index=eligible.index) != pd.Series(strategy_followed, index=eligible.index)
+        ).astype("boolean")
+        mask = mask | ~eligible
+        mask.reset_index(inplace=True, drop=True)
+        divergence = mask.loc[mask]
+        if divergence.empty:
+            return np.zeros(len(mask))
+        mask = (mask * 1).tolist()
+        cutoff = divergence.index[0] + 1
+        return mask[:cutoff] + ([None] * (len(mask) - cutoff))
+
+    def setup_fault_t_do(self, individual: pd.DataFrame):
+        """
+        Return a binary sequence with each bit representing whether the current
+        index is the time point at which the event of interest (i.e. a fault)
+        occurred.
+        """
+        fault = individual[~individual[self.fault_column]]
+        fault_t_do = pd.Series(np.zeros(len(individual)), index=individual.index)
+
+        if not fault.empty:
+            fault_time = individual["time"].loc[fault.index[0]]
+            # Ceiling to nearest observation point
+            fault_time = ceil(fault_time / self.timesteps_per_intervention) * self.timesteps_per_intervention
+            # Set the correct observation point to be the fault time of doing (fault_t_do)
+            observations = individual.loc[
+                (individual["time"] % self.timesteps_per_intervention == 0) & (individual["time"] < fault_time)
+            ]
+            if not observations.empty:
+                fault_t_do.loc[observations.index[0]] = 1
+        assert sum(fault_t_do) <= 1, f"Multiple fault times for\n{individual}"
+
+        return pd.DataFrame({"fault_t_do": fault_t_do})
+
+    def setup_fault_time(self, individual: pd.DataFrame, perturbation: float = -0.001):
+        """
+        Return the time at which the event of interest (i.e. a fault) occurred.
+        """
+        fault = individual[~individual[self.fault_column]]
+        fault_time = (
+            individual["time"].loc[fault.index[0]]
+            if not fault.empty
+            else (individual["time"].max() + self.timesteps_per_intervention)
+        )
+        return pd.DataFrame({"fault_time": np.repeat(fault_time + perturbation, len(individual))})
+
+    def preprocess_data(self):
+        """
+        Set up the treatment-specific columns in the data that are needed to estimate the hazard ratio.
+        """
+        self.df["trtrand"] = None  # treatment/control arm
+        self.df["xo_t_do"] = None  # did the individual deviate from the treatment of interest here?
+        self.df["eligible"] = self.df.eval(self.eligibility) if self.eligibility is not None else True
+
+        # when did a fault occur?
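+        # fault_time holds the (slightly perturbed) time of each individual's first fault,
+        # and fault_t_do flags the single observation point at which that fault occurred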
+ self.df["fault_time"] = self.df.groupby("id")[[self.fault_column, "time"]].apply(self.setup_fault_time).values + self.df["fault_t_do"] = ( + self.df.groupby("id")[["id", "time", self.fault_column]].apply(self.setup_fault_t_do).values + ) + assert not pd.isnull(self.df["fault_time"]).any() + + living_runs = self.df.query("fault_time > 0").loc[ + (self.df["time"] % self.timesteps_per_intervention == 0) + & (self.df["time"] <= self.control_strategy.total_time()) + ] + + individuals = [] + new_id = 0 + logging.debug(" Preprocessing groups") + for _, individual in living_runs.groupby("id"): + assert sum(individual["fault_t_do"]) <= 1, ( + f"Error initialising fault_t_do for individual\n" + f"{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\n" + "with fault at {individual.fault_time.iloc[0]}" + ) + + strategy_followed = [ + Capability( + c.variable, + individual.loc[individual["time"] == c.start_time, c.variable].values[0], + c.start_time, + c.end_time, + ) + for c in self.treatment_strategy.capabilities + ] + + # Control flow: + # Individuals that start off in both arms, need cloning (hence incrementing the ID within the if statement) + # Individuals that don't start off in either arm are left out + for inx, strategy_assigned in [(0, self.control_strategy), (1, self.treatment_strategy)]: + if strategy_assigned.capabilities[0] == strategy_followed[0] and individual.eligible.iloc[0]: + individual["id"] = new_id + new_id += 1 + individual["trtrand"] = inx + individual["xo_t_do"] = self.setup_xo_t_do( + strategy_assigned.capabilities, strategy_followed, individual["eligible"] + ) + individuals.append(individual.loc[individual["time"] <= individual["fault_time"]].copy()) + if len(individuals) == 0: + raise ValueError("No individuals followed either strategy.") + + self.df = pd.concat(individuals) + + def estimate_hazard_ratio(self): + """ + Estimate the hazard ratio. 
+ """ + + if self.df["fault_t_do"].sum() == 0: + raise ValueError("No recorded faults") + + preprocessed_data = self.df.loc[self.df["xo_t_do"] == 0].copy() + + # Use logistic regression to predict switching given baseline covariates + fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=self.df).fit() + + preprocessed_data["pxo1"] = fit_bl_switch.predict(preprocessed_data) + + # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) + fit_bltd_switch = smf.logit( + self.fit_bltd_switch_formula, + data=self.df, + ).fit() + + preprocessed_data["pxo2"] = fit_bltd_switch.predict(preprocessed_data) + + # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored + # Estimate the probabilities of remaining ‘un-switched’ and hence the weights + + preprocessed_data["num"] = 1 - preprocessed_data["pxo1"] + preprocessed_data["denom"] = 1 - preprocessed_data["pxo2"] + preprocessed_data[["num", "denom"]] = ( + preprocessed_data.sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() + ) + + assert ( + not preprocessed_data["num"].isnull().any() + ), f"{len(preprocessed_data['num'].isnull())} null numerator values" + assert ( + not preprocessed_data["denom"].isnull().any() + ), f"{len(preprocessed_data['denom'].isnull())} null denom values" + + preprocessed_data["weight"] = 1 / preprocessed_data["denom"] + preprocessed_data["sweight"] = preprocessed_data["num"] / preprocessed_data["denom"] + + preprocessed_data["tin"] = preprocessed_data["time"] + preprocessed_data["tout"] = pd.concat( + [(preprocessed_data["time"] + self.timesteps_per_intervention), preprocessed_data["fault_time"]], + axis=1, + ).min(axis=1) + + assert (preprocessed_data["tin"] <= preprocessed_data["tout"]).all(), ( + f"Left before joining\n" f"{preprocessed_data.loc[preprocessed_data['tin'] >= preprocessed_data['tout']]}" + ) + + # IPCW step 4: Use these weights in a weighted analysis of the outcome model + # Estimate the KM graph and IPCW hazard ratio using Cox regression. + cox_ph = CoxPHFitter() + cox_ph.fit( + df=preprocessed_data, + duration_col="tout", + event_col="fault_t_do", + weights_col="weight", + cluster_col="id", + robust=True, + formula="trtrand", + entry_col="tin", + ) + + ci_low, ci_high = [np.exp(cox_ph.confidence_intervals_)[col] for col in cox_ph.confidence_intervals_.columns] + + return (cox_ph.hazard_ratios_, (ci_low, ci_high)) diff --git a/causal_testing/estimation/linear_regression_estimator.py b/causal_testing/estimation/linear_regression_estimator.py new file mode 100644 index 00000000..85a4b178 --- /dev/null +++ b/causal_testing/estimation/linear_regression_estimator.py @@ -0,0 +1,186 @@ +"""This module contains the LinearRegressionEstimator for estimating continuous outcomes.""" + +import logging +from typing import Any + +import pandas as pd +import statsmodels.formula.api as smf +from patsy import dmatrix, ModelDesc # pylint: disable = no-name-in-module + +from causal_testing.specification.variable import Variable +from causal_testing.estimation.genetic_programming_regression_fitter import GP +from causal_testing.estimation.abstract_regression_estimator import RegressionEstimator + +logger = logging.getLogger(__name__) + + +class LinearRegressionEstimator(RegressionEstimator): + """A Linear Regression Estimator is a parametric estimator which restricts the variables in the data to a linear + combination of parameters and functions of the variables (note these functions need not be linear). 
+ """ + + regressor = smf.ols + + def __init__( + # pylint: disable=too-many-arguments + self, + treatment: str, + treatment_value: float, + control_value: float, + adjustment_set: set, + outcome: str, + df: pd.DataFrame = None, + effect_modifiers: dict[Variable:Any] = None, + formula: str = None, + alpha: float = 0.05, + query: str = "", + ): + # pylint: disable=too-many-arguments + super().__init__( + treatment, + treatment_value, + control_value, + adjustment_set, + outcome, + df, + effect_modifiers, + formula, + alpha, + query, + ) + for term in self.effect_modifiers: + self.adjustment_set.add(term) + + def gp_formula( + self, + ngen: int = 100, + pop_size: int = 20, + num_offspring: int = 10, + max_order: int = 0, + extra_operators: list = None, + sympy_conversions: dict = None, + seeds: list = None, + seed: int = 0, + ): + # pylint: disable=too-many-arguments + """ + Use Genetic Programming (GP) to infer the regression equation from the data. + + :param ngen: The maximum number of GP generations to run for. + :param pop_size: The GP population size. + :param num_offspring: The number of offspring per generation. + :param max_order: The maximum polynomial order to use, e.g. `max_order=2` will give + polynomials of the form `ax^2 + bx + c`. + :param extra_operators: Additional operators for the GP (defaults are +, *, log(x), and 1/x). + Operations should be of the form (fun, numArgs), e.g. (add, 2). + :param sympy_conversions: Dictionary of conversions of extra_operators for sympy, + e.g. `"mul": lambda *args_: "Mul({},{})".format(*args_)`. + :param seeds: Seed individuals for the population (e.g. if you think that the relationship between X and Y is + probably logarithmic, you can put that in). + :param seed: Random seed for the GP. + """ + gp = GP( + df=self.df, + features=sorted(list(self.adjustment_set.union([self.treatment]))), + outcome=self.outcome, + extra_operators=extra_operators, + sympy_conversions=sympy_conversions, + seed=seed, + max_order=max_order, + ) + formula = gp.run_gp(ngen=ngen, pop_size=pop_size, num_offspring=num_offspring, seeds=seeds) + formula = gp.simplify(formula) + self.formula = f"{self.outcome} ~ I({formula}) - 1" + + def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: + """Estimate the unit average treatment effect of the treatment on the outcome. That is, the change in outcome + caused by a unit change in treatment. + + :return: The unit average treatment effect and the 95% Wald confidence intervals. + """ + model = self._run_regression() + newline = "\n" + patsy_md = ModelDesc.from_formula(self.treatment) + + if any( + ( + self.df.dtypes[factor.name()] == "object" + for factor in patsy_md.rhs_termlist[1].factors + # We want to remove this long term as it prevents us from discovering categoricals within I(...) 
blocks
+                if factor.name() in self.df.dtypes
+            )
+        ):
+            design_info = dmatrix(self.formula.split("~")[1], self.df).design_info
+            treatment = design_info.column_names[design_info.term_name_slices[self.treatment]]
+        else:
+            treatment = [self.treatment]
+        assert set(treatment).issubset(
+            model.params.index.tolist()
+        ), f"{treatment} not in\n{' ' + str(model.params.index).replace(newline, newline + ' ')}"
+        unit_effect = model.params[treatment]  # Unit effect is the coefficient of the treatment
+        [ci_low, ci_high] = self._get_confidence_intervals(model, treatment)
+        return unit_effect, [ci_low, ci_high]
+
+    def estimate_ate(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
+        """Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome caused
+        by changing the treatment variable from the control value to the treatment value.
+
+        :return: The average treatment effect and the 95% Wald confidence intervals.
+        """
+        model = self._run_regression()
+
+        # Create an empty individual for the control and treated
+        individuals = pd.DataFrame(1, index=["control", "treated"], columns=model.params.index)
+
+        # For Pandas version > 2, we need to explicitly state that the dataframe takes floating-point values
+        individuals = individuals.astype(float)
+
+        # It is ABSOLUTELY CRITICAL that these go last, otherwise we can't index
+        # the effect with "ate = t_test_results.effect[0]"
+        individuals.loc["control", [self.treatment]] = self.control_value
+        individuals.loc["treated", [self.treatment]] = self.treatment_value
+
+        # Perform a t-test to compare the predicted outcome of the control and treated individual (ATE)
+        t_test_results = model.t_test(individuals.loc["treated"] - individuals.loc["control"])
+        ate = pd.Series(t_test_results.effect[0])
+        confidence_intervals = list(t_test_results.conf_int(alpha=self.alpha).flatten())
+        confidence_intervals = [pd.Series(interval) for interval in confidence_intervals]
+        return ate, confidence_intervals
+
+    def estimate_risk_ratio(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
+        """Estimate the risk ratio of the treatment on the outcome. That is, the ratio of the expected outcome
+        under treatment to the expected outcome under control.
+
+        :return: The risk ratio and the 95% Wald confidence intervals.
+        """
+        prediction = self._predict(adjustment_config=adjustment_config)
+        control_outcome, treatment_outcome = prediction.iloc[1], prediction.iloc[0]
+        ci_low = pd.Series(treatment_outcome["mean_ci_lower"] / control_outcome["mean_ci_upper"])
+        ci_high = pd.Series(treatment_outcome["mean_ci_upper"] / control_outcome["mean_ci_lower"])
+        return pd.Series(treatment_outcome["mean"] / control_outcome["mean"]), [ci_low, ci_high]
+
+    def estimate_ate_calculated(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
+        """Estimate the ATE of the treatment on the outcome. That is, the change in outcome caused
+        by changing the treatment variable from the control value to the treatment value. Here, we actually
+        calculate the expected outcomes under control and treatment and take one from the other. This
+        allows for custom terms to be put in such as squares, inverses, products, etc.
+
+        :param adjustment_config: The configuration of the adjustment set as a dict mapping variable names to
+                                  their values. N.B.
Every variable in the adjustment set MUST have a value in
+                                  order to estimate the outcome under control and treatment.
+
+        :return: The average treatment effect and the 95% Wald confidence intervals.
+        """
+        prediction = self._predict(adjustment_config=adjustment_config)
+        control_outcome, treatment_outcome = prediction.iloc[1], prediction.iloc[0]
+        ci_low = pd.Series(treatment_outcome["mean_ci_lower"] - control_outcome["mean_ci_upper"])
+        ci_high = pd.Series(treatment_outcome["mean_ci_upper"] - control_outcome["mean_ci_lower"])
+        return pd.Series(treatment_outcome["mean"] - control_outcome["mean"]), [ci_low, ci_high]
+
+    def _get_confidence_intervals(self, model, treatment):
+        confidence_intervals = model.conf_int(alpha=self.alpha, cols=None)
+        ci_low, ci_high = (
+            pd.Series(confidence_intervals[0].loc[treatment]),
+            pd.Series(confidence_intervals[1].loc[treatment]),
+        )
+        return [ci_low, ci_high]
diff --git a/causal_testing/estimation/logistic_regression_estimator.py b/causal_testing/estimation/logistic_regression_estimator.py
new file mode 100644
index 00000000..ca5537d4
--- /dev/null
+++ b/causal_testing/estimation/logistic_regression_estimator.py
@@ -0,0 +1,41 @@
+"""This module contains the LogisticRegressionEstimator class for estimating categorical outcomes."""
+
+import logging
+
+import numpy as np
+import statsmodels.formula.api as smf
+
+from causal_testing.estimation.abstract_regression_estimator import RegressionEstimator
+
+logger = logging.getLogger(__name__)
+
+
+class LogisticRegressionEstimator(RegressionEstimator):
+    """A Logistic Regression Estimator is a parametric estimator which restricts the variables in the data to a linear
+    combination of parameters and functions of the variables (note these functions need not be linear). It is designed
+    for estimating categorical outcomes.
+    """
+
+    regressor = smf.logit
+
+    def add_modelling_assumptions(self):
+        """
+        Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that
+        must hold if the resulting causal inference is to be considered valid.
+        """
+        self.modelling_assumptions.append(
+            "The variables in the data must fit a shape which can be expressed as a linear "
+            "combination of parameters and functions of variables. Note that these functions "
+            "do not need to be linear."
+        )
+        self.modelling_assumptions.append("The outcome must be binary.")
+        self.modelling_assumptions.append("Independently and identically distributed errors.")
+
+    def estimate_unit_odds_ratio(self) -> float:
+        """Estimate the odds ratio of increasing the treatment by one. In logistic regression, this corresponds to the
+        coefficient of the treatment of interest.
+
+        :return: The odds ratio. Confidence intervals are not yet supported.
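+
+        For example, a fitted treatment coefficient of ln(2) ≈ 0.693 gives an odds ratio of
+        exp(0.693) ≈ 2, i.e. a unit increase in the treatment roughly doubles the odds of the outcome.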
+ """ + model = self._run_regression(self.df) + return np.exp(model.params[self.treatment]) diff --git a/causal_testing/json_front/json_class.py b/causal_testing/json_front/json_class.py index 8c6c265c..6be7fa68 100644 --- a/causal_testing/json_front/json_class.py +++ b/causal_testing/json_front/json_class.py @@ -22,10 +22,13 @@ from causal_testing.specification.variable import Input, Meta, Output from causal_testing.testing.causal_test_case import CausalTestCase from causal_testing.testing.causal_test_result import CausalTestResult -from causal_testing.testing.estimators import Estimator, LinearRegressionEstimator, LogisticRegressionEstimator from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.causal_test_adequacy import DataAdequacy +from causal_testing.estimation.abstract_estimator import Estimator +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator +from causal_testing.estimation.logistic_regression_estimator import LogisticRegressionEstimator + logger = logging.getLogger(__name__) diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py index ed692e6d..1cb6d30b 100644 --- a/causal_testing/specification/capabilities.py +++ b/causal_testing/specification/capabilities.py @@ -2,6 +2,7 @@ This module contains the Capability and TreatmentSequence classes to implement treatment sequences that operate over time. """ + from typing import Any from causal_testing.specification.variable import Variable diff --git a/causal_testing/surrogate/causal_surrogate_assisted.py b/causal_testing/surrogate/causal_surrogate_assisted.py index c67d4e4b..4fba5371 100644 --- a/causal_testing/surrogate/causal_surrogate_assisted.py +++ b/causal_testing/surrogate/causal_surrogate_assisted.py @@ -7,7 +7,7 @@ from causal_testing.data_collection.data_collector import ObservationalDataCollector from causal_testing.specification.causal_specification import CausalSpecification from causal_testing.testing.base_test_case import BaseTestCase -from causal_testing.testing.estimators import CubicSplineRegressionEstimator +from causal_testing.estimation.cubic_spline_estimator import CubicSplineRegressionEstimator @dataclass diff --git a/causal_testing/surrogate/surrogate_search_algorithms.py b/causal_testing/surrogate/surrogate_search_algorithms.py index 495c2f86..54e7bb48 100644 --- a/causal_testing/surrogate/surrogate_search_algorithms.py +++ b/causal_testing/surrogate/surrogate_search_algorithms.py @@ -6,7 +6,7 @@ from pygad import GA from causal_testing.specification.causal_specification import CausalSpecification -from causal_testing.testing.estimators import CubicSplineRegressionEstimator +from causal_testing.estimation.cubic_spline_estimator import CubicSplineRegressionEstimator from causal_testing.surrogate.causal_surrogate_assisted import SearchAlgorithm diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index 740bba5e..5fb043eb 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -11,7 +11,7 @@ from causal_testing.testing.causal_test_suite import CausalTestSuite from causal_testing.specification.causal_dag import CausalDAG -from causal_testing.testing.estimators import Estimator +from causal_testing.estimation.abstract_estimator import Estimator from causal_testing.testing.causal_test_case import CausalTestCase logger = logging.getLogger(__name__) diff --git 
a/causal_testing/testing/causal_test_case.py b/causal_testing/testing/causal_test_case.py index 52daf040..08ad54f1 100644 --- a/causal_testing/testing/causal_test_case.py +++ b/causal_testing/testing/causal_test_case.py @@ -7,7 +7,7 @@ from causal_testing.specification.variable import Variable from causal_testing.testing.causal_test_outcome import CausalTestOutcome from causal_testing.testing.base_test_case import BaseTestCase -from causal_testing.testing.estimators import Estimator +from causal_testing.estimation.abstract_estimator import Estimator from causal_testing.testing.causal_test_result import CausalTestResult, TestValue from causal_testing.data_collection.data_collector import DataCollector diff --git a/causal_testing/testing/causal_test_result.py b/causal_testing/testing/causal_test_result.py index 65a2085e..bfcfe826 100644 --- a/causal_testing/testing/causal_test_result.py +++ b/causal_testing/testing/causal_test_result.py @@ -6,7 +6,7 @@ from dataclasses import dataclass import pandas as pd -from causal_testing.testing.estimators import Estimator +from causal_testing.estimation.abstract_estimator import Estimator from causal_testing.specification.variable import Variable diff --git a/causal_testing/testing/causal_test_suite.py b/causal_testing/testing/causal_test_suite.py index 47c5ef98..14099143 100644 --- a/causal_testing/testing/causal_test_suite.py +++ b/causal_testing/testing/causal_test_suite.py @@ -7,7 +7,7 @@ from typing import Type, Iterable from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.causal_test_case import CausalTestCase -from causal_testing.testing.estimators import Estimator +from causal_testing.estimation.abstract_estimator import Estimator from causal_testing.testing.causal_test_result import CausalTestResult from causal_testing.data_collection.data_collector import DataCollector from causal_testing.specification.causal_specification import CausalSpecification diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py deleted file mode 100644 index ffe76387..00000000 --- a/causal_testing/testing/estimators.py +++ /dev/null @@ -1,840 +0,0 @@ -"""This module contains the Estimator abstract class, as well as its concrete extensions: LogisticRegressionEstimator, -LinearRegressionEstimator""" - -import logging -from abc import ABC, abstractmethod -from typing import Any -from math import ceil - -import numpy as np -import pandas as pd -import statsmodels.api as sm -import statsmodels.formula.api as smf -from patsy import dmatrix # pylint: disable = no-name-in-module -from patsy import ModelDesc -from statsmodels.regression.linear_model import RegressionResultsWrapper -from statsmodels.tools.sm_exceptions import PerfectSeparationError -from lifelines import CoxPHFitter - -from causal_testing.specification.variable import Variable -from causal_testing.specification.capabilities import TreatmentSequence, Capability - -logger = logging.getLogger(__name__) - - -class Estimator(ABC): - # pylint: disable=too-many-instance-attributes - """An estimator contains all of the information necessary to compute a causal estimate for the effect of changing - a set of treatment variables to a set of values. - - All estimators must implement the following two methods: - - 1) add_modelling_assumptions: The validity of a model-assisted causal inference result depends on whether - the modelling assumptions imposed by a model actually hold. 
Therefore, for each model, is important to state - the modelling assumption upon which the validity of the results depend. To achieve this, the estimator object - maintains a list of modelling assumptions (as strings). If a user wishes to implement their own estimator, they - must implement this method and add all assumptions to the list of modelling assumptions. - - 2) estimate_ate: All estimators must be capable of returning the average treatment effect as a minimum. That is, the - average effect of the intervention (changing treatment from control to treated value) on the outcome of interest - adjusted for all confounders. - """ - - def __init__( - # pylint: disable=too-many-arguments - self, - treatment: str, - treatment_value: float, - control_value: float, - adjustment_set: set, - outcome: str, - df: pd.DataFrame = None, - effect_modifiers: dict[str:Any] = None, - alpha: float = 0.05, - query: str = "", - ): - self.treatment = treatment - self.treatment_value = treatment_value - self.control_value = control_value - self.adjustment_set = adjustment_set - self.outcome = outcome - self.alpha = alpha - self.df = df.query(query) if query else df - - if effect_modifiers is None: - self.effect_modifiers = {} - elif isinstance(effect_modifiers, dict): - self.effect_modifiers = effect_modifiers - else: - raise ValueError(f"Unsupported type for effect_modifiers {effect_modifiers}. Expected iterable") - self.modelling_assumptions = [] - if query: - self.modelling_assumptions.append(query) - self.add_modelling_assumptions() - logger.debug("Effect Modifiers: %s", self.effect_modifiers) - - @abstractmethod - def add_modelling_assumptions(self): - """ - Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that - must hold if the resulting causal inference is to be considered valid. - """ - - def compute_confidence_intervals(self) -> list[float, float]: - """ - Estimate the 95% Wald confidence intervals for the effect of changing the treatment from control values to - treatment values on the outcome. - :return: 95% Wald confidence intervals. - """ - - -class LogisticRegressionEstimator(Estimator): - """A Logistic Regression Estimator is a parametric estimator which restricts the variables in the data to a linear - combination of parameters and functions of the variables (note these functions need not be linear). It is designed - for estimating categorical outcomes. - """ - - def __init__( - # pylint: disable=too-many-arguments - self, - treatment: str, - treatment_value: float, - control_value: float, - adjustment_set: set, - outcome: str, - df: pd.DataFrame = None, - effect_modifiers: dict[str:Any] = None, - formula: str = None, - query: str = "", - ): - super().__init__( - treatment=treatment, - treatment_value=treatment_value, - control_value=control_value, - adjustment_set=adjustment_set, - outcome=outcome, - df=df, - effect_modifiers=effect_modifiers, - query=query, - ) - - self.model = None - - if formula is not None: - self.formula = formula - else: - terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(self.effect_modifiers)) - self.formula = f"{outcome} ~ {'+'.join(((terms)))}" - - def add_modelling_assumptions(self): - """ - Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that - must hold if the resulting causal inference is to be considered valid. 
- """ - self.modelling_assumptions.append( - "The variables in the data must fit a shape which can be expressed as a linear" - "combination of parameters and functions of variables. Note that these functions" - "do not need to be linear." - ) - self.modelling_assumptions.append("The outcome must be binary.") - self.modelling_assumptions.append("Independently and identically distributed errors.") - - def _run_logistic_regression(self, data) -> RegressionResultsWrapper: - """Run logistic regression of the treatment and adjustment set against the outcome and return the model. - - :return: The model after fitting to data. - """ - model = smf.logit(formula=self.formula, data=data).fit(disp=0) - self.model = model - return model - - def estimate(self, data: pd.DataFrame, adjustment_config: dict = None) -> RegressionResultsWrapper: - """add terms to the dataframe and estimate the outcome from the data - :param data: A pandas dataframe containing execution data from the system-under-test. - :param adjustment_config: Dictionary containing the adjustment configuration of the adjustment set - """ - if adjustment_config is None: - adjustment_config = {} - if set(self.adjustment_set) != set(adjustment_config): - raise ValueError( - f"Invalid adjustment configuration {adjustment_config}. Must specify values for {self.adjustment_set}" - ) - - model = self._run_logistic_regression(data) - - x = pd.DataFrame(columns=self.df.columns) - x["Intercept"] = 1 # self.intercept - x[self.treatment] = [self.treatment_value, self.control_value] - for k, v in adjustment_config.items(): - x[k] = v - for k, v in self.effect_modifiers.items(): - x[k] = v - x = dmatrix(self.formula.split("~")[1], x, return_type="dataframe") - for col in x: - if str(x.dtypes[col]) == "object": - x = pd.get_dummies(x, columns=[col], drop_first=True) - # x = x[model.params.index] - return model.predict(x) - - def estimate_control_treatment( - self, adjustment_config: dict = None, bootstrap_size: int = 100 - ) -> tuple[pd.Series, pd.Series]: - """Estimate the outcomes under control and treatment. - - :return: The estimated control and treatment values and their confidence - intervals in the form ((ci_low, control, ci_high), (ci_low, treatment, ci_high)). - """ - if adjustment_config is None: - adjustment_config = {} - y = self.estimate(self.df, adjustment_config=adjustment_config) - - try: - bootstrap_samples = [ - self.estimate(self.df.sample(len(self.df), replace=True), adjustment_config=adjustment_config) - for _ in range(bootstrap_size) - ] - control, treatment = zip(*[(x.iloc[1], x.iloc[0]) for x in bootstrap_samples]) - except PerfectSeparationError: - logger.warning( - "Perfect separation detected, results not available. Cannot calculate confidence intervals for such " - "a small dataset." - ) - return (y.iloc[1], None), (y.iloc[0], None) - except np.linalg.LinAlgError: - logger.warning("Singular matrix detected. Confidence intervals not available. 
Try with a larger data set") - return (y.iloc[1], None), (y.iloc[0], None) - - # Delta method confidence intervals from - # https://stackoverflow.com/questions/47414842/confidence-interval-of-probability-prediction-from-logistic-regression-statsmode - # cov = model.cov_params() - # gradient = (y * (1 - y) * x.T).T # matrix of gradients for each observation - # std_errors = np.array([np.sqrt(np.dot(np.dot(g, cov), g)) for g in gradient.to_numpy()]) - # c = 1.96 # multiplier for confidence interval - # upper = np.maximum(0, np.minimum(1, y + std_errors * c)) - # lower = np.maximum(0, np.minimum(1, y - std_errors * c)) - - return (y.iloc[1], np.array(control)), (y.iloc[0], np.array(treatment)) - - def estimate_ate(self, adjustment_config: dict = None, bootstrap_size: int = 100) -> float: - """Estimate the ate effect of the treatment on the outcome. That is, the change in outcome caused - by changing the treatment variable from the control value to the treatment value. Here, we actually - calculate the expected outcomes under control and treatment and take one away from the other. This - allows for custom terms to be put in such as squares, inverses, products, etc. - - :return: The estimated average treatment effect and 95% confidence intervals - """ - if adjustment_config is None: - adjustment_config = {} - (control_outcome, control_bootstraps), ( - treatment_outcome, - treatment_bootstraps, - ) = self.estimate_control_treatment(bootstrap_size=bootstrap_size, adjustment_config=adjustment_config) - estimate = treatment_outcome - control_outcome - - if control_bootstraps is None or treatment_bootstraps is None: - return estimate, (None, None) - - bootstraps = sorted(list(treatment_bootstraps - control_bootstraps)) - bound = int((bootstrap_size * self.alpha) / 2) - ci_low = bootstraps[bound] - ci_high = bootstraps[bootstrap_size - bound] - - logger.info( - f"Changing {self.treatment} from {self.control_value} to {self.treatment_value} gives an estimated " - f"ATE of {ci_low} < {estimate} < {ci_high}" - ) - assert ci_low < estimate < ci_high, f"Expecting {ci_low} < {estimate} < {ci_high}" - - return estimate, (ci_low, ci_high) - - def estimate_risk_ratio(self, adjustment_config: dict = None, bootstrap_size: int = 100) -> float: - """Estimate the ate effect of the treatment on the outcome. That is, the change in outcome caused - by changing the treatment variable from the control value to the treatment value. Here, we actually - calculate the expected outcomes under control and treatment and divide one by the other. This - allows for custom terms to be put in such as squares, inverses, products, etc. - - :return: The estimated risk ratio and 95% confidence intervals. 
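The percentile-style bootstrap interval used by estimate_ate and estimate_risk_ratio can be sketched in isolation as follows (illustrative values; percentile_ci is a hypothetical helper, not part of the framework):

    import numpy as np

    def percentile_ci(bootstraps, alpha=0.05):
        # Sort the replicates and index the alpha/2 tails, mirroring the
        # `bound = (bootstrap_size * alpha) / 2` logic above.
        bootstraps = sorted(bootstraps)
        bound = int(len(bootstraps) * alpha / 2)
        return bootstraps[bound], bootstraps[len(bootstraps) - bound - 1]

    rng = np.random.default_rng(0)
    replicates = rng.normal(loc=2.0, scale=0.5, size=1000)
    ci_low, ci_high = percentile_ci(list(replicates))  # roughly (1.0, 3.0)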
- """ - if adjustment_config is None: - adjustment_config = {} - (control_outcome, control_bootstraps), ( - treatment_outcome, - treatment_bootstraps, - ) = self.estimate_control_treatment(bootstrap_size=bootstrap_size, adjustment_config=adjustment_config) - estimate = treatment_outcome / control_outcome - - if control_bootstraps is None or treatment_bootstraps is None: - return estimate, (None, None) - - bootstraps = sorted(list(treatment_bootstraps / control_bootstraps)) - bound = ceil((bootstrap_size * self.alpha) / 2) - ci_low = bootstraps[bound] - ci_high = bootstraps[bootstrap_size - bound] - - logger.info( - f"Changing {self.treatment} from {self.control_value} to {self.treatment_value} gives an estimated " - f"risk ratio of {ci_low} < {estimate} < {ci_high}" - ) - assert ci_low < estimate < ci_high, f"Expecting {ci_low} < {estimate} < {ci_high}" - - return estimate, (ci_low, ci_high) - - def estimate_unit_odds_ratio(self) -> float: - """Estimate the odds ratio of increasing the treatment by one. In logistic regression, this corresponds to the - coefficient of the treatment of interest. - - :return: The odds ratio. Confidence intervals are not yet supported. - """ - model = self._run_logistic_regression(self.df) - return np.exp(model.params[self.treatment]) - - -class LinearRegressionEstimator(Estimator): - """A Linear Regression Estimator is a parametric estimator which restricts the variables in the data to a linear - combination of parameters and functions of the variables (note these functions need not be linear). - """ - - def __init__( - # pylint: disable=too-many-arguments - self, - treatment: str, - treatment_value: float, - control_value: float, - adjustment_set: set, - outcome: str, - df: pd.DataFrame = None, - effect_modifiers: dict[Variable:Any] = None, - formula: str = None, - alpha: float = 0.05, - query: str = "", - ): - super().__init__( - treatment, - treatment_value, - control_value, - adjustment_set, - outcome, - df, - effect_modifiers, - alpha=alpha, - query=query, - ) - - self.model = None - if effect_modifiers is None: - effect_modifiers = [] - - if formula is not None: - self.formula = formula - else: - terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(effect_modifiers)) - self.formula = f"{outcome} ~ {'+'.join(terms)}" - - for term in self.effect_modifiers: - self.adjustment_set.add(term) - - def add_modelling_assumptions(self): - """ - Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that - must hold if the resulting causal inference is to be considered valid. - """ - self.modelling_assumptions.append( - "The variables in the data must fit a shape which can be expressed as a linear" - "combination of parameters and functions of variables. Note that these functions" - "do not need to be linear." - ) - - def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: - """Estimate the unit average treatment effect of the treatment on the outcome. That is, the change in outcome - caused by a unit change in treatment. - - :return: The unit average treatment effect and the 95% Wald confidence intervals. - """ - model = self._run_linear_regression() - newline = "\n" - patsy_md = ModelDesc.from_formula(self.treatment) - - if any( - ( - self.df.dtypes[factor.name()] == "object" - for factor in patsy_md.rhs_termlist[1].factors - # We want to remove this long term as it prevents us from discovering categoricals within I(...) 
blocks - if factor.name() in self.df.dtypes - ) - ): - design_info = dmatrix(self.formula.split("~")[1], self.df).design_info - treatment = design_info.column_names[design_info.term_name_slices[self.treatment]] - else: - treatment = [self.treatment] - assert set(treatment).issubset( - model.params.index.tolist() - ), f"{treatment} not in\n{' ' + str(model.params.index).replace(newline, newline + ' ')}" - unit_effect = model.params[treatment] # Unit effect is the coefficient of the treatment - [ci_low, ci_high] = self._get_confidence_intervals(model, treatment) - return unit_effect, [ci_low, ci_high] - - def estimate_ate(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: - """Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome caused - by changing the treatment variable from the control value to the treatment value. - - :return: The average treatment effect and the 95% Wald confidence intervals. - """ - model = self._run_linear_regression() - - # Create an empty individual for the control and treated - individuals = pd.DataFrame(1, index=["control", "treated"], columns=model.params.index) - - # For Pandas version > 2, we need to explicitly state that the dataframe takes floating-point values - individuals = individuals.astype(float) - - # It is ABSOLUTELY CRITICAL that these go last, otherwise we can't index - # the effect with "ate = t_test_results.effect[0]" - individuals.loc["control", [self.treatment]] = self.control_value - individuals.loc["treated", [self.treatment]] = self.treatment_value - - # Perform a t-test to compare the predicted outcome of the control and treated individual (ATE) - t_test_results = model.t_test(individuals.loc["treated"] - individuals.loc["control"]) - ate = pd.Series(t_test_results.effect[0]) - confidence_intervals = list(t_test_results.conf_int(alpha=self.alpha).flatten()) - confidence_intervals = [pd.Series(interval) for interval in confidence_intervals] - return ate, confidence_intervals - - def estimate_control_treatment(self, adjustment_config: dict = None) -> tuple[pd.Series, pd.Series]: - """Estimate the outcomes under control and treatment. - - :return: The estimated outcome under control and treatment in the form - (control_outcome, treatment_outcome). - """ - if adjustment_config is None: - adjustment_config = {} - model = self._run_linear_regression() - - x = pd.DataFrame(columns=self.df.columns) - x[self.treatment] = [self.treatment_value, self.control_value] - x["Intercept"] = 1 # self.intercept - for k, v in adjustment_config.items(): - x[k] = v - for k, v in self.effect_modifiers.items(): - x[k] = v - x = dmatrix(self.formula.split("~")[1], x, return_type="dataframe") - for col in x: - if str(x.dtypes[col]) == "object": - x = pd.get_dummies(x, columns=[col], drop_first=True) - x = x[model.params.index] - y = model.get_prediction(x).summary_frame() - - return y.iloc[1], y.iloc[0] - - def estimate_risk_ratio(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series, pd.Series]]: - """Estimate the risk_ratio effect of the treatment on the outcome. That is, the change in outcome caused - by changing the treatment variable from the control value to the treatment value. - - :return: The average treatment effect and the 95% Wald confidence intervals. 
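The prediction-based contrast implemented below can be reproduced on toy data (a standalone sketch with hypothetical data and names, not the framework's API):

    import pandas as pd
    import statsmodels.formula.api as smf

    df = pd.DataFrame({"x": range(1, 11), "y": [3.0 * v for v in range(1, 11)]})
    model = smf.ols("y ~ x", df).fit()
    # Predict the outcome at the treatment (x=10) and control (x=5) levels.
    pred = model.get_prediction(pd.DataFrame({"x": [10, 5]})).summary_frame()
    risk_ratio = pred["mean"].iloc[0] / pred["mean"].iloc[1]  # 30 / 15 = 2.0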
- """ - if adjustment_config is None: - adjustment_config = {} - control_outcome, treatment_outcome = self.estimate_control_treatment(adjustment_config=adjustment_config) - ci_low = pd.Series(treatment_outcome["mean_ci_lower"] / control_outcome["mean_ci_upper"]) - ci_high = pd.Series(treatment_outcome["mean_ci_upper"] / control_outcome["mean_ci_lower"]) - return pd.Series(treatment_outcome["mean"] / control_outcome["mean"]), [ci_low, ci_high] - - def estimate_ate_calculated(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series, pd.Series]]: - """Estimate the ate effect of the treatment on the outcome. That is, the change in outcome caused - by changing the treatment variable from the control value to the treatment value. Here, we actually - calculate the expected outcomes under control and treatment and divide one by the other. This - allows for custom terms to be put in such as squares, inverses, products, etc. - - :return: The average treatment effect and the 95% Wald confidence intervals. - """ - if adjustment_config is None: - adjustment_config = {} - control_outcome, treatment_outcome = self.estimate_control_treatment(adjustment_config=adjustment_config) - ci_low = pd.Series(treatment_outcome["mean_ci_lower"] - control_outcome["mean_ci_upper"]) - ci_high = pd.Series(treatment_outcome["mean_ci_upper"] - control_outcome["mean_ci_lower"]) - return pd.Series(treatment_outcome["mean"] - control_outcome["mean"]), [ci_low, ci_high] - - def _run_linear_regression(self) -> RegressionResultsWrapper: - """Run linear regression of the treatment and adjustment set against the outcome and return the model. - - :return: The model after fitting to data. - """ - model = smf.ols(formula=self.formula, data=self.df).fit() - self.model = model - return model - - def _get_confidence_intervals(self, model, treatment): - confidence_intervals = model.conf_int(alpha=self.alpha, cols=None) - ci_low, ci_high = ( - pd.Series(confidence_intervals[0].loc[treatment]), - pd.Series(confidence_intervals[1].loc[treatment]), - ) - return [ci_low, ci_high] - - -class CubicSplineRegressionEstimator(LinearRegressionEstimator): - """A Cubic Spline Regression Estimator is a parametric estimator which restricts the variables in the data to a - combination of parameters and basis functions of the variables. 
- """ - - def __init__( - # pylint: disable=too-many-arguments - self, - treatment: str, - treatment_value: float, - control_value: float, - adjustment_set: set, - outcome: str, - basis: int, - df: pd.DataFrame = None, - effect_modifiers: dict[Variable:Any] = None, - formula: str = None, - alpha: float = 0.05, - expected_relationship=None, - ): - super().__init__( - treatment, treatment_value, control_value, adjustment_set, outcome, df, effect_modifiers, formula, alpha - ) - - self.expected_relationship = expected_relationship - - if effect_modifiers is None: - effect_modifiers = [] - - if formula is None: - terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(effect_modifiers)) - self.formula = f"{outcome} ~ cr({'+'.join(terms)}, df={basis})" - - def estimate_ate_calculated(self, adjustment_config: dict = None) -> pd.Series: - model = self._run_linear_regression() - - x = {"Intercept": 1, self.treatment: self.treatment_value} - if adjustment_config is not None: - for k, v in adjustment_config.items(): - x[k] = v - if self.effect_modifiers is not None: - for k, v in self.effect_modifiers.items(): - x[k] = v - - treatment = model.predict(x).iloc[0] - - x[self.treatment] = self.control_value - control = model.predict(x).iloc[0] - - return pd.Series(treatment - control) - - -class InstrumentalVariableEstimator(Estimator): - """ - Carry out estimation using instrumental variable adjustment rather than conventional adjustment. This means we do - not need to observe all confounders in order to adjust for them. A key assumption here is linearity. - """ - - def __init__( - # pylint: disable=too-many-arguments - self, - treatment: str, - treatment_value: float, - control_value: float, - adjustment_set: set, - outcome: str, - instrument: str, - df: pd.DataFrame = None, - intercept: int = 1, - effect_modifiers: dict = None, # Not used (yet?). Needed for compatibility - alpha: float = 0.05, - query: str = "", - ): - super().__init__( - treatment=treatment, - treatment_value=treatment_value, - control_value=control_value, - adjustment_set=adjustment_set, - outcome=outcome, - df=df, - effect_modifiers=None, - alpha=alpha, - query=query, - ) - self.intercept = intercept - self.model = None - self.instrument = instrument - - def add_modelling_assumptions(self): - """ - Add modelling assumptions to the estimator. This is a list of strings which list the modelling assumptions that - must hold if the resulting causal inference is to be considered valid. - """ - self.modelling_assumptions.append( - """The instrument and the treatment, and the treatment and the outcome must be - related linearly in the form Y = aX + b.""" - ) - self.modelling_assumptions.append( - """The three IV conditions must hold - (i) Instrument is associated with treatment - (ii) Instrument does not affect outcome except through its potential effect on treatment - (iii) Instrument and outcome do not share causes - """ - ) - - def estimate_iv_coefficient(self, df) -> float: - """ - Estimate the linear regression coefficient of the treatment on the - outcome. 
- """ - # Estimate the total effect of instrument I on outcome Y = abI + c1 - ab = sm.OLS(df[self.outcome], df[[self.instrument]]).fit().params[self.instrument] - - # Estimate the direct effect of instrument I on treatment X = aI + c1 - a = sm.OLS(df[self.treatment], df[[self.instrument]]).fit().params[self.instrument] - - # Estimate the coefficient of I on X by cancelling - return ab / a - - def estimate_coefficient(self, bootstrap_size=100) -> tuple[pd.Series, list[pd.Series, pd.Series]]: - """ - Estimate the unit ate (i.e. coefficient) of the treatment on the - outcome. - """ - bootstraps = sorted( - [self.estimate_iv_coefficient(self.df.sample(len(self.df), replace=True)) for _ in range(bootstrap_size)] - ) - bound = ceil((bootstrap_size * self.alpha) / 2) - ci_low = pd.Series(bootstraps[bound]) - ci_high = pd.Series(bootstraps[bootstrap_size - bound]) - - return pd.Series(self.estimate_iv_coefficient(self.df)), [ci_low, ci_high] - - -class IPCWEstimator(Estimator): - """ - Class to perform inverse probability of censoring weighting (IPCW) estimation - for sequences of treatments over time-varying data. - """ - - # pylint: disable=too-many-arguments - # pylint: disable=too-many-instance-attributes - def __init__( - self, - df: pd.DataFrame, - timesteps_per_intervention: int, - control_strategy: TreatmentSequence, - treatment_strategy: TreatmentSequence, - outcome: str, - fault_column: str, - fit_bl_switch_formula: str, - fit_bltd_switch_formula: str, - eligibility=None, - alpha: float = 0.05, - ): - super().__init__( - [c.variable for c in treatment_strategy.capabilities], - [c.value for c in treatment_strategy.capabilities], - [c.value for c in control_strategy.capabilities], - None, - outcome, - df, - None, - alpha=alpha, - query="", - ) - self.timesteps_per_intervention = timesteps_per_intervention - self.control_strategy = control_strategy - self.treatment_strategy = treatment_strategy - self.outcome = outcome - self.fault_column = fault_column - self.timesteps_per_intervention = timesteps_per_intervention - self.fit_bl_switch_formula = fit_bl_switch_formula - self.fit_bltd_switch_formula = fit_bltd_switch_formula - self.eligibility = eligibility - self.df = df - self.preprocess_data() - - def add_modelling_assumptions(self): - self.modelling_assumptions.append("The variables in the data vary over time.") - - def setup_xo_t_do(self, strategy_assigned: list, strategy_followed: list, eligible: pd.Series): - """ - Return a binary sequence with each bit representing whether the current - index is the time point at which the individual diverted from the - assigned treatment strategy (and thus should be censored). 
- - :param strategy_assigned - the assigned treatment strategy - :param strategy_followed - the strategy followed by the individual - :param eligible - binary sequence represnting the eligibility of the individual at each time step - """ - strategy_assigned = [1] + strategy_assigned + [1] - strategy_followed = [1] + strategy_followed + [1] - - mask = ( - pd.Series(strategy_assigned, index=eligible.index) != pd.Series(strategy_followed, index=eligible.index) - ).astype("boolean") - mask = mask | ~eligible - mask.reset_index(inplace=True, drop=True) - false = mask.loc[mask] - if false.empty: - return np.zeros(len(mask)) - mask = (mask * 1).tolist() - cutoff = false.index[0] + 1 - return mask[:cutoff] + ([None] * (len(mask) - cutoff)) - - def setup_fault_t_do(self, individual: pd.DataFrame): - """ - Return a binary sequence with each bit representing whether the current - index is the time point at which the event of interest (i.e. a fault) - occurred. - """ - fault = individual[~individual[self.fault_column]] - fault_t_do = pd.Series(np.zeros(len(individual)), index=individual.index) - - if not fault.empty: - fault_time = individual["time"].loc[fault.index[0]] - # Ceiling to nearest observation point - fault_time = ceil(fault_time / self.timesteps_per_intervention) * self.timesteps_per_intervention - # Set the correct observation point to be the fault time of doing (fault_t_do) - observations = individual.loc[ - (individual["time"] % self.timesteps_per_intervention == 0) & (individual["time"] < fault_time) - ] - if not observations.empty: - fault_t_do.loc[observations.index[0]] = 1 - assert sum(fault_t_do) <= 1, f"Multiple fault times for\n{individual}" - - return pd.DataFrame({"fault_t_do": fault_t_do}) - - def setup_fault_time(self, individual: pd.DataFrame, perturbation: float = -0.001): - """ - Return the time at which the event of interest (i.e. a fault) occurred. - """ - fault = individual[~individual[self.fault_column]] - fault_time = ( - individual["time"].loc[fault.index[0]] - if not fault.empty - else (individual["time"].max() + self.timesteps_per_intervention) - ) - return pd.DataFrame({"fault_time": np.repeat(fault_time + perturbation, len(individual))}) - - def preprocess_data(self): - """ - Set up the treatment-specific columns in the data that are needed to estimate the hazard ratio. - """ - self.df["trtrand"] = None # treatment/control arm - self.df["xo_t_do"] = None # did the individual deviate from the treatment of interest here? - self.df["eligible"] = self.df.eval(self.eligibility) if self.eligibility is not None else True - - # when did a fault occur? 
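# (The groupby("id").apply(...) calls below return one value per row; taking
# `.values` drops the group index so the result aligns positionally with
# self.df and can be assigned straight back as a column.)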
- self.df["fault_time"] = self.df.groupby("id")[[self.fault_column, "time"]].apply(self.setup_fault_time).values - self.df["fault_t_do"] = ( - self.df.groupby("id")[["id", "time", self.fault_column]].apply(self.setup_fault_t_do).values - ) - assert not pd.isnull(self.df["fault_time"]).any() - - living_runs = self.df.query("fault_time > 0").loc[ - (self.df["time"] % self.timesteps_per_intervention == 0) - & (self.df["time"] <= self.control_strategy.total_time()) - ] - - individuals = [] - new_id = 0 - logging.debug(" Preprocessing groups") - for _, individual in living_runs.groupby("id"): - assert sum(individual["fault_t_do"]) <= 1, ( - f"Error initialising fault_t_do for individual\n" - f"{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\n" - "with fault at {individual.fault_time.iloc[0]}" - ) - - strategy_followed = [ - Capability( - c.variable, - individual.loc[individual["time"] == c.start_time, c.variable].values[0], - c.start_time, - c.end_time, - ) - for c in self.treatment_strategy.capabilities - ] - - # Control flow: - # Individuals that start off in both arms, need cloning (hence incrementing the ID within the if statement) - # Individuals that don't start off in either arm are left out - for inx, strategy_assigned in [(0, self.control_strategy), (1, self.treatment_strategy)]: - if strategy_assigned.capabilities[0] == strategy_followed[0] and individual.eligible.iloc[0]: - individual["id"] = new_id - new_id += 1 - individual["trtrand"] = inx - individual["xo_t_do"] = self.setup_xo_t_do( - strategy_assigned.capabilities, strategy_followed, individual["eligible"] - ) - individuals.append(individual.loc[individual["time"] <= individual["fault_time"]].copy()) - if len(individuals) == 0: - raise ValueError("No individuals followed either strategy.") - - self.df = pd.concat(individuals) - - def estimate_hazard_ratio(self): - """ - Estimate the hazard ratio. 
- """ - - if self.df["fault_t_do"].sum() == 0: - raise ValueError("No recorded faults") - - preprocessed_data = self.df.loc[self.df["xo_t_do"] == 0].copy() - - # Use logistic regression to predict switching given baseline covariates - fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=self.df).fit() - - preprocessed_data["pxo1"] = fit_bl_switch.predict(preprocessed_data) - - # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) - fit_bltd_switch = smf.logit( - self.fit_bltd_switch_formula, - data=self.df, - ).fit() - - preprocessed_data["pxo2"] = fit_bltd_switch.predict(preprocessed_data) - - # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored - # Estimate the probabilities of remaining ‘un-switched’ and hence the weights - - preprocessed_data["num"] = 1 - preprocessed_data["pxo1"] - preprocessed_data["denom"] = 1 - preprocessed_data["pxo2"] - preprocessed_data[["num", "denom"]] = ( - preprocessed_data.sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() - ) - - assert ( - not preprocessed_data["num"].isnull().any() - ), f"{len(preprocessed_data['num'].isnull())} null numerator values" - assert ( - not preprocessed_data["denom"].isnull().any() - ), f"{len(preprocessed_data['denom'].isnull())} null denom values" - - preprocessed_data["weight"] = 1 / preprocessed_data["denom"] - preprocessed_data["sweight"] = preprocessed_data["num"] / preprocessed_data["denom"] - - preprocessed_data["tin"] = preprocessed_data["time"] - preprocessed_data["tout"] = pd.concat( - [(preprocessed_data["time"] + self.timesteps_per_intervention), preprocessed_data["fault_time"]], - axis=1, - ).min(axis=1) - - assert (preprocessed_data["tin"] <= preprocessed_data["tout"]).all(), ( - f"Left before joining\n" f"{preprocessed_data.loc[preprocessed_data['tin'] >= preprocessed_data['tout']]}" - ) - - # IPCW step 4: Use these weights in a weighted analysis of the outcome model - # Estimate the KM graph and IPCW hazard ratio using Cox regression. 
cox_ph = CoxPHFitter(alpha=self.alpha)
-        cox_ph.fit(
-            df=preprocessed_data,
-            duration_col="tout",
-            event_col="fault_t_do",
-            weights_col="weight",
-            cluster_col="id",
-            robust=True,
-            formula="trtrand",
-            entry_col="tin",
-        )
-
-        ci_low, ci_high = [np.exp(cox_ph.confidence_intervals_)[col] for col in cox_ph.confidence_intervals_.columns]
-
-        return (cox_ph.hazard_ratios_, (ci_low, ci_high))
diff --git a/dafni/main_dafni.py b/dafni/main_dafni.py
index e6b142f3..5fa66b0a 100644
--- a/dafni/main_dafni.py
+++ b/dafni/main_dafni.py
@@ -11,7 +11,7 @@
 from causal_testing.specification.scenario import Scenario
 from causal_testing.specification.variable import Input, Output
 from causal_testing.testing.causal_test_outcome import Positive, Negative, NoEffect, SomeEffect
-from causal_testing.testing.estimators import LinearRegressionEstimator
+from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator
 from causal_testing.json_front.json_class import JsonUtility
@@ -30,35 +30,38 @@ def get_args(test_args=None) -> argparse.Namespace:
     """
     parser = argparse.ArgumentParser(description="A script for running the CTF on DAFNI.")
-    parser.add_argument(
-        "--data_path", required=True,
-        help="Path to the input runtime data (.csv)", nargs="+")
+    parser.add_argument("--data_path", required=True, help="Path to the input runtime data (.csv)", nargs="+")
-    parser.add_argument('--tests_path', required=True,
-                        help='Input configuration file path '
-                             'containing the causal tests (.json)')
+    parser.add_argument(
+        "--tests_path", required=True, help="Input configuration file path " "containing the causal tests (.json)"
+    )
-    parser.add_argument('--variables_path', required=True,
-                        help='Input configuration file path '
-                             'containing the predefined variables (.json)')
+    parser.add_argument(
+        "--variables_path",
+        required=True,
+        help="Input configuration file path " "containing the predefined variables (.json)",
+    )
-    parser.add_argument("--dag_path", required=True,
-                        help="Input configuration file path containing a valid DAG (.dot). "
-                             "Note: this must be supplied if the --tests argument isn't provided.")
+    parser.add_argument(
+        "--dag_path",
+        required=True,
+        help="Input configuration file path containing a valid DAG (.dot). "
+        "Note: this must be supplied if the --tests argument isn't provided.",
+    )
-    parser.add_argument('--output_path', required=False, help='Path to the output directory.')
+    parser.add_argument("--output_path", required=False, help="Path to the output directory.")
     parser.add_argument(
-        "-f",
-        default=False,
-        help="(Optional) Failure flag to step the framework from running if a test has failed.")
+        "-f", default=False, help="(Optional) Failure flag to stop the framework from running if a test has failed."
+    )
     parser.add_argument(
         "-w",
         default=False,
         help="(Optional) Specify to overwrite any existing output files. "
" - "This can lead to the loss of existing outputs if not " - "careful") + "This can lead to the loss of existing outputs if not " + "careful", + ) args = parser.parse_args(test_args) @@ -100,7 +103,7 @@ def read_variables(variables_path: Path) -> FileNotFoundError | dict: raise FileNotFoundError - with variables_path.open('r') as file: + with variables_path.open("r") as file: inputs = json.load(file) @@ -118,13 +121,17 @@ def validate_variables(data_dict: dict) -> tuple: variables = data_dict["variables"] - inputs = [Input(variable["name"], eval(variable["datatype"])) - for variable in variables if - variable["typestring"] == "Input"] + inputs = [ + Input(variable["name"], eval(variable["datatype"])) + for variable in variables + if variable["typestring"] == "Input" + ] - outputs = [Output(variable["name"], eval(variable["datatype"])) - for variable in variables if - variable["typestring"] == "Output"] + outputs = [ + Output(variable["name"], eval(variable["datatype"])) + for variable in variables + if variable["typestring"] == "Output" + ] constraints = set() @@ -172,7 +179,8 @@ def main(): "Positive": Positive(), "Negative": Negative(), "NoEffect": NoEffect(), - "SomeEffect": SomeEffect()} + "SomeEffect": SomeEffect(), + } # Step 4: Call the JSONUtility class to perform the causal tests @@ -185,9 +193,9 @@ def main(): json_utility.setup(scenario=modelling_scenario, data=data_frame) # Step 7: Run the causal tests - test_outcomes = json_utility.run_json_tests(effects=expected_outcome_effects, - mutates={}, estimators=estimators, - f_flag=args.f) + test_outcomes = json_utility.run_json_tests( + effects=expected_outcome_effects, mutates={}, estimators=estimators, f_flag=args.f + ) # Step 8: Update, print and save the final outputs @@ -201,7 +209,6 @@ def main(): test["result"].pop("control_value") - with open(args.output_path, "w", encoding="utf-8") as f: print(json.dumps(test_outcomes, indent=2), file=f) @@ -214,8 +221,7 @@ def main(): else: - print(f"Execution successful. " - f"Output file saved at {Path(args.output_path).parent.resolve()}") + print(f"Execution successful. " f"Output file saved at {Path(args.output_path).parent.resolve()}") if __name__ == "__main__": diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 7f1e701d..f2f95bf8 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -87,7 +87,7 @@ the `documentation None: + Z = np.linspace(0, 10) + X = 2 * Z + Y = 2 * X + cls.df = pd.DataFrame({"Z": Z, "X": X, "Y": Y}) + + def test_estimate_coefficient(self): + """ + Test we get the correct coefficient. + """ + iv_estimator = InstrumentalVariableEstimator( + df=self.df, + treatment="X", + treatment_value=None, + control_value=None, + adjustment_set=set(), + outcome="Y", + instrument="Z", + ) + self.assertEqual(iv_estimator.estimate_coefficient(self.df), 2) + + def test_estimate_coefficient(self): + """ + Test we get the correct coefficient. 
+ """ + iv_estimator = InstrumentalVariableEstimator( + df=self.df, + treatment="X", + treatment_value=None, + control_value=None, + adjustment_set=set(), + outcome="Y", + instrument="Z", + ) + coefficient, [low, high] = iv_estimator.estimate_coefficient() + self.assertEqual(coefficient[0], 2) diff --git a/tests/estimation_tests/test_ipcw_estimator.py b/tests/estimation_tests/test_ipcw_estimator.py new file mode 100644 index 00000000..d4383d6e --- /dev/null +++ b/tests/estimation_tests/test_ipcw_estimator.py @@ -0,0 +1,82 @@ +import unittest +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from causal_testing.specification.variable import Input +from causal_testing.utils.validation import CausalValidator +from causal_testing.specification.capabilities import TreatmentSequence + +from causal_testing.estimation.ipcw_estimator import IPCWEstimator + + +class TestIPCWEstimator(unittest.TestCase): + """ + Test the IPCW estimator class + """ + + def test_estimate_hazard_ratio(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["ok"] = df["outcome"] == 1 + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + estimate, intervals = estimation_model.estimate_hazard_ratio() + self.assertEqual(estimate["trtrand"], 1.0) + + def test_invalid_treatment_strategies(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["t"] = (["1", "0"] * len(df))[: len(df)] + df["ok"] = df["outcome"] == 1 + with self.assertRaises(ValueError): + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + + def test_invalid_fault_t_do(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["ok"] = df["outcome"] == 1 + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + estimation_model.df["fault_t_do"] = 0 + with self.assertRaises(ValueError): + estimate, intervals = estimation_model.estimate_hazard_ratio() diff --git a/tests/estimation_tests/test_linear_regression_estimator.py b/tests/estimation_tests/test_linear_regression_estimator.py new file mode 100644 index 00000000..c3749fdb 
--- /dev/null +++ b/tests/estimation_tests/test_linear_regression_estimator.py @@ -0,0 +1,351 @@ +import unittest +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from causal_testing.specification.variable import Input +from causal_testing.utils.validation import CausalValidator +from causal_testing.specification.capabilities import TreatmentSequence + +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator +from causal_testing.estimation.genetic_programming_regression_fitter import reciprocal + + +def load_nhefs_df(): + """Get the NHEFS data from chapter 12 and put into a dataframe. NHEFS = National Health and Nutrition Examination + Survey Data I Epidemiological Follow-up Study.""" + + nhefs_df = pd.read_csv("tests/resources/data/nhefs.csv") + nhefs_df["one"] = 1 + nhefs_df["zero"] = 0 + edu_dummies = pd.get_dummies(nhefs_df.education, prefix="edu") + exercise_dummies = pd.get_dummies(nhefs_df.exercise, prefix="exercise") + active_dummies = pd.get_dummies(nhefs_df.active, prefix="active") + nhefs_df = pd.concat([nhefs_df, edu_dummies, exercise_dummies, active_dummies], axis=1) + return nhefs_df + + +def load_chapter_11_df(): + """Get the data from chapter 11 and put into a dataframe.""" + + treatments, outcomes = zip( + *( + (3, 21), + (11, 54), + (17, 33), + (23, 101), + (29, 85), + (37, 65), + (41, 157), + (53, 120), + (67, 111), + (79, 200), + (83, 140), + (97, 220), + (60, 230), + (71, 217), + (15, 11), + (45, 190), + ) + ) + chapter_11_df = pd.DataFrame({"treatments": treatments, "outcomes": outcomes, "constant": np.ones(16)}) + return chapter_11_df + + +class TestLinearRegressionEstimator(unittest.TestCase): + """Test the linear regression estimator against the programming exercises in Section 2 of Hernán and Robins [1]. + + Reference: Hernán MA, Robins JM (2020). Causal Inference: What If. Boca Raton: Chapman & Hall/CRC. + Link: https://www.hsph.harvard.edu/miguel-hernan/causal-inference-book/ + """ + + @classmethod + def setUpClass(cls) -> None: + cls.nhefs_df = load_nhefs_df() + cls.chapter_11_df = load_chapter_11_df() + cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") + + def test_query(self): + df = self.nhefs_df + linear_regression_estimator = LinearRegressionEstimator( + "treatments", None, None, set(), "outcomes", df, query="sex==1" + ) + self.assertTrue(linear_regression_estimator.df.sex.all()) + + def test_linear_regression_categorical_ate(self): + df = self.scarf_df.copy() + logistic_regression_estimator = LinearRegressionEstimator("color", None, None, set(), "completed", df) + ate, confidence = logistic_regression_estimator.estimate_coefficient() + self.assertTrue(all([ci_low < 0 < ci_high for ci_low, ci_high in zip(confidence[0], confidence[1])])) + + def test_program_11_2(self): + """Test whether our linear regression implementation produces the same results as program 11.2 (p. 
141).""" + df = self.chapter_11_df + linear_regression_estimator = LinearRegressionEstimator("treatments", None, None, set(), "outcomes", df) + ate, _ = linear_regression_estimator.estimate_coefficient() + + self.assertEqual( + round( + linear_regression_estimator.model.params["Intercept"] + + 90 * linear_regression_estimator.model.params["treatments"], + 1, + ), + 216.9, + ) + + # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE + self.assertTrue( + all( + round(linear_regression_estimator.model.params["treatments"], 1) == round(ate_single, 1) + for ate_single in ate + ) + ) + + def test_program_11_3(self): + """Test whether our linear regression implementation produces the same results as program 11.3 (p. 144).""" + df = self.chapter_11_df.copy() + linear_regression_estimator = LinearRegressionEstimator( + "treatments", None, None, set(), "outcomes", df, formula="outcomes ~ treatments + I(treatments ** 2)" + ) + ate, _ = linear_regression_estimator.estimate_coefficient() + self.assertEqual( + round( + linear_regression_estimator.model.params["Intercept"] + + 90 * linear_regression_estimator.model.params["treatments"] + + 90 * 90 * linear_regression_estimator.model.params["I(treatments ** 2)"], + 1, + ), + 197.1, + ) + # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE + self.assertTrue( + all( + round(linear_regression_estimator.model.params["treatments"], 3) == round(ate_single, 3) + for ate_single in ate + ) + ) + + def test_program_15_1A(self): + """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184).""" + df = self.nhefs_df + covariates = { + "sex", + "race", + "age", + "edu_2", + "edu_3", + "edu_4", + "edu_5", + "exercise_1", + "exercise_2", + "active_1", + "active_2", + "wt71", + "smokeintensity", + "smokeyrs", + } + linear_regression_estimator = LinearRegressionEstimator( + "qsmk", + 1, + 0, + covariates, + "wt82_71", + df, + formula=f"""wt82_71 ~ qsmk + + {'+'.join(sorted(list(covariates)))} + + I(age ** 2) + + I(wt71 ** 2) + + I(smokeintensity ** 2) + + I(smokeyrs ** 2) + + (qsmk * smokeintensity)""", + ) + # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] + # terms_to_product = [("qsmk", "smokeintensity")] + # for term_to_square in terms_to_square: + # for term_a, term_b in terms_to_product: + # linear_regression_estimator.add_product_term_to_df(term_a, term_b) + + linear_regression_estimator.estimate_coefficient() + self.assertEqual(round(linear_regression_estimator.model.params["qsmk"], 1), 2.6) + self.assertEqual(round(linear_regression_estimator.model.params["qsmk:smokeintensity"], 2), 0.05) + + def test_program_15_no_interaction(self): + """Test whether our linear regression implementation produces the same results as program 15.1 (p. 
163, 184) + without product parameter.""" + df = self.nhefs_df + covariates = { + "sex", + "race", + "age", + "edu_2", + "edu_3", + "edu_4", + "edu_5", + "exercise_1", + "exercise_2", + "active_1", + "active_2", + "wt71", + "smokeintensity", + "smokeyrs", + } + linear_regression_estimator = LinearRegressionEstimator( + "qsmk", + 1, + 0, + covariates, + "wt82_71", + df, + formula="wt82_71 ~ qsmk + age + I(age ** 2) + wt71 + I(wt71 ** 2) + smokeintensity + I(smokeintensity ** 2) + smokeyrs + I(smokeyrs ** 2)", + ) + # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] + # for term_to_square in terms_to_square: + ate, [ci_low, ci_high] = linear_regression_estimator.estimate_coefficient() + + self.assertEqual(round(ate[0], 1), 3.5) + self.assertEqual([round(ci_low[0], 1), round(ci_high[0], 1)], [2.6, 4.3]) + + def test_program_15_no_interaction_ate(self): + """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184) + without product parameter.""" + df = self.nhefs_df + covariates = { + "sex", + "race", + "age", + "edu_2", + "edu_3", + "edu_4", + "edu_5", + "exercise_1", + "exercise_2", + "active_1", + "active_2", + "wt71", + "smokeintensity", + "smokeyrs", + } + linear_regression_estimator = LinearRegressionEstimator( + "qsmk", + 1, + 0, + covariates, + "wt82_71", + df, + formula="wt82_71 ~ qsmk + age + I(age ** 2) + wt71 + I(wt71 ** 2) + smokeintensity + I(smokeintensity ** 2) + smokeyrs + I(smokeyrs ** 2)", + ) + # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] + # for term_to_square in terms_to_square: + ate, [ci_low, ci_high] = linear_regression_estimator.estimate_ate() + self.assertEqual(round(ate[0], 1), 3.5) + self.assertEqual([round(ci_low[0], 1), round(ci_high[0], 1)], [2.6, 4.3]) + + def test_program_15_no_interaction_ate_calculated(self): + """Test whether our linear regression implementation produces the same results as program 15.1 (p. 
163, 184) + without product parameter.""" + df = self.nhefs_df + covariates = { + "sex", + "race", + "age", + "edu_2", + "edu_3", + "edu_4", + "edu_5", + "exercise_1", + "exercise_2", + "active_1", + "active_2", + "wt71", + "smokeintensity", + "smokeyrs", + } + linear_regression_estimator = LinearRegressionEstimator( + "qsmk", + 1, + 0, + covariates, + "wt82_71", + df, + formula="wt82_71 ~ qsmk + age + I(age ** 2) + wt71 + I(wt71 ** 2) + smokeintensity + I(smokeintensity ** 2) + smokeyrs + I(smokeyrs ** 2)", + ) + # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] + # for term_to_square in terms_to_square: + + ate, [ci_low, ci_high] = linear_regression_estimator.estimate_ate_calculated( + adjustment_config={k: self.nhefs_df.mean()[k] for k in covariates} + ) + self.assertEqual(round(ate[0], 1), 3.5) + self.assertEqual([round(ci_low[0], 1), round(ci_high[0], 1)], [1.9, 5]) + + def test_program_11_2_with_robustness_validation(self): + """Test whether our linear regression estimator, as used in test_program_11_2 can correctly estimate robustness.""" + df = self.chapter_11_df.copy() + linear_regression_estimator = LinearRegressionEstimator("treatments", 100, 90, set(), "outcomes", df) + linear_regression_estimator.estimate_coefficient() + + cv = CausalValidator() + self.assertEqual(round(cv.estimate_robustness(linear_regression_estimator.model)["treatments"], 4), 0.7353) + + def test_gp(self): + df = pd.DataFrame() + df["X"] = np.arange(10) + df["Y"] = 1 / (df["X"] + 1) + linear_regression_estimator = LinearRegressionEstimator("X", 0, 1, set(), "Y", df.astype(float)) + linear_regression_estimator.gp_formula(seeds=["reciprocal(add(X, 1))"]) + self.assertEqual(linear_regression_estimator.formula, "Y ~ I(1/(X + 1)) - 1") + ate, (ci_low, ci_high) = linear_regression_estimator.estimate_ate_calculated() + self.assertEqual(round(ate[0], 2), 0.50) + self.assertEqual(round(ci_low[0], 2), 0.50) + self.assertEqual(round(ci_high[0], 2), 0.50) + + def test_gp_power(self): + df = pd.DataFrame() + df["X"] = np.arange(10) + df["Y"] = 2 * (df["X"] ** 2) + linear_regression_estimator = LinearRegressionEstimator("X", 0, 1, set(), "Y", df.astype(float)) + linear_regression_estimator.gp_formula(seed=1, max_order=2, seeds=["mul(2, power_2(X))"]) + self.assertEqual( + linear_regression_estimator.formula, + "Y ~ I(2*X**2) - 1", + ) + ate, (ci_low, ci_high) = linear_regression_estimator.estimate_ate_calculated() + self.assertEqual(round(ate[0], 2), -2.00) + self.assertEqual(round(ci_low[0], 2), -2.00) + self.assertEqual(round(ci_high[0], 2), -2.00) + + +class TestLinearRegressionInteraction(unittest.TestCase): + """Test linear regression for estimating effects involving interaction.""" + + @classmethod + def setUpClass(cls) -> None: + # Y = 2X1 - 3X2 + 2*X1*X2 + 10 + df = pd.DataFrame({"X1": np.random.uniform(-1000, 1000, 1000), "X2": np.random.uniform(-1000, 1000, 1000)}) + df["Y"] = 2 * df["X1"] - 3 * df["X2"] + 2 * df["X1"] * df["X2"] + 10 + cls.df = df + cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") + + def test_X1_effect(self): + """When we fix the value of X2 to 0, the effect of X1 on Y should become ~2 (because X2 terms are cancelled).""" + lr_model = LinearRegressionEstimator( + "X1", 1, 0, {"X2"}, "Y", effect_modifiers={"x2": 0}, formula="Y ~ X1 + X2 + (X1 * X2)", df=self.df + ) + test_results = lr_model.estimate_ate() + ate = test_results[0][0] + self.assertAlmostEqual(ate, 2.0) + + def test_categorical_confidence_intervals(self): + lr_model = LinearRegressionEstimator( + 
treatment="color", + control_value=None, + treatment_value=None, + adjustment_set={}, + outcome="length_in", + df=self.scarf_df, + ) + coefficients, [ci_low, ci_high] = lr_model.estimate_coefficient() + + # The precise values don't really matter. This test is primarily intended to make sure the return type is correct. + self.assertTrue(coefficients.round(2).equals(pd.Series({"color[T.grey]": 0.92, "color[T.orange]": -4.25}))) + self.assertTrue(ci_low.round(2).equals(pd.Series({"color[T.grey]": -22.12, "color[T.orange]": -25.58}))) + self.assertTrue(ci_high.round(2).equals(pd.Series({"color[T.grey]": 23.95, "color[T.orange]": 17.08}))) diff --git a/tests/estimation_tests/test_logistic_regression_estimator.py b/tests/estimation_tests/test_logistic_regression_estimator.py new file mode 100644 index 00000000..a5d104e3 --- /dev/null +++ b/tests/estimation_tests/test_logistic_regression_estimator.py @@ -0,0 +1,24 @@ +import unittest +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from causal_testing.specification.variable import Input +from causal_testing.utils.validation import CausalValidator +from causal_testing.specification.capabilities import TreatmentSequence +from causal_testing.estimation.logistic_regression_estimator import LogisticRegressionEstimator + + +class TestLogisticRegressionEstimator(unittest.TestCase): + """Test the logistic regression estimator against the scarf example from + https://investigate.ai/regression/logistic-regression/. + """ + + @classmethod + def setUpClass(cls) -> None: + cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") + + def test_odds_ratio(self): + df = self.scarf_df.copy() + logistic_regression_estimator = LogisticRegressionEstimator("length_in", 65, 55, set(), "completed", df) + odds = logistic_regression_estimator.estimate_unit_odds_ratio() + self.assertEqual(round(odds, 4), 0.8948) diff --git a/tests/json_front_tests/test_json_class.py b/tests/json_front_tests/test_json_class.py index 8fa49194..ab565da7 100644 --- a/tests/json_front_tests/test_json_class.py +++ b/tests/json_front_tests/test_json_class.py @@ -4,7 +4,8 @@ import scipy import os -from causal_testing.testing.estimators import LinearRegressionEstimator, Estimator +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator +from causal_testing.estimation.abstract_estimator import Estimator from causal_testing.testing.causal_test_outcome import NoEffect, Positive from causal_testing.json_front.json_class import JsonUtility, CausalVariables from causal_testing.specification.variable import Input, Output, Meta @@ -313,7 +314,7 @@ def add_modelling_assumptions(self): effects = {"Positive": Positive()} mutates = { "Increase": lambda x: self.json_class.scenario.treatment_variables[x].z3 - > self.json_class.scenario.variables[x].z3 + > self.json_class.scenario.variables[x].z3 } estimators = {"ExampleEstimator": ExampleEstimator} with self.assertRaises(TypeError): diff --git a/tests/specification_tests/test_capabilities.py b/tests/specification_tests/test_capabilities.py index 505c7eff..98fd466c 100644 --- a/tests/specification_tests/test_capabilities.py +++ b/tests/specification_tests/test_capabilities.py @@ -3,7 +3,6 @@ class TestCapability(unittest.TestCase): - """ Test the Capability class for basic methods. """ @@ -17,7 +16,6 @@ def test_repr(self): class TestTreatmentSequence(unittest.TestCase): - """ Test the TreatmentSequence class for basic methods. 
""" diff --git a/tests/specification_tests/test_causal_dag.py b/tests/specification_tests/test_causal_dag.py index c020ae67..bd01d11c 100644 --- a/tests/specification_tests/test_causal_dag.py +++ b/tests/specification_tests/test_causal_dag.py @@ -8,8 +8,6 @@ from causal_testing.testing.base_test_case import BaseTestCase - - class TestCausalDAGIssue90(unittest.TestCase): """ Test the CausalDAG class for the resolution of Issue 90. @@ -63,10 +61,11 @@ def test_common_cause(self): causal_dag.graph.add_edge("U", "I") with self.assertRaises(ValueError): causal_dag.check_iv_assumptions("X", "Y", "I") - + def tearDown(self) -> None: shutil.rmtree(self.temp_dir_path) + class TestCausalDAG(unittest.TestCase): """ Test the CausalDAG class for creation of Causal Directed Acyclic Graphs (DAGs). @@ -154,10 +153,11 @@ def test_direct_effect_adjustment_sets_no_adjustment(self): causal_dag = CausalDAG(self.dag_dot_path) adjustment_sets = causal_dag.direct_effect_adjustment_sets(["X2"], ["D1"]) self.assertEqual(list(adjustment_sets), [set()]) - + def tearDown(self) -> None: shutil.rmtree(self.temp_dir_path) + class TestDAGIdentification(unittest.TestCase): """ Test the Causal DAG identification algorithms and supporting algorithms. @@ -345,6 +345,7 @@ def test_dag_with_non_character_nodes(self): def tearDown(self) -> None: shutil.rmtree(self.temp_dir_path) + class TestDependsOnOutputs(unittest.TestCase): """ Test the depends_on_outputs method. diff --git a/tests/specification_tests/test_variable.py b/tests/specification_tests/test_variable.py index cc76df6a..b35724a2 100644 --- a/tests/specification_tests/test_variable.py +++ b/tests/specification_tests/test_variable.py @@ -7,7 +7,6 @@ class TestVariable(unittest.TestCase): - """ Test the Variable class for basic methods. """ @@ -143,7 +142,6 @@ def test_copy(self): class TestZ3Methods(unittest.TestCase): - """ Test the Variable class for Z3 methods. 
diff --git a/tests/surrogate_tests/test_causal_surrogate_assisted.py b/tests/surrogate_tests/test_causal_surrogate_assisted.py index 54c93af1..5d408a85 100644 --- a/tests/surrogate_tests/test_causal_surrogate_assisted.py +++ b/tests/surrogate_tests/test_causal_surrogate_assisted.py @@ -4,20 +4,25 @@ from causal_testing.specification.causal_specification import CausalSpecification from causal_testing.specification.scenario import Scenario from causal_testing.specification.variable import Input, Output -from causal_testing.surrogate.causal_surrogate_assisted import SimulationResult, CausalSurrogateAssistedTestCase, Simulator +from causal_testing.surrogate.causal_surrogate_assisted import ( + SimulationResult, + CausalSurrogateAssistedTestCase, + Simulator, +) from causal_testing.surrogate.surrogate_search_algorithms import GeneticSearchAlgorithm -from causal_testing.testing.estimators import CubicSplineRegressionEstimator +from causal_testing.estimation.cubic_spline_estimator import CubicSplineRegressionEstimator import os import shutil, tempfile import pandas as pd import numpy as np + class TestSimulationResult(unittest.TestCase): def setUp(self): - self.data = {'key': 'value'} + self.data = {"key": "value"} def test_inputs(self): @@ -37,6 +42,7 @@ def test_inputs(self): self.assertEqual(result.relationship, relationship) + class TestCausalSurrogate(unittest.TestCase): @classmethod @@ -79,20 +85,18 @@ def test_causal_surrogate_assisted_execution(self): x = Input("X", float) m = Input("M", int) y = Output("Y", float) - scenario = Scenario(variables={z, x, m, y}, constraints={ - z <= 0, z >= 3, - x <= 0, x >= 3, - m <= 0, m >= 3 - }) + scenario = Scenario(variables={z, x, m, y}, constraints={z <= 0, z >= 3, x <= 0, x >= 3, m <= 0, m >= 3}) specification = CausalSpecification(scenario, causal_dag) - search_algorithm = GeneticSearchAlgorithm(config= { + search_algorithm = GeneticSearchAlgorithm( + config={ "parent_selection_type": "tournament", "K_tournament": 4, "mutation_type": "random", "mutation_percent_genes": 50, "mutation_by_replacement": True, - }) + } + ) simulator = TestSimulator() c_s_a_test_case = CausalSurrogateAssistedTestCase(specification, search_algorithm, simulator) @@ -111,20 +115,18 @@ def test_causal_surrogate_assisted_execution_failure(self): x = Input("X", float) m = Input("M", int) y = Output("Y", float) - scenario = Scenario(variables={z, x, m, y}, constraints={ - z <= 0, z >= 3, - x <= 0, x >= 3, - m <= 0, m >= 3 - }) + scenario = Scenario(variables={z, x, m, y}, constraints={z <= 0, z >= 3, x <= 0, x >= 3, m <= 0, m >= 3}) specification = CausalSpecification(scenario, causal_dag) - search_algorithm = GeneticSearchAlgorithm(config= { + search_algorithm = GeneticSearchAlgorithm( + config={ "parent_selection_type": "tournament", "K_tournament": 4, "mutation_type": "random", "mutation_percent_genes": 50, "mutation_by_replacement": True, - }) + } + ) simulator = TestSimulatorFailing() c_s_a_test_case = CausalSurrogateAssistedTestCase(specification, search_algorithm, simulator) @@ -143,26 +145,25 @@ def test_causal_surrogate_assisted_execution_custom_aggregator(self): x = Input("X", float) m = Input("M", int) y = Output("Y", float) - scenario = Scenario(variables={z, x, m, y}, constraints={ - z <= 0, z >= 3, - x <= 0, x >= 3, - m <= 0, m >= 3 - }) + scenario = Scenario(variables={z, x, m, y}, constraints={z <= 0, z >= 3, x <= 0, x >= 3, m <= 0, m >= 3}) specification = CausalSpecification(scenario, causal_dag) - search_algorithm = GeneticSearchAlgorithm(config= { + 
search_algorithm = GeneticSearchAlgorithm( + config={ "parent_selection_type": "tournament", "K_tournament": 4, "mutation_type": "random", "mutation_percent_genes": 50, "mutation_by_replacement": True, - }) + } + ) simulator = TestSimulator() c_s_a_test_case = CausalSurrogateAssistedTestCase(specification, search_algorithm, simulator) - result, iterations, result_data = c_s_a_test_case.execute(ObservationalDataCollector(scenario, df), - custom_data_aggregator=data_double_aggregator) + result, iterations, result_data = c_s_a_test_case.execute( + ObservationalDataCollector(scenario, df), custom_data_aggregator=data_double_aggregator + ) self.assertIsInstance(result, SimulationResult) self.assertEqual(iterations, 1) @@ -176,62 +177,69 @@ def test_causal_surrogate_assisted_execution_incorrect_search_config(self): x = Input("X", float) m = Input("M", int) y = Output("Y", float) - scenario = Scenario(variables={z, x, m, y}, constraints={ - z <= 0, z >= 3, - x <= 0, x >= 3, - m <= 0, m >= 3 - }) + scenario = Scenario(variables={z, x, m, y}, constraints={z <= 0, z >= 3, x <= 0, x >= 3, m <= 0, m >= 3}) specification = CausalSpecification(scenario, causal_dag) - search_algorithm = GeneticSearchAlgorithm(config= { + search_algorithm = GeneticSearchAlgorithm( + config={ "parent_selection_type": "tournament", "K_tournament": 4, "mutation_type": "random", "mutation_percent_genes": 50, "mutation_by_replacement": True, - "gene_space": "Something" - }) + "gene_space": "Something", + } + ) simulator = TestSimulator() c_s_a_test_case = CausalSurrogateAssistedTestCase(specification, search_algorithm, simulator) - self.assertRaises(ValueError, c_s_a_test_case.execute, - data_collector=ObservationalDataCollector(scenario, df), - custom_data_aggregator=data_double_aggregator) + self.assertRaises( + ValueError, + c_s_a_test_case.execute, + data_collector=ObservationalDataCollector(scenario, df), + custom_data_aggregator=data_double_aggregator, + ) def tearDown(self) -> None: shutil.rmtree(self.temp_dir_path) + def load_class_df(): """Get the testing data and put into a dataframe.""" - class_df = pd.DataFrame({"Z": np.arange(16), "X": np.arange(16), "M": np.arange(16, 32), "Y": np.arange(32,16,-1)}) + class_df = pd.DataFrame( + {"Z": np.arange(16), "X": np.arange(16), "M": np.arange(16, 32), "Y": np.arange(32, 16, -1)} + ) return class_df + class TestSimulator(Simulator): def run_with_config(self, configuration: dict) -> SimulationResult: return SimulationResult({"Z": 1, "X": 1, "M": 1, "Y": 1}, True, None) - + def startup(self): pass def shutdown(self): pass + class TestSimulatorFailing(Simulator): def run_with_config(self, configuration: dict) -> SimulationResult: return SimulationResult({"Z": 1, "X": 1, "M": 1, "Y": 1}, False, None) - + def startup(self): pass def shutdown(self): pass + def data_double_aggregator(data, new_data): - """Previously used data.append(new_data), however, pandas version >2 requires pd.concat() since append is now a private method. - Converting new_data to a pd.DataFrame is required to use pd.concat(). """ + """Previously used data.append(new_data), however, pandas version >2 requires pd.concat() since append is now a private method. 
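A minimal standalone sketch of that migration (hypothetical frame):

    import pandas as pd

    data = pd.DataFrame({"a": [1]})
    new_row = {"a": 2}
    # data = data.append(new_row, ignore_index=True)  # pandas < 2.0 only
    data = pd.concat([data, pd.DataFrame([new_row])], ignore_index=True)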
+ Converting new_data to a pd.DataFrame is required to use pd.concat().""" new_data = pd.DataFrame([new_data]) return pd.concat([data, new_data, new_data], ignore_index=True) diff --git a/tests/testing_tests/test_causal_test_adequacy.py b/tests/testing_tests/test_causal_test_adequacy.py index e29a8207..d5aeb5b1 100644 --- a/tests/testing_tests/test_causal_test_adequacy.py +++ b/tests/testing_tests/test_causal_test_adequacy.py @@ -5,7 +5,8 @@ import os import pandas as pd -from causal_testing.testing.estimators import LinearRegressionEstimator, IPCWEstimator +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator +from causal_testing.estimation.ipcw_estimator import IPCWEstimator from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.causal_test_case import CausalTestCase from causal_testing.testing.causal_test_suite import CausalTestSuite diff --git a/tests/testing_tests/test_causal_test_case.py b/tests/testing_tests/test_causal_test_case.py index 4d081a62..600191d3 100644 --- a/tests/testing_tests/test_causal_test_case.py +++ b/tests/testing_tests/test_causal_test_case.py @@ -11,7 +11,7 @@ from causal_testing.data_collection.data_collector import ObservationalDataCollector from causal_testing.testing.causal_test_case import CausalTestCase from causal_testing.testing.causal_test_outcome import ExactValue -from causal_testing.testing.estimators import LinearRegressionEstimator +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator from causal_testing.testing.base_test_case import BaseTestCase @@ -107,7 +107,6 @@ def test_check_minimum_adjustment_set(self): minimal_adjustment_set = self.causal_dag.identification(self.base_test_case) self.assertEqual(minimal_adjustment_set, {"D"}) - def test_invalid_causal_effect(self): """Check that executing the causal test case returns the correct results for dummy data using a linear regression estimator.""" @@ -170,7 +169,7 @@ def test_execute_test_observational_linear_regression_estimator_coefficient(self ) self.causal_test_case.estimate_type = "coefficient" causal_test_result = self.causal_test_case.execute_test(estimation_model, self.data_collector) - pd.testing.assert_series_equal(causal_test_result.test_value.value, pd.Series({'D': 0.0}), atol=1e-1) + pd.testing.assert_series_equal(causal_test_result.test_value.value, pd.Series({"D": 0.0}), atol=1e-1) def test_execute_test_observational_linear_regression_estimator_risk_ratio(self): """Check that executing the causal test case returns the correct results for dummy data using a linear diff --git a/tests/testing_tests/test_causal_test_outcome.py b/tests/testing_tests/test_causal_test_outcome.py index 235cc724..0bdbe4fa 100644 --- a/tests/testing_tests/test_causal_test_outcome.py +++ b/tests/testing_tests/test_causal_test_outcome.py @@ -2,7 +2,7 @@ import pandas as pd from causal_testing.testing.causal_test_outcome import ExactValue, SomeEffect, Positive, Negative, NoEffect from causal_testing.testing.causal_test_result import CausalTestResult, TestValue -from causal_testing.testing.estimators import LinearRegressionEstimator +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator from causal_testing.utils.validation import CausalValidator diff --git a/tests/testing_tests/test_causal_test_suite.py b/tests/testing_tests/test_causal_test_suite.py index d6af77cb..151f7af2 100644 --- a/tests/testing_tests/test_causal_test_suite.py +++ 
b/tests/testing_tests/test_causal_test_suite.py
@@ -9,7 +9,8 @@
 from causal_testing.testing.base_test_case import BaseTestCase
 from causal_testing.specification.variable import Input, Output
 from causal_testing.testing.causal_test_outcome import ExactValue
-from causal_testing.testing.estimators import LinearRegressionEstimator, LogisticRegressionEstimator
+from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator
+from causal_testing.estimation.logistic_regression_estimator import LogisticRegressionEstimator
 from causal_testing.specification.causal_specification import CausalSpecification, Scenario
 from causal_testing.data_collection.data_collector import ObservationalDataCollector
 from causal_testing.specification.causal_dag import CausalDAG
@@ -102,7 +103,9 @@ def test_execute_test_suite_single_base_test_case(self):
         causal_test_results = self.test_suite.execute_test_suite(self.data_collector, self.causal_specification)
         causal_test_case_result = causal_test_results[self.base_test_case]
-        self.assertAlmostEqual(causal_test_case_result["LinearRegressionEstimator"][0].test_value.value[0], 4, delta=1e-10)
+        self.assertAlmostEqual(
+            causal_test_case_result["LinearRegressionEstimator"][0].test_value.value[0], 4, delta=1e-10
+        )

 # Without CausalForestEstimator we now only have 2 estimators. Unfortunately LogisticRegressionEstimator does not
 # currently work with TestSuite, so the test has been removed for now.
diff --git a/tests/testing_tests/test_estimators.py b/tests/testing_tests/test_estimators.py
deleted file mode 100644
index b7604b86..00000000
--- a/tests/testing_tests/test_estimators.py
+++ /dev/null
@@ -1,547 +0,0 @@
-import unittest
-import pandas as pd
-import numpy as np
-import matplotlib.pyplot as plt
-from causal_testing.testing.estimators import (
-    LinearRegressionEstimator,
-    LogisticRegressionEstimator,
-    InstrumentalVariableEstimator,
-    CubicSplineRegressionEstimator,
-    IPCWEstimator,
-)
-from causal_testing.specification.variable import Input
-from causal_testing.utils.validation import CausalValidator
-from causal_testing.specification.capabilities import TreatmentSequence
-
-
-def plot_results_df(df):
-    """A helper method to plot results dataframe for estimators, where the df parameter must have columns for the cate,
-    ci_low, and ci_high.
-
-    :param df: A dataframe containing the columns cate, ci_low, and ci_high, where each row is an observation.
-    :return: Plot the treatment effect with confidence intervals for each observation.
-    """
-
-    df.sort_values("smokeintensity", inplace=True, ascending=True)
-    df.reset_index(inplace=True, drop=True)
-    plt.scatter(df["smokeintensity"], df["cate"], label="CATE", color="black")
-    plt.fill_between(df["smokeintensity"], df["ci_low"], df["ci_high"], alpha=0.2)
-    plt.ylabel("Weight Change (kg) caused by stopping smoking")
-    plt.xlabel("Smoke intensity (cigarettes smoked per day)")
-    plt.show()
-
-
-def load_nhefs_df():
-    """Get the NHEFS data from chapter 12 and put into a dataframe.
NHEFS = National Health and Nutrition Examination - Survey Data I Epidemiological Follow-up Study.""" - - nhefs_df = pd.read_csv("tests/resources/data/nhefs.csv") - nhefs_df["one"] = 1 - nhefs_df["zero"] = 0 - edu_dummies = pd.get_dummies(nhefs_df.education, prefix="edu") - exercise_dummies = pd.get_dummies(nhefs_df.exercise, prefix="exercise") - active_dummies = pd.get_dummies(nhefs_df.active, prefix="active") - nhefs_df = pd.concat([nhefs_df, edu_dummies, exercise_dummies, active_dummies], axis=1) - return nhefs_df - - -def load_chapter_11_df(): - """Get the data from chapter 11 and put into a dataframe.""" - - treatments, outcomes = zip( - *( - (3, 21), - (11, 54), - (17, 33), - (23, 101), - (29, 85), - (37, 65), - (41, 157), - (53, 120), - (67, 111), - (79, 200), - (83, 140), - (97, 220), - (60, 230), - (71, 217), - (15, 11), - (45, 190), - ) - ) - chapter_11_df = pd.DataFrame({"treatments": treatments, "outcomes": outcomes, "constant": np.ones(16)}) - return chapter_11_df - - -class TestLogisticRegressionEstimator(unittest.TestCase): - """Test the logistic regression estimator against the scarf example from - https://investigate.ai/regression/logistic-regression/. - """ - - @classmethod - def setUpClass(cls) -> None: - cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") - - # Yes, this probably shouldn't be in here, but it uses the scarf data so it makes more sense to put it - # here than duplicating the scarf data for a single test - def test_linear_regression_categorical_ate(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LinearRegressionEstimator("color", None, None, set(), "completed", df) - ate, confidence = logistic_regression_estimator.estimate_coefficient() - self.assertTrue(all([ci_low < 0 < ci_high for ci_low, ci_high in zip(confidence[0], confidence[1])])) - - def test_ate(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator("length_in", 65, 55, set(), "completed", df) - ate, _ = logistic_regression_estimator.estimate_ate() - self.assertEqual(round(ate, 4), -0.1987) - - def test_risk_ratio(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator("length_in", 65, 55, set(), "completed", df) - rr, _ = logistic_regression_estimator.estimate_risk_ratio() - self.assertEqual(round(rr, 4), 0.7664) - - def test_odds_ratio(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator("length_in", 65, 55, set(), "completed", df) - odds = logistic_regression_estimator.estimate_unit_odds_ratio() - self.assertEqual(round(odds, 4), 0.8948) - - def test_ate_adjustment(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator( - "length_in", 65, 55, {"large_gauge"}, "completed", df - ) - ate, _ = logistic_regression_estimator.estimate_ate(adjustment_config={"large_gauge": 0}) - self.assertEqual(round(ate, 4), -0.3388) - - def test_ate_invalid_adjustment(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator("length_in", 65, 55, {}, "completed", df) - with self.assertRaises(ValueError): - ate, _ = logistic_regression_estimator.estimate_ate(adjustment_config={"large_gauge": 0}) - - def test_ate_effect_modifiers(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator( - "length_in", 65, 55, set(), "completed", df, effect_modifiers={"large_gauge": 0} - ) - ate, _ = logistic_regression_estimator.estimate_ate() - 
self.assertEqual(round(ate, 4), -0.3388) - - def test_ate_effect_modifiers_formula(self): - df = self.scarf_df.copy() - logistic_regression_estimator = LogisticRegressionEstimator( - "length_in", - 65, - 55, - set(), - "completed", - df, - effect_modifiers={"large_gauge": 0}, - formula="completed ~ length_in + large_gauge", - ) - ate, _ = logistic_regression_estimator.estimate_ate() - self.assertEqual(round(ate, 4), -0.3388) - - -class TestInstrumentalVariableEstimator(unittest.TestCase): - """ - Test the instrumental variable estimator. - """ - - @classmethod - def setUpClass(cls) -> None: - Z = np.linspace(0, 10) - X = 2 * Z - Y = 2 * X - cls.df = pd.DataFrame({"Z": Z, "X": X, "Y": Y}) - - def test_estimate_coefficient(self): - """ - Test we get the correct coefficient. - """ - iv_estimator = InstrumentalVariableEstimator( - df=self.df, - treatment="X", - treatment_value=None, - control_value=None, - adjustment_set=set(), - outcome="Y", - instrument="Z", - ) - self.assertEqual(iv_estimator.estimate_coefficient(self.df), 2) - - def test_estimate_coefficient(self): - """ - Test we get the correct coefficient. - """ - iv_estimator = InstrumentalVariableEstimator( - df=self.df, - treatment="X", - treatment_value=None, - control_value=None, - adjustment_set=set(), - outcome="Y", - instrument="Z", - ) - coefficient, [low, high] = iv_estimator.estimate_coefficient() - self.assertEqual(coefficient[0], 2) - - -class TestLinearRegressionEstimator(unittest.TestCase): - """Test the linear regression estimator against the programming exercises in Section 2 of Hernán and Robins [1]. - - Reference: Hernán MA, Robins JM (2020). Causal Inference: What If. Boca Raton: Chapman & Hall/CRC. - Link: https://www.hsph.harvard.edu/miguel-hernan/causal-inference-book/ - """ - - @classmethod - def setUpClass(cls) -> None: - cls.nhefs_df = load_nhefs_df() - cls.chapter_11_df = load_chapter_11_df() - - def test_query(self): - df = self.nhefs_df - linear_regression_estimator = LinearRegressionEstimator( - "treatments", None, None, set(), "outcomes", df, query="sex==1" - ) - self.assertTrue(linear_regression_estimator.df.sex.all()) - - def test_program_11_2(self): - """Test whether our linear regression implementation produces the same results as program 11.2 (p. 141).""" - df = self.chapter_11_df - linear_regression_estimator = LinearRegressionEstimator("treatments", None, None, set(), "outcomes", df) - model = linear_regression_estimator._run_linear_regression() - ate, _ = linear_regression_estimator.estimate_coefficient() - - self.assertEqual(round(model.params["Intercept"] + 90 * model.params["treatments"], 1), 216.9) - - # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE - self.assertTrue(all(round(model.params["treatments"], 1) == round(ate_single, 1) for ate_single in ate)) - - def test_program_11_3(self): - """Test whether our linear regression implementation produces the same results as program 11.3 (p. 
144).""" - df = self.chapter_11_df.copy() - linear_regression_estimator = LinearRegressionEstimator( - "treatments", None, None, set(), "outcomes", df, formula="outcomes ~ treatments + np.power(treatments, 2)" - ) - model = linear_regression_estimator._run_linear_regression() - ate, _ = linear_regression_estimator.estimate_coefficient() - self.assertEqual( - round( - model.params["Intercept"] - + 90 * model.params["treatments"] - + 90 * 90 * model.params["np.power(treatments, 2)"], - 1, - ), - 197.1, - ) - # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE - self.assertTrue(all(round(model.params["treatments"], 3) == round(ate_single, 3) for ate_single in ate)) - - def test_program_15_1A(self): - """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184).""" - df = self.nhefs_df - covariates = { - "sex", - "race", - "age", - "edu_2", - "edu_3", - "edu_4", - "edu_5", - "exercise_1", - "exercise_2", - "active_1", - "active_2", - "wt71", - "smokeintensity", - "smokeyrs", - } - linear_regression_estimator = LinearRegressionEstimator( - "qsmk", - 1, - 0, - covariates, - "wt82_71", - df, - formula=f"""wt82_71 ~ qsmk + - {'+'.join(sorted(list(covariates)))} + - np.power(age, 2) + - np.power(wt71, 2) + - np.power(smokeintensity, 2) + - np.power(smokeyrs, 2) + - (qsmk * smokeintensity)""", - ) - # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] - # terms_to_product = [("qsmk", "smokeintensity")] - # for term_to_square in terms_to_square: - # for term_a, term_b in terms_to_product: - # linear_regression_estimator.add_product_term_to_df(term_a, term_b) - - model = linear_regression_estimator._run_linear_regression() - self.assertEqual(round(model.params["qsmk"], 1), 2.6) - self.assertEqual(round(model.params["qsmk:smokeintensity"], 2), 0.05) - - def test_program_15_no_interaction(self): - """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184) - without product parameter.""" - df = self.nhefs_df - covariates = { - "sex", - "race", - "age", - "edu_2", - "edu_3", - "edu_4", - "edu_5", - "exercise_1", - "exercise_2", - "active_1", - "active_2", - "wt71", - "smokeintensity", - "smokeyrs", - } - linear_regression_estimator = LinearRegressionEstimator( - "qsmk", - 1, - 0, - covariates, - "wt82_71", - df, - formula="wt82_71 ~ qsmk + age + np.power(age, 2) + wt71 + np.power(wt71, 2) + smokeintensity + np.power(smokeintensity, 2) + smokeyrs + np.power(smokeyrs, 2)", - ) - # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] - # for term_to_square in terms_to_square: - ate, [ci_low, ci_high] = linear_regression_estimator.estimate_coefficient() - - self.assertEqual(round(ate[0], 1), 3.5) - self.assertEqual([round(ci_low[0], 1), round(ci_high[0], 1)], [2.6, 4.3]) - - def test_program_15_no_interaction_ate(self): - """Test whether our linear regression implementation produces the same results as program 15.1 (p. 
163, 184) - without product parameter.""" - df = self.nhefs_df - covariates = { - "sex", - "race", - "age", - "edu_2", - "edu_3", - "edu_4", - "edu_5", - "exercise_1", - "exercise_2", - "active_1", - "active_2", - "wt71", - "smokeintensity", - "smokeyrs", - } - linear_regression_estimator = LinearRegressionEstimator( - "qsmk", - 1, - 0, - covariates, - "wt82_71", - df, - formula="wt82_71 ~ qsmk + age + np.power(age, 2) + wt71 + np.power(wt71, 2) + smokeintensity + np.power(smokeintensity, 2) + smokeyrs + np.power(smokeyrs, 2)", - ) - # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] - # for term_to_square in terms_to_square: - ate, [ci_low, ci_high] = linear_regression_estimator.estimate_ate() - self.assertEqual(round(ate[0], 1), 3.5) - self.assertEqual([round(ci_low[0], 1), round(ci_high[0], 1)], [2.6, 4.3]) - - def test_program_15_no_interaction_ate_calculated(self): - """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184) - without product parameter.""" - df = self.nhefs_df - covariates = { - "sex", - "race", - "age", - "edu_2", - "edu_3", - "edu_4", - "edu_5", - "exercise_1", - "exercise_2", - "active_1", - "active_2", - "wt71", - "smokeintensity", - "smokeyrs", - } - linear_regression_estimator = LinearRegressionEstimator( - "qsmk", - 1, - 0, - covariates, - "wt82_71", - df, - formula="wt82_71 ~ qsmk + age + np.power(age, 2) + wt71 + np.power(wt71, 2) + smokeintensity + np.power(smokeintensity, 2) + smokeyrs + np.power(smokeyrs, 2)", - ) - # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] - # for term_to_square in terms_to_square: - - ate, [ci_low, ci_high] = linear_regression_estimator.estimate_ate_calculated( - adjustment_config={k: self.nhefs_df.mean()[k] for k in covariates} - ) - self.assertEqual(round(ate[0], 1), 3.5) - self.assertEqual([round(ci_low[0], 1), round(ci_high[0], 1)], [1.9, 5]) - - def test_program_11_2_with_robustness_validation(self): - """Test whether our linear regression estimator, as used in test_program_11_2 can correctly estimate robustness.""" - df = self.chapter_11_df.copy() - linear_regression_estimator = LinearRegressionEstimator("treatments", 100, 90, set(), "outcomes", df) - model = linear_regression_estimator._run_linear_regression() - - cv = CausalValidator() - self.assertEqual(round(cv.estimate_robustness(model)["treatments"], 4), 0.7353) - - -class TestCubicSplineRegressionEstimator(TestLinearRegressionEstimator): - @classmethod - def setUpClass(cls): - super().setUpClass() - - def test_program_11_3_cublic_spline(self): - """Test whether the cublic_spline regression implementation produces the same results as program 11.3 (p. 162). - https://www.hsph.harvard.edu/miguel-hernan/wp-content/uploads/sites/1268/2023/10/hernanrobins_WhatIf_30sep23.pdf - Slightly modified as Hernan et al. use linear regression for this example. 
- """ - - df = self.chapter_11_df.copy() - - cublic_spline_estimator = CubicSplineRegressionEstimator("treatments", 1, 0, set(), "outcomes", 3, df) - - model = cublic_spline_estimator._run_linear_regression() - - self.assertEqual( - round( - cublic_spline_estimator.model.predict({"Intercept": 1, "treatments": 90}).iloc[0], - 1, - ), - 195.6, - ) - - ate_1 = cublic_spline_estimator.estimate_ate_calculated() - cublic_spline_estimator.treatment_value = 2 - ate_2 = cublic_spline_estimator.estimate_ate_calculated() - - # Doubling the treatemebnt value should roughly but not exactly double the ATE - self.assertNotEqual(ate_1[0] * 2, ate_2[0]) - self.assertAlmostEqual(ate_1[0] * 2, ate_2[0]) - - -class TestLinearRegressionInteraction(unittest.TestCase): - """Test linear regression for estimating effects involving interaction.""" - - @classmethod - def setUpClass(cls) -> None: - # Y = 2X1 - 3X2 + 2*X1*X2 + 10 - df = pd.DataFrame({"X1": np.random.uniform(-1000, 1000, 1000), "X2": np.random.uniform(-1000, 1000, 1000)}) - df["Y"] = 2 * df["X1"] - 3 * df["X2"] + 2 * df["X1"] * df["X2"] + 10 - cls.df = df - cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") - - def test_X1_effect(self): - """When we fix the value of X2 to 0, the effect of X1 on Y should become ~2 (because X2 terms are cancelled).""" - lr_model = LinearRegressionEstimator( - "X1", 1, 0, {"X2"}, "Y", effect_modifiers={"x2": 0}, formula="Y ~ X1 + X2 + (X1 * X2)", df=self.df - ) - test_results = lr_model.estimate_ate() - ate = test_results[0][0] - self.assertAlmostEqual(ate, 2.0) - - def test_categorical_confidence_intervals(self): - lr_model = LinearRegressionEstimator( - treatment="color", - control_value=None, - treatment_value=None, - adjustment_set={}, - outcome="length_in", - df=self.scarf_df, - ) - coefficients, [ci_low, ci_high] = lr_model.estimate_coefficient() - - # The precise values don't really matter. This test is primarily intended to make sure the return type is correct. 
- self.assertTrue(coefficients.round(2).equals(pd.Series({"color[T.grey]": 0.92, "color[T.orange]": -4.25}))) - self.assertTrue(ci_low.round(2).equals(pd.Series({"color[T.grey]": -22.12, "color[T.orange]": -25.58}))) - self.assertTrue(ci_high.round(2).equals(pd.Series({"color[T.grey]": 23.95, "color[T.orange]": 17.08}))) - - -class TestIPCWEstimator(unittest.TestCase): - """ - Test the IPCW estimator class - """ - - def test_estimate_hazard_ratio(self): - timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) - outcome = "outcome" - fit_bl_switch_formula = "xo_t_do ~ time" - df = pd.read_csv("tests/resources/data/temporal_data.csv") - df["ok"] = df["outcome"] == 1 - estimation_model = IPCWEstimator( - df, - timesteps_per_intervention, - control_strategy, - treatment_strategy, - outcome, - "ok", - fit_bl_switch_formula=fit_bl_switch_formula, - fit_bltd_switch_formula=fit_bl_switch_formula, - eligibility=None, - ) - estimate, intervals = estimation_model.estimate_hazard_ratio() - self.assertEqual(estimate["trtrand"], 1.0) - - def test_invalid_treatment_strategies(self): - timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) - outcome = "outcome" - fit_bl_switch_formula = "xo_t_do ~ time" - df = pd.read_csv("tests/resources/data/temporal_data.csv") - df["t"] = (["1", "0"] * len(df))[: len(df)] - df["ok"] = df["outcome"] == 1 - with self.assertRaises(ValueError): - estimation_model = IPCWEstimator( - df, - timesteps_per_intervention, - control_strategy, - treatment_strategy, - outcome, - "ok", - fit_bl_switch_formula=fit_bl_switch_formula, - fit_bltd_switch_formula=fit_bl_switch_formula, - eligibility=None, - ) - - def test_invalid_fault_t_do(self): - timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) - outcome = "outcome" - fit_bl_switch_formula = "xo_t_do ~ time" - df = pd.read_csv("tests/resources/data/temporal_data.csv") - df["ok"] = df["outcome"] == 1 - estimation_model = IPCWEstimator( - df, - timesteps_per_intervention, - control_strategy, - treatment_strategy, - outcome, - "ok", - fit_bl_switch_formula=fit_bl_switch_formula, - fit_bltd_switch_formula=fit_bl_switch_formula, - eligibility=None, - ) - estimation_model.df["fault_t_do"] = 0 - with self.assertRaises(ValueError): - estimate, intervals = estimation_model.estimate_hazard_ratio()
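As context for the data_double_aggregator change above: the pd.concat idiom that replaced DataFrame.append generalises to any append-one-row helper. The following is a minimal, self-contained sketch, assuming pandas >= 2 (where DataFrame.append is no longer public); the append_row helper is hypothetical and purely illustrative, not part of the codebase.

import pandas as pd


def append_row(data: pd.DataFrame, new_row: dict) -> pd.DataFrame:
    # pd.concat only accepts DataFrame/Series objects, so wrap the dict in a
    # single-row DataFrame first, exactly as data_double_aggregator does.
    return pd.concat([data, pd.DataFrame([new_row])], ignore_index=True)


df = pd.DataFrame({"X": [0, 1]})
df = append_row(df, {"X": 2})
assert list(df["X"]) == [0, 1, 2]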