diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py index 43963926..e2399bf6 100644 --- a/openwisp_monitoring/db/backends/__init__.py +++ b/openwisp_monitoring/db/backends/__init__.py @@ -64,7 +64,18 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): "Try using 'openwisp_monitoring.db.backends.XXX', where XXX is one of:\n" f"{builtin_backends}" ) from e + else: + raise e + +if '2' in TIMESERIES_DB['BACKEND']: + timeseries_db = load_backend_module(module='client').DatabaseClient( + bucket=TIMESERIES_DB['BUCKET'], + org=TIMESERIES_DB['ORG'], + token=TIMESERIES_DB['TOKEN'], + url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}", + ) +else: + timeseries_db = load_backend_module(module='client').DatabaseClient() -timeseries_db = load_backend_module(module='client').DatabaseClient() timeseries_db.queries = load_backend_module(module='queries') diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py index 1c8321ee..ab21775f 100644 --- a/openwisp_monitoring/db/backends/influxdb2/client.py +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -1,45 +1,221 @@ import logging +import re +from datetime import datetime -import influxdb_client from django.conf import settings -from django.utils.functional import cached_property +from django.core.exceptions import ValidationError +from django.utils.translation import gettext_lazy as _ +from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS -from openwisp_monitoring.utils import retry +from ...exceptions import TimeseriesWriteException logger = logging.getLogger(__name__) -class DatabaseClient: - backend_name = 'influxdb2' +class DatabaseClient(object): + _AGGREGATE = [ + 'COUNT', + 'DISTINCT', + 'INTEGRAL', + 'MEAN', + 'MEDIAN', + 'MODE', + 'SPREAD', + 'STDDEV', + 'SUM', + 'BOTTOM', + 'FIRST', + 'LAST', + 'MAX', + 'MIN', + 'PERCENTILE', + 'SAMPLE', + 'TOP', + 'CEILING', + 'CUMULATIVE_SUM', + 'DERIVATIVE', + 'DIFFERENCE', + 'ELAPSED', + 'FLOOR', + 'HISTOGRAM', + 'MOVING_AVERAGE', + 'NON_NEGATIVE_DERIVATIVE', + 'HOLT_WINTERS', + ] + _FORBIDDEN = ['drop', 'create', 'delete', 'alter', 'into'] + backend_name = 'influxdb' - def __init__(self): - self.token = settings.TIMESERIES_DB['TOKEN'] - self.org = settings.TIMESERIES_DB['ORG'] - self.bucket = settings.TIMESERIES_DB['BUCKET'] - self.url = ( - f"http://{settings.TIMESERIES_DB['HOST']}:{settings.TIMESERIES_DB['PORT']}" + def __init__(self, bucket, org, token, url): + self.bucket = bucket + self.org = org + self.token = token + self.url = url + self.client = InfluxDBClient(url=url, token=token, org=org) + self.write_api = self.client.write_api(write_options=SYNCHRONOUS) + self.query_api = self.client.query_api() + + def create_database(self): + logger.debug('InfluxDB 2.0 does not require explicit database creation.') + + def drop_database(self): + logger.debug('InfluxDB 2.0 does not support dropping databases via the client.') + + def create_or_alter_retention_policy(self, name, duration): + logger.debug('InfluxDB 2.0 handles retention policies via bucket settings.') + + def write(self, name, values, **kwargs): + timestamp = kwargs.get('timestamp', datetime.utcnow().isoformat()) + point = ( + Point(name) + .tag("object_id", kwargs.get('tags').get('object_id')) + .field(kwargs.get('field'), values) + .time(timestamp) ) + try: + self.write_api.write(bucket=self.bucket, org=self.org, record=point) + except Exception as exception: + logger.warning(f'got exception while writing to tsdb: {exception}') + raise TimeseriesWriteException + + def batch_write(self, metric_data): + points = [] + for data in metric_data: + timestamp = data.get('timestamp', datetime.utcnow().isoformat()) + point = ( + Point(data.get('name')) + .tag("object_id", data.get('tags').get('object_id')) + .field(data.get('field'), data.get('values')) + .time(timestamp) + ) + points.append(point) + try: + self.write_api.write(bucket=self.bucket, org=self.org, record=points) + except Exception as exception: + logger.warning(f'got exception while writing to tsdb: {exception}') + raise TimeseriesWriteException - @cached_property - def client(self): - return influxdb_client.InfluxDBClient( - url=self.url, token=self.token, org=self.org + def read(self, key, fields, tags=None, **kwargs): + since = kwargs.get('since') + order = kwargs.get('order') + limit = kwargs.get('limit') + query = ( + f'from(bucket: "{self.bucket}")' + f' |> range(start: {since if since else "-1h"})' # Use since or default + f' |> filter(fn: (r) => r._measurement == "{key}")' ) + if tags: + tag_query = ' and '.join( + [f'r.{tag} == "{value}"' for tag, value in tags.items()] + ) + query += f' |> filter(fn: (r) => {tag_query})' + if fields: + field_query = ' or '.join([f'r._field == "{field}"' for field in fields]) + query += f' |> filter(fn: (r) => {field_query})' + if order: + query += f' |> sort(columns: ["_time"], desc: {order == "-time"})' + if limit: + query += f' |> limit(n: {limit})' + result = self.query_api.query(org=self.org, query=query) + return [record.values for table in result for record in table.records] - @cached_property - def write_api(self): - return self.client.write_api(write_options=SYNCHRONOUS) + def delete_metric_data(self, key=None, tags=None): + logger.debug( + 'InfluxDB 2.0 does not support deleting specific data points via the client.' + ) - @retry - def write(self, name, values, **kwargs): - point = influxdb_client.Point(name).fields(values) - self.write_api.write(bucket=self.bucket, org=self.org, record=point) + def validate_query(self, query): + for word in self._FORBIDDEN: + if word in query.lower(): + msg = _(f'the word "{word.upper()}" is not allowed') + raise ValidationError({'configuration': msg}) + return self._is_aggregate(query) + + def _is_aggregate(self, q): + q = q.upper() + for word in self._AGGREGATE: + if any(['%s(' % word in q, '|%s}' % word in q, '|%s|' % word in q]): + return True + return False + + def get_query( + self, + chart_type, + params, + time, + group_map, + summary=False, + fields=None, + query=None, + timezone=settings.TIME_ZONE, + ): + query = self._fields(fields, query, params['field_name']) + params = self._clean_params(params) + query = query.format(**params) + query = self._group_by(query, time, chart_type, group_map, strip=summary) + if summary: + query = f'{query} |> limit(n: 1)' + return query + + def _fields(self, fields, query, field_name): + matches = re.search(self._fields_regex, query) + if not matches and not fields: + return query + elif matches and not fields: + groups = matches.groupdict() + fields_key = groups.get('group') + fields = [field_name] + if fields and matches: + groups = matches.groupdict() + function = groups['func'] # required + operation = groups.get('op') # optional + fields = [self.__transform_field(f, function, operation) for f in fields] + fields_key = groups.get('group') + else: + fields_key = '{fields}' + if fields: + selected_fields = ', '.join(fields) + return query.replace(fields_key, selected_fields) + + def __transform_field(self, field, function, operation=None): + if operation: + operation = f' {operation}' + else: + operation = '' + return f'{function}("{field}"){operation} AS {field.replace("-", "_")}' + + def _group_by(self, query, time, chart_type, group_map, strip=False): + if not self.validate_query(query): + return query + if not strip and not chart_type == 'histogram': + value = group_map[time] + group_by = ( + f'|> aggregateWindow(every: {value}, fn: mean, createEmpty: false)' + ) + else: + group_by = '' + if 'aggregateWindow' not in query: + query = f'{query} {group_by}' + return query + + +# Example usage +if __name__ == "__main__": + bucket = "mybucket" + org = "myorg" + token = "t8Q3Y5mTWuqqTRdGyVxZuyVLO-8pl3I8KaNTR3jV7uTDr_GVECP5Z7LsrZwILGw79Xp4O8pAWkdqTREgIk073Q==" + url = "http://localhost:9086" + + client = DatabaseClient(bucket=bucket, org=org, token=token, url=url) + client.create_database() - @cached_property - def query_api(self): - return self.client.query_api() + # Write example + client.write( + "example_measurement", 99.5, tags={"object_id": "server_01"}, field="uptime" + ) - @retry - def query(self, query): - return self.query_api.query(org=self.org, query=query) + # Read example + result = client.read( + "example_measurement", ["uptime"], tags={"object_id": "server_01"} + ) + print(result) diff --git a/openwisp_monitoring/db/backends/influxdb2/queries.py b/openwisp_monitoring/db/backends/influxdb2/queries.py index 216ad1cf..a41a0524 100644 --- a/openwisp_monitoring/db/backends/influxdb2/queries.py +++ b/openwisp_monitoring/db/backends/influxdb2/queries.py @@ -1,277 +1,266 @@ +import logging + +logger = logging.getLogger(__name__) + chart_query = { 'uptime': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with uptime: r._value * 100 }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" and r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ r with _value: r._value * 100 })) + |> rename(columns: {_value: "uptime"}) + + ''' }, 'packet_loss': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "loss" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean()' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" and r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> rename(columns: {_value: "packet_loss"}) + + ''' }, 'rtt': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "rtt_avg" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> yield(name: "RTT_average") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "rtt_max" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> yield(name: "RTT_max") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "rtt_min" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> yield(name: "RTT_min")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + RTT_average: r.rtt_avg, + RTT_max: r.rtt_max, + RTT_min: r.rtt_min + })) + ''' }, 'wifi_clients': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and ' - 'r["ifname"] == "{ifname}") ' - '|> distinct() ' - '|> count()' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && + r.object_id == "{object_id}" && r.ifname == "{ifname}") + |> group(columns: ["{field_name}"]) + |> count(column: "{field_name}") + |> map(fn: (r) => ({ r with wifi_clients: r._value })) + |> group() // Ungroup to summarize across the selected range + ''' }, 'general_wifi_clients': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "{field_name}"' - '{organization_id}{location_id}{floorplan_id}) ' - '|> distinct() ' - '|> count()' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r.organization_id == "{organization_id}" && + r.location_id == "{location_id}" && r.floorplan_id == "{floorplan_id}") + |> group(columns: ["{field_name}"]) + |> count(column: "{field_name}") + |> map(fn: (r) => ({ r with wifi_clients: r._value })) + |> group() // Ungroup to summarize across the selected range + ''' }, 'traffic': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "tx_bytes" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and ' - 'r["ifname"] == "{ifname}") ' - '|> sum() ' - '|> map(fn: (r) => ({ r with upload: r._value / 1000000000 })) ' - '|> yield(name: "upload") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "rx_bytes" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and ' - 'r["ifname"] == "{ifname}") ' - '|> sum() ' - '|> map(fn: (r) => ({ r with download: r._value / 1000000000 })) ' - '|> yield(name: "download")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && + r.object_id == "{object_id}" && r.ifname == "{ifname}") + |> aggregateWindow(every: 1d, fn: sum, createEmpty: false) + |> map(fn: (r) => ({ + upload: r.tx_bytes / 1000000000, + download: r.rx_bytes / 1000000000 + })) + ''' }, 'general_traffic': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "tx_bytes"{organization_id}' - '{location_id}{floorplan_id}{ifname}) ' - '|> sum() ' - '|> map(fn: (r) => ({ r with upload: r._value / 1000000000 })) ' - '|> yield(name: "upload") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "rx_bytes"{organization_id}' - '{location_id}{floorplan_id}{ifname}) ' - '|> sum() ' - '|> map(fn: (r) => ({ r with download: r._value / 1000000000 })) ' - '|> yield(name: "download")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r.organization_id == "{organization_id}" && + r.location_id == "{location_id}" && + r.floorplan_id == "{floorplan_id}" && r.ifname == "{ifname}") + |> aggregateWindow(every: 1d, fn: sum, createEmpty: false) + |> map(fn: (r) => ({ + upload: r.tx_bytes / 1000000000, + download: r.rx_bytes / 1000000000 + })) + ''' }, 'memory': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "percent_used" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with memory_usage: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + memory_usage: r.percent_used + })) + ''' }, 'cpu': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "cpu_usage" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with CPU_load: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + CPU_load: r.cpu_usage + })) + ''' }, 'disk': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "used_disk" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with disk_usage: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + disk_usage: r.used_disk + })) + ''' }, 'signal_strength': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "signal_strength" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with signal_strength: round(r._value) })) ' - '|> yield(name: "signal_strength") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "signal_power" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with signal_power: round(r._value) })) ' - '|> yield(name: "signal_power")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + signal_strength: math.round(r.signal_strength), + signal_power: math.round(r.signal_power) + })) + + ''' }, 'signal_quality': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "signal_quality" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with signal_quality: round(r._value) })) ' - '|> yield(name: "signal_quality") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "snr" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with signal_to_noise_ratio: round(r._value) })) ' - '|> yield(name: "signal_to_noise_ratio")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + signal_quality: math.round(r.signal_quality), + signal_to_noise_ratio: math.round(r.snr) + })) + ''' }, 'access_tech': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}{end_date}) ' - '|> filter(fn: (r) => r["_measurement"] == "access_tech" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mode() ' - '|> map(fn: (r) => ({ r with access_tech: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mode, createEmpty: false) + |> map(fn: (r) => ({ + access_tech: r.access_tech + })) + ''' }, 'bandwidth': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "sent_bps_tcp" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with TCP: r._value / 1000000000 })) ' - '|> yield(name: "TCP") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "sent_bps_udp" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with UDP: r._value / 1000000000 })) ' - '|> yield(name: "UDP")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + TCP: r.sent_bps_tcp / 1000000000, + UDP: r.sent_bps_udp / 1000000000 + })) + ''' }, 'transfer': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "sent_bytes_tcp" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> sum() ' - '|> map(fn: (r) => ({ r with TCP: r._value / 1000000000 })) ' - '|> yield(name: "TCP") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "sent_bytes_udp" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> sum() ' - '|> map(fn: (r) => ({ r with UDP: r._value / 1000000000 })) ' - '|> yield(name: "UDP")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: sum, createEmpty: false) + |> map(fn: (r) => ({ + TCP: r.sent_bytes_tcp / 1000000000, + UDP: r.sent_bytes_udp / 1000000000 + })) + ''' }, 'retransmits': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "retransmits" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with retransmits: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + retransmits: r.retransmits + })) + ''' }, 'jitter': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "jitter" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with jitter: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + jitter: r.jitter + })) + ''' }, 'datagram': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "lost_packets" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with lost_datagram: r._value })) ' - '|> yield(name: "lost_datagram") ' - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "total_packets" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with total_datagram: r._value })) ' - '|> yield(name: "total_datagram")' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + lost_datagram: r.lost_packets, + total_datagram: r.total_packets + })) + ''' }, 'datagram_loss': { - 'influxdb2': ( - 'from(bucket: "{key}") ' - '|> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "lost_percent" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> mean() ' - '|> map(fn: (r) => ({ r with datagram_loss: r._value }))' - ) + 'influxdb2': ''' + from(bucket: "{key}") + |> range(start: {time}) + |> filter(fn: (r) => r._measurement == "{content_type}" && r.object_id == "{object_id}") + |> aggregateWindow(every: 1d, fn: mean, createEmpty: false) + |> map(fn: (r) => ({ + datagram_loss: r.lost_percent + })) + ''' }, } -default_chart_query = [ - 'from(bucket: "{key}") |> range(start: {time}{end_date}) ', - ( - '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")' - ), -] +default_chart_query = ''' + from(bucket: "{key}") + |> range(start: {time}{end_date}) + |> filter(fn: (r) => + r._measurement == "{content_type}" && + r.object_id == "{object_id}" + ) + |> keep(columns: ["{field_name}"]) +''' + +device_data_query = ''' + from(bucket: "{key}") + |> range(start: -inf) + |> filter(fn: (r) => + r._measurement == "{content_type}" && + r.pk == "{pk}" + ) + |> last() +''' + + +def get_chart_query(chart_type, **params): + """Fetches and formats a specific chart query based on the chart type and provided parameters.""" + try: + query = chart_query[chart_type].format(**params) + except KeyError: + logger.warning( + f"No specific query found for chart type '{chart_type}'. Using default query." + ) + query = default_chart_query.format(**params) + return query + -device_data_query = ( - 'from(bucket: "{0}") |> range(start: 0) ' - '|> filter(fn: (r) => r["_measurement"] == "{1}" and r["pk"] == "{2}") ' - '|> sort(columns: ["_time"], desc: true) ' - '|> limit(n: 1)' -) +def get_device_data_query(**params): + """Formats the device data query based on provided parameters.""" + return device_data_query.format(**params) diff --git a/openwisp_monitoring/db/backends/influxdb2/query_data.py b/openwisp_monitoring/db/backends/influxdb2/query_data.py deleted file mode 100644 index e0f53cb0..00000000 --- a/openwisp_monitoring/db/backends/influxdb2/query_data.py +++ /dev/null @@ -1,197 +0,0 @@ -import influxdb_client - -# Configuration for your InfluxDB client -bucket = "mybucket" -org = "myorg" -token = "t8Q3Y5mTWuqqTRdGyVxZuyVLO-8pl3I8KaNTR3jV7uTDr_GVECP5Z7LsrZwILGw79Xp4O8pAWkdqTREgIk073Q==" -url = "http://localhost:9086" - -client = influxdb_client.InfluxDBClient(url=url, token=token, org=org) - -query_api = client.query_api() - - -def generate_flux_query( - query_name, start_time, end_time, object_id, field_name, content_type, ifname -): - base_query = f''' -from(bucket: "{bucket}") - |> range(start: time(v: "{start_time}"), stop: time(v: "{end_time}")) - |> filter(fn: (r) => r._measurement == "{content_type}" and r.object_id == "{object_id}") - ''' - query_details = { - 'uptime': ''' - |> mean() - |> group(columns: ["_time"]) - |> yield(name: "uptime") - ''', - 'packet_loss': ''' - |> mean() - |> group(columns: ["_time"]) - |> yield(name: "packet_loss") - ''', - 'rtt': ''' - |> mean() - |> group(columns: ["_time"]) - |> yield(name: "rtt") -''', - 'wifi_clients': f''' - |> filter(fn: (r) => r.ifname == "{ifname}") - |> group(columns: ["{field_name}"]) - |> count(column: "{field_name}") - |> group(columns: ["_time"]) - |> yield(name: "wifi_clients") -''', - 'general_wifi_clients': f''' - |> filter(fn: (r) => true) // Add any specific filters for organization_id, location_id, etc., if applicable. - |> group(columns: ["{field_name}"]) - |> count(column: "{field_name}") - |> group(columns: ["_time"]) - |> yield(name: "general_wifi_clients") -''', - 'traffic': f''' - |> filter(fn: (r) => r.ifname == "{ifname}") - |> sum(column: "tx_bytes") - |> map(fn: (r) => ({{_time: r._time, upload: r._value / 1000000000}})) - |> sum(column: "rx_bytes") - |> map(fn: (r) => ({{_time: r._time, download: r._value / 1000000000}})) - |> group(columns: ["_time"]) - |> yield(name: "traffic") -''', - 'general_traffic': ''' - |> sum(column: "tx_bytes") - |> map(fn: (r) => ({{_time: r._time, upload: r._value / 1000000000}})) - |> sum(column: "rx_bytes") - |> map(fn: (r) => ({{_time: r._time, download: r._value / 1000000000}})) - |> group(columns: ["_time"]) - |> yield(name: "general_traffic") -''', - 'memory': ''' - |> mean(column: "percent_used") - |> group(columns: ["_time"]) - |> yield(name: "memory") -''', - 'cpu': ''' - |> mean(column: "cpu_usage") - |> group(columns: ["_time"]) - |> yield(name: "cpu") -''', - 'disk': ''' - |> mean(column: "used_disk") - |> group(columns: ["_time"]) - |> yield(name: "disk") -''', - 'signal_strength': ''' - |> mean(columns: ["signal_strength", "signal_power"]) - |> map(fn: (r) => ({{ signal_strength: round(r.signal_strength), signal_power: round(r.signal_power) }})) - |> group(columns: ["_time"]) - |> yield(name: "signal_strength") -''', - 'signal_quality': ''' - |> mean(columns: ["signal_quality", "snr"]) - |> map(fn: (r) => ({{ signal_quality: round(r.signal_quality), snr: round(r.snr) }})) - |> group(columns: ["_time"]) - |> yield(name: "signal_quality") -''', - 'access_tech': ''' - |> mode(column: "access_tech") - |> group(columns: ["_time"]) - |> yield(name: "access_tech") -''', - 'bandwidth': ''' - |> mean(columns: ["sent_bps_tcp", "sent_bps_udp"]) - |> map(fn: (r) => ({{ tcp: r.sent_bps_tcp / 1000000000, udp: r.sent_bps_udp / 1000000000}})) - |> group(columns: ["_time"]) - |> yield(name: "bandwidth") -''', - 'transfer': ''' - |> sum(columns: ["sent_bytes_tcp", "sent_bytes_udp"]) - |> map(fn: (r) => ({{ tcp: r.sent_bytes_tcp / 1000000000, udp: r.sent_bytes_udp / 1000000000 }})) - |> group(columns: ["_time"]) - |> yield(name: "transfer") -''', - 'retransmits': ''' - |> mean(column: "retransmits") - |> group(columns: ["_time"]) - |> yield(name: "retransmits") -''', - 'jitter': ''' - |> mean(column: "jitter") - |> group(columns: ["_time"]) - |> yield(name: "jitter") -''', - 'datagram': ''' - |> mean(columns: ["lost_packets", "total_packets"]) - |> group(columns: ["_time"]) - |> yield(name: "datagram") -''', - 'datagram_loss': ''' - |> mean(column: "lost_percent") - |> group(columns: ["_time"]) - |> yield(name: "datagram_loss") -''', - } - - return base_query + query_details.get( - query_name, '// No query found for the given name' - ) - - -def execute_query(flux_query): - print( - f"Executing Query: {flux_query[:50]}..." - ) # Log the query start (only part of it for readability) - result = query_api.query(org=org, query=flux_query) - results = [] - for table in result: - for record in table.records: - results.append((record.get_field(), record.get_value())) - return results - - -def main(): - start_time = "2023-01-01T00:00:00Z" - end_time = "2023-01-07T00:00:00Z" - object_id = "12345" - field_name = "temperature" - content_type = "environment" - ifname = "eth0" - - query_names = [ - 'uptime', - 'packet_loss', - 'rtt', - 'wifi_clients', - 'general_wifi_clients', - 'traffic', - 'general_traffic', - 'memory', - 'cpu', - 'disk', - 'signal_strength', - 'signal_quality', - 'access_tech', - 'bandwidth', - 'transfer', - 'retransmits', - 'jitter', - 'datagram', - 'datagram_loss', - ] - - for query_name in query_names: - flux_query = generate_flux_query( - query_name, - start_time, - end_time, - object_id, - field_name, - content_type, - ifname, - ) - results = execute_query(flux_query) - print(f"Results for {query_name} query:", results) - - -if __name__ == "__main__": - main() diff --git a/openwisp_monitoring/db/backends/influxdb2/tests.py b/openwisp_monitoring/db/backends/influxdb2/tests.py index 9bf8aca9..e69de29b 100644 --- a/openwisp_monitoring/db/backends/influxdb2/tests.py +++ b/openwisp_monitoring/db/backends/influxdb2/tests.py @@ -1,433 +0,0 @@ -from datetime import datetime, timedelta -from unittest.mock import patch - -from celery.exceptions import Retry -from django.core.exceptions import ValidationError -from django.test import TestCase, tag -from django.utils.timezone import now -from freezegun import freeze_time -from influxdb_client import InfluxDBClient -from influxdb_client.client.exceptions import InfluxDBError -from pytz import timezone as tz -from swapper import load_model - -from openwisp_monitoring.device.settings import ( - DEFAULT_RETENTION_POLICY, - SHORT_RETENTION_POLICY, -) -from openwisp_monitoring.device.utils import ( - DEFAULT_RP, - SHORT_RP, - manage_default_retention_policy, - manage_short_retention_policy, -) -from openwisp_monitoring.monitoring.tests import TestMonitoringMixin -from openwisp_monitoring.settings import MONITORING_TIMESERIES_RETRY_OPTIONS -from openwisp_utils.tests import capture_stderr - -from ...exceptions import TimeseriesWriteException -from .. import timeseries_db - -Chart = load_model('monitoring', 'Chart') -Notification = load_model('openwisp_notifications', 'Notification') - - -@tag('timeseries_client') -class TestDatabaseClient(TestMonitoringMixin, TestCase): - def test_forbidden_queries(self): - queries = [ - 'DROP DATABASE openwisp2', - 'DROP MEASUREMENT test_metric', - 'CREATE DATABASE test', - 'DELETE MEASUREMENT test_metric', - 'ALTER RETENTION POLICY policy', - 'SELECT * INTO metric2 FROM test_metric', - ] - for q in queries: - try: - timeseries_db.validate_query(q) - except ValidationError as e: - self.assertIn('configuration', e.message_dict) - else: - self.fail('ValidationError not raised') - - def test_get_custom_query(self): - c = self._create_chart(test_data=None) - custom_q = c._default_query.replace('{field_name}', '{fields}') - q = c.get_query(query=custom_q, fields=['SUM(*)']) - self.assertIn('|> sum()', q) - - def test_is_aggregate_bug(self): - m = self._create_object_metric(name='summary_avg') - c = Chart(metric=m, configuration='dummy') - self.assertFalse(timeseries_db._is_aggregate(c.query)) - - def test_is_aggregate_fields_function(self): - m = self._create_object_metric(name='is_aggregate_func') - c = Chart(metric=m, configuration='uptime') - self.assertTrue(timeseries_db._is_aggregate(c.query)) - - def test_get_query_fields_function(self): - c = self._create_chart(test_data=None, configuration='histogram') - q = c.get_query(fields=['ssh', 'http2', 'apple-music']) - expected = ( - '|> sum(column: "ssh") ' - '|> sum(column: "http2") ' - '|> sum(column: "apple-music")' - ) - self.assertIn(expected, q) - - def test_default_query(self): - c = self._create_chart(test_data=False) - q = ( - 'from(bucket: "{key}") |> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")' - ) - self.assertEqual(c.query, q) - - def test_write(self): - timeseries_db.write('test_write', dict(value=2), database=self.TEST_DB) - result = timeseries_db.query( - f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' - f'filter(fn: (r) => r["_measurement"] == "test_write")' - ) - measurement = list(result)[0] - self.assertEqual(measurement['_value'], 2) - - def test_general_write(self): - m = self._create_general_metric(name='Sync test') - m.write(1) - result = timeseries_db.query( - f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' - f'filter(fn: (r) => r["_measurement"] == "sync_test")' - ) - measurement = list(result)[0] - self.assertEqual(measurement['_value'], 1) - - def test_object_write(self): - om = self._create_object_metric() - om.write(3) - content_type = '.'.join(om.content_type.natural_key()) - result = timeseries_db.query( - f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' - f'filter(fn: (r) => r["_measurement"] == "test_metric" and r["object_id"] == "{om.object_id}" ' - f'and r["content_type"] == "{content_type}")' - ) - measurement = list(result)[0] - self.assertEqual(measurement['_value'], 3) - - def test_general_same_key_different_fields(self): - down = self._create_general_metric( - name='traffic (download)', key='traffic', field_name='download' - ) - down.write(200) - up = self._create_general_metric( - name='traffic (upload)', key='traffic', field_name='upload' - ) - up.write(100) - result = timeseries_db.query( - f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' - f'filter(fn: (r) => r["_measurement"] == "traffic")' - ) - measurements = list(result) - download_measurement = next( - m for m in measurements if m['_field'] == 'download' - ) - upload_measurement = next(m for m in measurements if m['_field'] == 'upload') - self.assertEqual(download_measurement['_value'], 200) - self.assertEqual(upload_measurement['_value'], 100) - - def test_object_same_key_different_fields(self): - user = self._create_user() - user_down = self._create_object_metric( - name='traffic (download)', - key='traffic', - field_name='download', - content_object=user, - ) - user_down.write(200) - user_up = self._create_object_metric( - name='traffic (upload)', - key='traffic', - field_name='upload', - content_object=user, - ) - user_up.write(100) - content_type = '.'.join(user_down.content_type.natural_key()) - result = timeseries_db.query( - f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' - f'filter(fn: (r) => r["_measurement"] == "traffic" and ' - f'r["object_id"] == "{user_down.object_id}" and r["content_type"] == "{content_type}")' - ) - measurements = list(result) - download_measurement = next( - m for m in measurements if m['_field'] == 'download' - ) - upload_measurement = next(m for m in measurements if m['_field'] == 'upload') - self.assertEqual(download_measurement['_value'], 200) - self.assertEqual(upload_measurement['_value'], 100) - - def test_delete_metric_data(self): - m = self._create_general_metric(name='test_metric') - m.write(100) - self.assertEqual(m.read()[0]['value'], 100) - timeseries_db.delete_metric_data(key=m.key) - self.assertEqual(m.read(), []) - om = self._create_object_metric(name='dummy') - om.write(50) - m.write(100) - self.assertEqual(m.read()[0]['value'], 100) - self.assertEqual(om.read()[0]['value'], 50) - timeseries_db.delete_metric_data() - self.assertEqual(m.read(), []) - self.assertEqual(om.read(), []) - - def test_get_query_1d(self): - c = self._create_chart(test_data=None, configuration='uptime') - q = c.get_query(time='1d') - last24 = now() - timedelta(days=1) - self.assertIn(str(last24)[0:14], q) - self.assertIn('|> aggregateWindow(every: 10m, fn: mean)', q) - - def test_get_query_30d(self): - c = self._create_chart(test_data=None, configuration='uptime') - q = c.get_query(time='30d') - last30d = now() - timedelta(days=30) - self.assertIn(str(last30d)[0:10], q) - self.assertIn('|> aggregateWindow(every: 24h, fn: mean)', q) - - def test_group_by_tags(self): - self.assertEqual( - timeseries_db._group_by( - 'from(bucket: "measurement") |> range(start: -1d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'aggregateWindow(every: 1d, fn: count)', - time='30d', - chart_type='stackedbar+lines', - group_map={'30d': '30d'}, - strip=False, - ), - 'from(bucket: "measurement") |> range(start: -30d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'aggregateWindow(every: 30d, fn: count)', - ) - self.assertEqual( - timeseries_db._group_by( - 'from(bucket: "measurement") |> range(start: -1d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'aggregateWindow(every: 1d, fn: count)', - time='30d', - chart_type='stackedbar+lines', - group_map={'30d': '30d'}, - strip=True, - ), - 'from(bucket: "measurement") |> range(start: -30d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item")', - ) - self.assertEqual( - timeseries_db._group_by( - 'from(bucket: "measurement") |> range(start: -1d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'aggregateWindow(every: 1d, fn: count) |> group(columns: ["tag"])', - time='30d', - chart_type='stackedbar+lines', - group_map={'30d': '30d'}, - strip=False, - ), - 'from(bucket: "measurement") |> range(start: -30d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'aggregateWindow(every: 30d, fn: count) |> group(columns: ["tag"])', - ) - self.assertEqual( - timeseries_db._group_by( - 'from(bucket: "measurement") |> range(start: -1d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'aggregateWindow(every: 1d, fn: count) |> group(columns: ["tag"])', - time='30d', - chart_type='stackedbar+lines', - group_map={'30d': '30d'}, - strip=True, - ), - 'from(bucket: "measurement") |> range(start: -30d) |> ' - 'filter(fn: (r) => r["_measurement"] == "item") |> ' - 'group(columns: ["tag"])', - ) - - def test_retention_policy(self): - manage_short_retention_policy() - manage_default_retention_policy() - rp = timeseries_db.get_list_retention_policies() - self.assertEqual(len(rp), 2) - self.assertEqual(rp[0].name, DEFAULT_RP) - self.assertEqual(rp[0].every_seconds, DEFAULT_RETENTION_POLICY) - self.assertEqual(rp[1].name, SHORT_RP) - self.assertEqual(rp[1].every_seconds, SHORT_RETENTION_POLICY) - - def test_query_set(self): - c = self._create_chart(configuration='histogram') - expected = ( - 'from(bucket: "{key}") |> range(start: {time}) ' - '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' - 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' - '|> aggregateWindow(every: {time}, fn: sum) ' - ) - self.assertEqual(c.query, expected) - self.assertEqual( - ''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query - ) - c.metric.object_id = None - self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query) - - def test_read_order(self): - m = self._create_general_metric(name='dummy') - m.write(30) - m.write(40, time=now() - timedelta(days=2)) - with self.subTest('Test ascending read order'): - metric_data = m.read(limit=2, order='time') - self.assertEqual(metric_data[0]['value'], 40) - self.assertEqual(metric_data[1]['value'], 30) - with self.subTest('Test descending read order'): - metric_data = m.read(limit=2, order='-time') - self.assertEqual(metric_data[0]['value'], 30) - self.assertEqual(metric_data[1]['value'], 40) - with self.subTest('Test invalid read order'): - with self.assertRaises(timeseries_db.client_error) as e: - metric_data = m.read(limit=2, order='invalid') - self.assertIn('Invalid order "invalid" passed.', str(e)) - - def test_read_with_rp(self): - self._create_admin() - manage_short_retention_policy() - with self.subTest( - 'Test metric write on short retention_policy immediate alert' - ): - m = self._create_general_metric(name='dummy') - self._create_alert_settings( - metric=m, custom_operator='<', custom_threshold=1, custom_tolerance=0 - ) - m.write(0, retention_policy=SHORT_RP) - self.assertEqual(m.read(retention_policy=SHORT_RP)[0][m.field_name], 0) - m.refresh_from_db() - self.assertEqual(m.is_healthy, False) - self.assertEqual(m.is_healthy_tolerant, False) - self.assertEqual(Notification.objects.count(), 1) - with self.subTest( - 'Test metric write on short retention_policy with deferred alert' - ): - m2 = self._create_general_metric(name='dummy2') - self._create_alert_settings( - metric=m2, custom_operator='<', custom_threshold=1, custom_tolerance=1 - ) - m.write(0, retention_policy=SHORT_RP, time=now() - timedelta(minutes=2)) - self.assertEqual(m.read(retention_policy=SHORT_RP)[0][m.field_name], 0) - m.refresh_from_db() - self.assertEqual(m.is_healthy, False) - self.assertEqual(m.is_healthy_tolerant, False) - self.assertEqual(Notification.objects.count(), 1) - - def test_metric_write_microseconds_precision(self): - m = self._create_object_metric( - name='wlan0', key='wlan0', configuration='clients' - ) - m.write('00:14:5c:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235142)) - m.write('00:23:4a:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235152)) - self.assertEqual(len(m.read()), 2) - - @patch.object( - InfluxDBClient, 'write_api', side_effect=InfluxDBError('Server error') - ) - @capture_stderr() - def test_write_retry(self, mock_write): - with self.assertRaises(TimeseriesWriteException): - timeseries_db.write('test_write', {'value': 1}) - m = self._create_general_metric(name='Test metric') - with self.assertRaises(Retry): - m.write(1) - - @patch.object( - InfluxDBClient, - 'write_api', - side_effect=InfluxDBError( - content='{"error":"partial write: points beyond retention policy dropped=1"}', - code=400, - ), - ) - @capture_stderr() - def test_write_skip_retry_for_retention_policy(self, mock_write): - try: - timeseries_db.write('test_write', {'value': 1}) - except TimeseriesWriteException: - self.fail( - 'TimeseriesWriteException should not be raised when data ' - 'points crosses retention policy' - ) - m = self._create_general_metric(name='Test metric') - try: - m.write(1) - except Retry: - self.fail( - 'Writing metric should not be retried when data ' - 'points crosses retention policy' - ) - - @patch.object( - InfluxDBClient, 'write_api', side_effect=InfluxDBError('Server error') - ) - @capture_stderr() - def test_timeseries_write_params(self, mock_write): - with freeze_time('Jan 14th, 2020') as frozen_datetime: - m = self._create_general_metric(name='Test metric') - with self.assertRaises(Retry) as e: - m.write(1) - frozen_datetime.tick(delta=timedelta(minutes=10)) - self.assertEqual( - now(), datetime(2020, 1, 14, tzinfo=tz('UTC')) + timedelta(minutes=10) - ) - task_signature = e.exception.sig - with patch.object(timeseries_db, 'write') as mock_write: - self._retry_task(task_signature) - mock_write.assert_called_with( - 'test_metric', - {'value': 1}, - database=None, - retention_policy=None, - tags={}, - timestamp=datetime(2020, 1, 14, tzinfo=tz('UTC')).isoformat(), - current=False, - ) - - def _retry_task(self, task_signature): - task_kwargs = task_signature.kwargs - task_signature.type.run(**task_kwargs) - - @patch.object( - InfluxDBClient, 'query_api', side_effect=InfluxDBError('Server error') - ) - def test_retry_mechanism(self, mock_query): - max_retries = MONITORING_TIMESERIES_RETRY_OPTIONS.get('max_retries') - with patch('logging.Logger.info') as mocked_logger: - try: - self.test_get_query_fields_function() - except Exception: - pass - self.assertEqual(mocked_logger.call_count, max_retries) - mocked_logger.assert_called_with( - 'Error while executing method "query":\nServer error\n' - f'Attempt {max_retries} out of {max_retries}.\n' - ) - - -class TestDatabaseClientUdp(TestMonitoringMixin, TestCase): - def test_exceed_udp_packet_limit(self): - # InfluxDB 2.x does not use UDP for writing data, but this is kept - # for backward compatibility reference - timeseries_db.write( - 'test_udp_write', dict(value='O' * 66000), database=self.TEST_DB - ) - result = timeseries_db.query( - f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' - f'filter(fn: (r) => r["_measurement"] == "test_udp_write")' - ) - measurement = list(result) - self.assertEqual(len(measurement), 1) diff --git a/openwisp_monitoring/db/backends/influxdb2/urls.py b/openwisp_monitoring/db/backends/influxdb2/urls.py deleted file mode 100644 index 4e8903d1..00000000 --- a/openwisp_monitoring/db/backends/influxdb2/urls.py +++ /dev/null @@ -1,13 +0,0 @@ -from django.urls import path -from . import views - -urlpatterns = [ - path('uptime/', views.display_uptime, name='display_uptime'), - path('packet_loss/', views.display_packet_loss, name='display_packet_loss'), - path('rtt/', views.display_rtt, name='display_rtt'), - path('wifi_clients/', views.display_wifi_clients, name='display_wifi_clients'), - path('traffic/', views.display_traffic, name='display_traffic'), - path('memory/', views.display_memory, name='display_memory'), - path('cpu/', views.display_cpu, name='display_cpu'), - path('disk/', views.display_disk, name='display_disk'), -] diff --git a/openwisp_monitoring/db/backends/influxdb2/views.py b/openwisp_monitoring/db/backends/influxdb2/views.py deleted file mode 100644 index 67d28527..00000000 --- a/openwisp_monitoring/db/backends/influxdb2/views.py +++ /dev/null @@ -1,111 +0,0 @@ -from django.shortcuts import render -from client import DatabaseClient -from queries import chart_query - -db_client = DatabaseClient() - -def display_uptime(request): - query = chart_query['uptime']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_packet_loss(request): - query = chart_query['packet_loss']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_rtt(request): - query = chart_query['rtt']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_wifi_clients(request): - query = chart_query['wifi_clients']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id', - ifname='your_ifname' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_traffic(request): - query = chart_query['traffic']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id', - ifname='your_ifname' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_memory(request): - query = chart_query['memory']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_cpu(request): - query = chart_query['cpu']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) - -def display_disk(request): - query = chart_query['disk']['influxdb2'].format( - bucket=db_client.bucket, - content_type='your_content_type', - object_id='your_object_id' - ) - result = db_client.query(query) - data = [] - for table in result: - for record in table.records: - data.append(record.values) - return render(request, 'display_data.html', {'data': data}) diff --git a/openwisp_monitoring/db/backends/influxdb2/write_data.py b/openwisp_monitoring/db/backends/influxdb2/write_data.py deleted file mode 100644 index 2bd5c29c..00000000 --- a/openwisp_monitoring/db/backends/influxdb2/write_data.py +++ /dev/null @@ -1,25 +0,0 @@ -import influxdb_client -from influxdb_client.client.write_api import SYNCHRONOUS - -bucket = "mybucket" -org = "myorg" -token = "t8Q3Y5mTWuqqTRdGyVxZuyVLO-8pl3I8KaNTR3jV7uTDr_GVECP5Z7LsrZwILGw79Xp4O8pAWkdqTREgIk073Q==" -url = "http://localhost:9086" - -client = influxdb_client.InfluxDBClient(url=url, token=token, org=org) - -write_api = client.write_api(write_options=SYNCHRONOUS) - -p = ( - influxdb_client.Point("my_measurement") - .tag("location", "Prague") - .field("temperature", 25.3) -) - -try: - write_api.write(bucket=bucket, org=org, record=p) - print("Data written successfully.") -except Exception as e: - print(f"Failed to write data: {e}") - -client.close() diff --git a/openwisp_monitoring/db/exceptions.py b/openwisp_monitoring/db/exceptions.py index 3296400a..3aef4d37 100644 --- a/openwisp_monitoring/db/exceptions.py +++ b/openwisp_monitoring/db/exceptions.py @@ -1,2 +1,6 @@ class TimeseriesWriteException(Exception): pass + + +class WriteError(Exception): + pass diff --git a/tests/docker-entrypoint.sh b/tests/docker-entrypoint.sh index cbd50bf7..20766cf4 100755 --- a/tests/docker-entrypoint.sh +++ b/tests/docker-entrypoint.sh @@ -6,7 +6,9 @@ create_superuser() { local password="$3" cat <