Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved warnings / error handling. #117

Merged
merged 4 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ ENV DEBIAN_FRONTEND="noninteractive" \
PATH="${PATH}:/opt/eccodes/bin"

RUN apt-get update -y \
&& apt-get install -y vim emacs nedit nano
&& apt-get install -y vim emacs nedit nano git \
&& pip3 install --no-cache-dir git+https://github.com/wmo-im/csv2bufr-templates@main # ToDo - move to requirements.txt

WORKDIR /tmp

Expand Down
93 changes: 60 additions & 33 deletions csv2bufr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
'typicalSecond': 'const:0'
}

_warnings = []

# status codes
FAILED = 0
Expand Down Expand Up @@ -123,19 +124,24 @@

# function to find position in array of requested element
def index_(key, mapping):
    """
    Return the position of the first mapping element whose 'eccodes_key'
    equals *key*.

    :param key: ecCodes key to locate
    :param mapping: iterable of mapping elements (dicts carrying an
        'eccodes_key' entry)

    :returns: zero-based index of the matching element, or ``None`` when
        the key is absent and NULLIFY_INVALID is set

    :raises ValueError: when the key is absent and NULLIFY_INVALID is not set
    """
    global _warnings
    # enumerate replaces the manual idx counter of the original
    for idx, item in enumerate(mapping):
        if item['eccodes_key'] == key:
            return idx
    msg = f"key {key} not found in {mapping}"
    if NULLIFY_INVALID:
        # soft failure: record the warning and signal "not found" with None
        LOGGER.warning(msg)
        _warnings.append(msg)
        return None
    LOGGER.error(msg)
    # give the exception a message (the original raised a bare ValueError)
    raise ValueError(msg)


def parse_value(element: str, data: dict):
global _warnings
data_type = element.split(":")
if data_type[0] == "const":
value = data_type[1]
Expand All @@ -146,11 +152,14 @@ def parse_value(element: str, data: dict):
elif data_type[0] == "data":
column = data_type[1]
if column not in data:
msg = f"Column {column} not found in input data: {data}"
if NULLIFY_INVALID:
LOGGER.warning(f"Column {column} not found in input data: {data}") # noqa
LOGGER.warning("HERE")
LOGGER.warning(msg) # noqa
_warnings.append(msg)
else:
LOGGER.error(f"Column {column} not found in input data: {data}") # noqa
raise ValueError
LOGGER.error(msg) # noqa
raise ValueError
value = data[column]
elif data_type[0] == "array":
value = data_type[1]
Expand All @@ -172,14 +181,17 @@ def parse_value(element: str, data: dict):

# function to retrieve data
def get_(key: str, mapping: dict, data: dict):
global _warnings
# get position in mapping
try:
idx = index_(key, mapping)
element = mapping[idx]
value = parse_value(element['value'], data)
except Exception as e:
msg = f"Warning raised getting value for {key}, None returned for {key}" # noqa
if NULLIFY_INVALID:
LOGGER.warning(f"Warning raised getting value for {key}, None returned for {key}") # noqa
LOGGER.warning(msg) # noqa
_warnings.append(msg)
value = None
else:
raise e
Expand All @@ -197,7 +209,7 @@ def validate_mapping(mapping: dict) -> bool:

:returns: `bool` of validation result
"""

global _warnings
# load internal file schema for mappings
file_schema = f"{MAPPINGS}{os.sep}mapping_schema.json"
with open(file_schema) as fh:
Expand Down Expand Up @@ -230,7 +242,7 @@ def apply_scaling(value: Union[NUMBERS], scale: Union[NUMBERS],

:returns: scaled value
"""

global _warnings
if isinstance(value, NUMBERS):
if None not in [scale, offset]:
try:
Expand All @@ -257,7 +269,7 @@ def validate_value(key: str, value: Union[NUMBERS],

:returns: validated value
"""

global _warnings
# TODO move this function to the class as part of set value

if value is None:
Expand All @@ -271,6 +283,7 @@ def validate_value(key: str, value: Union[NUMBERS],
e = ValueError(f"{key}: Value ({value}) out of valid range ({valid_min} - {valid_max}).") # noqa
if nullify_on_fail:
LOGGER.warning(f"{e}; Element set to missing")
_warnings.append(f"{e}; Element set to missing")
return None
else:
# LOGGER.error(str(e))
Expand All @@ -296,6 +309,7 @@ def __init__(self, descriptors: list,
:param table_version: version of Master Table 0 to use, default 36

"""
global _warnings
# ================================
# first create empty bufr messages
# ================================
Expand Down Expand Up @@ -359,9 +373,11 @@ def __init__(self, descriptors: list,
self.extended_delayed_replications = \
extended_delayed_replications # used when encoding
self.bufr = None # placeholder for BUFR bytes
self._hash = None # placeholder for hash of data
# ============================================

def create_template(self) -> None:
global _warnings
template = {}
template["inputDelayedDescriptorReplicationFactor"] = \
self.delayed_replications
Expand Down Expand Up @@ -441,7 +457,7 @@ def set_element(self, key: str, value: object) -> None:

:returns: `None`
"""

global _warnings
# TODO move value validation here

if value is not None and not isinstance(value, list):
Expand All @@ -456,6 +472,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
else:
raise e
elif expected_type == "float" and not isinstance(value, float):
Expand All @@ -465,6 +482,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
else:
raise e
else:
Expand All @@ -479,6 +497,7 @@ def get_element(self, key: str) -> Any:

:returns: value of the element
"""
global _warnings
result = None
try:
# check if we want value or an attribute (indicated by ->)
Expand All @@ -492,6 +511,7 @@ def get_element(self, key: str) -> Any:
if NULLIFY_INVALID:
result = None
LOGGER.warning(f"Error {e} whilst fetching {key} from data, None returned") # noqa
_warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
else:
msg = f"Error {e} whilst fetching {key} from data"
raise RuntimeError(msg)
Expand All @@ -507,7 +527,7 @@ def as_bufr(self, use_cached: bool = False) -> bytes:

:returns: bytes containing BUFR data
"""

global _warnings
if use_cached and (self.bufr is not None):
return self.bufr
# ===========================
Expand Down Expand Up @@ -544,12 +564,12 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
try:
codes_set(bufr_msg, "pack", True)
except CodesInternalError as e:
LOGGER.warning(f"error calling codes_set({bufr_msg}, 'pack', True): {e}") # noqa
LOGGER.warning("null message returned")
LOGGER.warning(f"Error calling codes_set({bufr_msg}, 'pack', True): {e}. Null message returned") # noqa
_warnings.append(f"Error calling codes_set({bufr_msg}, 'pack', True): {e}. Null message returned") # noqa
codes_release(bufr_msg)
return self.bufr
except Exception as e:
LOGGER.error(f"error calling codes_set({bufr_msg}, 'pack', True): {e}") # noqa
LOGGER.error(f"Error calling codes_set({bufr_msg}, 'pack', True): {e}") # noqa
LOGGER.error(json.dumps(self.dict, indent=4))
raise e
# =======================================================
Expand All @@ -567,6 +587,13 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
# Return BUFR message bytes
# =============================================
self.bufr = fh.read()
try:
# set hash
self._hash = hashlib.md5(self.bufr).hexdigest()
except Exception as e:
LOGGER.error(f"Error calculating hash (md5) of BUFR string: {self.bufr}") # noqa
raise e

return self.bufr

def md5(self) -> str:
    """
    Return the md5 checksum of the encoded BUFR message.

    The digest is computed once at encoding time (in ``as_bufr``) and
    cached on the instance; this accessor only returns the cached value.

    :returns: md5 hex digest of the BUFR message, or ``None`` if the
        message has not been encoded yet
    """
    # NOTE(review): the merged code declared ``global _warnings`` here but
    # never read or wrote it; the no-op declaration has been dropped.
    return self._hash

def parse(self, data: dict, mappings: dict) -> None:
"""
Expand All @@ -598,6 +621,7 @@ def parse(self, data: dict, mappings: dict) -> None:
# ==================================================
# Parse the data.
# ==================================================
global _warnings
for section in ("header", "data"):
for element in mappings[section]:
# get eccodes key
Expand Down Expand Up @@ -631,8 +655,8 @@ def parse(self, data: dict, mappings: dict) -> None:
NULLIFY_INVALID)
except Exception as e:
if NULLIFY_INVALID:
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None") # noqa
LOGGER.warning(f"data: {data}")
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
_warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
value = None
else:
# LOGGER.error(f"Error raised whilst validating {element['eccodes_key']}, raising error") # noqa
Expand All @@ -655,7 +679,7 @@ def parse(self, data: dict, mappings: dict) -> None:
value = apply_scaling(value, scale, offset)
except Exception as e:
LOGGER.error(f"Error scaling data: scale={scale}, offet={offset}, value={value}") # noqa
LOGGER.error(f"data: {data}")
LOGGER.debug(f"data: {data}")
raise e

# ==================================================
Expand All @@ -671,7 +695,7 @@ def get_datetime(self) -> datetime:
:returns: `datetime.datetime` of ISO8601 representation of the
characteristic date/time
"""

global _warnings
if None in [
self.get_element("typicalYear"),
self.get_element("typicalMonth"),
Expand Down Expand Up @@ -730,7 +754,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:

:returns: iterator
"""

global _warnings
# ======================
# validate mapping files
# ======================
Expand Down Expand Up @@ -855,7 +879,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if not val.isascii():
if NULLIFY_INVALID:
LOGGER.error(f"csv read error, non ASCII data detected ({val}), skipping row") # noqa
LOGGER.error(row)
LOGGER.debug(row)
continue
else:
raise ValueError
Expand All @@ -881,7 +905,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
data_dict["_wsi_local"] = wsi_local
except Exception as e:
LOGGER.error("Error parsing WIGOS station identifier")
LOGGER.error(f"data:{data_dict}")
LOGGER.debug(f"data:{data_dict}")
raise ValueError(e)
# reset BUFR message to clear data
message.reset()
Expand All @@ -894,7 +918,8 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
status = {
"code": PASSED,
"message": "",
"errors": []
"errors": [],
"warnings": _warnings
}
cksum = message.md5()
# now identifier based on WSI and observation date as identifier
Expand All @@ -921,14 +946,14 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
}

except Exception as e:
LOGGER.error(e)
LOGGER.error("Error encoding BUFR, BUFR set to None")
LOGGER.error(f"data:{data_dict}")
LOGGER.error(f"Error encoding BUFR, BUFR set to None\n\t{e}")
LOGGER.debug(f"data:{data_dict}")
result["bufr4"] = None
status = {
"code": FAILED,
"message": "Error encoding row, BUFR set to None",
"errors": [f"Error: {e}\n\t\tData: {data_dict}"]
"errors": [f"Error: {e}\n\t\tData: {data_dict}"],
"warnings": _warnings
}
result["_meta"] = {
"id": None,
Expand All @@ -948,7 +973,9 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:

time_ = datetime.now(timezone.utc).isoformat()
LOGGER.info(f"{time_}|{result['_meta']}")

# now yield result back to caller
yield result
# clear warnings
_warnings = []

fh.close()
24 changes: 14 additions & 10 deletions csv2bufr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import click

from csv2bufr import __version__, BUFRMessage, transform as transform_csv
import csv2bufr_templates as c2bt

THISDIR = os.path.dirname(os.path.realpath(__file__))
MAPPINGS = f"{THISDIR}{os.sep}resources{os.sep}mappings"
Expand Down Expand Up @@ -108,18 +109,21 @@ def transform(ctx, csv_file, mapping, output_dir, verbosity): # noqa
result = None
click.echo(f"\nCLI:\t... Transforming {csv_file.name} to BUFR ...")

# identify mapping to use
# load / identify mapping to use
if not os.path.isfile(mapping):
mappings_file = f"{MAPPINGS}{os.sep}{mapping}.json"
if not os.path.isfile(mappings_file):
raise click.ClickException(
f"Invalid stored mapping ({mappings_file})")
try:
mappings = c2bt.load_template(mapping)
if mappings is None:
raise click.ClickException(
f"Error loading mappings {mapping}")
except Exception as err:
raise click.ClickException(err)
else:
mappings_file = mapping

# load mappings
with open(mappings_file) as fh:
mappings = json.load(fh)
try:
with open(mapping) as fh:
mappings = json.load(fh)
except Exception as err:
raise click.ClickException(err)

try:
result = transform_csv(csv_file.read(), mappings)
Expand Down
Loading