From 2699387152959f956500dfb2ec5d51292f1d71f8 Mon Sep 17 00:00:00 2001
From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com>
Date: Thu, 16 Dec 2021 10:46:31 -0500
Subject: [PATCH 01/13] Fix TSA helper causing 500 error

There was code still running that referenced the now-removed TSA catalog
database. This resulted in a 500 response being returned for certain
operations, as reported in #537 and #538.
---
 src/dataloaderinterface/models.py             |  4 +-
 src/dataloaderinterface/signals.py            | 41 +++++++++++--------
 .../templatetags/helpers.py                   |  2 +-
 src/tsa/{helpers.py => helpers_deprecated.py} |  0
 4 files changed, 26 insertions(+), 21 deletions(-)
 rename src/tsa/{helpers.py => helpers_deprecated.py} (100%)

diff --git a/src/dataloaderinterface/models.py b/src/dataloaderinterface/models.py
index bb26006d..47da30b9 100644
--- a/src/dataloaderinterface/models.py
+++ b/src/dataloaderinterface/models.py
@@ -13,7 +13,6 @@
     Unit, Medium, Organization
 from dataloaderinterface.querysets import SiteRegistrationQuerySet, SensorOutputQuerySet
 
-
 class SiteRegistration(models.Model):
     registration_id = models.AutoField(primary_key=True, db_column='RegistrationID')
     registration_token = models.CharField(max_length=64, editable=False, db_column='RegistrationToken', unique=True, default=uuid4)
@@ -60,7 +59,8 @@ def latest_measurement(self):
             return
         try:
             last_updated_sensor = [sensor for sensor in self.sensors.all() if sensor.last_measurement.pk == self.latest_measurement_id].pop()
-        except IndexError:
+        #except IndexError:
+        except:
             return None
 
         return last_updated_sensor.last_measurement
diff --git a/src/dataloaderinterface/signals.py b/src/dataloaderinterface/signals.py
index 158721ff..ffa006ec 100644
--- a/src/dataloaderinterface/signals.py
+++ b/src/dataloaderinterface/signals.py
@@ -7,7 +7,9 @@
 from dataloader.models import SamplingFeature, Site, Annotation, SamplingFeatureAnnotation, SpatialReference, Action, \
     Method, Result, ProcessingLevel, TimeSeriesResult, Unit
 from dataloaderinterface.models import SiteRegistration, SiteSensor
-from tsa.helpers import TimeSeriesAnalystHelper
+
+#PRT - deprecated
+#from tsa.helpers import TimeSeriesAnalystHelper
 
 
 @receiver(pre_save, sender=SiteRegistration)
@@ -78,12 +80,13 @@ def handle_site_registration_post_save(sender, instance, created, update_fields=
         sampling_feature.annotations.filter(annotation_code='closest_town').update(annotation_text=instance.closest_town or '')
 
 
-@receiver(post_save, sender=SiteRegistration)
-def handle_site_registration_tsa_post_save(sender, instance, created, update_fields=None, **kwargs):
-    if created:
-        return
-    helper = TimeSeriesAnalystHelper()
-    helper.update_series_from_site(instance)
+#PRT - deprecated
+#@receiver(post_save, sender=SiteRegistration)
+#def handle_site_registration_tsa_post_save(sender, instance, created, update_fields=None, **kwargs):
+#    if created:
+#        return
+#    helper = TimeSeriesAnalystHelper()
+#    helper.update_series_from_site(instance)
 
 
 @receiver(post_delete, sender=SiteRegistration)
@@ -144,13 +147,14 @@ def handle_sensor_post_save(sender, instance, created, update_fields=None, **kwa
         TimeSeriesResult.objects.filter(result_id=instance.result_id).update(z_location=instance.height)
 
 
-@receiver(post_save, sender=SiteSensor)
-def handle_sensor_tsa_post_save(sender, instance, created, update_fields=None, **kwargs):
-    helper = TimeSeriesAnalystHelper()
-    if created:
-        helper.create_series_from_sensor(instance)
-    else:
-        helper.update_series_from_sensor(instance)
+#PRT - deprecated
+#@receiver(post_save, 
sender=SiteSensor) +#def handle_sensor_tsa_post_save(sender, instance, created, update_fields=None, **kwargs): +# helper = TimeSeriesAnalystHelper() +# if created: +# helper.create_series_from_sensor(instance) +# else: +# helper.update_series_from_sensor(instance) @receiver(post_delete, sender=SiteSensor) @@ -159,7 +163,8 @@ def handle_sensor_post_delete(sender, instance, **kwargs): result and result.feature_action.action.delete() -@receiver(post_delete, sender=SiteSensor) -def handle_sensor_tsa_post_delete(sender, instance, **kwargs): - helper = TimeSeriesAnalystHelper() - helper.delete_series_for_sensor(instance) +#PRT - deprecated +#@receiver(post_delete, sender=SiteSensor) +#def handle_sensor_tsa_post_delete(sender, instance, **kwargs): +# helper = TimeSeriesAnalystHelper() +# helper.delete_series_for_sensor(instance) diff --git a/src/dataloaderinterface/templatetags/helpers.py b/src/dataloaderinterface/templatetags/helpers.py index 6584a4e7..33468d19 100644 --- a/src/dataloaderinterface/templatetags/helpers.py +++ b/src/dataloaderinterface/templatetags/helpers.py @@ -54,7 +54,7 @@ def is_stale(value, default): def divide(value, arg): try: return int(value) / int(arg) if int(arg) != 0 else 0 - except (ValueError, ZeroDivisionError): + except (TypeError, ValueError, ZeroDivisionError): return None diff --git a/src/tsa/helpers.py b/src/tsa/helpers_deprecated.py similarity index 100% rename from src/tsa/helpers.py rename to src/tsa/helpers_deprecated.py From 5dfbc03ed7868cf09f0b734e86021cfa67380ad9 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Thu, 16 Dec 2021 15:45:01 -0500 Subject: [PATCH 02/13] Minor bug fixes 1) Fixes TSV not showing full timeseries for selected end date #536 2) Corrected "Time Series Analyst' text that wasn't updated to Time Series Visualization --- .../static/timeseries_visualization/js/visualization.js | 2 +- .../templates/dataloaderinterface/site_details.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js b/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js index 2270aa42..3475a6b9 100644 --- a/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js +++ b/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js @@ -108,7 +108,7 @@ function updatePlotDateRange(min, max) { } if (max != null) { $('#dpd2').val(dateToString(max)); - max = max.getTime(); + max = max.getTime() + 86399; //select end of day as end point } _chart.xAxis[0].update({'min':min, 'max':max}); _chart.xAxis[0].setExtremes(); diff --git a/src/dataloaderinterface/templates/dataloaderinterface/site_details.html b/src/dataloaderinterface/templates/dataloaderinterface/site_details.html index d80c713f..35ead80c 100644 --- a/src/dataloaderinterface/templates/dataloaderinterface/site_details.html +++ b/src/dataloaderinterface/templates/dataloaderinterface/site_details.html @@ -316,7 +316,7 @@
show_chart
- +

View data for this site.

From 5ca7588c69d275c892bcd728116b172b8d47473d Mon Sep 17 00:00:00 2001
From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com>
Date: Thu, 16 Dec 2021 15:45:46 -0500
Subject: [PATCH 03/13] Increase database pool size.

Preemptive increase of the connection pool size to prevent a performance
bottleneck.
---
 src/dataloaderinterface/ajax.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dataloaderinterface/ajax.py b/src/dataloaderinterface/ajax.py
index 77ed2740..947c637b 100644
--- a/src/dataloaderinterface/ajax.py
+++ b/src/dataloaderinterface/ajax.py
@@ -13,7 +13,7 @@
 _dbsettings = settings.DATABASES['odm2']
 _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}"
-_db_engine = sqlalchemy.create_engine(_connection_str)
+_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=30)
 
 
 def get_result_timeseries_recent(request_data:Dict[str,Any]) -> str:
     result_id = int(request_data['resultid'])

From e43ff3a5a60b1de8fd19f3dc912bf726bfb5256f Mon Sep 17 00:00:00 2001
From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com>
Date: Fri, 17 Dec 2021 13:49:37 -0500
Subject: [PATCH 04/13] Revised error handling for duplicate datetimes.

The previous implementation of #510 meant that any database-related exception
would result in a 500. As suggested in #538, duplicate data being sent to the
server does not necessarily constitute an error from the external perspective.
This commit introduces code that ignores exceptions for primary key violations
(duplicate resultid and datetime) and still returns a 201.
---
 src/dataloaderservices/views.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py
index 62e7e66d..f435ee5e 100644
--- a/src/dataloaderservices/views.py
+++ b/src/dataloaderservices/views.py
@@ -41,6 +41,7 @@
 #PRT - temporary work around after replacing InfluxDB but not replacement models
 import sqlalchemy
 from sqlalchemy.sql import text
+import psycopg2
 from django.conf import settings
 _dbsettings = settings.DATABASES['odm2']
 _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}"
@@ -609,6 +610,14 @@ def post(self, request, format=None):
 
             try:
                 query_result = InsertTimeseriesResultValues(result_value)
+            except sqlalchemy.exc.IntegrityError as e:
+                if hasattr(e, 'orig'):
+                    if isinstance(e.orig, psycopg2.errors.UniqueViolation):
+                        pass #data is already in database
+                    else:
+                        errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')")
+                else:
+                    errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')")
             except Exception as e:
                 errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')")
 

From 25ca268c3ae35e868eac00327f1b032bd4b4b06a Mon Sep 17 00:00:00 2001
From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com>
Date: Thu, 23 Dec 2021 16:49:16 -0500
Subject: [PATCH 05/13] Revised DataStream API

This implements a threaded approach to the datastream view, which should
increase performance by inserting data in parallel.
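Roughly, the view now submits each value insert to a worker pool and collects
any per-value error messages. A minimal sketch of that submit-and-collect
pattern (illustrative only; insert_value and insert_all are stand-ins for the
real view code, not part of this patch):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def insert_value(value):
        # Stand-in for the real per-value INSERT; returns an error string or None.
        try:
            # The database write would happen here.
            return None
        except Exception:
            return f"Failed to INSERT data for value {value!r}"

    def insert_all(values, max_workers=8):
        errors = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(insert_value, value) for value in values]
            for future in as_completed(futures):
                error = future.result()
                if error is not None:
                    errors.append(error)
        return errors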
--- src/WebSDL/settings/base.py | 1 + src/dataloaderservices/views.py | 162 +++++++++++++++++--------------- 2 files changed, 89 insertions(+), 74 deletions(-) diff --git a/src/WebSDL/settings/base.py b/src/WebSDL/settings/base.py index 8720e93e..3a962b4e 100644 --- a/src/WebSDL/settings/base.py +++ b/src/WebSDL/settings/base.py @@ -120,6 +120,7 @@ 'HOST': database['host'] if 'host' in database else '', 'PORT': database['port'] if 'port' in database else '', 'OPTIONS': database['options'] if 'options' in database else {}, + 'CONN_MAX_AGE': 0, 'TEST': database['test'] if 'test' in database else {}, } diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index f435ee5e..9ef6c688 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -2,6 +2,7 @@ import os from collections import OrderedDict from datetime import time, timedelta, datetime +from typing import Union from io import StringIO from django.utils import encoding @@ -17,7 +18,7 @@ from django.shortcuts import reverse from rest_framework.generics import GenericAPIView -from dataloader.models import SamplingFeature, TimeSeriesResultValue, Unit, EquipmentModel, TimeSeriesResult, Result +from dataloader.models import ProfileResultValue, SamplingFeature, TimeSeriesResultValue, Unit, EquipmentModel, TimeSeriesResult, Result from django.db.models.expressions import F from django.utils.dateparse import parse_datetime from rest_framework import exceptions @@ -45,11 +46,13 @@ from django.conf import settings _dbsettings = settings.DATABASES['odm2'] _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}" -_db_engine = sqlalchemy.create_engine(_connection_str) +_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=5) # TODO: Check user permissions to edit, add, or remove stuff with a permissions class. # TODO: Use generic api views for create, edit, delete, and list. 
+from concurrent.futures import ThreadPoolExecutor, as_completed + class ModelVariablesApi(APIView): authentication_classes = (SessionAuthentication, ) @@ -592,82 +595,35 @@ def post(self, request, format=None): if not sampling_feature: raise exceptions.ParseError('Sampling Feature code does not match any existing site.') feature_actions = sampling_feature.feature_actions.prefetch_related('results__variable', 'action').all() - errors = [] - for feature_action in feature_actions: - result = feature_action.results.all().first() - if str(result.result_uuid) not in request.data: - continue - - result_value = TimeseriesResultValueTechDebt( - result_id=result.result_id, - data_value=request.data[str(result.result_uuid)], - value_datetime=measurement_datetime, - utc_offset=utc_offset, - censor_code='Not censored', - quality_code='None', - time_aggregation_interval=1, - time_aggregation_interval_unit=(Unit.objects.get(unit_name='hour minute')).unit_id) - - try: - query_result = InsertTimeseriesResultValues(result_value) - except sqlalchemy.exc.IntegrityError as e: - if hasattr(e, 'orig'): - if isinstance(e.orig, psycopg2.errors.UniqueViolation): - pass #data is already in database - else: - errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')") + + futures = {} + unit_id = Unit.objects.get(unit_name='hour minute').unit_id + with ThreadPoolExecutor() as executor: + for feature_action in feature_actions: + result = feature_action.results.all().first() + if str(result.result_uuid) not in request.data: + continue else: - errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')") - except Exception as e: - errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')") - - # PRT - long term we would like to remove dataloader database but for now - # this block of code keeps dataloaderinterface_sensormeasurement table in sync - result.value_count = F('value_count') + 1 - result.result_datetime = measurement_datetime - result.result_datetime_utc_offset = utc_offset - site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first() - last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first() - if not last_measurement: - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=result_value.value_datetime, - value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) - elif last_measurement and result_value.value_datetime > last_measurement.value_datetime: - last_measurement and last_measurement.delete() - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=result_value.value_datetime, - value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) - - if result.value_count == 0: - result.valid_datetime = measurement_datetime - result.valid_datetime_utc_offset = utc_offset - - if not site_sensor.registration.deployment_date: - site_sensor.registration.deployment_date = measurement_datetime - #site_sensor.registration.deployment_date_utc_offset = utc_offset - site_sensor.registration.save(update_fields=['deployment_date']) - - try: - result.save(update_fields=[ - 'result_datetime', 'value_count', 'result_datetime_utc_offset', - 'valid_datetime', 'valid_datetime_utc_offset' - ]) - except Exception as e: - #PRT - An exception here means the dataloaderinterface data tables will not in sync - # for this sensor, but that is better than a fail state where data is lost so pass - # expection for now. 
Long term plan is to remove this whole block of code. - pass - # End dataloaderinterface_sensormeasurement sync block + result_value = TimeseriesResultValueTechDebt( + result_id=result.result_id, + data_value=request.data[str(result.result_uuid)], + value_datetime=measurement_datetime, + utc_offset=utc_offset, + censor_code='Not censored', + quality_code='None', + time_aggregation_interval=1, + time_aggregation_interval_unit=unit_id) + futures[executor.submit(ProcessResultValue, result_value, result)] = None + + errors = [] + for future in as_completed(futures): + if future.result() is not None: errors.append(future.result()) + if errors: return Response(errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return Response({}, status.HTTP_201_CREATED) + + class TimeseriesResultValueTechDebt(): def __init__(self, result_id:str, @@ -687,6 +643,64 @@ def __init__(self, self.time_aggregation_interval = time_aggregation_interval self.time_aggregation_interval_unit = time_aggregation_interval_unit +def ProcessResultValue(result_value:TimeseriesResultValueTechDebt, result:Result) -> Union[str,None]: + try: + query_result = InsertTimeseriesResultValues(result_value) + except sqlalchemy.exc.IntegrityError as e: + if hasattr(e, 'orig'): + if isinstance(e.orig, psycopg2.errors.UniqueViolation): + pass #data is already in database + else: + return (f"Failed to INSERT data for uuid('{result.result_uuid}')") + else: + return (f"Failed to INSERT data for uuid('{result.result_uuid}')") + except Exception as e: + return (f"Failed to INSERT data for uuid('{result.result_uuid}')") + + # PRT - long term we would like to remove dataloader database but for now + # this block of code keeps dataloaderinterface_sensormeasurement table in sync + result.value_count = F('value_count') + 1 + result.result_datetime = result_value.value_datetime + result.result_datetime_utc_offset = result_value.utc_offset + site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first() + last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first() + if not last_measurement: + SensorMeasurement.objects.create( + sensor=site_sensor, + value_datetime=result_value.value_datetime, + value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), + data_value=result_value.data_value + ) + elif last_measurement and result_value.value_datetime > last_measurement.value_datetime: + last_measurement and last_measurement.delete() + SensorMeasurement.objects.create( + sensor=site_sensor, + value_datetime=result_value.value_datetime, + value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), + data_value=result_value.data_value + ) + + if result.value_count == 0: + result.valid_datetime = result_value.value_datetime + result.valid_datetime_utc_offset = result_value.utc_offset + + if not site_sensor.registration.deployment_date: + site_sensor.registration.deployment_date = result_value.value_datetime + #site_sensor.registration.deployment_date_utc_offset = utc_offset + site_sensor.registration.save(update_fields=['deployment_date']) + + try: + result.save(update_fields=[ + 'result_datetime', 'value_count', 'result_datetime_utc_offset', + 'valid_datetime', 'valid_datetime_utc_offset' + ]) + except Exception as e: + #PRT - An exception here means the dataloaderinterface data tables will not in sync + # for this sensor, but that is better than a fail state where data is lost so pass + # expection for now. Long term plan is to remove this whole block of code. 
+ pass + return None + def InsertTimeseriesResultValues(result_value : TimeseriesResultValueTechDebt) -> None: with _db_engine.connect() as connection: query = text("INSERT INTO odm2.timeseriesresultvalues " \ From 107ef550e786fe54c3bcb8ae4c57565c71d1c979 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 29 Dec 2021 10:14:26 -0500 Subject: [PATCH 06/13] Separate out update method from threaded approach --- src/dataloaderservices/views.py | 90 ++++++++++++++++----------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index 9ef6c688..90dbbdd6 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -598,7 +598,7 @@ def post(self, request, format=None): futures = {} unit_id = Unit.objects.get(unit_name='hour minute').unit_id - with ThreadPoolExecutor() as executor: + with ThreadPoolExecutor(max_workers=8) as executor: for feature_action in feature_actions: result = feature_action.results.all().first() if str(result.result_uuid) not in request.data: @@ -614,7 +614,49 @@ def post(self, request, format=None): time_aggregation_interval=1, time_aggregation_interval_unit=unit_id) futures[executor.submit(ProcessResultValue, result_value, result)] = None - + + # PRT - long term we would like to remove dataloader database but for now + # this block of code keeps dataloaderinterface_sensormeasurement table in sync + result.value_count = F('value_count') + 1 + result.result_datetime = measurement_datetime + result.result_datetime_utc_offset = utc_offset + site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first() + last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first() + if not last_measurement: + SensorMeasurement.objects.create( + sensor=site_sensor, + value_datetime=measurement_datetime, + value_datetime_utc_offset=timedelta(hours=utc_offset), + data_value=result_value.data_value + ) + elif last_measurement and measurement_datetime > last_measurement.value_datetime: + last_measurement and last_measurement.delete() + SensorMeasurement.objects.create( + sensor=site_sensor, + value_datetime=measurement_datetime, + value_datetime_utc_offset=timedelta(hours=utc_offset), + data_value=result_value.data_value + ) + + if result.value_count == 0: + result.valid_datetime = measurement_datetime + result.valid_datetime_utc_offset = utc_offset + + if not site_sensor.registration.deployment_date: + site_sensor.registration.deployment_date = measurement_datetime + #site_sensor.registration.deployment_date_utc_offset = utc_offset + site_sensor.registration.save(update_fields=['deployment_date']) + + try: + result.save(update_fields=[ + 'result_datetime', 'value_count', 'result_datetime_utc_offset', + 'valid_datetime', 'valid_datetime_utc_offset' + ]) + except Exception as e: + #PRT - An exception here means the dataloaderinterface data tables will not in sync + # for this sensor, but that is better than a fail state where data is lost so pass + # expection for now. Long term plan is to remove this whole block of code. 
+ pass errors = [] for future in as_completed(futures): if future.result() is not None: errors.append(future.result()) @@ -646,6 +688,7 @@ def __init__(self, def ProcessResultValue(result_value:TimeseriesResultValueTechDebt, result:Result) -> Union[str,None]: try: query_result = InsertTimeseriesResultValues(result_value) + pass except sqlalchemy.exc.IntegrityError as e: if hasattr(e, 'orig'): if isinstance(e.orig, psycopg2.errors.UniqueViolation): @@ -656,49 +699,6 @@ def ProcessResultValue(result_value:TimeseriesResultValueTechDebt, result:Result return (f"Failed to INSERT data for uuid('{result.result_uuid}')") except Exception as e: return (f"Failed to INSERT data for uuid('{result.result_uuid}')") - - # PRT - long term we would like to remove dataloader database but for now - # this block of code keeps dataloaderinterface_sensormeasurement table in sync - result.value_count = F('value_count') + 1 - result.result_datetime = result_value.value_datetime - result.result_datetime_utc_offset = result_value.utc_offset - site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first() - last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first() - if not last_measurement: - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=result_value.value_datetime, - value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) - elif last_measurement and result_value.value_datetime > last_measurement.value_datetime: - last_measurement and last_measurement.delete() - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=result_value.value_datetime, - value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) - - if result.value_count == 0: - result.valid_datetime = result_value.value_datetime - result.valid_datetime_utc_offset = result_value.utc_offset - - if not site_sensor.registration.deployment_date: - site_sensor.registration.deployment_date = result_value.value_datetime - #site_sensor.registration.deployment_date_utc_offset = utc_offset - site_sensor.registration.save(update_fields=['deployment_date']) - - try: - result.save(update_fields=[ - 'result_datetime', 'value_count', 'result_datetime_utc_offset', - 'valid_datetime', 'valid_datetime_utc_offset' - ]) - except Exception as e: - #PRT - An exception here means the dataloaderinterface data tables will not in sync - # for this sensor, but that is better than a fail state where data is lost so pass - # expection for now. Long term plan is to remove this whole block of code. 
- pass return None def InsertTimeseriesResultValues(result_value : TimeseriesResultValueTechDebt) -> None: From f2c6763715acae18839a3b99b2bdf863bb35e8d8 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 29 Dec 2021 10:14:36 -0500 Subject: [PATCH 07/13] Add Profiler Method --- src/WebSDL/settings/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/WebSDL/settings/base.py b/src/WebSDL/settings/base.py index 3a962b4e..7e9acf5d 100644 --- a/src/WebSDL/settings/base.py +++ b/src/WebSDL/settings/base.py @@ -75,8 +75,11 @@ 'django.middleware.clickjacking.XFrameOptionsMiddleware', 'hydroshare_util.middleware.AuthMiddleware', # 'debug_toolbar.middleware.DebugToolbarMiddleware', + 'django_cprofile_middleware.middleware.ProfilerMiddleware', ] +DJANGO_CPROFILE_MIDDLEWARE_REQUIRE_STAFF = False + REST_FRAMEWORK = { 'DEFAULT_RENDERER_CLASSES': ( 'rest_framework.renderers.JSONRenderer', From 76129304d40eaaa9c7ae157a7163d2c6c6ff3102 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 29 Dec 2021 17:11:39 -0500 Subject: [PATCH 08/13] Work around to allow multithread 'api/data-steam' Performance profiling showed that the Django models used in the view end point for 'api/data-stream' were acting as a bottle neck, and also did not allow for asynchronous support. This commit is a patch which replaces those models with direct SQL, which should be more performant and also supports multithreading. --- src/dataloaderservices/views.py | 198 +++++++++++++++++++++----------- 1 file changed, 128 insertions(+), 70 deletions(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index 90dbbdd6..ccdbbcc3 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -2,7 +2,7 @@ import os from collections import OrderedDict from datetime import time, timedelta, datetime -from typing import Union +from typing import Union, Dict, Any, final from io import StringIO from django.utils import encoding @@ -44,10 +44,16 @@ from sqlalchemy.sql import text import psycopg2 from django.conf import settings + _dbsettings = settings.DATABASES['odm2'] _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}" _db_engine = sqlalchemy.create_engine(_connection_str, pool_size=5) +_dbsettings_loader = settings.DATABASES['default'] +_connection_str_loader = f"postgresql://{_dbsettings_loader['USER']}:{_dbsettings_loader['PASSWORD']}@{_dbsettings_loader['HOST']}:{_dbsettings_loader['PORT']}/{_dbsettings_loader['NAME']}" +_db_engine_loader = sqlalchemy.create_engine(_connection_str_loader, pool_size=5) + + # TODO: Check user permissions to edit, add, or remove stuff with a permissions class. # TODO: Use generic api views for create, edit, delete, and list. 
@@ -293,6 +299,7 @@ def post(self, request, *args, **kwargs): data_value = row[results_mapping['results'][uuid]['index']] result_value = TimeseriesResultValueTechDebt( result_id=sensor.result_id, + result_uuid=uuid, data_value=data_value, utc_offset=results_mapping['utc_offset'], value_datetime=measurement_datetime, @@ -302,7 +309,7 @@ def post(self, request, *args, **kwargs): time_aggregation_interval_unit=data_value_units.unit_id, ) try: - result = InsertTimeseriesResultValues(result_value) + result = insert_timeseries_result_values(result_value) except Exception as e: warnings.append(f"Error inserting value '{data_value}'"\ f"at datetime '{measurement_datetime}' for result uuid '{uuid}'") @@ -594,69 +601,33 @@ def post(self, request, format=None): sampling_feature = SamplingFeature.objects.filter(sampling_feature_uuid__exact=request.data['sampling_feature']).first() if not sampling_feature: raise exceptions.ParseError('Sampling Feature code does not match any existing site.') - feature_actions = sampling_feature.feature_actions.prefetch_related('results__variable', 'action').all() + result_uuids = get_result_UUIDs(sampling_feature.sampling_feature_id) + if not result_uuids: + raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{request.data['sampling_feature']}'") + futures = {} unit_id = Unit.objects.get(unit_name='hour minute').unit_id - with ThreadPoolExecutor(max_workers=8) as executor: - for feature_action in feature_actions: - result = feature_action.results.all().first() - if str(result.result_uuid) not in request.data: - continue - else: - result_value = TimeseriesResultValueTechDebt( - result_id=result.result_id, - data_value=request.data[str(result.result_uuid)], - value_datetime=measurement_datetime, - utc_offset=utc_offset, - censor_code='Not censored', - quality_code='None', - time_aggregation_interval=1, - time_aggregation_interval_unit=unit_id) - futures[executor.submit(ProcessResultValue, result_value, result)] = None - - # PRT - long term we would like to remove dataloader database but for now - # this block of code keeps dataloaderinterface_sensormeasurement table in sync - result.value_count = F('value_count') + 1 - result.result_datetime = measurement_datetime - result.result_datetime_utc_offset = utc_offset - site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first() - last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first() - if not last_measurement: - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=measurement_datetime, - value_datetime_utc_offset=timedelta(hours=utc_offset), - data_value=result_value.data_value - ) - elif last_measurement and measurement_datetime > last_measurement.value_datetime: - last_measurement and last_measurement.delete() - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=measurement_datetime, - value_datetime_utc_offset=timedelta(hours=utc_offset), - data_value=result_value.data_value - ) - - if result.value_count == 0: - result.valid_datetime = measurement_datetime - result.valid_datetime_utc_offset = utc_offset - - if not site_sensor.registration.deployment_date: - site_sensor.registration.deployment_date = measurement_datetime - #site_sensor.registration.deployment_date_utc_offset = utc_offset - site_sensor.registration.save(update_fields=['deployment_date']) - + + with ThreadPoolExecutor() as executor: + for key in request.data: try: - result.save(update_fields=[ - 'result_datetime', 'value_count', 
'result_datetime_utc_offset', - 'valid_datetime', 'valid_datetime_utc_offset' - ]) - except Exception as e: - #PRT - An exception here means the dataloaderinterface data tables will not in sync - # for this sensor, but that is better than a fail state where data is lost so pass - # expection for now. Long term plan is to remove this whole block of code. - pass + result_id = result_uuids[key] + except KeyError: + continue + + result_value = TimeseriesResultValueTechDebt( + result_id=result_id, + result_uuid=key, + data_value=request.data[str(key)], + value_datetime=measurement_datetime, + utc_offset=utc_offset, + censor_code='Not censored', + quality_code='None', + time_aggregation_interval=1, + time_aggregation_interval_unit=unit_id) + futures[executor.submit(process_result_value, result_value)] = None + errors = [] for future in as_completed(futures): if future.result() is not None: errors.append(future.result()) @@ -664,11 +635,30 @@ def post(self, request, format=None): if errors: return Response(errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response({}, status.HTTP_201_CREATED) +####################################################### +### Temporary HOT fix to address model performance +####################################################### +#PRT - the code in this block is meant as a hot fix to address poor model performance +#the long term goal is to refactor the application models to make them more performant. +def get_result_UUIDs(sampling_feature_id:str) -> Union[Dict[str, str],None]: + try: + with _db_engine.connect() as connection: + query = text("SELECT r.resultid, r.resultuuid FROM odm2.results AS r " \ + "JOIN odm2.featureactions AS fa ON r.featureactionid = fa.featureactionid "\ + "WHERE fa.samplingfeatureid = ':sampling_feature_id';") + df = pd.read_sql(query, connection, params={'sampling_feature_id': sampling_feature_id}) + df['resultuuid'] = df['resultuuid'].astype(str) + df = df.set_index('resultuuid') + results = df['resultid'].to_dict() + return results + except: + return None class TimeseriesResultValueTechDebt(): def __init__(self, - result_id:str, + result_id:str, + result_uuid:str, data_value:float, value_datetime:datetime, utc_offset:int, @@ -677,6 +667,7 @@ def __init__(self, time_aggregation_interval:int, time_aggregation_interval_unit:int) -> None: self.result_id = result_id + self.result_uuid = result_uuid self.data_value = data_value self.utc_offset = utc_offset self.value_datetime = value_datetime @@ -685,23 +676,88 @@ def __init__(self, self.time_aggregation_interval = time_aggregation_interval self.time_aggregation_interval_unit = time_aggregation_interval_unit -def ProcessResultValue(result_value:TimeseriesResultValueTechDebt, result:Result) -> Union[str,None]: +def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[str,None]: try: - query_result = InsertTimeseriesResultValues(result_value) - pass + query_result = insert_timeseries_result_values(result_value) + sync_dataloader_tables(result_value) + sync_result_table(result_value) except sqlalchemy.exc.IntegrityError as e: if hasattr(e, 'orig'): if isinstance(e.orig, psycopg2.errors.UniqueViolation): pass #data is already in database else: - return (f"Failed to INSERT data for uuid('{result.result_uuid}')") + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") else: - return (f"Failed to INSERT data for uuid('{result.result_uuid}')") + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") except Exception as e: - return 
(f"Failed to INSERT data for uuid('{result.result_uuid}')") + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") + + + # PRT - long term we would like to remove dataloader database but for now + # this block of code keeps dataloaderinterface_sensormeasurement table in sync + + + #if not site_sensor.registration.deployment_date: + #site_sensor.registration.deployment_date = measurement_datetime + # #site_sensor.registration.deployment_date_utc_offset = utc_offset + # site_sensor.registration.save(update_fields=['deployment_date']) + + return None + +#dataloader utility function +def get_site_sensor(resultid:str) -> Union[Dict[str, Any],None]: + with _db_engine_loader.connect() as connection: + query = text('SELECT * FROM dataloaderinterface_sitesensor ' \ + 'WHERE "ResultID"=:resultid;' + ) + df = pd.read_sql(query, connection, params={'resultid':resultid}) + return df.to_dict(orient='records')[0] + +#dataloader utility function +def update_sensormeasurement(sensor_id:str, result_value:TimeseriesResultValueTechDebt) -> None: + with _db_engine_loader.connect() as connection: + query = text("DO $condition_insert$ BEGIN " \ + 'IF (SELECT COUNT(sensor_id > 0) FROM ' \ + ' dataloaderinterface_sensormeasurement WHERE ' \ + ' sensor_id = :sensor_id) THEN ' \ + ' UPDATE dataloaderinterface_sensormeasurement ' \ + " SET value_datetime=:datetime, " \ + " value_datetime_utc_offset = ':utc_offset', " \ + ' data_value = data_value ' \ + ' WHERE sensor_id=:sensor_id; ' \ + 'ELSE ' \ + ' INSERT INTO dataloaderinterface_sensormeasurement ' \ + " VALUES (:sensor_id,:datetime,':utc_offset',:data_value); " \ + 'END IF;' \ + 'END $condition_insert$') + connection.execute(query, + sensor_id=sensor_id, + datetime=result_value.value_datetime, + utc_offset=result_value.utc_offset, + data_value=result_value.data_value + ) return None -def InsertTimeseriesResultValues(result_value : TimeseriesResultValueTechDebt) -> None: +#dataloader utility function +def sync_dataloader_tables(result_value: TimeseriesResultValueTechDebt) -> None: + site_sensor = get_site_sensor(result_value.result_id) + if not site_sensor: return None + update_sensormeasurement(site_sensor['id'], result_value) + return None + +def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None: + with _db_engine.connect() as connection: + query = text("DO $sync_function$ BEGIN "\ + "UPDATE odm2.results SET valuecount = valuecount + 1 WHERE resultid=:result_id; "\ + "IF (SELECT (resultdatetime < :result_datetime) FROM odm2.results WHERE resultid=:result_id) THEN" \ + " UPDATE odm2.results SET resultdatetime = :result_datetime WHERE resultid=:result_id; "\ + "END IF; END $sync_function$ ") + return connection.execute(query, + result_id=result_value.result_id, + result_datetime=result_value.value_datetime, + ) + +def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt) -> None: with _db_engine.connect() as connection: query = text("INSERT INTO odm2.timeseriesresultvalues " \ "(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ @@ -726,4 +782,6 @@ def InsertTimeseriesResultValues(result_value : TimeseriesResultValueTechDebt) - time_aggregation_interval=result_value.time_aggregation_interval, time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, ) - return result \ No newline at end of file + if result: + return sync_result_table(result_value) + return None \ No newline at end of file From 6238305e721bdc14dab667b6464aabe2c83af5b2 Mon Sep 17 00:00:00 2001 
From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Thu, 30 Dec 2021 13:19:57 -0500 Subject: [PATCH 09/13] Replace Postgres functions with simpler queries My previous commit (76129304d40eaaa9c7ae157a7163d2c6c6ff3102) replace django models with customized queries. These queries leverages the DO operation and some Postgres IF logic. While this executes fine in Postgres, it doesn't appear to function with SQLAlchemy. This commit replace those queries with more simple logic that is supported by SQLAlchemy. --- src/dataloaderservices/views.py | 89 ++++++++++++++++----------------- 1 file changed, 43 insertions(+), 46 deletions(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index ccdbbcc3..a0857749 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -48,10 +48,11 @@ _dbsettings = settings.DATABASES['odm2'] _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}" _db_engine = sqlalchemy.create_engine(_connection_str, pool_size=5) +_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=10) _dbsettings_loader = settings.DATABASES['default'] _connection_str_loader = f"postgresql://{_dbsettings_loader['USER']}:{_dbsettings_loader['PASSWORD']}@{_dbsettings_loader['HOST']}:{_dbsettings_loader['PORT']}/{_dbsettings_loader['NAME']}" -_db_engine_loader = sqlalchemy.create_engine(_connection_str_loader, pool_size=5) +_db_engine_loader = sqlalchemy.create_engine(_connection_str_loader, pool_size=10) # TODO: Check user permissions to edit, add, or remove stuff with a permissions class. @@ -586,7 +587,6 @@ class TimeSeriesValuesApi(APIView): def post(self, request, format=None): if not all(key in request.data for key in ('timestamp', 'sampling_feature')): raise exceptions.ParseError("Required data not found in request.") - try: measurement_datetime = parse_datetime(request.data['timestamp']) except ValueError: @@ -609,7 +609,7 @@ def post(self, request, format=None): futures = {} unit_id = Unit.objects.get(unit_name='hour minute').unit_id - with ThreadPoolExecutor() as executor: + with ThreadPoolExecutor(max_workers=8) as executor: for key in request.data: try: result_id = result_uuids[key] @@ -628,9 +628,9 @@ def post(self, request, format=None): time_aggregation_interval_unit=unit_id) futures[executor.submit(process_result_value, result_value)] = None - errors = [] - for future in as_completed(futures): - if future.result() is not None: errors.append(future.result()) + errors = [] + for future in as_completed(futures): + if future.result() is not None: errors.append(future.result()) if errors: return Response(errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response({}, status.HTTP_201_CREATED) @@ -679,12 +679,11 @@ def __init__(self, def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[str,None]: try: query_result = insert_timeseries_result_values(result_value) - sync_dataloader_tables(result_value) - sync_result_table(result_value) except sqlalchemy.exc.IntegrityError as e: if hasattr(e, 'orig'): if isinstance(e.orig, psycopg2.errors.UniqueViolation): - pass #data is already in database + #data is already in database + return None else: return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") else: @@ -692,17 +691,18 @@ def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[st except Exception as e: return (f"Failed to INSERT data for 
uuid('{result_value.result_uuid}')") - # PRT - long term we would like to remove dataloader database but for now # this block of code keeps dataloaderinterface_sensormeasurement table in sync - - - #if not site_sensor.registration.deployment_date: - #site_sensor.registration.deployment_date = measurement_datetime - # #site_sensor.registration.deployment_date_utc_offset = utc_offset - # site_sensor.registration.save(update_fields=['deployment_date']) - - return None + try: + query_result = sync_dataloader_tables(result_value) + query_result = sync_result_table(result_value) + return None + #if not site_sensor.registration.deployment_date: + #site_sensor.registration.deployment_date = measurement_datetime + # #site_sensor.registration.deployment_date_utc_offset = utc_offset + # site_sensor.registration.save(update_fields=['deployment_date']) + except Exception as e: + return None #dataloader utility function def get_site_sensor(resultid:str) -> Union[Dict[str, Any],None]: @@ -716,46 +716,45 @@ def get_site_sensor(resultid:str) -> Union[Dict[str, Any],None]: #dataloader utility function def update_sensormeasurement(sensor_id:str, result_value:TimeseriesResultValueTechDebt) -> None: with _db_engine_loader.connect() as connection: - query = text("DO $condition_insert$ BEGIN " \ - 'IF (SELECT COUNT(sensor_id > 0) FROM ' \ - ' dataloaderinterface_sensormeasurement WHERE ' \ - ' sensor_id = :sensor_id) THEN ' \ - ' UPDATE dataloaderinterface_sensormeasurement ' \ - " SET value_datetime=:datetime, " \ - " value_datetime_utc_offset = ':utc_offset', " \ - ' data_value = data_value ' \ - ' WHERE sensor_id=:sensor_id; ' \ - 'ELSE ' \ - ' INSERT INTO dataloaderinterface_sensormeasurement ' \ - " VALUES (:sensor_id,:datetime,':utc_offset',:data_value); " \ - 'END IF;' \ - 'END $condition_insert$') - connection.execute(query, + query = text('UPDATE dataloaderinterface_sensormeasurement ' \ + "SET value_datetime=:datetime, " \ + "value_datetime_utc_offset = :utc_offset, " \ + 'data_value = :data_value ' \ + 'WHERE sensor_id=:sensor_id; ') + result = connection.execute(query, sensor_id=sensor_id, datetime=result_value.value_datetime, - utc_offset=result_value.utc_offset, + utc_offset=timedelta(hours=result_value.utc_offset), data_value=result_value.data_value - ) - return None + ) + if result.rowcount < 1: + query = text('INSERT INTO dataloaderinterface_sensormeasurement ' \ + "VALUES (:sensor_id,:datetime,':utc_offset',:data_value); ") + result = connection.execute(query, + sensor_id=sensor_id, + datetime=result_value.value_datetime, + utc_offset=timedelta(hours=result_value.utc_offset), + data_value=result_value.data_value + ) + return result #dataloader utility function def sync_dataloader_tables(result_value: TimeseriesResultValueTechDebt) -> None: site_sensor = get_site_sensor(result_value.result_id) if not site_sensor: return None - update_sensormeasurement(site_sensor['id'], result_value) + result = update_sensormeasurement(site_sensor['id'], result_value) return None def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None: with _db_engine.connect() as connection: - query = text("DO $sync_function$ BEGIN "\ - "UPDATE odm2.results SET valuecount = valuecount + 1 WHERE resultid=:result_id; "\ - "IF (SELECT (resultdatetime < :result_datetime) FROM odm2.results WHERE resultid=:result_id) THEN" \ - " UPDATE odm2.results SET resultdatetime = :result_datetime WHERE resultid=:result_id; "\ - "END IF; END $sync_function$ ") - return connection.execute(query, + query = text("UPDATE 
odm2.results SET valuecount = valuecount + 1, " \ + "resultdatetime = GREATEST(:result_datetime, resultdatetime)" \ + "WHERE resultid=:result_id; ") + result = connection.execute(query, result_id=result_value.result_id, result_datetime=result_value.value_datetime, ) + return result def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt) -> None: with _db_engine.connect() as connection: @@ -782,6 +781,4 @@ def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt time_aggregation_interval=result_value.time_aggregation_interval, time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, ) - if result: - return sync_result_table(result_value) - return None \ No newline at end of file + return result \ No newline at end of file From b4f6c408695c855a8cb8f10d51ad559b0b730e9b Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Thu, 30 Dec 2021 13:21:03 -0500 Subject: [PATCH 10/13] Missing change of engine pool size --- src/dataloaderservices/views.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index a0857749..b0defe79 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -47,7 +47,6 @@ _dbsettings = settings.DATABASES['odm2'] _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}" -_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=5) _db_engine = sqlalchemy.create_engine(_connection_str, pool_size=10) _dbsettings_loader = settings.DATABASES['default'] From 64f263673b625f3e6ac8be3556d8d99030bb1dfa Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Thu, 30 Dec 2021 13:46:12 -0500 Subject: [PATCH 11/13] Restore set sensor deployment date logic --- src/dataloaderservices/views.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index b0defe79..37c78637 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -605,6 +605,9 @@ def post(self, request, format=None): if not result_uuids: raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{request.data['sampling_feature']}'") + #dataloader table related + set_deployment_date(sampling_feature.sampling_feature_id, measurement_datetime) + futures = {} unit_id = Unit.objects.get(unit_name='hour minute').unit_id @@ -696,10 +699,6 @@ def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[st query_result = sync_dataloader_tables(result_value) query_result = sync_result_table(result_value) return None - #if not site_sensor.registration.deployment_date: - #site_sensor.registration.deployment_date = measurement_datetime - # #site_sensor.registration.deployment_date_utc_offset = utc_offset - # site_sensor.registration.save(update_fields=['deployment_date']) except Exception as e: return None @@ -744,6 +743,20 @@ def sync_dataloader_tables(result_value: TimeseriesResultValueTechDebt) -> None: result = update_sensormeasurement(site_sensor['id'], result_value) return None +#dataloader utility function +def set_deployment_date(sample_feature_id:int, date_time:datetime) -> None: + with _db_engine_loader.connect() as connection: + query = text('UPDATE dataloaderinterface_siteregistration '\ + 'SET "DeploymentDate"=:date_time '\ + 
'WHERE "DeploymentDate" IS NULL AND ' \ + '"SamplingFeatureID"=:sample_feature_id' ) + result = connection.execute(query, + sample_feature_id=sample_feature_id, + date_time=date_time + ) + return None + + def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None: with _db_engine.connect() as connection: query = text("UPDATE odm2.results SET valuecount = valuecount + 1, " \ From bd211705f282dfa9110d417953d92b04509533b1 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 5 Jan 2022 09:10:30 -0500 Subject: [PATCH 12/13] Add error handling to set_deployment_date method. --- src/dataloaderservices/views.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index 37c78637..66ea8db2 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -606,7 +606,10 @@ def post(self, request, format=None): raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{request.data['sampling_feature']}'") #dataloader table related - set_deployment_date(sampling_feature.sampling_feature_id, measurement_datetime) + try: + set_deployment_date(sampling_feature.sampling_feature_id, measurement_datetime) + except Exception as e: + pass futures = {} unit_id = Unit.objects.get(unit_name='hour minute').unit_id From 17a0ce797743928c4a9bc779377705c5d7c3b975 Mon Sep 17 00:00:00 2001 From: Paul Tomasula <31142705+ptomasula@users.noreply.github.com> Date: Wed, 5 Jan 2022 11:43:57 -0500 Subject: [PATCH 13/13] Move timeseries_result_values error handling --- src/dataloaderservices/views.py | 77 +++++++++++++++++---------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index 66ea8db2..7ab1ba7d 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -682,20 +682,9 @@ def __init__(self, self.time_aggregation_interval_unit = time_aggregation_interval_unit def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[str,None]: - try: - query_result = insert_timeseries_result_values(result_value) - except sqlalchemy.exc.IntegrityError as e: - if hasattr(e, 'orig'): - if isinstance(e.orig, psycopg2.errors.UniqueViolation): - #data is already in database - return None - else: - return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") - else: - return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") - except Exception as e: - return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") - + result = insert_timeseries_result_values(result_value) + if result is not None: + return result # PRT - long term we would like to remove dataloader database but for now # this block of code keeps dataloaderinterface_sensormeasurement table in sync try: @@ -772,28 +761,40 @@ def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None: return result def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt) -> None: - with _db_engine.connect() as connection: - query = text("INSERT INTO odm2.timeseriesresultvalues " \ - "(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ - "censorcodecv, qualitycodecv, timeaggregationinterval, timeaggregationintervalunitsid) " \ - "VALUES ( " \ - "(SELECT nextval('odm2.\"timeseriesresultvalues_valueid_seq\"'))," \ - ":result_id, " \ - ":data_value, " \ - ":value_datetime, " \ - 
":utc_offset, " \ - ":censor_code, " \ - ":quality_code, " \ - ":time_aggregation_interval, " \ - ":time_aggregation_interval_unit);") - result = connection.execute(query, - result_id=result_value.result_id, - data_value=result_value.data_value, - value_datetime=result_value.value_datetime, - utc_offset=result_value.utc_offset, - censor_code=result_value.censor_code, - quality_code=result_value.quality_code, - time_aggregation_interval=result_value.time_aggregation_interval, - time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, + try: + with _db_engine.connect() as connection: + query = text("INSERT INTO odm2.timeseriesresultvalues " \ + "(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ + "censorcodecv, qualitycodecv, timeaggregationinterval, timeaggregationintervalunitsid) " \ + "VALUES ( " \ + "(SELECT nextval('odm2.\"timeseriesresultvalues_valueid_seq\"'))," \ + ":result_id, " \ + ":data_value, " \ + ":value_datetime, " \ + ":utc_offset, " \ + ":censor_code, " \ + ":quality_code, " \ + ":time_aggregation_interval, " \ + ":time_aggregation_interval_unit);") + result = connection.execute(query, + result_id=result_value.result_id, + data_value=result_value.data_value, + value_datetime=result_value.value_datetime, + utc_offset=result_value.utc_offset, + censor_code=result_value.censor_code, + quality_code=result_value.quality_code, + time_aggregation_interval=result_value.time_aggregation_interval, + time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, ) - return result \ No newline at end of file + return None + except sqlalchemy.exc.IntegrityError as e: + if hasattr(e, 'orig'): + if isinstance(e.orig, psycopg2.errors.UniqueViolation): + #data is already in database + return None + else: + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") + else: + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") + except Exception as e: + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") \ No newline at end of file