feat: Add Variance Reduction to Delta Method and couple it with CUPAC #230
base: main
Changes from all commits: 66cc2d6, e8f4eb0, df0c7e1, 22c344d, 3801b7b, aa3318d, 55709f1, 5c1bd1a, e65774e
@@ -1222,7 +1222,7 @@ def _split_pre_experiment_df(self, df: pd.DataFrame):

 class DeltaMethodAnalysis(ExperimentAnalysis):
     def __init__(
         self,
-        cluster_cols: Optional[List[str]] = None,
+        cluster_cols: List[str],
         target_col: str = "target",
         scale_col: str = "scale",
         treatment_col: str = "treatment",
@@ -1235,13 +1235,12 @@ def __init__(
        The analysis is done on the aggregated data at the cluster level, making computation more efficient.

        Arguments:
-           cluster_cols: list of columns to use as clusters. Not available for the CUPED method.
+           cluster_cols: list of columns to use as clusters.
            target_col: name of the column containing the variable to measure (the numerator of the ratio).
            scale_col: name of the column containing the scale variable (the denominator of the ratio).
            treatment_col: name of the column containing the treatment variable.
            treatment: name of the treatment to use as the treated group.
-           covariates: list of columns to use as covariates.
-           ratio_covariates: list of tuples of columns to use as covariates for ratio metrics. First element is the numerator column, second element is the denominator column.
+           covariates: list of columns to use as covariates. Have to be previously aggregated at the cluster level.

Review comment: so this doesn't work with raw data? only aggregated?

            hypothesis: one of "two-sided", "less", "greater" indicating the alternative hypothesis.

        Usage:
@@ -1260,7 +1259,8 @@ def __init__(
            DeltaMethodAnalysis(
                cluster_cols=['cluster'],
                target_col='x',
-               scale_col='y'
+               scale_col='y',
+               covariates=['x']

Review comment: same covariate as target?

            ).get_pvalue(df)
        ```
        """
@@ -1275,19 +1275,83 @@ def __init__(
        )
        self.scale_col = scale_col
        self.cluster_cols = cluster_cols or []
        self.covariates = covariates or []

    def _transform_ratio_metric(
        self, df: pd.DataFrame, numerator: str, denominator: str
    ) -> pd.DataFrame:
        df = df.copy()
        mean_numerator, mean_denominator = df.loc[:, [numerator, denominator]].mean()
        df[f"{numerator}_{denominator}"] = (
            df[numerator] / mean_denominator
            - df[denominator] * mean_numerator / mean_denominator**2
        )
        return df
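For context: the column added by `_transform_ratio_metric` is the first-order delta method linearization of the ratio metric around the group means. A sketch of the algebra (not part of the diff):

$$
\frac{Y_i}{D_i} \;\approx\; \frac{\bar{Y}}{\bar{D}} \;+\; \frac{1}{\bar{D}}\left(Y_i - \bar{Y}\right) \;-\; \frac{\bar{Y}}{\bar{D}^{2}}\left(D_i - \bar{D}\right)
$$

Dropping the terms that are constant within the group leaves the per-cluster value $Y_i/\bar{D} - D_i\,\bar{Y}/\bar{D}^{2}$ stored in the new column; the additive constant does not affect variances or treatment-control differences.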
    def _transform_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transforms ratio metrics that are not at user level into a column with the Delta Method linearization already applied.
        These columns are added to the covariates, which allows the Delta Method to be used in the same way for all covariates (non-user-level and user-level metrics).
        """
        df = df.copy()
        # transform target metric if necessary
        numerator, denominator = self.target_col, self.scale_col
        df = self._transform_ratio_metric(df, numerator, denominator)
        df[f"original_{numerator}_{denominator}"] = df[numerator] / df[denominator]
        self.target_metric = f"{numerator}_{denominator}"

        return df
-       if covariates is not None:
-           warnings.warn(
-               "Covariates are not supported in the Delta Method approximation for the time being. They will be ignored."

    def _compute_thetas(self, df: pd.DataFrame) -> np.array:
        """
        Computes the theta values for the CUPED method.
        """

        data = df[[self.target_metric] + self.covariates]
        cov_mat = data.cov()  # nxn
        sigma = cov_mat.iloc[1:, 1:]  # (n-1)x(n-1)
        z = cov_mat.iloc[1:, 0]  # (n-1)x1

        thetas = np.dot(np.linalg.inv(sigma), z)
        return thetas
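The thetas solved for above are the OLS slope coefficients of the linearized target on the covariates. A minimal sketch with hypothetical data (not part of the diff) illustrating the equivalence:

```python
import numpy as np
import pandas as pd

# Hypothetical cluster-level data: a linearized target and two pre-aggregated covariates.
rng = np.random.default_rng(0)
covs = pd.DataFrame(rng.normal(size=(500, 2)), columns=["cov1", "cov2"])
target = (0.5 * covs["cov1"] - 0.2 * covs["cov2"] + rng.normal(size=500)).rename("target")

data = pd.concat([target, covs], axis=1)
cov_mat = data.cov()  # same covariance matrix as in _compute_thetas
thetas = np.linalg.solve(cov_mat.iloc[1:, 1:], cov_mat.iloc[1:, 0])

# The same coefficients come out of an OLS fit of target on the covariates (with intercept).
X = np.column_stack([np.ones(len(covs)), covs.to_numpy()])
beta = np.linalg.lstsq(X, target.to_numpy(), rcond=None)[0][1:]
assert np.allclose(thetas, beta)
```

Solving the linear system instead of inverting `sigma` explicitly is numerically equivalent here; the point is only that the thetas are the usual regression-adjustment coefficients.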
    def _get_y_hat(
        self, df: pd.DataFrame, theta: Optional[np.array] = None
    ) -> pd.DataFrame:
        """
        Compute CUPED estimate
        """

        Y = df["original_" + self.target_metric].values.astype(float)
        covariates = df[self.covariates]

        Y_cv = Y.copy()
        for k, _covariate in enumerate(self.covariates):
            Y_cv -= theta[k] * (
                covariates.values[:, k] - covariates.values[:, k].mean()
            )
-       if cluster_cols is None:
-           raise ValueError(
-               "cluster_cols must be provided for the Delta Method analysis"
        return Y_cv
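Written out, the loop above applies the standard CUPED adjustment (a sketch, not part of the diff):

$$
\hat{Y}_i \;=\; Y_i \;-\; \sum_{k} \theta_k \left(X_{ik} - \bar{X}_k\right)
$$

where $Y_i$ is the raw per-cluster ratio stored in the `original_` column and $X_{ik}$ are the cluster-level covariates.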
    def _get_var_y_hat(
        self, df: pd.DataFrame, theta: Optional[np.array] = None
    ) -> float:
        """
        Compute variance from CUPED estimate. Should also work for non-CUPED estimates if no covariate is given.
        """

        group_size = len(df)

        Y_var = df[self.target_metric].var()
        for k, covariate in enumerate(self.covariates):
            Y_var += (
                theta[k] ** 2 * df[covariate].var()

Review comment: wdyt about splitting in a couple of steps?

                - 2 * theta[k] * np.cov(df[covariate], df[self.target_metric])[0, 1]
            )
        return Y_var / group_size
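Per covariate, the loop accumulates the usual variance-reduction terms; the quantity returned is (a sketch, not part of the diff, written without cross-terms between different covariates, which the loop does not include):

$$
\widehat{\operatorname{Var}}\left(\bar{\hat{Y}}\right) \;=\; \frac{1}{n}\left[\operatorname{Var}(Y) \;+\; \sum_k \left(\theta_k^{2}\,\operatorname{Var}(X_k) \;-\; 2\,\theta_k\,\operatorname{Cov}(X_k, Y)\right)\right]
$$

with $Y$ the linearized target column and $n$ the number of rows (clusters) in the group.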
    def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
        """
-       Returns an aggreegated dataframe of the target and scale variables at the cluster (and treatment) level.
+       Returns an aggregated dataframe of the target and scale variables at the cluster (and treatment) level.

        Arguments:
            df: dataframe containing the data to analyze
@@ -1301,30 +1365,23 @@ def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
    def _get_group_mean_and_variance(self, df: pd.DataFrame) -> tuple[float, float]:
        """
        Returns the mean and variance of the ratio metric (target/scale) as estimated by the delta method for a given group (treatment).
        If covariates are given, variance reduction is used. For it to work, the dataframe must be aggregated first at the cluster level, so no assumptions on the aggregation of covariates have to be made.

        Arguments:
            df: dataframe containing the data to analyze.
        """
-       df = self._aggregate_to_cluster(df)
-       group_size = len(df)
-
-       if group_size < 1000:
-           self.__warn_small_group_size()
-
-       target_mean, scale_mean = df.loc[:, [self.target_col, self.scale_col]].mean()
-       target_variance, scale_variance = df.loc[
-           :, [self.target_col, self.scale_col]
-       ].var()
-       target_sum, scale_sum = df.loc[:, [self.target_col, self.scale_col]].sum()
-
-       target_scale_cov = df.loc[:, self.target_col].cov(df.loc[:, self.scale_col])
        df = df.copy()
        df = self._transform_metrics(df)
        if self.covariates:

Review comment: wondering if we could avoid this if else in any way

            thetas = self._compute_thetas(df)
            Y_hat = self._get_y_hat(df, thetas)
            group_mean = Y_hat.mean()
            group_variance = self._get_var_y_hat(df, thetas)
        else:
            Y_hat = self._get_y_hat(df)
            group_mean = Y_hat.mean()
            group_variance = self._get_var_y_hat(df)

-       group_mean = target_sum / scale_sum
-       group_variance = (
-           (1 / (scale_mean**2)) * target_variance
-           + (target_mean**2) / (scale_mean**4) * scale_variance
-           - (2 * target_mean) / (scale_mean**3) * target_scale_cov
-       ) / group_size
        return group_mean, group_variance
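The block removed at the end is the closed-form delta method variance for the ratio metric. When no covariates are given, the variance computed through the linearized column in `_get_var_y_hat` expands to the same expression (a sketch, not part of the diff):

$$
\operatorname{Var}\left(\frac{\bar{Y}}{\bar{S}}\right) \;\approx\; \frac{1}{n}\left(\frac{\operatorname{Var}(Y)}{\bar{S}^{2}} \;+\; \frac{\bar{Y}^{2}}{\bar{S}^{4}}\,\operatorname{Var}(S) \;-\; \frac{2\,\bar{Y}}{\bar{S}^{3}}\,\operatorname{Cov}(Y, S)\right)
$$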
    def _get_mean_standard_error(self, df: pd.DataFrame) -> tuple[float, float]:

@@ -1333,6 +1390,14 @@ def _get_mean_standard_error(self, df: pd.DataFrame) -> tuple[float, float]:
        Variance reduction is used if covariates are given.
        """

        if (self._get_num_clusters(df) < 1000).any():

Review comment: could the 1000 be a class variable?

            self.__warn_small_group_size()

        if self.covariates:

Review comment: why are covariates relevant to aggregation?

            self.__check_data_is_aggregated(df)
        else:
            df = self._aggregate_to_cluster(df)

        is_treatment = df[self.treatment_col] == 1
        treat_mean, treat_var = self._get_group_mean_and_variance(df[is_treatment])
        ctrl_mean, ctrl_var = self._get_group_mean_and_variance(df[~is_treatment])
@@ -1457,6 +1522,24 @@ def from_config(cls, config):
            hypothesis=config.hypothesis,
        )

    def __check_data_is_aggregated(self, df):
        """
        Check if the data is already aggregated at the cluster level.
        """

        if df.groupby(self.cluster_cols).size().max() > 1:
            raise ValueError(
                "The data should be aggregated at the cluster level for the Delta Method analysis using CUPED."
            )
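To make the expected input concrete: with covariates, the check above requires one row per cluster, i.e. target, scale and covariates already aggregated at the cluster level. A minimal sketch with hypothetical column names (not part of the diff):

```python
import pandas as pd

# Hypothetical raw, user-level data.
raw = pd.DataFrame(
    {
        "cluster": ["a", "a", "b", "b"],
        "treatment": [1, 1, 0, 0],
        "target": [1.0, 2.0, 3.0, 1.0],
        "scale": [1, 1, 1, 1],
        "pre_metric": [0.5, 1.5, 2.0, 1.0],
    }
)

# One row per cluster: the shape the CUPED path expects.
aggregated = raw.groupby(["cluster", "treatment"], as_index=False)[
    ["target", "scale", "pre_metric"]
].sum()

# The check in the diff boils down to: no cluster appears more than once.
assert aggregated.groupby(["cluster"]).size().max() == 1
```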
    def _get_num_clusters(self, df):
        """
        Check if there are enough clusters to run the analysis.
        """
        return df.groupby(self.treatment_col).apply(
            lambda x: self._get_cluster_column(x).nunique()

Review comment: why do we groupby treatment col?

        )

    def __warn_small_group_size(self):
        warnings.warn(
            "Delta Method approximation may not be accurate for small group sizes"
Review comment: perhaps adding a property in the class called is_delta_method would make this more explicit