diff --git a/.github/workflows/tests-docker.yml b/.github/workflows/tests-docker.yml index b34fbbc8..e80a5a7e 100644 --- a/.github/workflows/tests-docker.yml +++ b/.github/workflows/tests-docker.yml @@ -53,12 +53,14 @@ jobs: DISCOVERY_METADATA: /data/wis2box/metadata/discovery/mw-surface-weather-observations.yml DISCOVERY_METADATA_ID: urn:x-wmo:md:mw-mw_met_centre:surface-weather-observations TEST_DATA: /data/wis2box/observations/malawi + TEST_DATA_UPDATE: /data/wis2box/observations/malawi_update run: | python3 wis2box-ctl.py execute wis2box metadata discovery publish $DISCOVERY_METADATA python3 wis2box-ctl.py execute wis2box data add-collection $DISCOVERY_METADATA python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID + python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA_UPDATE - name: add Italy data 🇮🇹 env: TOPIC_HIERARCHY: it-roma_met_centre.data.core.weather.surface-based-observations.synop diff --git a/tests/data/observations/malawi_update/WIGOS_0-454-2-AWSBALAKA_update_2021-11-18T0955.csv b/tests/data/observations/malawi_update/WIGOS_0-454-2-AWSBALAKA_update_2021-11-18T0955.csv new file mode 100644 index 00000000..6638cf73 --- /dev/null +++ b/tests/data/observations/malawi_update/WIGOS_0-454-2-AWSBALAKA_update_2021-11-18T0955.csv @@ -0,0 +1,5 @@ +"TOA5","Namitambo","CR300","4720","CR310-CELL200.Std.08.01","CPU:CR310_Malawi_V1R6_06072021_T3.CR300","13058","SYNOP" +"TIMESTAMP","RECORD","WMO_Block","Station_ID","Station_Name","WMO_Station_Type","M_Year","M_Month","M_DayOfMonth","M_HourOfDay","M_Minutes","Latitude","Longitude","Elevation","BP_Elevation","BP","QNH","BP_Change","BP_Tendency","Temp_H","AirTempK","DewPointTempK","RH","Sun_hr","SunHrs","Sun_hr24","SunHrs24","Rain_H","Rain_hr","Rain_mm_Tot","Temp_hr24","Temp24T","AirTempMaxK","AirTempMinK","WSpeed_height","Wind_Type","Wind_Sig","Wind_T","WSpeed","WindDir","WSpeed10M_Avg","WindG_Sig","WindGust","Solar_hr","SlrJ","Solar_hr24","SlrJ24" +"TS","RN","","","","","","","","","","","","","m","","hPa","Pa","","m","K","K","%","","hours","","hours","m","","mm","","","K","K","","","","","meters/second","degrees","m/s","","m/s","","J/m^2","","J/m^2" +"","","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Tot","Smp","Smp","Smp","Smp","Tot","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","WVc","WVc","Smp","Smp","Smp","Smp","Tot","Smp","Smp" +"2021-11-18 11:55:00",719,6,"0-454-2-AWSBALAKA","Balaka","0",2021,11,18,9,55,-14.98,34.97,617.6,618.6,94399.23,101638.1,-221.2813,7,1.5,309.5,288.9,29.48,-1,0,-24,0,1.5,-1,0,-24,0,350.1,303.9,2,"0",2,-10,1.635,352.5,2.169,"None",5.68,-1,703.221,-24,704.221 diff --git a/tests/data/observations/romania/A_SMRO01YRBK171200_C_EDZW_20230117125200_51396856.txt b/tests/data/observations/romania_update/A_SMRO01YRBK171200_C_EDZW_20230117125200_51396856.txt similarity index 100% rename from tests/data/observations/romania/A_SMRO01YRBK171200_C_EDZW_20230117125200_51396856.txt rename to tests/data/observations/romania_update/A_SMRO01YRBK171200_C_EDZW_20230117125200_51396856.txt diff --git a/tests/data/observations/romania/A_SMRO01YRBK171800CCA_C_EDZW_20230117184900_51697747.txt b/tests/data/observations/romania_update/A_SMRO01YRBK171800CCA_C_EDZW_20230117184900_51697747.txt similarity index 100% rename from tests/data/observations/romania/A_SMRO01YRBK171800CCA_C_EDZW_20230117184900_51697747.txt rename to tests/data/observations/romania_update/A_SMRO01YRBK171800CCA_C_EDZW_20230117184900_51697747.txt diff --git a/tests/data/observations/romania/A_SMRO01YRBK171800_C_EDZW_20230117181403_51669400.txt b/tests/data/observations/romania_update/A_SMRO01YRBK171800_C_EDZW_20230117181403_51669400.txt similarity index 100% rename from tests/data/observations/romania/A_SMRO01YRBK171800_C_EDZW_20230117181403_51669400.txt rename to tests/data/observations/romania_update/A_SMRO01YRBK171800_C_EDZW_20230117181403_51669400.txt diff --git a/tests/data/observations/romania/A_SMRO01YRBK180000CCA_C_EDZW_20230118004301_51967254.txt b/tests/data/observations/romania_update/A_SMRO01YRBK180000CCA_C_EDZW_20230118004301_51967254.txt similarity index 67% rename from tests/data/observations/romania/A_SMRO01YRBK180000CCA_C_EDZW_20230118004301_51967254.txt rename to tests/data/observations/romania_update/A_SMRO01YRBK180000CCA_C_EDZW_20230118004301_51967254.txt index eb6d235b..0b0935f8 100644 --- a/tests/data/observations/romania/A_SMRO01YRBK180000CCA_C_EDZW_20230118004301_51967254.txt +++ b/tests/data/observations/romania_update/A_SMRO01YRBK180000CCA_C_EDZW_20230118004301_51967254.txt @@ -1,6 +1,12 @@ -SMRO01 YRBK 180000 CCA -AAXX 18001 -15280 01/90 92034 11034 21040 37301 47838 53008 60001 74143 333 49080 -55300 0//// 20000 3//// 55000 0//// 20003 3//// 60007 91040 911// -92956= - +SMRO01 YRBK 180000 CCA + +AAXX 18001 + +15280 01/90 92034 11034 21040 37301 47838 53008 60001 74143 333 49080 + +55300 0//// 20000 3//// 55000 0//// 20003 3//// 60007 91040 911// + +92956= + + + diff --git a/tests/data/observations/romania/A_SMRO01YRBK180000_C_EDZW_20230118001801_51945941.txt b/tests/data/observations/romania_update/A_SMRO01YRBK180000_C_EDZW_20230118001801_51945941.txt similarity index 100% rename from tests/data/observations/romania/A_SMRO01YRBK180000_C_EDZW_20230118001801_51945941.txt rename to tests/data/observations/romania_update/A_SMRO01YRBK180000_C_EDZW_20230118001801_51945941.txt diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 5fa7385e..761080bd 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -225,12 +225,21 @@ def test_data_api(): def test_message_api(): """Test message API collection queries""" + # check messages with "wigos_station_identifier"="0-454-2-AWSBALAKA" + url = f'{API_URL}/collections/messages/items?q=0-454-2-AWSBALAKA&limit=2' # noqa + r = SESSION.get(url).json() + # get links from 2nd message + links = r['features'][1]['links'] + + # check link contains rel='http://def.wmo.int/def/rel/wnm/-/update' + assert any(link['rel'] == 'http://def.wmo.int/def/rel/wnm/-/update' for link in links) # noqa + # test messages per test dataset counts = { - 'mw_met_centre': 24, + 'mw_met_centre': 25, 'roma_met_centre': 33, 'alger_met_centre': 29, - 'rnimh': 188, + 'rnimh': 116, 'brazza_met_centre': 15 } for key, value in counts.items(): diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index e56fd3fb..96692b3c 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -325,7 +325,6 @@ def delete_collection_item(self, collection_id: str, item_id: str) -> str: _ = self.conn.delete(index=collection_id, id=item_id) except Exception as err: msg = f'Item deletion failed: {err}' - LOGGER.error(msg) raise RuntimeError(msg) return True diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index a761a02f..33048951 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -29,7 +29,7 @@ STORAGE_SOURCE, BROKER_PUBLIC, BROKER_HOST, BROKER_USERNAME, BROKER_PASSWORD, BROKER_PORT) -from wis2box.storage import put_data +from wis2box.storage import exists, get_data, put_data from wis2box.topic_hierarchy import TopicHierarchy from wis2box.plugin import load_plugin, PLUGINS @@ -128,7 +128,8 @@ def transform(self, input_data: Union[bytes, str], def notify(self, identifier: str, storage_path: str, datetime_: str, geometry: dict = None, - wigos_station_identifier: str = None) -> bool: + wigos_station_identifier: str = None, + is_update: bool = False) -> bool: """ Send notification of data to broker @@ -147,9 +148,11 @@ def notify(self, identifier: str, storage_path: str, topic = f'origin/a/wis2/{self.topic_hierarchy.dirpath}' data_id = topic.replace('origin/a/wis2/', '') + operation = 'create' if is_update is False else 'update' + wis_message = WISNotificationMessage( identifier, data_id, storage_path, datetime_, geometry, - wigos_station_identifier) + wigos_station_identifier, operation) # load plugin for public broker defs = { @@ -224,10 +227,22 @@ def publish_item(self, identifier, item) -> bool: data_bytes = self.as_bytes(the_data) storage_path = f'{STORAGE_SOURCE}/{STORAGE_PUBLIC}/{rfp}/{identifier}.{format_}' # noqa - LOGGER.info(f'Writing data to {storage_path}') - put_data(data_bytes, storage_path) - - if self.enable_notification: + is_update = False + is_new = True + # check if storage_path already exists + if exists(storage_path): + # if data exists, check if it is the same + if data_bytes == get_data(storage_path): + LOGGER.error(f'Data already published for {identifier}-{format_}; not publishing') # noqa + is_new = False + else: + LOGGER.warning(f'Data already published for {identifier}-{format_}; updating') # noqa + is_update = True + if is_new: + LOGGER.info(f'Writing data to {storage_path}') + put_data(data_bytes, storage_path) + + if self.enable_notification and is_new: LOGGER.debug('Sending notification to broker') try: @@ -237,7 +252,7 @@ def publish_item(self, identifier, item) -> bool: self.notify(identifier, storage_path, datetime_, - item['_meta'].get('geometry'), wsi) + item['_meta'].get('geometry'), wsi, is_update) else: LOGGER.debug('No notification sent') except Exception as err: diff --git a/wis2box-management/wis2box/pubsub/message.py b/wis2box-management/wis2box/pubsub/message.py index 461bc8cd..bc351d7a 100644 --- a/wis2box-management/wis2box/pubsub/message.py +++ b/wis2box-management/wis2box/pubsub/message.py @@ -128,7 +128,8 @@ def _generate_checksum(self, bytes, algorithm: SecureHashAlgorithms) -> str: # class WISNotificationMessage(PubSubMessage): def __init__(self, identifier: str, topic: str, filepath: str, - datetime_: str, geometry=None, wigos_station_identifier=None): + datetime_: str, geometry=None, wigos_station_identifier=None, + operation: str = 'create') -> None: super().__init__('wis2-notification-message', identifier, topic, filepath, datetime_, geometry) @@ -152,6 +153,20 @@ def __init__(self, identifier: str, topic: str, filepath: str, if self.datetime is None: LOGGER.warning('Missing data datetime') + links = [{ + 'rel': 'canonical', + 'type': mimetype, + 'href': public_file_url, + 'length': self.length + }] + if operation == 'update': + links.append({ + 'rel': 'http://def.wmo.int/def/rel/wnm/-/update', + 'type': mimetype, + 'href': public_file_url, + 'length': self.length + }) + self.message = { 'id': str(uuid.uuid4()), 'type': 'Feature', @@ -166,12 +181,7 @@ def __init__(self, identifier: str, topic: str, filepath: str, 'value': self.checksum_value } }, - 'links': [{ - 'rel': 'canonical', - 'type': mimetype, - 'href': public_file_url, - 'length': self.length - }] + 'links': links } if self.length < 4096: diff --git a/wis2box-management/wis2box/storage/__init__.py b/wis2box-management/wis2box/storage/__init__.py index 0e64f337..695b59fe 100644 --- a/wis2box-management/wis2box/storage/__init__.py +++ b/wis2box-management/wis2box/storage/__init__.py @@ -29,6 +29,35 @@ LOGGER = logging.getLogger(__name__) +def exists(path: str) -> bool: + """ + Check if storage path exists + + :param path: path to check + + :returns: `bool` of result + """ + LOGGER.debug(f'exists: {path}') + storage_path = path.replace(f'{STORAGE_SOURCE}/', '') + name = storage_path.split('/')[0] + + defs = { + 'storage_type': STORAGE_TYPE, + 'source': STORAGE_SOURCE, + 'name': name, + 'auth': {'username': STORAGE_USERNAME, 'password': STORAGE_PASSWORD}, + 'codepath': PLUGINS['storage'][STORAGE_TYPE]['plugin'] + } + + LOGGER.debug(f'Connecting to storage: {name}') + storage = load_plugin('storage', defs) + + identifier = storage_path.replace(name, '') + + LOGGER.debug(f'Checking if {identifier} exists') + return storage.exists(identifier) + + def get_data(path: str) -> Any: """ Get data from storage diff --git a/wis2box-management/wis2box/storage/minio.py b/wis2box-management/wis2box/storage/minio.py index 74bc8e87..b6889b3e 100644 --- a/wis2box-management/wis2box/storage/minio.py +++ b/wis2box-management/wis2box/storage/minio.py @@ -26,6 +26,7 @@ from urllib.parse import urlparse from minio import Minio +from minio import error as minio_error from minio.notificationconfig import NotificationConfig, QueueConfig from wis2box.storage.base import PolicyTypes, StorageBase @@ -135,6 +136,23 @@ def create_bucket(self, bucket_policy: PolicyTypes = 'private'): LOGGER.debug(f'Adding notification config {config}') self.client.set_bucket_notification(self.name, config) + def exists(self, identifier: str) -> bool: + LOGGER.debug(f'Checking if object {identifier} exists') + try: + # Attempt to get object info to check if it exists + self.client.stat_object(bucket_name=self.name, object_name=identifier) # noqa + return True # Object exists + except minio_error.S3Error as err: + if err.code == 'NoSuchKey': + LOGGER.debug(err) + return False + else: + LOGGER.error(err) + raise err + except Exception as err: + LOGGER.error(err) + raise err + def get(self, identifier: str) -> Any: LOGGER.debug(f'Getting object {identifier} from bucket={self.name}') diff --git a/wis2box-management/wis2box/storage/s3.py b/wis2box-management/wis2box/storage/s3.py index 5b764cd1..b03c251f 100644 --- a/wis2box-management/wis2box/storage/s3.py +++ b/wis2box-management/wis2box/storage/s3.py @@ -24,6 +24,7 @@ from typing import Any import boto3 +from botocore.exceptions import ClientError from wis2box.storage.base import StorageBase @@ -40,6 +41,23 @@ def __init__(self, defs: dict) -> None: aws_access_key_id=self.auth['username'], aws_secret_access_key=self.auth['password']) + def exists(self, identifier: str) -> bool: + + LOGGER.debug(f'Checking if {identifier} exists') + try: + self.client.head_object(Bucket=self.name, Key=identifier) + except ClientError as e: + # If the object does not exist, return False + if e.response['Error']['Code'] == '404': + return False + else: + # If any other error occurs, raise an exception + raise e + except Exception as e: + raise e + + return True + def get(self, identifier: str) -> Any: LOGGER.debug(f'Getting object {identifier}')