From 01d3b81cdad708e8175c75b759799eed812ecbff Mon Sep 17 00:00:00 2001 From: oscgonfer Date: Mon, 11 Nov 2024 12:57:43 +0100 Subject: [PATCH] Add healthchecks on device --- scdata/_config/config.py | 90 +++++++++++++++++++- scdata/device/device.py | 173 +++++++++++++++++---------------------- 2 files changed, 164 insertions(+), 99 deletions(-) diff --git a/scdata/_config/config.py b/scdata/_config/config.py index 6c61c85..b5a25dd 100644 --- a/scdata/_config/config.py +++ b/scdata/_config/config.py @@ -411,8 +411,96 @@ class Config(object): } ### --------------------------------------- + ### ------------VALUES-CHECK--------------- ### --------------------------------------- - ### --------------------------------------- + + _default_sampling_rate = { + 'AMS AS7731 - UVA': 1, + 'AMS AS7731 - UVB': 1, + 'AMS AS7731 - UVC': 1, + 'LIGHT': 1, + 'BATT': 1, + 'NOISE_A': 1, + 'SCD30_CO2': 1, + 'SCD30_HUM': 1, + 'SCD30_TEMP': 1, + 'SD-card': 1, + 'ST LPS33 - Barometric Pressure': 1, + 'PRESS': 1, + 'PMS5003_PM_1': 5, + 'PMS5003_PM_25': 5, + 'PMS5003_PM_10': 5, + 'PMS5003_PN_03': 5, + 'PMS5003_PN_03': 5, + 'PMS5003_PN_05':5, + 'PMS5003_PN_1':5, + 'PMS5003_PN_10':5, + 'PMS5003_PN_25':5, + 'PMS5003_PN_5':5, + 'SEN5X_HUM': 5, + 'SEN5X_PM_1': 5, + 'SEN5X_PM_10': 5, + 'SEN5X_PM_25': 5, + 'SEN5X_PM_40': 5, + 'SEN5X_PN_05': 5, + 'SEN5X_PN_1': 5, + 'SEN5X_PN_10': 5, + 'SEN5X_PN_25': 5, + 'SEN5X_PN_40': 5, + 'SEN5X_TPS': 5, + 'SEN5X_TEMP': 5, + 'SFA30_HCHO': 1, + 'SFA30_HUM': 1, + 'SFA30_TEMP': 1, + 'ADC_48_0': 1, + 'ADC_48_1': 1, + 'ADC_48_2': 1, + 'ADC_48_3': 1, + 'ADC_49_0': 1, + 'ADC_49_1': 1, + 'ADC_49_2': 1, + 'ADC_49_3': 1, + 'CCS811_VOCS': 1, + 'CCS811_ECO2': 1, + 'HUM': 1, + 'TEMP': 1, + 'RSSI': 1, + 'NO2': 1, + 'O3': 1 + } + + _default_unplausible_values = { + 'NOISE_A': [20, 99], + 'SCD30_CO2': [300, 2000], + 'SCD30_HUM': [20, 99], + 'SCD30_TEMP': [-20, 50], + 'ST LPS33 - Barometric Pressure': [50, 110], + 'PRESS': [50, 110], + 'PMS5003_PM_1': [0, 500], + 'PMS5003_PM_25': 
[0, 500], + 'PMS5003_PM_10': [0, 500], + 'SEN5X_HUM': [20, 99], + 'SEN5X_PM_1': [0, 500], + 'SEN5X_PM_10': [0, 500], + 'SEN5X_PM_25': [0, 500], + 'SEN5X_PM_40': [0, 500], + 'SEN5X_TEMP': [-20, 50], + 'SFA30_HCHO': [00, 1000], + 'SFA30_HUM': [20, 99], + 'SFA30_TEMP': [-20, 50], + 'ADC_48_0': [0, 3], + 'ADC_48_1': [0, 3], + 'ADC_48_2': [0, 3], + 'ADC_48_3': [0, 3], + 'ADC_49_0': [0, 3], + 'ADC_49_1': [0, 3], + 'ADC_49_2': [0, 3], + 'ADC_49_3': [0, 3], + 'HUM': [20, 99], + 'TEMP': [-20, 50], + 'NO2': [0, 1000], + 'O3': [0, 1000] + } def __init__(self): self._env_file = None diff --git a/scdata/device/device.py b/scdata/device/device.py index 6f8ea67..1ccb7dc 100644 --- a/scdata/device/device.py +++ b/scdata/device/device.py @@ -510,104 +510,81 @@ def update_postprocessing_date(self): self.postprocessing_updated = False return self.postprocessing_updated - # TODO - def health_check(self): - return True - - # TODO - Decide if we keep it - # def forward(self, chunk_size = 500, dry_run = False, max_retries = 2): - # ''' - # Forwards data to another api. 
- # Parameters - # ---------- - # chunk_size: int - # 500 - # Chunk size to be sent to device.post_data_to_device in question - # dry_run: boolean - # False - # Post the payload to the API or just return it - # max_retries: int - # 2 - # Maximum number of retries per chunk - # Returns - # ---------- - # boolean - # True if posted ok, False otherwise - # ''' - - # # Import requested handler - # hmod = __import__('scdata.io.device_api', fromlist = ['io.device_api']) - # Hclass = getattr(hmod, config.connectors[self.forwarding_request]['handler']) - - # # Create new device in target API if it hasn't been created yet - # if self.forwarding_params is None: - # std_out('Empty forwarding information, attemping creating a new device', 'WARNING') - # # We assume the device has never been posted - # # Construct new device kwargs dictionary - # kwargs = dict() - # for item in config.connectors[self.forwarding_request]['kwargs']: - # val = config.connectors[self.forwarding_request]['kwargs'][item] - # if val == 'options': - # kitem = self.options[item] - # elif val == 'config': - # # Items in config should be underscored - # kitem = config.__getattr__(f'_{item}') - # elif isinstance(val, Iterable): - # if 'same' in val: - # if 'as_device' in val: - # if item == 'sensors': - # kitem = self.merge_sensor_metrics(ignore_empty = True) - # elif item == 'description': - # kitem = self.blueprint.replace('_', ' ') - # elif 'as_api' in val: - # if item == 'sensors': - # kitem = self.api_device.get_device_sensors() - # elif item == 'description': - # kitem = self.api_device.get_device_description() - # else: - # kitem = val - # kwargs[item] = kitem - - # response = Hclass.new_device(name = config.connectors[self.forwarding_request]['name_prepend']\ - # + str(self.params.id), - # location = self.location, - # dry_run = dry_run, - # **kwargs) - # if response: - # if 'message' in response: - # if response['message'] == 'Created': - # if 'sensorid' in response: - # self.forwarding_params = 
# Per-day ratio of missing samples for each known channel.
# TODO-DOCUMENT
def get_nan_ratio(self, **kwargs):
    """
    Return, per channel and per day, the fraction of expected samples
    that are NaN.

    Each channel is resampled to its nominal sampling period before
    counting gaps; the final multiplication by the sampling rate rescales
    the per-day minute count to the number of expected bins, so values
    land in [0, 1]. The first day of each series has no preceding day to
    diff against and therefore yields NaN.

    Parameters
    ----------
    sampling_rate: dict, optional
        Channel -> sampling period in minutes. Defaults to
        config._default_sampling_rate. Channels absent from this dict
        are skipped.

    Returns
    -------
    dict of {channel: pandas Series indexed by date}, or False when the
    device data has not been loaded yet.
    """
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    # Keep the default lookup lazy so config is only touched when needed.
    rates = kwargs['sampling_rate'] if 'sampling_rate' in kwargs \
        else config._default_sampling_rate

    ratios = {}
    for channel in self.data.columns:
        if channel not in rates:
            continue
        resampled = self.data[channel].resample(f'{rates[channel]}Min').mean()
        # Minutes elapsed between consecutive days present in the data.
        day_minutes = resampled.groupby(resampled.index.date).mean() \
            .index.to_series().diff() / Timedelta('60s')
        nans_per_day = resampled.isna().groupby(resampled.index.date).sum()
        ratios[channel] = (1 - (day_minutes - nans_per_day) / day_minutes) * rates[channel]

    return ratios
# Check plausibility per column, return dict. Doesn't take into account nans
# TODO-DOCUMENT
def get_plausible_ratio(self, **kwargs):
    """
    Return, per channel and per day, the fraction of non-NaN readings
    that fall inside the plausible [lower, upper] bounds (inclusive).

    Parameters
    ----------
    unplausible_values: dict, optional
        Channel -> [lower, upper]. Defaults to
        config._default_unplausible_values. Channels absent from this
        dict are skipped.

    Returns
    -------
    dict of {channel: pandas Series indexed by date}, or False when the
    device data has not been loaded yet.
    """
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    # Lazy default lookup; a dead, unused sampling_rate computation was
    # removed from the original implementation.
    bounds = kwargs['unplausible_values'] if 'unplausible_values' in kwargs \
        else config._default_unplausible_values

    # Non-NaN readings per day, computed once for every channel.
    daily_counts = self.data.groupby(self.data.index.date).count()

    result = {}
    for column in self.data.columns:
        if column not in bounds:
            continue
        plausible = self.data[column].between(left=bounds[column][0],
                                              right=bounds[column][1])
        result[column] = plausible.groupby(self.data.index.date).sum() \
            / daily_counts[column]
    return result

# Per-day ratio of outlier readings per channel (NaNs never count as
# outliers since comparisons with NaN are False).
def get_outlier_ratio(self, **kwargs):
    """
    Return, per channel and per day, the fraction of readings flagged as
    outliers by Tukey's 1.5*IQR fences, where quartiles are computed on
    means resampled over a long window.

    Parameters
    ----------
    resample: str, optional
        Pandas offset alias for the quartile resampling window.
        Defaults to '360h'.

    Returns
    -------
    dict of {channel: pandas Series indexed by date}, or False when the
    device data has not been loaded yet.
    """
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    resample = kwargs.get('resample', '360h')
    result = {}

    for column in self.data.columns:
        # Resample once per column; the original recomputed it per quartile.
        means = self.data[column].resample(resample).mean()
        Q1 = means.quantile(0.25)
        Q3 = means.quantile(0.75)
        IQR = Q3 - Q1

        mask = (self.data[column] < (Q1 - 1.5 * IQR)) | (self.data[column] > (Q3 + 1.5 * IQR))
        result[column] = mask.groupby(mask.index.date).mean()

    return result

# Boolean outlier mask per channel (NaNs never count as outliers since
# comparisons with NaN are False).
def get_outliers(self, **kwargs):
    """
    Return, per channel, a boolean Series marking readings that fall
    outside Tukey's 1.5*IQR fences, where quartiles are computed on
    means resampled over a long window.

    Parameters
    ----------
    resample: str, optional
        Pandas offset alias for the quartile resampling window.
        Defaults to '360h'.

    Returns
    -------
    dict of {channel: boolean pandas Series aligned with device data},
    or False when the device data has not been loaded yet.
    """
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    resample = kwargs.get('resample', '360h')
    result = {}

    for column in self.data.columns:
        means = self.data[column].resample(resample).mean()
        Q1 = means.quantile(0.25)
        Q3 = means.quantile(0.75)
        IQR = Q3 - Q1

        mask = (self.data[column] < (Q1 - 1.5 * IQR)) | (self.data[column] > (Q3 + 1.5 * IQR))
        result[column] = mask

    return result