diff --git a/Dockerfile b/Dockerfile index 436180c3e..53db803dc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,25 +1,38 @@ FROM python:3.9.19-slim-bullseye +# Install system dependencies RUN apt update && \ apt install --yes zlib1g-dev libjpeg-dev gdal-bin libproj-dev \ libgeos-dev libspatialite-dev libsqlite3-mod-spatialite \ sqlite3 libsqlite3-dev openssl libssl-dev fping && \ rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* +# Upgrade pip and install Python dependencies RUN pip install -U pip setuptools wheel +# Copy and install project dependencies COPY requirements-test.txt requirements.txt /opt/openwisp/ RUN pip install -r /opt/openwisp/requirements.txt && \ pip install -r /opt/openwisp/requirements-test.txt && \ rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* +# Copy project files and install the project ADD . /opt/openwisp RUN pip install -U /opt/openwisp && \ rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* + +# Set working directory WORKDIR /opt/openwisp/tests/ + +# Set environment variables ENV NAME=openwisp-monitoring \ PYTHONBUFFERED=1 \ - INFLUXDB_HOST=influxdb \ + INFLUXDB1_HOST=influxdb \ + INFLUXDB2_HOST=influxdb2 \ REDIS_HOST=redis -CMD ["sh", "docker-entrypoint.sh"] + +# Expose the application port EXPOSE 8000 + +# Command to run the application +CMD ["sh", "docker-entrypoint.sh"] diff --git a/docker-compose.yml b/docker-compose.yml index a84213ddd..f98e72c59 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,7 @@ services: - "8000:8000" depends_on: - influxdb + - influxdb2 - redis influxdb: @@ -28,6 +29,20 @@ services: INFLUXDB_USER: openwisp INFLUXDB_USER_PASSWORD: openwisp + influxdb2: + image: influxdb:2.0-alpine + volumes: + - influxdb2-data:/var/lib/influxdb2 + ports: + - "8087:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME: openwisp + DOCKER_INFLUXDB_INIT_PASSWORD: openwisp + DOCKER_INFLUXDB_INIT_ORG: openwisp + DOCKER_INFLUXDB_INIT_BUCKET: openwisp2 + DOCKER_INFLUXDB_INIT_RETENTION: 1w + redis: image: redis:5.0-alpine ports: @@ -36,3 +51,4 @@ services: volumes: influxdb-data: {} + influxdb2-data: {} diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py index 715b1113c..5eeeea503 100644 --- a/openwisp_monitoring/db/backends/__init__.py +++ b/openwisp_monitoring/db/backends/__init__.py @@ -30,9 +30,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): """ try: assert 'BACKEND' in TIMESERIES_DB, 'BACKEND' - assert 'USER' in TIMESERIES_DB, 'USER' - assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD' - assert 'NAME' in TIMESERIES_DB, 'NAME' + if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']: + # InfluxDB 2.x specific checks + assert 'TOKEN' in TIMESERIES_DB, 'TOKEN' + assert 'ORG' in TIMESERIES_DB, 'ORG' + assert 'BUCKET' in TIMESERIES_DB, 'BUCKET' + else: + # InfluxDB 1.x specific checks + assert 'USER' in TIMESERIES_DB, 'USER' + assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD' + assert 'NAME' in TIMESERIES_DB, 'NAME' assert 'HOST' in TIMESERIES_DB, 'HOST' assert 'PORT' in TIMESERIES_DB, 'PORT' if module: @@ -48,7 +55,8 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): except ImportError as e: # The database backend wasn't found. Display a helpful error message # listing all built-in database backends. - builtin_backends = ['influxdb'] + builtin_backends = ['influxdb', 'influxdb2'] + raise e if backend_name not in [ f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends ]: diff --git a/openwisp_monitoring/db/backends/base.py b/openwisp_monitoring/db/backends/base.py new file mode 100644 index 000000000..da23e5282 --- /dev/null +++ b/openwisp_monitoring/db/backends/base.py @@ -0,0 +1,42 @@ +import logging + +from django.utils.functional import cached_property + +from openwisp_monitoring.utils import retry + +logger = logging.getLogger(__name__) + + +class BaseDatabaseClient: + def __init__(self, db_name=None): + self._db = None + self.db_name = db_name + + @cached_property + def db(self): + raise NotImplementedError("Subclasses must implement `db` method") + + @retry + def create_database(self): + raise NotImplementedError("Subclasses must implement `create_database` method") + + @retry + def drop_database(self): + raise NotImplementedError("Subclasses must implement `drop_database` method") + + @retry + def query(self, query): + raise NotImplementedError("Subclasses must implement `query` method") + + def write(self, name, values, **kwargs): + raise NotImplementedError("Subclasses must implement `write` method") + + def get_list_retention_policies(self, name=None): + raise NotImplementedError( + "Subclasses must implement `get_list_retention_policies` method" + ) + + def create_or_alter_retention_policy(self, name, duration): + raise NotImplementedError( + "Subclasses must implement `create_or_alter_retention_policy` method" + ) diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py new file mode 100644 index 000000000..72e142534 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -0,0 +1,78 @@ +import logging + +from django.utils.functional import cached_property +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.exceptions import InfluxDBError +from influxdb_client.client.write_api import SYNCHRONOUS + +from openwisp_monitoring.utils import retry + +from ...exceptions import TimeseriesWriteException +from .. import TIMESERIES_DB +from ..base import BaseDatabaseClient + +logger = logging.getLogger(__name__) + + +class DatabaseClient(BaseDatabaseClient): + backend_name = 'influxdb2' + + def __init__(self, db_name=None): + super().__init__(db_name) + self.client_error = InfluxDBError + + @cached_property + def db(self): + return InfluxDBClient( + url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}", + token=TIMESERIES_DB['TOKEN'], + org=TIMESERIES_DB['ORG'], + bucket=self.db_name, + ) + + @retry + def create_database(self): + self.write_api = self.db.write_api(write_options=SYNCHRONOUS) + self.query_api = self.db.query_api() + logger.debug('Initialized APIs for InfluxDB 2.0') + + @retry + def drop_database(self): + pass # Implement as needed for InfluxDB 2.0 + + @retry + def query(self, query): + return self.query_api.query(query) + + def write(self, name, values, **kwargs): + point = Point(name).time(self._get_timestamp(kwargs.get('timestamp'))) + tags = kwargs.get('tags', {}) + for tag, value in tags.items(): + point.tag(tag, value) + for field, value in values.items(): + point.field(field, value) + try: + self.write_api.write(bucket=self.db_name, record=point) + except InfluxDBError as e: + raise TimeseriesWriteException(str(e)) + + @retry + def get_list_retention_policies(self, name=None): + bucket = self.db.buckets_api().find_bucket_by_name(name) + if bucket: + return bucket.retention_rules + return [] + + @retry + def create_or_alter_retention_policy(self, name, duration): + bucket = self.db.buckets_api().find_bucket_by_name(name) + retention_rules = [{"type": "expire", "everySeconds": duration}] + if bucket: + bucket.retention_rules = retention_rules + self.db.buckets_api().update_bucket(bucket=bucket) + else: + self.db.buckets_api().create_bucket( + bucket_name=name, + retention_rules=retention_rules, + org=TIMESERIES_DB["ORG"], + ) diff --git a/openwisp_monitoring/db/backends/influxdb2/queries.py b/openwisp_monitoring/db/backends/influxdb2/queries.py new file mode 100644 index 000000000..216ad1cf8 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/queries.py @@ -0,0 +1,277 @@ +chart_query = { + 'uptime': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with uptime: r._value * 100 }))' + ) + }, + 'packet_loss': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "loss" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean()' + ) + }, + 'rtt': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "rtt_avg" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> yield(name: "RTT_average") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "rtt_max" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> yield(name: "RTT_max") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "rtt_min" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> yield(name: "RTT_min")' + ) + }, + 'wifi_clients': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and ' + 'r["ifname"] == "{ifname}") ' + '|> distinct() ' + '|> count()' + ) + }, + 'general_wifi_clients': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}"' + '{organization_id}{location_id}{floorplan_id}) ' + '|> distinct() ' + '|> count()' + ) + }, + 'traffic': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "tx_bytes" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and ' + 'r["ifname"] == "{ifname}") ' + '|> sum() ' + '|> map(fn: (r) => ({ r with upload: r._value / 1000000000 })) ' + '|> yield(name: "upload") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "rx_bytes" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}" and ' + 'r["ifname"] == "{ifname}") ' + '|> sum() ' + '|> map(fn: (r) => ({ r with download: r._value / 1000000000 })) ' + '|> yield(name: "download")' + ) + }, + 'general_traffic': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "tx_bytes"{organization_id}' + '{location_id}{floorplan_id}{ifname}) ' + '|> sum() ' + '|> map(fn: (r) => ({ r with upload: r._value / 1000000000 })) ' + '|> yield(name: "upload") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "rx_bytes"{organization_id}' + '{location_id}{floorplan_id}{ifname}) ' + '|> sum() ' + '|> map(fn: (r) => ({ r with download: r._value / 1000000000 })) ' + '|> yield(name: "download")' + ) + }, + 'memory': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "percent_used" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with memory_usage: r._value }))' + ) + }, + 'cpu': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "cpu_usage" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with CPU_load: r._value }))' + ) + }, + 'disk': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "used_disk" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with disk_usage: r._value }))' + ) + }, + 'signal_strength': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "signal_strength" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with signal_strength: round(r._value) })) ' + '|> yield(name: "signal_strength") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "signal_power" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with signal_power: round(r._value) })) ' + '|> yield(name: "signal_power")' + ) + }, + 'signal_quality': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "signal_quality" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with signal_quality: round(r._value) })) ' + '|> yield(name: "signal_quality") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "snr" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with signal_to_noise_ratio: round(r._value) })) ' + '|> yield(name: "signal_to_noise_ratio")' + ) + }, + 'access_tech': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}{end_date}) ' + '|> filter(fn: (r) => r["_measurement"] == "access_tech" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mode() ' + '|> map(fn: (r) => ({ r with access_tech: r._value }))' + ) + }, + 'bandwidth': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "sent_bps_tcp" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with TCP: r._value / 1000000000 })) ' + '|> yield(name: "TCP") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "sent_bps_udp" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with UDP: r._value / 1000000000 })) ' + '|> yield(name: "UDP")' + ) + }, + 'transfer': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "sent_bytes_tcp" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> sum() ' + '|> map(fn: (r) => ({ r with TCP: r._value / 1000000000 })) ' + '|> yield(name: "TCP") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "sent_bytes_udp" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> sum() ' + '|> map(fn: (r) => ({ r with UDP: r._value / 1000000000 })) ' + '|> yield(name: "UDP")' + ) + }, + 'retransmits': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "retransmits" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with retransmits: r._value }))' + ) + }, + 'jitter': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "jitter" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with jitter: r._value }))' + ) + }, + 'datagram': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "lost_packets" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with lost_datagram: r._value })) ' + '|> yield(name: "lost_datagram") ' + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "total_packets" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with total_datagram: r._value })) ' + '|> yield(name: "total_datagram")' + ) + }, + 'datagram_loss': { + 'influxdb2': ( + 'from(bucket: "{key}") ' + '|> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "lost_percent" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean() ' + '|> map(fn: (r) => ({ r with datagram_loss: r._value }))' + ) + }, +} + +default_chart_query = [ + 'from(bucket: "{key}") |> range(start: {time}{end_date}) ', + ( + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")' + ), +] + +device_data_query = ( + 'from(bucket: "{0}") |> range(start: 0) ' + '|> filter(fn: (r) => r["_measurement"] == "{1}" and r["pk"] == "{2}") ' + '|> sort(columns: ["_time"], desc: true) ' + '|> limit(n: 1)' +) diff --git a/openwisp_monitoring/db/backends/influxdb2/tests.py b/openwisp_monitoring/db/backends/influxdb2/tests.py new file mode 100644 index 000000000..eb3cd5124 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/tests.py @@ -0,0 +1,433 @@ +from datetime import datetime, timedelta +from unittest.mock import patch + +from celery.exceptions import Retry +from django.core.exceptions import ValidationError +from django.test import TestCase, tag +from django.utils.timezone import now +from freezegun import freeze_time +from influxdb_client import InfluxDBClient +from influxdb_client.client.exceptions import InfluxDBError +from pytz import timezone as tz +from swapper import load_model + +from openwisp_monitoring.device.settings import ( + DEFAULT_RETENTION_POLICY, + SHORT_RETENTION_POLICY, +) +from openwisp_monitoring.device.utils import ( + DEFAULT_RP, + SHORT_RP, + manage_default_retention_policy, + manage_short_retention_policy, +) +from openwisp_monitoring.monitoring.tests import TestMonitoringMixin +from openwisp_monitoring.settings import MONITORING_TIMESERIES_RETRY_OPTIONS +from openwisp_utils.tests import capture_stderr + +from ...exceptions import TimeseriesWriteException +from .. import timeseries_db + +Chart = load_model('monitoring', 'Chart') +Notification = load_model('openwisp_notifications', 'Notification') + + +@tag('timeseries_client') +class TestDatabaseClient(TestMonitoringMixin, TestCase): + def test_forbidden_queries(self): + queries = [ + 'DROP DATABASE openwisp2', + 'DROP MEASUREMENT test_metric', + 'CREATE DATABASE test', + 'DELETE MEASUREMENT test_metric', + 'ALTER RETENTION POLICY policy', + 'SELECT * INTO metric2 FROM test_metric', + ] + for q in queries: + try: + timeseries_db.validate_query(q) + except ValidationError as e: + self.assertIn('configuration', e.message_dict) + else: + self.fail('ValidationError not raised') + + def test_get_custom_query(self): + c = self._create_chart(test_data=None) + custom_q = c._default_query.replace('{field_name}', '{fields}') + q = c.get_query(query=custom_q, fields=['SUM(*)']) + self.assertIn('|> sum()', q) + + def test_is_aggregate_bug(self): + m = self._create_object_metric(name='summary_avg') + c = Chart(metric=m, configuration='dummy') + self.assertFalse(timeseries_db._is_aggregate(c.query)) + + def test_is_aggregate_fields_function(self): + m = self._create_object_metric(name='is_aggregate_func') + c = Chart(metric=m, configuration='uptime') + self.assertTrue(timeseries_db._is_aggregate(c.query)) + + def test_get_query_fields_function(self): + c = self._create_chart(test_data=None, configuration='histogram') + q = c.get_query(fields=['ssh', 'http2', 'apple-music']) + expected = ( + '|> sum(column: "ssh") ' + '|> sum(column: "http2") ' + '|> sum(column: "apple-music")' + ) + self.assertIn(expected, q) + + def test_default_query(self): + c = self._create_chart(test_data=False) + q = ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")' + ) + self.assertEqual(c.query, q) + + def test_write(self): + timeseries_db.write('test_write', dict(value=2), database=self.TEST_DB) + result = timeseries_db.query( + f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' + f'filter(fn: (r) => r["_measurement"] == "test_write")' + ) + measurement = list(result)[0] + self.assertEqual(measurement['_value'], 2) + + def test_general_write(self): + m = self._create_general_metric(name='Sync test') + m.write(1) + result = timeseries_db.query( + f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' + f'filter(fn: (r) => r["_measurement"] == "sync_test")' + ) + measurement = list(result)[0] + self.assertEqual(measurement['_value'], 1) + + def test_object_write(self): + om = self._create_object_metric() + om.write(3) + content_type = '.'.join(om.content_type.natural_key()) + result = timeseries_db.query( + f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' + f'filter(fn: (r) => r["_measurement"] == "test_metric" and r["object_id"] == "{om.object_id}" ' + f'and r["content_type"] == "{content_type}")' + ) + measurement = list(result)[0] + self.assertEqual(measurement['_value'], 3) + + def test_general_same_key_different_fields(self): + down = self._create_general_metric( + name='traffic (download)', key='traffic', field_name='download' + ) + down.write(200) + up = self._create_general_metric( + name='traffic (upload)', key='traffic', field_name='upload' + ) + up.write(100) + result = timeseries_db.query( + f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' + f'filter(fn: (r) => r["_measurement"] == "traffic")' + ) + measurements = list(result) + download_measurement = next( + m for m in measurements if m['_field'] == 'download' + ) + upload_measurement = next(m for m in measurements if m['_field'] == 'upload') + self.assertEqual(download_measurement['_value'], 200) + self.assertEqual(upload_measurement['_value'], 100) + + def test_object_same_key_different_fields(self): + user = self._create_user() + user_down = self._create_object_metric( + name='traffic (download)', + key='traffic', + field_name='download', + content_object=user, + ) + user_down.write(200) + user_up = self._create_object_metric( + name='traffic (upload)', + key='traffic', + field_name='upload', + content_object=user, + ) + user_up.write(100) + content_type = '.'.join(user_down.content_type.natural_key()) + result = timeseries_db.query( + f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' + f'filter(fn: (r) => r["_measurement"] == "traffic" and ' + f'r["object_id"] == "{user_down.object_id}" and r["content_type"] == "{content_type}")' + ) + measurements = list(result) + download_measurement = next( + m for m in measurements if m['_field'] == 'download' + ) + upload_measurement = next(m for m in measurements if m['_field'] == 'upload') + self.assertEqual(download_measurement['_value'], 200) + self.assertEqual(upload_measurement['_value'], 100) + + def test_delete_metric_data(self): + m = self._create_general_metric(name='test_metric') + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + timeseries_db.delete_metric_data(key=m.key) + self.assertEqual(m.read(), []) + om = self._create_object_metric(name='dummy') + om.write(50) + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + self.assertEqual(om.read()[0]['value'], 50) + timeseries_db.delete_metric_data() + self.assertEqual(m.read(), []) + self.assertEqual(om.read(), []) + + def test_get_query_1d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='1d') + last24 = now() - timedelta(days=1) + self.assertIn(str(last24)[0:14], q) + self.assertIn('|> aggregateWindow(every: 10m, fn: mean)', q) + + def test_get_query_30d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='30d') + last30d = now() - timedelta(days=30) + self.assertIn(str(last30d)[0:10], q) + self.assertIn('|> aggregateWindow(every: 24h, fn: mean)', q) + + def test_group_by_tags(self): + self.assertEqual( + timeseries_db._group_by( + 'from(bucket: "measurement") |> range(start: -1d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'aggregateWindow(every: 1d, fn: count)', + time='30d', + chart_type='stackedbar+lines', + group_map={'30d': '30d'}, + strip=False, + ), + 'from(bucket: "measurement") |> range(start: -30d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'aggregateWindow(every: 30d, fn: count)', + ) + self.assertEqual( + timeseries_db._group_by( + 'from(bucket: "measurement") |> range(start: -1d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'aggregateWindow(every: 1d, fn: count)', + time='30d', + chart_type='stackedbar+lines', + group_map={'30d': '30d'}, + strip=True, + ), + 'from(bucket: "measurement") |> range(start: -30d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item")', + ) + self.assertEqual( + timeseries_db._group_by( + 'from(bucket: "measurement") |> range(start: -1d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'aggregateWindow(every: 1d, fn: count) |> group(columns: ["tag"])', + time='30d', + chart_type='stackedbar+lines', + group_map={'30d': '30d'}, + strip=False, + ), + 'from(bucket: "measurement") |> range(start: -30d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'aggregateWindow(every: 30d, fn: count) |> group(columns: ["tag"])', + ) + self.assertEqual( + timeseries_db._group_by( + 'from(bucket: "measurement") |> range(start: -1d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'aggregateWindow(every: 1d, fn: count) |> group(columns: ["tag"])', + time='30d', + chart_type='stackedbar+lines', + group_map={'30d': '30d'}, + strip=True, + ), + 'from(bucket: "measurement") |> range(start: -30d) |> ' + 'filter(fn: (r) => r["_measurement"] == "item") |> ' + 'group(columns: ["tag"])', + ) + + def test_retention_policy(self): + manage_short_retention_policy() + manage_default_retention_policy() + rp = timeseries_db.get_list_retention_policies() + self.assertEqual(len(rp), 2) + self.assertEqual(rp[0].name, DEFAULT_RP) + self.assertEqual(rp[0].every_seconds, DEFAULT_RETENTION_POLICY) + self.assertEqual(rp[1].name, SHORT_RP) + self.assertEqual(rp[1].every_seconds, SHORT_RETENTION_POLICY) + + def test_query_set(self): + c = self._create_chart(configuration='histogram') + expected = ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> aggregateWindow(every: {time}, fn: sum) ' + ) + self.assertEqual(c.query, expected) + self.assertEqual( + ''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query + ) + c.metric.object_id = None + self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query) + + def test_read_order(self): + m = self._create_general_metric(name='dummy') + m.write(30) + m.write(40, time=now() - timedelta(days=2)) + with self.subTest('Test ascending read order'): + metric_data = m.read(limit=2, order='time') + self.assertEqual(metric_data[0]['value'], 40) + self.assertEqual(metric_data[1]['value'], 30) + with self.subTest('Test descending read order'): + metric_data = m.read(limit=2, order='-time') + self.assertEqual(metric_data[0]['value'], 30) + self.assertEqual(metric_data[1]['value'], 40) + with self.subTest('Test invalid read order'): + with self.assertRaises(timeseries_db.client_error) as e: + metric_data = m.read(limit=2, order='invalid') + self.assertIn('Invalid order "invalid" passed.', str(e)) + + def test_read_with_rp(self): + self._create_admin() + manage_short_retention_policy() + with self.subTest( + 'Test metric write on short retention_policy immediate alert' + ): + m = self._create_general_metric(name='dummy') + self._create_alert_settings( + metric=m, custom_operator='<', custom_threshold=1, custom_tolerance=0 + ) + m.write(0, retention_policy=SHORT_RP) + self.assertEqual(m.read(retention_policy=SHORT_RP)[0][m.field_name], 0) + m.refresh_from_db() + self.assertEqual(m.is_healthy, False) + self.assertEqual(m.is_healthy_tolerant, False) + self.assertEqual(Notification.objects.count(), 1) + with self.subTest( + 'Test metric write on short retention_policy with deferred alert' + ): + m2 = self._create_general_metric(name='dummy2') + self._create_alert_settings( + metric=m2, custom_operator='<', custom_threshold=1, custom_tolerance=1 + ) + m.write(0, retention_policy=SHORT_RP, time=now() - timedelta(minutes=2)) + self.assertEqual(m.read(retention_policy=SHORT_RP)[0][m.field_name], 0) + m.refresh_from_db() + self.assertEqual(m.is_healthy, False) + self.assertEqual(m.is_healthy_tolerant, False) + self.assertEqual(Notification.objects.count(), 1) + + def test_metric_write_microseconds_precision(self): + m = self._create_object_metric( + name='wlan0', key='wlan0', configuration='clients' + ) + m.write('00:14:5c:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235142)) + m.write('00:23:4a:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235152)) + self.assertEqual(len(m.read()), 2) + + @patch.object( + InfluxDBClient, 'write_api', side_effect=InfluxDBError('Server error') + ) + @capture_stderr() + def test_write_retry(self, mock_write): + with self.assertRaises(TimeseriesWriteException): + timeseries_db.write('test_write', {'value': 1}) + m = self._create_general_metric(name='Test metric') + with self.assertRaises(Retry): + m.write(1) + + @patch.object( + InfluxDBClient, + 'write_api', + side_effect=InfluxDBError( + content='{"error":"partial write: points beyond retention policy dropped=1"}', + code=400, + ), + ) + @capture_stderr() + def test_write_skip_retry_for_retention_policy(self, mock_write): + try: + timeseries_db.write('test_write', {'value': 1}) + except TimeseriesWriteException: + self.fail( + 'TimeseriesWriteException should not be raised when data ' + 'points crosses retention policy' + ) + m = self._create_general_metric(name='Test metric') + try: + m.write(1) + except Retry: + self.fail( + 'Writing metric should not be retried when data ' + 'points crosses retention policy' + ) + + @patch.object( + InfluxDBClient, 'write_api', side_effect=InfluxDBError('Server error') + ) + @capture_stderr() + def test_timeseries_write_params(self, mock_write): + with freeze_time('Jan 14th, 2020') as frozen_datetime: + m = self._create_general_metric(name='Test metric') + with self.assertRaises(Retry) as e: + m.write(1) + frozen_datetime.tick(delta=timedelta(minutes=10)) + self.assertEqual( + now(), datetime(2020, 1, 14, tzinfo=tz('UTC')) + timedelta(minutes=10) + ) + task_signature = e.exception.sig + with patch.object(timeseries_db, 'write') as mock_write: + self._retry_task(task_signature) + mock_write.assert_called_with( + 'test_metric', + {'value': 1}, + database=None, + retention_policy=None, + tags={}, + timestamp=datetime(2020, 1, 14, tzinfo=tz('UTC')).isoformat(), + current=False, + ) + + def _retry_task(self, task_signature): + task_kwargs = task_signature.kwargs + task_signature.type.run(**task_kwargs) + + @patch.object( + InfluxDBClient, 'query_api', side_effect=InfluxDBError('Server error') + ) + def test_retry_mechanism(self, mock_query): + max_retries = MONITORING_TIMESERIES_RETRY_OPTIONS.get('max_retries') + with patch('logging.Logger.info') as mocked_logger: + try: + self.test_get_query_fields_function() + except Exception: + pass + self.assertEqual(mocked_logger.call_count, max_retries) + mocked_logger.assert_called_with( + 'Error while executing method "query":\nServer error\n' + f'Attempt {max_retries} out of {max_retries}.\n' + ) + + +class TestDatabaseClientUdp(TestMonitoringMixin, TestCase): + def test_exceed_udp_packet_limit(self): + # InfluxDB 2.x does not use UDP for writing data, but this test is kept + # for backward compatibility reference + timeseries_db.write( + 'test_udp_write', dict(value='O' * 66000), database=self.TEST_DB + ) + result = timeseries_db.query( + f'from(bucket: "{self.TEST_DB}") |> range(start: -1h) |> ' + f'filter(fn: (r) => r["_measurement"] == "test_udp_write")' + ) + measurement = list(result) + self.assertEqual(len(measurement), 1) diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py index 8f50774e1..eb3e3243c 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -91,7 +91,13 @@ "SELECT {fields|SUM|/ 1} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> sum()' + ), }, }, 'dummy': { @@ -108,7 +114,7 @@ 'description': 'Bugged chart for testing purposes.', 'unit': 'bugs', 'order': 999, - 'query': {'influxdb': "BAD"}, + 'query': {'influxdb': "BAD", 'influxdb2': "BAD"}, }, 'default': { 'type': 'line', @@ -120,7 +126,12 @@ 'influxdb': ( "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")' + ), }, }, 'multiple_test': { @@ -133,26 +144,43 @@ 'influxdb': ( "SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" or ' + 'r["_measurement"] == "value2" and ' + 'r["content_type"] == "{content_type}" and ' + 'r["object_id"] == "{object_id}")' + ), }, }, 'group_by_tag': { 'type': 'stackedbars', 'title': 'Group by tag', - 'description': 'Query is groupped by tag along with time', + 'description': 'Query is grouped by tag along with time', 'unit': 'n.', 'order': 999, 'query': { 'influxdb': ( "SELECT CUMULATIVE_SUM(SUM({field_name})) FROM {key} WHERE time >= '{time}'" " GROUP BY time(1d), metric_num" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}") ' + '|> group(columns: ["metric_num"]) |> sum() |> cumulativeSum() |> window(every: 1d)' + ), }, 'summary_query': { 'influxdb': ( "SELECT SUM({field_name}) FROM {key} WHERE time >= '{time}'" " GROUP BY time(30d), metric_num" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}") ' + '|> group(columns: ["metric_num"]) |> sum() |> window(every: 30d)' + ), }, }, 'mean_test': { @@ -165,7 +193,13 @@ 'influxdb': ( "SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean()' + ), }, }, 'sum_test': { @@ -178,7 +212,13 @@ 'influxdb': ( "SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> sum()' + ), }, }, 'top_fields_mean': { @@ -192,7 +232,13 @@ "SELECT {fields|MEAN} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "{key}") |> range(start: {time}) ' + '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and ' + 'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") ' + '|> mean()' + ), }, }, } diff --git a/requirements.txt b/requirements.txt index 90feaac61..cc87d6f08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ django-nested-admin~=4.0.2 netaddr~=0.8.0 python-dateutil>=2.7.0,<3.0.0 openwisp-utils[rest] @ https://github.com/openwisp/openwisp-utils/tarball/master +influxdb-client~=1.21.0 diff --git a/setup.py b/setup.py index 43ca4bb97..3b935de4b 100755 --- a/setup.py +++ b/setup.py @@ -55,6 +55,10 @@ def get_install_requires(): include_package_data=True, zip_safe=False, install_requires=get_install_requires(), + extras_require={ + 'influxdb': ['influxdb>=5.2,<5.3'], + 'influxdb2': ['influxdb-client>=1.17.0,<2.0.0'], + }, classifiers=[ 'Development Status :: 3 - Alpha', 'Environment :: Web Environment', @@ -64,7 +68,10 @@ def get_install_requires(): 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', 'Operating System :: OS Independent', 'Framework :: Django', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', ], ) diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py index c7772d5cd..3c366a62b 100644 --- a/tests/openwisp2/settings.py +++ b/tests/openwisp2/settings.py @@ -21,16 +21,30 @@ } } -TIMESERIES_DATABASE = { +INFLUXDB_1x_DATABASE = { 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', 'USER': 'openwisp', 'PASSWORD': 'openwisp', 'NAME': 'openwisp2', 'HOST': os.getenv('INFLUXDB_HOST', 'localhost'), 'PORT': '8086', - # UDP writes are disabled by default 'OPTIONS': {'udp_writes': False, 'udp_port': 8089}, } + +INFLUXDB_2x_DATABASE = { + 'BACKEND': 'openwisp_monitoring.db.backends.influxdb2', + 'TOKEN': 'your-influxdb-2.0-token', + 'ORG': 'your-org', + 'BUCKET': 'your-bucket', + 'HOST': os.getenv('INFLUXDB2_HOST', 'localhost'), + 'PORT': '8087', +} + +if os.environ.get('USE_INFLUXDB2', False): + TIMESERIES_DATABASE = INFLUXDB_2x_DATABASE +else: + TIMESERIES_DATABASE = INFLUXDB_1x_DATABASE + if TESTING: if os.environ.get('TIMESERIES_UDP', False): TIMESERIES_DATABASE['OPTIONS'] = {'udp_writes': True, 'udp_port': 8091}