Skip to content

Commit

Permalink
Add healthchecks on device
Browse files Browse the repository at this point in the history
  • Loading branch information
oscgonfer committed Nov 11, 2024
1 parent d94ec1e commit 01d3b81
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 99 deletions.
90 changes: 89 additions & 1 deletion scdata/_config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,96 @@ class Config(object):
}

### ---------------------------------------
### ------------VALUES-CHECK---------------
### ---------------------------------------
### ---------------------------------------

_default_sampling_rate = {
'AMS AS7731 - UVA': 1,
'AMS AS7731 - UVB': 1,
'AMS AS7731 - UVC': 1,
'LIGHT': 1,
'BATT': 1,
'NOISE_A': 1,
'SCD30_CO2': 1,
'SCD30_HUM': 1,
'SCD30_TEMP': 1,
'SD-card': 1,
'ST LPS33 - Barometric Pressure': 1,
'PRESS': 1,
'PMS5003_PM_1': 5,
'PMS5003_PM_25': 5,
'PMS5003_PM_10': 5,
'PMS5003_PN_03': 5,
'PMS5003_PN_03': 5,
'PMS5003_PN_05':5,
'PMS5003_PN_1':5,
'PMS5003_PN_10':5,
'PMS5003_PN_25':5,
'PMS5003_PN_5':5,
'SEN5X_HUM': 5,
'SEN5X_PM_1': 5,
'SEN5X_PM_10': 5,
'SEN5X_PM_25': 5,
'SEN5X_PM_40': 5,
'SEN5X_PN_05': 5,
'SEN5X_PN_1': 5,
'SEN5X_PN_10': 5,
'SEN5X_PN_25': 5,
'SEN5X_PN_40': 5,
'SEN5X_TPS': 5,
'SEN5X_TEMP': 5,
'SFA30_HCHO': 1,
'SFA30_HUM': 1,
'SFA30_TEMP': 1,
'ADC_48_0': 1,
'ADC_48_1': 1,
'ADC_48_2': 1,
'ADC_48_3': 1,
'ADC_49_0': 1,
'ADC_49_1': 1,
'ADC_49_2': 1,
'ADC_49_3': 1,
'CCS811_VOCS': 1,
'CCS811_ECO2': 1,
'HUM': 1,
'TEMP': 1,
'RSSI': 1,
'NO2': 1,
'O3': 1
}

_default_unplausible_values = {
'NOISE_A': [20, 99],
'SCD30_CO2': [300, 2000],
'SCD30_HUM': [20, 99],
'SCD30_TEMP': [-20, 50],
'ST LPS33 - Barometric Pressure': [50, 110],
'PRESS': [50, 110],
'PMS5003_PM_1': [0, 500],
'PMS5003_PM_25': [0, 500],
'PMS5003_PM_10': [0, 500],
'SEN5X_HUM': [20, 99],
'SEN5X_PM_1': [0, 500],
'SEN5X_PM_10': [0, 500],
'SEN5X_PM_25': [0, 500],
'SEN5X_PM_40': [0, 500],
'SEN5X_TEMP': [-20, 50],
'SFA30_HCHO': [00, 1000],
'SFA30_HUM': [20, 99],
'SFA30_TEMP': [-20, 50],
'ADC_48_0': [0, 3],
'ADC_48_1': [0, 3],
'ADC_48_2': [0, 3],
'ADC_48_3': [0, 3],
'ADC_49_0': [0, 3],
'ADC_49_1': [0, 3],
'ADC_49_2': [0, 3],
'ADC_49_3': [0, 3],
'HUM': [20, 99],
'TEMP': [-20, 50],
'NO2': [0, 1000],
'O3': [0, 1000]
}

def __init__(self):
self._env_file = None
Expand Down
173 changes: 75 additions & 98 deletions scdata/device/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,104 +510,81 @@ def update_postprocessing_date(self):
self.postprocessing_updated = False
return self.postprocessing_updated

# TODO
def health_check(self):
    # Placeholder: device health checking is not implemented yet, so this
    # always reports the device as healthy. NOTE(review): the per-channel
    # checks needed to implement this exist elsewhere in this file
    # (get_nan_ratio, get_plausible_ratio, get_outlier_ratio).
    return True

# TODO - Decide if we keep it
# def forward(self, chunk_size = 500, dry_run = False, max_retries = 2):
# '''
# Forwards data to another api.
# Parameters
# ----------
# chunk_size: int
# 500
# Chunk size to be sent to device.post_data_to_device in question
# dry_run: boolean
# False
# Post the payload to the API or just return it
# max_retries: int
# 2
# Maximum number of retries per chunk
# Returns
# ----------
# boolean
# True if posted ok, False otherwise
# '''

# # Import requested handler
# hmod = __import__('scdata.io.device_api', fromlist = ['io.device_api'])
# Hclass = getattr(hmod, config.connectors[self.forwarding_request]['handler'])

# # Create new device in target API if it hasn't been created yet
# if self.forwarding_params is None:
# std_out('Empty forwarding information, attempting to create a new device', 'WARNING')
# # We assume the device has never been posted
# # Construct new device kwargs dictionary
# kwargs = dict()
# for item in config.connectors[self.forwarding_request]['kwargs']:
# val = config.connectors[self.forwarding_request]['kwargs'][item]
# if val == 'options':
# kitem = self.options[item]
# elif val == 'config':
# # Items in config should be underscored
# kitem = config.__getattr__(f'_{item}')
# elif isinstance(val, Iterable):
# if 'same' in val:
# if 'as_device' in val:
# if item == 'sensors':
# kitem = self.merge_sensor_metrics(ignore_empty = True)
# elif item == 'description':
# kitem = self.blueprint.replace('_', ' ')
# elif 'as_api' in val:
# if item == 'sensors':
# kitem = self.api_device.get_device_sensors()
# elif item == 'description':
# kitem = self.api_device.get_device_description()
# else:
# kitem = val
# kwargs[item] = kitem

# response = Hclass.new_device(name = config.connectors[self.forwarding_request]['name_prepend']\
# + str(self.params.id),
# location = self.location,
# dry_run = dry_run,
# **kwargs)
# if response:
# if 'message' in response:
# if response['message'] == 'Created':
# if 'sensorid' in response:
# self.forwarding_params = response['sensorid']
# self.api_device.postprocessing['forwarding_params'] = self.forwarding_params
# std_out(f'New sensor ID in {self.forwarding_request}\
# is {self.forwarding_params}. Updating')

# if self.forwarding_params is not None:
# df = self.data.copy()
# df = df[df.columns.intersection(list(self.merge_sensor_metrics(ignore_empty=True).keys()))]
# df = clean(df, 'drop', how = 'all')

# if df.empty:
# std_out('Empty dataframe, ignoring', 'WARNING')
# return False

# # Create object
# ndev = Hclass(did = self.forwarding_params)
# post_ok = ndev.post_data_to_device(df, chunk_size = chunk_size,
# dry_run = dry_run, max_retries = 2)

# if post_ok:
# # TODO Check if we like this
# if self.source == 'api':
# self.update_latest_postprocessing()
# std_out(f'Posted data for {self.params.id}', 'SUCCESS')
# else:
# std_out(f'Error posting data for {self.params.id}', 'ERROR')
# return post_ok

# else:
# std_out('Empty forwarding information', 'ERROR')
# return False
# Check nans per column, return dict
def get_nan_ratio(self, **kwargs):
    '''
    Computes, per channel, the daily ratio of missing (NaN) samples once
    the data is resampled at the channel's nominal sampling rate.
    Parameters
    ----------
        sampling_rate: dict, optional
            config._default_sampling_rate
            Mapping of channel name to sampling rate in minutes.
            Channels without an entry are skipped
    Returns
    ----------
        dict of channel: pandas Series indexed by date with the daily
        NaN ratio, or False if the device data is not loaded.
        NOTE(review): the first day has no previous day to diff against,
        so its value is always NaN
    '''
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    # Keep the explicit membership test (rather than kwargs.get) so the
    # config default is only looked up when actually needed
    if 'sampling_rate' not in kwargs:
        sampling_rate = config._default_sampling_rate
    else:
        sampling_rate = kwargs['sampling_rate']

    result = {}
    for column in self.data.columns:
        # Only channels with a known sampling rate can be checked
        if column not in sampling_rate:
            continue
        # Resample at the nominal rate so that missing samples appear as
        # NaN ('min' is the non-deprecated minutes alias in recent pandas)
        df = self.data[column].resample(f'{sampling_rate[column]}min').mean()
        # Minutes elapsed between consecutive days present in the data
        minutes = df.groupby(df.index.date).mean().index.to_series().diff()/Timedelta('60s')
        # Daily NaN count normalised by the expected samples per day:
        # (1 - (minutes - nans)/minutes) * rate == nans * rate / minutes
        result[column] = (1-(minutes-df.isna().groupby(df.index.date).sum())/minutes)*sampling_rate[column]

    return result

# Check plausibility per column, return dict. Doesn't take into account nans
def get_plausible_ratio(self, **kwargs):
    '''
    Computes, per channel, the daily ratio of readings that fall inside
    the channel's plausible [min, max] range. The denominator is the
    daily count of valid (non-NaN) readings, so NaNs are excluded.
    Parameters
    ----------
        unplausible_values: dict, optional
            config._default_unplausible_values
            Mapping of channel name to [min, max] plausible range.
            Channels without an entry are skipped
    Returns
    ----------
        dict of channel: pandas Series indexed by date with the daily
        plausible ratio, or False if the device data is not loaded
    '''
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    # Keep the explicit membership test (rather than kwargs.get) so the
    # config default is only looked up when actually needed
    if 'unplausible_values' not in kwargs:
        unplausible_values = config._default_unplausible_values
    else:
        unplausible_values = kwargs['unplausible_values']

    # Daily count of valid (non-NaN) readings; loop-invariant, so it is
    # computed once for all channels
    daily_count = self.data.groupby(self.data.index.date).count()

    result = {}
    for column in self.data.columns:
        if column not in unplausible_values:
            continue
        low, high = unplausible_values[column]
        # between() is inclusive on both ends and False for NaN
        plausible = self.data[column].between(left=low, right=high)
        result[column] = plausible.groupby(self.data[column].index.date).sum()/daily_count[column]

    return result

# Check outliers per column, return dict of daily outlier ratios.
# Doesn't take into account nans
def get_outlier_ratio(self, **kwargs):
    '''
    Computes, per channel, the daily ratio of readings flagged as
    outliers by Tukey's fences (1.5*IQR beyond the quartiles), where the
    quartiles are taken over the resampled mean of the series.
    Parameters
    ----------
        resample: str, optional
            '360h'
            Resampling period used before computing the quartiles
    Returns
    ----------
        dict of channel: pandas Series indexed by date with the daily
        outlier ratio, or False if the device data is not loaded
    '''
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    resample = kwargs.get('resample', '360h')
    result = {}

    for column in self.data.columns:
        # Resample once per column (the original recomputed it for each
        # quantile)
        resampled = self.data[column].resample(resample).mean()
        q1 = resampled.quantile(0.25)
        q3 = resampled.quantile(0.75)
        iqr = q3 - q1

        # Tukey's fences: anything beyond 1.5*IQR from the quartiles
        mask = (self.data[column] < (q1 - 1.5 * iqr)) | (self.data[column] > (q3 + 1.5 * iqr))
        result[column] = mask.groupby(mask.index.date).mean()

    return result

# Flag outliers per column, return dict of boolean masks.
# Doesn't take into account nans
def get_outliers(self, **kwargs):
    '''
    Flags, per channel, the individual readings that are outliers by
    Tukey's fences (1.5*IQR beyond the quartiles), where the quartiles
    are taken over the resampled mean of the series.
    Parameters
    ----------
        resample: str, optional
            '360h'
            Resampling period used before computing the quartiles
    Returns
    ----------
        dict of channel: boolean pandas Series (True where the reading
        is an outlier), or False if the device data is not loaded
    '''
    if not self.loaded:
        logger.error('Need to load first (device.load())')
        return False

    resample = kwargs.get('resample', '360h')
    result = {}

    for column in self.data.columns:
        # Resample once per column (the original recomputed it for each
        # quantile)
        resampled = self.data[column].resample(resample).mean()
        q1 = resampled.quantile(0.25)
        q3 = resampled.quantile(0.75)
        iqr = q3 - q1

        # Tukey's fences: anything beyond 1.5*IQR from the quartiles
        result[column] = (self.data[column] < (q1 - 1.5 * iqr)) | (self.data[column] > (q3 + 1.5 * iqr))

    return result

def export(self, path, forced_overwrite = False, file_format = 'csv'):
'''
Expand Down

0 comments on commit 01d3b81

Please sign in to comment.