Skip to content

Commit

Permalink
detect updates and duplicates (#601)
Browse files Browse the repository at this point in the history
* detect updates and duplicates

* update test

* flake8

* alphabetize and move more updates

* remove unreachable code
  • Loading branch information
maaikelimper authored Dec 14, 2023
1 parent bc73660 commit 3cbbd92
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ jobs:
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/mw-surface-weather-observations.yml
DISCOVERY_METADATA_ID: urn:x-wmo:md:mw-mw_met_centre:surface-weather-observations
TEST_DATA: /data/wis2box/observations/malawi
TEST_DATA_UPDATE: /data/wis2box/observations/malawi_update
run: |
python3 wis2box-ctl.py execute wis2box metadata discovery publish $DISCOVERY_METADATA
python3 wis2box-ctl.py execute wis2box data add-collection $DISCOVERY_METADATA
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA_UPDATE
- name: add Italy data 🇮🇹
env:
TOPIC_HIERARCHY: it-roma_met_centre.data.core.weather.surface-based-observations.synop
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"TOA5","Namitambo","CR300","4720","CR310-CELL200.Std.08.01","CPU:CR310_Malawi_V1R6_06072021_T3.CR300","13058","SYNOP"
"TIMESTAMP","RECORD","WMO_Block","Station_ID","Station_Name","WMO_Station_Type","M_Year","M_Month","M_DayOfMonth","M_HourOfDay","M_Minutes","Latitude","Longitude","Elevation","BP_Elevation","BP","QNH","BP_Change","BP_Tendency","Temp_H","AirTempK","DewPointTempK","RH","Sun_hr","SunHrs","Sun_hr24","SunHrs24","Rain_H","Rain_hr","Rain_mm_Tot","Temp_hr24","Temp24T","AirTempMaxK","AirTempMinK","WSpeed_height","Wind_Type","Wind_Sig","Wind_T","WSpeed","WindDir","WSpeed10M_Avg","WindG_Sig","WindGust","Solar_hr","SlrJ","Solar_hr24","SlrJ24"
"TS","RN","","","","","","","","","","","","","m","","hPa","Pa","","m","K","K","%","","hours","","hours","m","","mm","","","K","K","","","","","meters/second","degrees","m/s","","m/s","","J/m^2","","J/m^2"
"","","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Tot","Smp","Smp","Smp","Smp","Tot","Smp","Smp","Smp","Smp","Smp","Smp","Smp","Smp","WVc","WVc","Smp","Smp","Smp","Smp","Tot","Smp","Smp"
"2021-11-18 11:55:00",719,6,"0-454-2-AWSBALAKA","Balaka","0",2021,11,18,9,55,-14.98,34.97,617.6,618.6,94399.23,101638.1,-221.2813,7,1.5,309.5,288.9,29.48,-1,0,-24,0,1.5,-1,0,-24,0,350.1,303.9,2,"0",2,-10,1.635,352.5,2.169,"None",5.68,-1,703.221,-24,704.221
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
SMRO01 YRBK 180000 CCA
AAXX 18001
15280 01/90 92034 11034 21040 37301 47838 53008 60001 74143 333 49080
55300 0//// 20000 3//// 55000 0//// 20003 3//// 60007 91040 911//
92956=

SMRO01 YRBK 180000 CCA

AAXX 18001

15280 01/90 92034 11034 21040 37301 47838 53008 60001 74143 333 49080

55300 0//// 20000 3//// 55000 0//// 20003 3//// 60007 91040 911//

92956=



13 changes: 11 additions & 2 deletions tests/integration/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,21 @@ def test_data_api():
def test_message_api():
"""Test message API collection queries"""

# check messages with "wigos_station_identifier"="0-454-2-AWSBALAKA"
url = f'{API_URL}/collections/messages/items?q=0-454-2-AWSBALAKA&limit=2' # noqa
r = SESSION.get(url).json()
# get links from 2nd message
links = r['features'][1]['links']

# check link contains rel='http://def.wmo.int/def/rel/wnm/-/update'
assert any(link['rel'] == 'http://def.wmo.int/def/rel/wnm/-/update' for link in links) # noqa

# test messages per test dataset
counts = {
'mw_met_centre': 24,
'mw_met_centre': 25,
'roma_met_centre': 33,
'alger_met_centre': 29,
'rnimh': 188,
'rnimh': 116,
'brazza_met_centre': 15
}
for key, value in counts.items():
Expand Down
1 change: 0 additions & 1 deletion wis2box-management/wis2box/api/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ def delete_collection_item(self, collection_id: str, item_id: str) -> str:
_ = self.conn.delete(index=collection_id, id=item_id)
except Exception as err:
msg = f'Item deletion failed: {err}'
LOGGER.error(msg)
raise RuntimeError(msg)

return True
Expand Down
31 changes: 23 additions & 8 deletions wis2box-management/wis2box/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
STORAGE_SOURCE, BROKER_PUBLIC,
BROKER_HOST, BROKER_USERNAME, BROKER_PASSWORD,
BROKER_PORT)
from wis2box.storage import put_data
from wis2box.storage import exists, get_data, put_data
from wis2box.topic_hierarchy import TopicHierarchy
from wis2box.plugin import load_plugin, PLUGINS

Expand Down Expand Up @@ -128,7 +128,8 @@ def transform(self, input_data: Union[bytes, str],
def notify(self, identifier: str, storage_path: str,
datetime_: str,
geometry: dict = None,
wigos_station_identifier: str = None) -> bool:
wigos_station_identifier: str = None,
is_update: bool = False) -> bool:
"""
Send notification of data to broker
Expand All @@ -147,9 +148,11 @@ def notify(self, identifier: str, storage_path: str,
topic = f'origin/a/wis2/{self.topic_hierarchy.dirpath}'
data_id = topic.replace('origin/a/wis2/', '')

operation = 'create' if is_update is False else 'update'

wis_message = WISNotificationMessage(
identifier, data_id, storage_path, datetime_, geometry,
wigos_station_identifier)
wigos_station_identifier, operation)

# load plugin for public broker
defs = {
Expand Down Expand Up @@ -224,10 +227,22 @@ def publish_item(self, identifier, item) -> bool:
data_bytes = self.as_bytes(the_data)
storage_path = f'{STORAGE_SOURCE}/{STORAGE_PUBLIC}/{rfp}/{identifier}.{format_}' # noqa

LOGGER.info(f'Writing data to {storage_path}')
put_data(data_bytes, storage_path)

if self.enable_notification:
is_update = False
is_new = True
# check if storage_path already exists
if exists(storage_path):
# if data exists, check if it is the same
if data_bytes == get_data(storage_path):
LOGGER.error(f'Data already published for {identifier}-{format_}; not publishing') # noqa
is_new = False
else:
LOGGER.warning(f'Data already published for {identifier}-{format_}; updating') # noqa
is_update = True
if is_new:
LOGGER.info(f'Writing data to {storage_path}')
put_data(data_bytes, storage_path)

if self.enable_notification and is_new:
LOGGER.debug('Sending notification to broker')

try:
Expand All @@ -237,7 +252,7 @@ def publish_item(self, identifier, item) -> bool:

self.notify(identifier, storage_path,
datetime_,
item['_meta'].get('geometry'), wsi)
item['_meta'].get('geometry'), wsi, is_update)
else:
LOGGER.debug('No notification sent')
except Exception as err:
Expand Down
24 changes: 17 additions & 7 deletions wis2box-management/wis2box/pubsub/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ def _generate_checksum(self, bytes, algorithm: SecureHashAlgorithms) -> str: #

class WISNotificationMessage(PubSubMessage):
def __init__(self, identifier: str, topic: str, filepath: str,
datetime_: str, geometry=None, wigos_station_identifier=None):
datetime_: str, geometry=None, wigos_station_identifier=None,
operation: str = 'create') -> None:

super().__init__('wis2-notification-message', identifier,
topic, filepath, datetime_, geometry)
Expand All @@ -152,6 +153,20 @@ def __init__(self, identifier: str, topic: str, filepath: str,
if self.datetime is None:
LOGGER.warning('Missing data datetime')

links = [{
'rel': 'canonical',
'type': mimetype,
'href': public_file_url,
'length': self.length
}]
if operation == 'update':
links.append({
'rel': 'http://def.wmo.int/def/rel/wnm/-/update',
'type': mimetype,
'href': public_file_url,
'length': self.length
})

self.message = {
'id': str(uuid.uuid4()),
'type': 'Feature',
Expand All @@ -166,12 +181,7 @@ def __init__(self, identifier: str, topic: str, filepath: str,
'value': self.checksum_value
}
},
'links': [{
'rel': 'canonical',
'type': mimetype,
'href': public_file_url,
'length': self.length
}]
'links': links
}

if self.length < 4096:
Expand Down
29 changes: 29 additions & 0 deletions wis2box-management/wis2box/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,35 @@
LOGGER = logging.getLogger(__name__)


def exists(path: str) -> bool:
    """
    Report whether something is stored at the given storage path

    The storage source prefix is removed from the path; the first
    remaining path segment is passed to the storage plugin as its
    name, and the rest is the identifier that gets looked up.

    :param path: path to check

    :returns: `bool` result of the storage plugin's `exists` call
    """

    LOGGER.debug(f'exists: {path}')

    source_prefix = f'{STORAGE_SOURCE}/'
    storage_path = path.replace(source_prefix, '')
    # everything before the first '/' names the storage to connect to
    name = storage_path.partition('/')[0]

    credentials = {
        'username': STORAGE_USERNAME,
        'password': STORAGE_PASSWORD
    }
    plugin_codepath = PLUGINS['storage'][STORAGE_TYPE]['plugin']
    defs = {
        'storage_type': STORAGE_TYPE,
        'source': STORAGE_SOURCE,
        'name': name,
        'auth': credentials,
        'codepath': plugin_codepath
    }

    LOGGER.debug(f'Connecting to storage: {name}')
    backend = load_plugin('storage', defs)

    # what is left once the storage name is removed is the identifier
    identifier = storage_path.replace(name, '')
    LOGGER.debug(f'Checking if {identifier} exists')

    found = backend.exists(identifier)
    return found


def get_data(path: str) -> Any:
"""
Get data from storage
Expand Down
18 changes: 18 additions & 0 deletions wis2box-management/wis2box/storage/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from urllib.parse import urlparse

from minio import Minio
from minio import error as minio_error
from minio.notificationconfig import NotificationConfig, QueueConfig

from wis2box.storage.base import PolicyTypes, StorageBase
Expand Down Expand Up @@ -135,6 +136,23 @@ def create_bucket(self, bucket_policy: PolicyTypes = 'private'):
LOGGER.debug(f'Adding notification config {config}')
self.client.set_bucket_notification(self.name, config)

def exists(self, identifier: str) -> bool:
    """
    Check whether an object is present in this storage

    Existence is probed by requesting the object's metadata via
    `stat_object`.

    :param identifier: name of the object to look for

    :returns: `True` when the stat call succeeds, `False` when it
              fails with an S3 error whose code is `NoSuchKey`
    """

    LOGGER.debug(f'Checking if object {identifier} exists')

    try:
        self.client.stat_object(bucket_name=self.name, object_name=identifier)  # noqa
    except minio_error.S3Error as s3_err:
        if s3_err.code != 'NoSuchKey':
            # any other S3 failure is a real problem: log and propagate
            LOGGER.error(s3_err)
            raise
        # a missing key is the expected "not found" answer
        LOGGER.debug(s3_err)
        return False
    except Exception as other_err:
        LOGGER.error(other_err)
        raise

    # the stat call went through, so the object is there
    return True

def get(self, identifier: str) -> Any:

LOGGER.debug(f'Getting object {identifier} from bucket={self.name}')
Expand Down
18 changes: 18 additions & 0 deletions wis2box-management/wis2box/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Any

import boto3
from botocore.exceptions import ClientError

from wis2box.storage.base import StorageBase

Expand All @@ -40,6 +41,23 @@ def __init__(self, defs: dict) -> None:
aws_access_key_id=self.auth['username'],
aws_secret_access_key=self.auth['password'])

def exists(self, identifier: str) -> bool:
    """
    Check whether an object is present in this storage

    Existence is probed with a `head_object` request.

    :param identifier: key of the object to look for

    :returns: `True` when the head request succeeds, `False` when it
              fails with a client error whose code is `404`
    """

    LOGGER.debug(f'Checking if {identifier} exists')

    try:
        self.client.head_object(Bucket=self.name, Key=identifier)
    except ClientError as client_err:
        error_code = client_err.response['Error']['Code']
        if error_code != '404':
            # anything other than "not found" is passed on to the caller
            raise
        return False
    except Exception:
        raise
    else:
        return True

def get(self, identifier: str) -> Any:

LOGGER.debug(f'Getting object {identifier}')
Expand Down

0 comments on commit 3cbbd92

Please sign in to comment.