From 818ccca67d7fad014010b12a0eccd9b1cfc6f946 Mon Sep 17 00:00:00 2001 From: Dmitry Levshun <37076651+levshun@users.noreply.github.com> Date: Fri, 8 Dec 2023 20:31:58 +0300 Subject: [PATCH] Add files via upload Testing web interface --- .../forecasting/forecaster_ai/forecaster.py | 1321 ++++++++++++----- .../forecasting/forecaster_ai/loader.py | 775 +++++----- .../forecasting/forecaster_ai/logger.py | 296 ++-- .../forecaster_ai/\321\201hecker.py" | 84 ++ 4 files changed, 1647 insertions(+), 829 deletions(-) create mode 100644 "foressment_ai/forecasting/forecaster_ai/\321\201hecker.py" diff --git a/foressment_ai/forecasting/forecaster_ai/forecaster.py b/foressment_ai/forecasting/forecaster_ai/forecaster.py index 15e70f9..28e3ce8 100644 --- a/foressment_ai/forecasting/forecaster_ai/forecaster.py +++ b/foressment_ai/forecasting/forecaster_ai/forecaster.py @@ -1,341 +1,980 @@ -import os -os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' - -from keras.models import Sequential, load_model -from keras.layers import Dense, LSTM, Input, Dropout -from keras import regularizers -from keras.callbacks import EarlyStopping -from keras.preprocessing.sequence import TimeseriesGenerator - -from sklearn.preprocessing import MinMaxScaler -from sklearn.metrics import mean_squared_error, mean_absolute_error -import pickle - -import pandas as pd -import numpy as np -import sys - - -class AIForecaster: - """ - Class for forecasting the states of complex objects and processes - - :param epochs: Number of training epochs - :type epochs: int - - :param batch_size: Training batch size - :type batch_size: int - - :param early_stop: Object of the EarlyStopping class responsible for prematurely stopping training - :type early_stop: keras.callback - - :param time_window_length: Time window size during training - :type time_window_length: int - - :param n_features: Number of features - :type n_features: int - - :param model: Neural network model - :type model: keras.models.Sequential - - :param model_path: Path to the file with the forecasting model - :type model_path: str - """ - def __init__(self, time_window_length=0, n_features=0, - model_path='', - n_epochs=2, open=False): - """ - Model initialization - - :param open: Parameter for load model - :type open: bool - """ - self.model_path = model_path - - if open: - self.open_model() - else: - if (time_window_length != 0) and (n_features != 0): - if type(time_window_length) == int: - if type(n_features) == int: - self.time_window_length = time_window_length - self.n_features = n_features - else: - print('Nuber of features length must be integer') - exit() - else: - print('Time window length must be integer') - exit() - - else: - print('Uncorrected zero values') - exit() - - self.model = Sequential([ - Input(shape=(self.time_window_length, self.n_features)), - # self.input = (time_window_length, n_features) - LSTM(64, activation='relu', return_sequences=True, - kernel_regularizer=regularizers.l2(0.00)), - Dropout(0.01), - LSTM(32, return_sequences=True, activation='relu', - kernel_regularizer=regularizers.l2(0.00)), - Dropout(0.01), - LSTM(16, return_sequences=False, activation='relu', - kernel_regularizer=regularizers.l2(0.00)), - Dropout(0.01), - Dense(self.n_features, activation='sigmoid') - ]) - self.model.compile(optimizer='adam', loss='mse') - - self.epochs = n_epochs - self.batch_size = 128 - self.early_stop = EarlyStopping(monitor='loss', patience=1) - - def train(self, train_generator, validation_generator=None, - save=True): - """ - Training and validation of a neural 
network model on data - - :param train_generator: Temporary training data batch generator - :type train_generator: keras.preprocessing.sequence.TimeseriesGenerator - - :param validation_generator: Temporary test data batch generator - :type validation_generator: keras.preprocessing.sequence.TimeseriesGenerator - - :param save: Parameter fo saving model - :type save: bool - """ - generator_batch_size = train_generator[0][1].shape[1] - if generator_batch_size != self.n_features: - print('Incorrect data for training. Number of features must be = ' + str(self.n_features)) - exit() - - history = self.model.fit(train_generator, epochs=self.epochs, - validation_data=validation_generator, - callbacks=[self.early_stop], - batch_size=self.batch_size) - if save: - self.save_model() - - loss = round(history.history['loss'][-1], 4) - return loss - - def forecasting(self, current_batch, forecasting_data_length, verbose=True): - """ - Forecasting values within a given time window - - :param current_batch: Data array (batch) in the time window after which the forecasting is made - :type current_batch: numpy.array - - :return: Array of forecast data - :rtype: numpy.array - """ - predictions = [] - - for i in range(forecasting_data_length): - if verbose: - sys.stdout.write('\r\x1b[K' + 'Forecasting: {0}/{1}'.format(i, forecasting_data_length-1)) - sys.stdout.flush() - current_pred = self.model.predict(current_batch, - batch_size=self.batch_size)[0] - predictions.append(current_pred) - current_batch = np.append(current_batch[:, 1:, :], [[current_pred]], axis=1) - predictions = pd.DataFrame(predictions).values - return predictions - - def save_model(self): - """ - Save the forecasting model - """ - self.model.save(self.model_path, save_format='h5') - - def open_model(self): - """ - Open the forecasting model - """ - if os.path.isfile(self.model_path): - self.model = load_model(self.model_path) - self.time_window_length = self.model.input.shape[1] - self.n_features = self.model.input.shape[2] - print(self.model.summary()) - - else: - print('File with foreasting model does not exist') - self.model = None - exit() - - def data_to_generator(self, data): - """ - Convert the data to a temporary data batch generator - - :param data: Array of data to convert - :type data: numpy.array - - :return: Temporary data batch generator - :rtype: keras.preprocessing.sequence.TimeseriesGenerator - """ - try: - generator = TimeseriesGenerator(data, data, - length=self.time_window_length, - batch_size=1) - return generator - except: - return None - - def get_batch(self, generator, current_batch_id): - """ - Upload the last batch of temporary data - - :param generator: Temporary data batch generator - :type generator: keras.preprocessing.sequence.TimeseriesGenerator - - :return: Last batch of temporary data - :rtype: numpy.array - """ - if current_batch_id == -1: - config = generator.get_config() - current_batch_id = config['end_index'] - self.time_window_length - try: - batch = generator[current_batch_id] - batch = np.append(batch[0][:, 1:, :], [batch[1]], axis=1) - return batch - except: - print('Wrong batch number') - exit() - - -class DataScaler: - """ - Class for data normalization (scaling) - - :param scaler: Data normalization (scaling) model - :type scaler: sklearn.preprocessing - - :param scaler_path: Path to the normalization model file - :type scaler_path: string - """ - def __init__(self, scaler_path, - open=False): - """ - Model initialization. 
- - :param open: Parameter for load model - :type open: bool - """ - self.scaler_path = scaler_path - - if open: - self.open() - else: - self.scaler = MinMaxScaler() - - def fit(self, data, save=True): - """ - Training the normalization model - - :param data: Training data array - :type data: numpy.array - - :param open: Parameter for saving model - :type open: bool - """ - self.scaler.fit(data) - if save: - self.save() - - def save(self): - """ - Save the normalization model - """ - with open(self.scaler_path, 'wb') as file: - pickle.dump(self.scaler, file) - file.close() - - def open(self): - """ - Open the normalization model - """ - if os.path.isfile(self.scaler_path): - with open(self.scaler_path, 'rb') as file: - self.scaler = pickle.load(file) - file.close() - else: - print('File with normalization model does not exist') - self.scaler = None - - def transform(self, data): - """ - Data normalization - - :param data: Data array - :type data: numpy.array - - :return: Array of normalized data - :rtype: numpy.array - """ - return self.scaler.transform(data) - - def inverse(self, data): - """ - Inverse data transformation - - :param data: Array of normalized data - :type data: numpy.array - - :return: Array of inverted data - :rtype: numpy.array - """ - return self.scaler.inverse_transform(data) - - -class ForecastEstimator: - """ - Class for evaluating the quality of the forecasting model - - :param quality: Matrix of forecasting quality metrics - :type quality: pandas.DataFrame - """ - def __init__(self): - self.quality = pd.DataFrame() - - def estimate(self, true, pred, feature_names=[]): - """ - Quality evaluation of the forecasting model - - :param data: Real data array - :type data: numpy.array - - :param pred: Array of forecasted data - :type pred: numpy.array - - :return: Matrix of forecasting quality metrics, MSE - mean squared error, MAE - mean absolute error - :rtype: pandas.DataFrame - """ - if len(true) != len(pred): - print('The length of the samples is not equal') - - else: - self.quality['MSE'] = mean_squared_error(true, pred, multioutput='raw_values') - self.quality['MAE'] = mean_absolute_error(true, pred, multioutput='raw_values') - - if len(feature_names) == self.quality.shape[0]: - self.quality.index = feature_names - - self.quality.loc['ALL_FEATURES', 'MSE'] = mean_squared_error(true, pred) - self.quality.loc['ALL_FEATURES', 'MAE'] = mean_absolute_error(true, pred) - - return self.quality - - def save(self, file_name): - """ - Save results to file - - :param file_name: name of the file to save - :type file_name: str - """ - if not os.path.exists('forecaster_results/'): - os.makedirs('forecaster_results/') - - self.quality.to_csv('forecaster_results/' + file_name + '.csv', - index_label='feature') - +import os +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' + + +from keras.models import load_model, Sequential +from keras.layers import Dense, Reshape, LSTM, Input, Dropout, SimpleRNN, GRU +from keras.callbacks import EarlyStopping +from keras import optimizers + +import keras_tuner # keras-tuner + grpcio (ver. 1.27.2) +from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score + +import pandas as pd +import numpy as np +import sys +import json +import math +import matplotlib.pyplot as plt +from typing import Type +from tqdm import tqdm + +from foressment_ai.forecasting.forecaster_ai.сhecker import ParamChecker + +checker = ParamChecker() + + +class ForecasterParameters: + """ + Class to initialization parameters. 
+ """ + + def __init__(self, n_features=1, look_back_length=10, horizon=1): + """ + :param n_features: Number of features + :type n_features: int + :param look_back_length: The width (number of time steps) of the input time windows + :type look_back_length: int + :param horizon: Output time window length + :type horizon: int + """ + + self.param_names = ['n_features', 'look_back_length', 'horizon'] + + self.n_features = n_features + self.look_back_length = look_back_length + self.horizon = horizon + + def __setattr__(self, name, val): + if name == 'param_names': + super().__setattr__(name, val) + elif name in self.param_names: + super().__setattr__(name, checker.check_param(val, name)) + else: + raise AttributeError(name) + + def read_json(self, filename): + with open(filename) as f: + params = json.load(f) + + for k, v in params.items(): + self.__setattr__(k, v) + + def save_json(self, filename): + """ + Save model parameters to file. + :param filename: Name of file with parameters and their values + :type filename: string + + """ + with open(filename, 'w') as outfile: + class_dict = self.__dict__.copy() + del class_dict['param_names'] + json_string = json.dumps(class_dict) + outfile.write(json_string) + + def __str__(self): + class_dict = self.__dict__.copy() + del class_dict['param_names'] + return '\n'.join(['{0} = {1}'.format(k, v) for k, v in class_dict.items()]) + + +class AIForecasterParameters(ForecasterParameters): + def __init__(self, n_features=1, look_back_length=10, horizon=1, + units=None, block_type='LSTM', dropout=0, + hidden_activation='tanh', output_activation='linear', + loss='mse', optimizer_clipvalue=0.5, + params_from_file=''): + """ + :param n_features: Number of features + :type n_features: int + :param look_back_length: The width (number of time steps) of the input time windows + :type look_back_length: int + :param horizon: Output time window length + :type horizon: int + + :param model_hps: Model hyperparameters + :type model_params: dict + + :param model_hps['n_rec_layers']: Numbers of recurrent neural networl layers + :type model_hps['n_rec_layers']: int + + :param model_hps['units']: List of number of units on each recurrent layer + :type model_hps['units']: list + + :param model_hps['block_type']: Recurrent block type + :type model_hps['block_type']: str + + :param model_hps['dropout']: Частота отсева слоев + :type model_hps['dropout']: float + + :param model_hps['hidden_activation']: Activation function on hidden layers + :type model_hps['hidden_activation']: str + + :param model_hps['output_activation']: Activation function on output layer + :type model_params['output_activation']: str + + :param model_hps['optimizer']: Optimization function + :type model_hps['optimizer']: str + + :param model_hps['loss']: Loss function + :type model_hps['loss']: str + """ + + super().__init__(n_features, look_back_length, horizon) + + self.param_names = self.param_names + ['units', 'block_type', 'dropout', + 'hidden_activation', 'output_activation', 'optimizer', 'loss'] + if units is None: + units = [512] + + self.units = units + # self.n_rec_layers = len(units) + self.block_type = block_type + self.dropout = dropout + self.hidden_activation = hidden_activation + self.output_activation = output_activation + self.optimizer = optimizers.legacy.Adam(clipvalue=optimizer_clipvalue) + self.loss = loss + + def n_rec_layers(self): + return len(self.units.keys()) + + def __setattr__(self, name, val): + if name == 'param_names': + super().__setattr__(name, val) + elif name == 
'units': + assert ((type(val) == list) or (type(val) == dict)), "Type if units must be list or dict" + if type(val) == list: + super().__setattr__(name, self.units_to_dict(val)) + else: + super().__setattr__(name, val) + elif name in self.param_names: + super().__setattr__(name, checker.check_param(val, name)) + else: + raise AttributeError(name) + + def save_json(self, filename): + """ + Save model parameters to file. + :param filename: Name of file with parameters and their values + :type filename: string + + """ + with open(filename, 'w') as outfile: + class_dict = self.__dict__.copy() + del class_dict['param_names'] + units = list(class_dict['units'].values()) + class_dict['units'] = units + json_string = json.dumps(class_dict) + outfile.write(json_string) + + @staticmethod + def units_to_dict(units): + units = [checker.check_param(u, 'units_of_layer') for u in units] + # self.n_rec_layers = len(units) + return {'units_{0}'.format(i): u for i, u in enumerate(units)} + + +class TSGenerator: + """ + Class for timeseries generator. + """ + + def __init__(self, data, model_params: ForecasterParameters): + """ + + :param model_params: + :type model_params: ForecasterParameters + """ + self.model_params = model_params + + self._temporalize(data) + + def change_horizon(self, horizon): + self.model_params.horizon = checker.check_param(horizon, 'horizon') + + data = self.get_data(flatten=True) + targets = self.get_targets(window_id=-1, flatten=True) + X = np.append(data, targets, axis=0) + self._temporalize(X) + + def _temporalize(self, X): + """ + Reformat data to time windows. + :param X: Data + :type X: np.array + """ + X = checker.check_is_type_param(X, 'data for TSGenerator', np.ndarray) + assert len(X.shape) in [1, 2], 'Data for TSGenerator must be 1D or 2D numpy array' + if len(X.shape) == 1: + X = np.reshape(X, (X.shape[0], 1)) + self.model_params.look_back_length = checker.check_in_range_param(self.model_params.look_back_length, + 'look_back_length', (0, X.shape[0])) + self.model_params.horizon = checker.check_in_range_param(self.model_params.horizon, + 'horizon', + (0, X.shape[0] - self.model_params.look_back_length + 1)) + + self.data = np.empty(shape=(0, self.model_params.look_back_length, self.model_params.n_features)) + self.targets = np.empty(shape=(0, self.model_params.horizon, self.model_params.n_features)) + + data_length = X.shape[0] - self.model_params.look_back_length - self.model_params.horizon + 1 + p, q = X.shape + m, n = X.strides + strided = np.lib.stride_tricks.as_strided + + self.data = strided(X, + shape=(data_length, self.model_params.look_back_length, q), + strides=(m, m, n)) + self.targets = strided(X[self.model_params.look_back_length:], + shape=(data_length, self.model_params.horizon, q), + strides=(m, m, n)) + + def get_data(self, flatten=False, window_id=None, sample=None): + return self._get_x(self.data, flatten, window_id, sample) + + def get_targets(self, flatten=False, window_id=None, sample=None): + return self._get_x(self.targets, flatten, window_id, sample) + + def _get_x(self, x, flatten, window_id, sample): + """ + Get time window by order id. 
+ :param x: Data + :type x: np.ndarray + :param flatten: Flat data or not + :type flatten: boolean + :param window_id: Id of window + :type window_id: int + :return: time window + :rtype: np.ndarray + """ + x_to_get = None + if window_id is not None: + window_id = checker.check_is_type_param(window_id, 'window_id', int) + window_id = checker.check_in_range_param(window_id, 'window_id', (-1*(x.shape[0]-1), x.shape[0]-1)) + x_to_get = x[window_id] + x_to_get = np.reshape(x_to_get, (1,) + x_to_get.shape) + elif sample is not None: + sample = checker.check_is_type_param(sample, 'sample', tuple) + if sample[0]: + sample_min = sample[0] + else: + sample_min = 0 + if sample[1]: + sample_max = sample[1] + else: + sample_max = x.shape[0] + x_to_get = x[sample_min:sample_max] + if flatten: + if x_to_get is not None: + x_to_get = self._flatten(x_to_get) + else: + x_to_get = self._flatten(x) + + if x_to_get is not None: + return x_to_get + else: + return x + + @staticmethod + def _flatten(X): + """ + Make data flat (3d array to 2d). + :param X: Data + :type X: np.ndarray + :return: Flatten data + :rtype: np.ndarray + """ + flattened_X = np.empty((X.shape[0] + X.shape[1] - 1, X.shape[2])) + last_id = X.shape[0] - 1 + for i in range(last_id): + flattened_X[i] = X[i, 0, :] + + for i in range(last_id, X[last_id].shape[0]): + flattened_X[i] = X[last_id, i, :] + return flattened_X + + +class NaiveForecaster: + def __init__(self, model_params: ForecasterParameters): + """ + + :param model_params: + :type model_params: ForecasterParameters + """ + self.model_params = model_params + + def _predict(self, data, verbose=0): + predictions = np.empty(shape=(0, self.model_params.horizon, self.model_params.n_features)) + if verbose == 1: + pbar = tqdm(desc='Forecasting', total=self.model_params.horizon * data.shape[0], file=sys.stdout) + else: + pbar = None + for batch in data: + current_pred = np.array([batch[-1] for i in range(self.model_params.horizon)]) + current_pred = np.reshape(current_pred, (1,) + current_pred.shape) + predictions = np.concatenate((predictions, current_pred)) + if verbose == 1: + pbar.update(1) + if verbose == 1: + pbar.close() + return predictions + + def forecasting(self, data, forecasting_data_length=None, verbose=1): + """ + Forecasting values within a given time window. 
+ :param data: Data for forecasting + :type data: np.ndarray + :param forecasting_data_length: Time window size for forecasting + :type forecasting_data_length: int + :param verbose: Show forecasting parameter + :type verbose: int + :return: Array of forecast data + :rtype: np.ndarray + """ + if not forecasting_data_length: + forecasting_data_length = self.model_params.horizon + + forecasting_data_length = checker.check_is_type_param(forecasting_data_length, 'forecasting_data_length', int) + forecasting_data_length = checker.check_in_range_param(forecasting_data_length, 'forecasting_data_length', + (0, None)) + + assert len(data.shape) in [1, 2, 3], 'Data must be 1D, 2D or 3d numpy array' + if len(data.shape) == 1: + data = np.reshape(data, (1, data.shape[0], 1)) + if len(data.shape) == 2: + data = np.reshape(data, (1, data.shape[0], data.shape[1])) + assert data.shape[1] == self.model_params.look_back_length, ('Data length (or data.shape[1]) ' + 'must be equal to look_back_length') + + if forecasting_data_length <= self.model_params.horizon: + predictions = self._predict(data, verbose=1) + for i in range(predictions.shape[0]): + predictions[i] = predictions[i][:forecasting_data_length] + return predictions + else: + predictions = np.empty(shape=(data.shape[0], forecasting_data_length, data.shape[2])) + + if verbose == 1: + pbar = tqdm(desc='Forecasting', total=forecasting_data_length * data.shape[0], file=sys.stdout) + else: + pbar = None + + for i in range(data.shape[0]): + batch = np.reshape(data[i], (1,) + data[i].shape) + + pred_to_batch = np.empty(shape=(0, data.shape[2])) + while len(pred_to_batch) < forecasting_data_length: + current_pred = self._predict(batch) + current_pred = current_pred[0] + pred_to_batch = np.append(pred_to_batch, current_pred, axis=0) + batch = np.append(batch[:, self.model_params.horizon:, :], [current_pred], axis=1) + if verbose == 1: + pbar.update(self.model_params.horizon) + pred_to_batch = pred_to_batch[:forecasting_data_length] + pred_to_batch = np.reshape(pred_to_batch, (1,) + pred_to_batch.shape) + predictions[i] = pred_to_batch + # predictions = np.reshape(predictions, predictions.shape + (1,)) + if verbose == 1: + pbar.close() + return predictions + + +class AIForecaster(NaiveForecaster): + """ + Class for forecasting the states of complex objects and processes + """ + + def __init__(self, model_params: AIForecasterParameters = None, model: Sequential = None, + from_file='', from_file_config='', from_config=None): + # """ + # Model initialization + # + # :param _max_num_units: Maximum of layer units + # :type _max_num_units: int + # :param _default_units_step: Step between units of layers + # :type _default_units_step: int + # + # """ + super().__init__(model_params) + self.model_config = None + self.model = model + if model is not None: + self._init_model() + if from_file: + self.load_from_file(from_file) + if from_file_config: + self.load_from_model_config(filename=from_file_config) + if from_config: + self.load_from_model_config(config=from_config) + self.history = None + + def _init_model(self): + self.model_config = self.model.get_config() + self._set_model_params_from_config() + self.default_filename = self.model_params.block_type.lower() + '_' + \ + '_'.join(str(u) for u in self.model_params.units.values()) + \ + '_d' + str(self.model_params.dropout).replace('.', '') + + def load_from_file(self, filename=''): + """ + Open the forecasting model from file. 
+ :param filename: Name of model file + :type filename: string + """ + checker.check_file_is_exist(filename) + self.model = load_model(filename) + self._init_model() + # print(self.model.summary()) + + def load_from_model_config(self, filename='', config=None): + """ + Create model by keras configuration. + :param filename: Name of file with keras configuration. + :type filename: string + """ + if filename: + with open(filename) as f: + self.model_config = json.load(f) + if config: + self.model_config = config + + self.model = Sequential.from_config(self.model_config) + self._init_model() + # print(self.model.summary()) + self.model.compile(optimizer=self.model_params.optimizer, loss=self.model_params.loss) + + def _set_model_params_from_config(self): + self.model_params = AIForecasterParameters( + n_features=self.model_config['layers'][0]['config']['batch_input_shape'][2], + look_back_length=self.model_config['layers'][0]['config']['batch_input_shape'][1], + block_type=self.model_config['layers'][1]['class_name'], + hidden_activation=self.model_config['layers'][1]['config']['activation'] + ) + + n = 0 + units = [] + dropout = 0 + for layer in self.model_config['layers'][1:]: + if layer['class_name'] == self.model_params.block_type: + n = n + 1 + units.append(layer['config']['units']) + if layer['class_name'] == 'Dropout': + dropout = layer['config']['rate'] + if layer['class_name'] == 'Dense': + self.model_params.output_activation = layer['config']['activation'] + if layer['class_name'] == 'Reshape': + self.model_params.horizon = layer['config']['target_shape'][0] + + self.model_params.units = units + self.model_params.dropout = dropout + + def save_model_config(self, filename): + assert self.model, 'Model does not exist' + + self.model_config = self.model.get_config() + + with open(filename, 'w') as outfile: + json_string = json.dumps(self.model_config) + outfile.write(json_string) + outfile.write('\n') + + def save_model(self, filename): + """ + Save the forecasting model + """ + self.model.save(filename) + + def build_model(self): + """ + Build model by parameters. + """ + self.model = Sequential() + self.model.add(Input(shape=(self.model_params.look_back_length, self.model_params.n_features))) + + for n in range(self.model_params.n_rec_layers()): + units = self.model_params.units['units_' + str(n)] + activation = self.model_params.hidden_activation + last_layer = False + if n == (self.model_params.n_rec_layers() - 1): + last_layer = True + self._add_recurrent_layer(units, activation, last_layer) + + if self.model_params.dropout > 0: + self.model.add(Dropout(self.model_params.dropout)) + + output_activation = self.model_params.output_activation + self.model.add(Dense(self.model_params.horizon * self.model_params.n_features, + activation=output_activation)) + # Shape => [batch, out_steps, features]. 
+ self.model.add(Reshape([self.model_params.horizon, self.model_params.n_features])) + + self.model.compile(optimizer=self.model_params.optimizer, loss=self.model_params.loss) + self.model_config = self.model.get_config() + + def _add_recurrent_layer(self, units, activation, last_layer=False): + return_sequences = not last_layer + # if n == (self.model_hps['n_rec_layers']-1): + + if self.model_params.block_type == 'SimpleRNN': + self.model.add(SimpleRNN(units=units, activation=activation, + return_sequences=return_sequences)) + if self.model_params.block_type == 'LSTM': + self.model.add(LSTM(units=units, activation=activation, + return_sequences=return_sequences)) + if self.model_params.block_type == 'GRU': + self.model.add(GRU(units=units, activation=activation, + return_sequences=return_sequences)) + + def train(self, X, y, n_epochs=100, batch_size=128, + verbose=1, + validation_split=None): + """ + Training and validation of a neural network model on data + + :param X: + :type X: numpy.array + + :param y: targets + :type X: numpy.ndarray + + :param batch_size: Training batch size + :type batch_size: int + + :param epochs: Number of training epochs + :type epochs: int + + :param validation_split: + :type validation_split: float + + """ + early_stop = EarlyStopping(monitor='loss', patience=1) + + self.history = self.model.fit(X, y, epochs=n_epochs, + callbacks=[early_stop], + batch_size=batch_size, + validation_split=validation_split, + shuffle=False, + verbose=verbose) + + def _predict(self, data, verbose=0): + return self.model.predict(data, batch_size=128, verbose=verbose) + + def get_loss(self): + return round(self.history.history['loss'][-1], 4) + + +class AIForecasterTuner: + def __init__(self, model_params: AIForecasterParameters): + self.hp_choices = None + self.model_params = model_params + + def set_tuned_hps(self, block_type=None, units=None, n_rec_layers=None, dropout=None, + hidden_activation=None, output_activation=None): + """ + Set parameters variables for tuning. + :param tuned_hps: Parameters variables + :param tuned_hps: dict + """ + self.hp_choices = {} + + if block_type: + assert checker.check_is_type_param(block_type, 'block_type', list) + self.hp_choices['block_type'] = [checker.check_param(val, param_name='block_type') for val in block_type] + + if units: + assert checker.check_is_type_param(units, 'units', list) + if n_rec_layers: + assert checker.check_is_type_param(n_rec_layers, 'n_rec_layers', list) + assert (max(n_rec_layers) >= len(units)), \ + 'The number of layers should be no more than {0} (the number of transferred units). 
' \ + 'Found {1}.'.format(len(units), max(n_rec_layers)) + + self.hp_choices['units'] = {f'units_{i}': [checker.check_param(u, param_name='units_of_layer') + for u in + checker.check_is_type_param(units_of_layers, 'units_of_layers', + list)] + for i, units_of_layers in enumerate(units)} + + if dropout: + assert checker.check_is_type_param(dropout, 'dropout', list) + self.hp_choices['dropout'] = [checker.check_param(val, param_name='dropout') for val in dropout] + + if n_rec_layers: + assert checker.check_is_type_param(n_rec_layers, 'n_rec_layers', list) + self.hp_choices['n_rec_layers'] = [checker.check_param(val, param_name='n_rec_layers') for val in + n_rec_layers] + + if hidden_activation: + assert checker.check_is_type_param(hidden_activation, 'hidden_activation', list) + self.hp_choices['hidden_activation'] = [checker.check_param(val, param_name='hidden_activation') + for val in hidden_activation] + + if output_activation: + assert checker.check_is_type_param(output_activation, 'output_activation', list) + self.hp_choices['output_activation'] = [checker.check_param(val, param_name='output_activation') + for val in output_activation] + + def build_hypermodel(self, hp): + """ + Build hypermodel takes an argument from which to sample hyperparameters. + :param hp: Hyperparameter object of Keras Tuner (to define the search space for the hyperparameter values) + :type hp: keras_tuner.HyperParameters + """ + model = Sequential() + model.add(Input(shape=(self.model_params.look_back_length, + self.model_params.n_features))) + + if 'n_rec_layers' in self.hp_choices: + layers_range = range(hp.Choice('n_rec_layers', self.hp_choices['n_rec_layers'])) + else: + layers_range = range(self.model_params.n_rec_layers()) + + if 'block_type' in self.hp_choices: + block_type = hp.Choice('block_type', self.hp_choices['block_type']) + else: + block_type = self.model_params.block_type + + for n in layers_range: + if 'units' in self.hp_choices: + units = hp.Choice(f'units_{n}', self.hp_choices['units'][f'units_{n}']) + else: + units = self.model_params.units[f'units_{n}'] + if 'hidden_activation' in self.hp_choices: + activation = hp.Choice('hidden_activation', self.hp_choices['hidden_activation']) + else: + activation = self.model_params.hidden_activation + last_layer = False + if n == (layers_range.stop - 1): + last_layer = True + model = self._add_hidden_layer(model, block_type, units, activation, last_layer) + + if 'dropout' in self.hp_choices: + dropout = hp.Choice('dropout', self.hp_choices['dropout']) + model.add(Dropout(dropout)) + else: + if self.model_params.dropout > 0: + model.add(Dropout(self.model_params.dropout)) + + if 'output_activation' in self.hp_choices: + output_activation = hp.Choice('output_activation', self.hp_choices['output_activation']) + else: + output_activation = self.model_params.output_activation + + model.add(Dense(self.model_params.horizon * self.model_params.n_features, + activation=output_activation)) + # Shape => [batch, out_steps, features]. 
+ model.add(Reshape([self.model_params.horizon, self.model_params.n_features])) + + model.compile(optimizer=self.model_params.optimizer, loss=self.model_params.loss) + return model + + def _add_hidden_layer(self, model, block_type, units, activation, last_layer=False): + return_sequences = not last_layer + # if n == (self.model_hps['n_rec_layers']-1): + + if block_type == 'SimpleRNN': + model.add(SimpleRNN(units=units, activation=activation, + return_sequences=return_sequences)) + if block_type == 'LSTM': + model.add(LSTM(units=units, activation=activation, + return_sequences=return_sequences)) + if block_type == 'GRU': + model.add(GRU(units=units, activation=activation, + return_sequences=return_sequences)) + return model + + def _create_tuner_and_searh(self, x, y, tuner_type='RandomSearch', + max_trials=10, batch_size=128, epochs=10): + """ + + :param x: + :param y: + :param tuner_type: + :param n_models: + :param max_trials: + :param batch_size: + :param epochs: + :return: + """ + if not self.hp_choices: + # Default tuned parameters. + self.set_tuned_hps( + units=[[int(u) for u in np.arange(checker.max_num_units, + checker.default_units_step, + -checker.default_units_step)] + for n in range(self.model_params.n_rec_layers())], + hidden_activation=['tanh', 'relu'], + output_activation=['linear', 'sigmoid']) + + tuner_type = checker.check_in_list_param(tuner_type, 'tuner_type', + ['RandomSearch', 'BayesianOptimization', 'Hyperband']) + # Initialize tuner to run the model. + tuner = None + if tuner_type == 'RandomSearch': + tuner = keras_tuner.RandomSearch( + hypermodel=self.build_hypermodel, + objective='loss', + max_trials=max_trials, # the number of different models to try + project_name='ai_forecaster', + overwrite=True + ) + elif tuner_type == 'BayesianOptimization': + tuner = keras_tuner.BayesianOptimization( + hypermodel=self.build_hypermodel, + objective='loss', + max_trials=max_trials, + project_name='ai_forecaster', + overwrite=True + ) + elif tuner_type == 'Hyperband': + tuner = keras_tuner.Hyperband( + hypermodel=self.build_hypermodel, + objective='loss', + project_name='ai_forecaster', + overwrite=True + ) + + print(tuner.search_space_summary()) + # Run the search + tuner.search(x, y, batch_size=batch_size, epochs=epochs, + callbacks=[EarlyStopping('loss', patience=1)]) + return tuner + + def find_best_models(self, x, y, tuner_type='RandomSearch', n_models=1, + max_trials=10, batch_size=128, epochs=10): + tuner = self._create_tuner_and_searh(x, y, tuner_type, + max_trials=max_trials, batch_size=batch_size, epochs=epochs) + + print("Results summary") + print("Showing %d best trials" % n_models) + + for trial in tuner.oracle.get_best_trials(n_models): + print() + print(f"Trial {trial.trial_id} summary") + print("Hyperparameters:") + hyperparameters = trial.hyperparameters.values + + if 'n_rec_layers' in hyperparameters: + n_rec_layers = hyperparameters['n_rec_layers'] + units = [hp for hp in hyperparameters.keys() if 'units' in hp] + if len(units) > n_rec_layers: + for i in range(n_rec_layers, len(units)): + del hyperparameters[f'units_{i}'] + + for hp, value in trial.hyperparameters.values.items(): + print(f"{hp}:", value) + if trial.score is not None: + print(f"Score: {trial.score}") + + + # best_hps = tuner.get_best_hyperparameters(n_models) + best_tuner_models = tuner.get_best_models(n_models) + best_models = [AIForecaster(from_config=tuner_model.get_config()) for tuner_model in best_tuner_models] + return best_models + + +class ForecastEstimator: + """ + Class for evaluating 
the quality of the forecasting model + + :param quality: Matrix of forecasting quality metrics + :type quality: pandas.DataFrame + """ + + def __init__(self, feature_names=None): + self.first_batch = None + self.true = None + self.pred = {} + self.feature_names = feature_names + self.quality = pd.DataFrame() + + def set_true_values(self, true): + assert len(true.shape) in [1, 2, 3], 'True data must be 1D, 2D or 3d numpy array' + if len(true.shape) == 1: + true = np.reshape(true, (1, true.shape[0], 1)) + if len(true.shape) == 2: + true = np.reshape(true, (1, true.shape[0], true.shape[1])) + self.true = true + + def set_pred_values(self, pred, model_name='naive'): + model_name = checker.check_is_type_param(model_name, model_name, str) + assert len(pred.shape) in [1, 2, 3], 'Predicted data must be 1D, 2D or 3d numpy array' + if len(pred.shape) == 1: + pred = np.reshape(pred, (1, pred.shape[0], 1)) + if len(pred.shape) == 2: + pred = np.reshape(pred, (1, pred.shape[0], pred.shape[1])) + + self.pred[model_name] = pred + + def set_first_batch(self, first_batch): + self.first_batch = first_batch + + def estimate(self): + """ + Quality evaluation of the forecasting model + + :param data: Real data array + :type data: np.ndarray + + :param pred: Array of forecasted data + :type pred: np.ndarray + + :return: Matrix of forecasting quality metrics, MSE - mean squared error, MAE - mean absolute error + :rtype: pandas.DataFrame + """ + assert self.true is not None, 'No true values for estimate' + self.quality = pd.DataFrame() + true_reshaped = self.true.reshape((self.true.shape[0] * self.true.shape[1], self.true.shape[2])) + + if self.feature_names is None: + feature_names = [str(i) for i in range(self.true.shape[2])] + else: + feature_names = self.feature_names.copy() + feature_names.append('ALL_FEATURES') + + for model_name, pred_vals in self.pred.items(): + assert (len(self.true) == len(pred_vals)), f'The length of' + model_name +\ + ' result not equal to true values' + pred_reshaped = pred_vals.reshape((self.true.shape[0] * self.true.shape[1], self.true.shape[2])) + + mse = mean_squared_error(true_reshaped, pred_reshaped, multioutput='raw_values', squared=True) + mse = np.append(mse, mean_squared_error(true_reshaped, pred_reshaped, squared=True)) + self.quality[model_name + '_MSE'] = mse + + rmse = mean_squared_error(true_reshaped, pred_reshaped, multioutput='raw_values', squared=False) + rmse = np.append(rmse, mean_squared_error(true_reshaped, pred_reshaped, squared=False)) + self.quality[model_name + '_RMSE'] = rmse + + mae = mean_absolute_error(true_reshaped, pred_reshaped, multioutput='raw_values') + mae = np.append(mae, mean_absolute_error(true_reshaped, pred_reshaped)) + self.quality[model_name + '_MAE'] = mae + + r2 = r2_score(true_reshaped, pred_reshaped, multioutput='raw_values') + r2 = np.append(r2, r2_score(true_reshaped, pred_reshaped)) + self.quality[model_name + '_R2'] = r2 + + self.quality.index = feature_names + + def save_quality(self, filename): + """ + Save estimation results to file + + :param filename: name of the file to save + :type filename: str + """ + if not os.path.exists('forecaster_results/'): + os.makedirs('forecaster_results/') + + self.quality.to_csv('forecaster_results/' + filename + '.csv', + index_label='feature') + + def save_pred_result(self, dataset_name): + """ + Save forecaster models results to file. 
+ :param filename: name of the file to save + :return: str + """ + if not os.path.exists('forecaster_results/'): + os.makedirs('forecaster_results/') + + for model_name, pred_vals in self.pred.items(): + filename = 'forecaster_results/' + dataset_name + '_' + model_name + '.npy' + np.save(filename, pred_vals) + print('Save ' + filename) + + def draw_feature(self, i_feature, ax, data_size=1000): + + # plt.figure(figsize=(15, 8)) + def draw_windows(data, start_x=0, color='black', label='Data', alpha=1.0): + timeline = [] + for p in range(data.shape[0]): + y = data[p] + timeline.append(y[0]) + x = range(p + start_x, p + y.shape[0] + start_x) + if p == (data.shape[0] - 1): + ax.plot(x, y, marker='.', color=color, label=label, alpha=alpha) + else: + ax.plot(x, y, marker='.', color=color, alpha=alpha) + x = range(start_x, len(timeline) + start_x) + ax.plot(x, timeline, color=color, alpha=alpha) + + target_start_point = 0 + connection_line = {} + + if self.first_batch is not None: + draw_windows(self.first_batch[:, :, i_feature], color='blue', label='Inputs') + target_start_point = self.first_batch.shape[1] + connection_line = {'x': [target_start_point - 1, target_start_point], + 'y': [self.first_batch[0][-1, i_feature], None]} + + if self.true is not None: + if connection_line: + connection_line['y'][1] = self.true[0][0, i_feature] + plt.plot(connection_line['x'], connection_line['y'], marker='.', color='green') + draw_windows(self.true[:data_size, :, i_feature], start_x=target_start_point, color='green', label='True') + + if self.pred: + model_names = list(self.pred.keys()) + color_dicts = {} + if 'naive' in model_names: + model_names.remove('naive') + color_dicts['naive'] = 'grey' + + color_map = plt.cm.get_cmap('plasma', len(model_names)) + for i, model_name in enumerate(model_names): + color_dicts[model_name] = color_map(i) + + for model_name, pred_vals in self.pred.items(): + if connection_line: + connection_line['y'][1] = pred_vals[0][0, i_feature] + plt.plot(connection_line['x'], connection_line['y'], marker='.', color=color_dicts[model_name]) + draw_windows(pred_vals[:data_size, :, i_feature], start_x=target_start_point, color=color_dicts[model_name], + label=model_name.capitalize()) + + ax.legend(fontsize=8) + # plt.grid('both') + # plt.show() + + def draw(self, size=1000, feature_names=None): + if feature_names is None: + if self.feature_names is None: + feature_names = [str(i) for i in range(self.true.shape[2])] + else: + feature_names = self.feature_names.copy() + + n = len(feature_names) + nrows = 1 + ncols = 1 + if n <= 3: + nrows = n + elif n % 3 == 0: + ncols = 3 + nrows = int(n / 3) + elif n % 2 == 0: + ncols = 2 + nrows = int(n / 2) + else: + ncols = 2 + nrows = int(math.floor(n / 2)) + + fig, axs = plt.subplots(nrows, ncols, sharex='col', figsize=(ncols*6, nrows*3)) + fig.tight_layout(pad=2.3) + fig.supxlabel('Time') + + i_feature = 0 + color_map = plt.cm.get_cmap('plasma', n) + + for i in range(nrows): + if ncols == 1: + self.draw_feature(i_feature, axs[i], size) + axs[i].set_ylabel(feature_names[i_feature]) + i_feature = i_feature + 1 + else: + for j in range(ncols): + if i_feature > n: + break + self.draw_feature(i_feature, axs[i, j], size) + axs[i, j].set_ylabel(feature_names[i_feature]) + i_feature = i_feature + 1 + plt.show() diff --git a/foressment_ai/forecasting/forecaster_ai/loader.py b/foressment_ai/forecasting/forecaster_ai/loader.py index 019f3a7..2160305 100644 --- a/foressment_ai/forecasting/forecaster_ai/loader.py +++ 
b/foressment_ai/forecasting/forecaster_ai/loader.py @@ -1,340 +1,435 @@ -import os -import pandas as pd -from pandas.api.types import is_numeric_dtype -import numpy as np -import configparser -from foressment_ai import RulesExtractor - -class DataLoaderAndPreprocessorDefault: - """ - Class for loading and preprocessing data - - :param DATA_PATH: Path to th e directory with datasets - :type DATA_PATH: string - - :param drop_features: Set of features to remove from the data - :type drop_features: list - - :param categorical_features: A set of categorical features in the data - :type categorical_features: list - - :param data: Data feature matrix - :type data: pandas.DataFrame - """ - def __init__(self, dataset_name, nrows=None, suf='', data_configs_path="../datasets/configs", - label_format='binary', train_size=0.9): - """ - Initializing - - :param dataset_name: Data set name - :type dataset_name: string - - :param mode: Boot mode, for developers - :type mode: int - """ - - self.dataset_name = dataset_name - - self.__load_data__(data_configs_path, label_format, nrows) - - self.set_train_size(train_size) - - def __load_data__(self, data_configs_path, label_format, nrows): - """ - Import data with configuration file. - """ - config = configparser.ConfigParser() - - try: - config.read(data_configs_path + '/' + self.dataset_name + '.ini') - except: - print('Unknown dataset name!') - exit() - - dataset_path = config.get('Params', 'data_path') - dataset_filename = config.get('Params', 'filename', fallback='') - sep = config.get('Params', 'sep', fallback=',') - decimal = config.get('Params', 'decimal', fallback='.') - - if dataset_filename: - self.data = pd.read_csv(dataset_path + '/' + dataset_filename, sep=sep, decimal=decimal, nrows=nrows) - else: - for file in os.listdir(dataset_path): - self.data = pd.concat([self.data, pd.read_csv(dataset_path + '/' + file, sep=sep, decimal=decimal)], - ignore_index=True) - if nrows: - if self.data.shape[0] > nrows: - self.data = self.data.loc[:nrows] - break - - self.data.columns = [c.strip() for c in self.data.columns] - - timestamp_feature = config.get('Features', 'timestamp_feature', fallback=None) - self.binaries_features = self.__config_get_list__(config, 'binaries_features') - self.drop_features = self.__config_get_list__(config, 'drop_features') - self.categorical_features = self.__config_get_list__(config, 'categorical_features') - self.important_features = self.__config_get_list__(config, 'important_features') - - if timestamp_feature: - self.timestamps = self.data[timestamp_feature].squeeze() - - time_format = config.get('Other', 'time_format', fallback='') - timestamp_unit = config.get('Other', 'timestamp_unit', fallback='') - - if time_format: - self.timestamps = self.str_data_to_time(self.timestamps, - time_format=time_format) - if timestamp_unit: - self.timestamps = self.float_data_to_time(self.timestamps, timestamp_unit) - - self.data = self.data.drop(timestamp_feature, axis=1) - - label_section = 'Labels.' 
+ label_format - label_config = config[label_section] - - self.label_feature = label_config.get('label', fallback=None) - normal_label = label_config.get('normal_label', fallback='Normal') - self.labels = self.data[self.label_feature] - self.data = self.data.drop(self.label_feature, axis=1) - - if not is_numeric_dtype(self.labels): - self.__encoding_labels__(normal_label) - - self.drop_features = [f for f in self.drop_features if f in self.data.columns] - if len(self.drop_features) > 0: - self.data = self.data.drop(self.drop_features, axis=1) - - if len(self.categorical_features) > 0: - self.__categorical_feature_encoding__() - - self.feature_names = list(self.data.columns) - self.data = self.data.fillna(0) - - @staticmethod - def __config_get_list__(config, feature_name): - try: - return config.get('Features', feature_name).replace('\n', '').split(',') - except: - return [] - - @staticmethod - def str_data_to_time(data, time_format): - ''' - :param data: timestamp column (pd.Series) - :return: - ''' - data = data.map(lambda x: x.strip() if not pd.isnull(x) else x) - data = pd.to_datetime(data, format=time_format) - return data - - @staticmethod - def float_data_to_time(data, timestamp_unit): - ''' - :param data: timestamp column (pd.Series) - :return: - ''' - data = pd.to_datetime(data, unit=timestamp_unit) - return data - - def __encoding_labels__(self, normal_label): - labels_classes = sorted(self.labels.unique().tolist()) - labels_dictionary = {} - - if normal_label in labels_classes: - labels_dictionary[normal_label] = 0 - labels_classes.remove(normal_label) - - for i, label in enumerate(labels_classes): - labels_dictionary[label] = i - - self.labels = self.labels.map(lambda x: labels_dictionary[x]) - - def __categorical_feature_encoding__(self): - """ - Data categorical feature encoding - """ - new_categorical_features = [] - for feature in self.categorical_features: - try: - self.data[feature] = self.data[feature].astype(str) - categorical_feature_values = self.data[feature].unique().tolist() - if ('0.0' in categorical_feature_values) and ('0' in categorical_feature_values): - self.data[feature] = self.data[feature].map(lambda x: '0'if x == '0.0'else x) - encoded_feature_data = pd.get_dummies(self.data[feature]) - for col in encoded_feature_data.columns: - encoded_feature_data = encoded_feature_data.rename(columns={col: feature + '_' + col}) - new_categorical_features.append(feature + '_' + col) - except: - encoded_feature_data = pd.DataFrame() - - if not encoded_feature_data.empty: - old_data_columns = self.data.columns.tolist() - feature_index = old_data_columns.index(feature) - new_data_columns = old_data_columns[:feature_index] + \ - encoded_feature_data.columns.tolist() + \ - old_data_columns[feature_index+1:] - - self.data = pd.concat([self.data, encoded_feature_data], axis=1) - self.data = self.data[new_data_columns] - else: - print('Too many values for categorical feature "' + feature +'". 
Delete feature from data') - self.data = self.data.drop(feature, axis=1) - self.categorical_features = new_categorical_features - - def set_train_size(self, train_size): - """ - - """ - if (train_size < 0) and (train_size > 1): - print('The proportion of the training sample is not in the interval (0, 1)') - exit() - - self.train_size = train_size - self.train_end_index = round(train_size * self.data.shape[0]) - - def get_train_data(self): - return self.data.loc[:self.train_end_index] - - def get_test_data(self): - return self.data.loc[self.train_end_index:] - - -class DataLoaderAndPreprocessorExtractor: - """ - Class for loading and preprocessing data. - - :param DATA_PATH: Path to th e directory with datasets - :type DATA_PATH: string - - :param drop_features: Set of features to remove from the data - :type drop_features: list - - :param categorical_features: A set of categorical features in the data - :type categorical_features: list - - :param data: Data feature matrix - :type data: pandas.DataFrame - """ - - def __init__(self, test_name): - """ - Initializing - - :param dataset_name: Data set name - :type dataset_name: str - - :param use_extractor: use preprocessing with Extractor module or not - :type use_extractor: bool - - :param mode: Boot mode, for developers - :type mode: int - """ - - self.DATA_PATH = "../datasets/" - self.dataset_name = '' - self.suf = test_name - - self.features_names = [] - self.drop_features = [] - self.categorical_features = [] - self.data = None - - def __call__(self, dataset_name, use_extractor=False, mode=1): - - self.dataset_name = dataset_name - - if not os.path.exists('../models/' + dataset_name): - os.makedirs('../models/' + dataset_name) - - self.forecasting_model_path = '../models/' + dataset_name + '/forecaster_model_' + dataset_name + '_' + self.suf - self.normalization_model_path = '../models/' + dataset_name + '/forecaster_scaler_' + dataset_name + '_' + self.suf + '.pkl' - - if dataset_name == 'smart_crane': - self.data = pd.read_csv(self.DATA_PATH + 'IEEE_smart_crane.csv') - - if mode not in range(1, 9): - print('Wrong cycle number') - exit() - self.data = self.data[self.data['Cycle'] == mode] - - self.drop_features = ['Date'] - self.drop_labels = ['Alarm', 'Cycle'] - self.delete_features() - - if use_extractor: - self.extractor(class_column='Alarm') - - self.delete_labels() - self.features_names = self.data.columns.values - return self.data - - elif dataset_name == 'hai': - self.data = pd.read_csv(self.DATA_PATH + 'HAI_test2.csv.zip') - self.drop_features = ['timestamp'] - self.drop_labels = ['Attack'] - self.delete_features() - - if use_extractor: - self.extractor(class_column='Attack') - - self.delete_labels() - self.features_names = self.data.columns.values - return self.data - - else: - print('Unknown dataset name') - exit() - - def extractor(self, class_column, positive_class_label=1): - algo = RulesExtractor(0.1) - algo.fit(self.data, class_column=class_column, - positive_class_label=positive_class_label) - rules = algo.get_rules() - self.data = algo.transform(self.data) - - def delete_features(self): - if len(self.drop_features) > 0: - self.data = self.data.drop(self.drop_features, axis=1) - - def delete_labels(self): - if len(self.drop_labels) > 0: - try: - self.data = self.data.drop(self.drop_labels, axis=1) - except: - pass - - def categorical_features_encoding(self): - """ - Data feature preprocessing - """ - if len(self.categorical_features) > 0: - for feature in self.categorical_features: - if 'ip' in feature: - 
encoded_feature_data = pd.DataFrame([x.split('.') - if x != '0' else [0, 0, 0, 0] - for x in self.data[feature].tolist()]) - - for col in encoded_feature_data.columns: - encoded_feature_data = encoded_feature_data.rename(columns={col: feature + '_' + str(col)}) - else: - try: - encoded_feature_data = pd.get_dummies(self.data[feature]) - for col in encoded_feature_data.columns: - encoded_feature_data = encoded_feature_data.rename(columns={col: feature + '_' + col}) - except: - encoded_feature_data = pd.DataFrame() - - if not encoded_feature_data.empty: - old_data_columns = self.data.columns.tolist() - feature_index = old_data_columns.index(feature) - new_data_columns = old_data_columns[:feature_index] + \ - encoded_feature_data.columns.tolist() + \ - old_data_columns[feature_index + 1:] - - self.data = pd.concat([self.data, encoded_feature_data], axis=1) - self.data = self.data[new_data_columns] - else: - print('Too many values for categorical feature "' + feature + '". Delete feature from data') - self.data = self.data.drop(feature, axis=1) - self.data = self.data.fillna(0) - +import math +import os +import pandas as pd +from pandas.api.types import is_numeric_dtype +import numpy as np +import configparser +import pickle +from foressment_ai import RulesExtractor +from sklearn.preprocessing import MinMaxScaler +import matplotlib.pyplot as plt + +class DataLoaderAndPreprocessorDefault: + """ + Class for loading and preprocessing data + + :param DATA_PATH: Path to th e directory with datasets + :type DATA_PATH: string + + :param drop_features: Set of features to remove from the data + :type drop_features: list + + :param categorical_features: A set of categorical features in the data + :type categorical_features: list + + :param data: Data feature matrix + :type data: np.array + + :param feature_names: + :type feature_names: list + """ + def __init__(self, data=np.array([]), feature_names=None): + """ + Initializing + + """ + self.data = data + self.shape = self.data.shape + if not feature_names: + self.feature_names = [] + else: + self.feature_names = feature_names + self.set_train_size(0.9) + + def generate_test_data(self, shape=(1000, 1)): + self.dataset_name = 'test' + self.data = np.empty(shape=shape) + self.shape = shape + self.feature_names = [] + + self.timestamps = np.arange(0, shape[0] * 0.1, 0.1) + for i in range(shape[1]): + new_col = np.sin(self.timestamps) + np.random.normal(scale=0.5, size=len(self.timestamps)) + self.data[:, i] = new_col + self.feature_names.append('feature_' + str(i)) + + + def load_data(self, dataset_name='', nrows=None, suf='', data_configs_path="../datasets/configs", + label_format=None): + """ + Import data with configuration file. 
+ """ + self.dataset_name = dataset_name + + config = configparser.ConfigParser() + + config = configparser.ConfigParser() + config_filename = data_configs_path + '/' + self.dataset_name + '.ini' + assert os.path.isfile(config_filename), 'Unknown dataset configuration' + config.read(config_filename) + + dataset_path = config.get('Params', 'data_path') + dataset_filename = config.get('Params', 'filename', fallback='') + sep = config.get('Params', 'sep', fallback=',') + decimal = config.get('Params', 'decimal', fallback='.') + + if dataset_filename: + self.data = pd.read_csv(dataset_path + '/' + dataset_filename, sep=sep, decimal=decimal, nrows=nrows) + else: + self.data = pd.concat([pd.read_csv(dataset_path + '/' + file, sep=sep, decimal=decimal) + for file in os.listdir(dataset_path)], + ignore_index=True) + self.data = self.data.loc[:nrows] + + self.data.columns = [c.strip() for c in self.data.columns] + + start_id = int(config.get('Other', 'start_id', fallback='0')) + self.data = self.data[start_id:].reset_index(drop=True) + + timestamp_feature = config.get('Features', 'timestamp_feature', fallback=None) + self.binaries_features = self.__config_get_list__(config, 'binaries_features') + self.drop_features = self.__config_get_list__(config, 'drop_features') + self.categorical_features = self.__config_get_list__(config, 'categorical_features') + self.important_features = self.__config_get_list__(config, 'important_features') + + if timestamp_feature: + self.timestamps = self.data[timestamp_feature].squeeze() + + time_format = config.get('Other', 'time_format', fallback='') + timestamp_unit = config.get('Other', 'timestamp_unit', fallback='') + + if time_format: + self.timestamps = self.str_data_to_time(self.timestamps, + time_format=time_format) + if timestamp_unit: + self.timestamps = self.float_data_to_time(self.timestamps, timestamp_unit) + + self.data = self.data.drop(timestamp_feature, axis=1) + + if label_format: + label_section = 'Labels.' 
+ label_format + label_config = config[label_section] + + self.label_feature = label_config.get('label', fallback=None) + normal_label = label_config.get('normal_label', fallback='Normal') + self.labels = self.data[self.label_feature] + self.data = self.data.drop(self.label_feature, axis=1) + + if not is_numeric_dtype(self.labels): + self.__encoding_labels__(normal_label) + + self.drop_features = [f for f in self.drop_features if f in self.data.columns] + if len(self.drop_features) > 0: + self.data = self.data.drop(self.drop_features, axis=1) + + if len(self.categorical_features) > 0: + self.__categorical_feature_encoding__() + + self.feature_names = list(self.data.columns) + self.data = self.data.fillna(0) + self.data = self.data.values + self.shape = self.data.shape + + @staticmethod + def __config_get_list__(config, feature_name): + try: + return config.get('Features', feature_name).replace('\n', '').split(',') + except: + return [] + + @staticmethod + def str_data_to_time(data, time_format): + ''' + :param data: timestamp column (pd.Series) + :return: + ''' + data = data.map(lambda x: x.strip() if not pd.isnull(x) else x) + data = pd.to_datetime(data, format=time_format) + return data + + @staticmethod + def float_data_to_time(data, timestamp_unit): + ''' + :param data: timestamp column (pd.Series) + :return: + ''' + data = pd.to_datetime(data, unit=timestamp_unit) + return data + + def __encoding_labels__(self, normal_label): + labels_classes = sorted(self.labels.unique().tolist()) + labels_dictionary = {} + + if normal_label in labels_classes: + labels_dictionary[normal_label] = 0 + labels_classes.remove(normal_label) + + for i, label in enumerate(labels_classes): + labels_dictionary[label] = i + + self.labels = self.labels.map(lambda x: labels_dictionary[x]) + + def __categorical_feature_encoding__(self): + """ + Data categorical feature encoding + """ + new_categorical_features = [] + for feature in self.categorical_features: + try: + self.data[feature] = self.data[feature].astype(str) + categorical_feature_values = self.data[feature].unique().tolist() + if ('0.0' in categorical_feature_values) and ('0' in categorical_feature_values): + self.data[feature] = self.data[feature].map(lambda x: '0'if x == '0.0'else x) + encoded_feature_data = pd.get_dummies(self.data[feature]) + for col in encoded_feature_data.columns: + encoded_feature_data = encoded_feature_data.rename(columns={col: feature + '_' + col}) + new_categorical_features.append(feature + '_' + col) + except: + encoded_feature_data = pd.DataFrame() + + if not encoded_feature_data.empty: + old_data_columns = self.data.columns.tolist() + feature_index = old_data_columns.index(feature) + new_data_columns = old_data_columns[:feature_index] + \ + encoded_feature_data.columns.tolist() + \ + old_data_columns[feature_index+1:] + + self.data = pd.concat([self.data, encoded_feature_data], axis=1) + self.data = self.data[new_data_columns] + else: + print('Too many values for categorical feature "' + feature +'". 
Delete feature from data')
+                self.data = self.data.drop(feature, axis=1)
+        self.categorical_features = new_categorical_features
+
+    def scale(self, scaler=None, scaler_from_file='', scaler_to_file=''):
+        if scaler:
+            self.scaler = scaler
+
+        if scaler_from_file:
+            if os.path.isfile(scaler_from_file):
+                with open(scaler_from_file, 'rb') as file:
+                    self.scaler = pickle.load(file)
+
+        else:
+            if not hasattr(self, 'scaler'):
+                self.scaler = MinMaxScaler()
+                self.scaler.fit(self.data)
+
+        if scaler_to_file:
+            with open(scaler_to_file, 'wb') as file:
+                pickle.dump(self.scaler, file)
+
+        self.data = self.scaler.transform(self.data)
+
+    def inverse(self, scaler_from_file=''):
+        if scaler_from_file:
+            if os.path.isfile(scaler_from_file):
+                with open(scaler_from_file, 'rb') as file:
+                    self.scaler = pickle.load(file)
+        try:
+            return self.scaler.inverse_transform(self.data)
+        except Exception:
+            print('No scaler to inverse')
+            return 1
+
+    def set_train_size(self, train_size):
+        """
+        Set the proportion of the data used for training; must lie in the interval (0, 1)
+        """
+        if (train_size <= 0) or (train_size >= 1):
+            print('The proportion of the training sample is not in the interval (0, 1)')
+            exit()
+        self.train_size = train_size
+
+    def get_train_data(self):
+        train_end_index = round(self.train_size * self.data.shape[0])
+        return DataLoaderAndPreprocessorDefault(self.data[:train_end_index], feature_names=self.feature_names)
+
+    def get_test_data(self):
+        train_end_index = round(self.train_size * self.data.shape[0])
+        return DataLoaderAndPreprocessorDefault(self.data[train_end_index:], feature_names=self.feature_names)
+
+    def draw(self, size=1000):
+        n = len(self.feature_names)
+        nrows = 1
+        ncols = 1
+        if n <= 3:
+            nrows = n
+        elif n % 3 == 0:
+            ncols = 3
+            nrows = int(n / 3)
+        elif n % 2 == 0:
+            ncols = 2
+            nrows = int(n / 2)
+        else:
+            ncols = 2
+            nrows = int(math.ceil(n / 2))
+
+        fig, axs = plt.subplots(nrows, ncols, sharex='col', figsize=(ncols*5, nrows*2))
+        fig.tight_layout(pad=2.3)
+        fig.supxlabel('Time')
+        i_feature = 0
+
+        color_map = plt.cm.get_cmap('plasma', n)
+
+        for i in range(nrows):
+            if ncols == 1:
+                axs[i].plot(self.data[:size, i_feature], color=color_map(i_feature))
+                axs[i].set_ylabel(self.feature_names[i_feature])
+                i_feature = i_feature + 1
+            else:
+                for j in range(ncols):
+                    if i_feature >= n:
+                        break
+                    axs[i, j].plot(self.data[:size, i_feature], color=color_map(i_feature))
+                    axs[i, j].set_ylabel(self.feature_names[i_feature])
+                    i_feature = i_feature + 1
+        plt.show()
+
+
+class DataLoaderAndPreprocessorExtractor:
+    """
+    Class for loading and preprocessing data. 
+ + :param DATA_PATH: Path to th e directory with datasets + :type DATA_PATH: string + + :param drop_features: Set of features to remove from the data + :type drop_features: list + + :param categorical_features: A set of categorical features in the data + :type categorical_features: list + + :param data: Data feature matrix + :type data: pandas.DataFrame + """ + + def __init__(self, test_name): + """ + Initializing + + :param dataset_name: Data set name + :type dataset_name: str + + :param use_extractor: use preprocessing with Extractor module or not + :type use_extractor: bool + + :param mode: Boot mode, for developers + :type mode: int + """ + + self.DATA_PATH = "../datasets/" + self.dataset_name = '' + self.suf = test_name + + self.features_names = [] + self.drop_features = [] + self.categorical_features = [] + self.data = None + + def __call__(self, dataset_name, use_extractor=False, mode=1): + + self.dataset_name = dataset_name + + if not os.path.exists('../models/' + dataset_name): + os.makedirs('../models/' + dataset_name) + + self.forecasting_model_path = '../models/' + dataset_name + '/forecaster_model_' + dataset_name + '_' + self.suf + self.normalization_model_path = '../models/' + dataset_name + '/forecaster_scaler_' + dataset_name + '_' + self.suf + '.pkl' + + if dataset_name == 'smart_crane': + self.data = pd.read_csv(self.DATA_PATH + 'IEEE_smart_crane.csv') + + if mode not in range(1, 9): + print('Wrong cycle number') + exit() + self.data = self.data[self.data['Cycle'] == mode] + + self.drop_features = ['Date'] + self.drop_labels = ['Alarm', 'Cycle'] + self.delete_features() + + if use_extractor: + self.extractor(class_column='Alarm') + + self.delete_labels() + self.features_names = self.data.columns.values + return self.data + + elif dataset_name == 'hai': + self.data = pd.read_csv(self.DATA_PATH + 'HAI_test2.csv.zip') + self.drop_features = ['timestamp'] + self.drop_labels = ['Attack'] + self.delete_features() + + if use_extractor: + self.extractor(class_column='Attack') + + self.delete_labels() + self.features_names = self.data.columns.values + return self.data + + else: + print('Unknown dataset name') + exit() + + def extractor(self, class_column, positive_class_label=1): + algo = RulesExtractor(0.1) + algo.fit(self.data, class_column=class_column, + positive_class_label=positive_class_label) + rules = algo.get_rules() + self.data = algo.transform(self.data) + + def delete_features(self): + if len(self.drop_features) > 0: + self.data = self.data.drop(self.drop_features, axis=1) + + def delete_labels(self): + if len(self.drop_labels) > 0: + try: + self.data = self.data.drop(self.drop_labels, axis=1) + except: + pass + + def categorical_features_encoding(self): + """ + Data feature preprocessing + """ + if len(self.categorical_features) > 0: + for feature in self.categorical_features: + if 'ip' in feature: + encoded_feature_data = pd.DataFrame([x.split('.') + if x != '0' else [0, 0, 0, 0] + for x in self.data[feature].tolist()]) + + for col in encoded_feature_data.columns: + encoded_feature_data = encoded_feature_data.rename(columns={col: feature + '_' + str(col)}) + else: + try: + encoded_feature_data = pd.get_dummies(self.data[feature]) + for col in encoded_feature_data.columns: + encoded_feature_data = encoded_feature_data.rename(columns={col: feature + '_' + col}) + except: + encoded_feature_data = pd.DataFrame() + + if not encoded_feature_data.empty: + old_data_columns = self.data.columns.tolist() + feature_index = old_data_columns.index(feature) + new_data_columns = 
old_data_columns[:feature_index] + \ + encoded_feature_data.columns.tolist() + \ + old_data_columns[feature_index + 1:] + + self.data = pd.concat([self.data, encoded_feature_data], axis=1) + self.data = self.data[new_data_columns] + else: + print('Too many values for categorical feature "' + feature + '". Delete feature from data') + self.data = self.data.drop(feature, axis=1) + self.data = self.data.fillna(0) + + diff --git a/foressment_ai/forecasting/forecaster_ai/logger.py b/foressment_ai/forecasting/forecaster_ai/logger.py index d900be8..9f16989 100644 --- a/foressment_ai/forecasting/forecaster_ai/logger.py +++ b/foressment_ai/forecasting/forecaster_ai/logger.py @@ -1,148 +1,148 @@ -import os -import time -from datetime import datetime -import json -import pandas as pd - -class Logger: - def __init__(self, proc, cpu_num): - try: - os.makedirs('../../../examples/forecaster_logs') - except: - pass - - self.proc = proc - self.cpu_num = cpu_num - - self.filename = None - self.log = None - self.run = False - self.show = True - - self.event_name = '' - self.text = '' - - self.logline = {'timestamp': '', - 'event_name': None, - 'text': '', - 'cpu%': None, - 'ram_mb': None - } - - def create(self, filename, rewrite=False): - self.filename = 'forecaster_logs/' + filename - if rewrite: - self.log = self.open('w') - else: - self.log = self.open('a') - - - def event_init(self, event_name='', text=''): - self.event_name = event_name - self.text = text - - def show_off(self): - self.show = False - - def show_on(self): - self.show = True - - def daemon_logger(self, show=True): - self.show = show - while self.run: - d = self.proc.as_dict(attrs=['cpu_percent', 'memory_info', 'memory_percent']) - - self.logline['cpu%'] = round(d['cpu_percent']/self.cpu_num, 2) - self.logline['ram_mb'] = round(d['memory_info'].rss / 1024 ** 2, 2) - - self.logline['timestamp'] = str(datetime.utcnow()) - self.logline['event_name'] = self.event_name - self.logline['text'] = self.text - - self.log.write(json.dumps(self.logline)) - self.log.write('\n') - self.log.flush() - os.fsync(self.log.fileno()) - - if self.show: - line = '{0}, event_type: {1}, text: {2}, CPU: {3}%, RAM: {4} Mb'.format(self.logline['timestamp'], - self.logline['event_name'], - self.logline['text'], - self.logline['cpu%'], - self.logline['ram_mb']) - print(line) - time.sleep(100/1000) - - def open(self, how): - return open(self.filename, how) - - def close(self): - self.log.close() - - def get_resources(self, event_name='all'): - logdata = self.parse_to_dataframe(self.filename) - resources = {'duration_sec': self.get_event_duration(logdata, event_name=event_name)} - for res in ['cpu%', 'ram_mb']: - for stat_param in ['min', 'mean', 'max']: - resources[res + '_' + stat_param] = self.get_resource_stat(logdata, event_name=event_name, - res=res, stat_param=stat_param) - return resources - - @staticmethod - def parse_to_dataframe(filename): - return pd.read_json(filename, lines=True) - - @staticmethod - def get_event_duration(data, event_name='all'): - if event_name == 'all': - indexA = 0 - indexB = data.index[-1] - - else: - if event_name not in data['event_name'].unique(): - print('Wrong event name') - exit() - - event_data_indices = data[data['event_name'] == event_name].index - indexA = event_data_indices[0] - indexB = event_data_indices[-1] - - start = pd.to_datetime(data.loc[indexA, 'timestamp'], format="%Y-%m-%d %H:%M:%S.%f") - end = pd.to_datetime(data.loc[indexB, 'timestamp'], format="%Y-%m-%d %H:%M:%S.%f") - duration = end - start - duration = 
duration.total_seconds() - return duration - - @staticmethod - def get_resource_stat(data, res, stat_param, event_name='all'): - - event_data, res_data = None, None - - if event_name == 'all': - event_data = data - else: - try: - event_data = data[data['event_name']==event_name] - except: - print('Wrong event name') - exit() - - try: - res_data = event_data[res] - except: - print('Wrong resource name') - exit() - - if stat_param == 'min': - return round(res_data.min(), 3) - elif stat_param == 'mean': - return round(res_data.mean(), 3) - elif stat_param == 'max': - return round(res_data.max(), 3) - else: - print('Wrong statistic param') - exit() - - - - +import os +import time +from datetime import datetime +import json +import pandas as pd + +class Logger: + def __init__(self, proc, cpu_num): + try: + os.makedirs('../../../examples/forecaster_logs') + except: + pass + + self.proc = proc + self.cpu_num = cpu_num + + self.filename = None + self.log = None + self.run = False + self.show = True + + self.event_name = '' + self.text = '' + + self.logline = {'timestamp': '', + 'event_name': None, + 'text': '', + 'cpu%': None, + 'ram_mb': None + } + + def create(self, filename, rewrite=False): + self.filename = 'forecaster_logs/' + filename + if rewrite: + self.log = self.open('w') + else: + self.log = self.open('a') + + + def event_init(self, event_name='', text=''): + self.event_name = event_name + self.text = text + + def show_off(self): + self.show = False + + def show_on(self): + self.show = True + + def daemon_logger(self, show=True): + self.show = show + while self.run: + d = self.proc.as_dict(attrs=['cpu_percent', 'memory_info', 'memory_percent']) + + self.logline['cpu%'] = round(d['cpu_percent']/self.cpu_num, 2) + self.logline['ram_mb'] = round(d['memory_info'].rss / 1024 ** 2, 2) + + self.logline['timestamp'] = str(datetime.utcnow()) + self.logline['event_name'] = self.event_name + self.logline['text'] = self.text + + self.log.write(json.dumps(self.logline)) + self.log.write('\n') + self.log.flush() + os.fsync(self.log.fileno()) + + if self.show: + line = '{0}, event_type: {1}, text: {2}, CPU: {3}%, RAM: {4} Mb'.format(self.logline['timestamp'], + self.logline['event_name'], + self.logline['text'], + self.logline['cpu%'], + self.logline['ram_mb']) + print(line) + time.sleep(100/1000) + + def open(self, how): + return open(self.filename, how) + + def close(self): + self.log.close() + + def get_resources(self, event_name='all'): + logdata = self.parse_to_dataframe(self.filename) + resources = {'duration_sec': self.get_event_duration(logdata, event_name=event_name)} + for res in ['cpu%', 'ram_mb']: + for stat_param in ['min', 'mean', 'max']: + resources[res + '_' + stat_param] = self.get_resource_stat(logdata, event_name=event_name, + res=res, stat_param=stat_param) + return resources + + @staticmethod + def parse_to_dataframe(filename): + return pd.read_json(filename, lines=True) + + @staticmethod + def get_event_duration(data, event_name='all'): + if event_name == 'all': + indexA = 0 + indexB = data.index[-1] + + else: + if event_name not in data['event_name'].unique(): + print('Wrong event name') + exit() + + event_data_indices = data[data['event_name'] == event_name].index + indexA = event_data_indices[0] + indexB = event_data_indices[-1] + + start = pd.to_datetime(data.loc[indexA, 'timestamp'], format="%Y-%m-%d %H:%M:%S.%f") + end = pd.to_datetime(data.loc[indexB, 'timestamp'], format="%Y-%m-%d %H:%M:%S.%f") + duration = end - start + duration = duration.total_seconds() + return 
duration + + @staticmethod + def get_resource_stat(data, res, stat_param, event_name='all'): + + event_data, res_data = None, None + + if event_name == 'all': + event_data = data + else: + try: + event_data = data[data['event_name']==event_name] + except: + print('Wrong event name') + exit() + + try: + res_data = event_data[res] + except: + print('Wrong resource name') + exit() + + if stat_param == 'min': + return round(res_data.min(), 3) + elif stat_param == 'mean': + return round(res_data.mean(), 3) + elif stat_param == 'max': + return round(res_data.max(), 3) + else: + print('Wrong statistic param') + exit() + + + + diff --git "a/foressment_ai/forecasting/forecaster_ai/\321\201hecker.py" "b/foressment_ai/forecasting/forecaster_ai/\321\201hecker.py" new file mode 100644 index 0000000..ebcc3af --- /dev/null +++ "b/foressment_ai/forecasting/forecaster_ai/\321\201hecker.py" @@ -0,0 +1,84 @@ +from inspect import getmembers, isfunction, signature +import keras.activations as activations +from keras import optimizers +import os +from typing import Type + + +class ParamChecker: + def __init__(self, model_type='forecaster'): + self.max_num_units = 512 + self.default_units_step = 64 + + activations_names = [name[0] for name in getmembers(activations, isfunction) + if 'x' in signature(name[1]).parameters.keys()] + + optimization_names = ['adadelta', 'adagrad', 'adam', 'adamax', 'experimentaladadelta', 'experimentaladagrad', + 'experimentaladam', 'experimentalsgd', 'nadam', 'rmsprop', 'sgd', 'ftrl', + 'lossscaleoptimizer', + 'lossscaleoptimizerv3', 'lossscaleoptimizerv1'] + + if model_type == 'forecaster': + self.params_requirements = { + 'look_back_length': {'is_type': int, 'in_range': (0, None)}, + 'n_features': {'is_type': int, 'in_range': (0, None)}, + 'horizon': {'is_type': int, 'in_range': (0, None)}, + 'n_rec_layers': {'is_type': int, 'in_range': (0, 100)}, + 'block_type': {'is_type': str, 'in_list': ['SimpleRNN', 'LSTM', 'GRU']}, + 'units': {'is_type': dict}, + 'units_of_layer': {'is_type': int, 'in_range': [1, self.max_num_units]}, + 'unit_step': {'is_type': int, 'in_range': (0, self.max_num_units - 1)}, + 'dropout': {'is_type': (float, int), 'in_range': [0, 1]}, + 'hidden_activation': {'is_type': str, 'in_list': activations_names}, + 'output_activation': {'is_type': str, 'in_list': activations_names}, + 'optimizer': {'is_type': optimizers.legacy.Adam}, + 'loss': {'is_type': str, 'in_list': ['mse', 'mae']} + } + + def check_param(self, param, param_name): + requirements = self.params_requirements[param_name] + for req_type, req in requirements.items(): + if req_type == 'is_type': + param = self.check_is_type_param(param, param_name, req) + if req_type == 'in_list': + param = self.check_in_list_param(param, param_name, req) + if req_type == 'in_range': + param = self.check_in_range_param(param, param_name, req) + return param + + def check_in_range_param(self, param, param_name, range): + min_val = range[0] + max_val = range[1] + + if isinstance(range, tuple): + if min_val is not None: + assert (param > min_val), 'Value of the "{0}" argument must be more than {1} '.format( + param_name, min_val) + if max_val is not None: + assert (param <= max_val), 'Value of the "{0}" argument must be less than {1} '.format( + param_name, max_val) + return param + + if isinstance(range, list): + if min_val is not None: + assert (param >= min_val), 'Value of the "{0}" argument must be more than {1} or equal'.format( + param_name, min_val) + if max_val is not None: + assert (param <= max_val), 'Value of the 
"{0}" argument must be less than {1} or equal'.format( + param_name, max_val) + return param + + def check_in_list_param(self, param, param_name, list_of_values): + assert (param in list_of_values), 'Value of the "{0}" argument must be in list {1}'.format( + param_name, list_of_values) + return param + + def check_is_type_param(self, param, param_name, stype): + assert (isinstance(param, stype)), 'Type of the "{0}" argument must be {1}'.format(param_name, stype) + return param + + @staticmethod + def check_file_is_exist(filename): + assert (os.path.isfile(filename)), 'File {0} does not exist'.format(filename) + +