diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 0000000..64affdd
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1,2 @@
+# Default Approval rule if one of the later sections does not apply
+* @nv-morpheus/dfencoder-codeowners
diff --git a/.github/ops-bot.yaml b/.github/ops-bot.yaml
new file mode 100644
index 0000000..fbe76f6
--- /dev/null
+++ b/.github/ops-bot.yaml
@@ -0,0 +1,24 @@
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file controls which features from the `ops-bot` repository below are enabled.
+# - https://github.com/rapidsai/ops-bot
+
+auto_merger: true
+branch_checker: true
+label_checker: true
+release_drafter: true
+copy_prs: true
+rerun_tests: true
diff --git a/dfencoder/autoencoder.py b/dfencoder/autoencoder.py
index 75a02bd..9f951d8 100644
--- a/dfencoder/autoencoder.py
+++ b/dfencoder/autoencoder.py
@@ -44,17 +44,18 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from collections import OrderedDict
 import gc
+from collections import OrderedDict
 
-import pandas as pd
 import numpy as np
+import pandas as pd
 import torch
 import tqdm
 
 from .dataframe import EncoderDataFrame
 from .logging import BasicLogger, IpynbLogger, TensorboardXLogger
-from .scalers import StandardScaler, NullScaler, GaussRankScaler
+from .scalers import GaussRankScaler, NullScaler, StandardScaler, ModifiedScaler
+
 
 def ohe(input_vector, dim, device="cpu"):
     """Does one-hot encoding of input vector."""
@@ -69,6 +70,7 @@ def ohe(input_vector, dim, device="cpu"):
 
     return y_onehot
 
+
 def compute_embedding_size(n_categories):
     """
     Applies a standard formula to choose the number of feature embeddings
@@ -76,23 +78,16 @@
     n_categories is the number of unique categories in a column.
""" - val = min(600, round(1.6 * n_categories ** 0.56)) + val = min(600, round(1.6 * n_categories**0.56)) return int(val) + class CompleteLayer(torch.nn.Module): """ Impliments a layer with linear transformation and optional activation and dropout.""" - def __init__( - self, - in_dim, - out_dim, - activation=None, - dropout=None, - *args, - **kwargs - ): + def __init__(self, in_dim, out_dim, activation=None, dropout=None, *args, **kwargs): super(CompleteLayer, self).__init__(*args, **kwargs) self.layers = [] linear = torch.nn.Linear(in_dim, out_dim) @@ -137,42 +132,44 @@ def forward(self, x): x = layer(x) return x + class AutoEncoder(torch.nn.Module): - def __init__( - self, - encoder_layers=None, - decoder_layers=None, - encoder_dropout=None, - decoder_dropout=None, - encoder_activations=None, - decoder_activations=None, - activation='relu', - min_cats=10, - swap_p=.15, - lr=0.01, - batch_size=256, - eval_batch_size=1024, - optimizer='adam', - amsgrad=False, - momentum=0, - betas=(0.9, 0.999), - dampening=0, - weight_decay=0, - lr_decay=None, - nesterov=False, - verbose=False, - device=None, - logger='basic', - logdir='logdir/', - project_embeddings=True, - run=None, - progress_bar=True, - n_megabatches=1, - scaler='standard', - *args, - **kwargs - ): + def __init__(self, + encoder_layers=None, + decoder_layers=None, + encoder_dropout=None, + decoder_dropout=None, + encoder_activations=None, + decoder_activations=None, + activation='relu', + min_cats=10, + swap_p=.15, + lr=0.01, + batch_size=256, + eval_batch_size=1024, + optimizer='adam', + amsgrad=False, + momentum=0, + betas=(0.9, 0.999), + dampening=0, + weight_decay=0, + lr_decay=None, + nesterov=False, + verbose=False, + device=None, + logger='basic', + logdir='logdir/', + project_embeddings=True, + run=None, + progress_bar=True, + n_megabatches=1, + scaler='standard', + patience=5, + preset_cats=None, + loss_scaler='standard', # scaler for the losses (z score) + *args, + **kwargs): super(AutoEncoder, self).__init__(*args, **kwargs) self.numeric_fts = OrderedDict() self.binary_fts = OrderedDict() @@ -186,6 +183,7 @@ def __init__( self.encoder_dropout = encoder_dropout self.decoder_dropout = decoder_dropout self.min_cats = min_cats + self.preset_cats = preset_cats self.encoder = [] self.decoder = [] self.train_mode = self.train @@ -228,16 +226,21 @@ def __init__( self.logdir = logdir self.run = run self.project_embeddings = project_embeddings - self.scaler = scaler + self.patience = patience + + # scaler class used to scale losses and collect loss stats + self.loss_scaler_str = loss_scaler + self.loss_scaler = self.get_scaler(loss_scaler) self.n_megabatches = n_megabatches def get_scaler(self, name): scalers = { - 'standard': StandardScaler, - 'gauss_rank': GaussRankScaler, - None: NullScaler, + 'standard': StandardScaler, + 'gauss_rank': GaussRankScaler, + 'modified': ModifiedScaler, + None: NullScaler, 'none': NullScaler } return scalers[name] @@ -255,16 +258,13 @@ def init_numeric(self, df): for ft in numeric: Scaler = self.get_scaler(scalers.get(ft, 'gauss_rank')) - feature = { - 'mean': df[ft].mean(), - 'std': df[ft].std(), - 'scaler': Scaler() - } + feature = {'mean': df[ft].mean(), 'std': df[ft].std(), 'scaler': Scaler()} feature['scaler'].fit(df[ft][~df[ft].isna()].values) self.numeric_fts[ft] = feature self.num_names = list(self.numeric_fts.keys()) - def create_numerical_col_max(self,num_names, mse_loss): + + def create_numerical_col_max(self, num_names, mse_loss): if num_names: num_df = pd.DataFrame(num_names) 
             num_df.columns = ['num_col_max_loss']
@@ -277,8 +277,7 @@ def create_numerical_col_max(self,num_names, mse_loss):
             num_df = pd.DataFrame()
         return num_df
 
-
-    def create_binary_col_max(self,bin_names, bce_loss):
+    def create_binary_col_max(self, bin_names, bce_loss):
         if bin_names:
             bool_df = pd.DataFrame(bin_names)
             bool_df.columns = ['bin_col_max_loss']
@@ -291,8 +290,7 @@ def create_binary_col_max(self,bin_names, bce_loss):
             bool_df = pd.DataFrame()
         return bool_df
 
-
-    def create_categorical_col_max(self,cat_names, cce_loss):
+    def create_categorical_col_max(self, cat_names, cce_loss):
         final_list = []
         if cat_names:
             for index, val in enumerate(cce_loss):
@@ -304,16 +302,15 @@ def create_categorical_col_max(self,cat_names, cce_loss):
         else:
             cat_df = pd.DataFrame()
         return cat_df
-
-    def get_variable_importance(self, num_names, cat_names, bin_names, mse_loss, bce_loss, cce_loss,
-                                cloudtrail_df):
+
+    def get_variable_importance(self, num_names, cat_names, bin_names, mse_loss, bce_loss, cce_loss, cloudtrail_df):
         # Get data in the right format
         num_df = self.create_numerical_col_max(num_names, mse_loss)
         bool_df = self.create_binary_col_max(bin_names, bce_loss)
         cat_df = self.create_categorical_col_max(cat_names, cce_loss)
         variable_importance_df = pd.concat([num_df, bool_df, cat_df], axis=1)
         return variable_importance_df
-
+
     def return_feature_names(self):
         bin_names = list(self.binary_fts.keys())
         num_names = list(self.numeric_fts.keys())
@@ -347,8 +344,11 @@ def init_binary(self, df):
         self.bin_names = list(self.binary_fts.keys())
 
     def init_features(self, df):
+        if self.preset_cats is not None:
+            self.categorical_fts = self.preset_cats
+        else:
+            self.init_cats(df)
         self.init_numeric(df)
-        self.init_cats(df)
         self.init_binary(df)
 
     def build_inputs(self):
@@ -413,13 +413,11 @@ def build_optimizer(self):
         lr = self.lr
         params = self.parameters()
         if self.optimizer == 'adam':
-            return torch.optim.Adam(
-                params,
-                lr=self.lr,
-                amsgrad=self.amsgrad,
-                weight_decay=self.weight_decay,
-                betas=self.betas
-            )
+            return torch.optim.Adam(params,
+                                    lr=self.lr,
+                                    amsgrad=self.amsgrad,
+                                    weight_decay=self.weight_decay,
+                                    betas=self.betas)
         elif self.optimizer == 'sgd':
             return torch.optim.SGD(
                 params,
@@ -428,7 +426,6 @@ def build_optimizer(self):
                 nesterov=self.nesterov,
                 dampening=self.dampening,
                 weight_decay=self.weight_decay,
-
             )
 
     def build_model(self, df):
@@ -468,24 +465,14 @@ def build_model(self, df):
 
         for i, dim in enumerate(self.encoder_layers):
             activation = self.encoder_activations[i]
-            layer = CompleteLayer(
-                input_dim,
-                dim,
-                activation=activation,
-                dropout=self.encoder_dropout[i]
-            )
+            layer = CompleteLayer(input_dim, dim, activation=activation, dropout=self.encoder_dropout[i])
            input_dim = dim
             self.encoder.append(layer)
             self.add_module(f'encoder_{i}', layer)
 
         for i, dim in enumerate(self.decoder_layers):
             activation = self.decoder_activations[i]
-            layer = CompleteLayer(
-                input_dim,
-                dim,
-                activation=activation,
-                dropout=self.decoder_dropout[i]
-            )
+            layer = CompleteLayer(input_dim, dim, activation=activation, dropout=self.decoder_dropout[i])
             input_dim = dim
             self.decoder.append(layer)
             self.add_module(f'decoder_{i}', layer)
@@ -586,7 +573,7 @@ def compute_loss(self, num, bin, cat, target_df, logging=True, _id=False):
         net_loss += list(mse_loss.mean(dim=0).cpu().detach().numpy())
         mse_loss = mse_loss.mean()
         bce_loss = self.bce(bin, bin_target)
-
+
         net_loss += list(bce_loss.mean(dim=0).cpu().detach().numpy())
         bce_loss = bce_loss.mean()
         cce_loss = []
@@ -640,38 +627,54 @@ def compute_baseline_performance(self, in_, out_):
             dim = len(feature['cats']) + 1
             pred = ohe(cd, dim, device=self.device) * 5
             codes_pred.append(pred)
-        mse_loss, bce_loss, cce_loss, net_loss = self.compute_loss(
-            num_pred,
-            bin_pred,
-            codes_pred,
-            out_,
-            logging=False
-        )
+        mse_loss, bce_loss, cce_loss, net_loss = self.compute_loss(num_pred, bin_pred, codes_pred, out_, logging=False)
         if isinstance(self.logger, BasicLogger):
             self.logger.baseline_loss = net_loss
         return net_loss
 
     def _create_stat_dict(self, a):
-        scaler = StandardScaler()
+        scaler = self.loss_scaler()
         scaler.fit(a)
-        mean = scaler.mean
-        std = scaler.std
-        return {'scaler': scaler, 'mean': mean, 'std': std}
-
-    def fit(self, df, epochs=1, val=None):
-        """Does training."""
-        pdf = df.copy()
-        # if val is None:
-        #     pdf_val = None
-        # else:
-        #     pdf_val = val.copy()
+        return {'scaler': scaler}
+
+    def fit(
+        self, df, epochs=1, val=None, run_validation=False, use_val_for_loss_stats=False
+    ):
+        """Does training.
+        Args:
+            df: pandas df used for training
+            epochs: number of epochs to run training
+            val: optional pandas dataframe for validation or loss stats
+            run_validation: boolean indicating whether to collect validation loss for each
+                epoch during training
+            use_val_for_loss_stats: boolean indicating whether to use the validation set
+                for loss statistics collection (for z score calculation)
+
+        Raises:
+            ValueError:
+                if run_validation or use_val_for_loss_stats is True but val is not provided
+        """
+        if (run_validation or use_val_for_loss_stats) and val is None:
+            raise ValueError(
+                "Validation set is required if either run_validation or \
+                    use_val_for_loss_stats is set to True."
+            )
+
+        if use_val_for_loss_stats:
+            df_for_loss_stats = val.copy()
+        else:
+            # use train loss
+            df_for_loss_stats = df.copy()
+
+        if run_validation and val is not None:
+            val = val.copy()
 
         if self.optim is None:
             self.build_model(df)
 
         if self.n_megabatches == 1:
             df = self.prepare_df(df)
 
-        if val is not None:
+        if run_validation and val is not None:
            val_df = self.prepare_df(val)
             val_in = val_df.swap(likelihood=self.swap_p)
             msg = "Validating during training.\n"
@@ -687,6 +690,9 @@ def fit(self, df, epochs=1, val=None):
         n_updates = len(df) // self.batch_size
         if len(df) % self.batch_size > 0:
             n_updates += 1
+        last_loss = 5000
+
+        count_es = 0
         for i in range(epochs):
             self.train()
             if self.verbose:
@@ -702,7 +708,7 @@ def fit(self, df, epochs=1, val=None):
             if self.lr_decay is not None:
                 self.lr_decay.step()
 
-            if val is not None:
+            if run_validation and val is not None:
                 self.eval()
                 with torch.no_grad():
                     swapped_loss = []
@@ -713,7 +719,7 @@ def fit(self, df, epochs=1, val=None):
 
                        slc_in = val_in.iloc[start:stop]
                        slc_in_tensor = self.build_input_tensor(slc_in)
-
+
                        slc_out = val_df.iloc[start:stop]
                        slc_out_tensor = self.build_input_tensor(slc_out)
 
@@ -725,6 +731,28 @@ def fit(self, df, epochs=1, val=None):
                        _, _, _, net_loss = self.compute_loss(num, bin, cat, slc_out, _id=True)
                        id_loss.append(net_loss)
 
+                    # Early stopping
+                    current_net_loss = net_loss
+                    if self.verbose:
+                        print('The Current Net Loss:', current_net_loss)
+
+                    if current_net_loss > last_loss:
+                        count_es += 1
+                        if self.verbose:
+                            print('Early stop count:', count_es)
+
+                        if count_es >= self.patience:
+                            if self.verbose:
+                                print('Early stopping: early stop count({}) >= patience({})'.format(count_es, self.patience))
+                            break
+
+                    else:
+                        if self.verbose:
+                            print('Set count for earlystop: 0')
+                        count_es = 0
+
+                    last_loss = current_net_loss
+
                 self.logger.end_epoch()
                 # if self.project_embeddings:
                 #     self.logger.show_embeddings(self.categorical_fts)
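Note: a hedged usage sketch of the fit() changes above (validation-driven early stopping via `patience`, plus the new `run_validation` and `use_val_for_loss_stats` flags). The file names, dataframes, and layer sizes below are illustrative assumptions, not part of this change:

    import pandas as pd
    from dfencoder.autoencoder import AutoEncoder

    train_df = pd.read_csv("train.csv")       # hypothetical training data
    val_df = pd.read_csv("validation.csv")    # hypothetical validation data

    model = AutoEncoder(
        encoder_layers=[512, 500],   # illustrative sizes
        decoder_layers=[512],
        patience=5,                  # stop after 5 epochs of rising validation loss
        loss_scaler='modified',      # scale per-feature losses with the new ModifiedScaler
        progress_bar=False,
    )

    # run_validation turns on the per-epoch validation loop (and early stopping);
    # use_val_for_loss_stats fits the per-feature loss statistics on val_df.
    model.fit(train_df, epochs=25, val=val_df, run_validation=True, use_val_for_loss_stats=True)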
@@ -743,17 +771,17 @@ def fit(self, df, epochs=1, val=None):
 
         #Getting training loss statistics
         # mse_loss, bce_loss, cce_loss, _ = self.get_anomaly_score(pdf) if pdf_val is None else self.get_anomaly_score(pd.concat([pdf, pdf_val]))
-        mse_loss, bce_loss, cce_loss, _ = self.get_anomaly_score_with_losses(pdf)
+        mse_loss, bce_loss, cce_loss, _ = self.get_anomaly_score_with_losses(df_for_loss_stats)
         for i, ft in enumerate(self.numeric_fts):
-            i_loss = mse_loss[:,i].cpu().numpy()
+            i_loss = mse_loss[:, i]
             self.feature_loss_stats[ft] = self._create_stat_dict(i_loss)
         for i, ft in enumerate(self.binary_fts):
-            i_loss = bce_loss[:,i].cpu().numpy()
+            i_loss = bce_loss[:, i]
             self.feature_loss_stats[ft] = self._create_stat_dict(i_loss)
         for i, ft in enumerate(self.categorical_fts):
-            i_loss = cce_loss[i].cpu().numpy()
+            i_loss = cce_loss[:, i]
             self.feature_loss_stats[ft] = self._create_stat_dict(i_loss)
-
+
     def train_epoch(self, n_updates, input_df, df, pbar=None):
         """Run regular epoch."""
 
@@ -771,10 +799,7 @@ def train_epoch(self, n_updates, input_df, df, pbar=None):
             in_sample_tensor = self.build_input_tensor(in_sample)
             target_sample = df.iloc[start:stop]
             num, bin, cat = self.forward(in_sample_tensor)
-            mse, bce, cce, net_loss = self.compute_loss(
-                num, bin, cat, target_sample,
-                logging=True
-            )
+            mse, bce, cce, net_loss = self.compute_loss(num, bin, cat, target_sample, logging=True)
             self.do_backward(mse, bce, cce)
             self.optim.step()
             self.optim.zero_grad()
@@ -888,60 +913,18 @@ def get_deep_stack_features(self, df):
             result = torch.cat(result, dim=0)
         return result
 
-    def get_anomaly_score_with_losses(self, df):
-        self.eval()
-        data = self.prepare_df(df)
-        input = self.build_input_tensor(data)
-
-        num_target, bin_target, codes = self.compute_targets(data)
-
-        with torch.no_grad():
-            num, bin, cat = self.forward(input)
-
-        mse_loss = self.mse(num, num_target)
-        net_loss = [mse_loss.data]
-        bce_loss = self.bce(bin, bin_target)
-        net_loss += [bce_loss.data]
-        cce_loss = []
-        for i, ft in enumerate(self.categorical_fts):
-            loss = self.cce(cat[i], codes[i])
-            cce_loss.append(loss)
-            net_loss += [loss.data.reshape(-1, 1)]
-
-        net_loss = torch.cat(net_loss, dim=1).mean(dim=1)
-        return mse_loss, bce_loss,cce_loss,net_loss.cpu().numpy()
-
     def get_anomaly_score(self, df):
         """
         Returns a per-row loss of the input dataframe.
         Does not corrupt inputs.
""" - return self.get_anomaly_score_with_losses(df)[3] - - def get_scaled_anomaly_scores(self, df): - self.eval() - data = self.prepare_df(df) - input = self.build_input_tensor(data) - - num_target, bin_target, codes = self.compute_targets(data) - with torch.no_grad(): - num, bin, cat = self.forward(input) + mse, bce, cce = self.get_anomaly_score_losses(df) + combined_loss = torch.cat([mse, bce, cce], dim=1) - mse_loss = self.mse(num, num_target) - mse_scaled = torch.zeros(mse_loss.shape) - for i, ft in enumerate(self.numeric_fts): - mse_scaled[:,i] = torch.tensor(self.feature_loss_stats[ft]['scaler'].transform(mse_loss[:,i].cpu().numpy())) - bce_loss = self.bce(bin, bin_target) - bce_scaled = torch.zeros(bce_loss.shape) - for i, ft in enumerate(self.binary_fts): - bce_scaled[:,i] = torch.tensor(self.feature_loss_stats[ft]['scaler'].transform(mse_loss[:,i].cpu().numpy())) - cce_scaled = [] - for i, ft in enumerate(self.categorical_fts): - loss = torch.tensor(self.feature_loss_stats[ft]['scaler'].transform(self.cce(cat[i], codes[i]).cpu().numpy())) - cce_scaled.append(loss) + net_loss = combined_loss.mean(dim=1).cpu().numpy() - return mse_scaled, bce_scaled, cce_scaled + return net_loss def decode_to_df(self, x, df=None): """ @@ -972,19 +955,22 @@ def decode_to_df(self, x, df=None): bin_df = bin_df.apply(lambda x: round(x)).astype(bool) for ft in bin_df.columns: feature = self.binary_fts[ft] - map = { - False: feature['cats'][0], - True: feature['cats'][1] - } + map = {False: feature['cats'][0], True: feature['cats'][1]} bin_df[ft] = bin_df[ft].apply(lambda x: map[x]) cat_df = pd.DataFrame(index=df.index) for i, ft in enumerate(self.categorical_fts): feature = self.categorical_fts[ft] - # get argmax excluding NaN column (impute with next-best guess) - codes = torch.argmax(cat[i][:, :-1], dim=1).cpu().numpy() - cat_df[ft] = codes cats = feature['cats'] + + if (len(cats) > 0): + # get argmax excluding NaN column (impute with next-best guess) + codes = torch.argmax(cat[i][:, :-1], dim=1).cpu().numpy() + else: + # Only one option + codes = torch.argmax(cat[i], dim=1).cpu().numpy() + cat_df[ft] = codes + cats = feature['cats'] + ["_other"] cat_df[ft] = cat_df[ft].apply(lambda x: cats[x]) # concat @@ -1009,36 +995,128 @@ def df_predict(self, df): return output_df - def get_results(self, df, return_abs = False): - pdf = df.copy() - orig_cols = pdf.columns + def get_anomaly_score_with_losses(self, df): + + mse, bce, cce = self.get_anomaly_score_losses(df) + + net = self.get_anomaly_score(df) + + return mse, bce, cce, net + + def get_anomaly_score_losses(self, df): + """ + Run the input dataframe `df` through the autoencoder to get the recovery losses by feature type + (numerical/boolean/categorical). 
+ """ self.eval() + + n_batches = len(df) // self.batch_size + if len(df) % self.batch_size > 0: + n_batches += 1 + + mse_loss_slices, bce_loss_slices, cce_loss_slices = [], [], [] + with torch.no_grad(): + for i in range(n_batches): + start = i * self.batch_size + stop = (i + 1) * self.batch_size + + df_slice = df.iloc[start:stop] + data_slice = self.prepare_df(df_slice) + num_target, bin_target, codes = self.compute_targets(data_slice) + + input_slice = self.build_input_tensor(data_slice) + + num, bin, cat = self.forward(input_slice) + mse_loss_slice: torch.Tensor = self.mse(num, num_target) + bce_loss_slice: torch.Tensor = self.bce(bin, bin_target) + cce_loss_slice_of_each_feat = [] # each entry in this list is the cce loss of a feature, ordered by the feature list self.categorical_fts + for i, ft in enumerate(self.categorical_fts): + loss = self.cce(cat[i], codes[i]) + # Convert to 2 dimensions + cce_loss_slice_of_each_feat.append(loss.data.reshape(-1, 1)) + cce_loss_slice = torch.cat(cce_loss_slice_of_each_feat, dim=1) # merge the tensors into one (n_records * n_features) tensor + + mse_loss_slices.append(mse_loss_slice) + bce_loss_slices.append(bce_loss_slice) + cce_loss_slices.append(cce_loss_slice) + + mse_loss = torch.cat(mse_loss_slices, dim=0) + bce_loss = torch.cat(bce_loss_slices, dim=0) + cce_loss = torch.cat(cce_loss_slices, dim=0) + return mse_loss, bce_loss, cce_loss + + def scale_losses(self, mse, bce, cce): + + # Create outputs + mse_scaled = torch.zeros_like(mse) + bce_scaled = torch.zeros_like(bce) + cce_scaled = torch.zeros_like(cce) + + for i, ft in enumerate(self.numeric_fts): + mse_scaled[:, i] = self.feature_loss_stats[ft]['scaler'].transform(mse[:, i]) + + for i, ft in enumerate(self.binary_fts): + bce_scaled[:, i] = self.feature_loss_stats[ft]['scaler'].transform(bce[:, i]) + + for i, ft in enumerate(self.categorical_fts): + cce_scaled[:, i] = self.feature_loss_stats[ft]['scaler'].transform(cce[:, i]) + + return mse_scaled, bce_scaled, cce_scaled + + def get_results(self, df, return_abs=False): + pdf = pd.DataFrame() + self.eval() + data = self.prepare_df(df) + with torch.no_grad(): num, bin, embeddings = self.encode_input(data) x = torch.cat(num + bin + embeddings, dim=1) x = self.encode(x) - output_df = self.decode_to_df(x, df=df) - mse, bce, cce, _ = self.get_anomaly_score_with_losses(df) - mse_scaled, bce_scaled, cce_scaled = self.get_scaled_anomaly_scores(df) + output_df = self.decode_to_df(x) + + # set the index of the prediction df to match the input df + output_df.index = df.index + + mse, bce, cce = self.get_anomaly_score_losses(df) + mse_scaled, bce_scaled, cce_scaled = self.scale_losses(mse, bce, cce) + + if (return_abs): + mse_scaled = abs(mse_scaled) + bce_scaled = abs(bce_scaled) + cce_scaled = abs(cce_scaled) + + combined_loss = torch.cat([mse_scaled, bce_scaled, cce_scaled], dim=1) + for i, ft in enumerate(self.numeric_fts): - pdf[ft+'_pred'] = output_df[ft] - pdf[ft+'_loss'] = mse[:, i].cpu().numpy() - pdf[ft+'_z_loss'] = mse_scaled[:, i].cpu().numpy() if not return_abs else abs(mse_scaled[:, i].cpu().numpy()) + pdf[ft] = df[ft] + pdf[ft + '_pred'] = output_df[ft] + pdf[ft + '_loss'] = mse[:, i].cpu().numpy() + pdf[ft + '_z_loss'] = mse_scaled[:, i].cpu().numpy() + for i, ft in enumerate(self.binary_fts): - pdf[ft+'_pred'] = output_df[ft] - pdf[ft+'_loss'] = bce[:, i].cpu().numpy() - pdf[ft+'_z_loss'] = bce_scaled[:, i].cpu().numpy() if not return_abs else abs(bce_scaled[:, i].cpu().numpy()) + pdf[ft] = df[ft] + pdf[ft + '_pred'] = output_df[ft] 
+            pdf[ft + '_loss'] = bce[:, i].cpu().numpy()
+            pdf[ft + '_z_loss'] = bce_scaled[:, i].cpu().numpy()
+
         for i, ft in enumerate(self.categorical_fts):
-            pdf[ft+'_pred'] = output_df[ft]
-            pdf[ft+'_loss'] = cce[i].cpu().numpy()
-            pdf[ft+'_z_loss'] = cce_scaled[i].cpu().numpy() if not return_abs else abs(cce_scaled[i].cpu().numpy())
-        all_cols = [[c, c+'_pred', c+'_loss', c+'_z_loss'] for c in orig_cols]
-        result_cols = [col for col_collection in all_cols for col in col_collection]
-        z_losses = [c+'_z_loss' for c in orig_cols]
-        pdf['max_abs_z'] = pdf[z_losses].max(axis=1)
-        pdf['mean_abs_z'] = pdf[z_losses].mean(axis=1)
-        result_cols.append('max_abs_z')
-        result_cols.append('mean_abs_z')
-        return pdf[result_cols]
-
\ No newline at end of file
+            pdf[ft] = df[ft]
+            pdf[ft + '_pred'] = output_df[ft]
+            pdf[ft + '_loss'] = cce[:, i].cpu().numpy()
+            pdf[ft + '_z_loss'] = cce_scaled[:, i].cpu().numpy()
+
+        pdf['max_abs_z'] = combined_loss.max(dim=1)[0].cpu().numpy()
+        pdf['mean_abs_z'] = combined_loss.mean(dim=1).cpu().numpy()
+
+        # add a column describing the scaler of the losses
+        if self.loss_scaler_str == 'standard':
+            output_scaled_loss_str = 'z'
+        elif self.loss_scaler_str == 'modified':
+            output_scaled_loss_str = 'modz'
+        else:
+            # in case other custom scaling is used
+            output_scaled_loss_str = f'{self.loss_scaler_str}_scaled'
+        pdf['z_loss_scaler_type'] = output_scaled_loss_str
+
+        return pdf
diff --git a/dfencoder/scalers.py b/dfencoder/scalers.py
index e78d773..07a275f 100644
--- a/dfencoder/scalers.py
+++ b/dfencoder/scalers.py
@@ -1,6 +1,20 @@
+import typing
+
 import numpy as np
+import torch
 from sklearn.preprocessing import QuantileTransformer
 
 
+def ensure_float_type(x: typing.Union[torch.Tensor, np.ndarray]):
+    """Ensure we are in the right floating point format. """
+    if (isinstance(x, torch.Tensor)):
+        result = x.to(dtype=torch.float32, copy=True)
+    elif (isinstance(x, np.ndarray)):
+        result = x.astype(float)
+    else:
+        raise ValueError(f"Unsupported type: {type(x)}")
+    return result
+
+
 class StandardScaler(object):
     """Impliments standard (mean/std) scaling."""
 
@@ -8,26 +22,76 @@ def __init__(self):
         self.mean = None
         self.std = None
 
-    def fit(self, x):
-        self.mean = x.mean()
-        self.std = x.std()
+    def fit(self, x: torch.Tensor):
+        self.mean = x.mean().item()
+        self.std = x.std().item()
 
-    def transform(self, x):
-        result = x.astype(float)
+        # Having a std == 0 (when all values are the same), breaks training. Just use 1.0 in this case
+        if (self.std == 0):
+            self.std = 1.0
+
+    def transform(self, x: typing.Union[torch.Tensor, np.ndarray]):
+        result = ensure_float_type(x)
         result -= self.mean
         result /= self.std
         return result
 
-    def inverse_transform(self, x):
-        result = x.astype(float)
+    def inverse_transform(self, x: torch.Tensor):
+        result = ensure_float_type(x)
         result *= self.std
         result += self.mean
         return result
 
-    def fit_transform(self, x):
+    def fit_transform(self, x: torch.Tensor):
         self.fit(x)
         return self.transform(x)
 
+
+class ModifiedScaler(object):
+    """Implements scaling using modified z score.
+    Reference: https://www.ibm.com/docs/el/cognos-analytics/11.1.0?topic=terms-modified-z-score
+    """
+    MAD_SCALING_FACTOR = 1.486  # 1.486 * MAD approximately equals the standard deviation
+    MEANAD_SCALING_FACTOR = 1.253314  # 1.253314 * MeanAD approximately equals the standard deviation
+
+    def __init__(self):
+        self.median: float = None
+        self.mad: float = None  # median absolute deviation
+        self.meanad: float = None  # mean absolute deviation
+
+    def fit(self, x: torch.Tensor):
+        med = x.median().item()
+        self.median = med
+        self.mad = (x - med).abs().median().item()
+        self.meanad = (x - med).abs().mean().item()
+        # Having a meanad == 0 (when all values are the same), breaks training. Just use 1.0 in this case
+        if (self.meanad == 0):
+            self.meanad = 1.0
+
+    def transform(self, x: typing.Union[torch.Tensor, np.ndarray]):
+        result = ensure_float_type(x)
+
+        result -= self.median
+        if self.mad == 0:
+            result /= (self.MEANAD_SCALING_FACTOR * self.meanad)
+        else:
+            result /= (self.MAD_SCALING_FACTOR * self.mad)
+        return result
+
+    def inverse_transform(self, x: torch.Tensor):
+        result = ensure_float_type(x)
+
+        if self.mad == 0:
+            result *= (self.MEANAD_SCALING_FACTOR * self.meanad)
+        else:
+            result *= (self.MAD_SCALING_FACTOR * self.mad)
+        result += self.median
+        return result
+
+    def fit_transform(self, x: torch.Tensor):
+        self.fit(x)
+        return self.transform(x)
+
+
 class GaussRankScaler(object):
     """
     So-called "Gauss Rank" scaling.
@@ -58,6 +122,7 @@ def fit_transform(self, x):
         self.fit(x)
         return self.transform(x)
 
+
 class NullScaler(object):
 
     def __init__(self):
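Note: a small, self-contained sketch of how the new ModifiedScaler behaves on a toy tensor (the values are made up for illustration). With median 2.0 and MAD 1.0, each value is scaled as (x - median) / (1.486 * MAD), so the outlier 14.0 maps to roughly 8.1:

    import torch
    from dfencoder.scalers import ModifiedScaler

    x = torch.tensor([1.0, 2.0, 2.0, 3.0, 14.0])

    scaler = ModifiedScaler()
    scaled = scaler.fit_transform(x)       # modified z scores: (x - median) / (1.486 * MAD)
    print(scaled)

    # Round-trips back to the original values; when the MAD is 0 the scaler falls
    # back to 1.253314 * mean absolute deviation instead.
    print(scaler.inverse_transform(scaled))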