Skip to content

Commit

Permalink
chore: refactor mandatory fields for str and geo
Browse files Browse the repository at this point in the history
  • Loading branch information
AngRodrigues committed Jan 14, 2025
1 parent 141c39f commit 9001a08
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 35 deletions.
134 changes: 106 additions & 28 deletions map2loop/data_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,22 @@ def check_geology_fields_validity(mapdata) -> tuple[bool, str]:
if failed:
return (failed, message)

# # 3. Required Columns & are they str, and then empty or null?
required_columns = [config["unitname_column"], config["alt_unitname_column"]]
for col in required_columns:
if col not in geology_data.columns:
logger.error(f"Datatype GEOLOGY: Required column with config key: '{[k for k, v in config.items() if v == col][0]}' is missing from geology data.")
return (True, f"Datatype GEOLOGY: Required column with config key: '{[k for k, v in config.items() if v == col][0]}' is missing from geology data.")
if not geology_data[col].apply(lambda x: isinstance(x, str)).all():
config_key = [k for k, v in config.items() if v == col][0]
logger.error(f"Datatype GEOLOGY: Column '{config_key}' must contain only string values. Please check that the column contains only string values.")
return (True, f"Datatype GEOLOGY: Column '{config_key}' must contain only string values. Please check that the column contains only string values.")
if geology_data[col].isnull().any() or geology_data[col].str.strip().eq("").any():
config_key = [k for k, v in config.items() if v == col][0]
logger.error(f"Datatype GEOLOGY: NaN or blank values found in required column '{config_key}'. Please double check the column for blank values.")
return (True, f"Datatype GEOLOGY: NaN or blank values found in required column '{config_key}'. Please double check the column for blank values.")

# check required columns in geology
required_columns = ["unitname_column", "alt_unitname_column"]

validation_failed, message = validate_required_columns(
geodata=geology_data,
config=config,
required_columns=required_columns,
expected_type=str,
check_blank=True,
datatype_name="GEOLOGY"
)
if validation_failed:
return (validation_failed, message)



# # 3. Optional Columns
optional_string_columns = [
Expand Down Expand Up @@ -164,20 +166,19 @@ def check_structure_fields_validity(mapdata) -> Tuple[bool, str]:
if failed:
return (failed, message)

# 2. Check mandatory numeric columns
required_columns = [config["dipdir_column"], config["dip_column"]]
for col in required_columns:
if col not in structure_data.columns:
logger.error(f"DDatatype STRUCTURE: Required column with config key: '{[k for k, v in config.items() if v == col][0]}' is missing from structure data.")
return (True, f"Datatype STRUCTURE: Required column with config key: '{[k for k, v in config.items() if v == col][0]}' is missing from structure data.")
if not structure_data[col].apply(lambda x: isinstance(x, (int, float))).all():
config_key = [k for k, v in config.items() if v == col][0]
logger.error(f"Datatype STRUCTURE: Column '{config_key}' must contain only numeric values. Please check that the column contains only numeric values.")
return (True, f"Datatype STRUCTURE: Column '{config_key}' must contain only numeric values. Please check that the column contains only numeric values.")
if structure_data[col].isnull().any():
config_key = [k for k, v in config.items() if v == col][0]
logger.error(f"Datatype STRUCTURE: NaN or blank values found in required column '{config_key}'. Please double check the column for blank values.")
return (True, f"Datatype STRUCTURE: NaN or blank values found in required column '{config_key}'. Please double check the column for blank values.")

# check required columns in structure (numeric dips & dip dir)
required_columns = ["dipdir_column", "dip_column"]
validation_failed, message = validate_required_columns(
geodata=structure_data,
config=config,
required_columns=required_columns,
expected_type=(int, float),
check_blank=False,
datatype_name="STRUCTURE"
)
if validation_failed:
return (validation_failed, message)

if config["dip_column"] in structure_data.columns:
invalid_dip = ~((structure_data[config["dip_column"]] >= 0) & (structure_data[config["dip_column"]] <= 90))
Expand Down Expand Up @@ -701,4 +702,81 @@ def validate_id_column(
)
logger.warning(msg)

return (False, "")

from beartype.typing import List, Type, Tuple, Union

def validate_required_columns(
    geodata: geopandas.GeoDataFrame,
    config: dict,
    required_columns: List[str],
    expected_type: Union[Type, Tuple[Type, ...]],
    check_blank: bool = False,
    datatype_name: str = "UNKNOWN"
) -> Tuple[bool, str]:
    """
    Validate required columns in a GeoDataFrame.

    For each config key in ``required_columns`` the checks run in this order,
    and the first failure is logged and returned immediately:

    1. the config key maps to a non-empty column name in ``config``;
    2. that column exists in ``geodata``;
    3. every value in the column is an instance of ``expected_type``;
    4. the column contains no null values;
    5. (only when ``check_blank`` is True and ``expected_type`` includes a
       ``str`` type) no string value is empty or whitespace-only.

    Args:
        geodata (geopandas.GeoDataFrame): The GeoDataFrame to validate.
        config (dict): Configuration dictionary mapping config keys to column names.
        required_columns (List[str]): List of config keys for required columns.
        expected_type (Type or Tuple[Type, ...]): Expected data type(s) for the columns.
        check_blank (bool, optional): Whether to check for blank (empty) strings. Defaults to False.
        datatype_name (str, optional): Name of the datatype being validated (for logging). Defaults to "UNKNOWN".

    Returns:
        Tuple[bool, str]: (True, error_message) if validation fails, else (False, "").
    """
    # ``expected_type`` may be a single class or a tuple of classes. Normalise it
    # so the "does this include str?" question never calls issubclass() on a
    # tuple, which raises TypeError.
    accepted_types = expected_type if isinstance(expected_type, tuple) else (expected_type,)
    expects_strings = any(
        isinstance(candidate, type) and issubclass(candidate, str)
        for candidate in accepted_types
    )

    # Loop-invariant pieces of the error messages.
    type_label = expected_type if isinstance(expected_type, type) else 'numeric'
    datatype_label = datatype_name.upper()

    for config_key in required_columns:
        column_name = config.get(config_key)

        if not column_name:
            error_msg = (
                f"Configuration key '{config_key}' is missing for datatype '{datatype_name}'."
            )
            logger.error(error_msg)
            return (True, error_msg)

        if column_name not in geodata.columns:
            error_msg = (
                f"Datatype {datatype_label}: Required column with config key '{config_key}' "
                f"(column: '{column_name}') is missing from the data."
            )
            logger.error(error_msg)
            return (True, error_msg)

        column = geodata[column_name]

        # Check data type
        if not column.apply(lambda value: isinstance(value, expected_type)).all():
            error_msg = (
                f"Datatype {datatype_label}: Column '{config_key}' (column: '{column_name}') "
                f"must contain only {type_label} values."
            )
            logger.error(error_msg)
            return (True, error_msg)

        # Check for null values
        if column.isnull().any():
            error_msg = (
                f"Datatype {datatype_label}: Column '{config_key}' (column: '{column_name}') "
                f"contains null values. Please ensure all values are present."
            )
            logger.error(error_msg)
            return (True, error_msg)

        # Optionally check for blank strings. This is done per value rather than
        # through the ``.str`` accessor so that a column which also holds
        # non-string values (allowed when expected_type is a tuple) cannot raise.
        if check_blank and expects_strings:
            has_blank = column.apply(
                lambda value: isinstance(value, str) and not value.strip()
            ).any()
            if has_blank:
                error_msg = (
                    f"Datatype {datatype_label}: Column '{config_key}' (column: '{column_name}') "
                    f"contains blank (empty) values. Please ensure all values are populated."
                )
                logger.error(error_msg)
                return (True, error_msg)

    # If all required columns pass validation
    logger.info(f"Datatype {datatype_label}: All required columns validated successfully.")
    return (False, "")
8 changes: 4 additions & 4 deletions tests/data_checks/test_input_data_geology.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self):
"ID": [1],
},
True,
"Datatype GEOLOGY: Required column with config key: 'alt_unitname_column' is missing from geology data.",
"Datatype GEOLOGY: Required column with config key 'alt_unitname_column' (column: 'CODE') is missing from the data.",
),
# Non-string value in required column
(
Expand All @@ -98,7 +98,7 @@ def __init__(self):
"ID": [1],
},
True,
"Datatype GEOLOGY: Column 'alt_unitname_column' must contain only string values. Please check that the column contains only string values.",
"Datatype GEOLOGY: Column 'alt_unitname_column' (column: 'CODE') must contain only <class 'str'> values.",
),
# NaN or blank value in required column
(
Expand All @@ -116,7 +116,7 @@ def __init__(self):
"ID": [1],
},
True,
"Datatype GEOLOGY: NaN or blank values found in required column 'unitname_column'. Please double check the column for blank values.",
"Datatype GEOLOGY: Column 'unitname_column' (column: 'UNITNAME') contains blank (empty) values. Please ensure all values are populated.",
),
# Duplicate ID values
(
Expand Down Expand Up @@ -179,7 +179,7 @@ def __init__(self):
"ID": [1, 1], # Duplicate ID
},
True,
"Datatype GEOLOGY: Column 'unitname_column' must contain only string values. Please check that the column contains only string values.",
"Datatype GEOLOGY: Column 'unitname_column' (column: 'UNITNAME') must contain only <class 'str'> values.",
),
],
)
Expand Down
6 changes: 3 additions & 3 deletions tests/data_checks/test_input_data_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(self):
"ID": [1, 2]
},
True,
"Datatype STRUCTURE: Required column with config key: 'dipdir_column' is missing from structure data.",
"Datatype STRUCTURE: Required column with config key 'dipdir_column' (column: 'DIPDIR') is missing from the data.",
),
# Non-numeric value in numeric column
(
Expand All @@ -84,7 +84,7 @@ def __init__(self):
"ID": [1, 2]
},
True,
"Datatype STRUCTURE: Column 'dipdir_column' must contain only numeric values. Please check that the column contains only numeric values.",
"Datatype STRUCTURE: Column 'dipdir_column' (column: 'DIPDIR') must contain only numeric values.",
),
# NaN or blank value in required column
(
Expand All @@ -100,7 +100,7 @@ def __init__(self):
"ID": [1, 2]
},
True,
"Datatype STRUCTURE: NaN or blank values found in required column 'dipdir_column'. Please double check the column for blank values.",
"Datatype STRUCTURE: Column 'dipdir_column' (column: 'DIPDIR') contains null values. Please ensure all values are present.",
),
# Duplicate ID column
(
Expand Down

0 comments on commit 9001a08

Please sign in to comment.