
feat: Add Variance Reduction to Delta Method and couple it with CUPAC #230

Open · wants to merge 9 commits into main
21 changes: 20 additions & 1 deletion cluster_experiments/cupac.py
@@ -117,16 +117,20 @@ def __init__(
self,
cupac_model: Optional[BaseEstimator] = None,
target_col: str = "target",
scale_col: Optional[str] = None,
features_cupac_model: Optional[List[str]] = None,
cache_fit: bool = True,
):
self.cupac_model: BaseEstimator = cupac_model or EmptyRegressor()
self.target_col = target_col
self.scale_col = scale_col
self.cupac_outcome_name = f"estimate_{target_col}"
self.features_cupac_model: List[str] = features_cupac_model or []
self.is_cupac = not isinstance(self.cupac_model, EmptyRegressor)
self.cache_fit = cache_fit

self.check_cupac_config()

def _prep_data_cupac(
self, df: pd.DataFrame, pre_experiment_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series]:
@@ -136,7 +140,12 @@ def _prep_data_cupac(
df_predict = df.drop(columns=[self.target_col])
# Split data into X and y
pre_experiment_x = pre_experiment_df.drop(columns=[self.target_col])
pre_experiment_y = pre_experiment_df[self.target_col]
if self.scale_col:
> Owner: perhaps adding a property in the class called is_delta_method would make this more explicit

pre_experiment_y = (
pre_experiment_df[self.target_col] / pre_experiment_df[self.scale_col]
)
else:
pre_experiment_y = pre_experiment_df[self.target_col]

# Keep only cupac features
if self.features_cupac_model:
@@ -214,3 +223,13 @@ def check_cupac_inputs(self, pre_experiment_df: Optional[pd.DataFrame] = None):
"If cupac is not used, pre_experiment_df should not be provided - "
"remove pre_experiment_df argument or set cupac_model to not None."
)

def check_cupac_config(self):
if self.is_cupac and self.target_col in self.features_cupac_model:
raise ValueError(
"If cupac is used, target_col should not be in features_cupac_model."
)
if self.is_cupac and self.scale_col in self.features_cupac_model:
raise ValueError(
"If cupac is used, scale_col should not be in features_cupac_model."
)
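For context, a minimal sketch of what the new `scale_col` argument changes when constructing the handler; the data and the `HistGradientBoostingRegressor` choice are hypothetical, not part of this PR:

```python
import pandas as pd
from sklearn.ensemble import HistGradientBoostingRegressor

from cluster_experiments.cupac import CupacHandler

# Hypothetical pre-experiment data with a ratio target: orders per visit.
pre_experiment_df = pd.DataFrame(
    {
        "orders": [2.0, 0.0, 5.0, 1.0],
        "visits": [4.0, 1.0, 10.0, 2.0],
        "country_code": [0, 1, 0, 1],
    }
)

handler = CupacHandler(
    cupac_model=HistGradientBoostingRegressor(),
    target_col="orders",
    scale_col="visits",  # new argument introduced in this PR
    features_cupac_model=["country_code"],
)
# With scale_col set, _prep_data_cupac fits the cupac model on
# y = orders / visits instead of the raw orders column.
```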
145 changes: 114 additions & 31 deletions cluster_experiments/experiment_analysis.py
@@ -1222,7 +1222,7 @@ def _split_pre_experiment_df(self, df: pd.DataFrame):
class DeltaMethodAnalysis(ExperimentAnalysis):
def __init__(
self,
cluster_cols: Optional[List[str]] = None,
cluster_cols: List[str],
target_col: str = "target",
scale_col: str = "scale",
treatment_col: str = "treatment",
@@ -1235,13 +1235,12 @@ def __init__(
The analysis is done on the aggregated data at the cluster level, making computation more efficient.

Arguments:
cluster_cols: list of columns to use as clusters. Not available for the CUPED method.
cluster_cols: list of columns to use as clusters.
target_col: name of the column containing the variable to measure (the numerator of the ratio).
scale_col: name of the column containing the scale variable (the denominator of the ratio).
treatment_col: name of the column containing the treatment variable.
treatment: name of the treatment to use as the treated group.
covariates: list of columns to use as covariates.
ratio_covariates: list of tuples of columns to use as covariates for ratio metrics. First element is the numerator column, second element is the denominator column.
covariates: list of columns to use as covariates. These must already be aggregated at the cluster level.
> Owner: so this doesn't work with raw data? only aggregated?

hypothesis: one of "two-sided", "less", "greater" indicating the alternative hypothesis.

Usage:
@@ -1260,7 +1259,8 @@ def __init__(
DeltaMethodAnalysis(
cluster_cols=['cluster'],
target_col='x',
scale_col='y'
scale_col='y',
covariates=['x']
> Owner: same covariate as target?

).get_pvalue(df)
```
"""
@@ -1275,19 +1275,83 @@ def __init__(
)
self.scale_col = scale_col
self.cluster_cols = cluster_cols or []
self.covariates = covariates or []

def _transform_ratio_metric(
self, df: pd.DataFrame, numerator: str, denominator: str
) -> pd.DataFrame:
df = df.copy()
mean_numerator, mean_denominator = df.loc[:, [numerator, denominator]].mean()
df[f"{numerator}_{denominator}"] = (
df[numerator] / mean_denominator
- df[denominator] * mean_numerator / mean_denominator**2
)
return df
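For reference, the column added above is the first-order (delta method) linearization of the ratio metric around the group means:

$$
\tilde{r}_i = \frac{x_i}{\bar{y}} - y_i \frac{\bar{x}}{\bar{y}^2}
$$

where $x$ is the numerator, $y$ the denominator, and $\bar{x}$, $\bar{y}$ their sample means. Up to an additive constant this is the Taylor expansion of $x_i / y_i$ around $(\bar{x}, \bar{y})$, so the sample variance of $\tilde{r}_i$ divided by the group size recovers the delta-method variance of the ratio $\bar{x}/\bar{y}$.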

def _transform_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Transforms ratio metrics that are not at the user level into a column containing the Delta Method linearization.
These columns are added to the covariates, which allows the Delta Method to be applied uniformly to all covariates (user-level and non-user-level metrics).
"""
df = df.copy()
# transform target metric if necessary
numerator, denominator = self.target_col, self.scale_col
df = self._transform_ratio_metric(df, numerator, denominator)
df[f"original_{numerator}_{denominator}"] = df[numerator] / df[denominator]
self.target_metric = f"{numerator}_{denominator}"

return df

if covariates is not None:
warnings.warn(
"Covariates are not supported in the Delta Method approximation for the time being. They will be ignored."
def _compute_thetas(self, df: pd.DataFrame) -> np.array:
"""
Computes the theta values (regression coefficients) for the CUPED variance reduction.
"""

data = df[[self.target_metric] + self.covariates]
cov_mat = data.cov() # nxn
sigma = cov_mat.iloc[1:, 1:] # (n-1)x(n-1)
z = cov_mat.iloc[1:, 0] # (n-1)x1

thetas = np.dot(np.linalg.inv(sigma), z)
return thetas
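A small self-contained check of the theta computation, using synthetic data (column names are made up):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
x = rng.normal(size=5_000)
# Target depends on the covariate with true coefficient 0.7.
y = 0.7 * x + rng.normal(scale=0.5, size=5_000)
df = pd.DataFrame({"target_metric": y, "covariate_1": x})

cov_mat = df.cov()
sigma = cov_mat.iloc[1:, 1:]  # covariance among covariates
z = cov_mat.iloc[1:, 0]  # covariance of covariates with the target
thetas = np.dot(np.linalg.inv(sigma), z)
# thetas[0] comes out close to 0.7: theta is the OLS coefficient vector
# of the target on the covariates, the variance-minimizing choice in CUPED.
```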

def _get_y_hat(
self, df: pd.DataFrame, theta: Optional[np.array] = None
) -> pd.DataFrame:
"""
Compute CUPED estimate
"""

Y = df["original_" + self.target_metric].values.astype(float)
covariates = df[self.covariates]

Y_cv = Y.copy()
for k, _covariate in enumerate(self.covariates):
Y_cv -= theta[k] * (
covariates.values[:, k] - covariates.values[:, k].mean()
)
if cluster_cols is None:
raise ValueError(
"cluster_cols must be provided for the Delta Method analysis"
return Y_cv
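The adjustment above is the standard CUPED form applied to the per-cluster ratio:

$$
\hat{Y}^{\mathrm{cv}}_i = Y_i - \sum_k \theta_k \left( X_{ik} - \bar{X}_k \right)
$$

where $Y_i$ is the original ratio column and $X_{ik}$ the $k$-th covariate. Subtracting centered covariates leaves the group mean unchanged while reducing its variance.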

def _get_var_y_hat(
self, df: pd.DataFrame, theta: Optional[np.array] = None
) -> float:
"""
Compute the variance of the CUPED estimate. Also works for the plain (non-CUPED) estimate when no covariates are given.
"""

group_size = len(df)

Y_var = df[self.target_metric].var()
for k, covariate in enumerate(self.covariates):
Y_var += (
theta[k] ** 2 * df[covariate].var()
> Owner: wdyt about splitting in a couple of steps?

- 2 * theta[k] * np.cov(df[covariate], df[self.target_metric])[0, 1]
)
return Y_var / group_size
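Written out, the variance computed above is

$$
\operatorname{Var}(\hat{Y}^{\mathrm{cv}}) = \frac{1}{n} \left[ \operatorname{Var}(\tilde{r}) + \sum_k \theta_k^2 \operatorname{Var}(X_k) - 2 \sum_k \theta_k \operatorname{Cov}(X_k, \tilde{r}) \right]
$$

with $\tilde{r}$ the linearized target and $n$ the group size. As implemented, cross-covariances between different covariates are not included, which is exact for a single covariate or for mutually uncorrelated covariates.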

def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Returns an aggreegated dataframe of the target and scale variables at the cluster (and treatment) level.
Returns an aggregated dataframe of the target and scale variables at the cluster (and treatment) level.

Arguments:
df: dataframe containing the data to analyze
@@ -1301,30 +1365,23 @@ def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
def _get_group_mean_and_variance(self, df: pd.DataFrame) -> tuple[float, float]:
"""
Returns the mean and variance of the ratio metric (target/scale) as estimated by the delta method for a given group (treatment).
If covariates are given, variance reduction is applied. For this to work, the dataframe must first be aggregated at the cluster level, so that no assumptions about covariate aggregation need to be made.

Arguments:
df: dataframe containing the data to analyze.
"""
df = self._aggregate_to_cluster(df)
group_size = len(df)

if group_size < 1000:
self.__warn_small_group_size()

target_mean, scale_mean = df.loc[:, [self.target_col, self.scale_col]].mean()
target_variance, scale_variance = df.loc[
:, [self.target_col, self.scale_col]
].var()
target_sum, scale_sum = df.loc[:, [self.target_col, self.scale_col]].sum()

target_scale_cov = df.loc[:, self.target_col].cov(df.loc[:, self.scale_col])
df = df.copy()
df = self._transform_metrics(df)
if self.covariates:
> Owner: wondering if we could avoid this if else in any way

thetas = self._compute_thetas(df)
Y_hat = self._get_y_hat(df, thetas)
group_mean = Y_hat.mean()
group_variance = self._get_var_y_hat(df, thetas)
else:
Y_hat = self._get_y_hat(df)
group_mean = Y_hat.mean()
group_variance = self._get_var_y_hat(df)

group_mean = target_sum / scale_sum
group_variance = (
(1 / (scale_mean**2)) * target_variance
+ (target_mean**2) / (scale_mean**4) * scale_variance
- (2 * target_mean) / (scale_mean**3) * target_scale_cov
) / group_size
return group_mean, group_variance
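With no covariates, the variance of the linearized column reduces exactly to the delta-method formula that the deleted code computed directly:

$$
\operatorname{Var}\left(\frac{\bar{x}}{\bar{y}}\right) \approx \frac{1}{n} \left[ \frac{\operatorname{Var}(x)}{\bar{y}^2} + \frac{\bar{x}^2}{\bar{y}^4} \operatorname{Var}(y) - \frac{2\bar{x}}{\bar{y}^3} \operatorname{Cov}(x, y) \right]
$$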

def _get_mean_standard_error(self, df: pd.DataFrame) -> tuple[float, float]:
@@ -1333,6 +1390,14 @@ def _get_mean_standard_error(self, df: pd.DataFrame) -> tuple[float, float]:
Variance reduction is used if covariates are given.
"""

if (self._get_num_clusters(df) < 1000).any():
> Owner: could the 1000 be a class variable?

self.__warn_small_group_size()

if self.covariates:
> Owner: why are covariates relevant to aggregation?

self.__check_data_is_aggregated(df)
else:
df = self._aggregate_to_cluster(df)

is_treatment = df[self.treatment_col] == 1
treat_mean, treat_var = self._get_group_mean_and_variance(df[is_treatment])
ctrl_mean, ctrl_var = self._get_group_mean_and_variance(df[~is_treatment])
@@ -1457,6 +1522,24 @@ def from_config(cls, config):
hypothesis=config.hypothesis,
)

def __check_data_is_aggregated(self, df):
"""
Check if the data is already aggregated at the cluster level.
"""

if df.groupby(self.cluster_cols).size().max() > 1:
raise ValueError(
"The data should be aggregated at the cluster level for the Delta Method analysis using CUPED."
)
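A minimal illustration of the check, with made-up data:

```python
import pandas as pd

# Two rows share cluster "a", so this frame is NOT aggregated at the
# cluster level and __check_data_is_aggregated would raise a ValueError.
df = pd.DataFrame({"cluster": ["a", "a", "b"], "target": [1.0, 2.0, 3.0]})
print(df.groupby(["cluster"]).size().max() > 1)  # True -> would raise
```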

def _get_num_clusters(self, df):
"""
Returns the number of unique clusters per treatment group, used to check whether there are enough clusters to run the analysis.
"""
return df.groupby(self.treatment_col).apply(
lambda x: self._get_cluster_column(x).nunique()
> Owner: why do we groupby treatment col?

)
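A sketch of what this returns, assuming a single cluster column so that `_get_cluster_column` reduces to selecting that column:

```python
import pandas as pd

df = pd.DataFrame(
    {
        "treatment": [0, 0, 1, 1],
        "cluster": ["a", "b", "c", "c"],
    }
)
counts = df.groupby("treatment").apply(lambda x: x["cluster"].nunique())
# counts is a Series indexed by treatment, here {0: 2, 1: 1}; the caller
# checks (counts < 1000).any() to warn when either arm has few clusters.
```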

def __warn_small_group_size(self):
warnings.warn(
"Delta Method approximation may not be accurate for small group sizes"
2 changes: 2 additions & 0 deletions cluster_experiments/power_analysis.py
@@ -103,6 +103,7 @@ def __init__(
n_simulations: int = 100,
alpha: float = 0.05,
features_cupac_model: Optional[List[str]] = None,
scale_col: Optional[str] = None,
seed: Optional[int] = None,
hypothesis: str = "two-sided",
):
@@ -120,6 +121,7 @@ def __init__(
self.cupac_handler = CupacHandler(
cupac_model=cupac_model,
target_col=target_col,
scale_col=scale_col,
features_cupac_model=features_cupac_model,
)
if seed is not None: