Added a validate folder command. (#37)
It validates a folder without the need to set up a database.
KFilippopolitis authored Jul 5, 2023
1 parent 3d5344a commit faa2443
Showing 8 changed files with 151 additions and 52 deletions.
28 changes: 27 additions & 1 deletion mipdb/commands.py
@@ -6,7 +6,12 @@

from mipdb.database import MonetDB
from mipdb.reader import JsonFileReader
from mipdb.usecases import AddDataModel, Cleanup
from mipdb.usecases import (
AddDataModel,
Cleanup,
ValidateDatasetNoDatabase,
ValidateDataModel,
)
from mipdb.usecases import AddPropertyToDataModel
from mipdb.usecases import AddPropertyToDataset
from mipdb.usecases import DeleteDataModel
@@ -147,6 +152,27 @@ def load_folder(file, copy_from_file, ip, port, username, password, db_name):
print(f"CSV '{csv_path}' was successfully added.")


@entry.command()
@cl.argument("file", required=True)
@handle_errors
def validate_folder(file):
for subdir, dirs, files in os.walk(file):
if dirs:
continue
print(f"Data model '{subdir}' is being validated...")
metadata_path = os.path.join(subdir, "CDEsMetadata.json")
reader = JsonFileReader(metadata_path)
data_model_metadata = reader.read()
code = data_model_metadata["code"]
ValidateDataModel().execute(data_model_metadata)
print(f"Data model '{code}' was successfully validated.")

for csv_path in glob.glob(subdir + "/*.csv"):
print(f"CSV '{csv_path}' is being validated...")
ValidateDatasetNoDatabase().execute(csv_path, data_model_metadata)
print(f"CSV '{csv_path}' was successfully validated.")


@entry.command()
@db_configs_options
@handle_errors
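A quick way to exercise the new command without standing up MonetDB is click's test runner. The sketch below is illustrative only: the folder path is hypothetical, and it assumes the layout validate_folder expects (one sub-folder per data model, each holding CDEsMetadata.json plus its CSV files).

from click.testing import CliRunner

from mipdb.commands import validate_folder

runner = CliRunner()
# Hypothetical folder: <folder>/<data_model_dir>/CDEsMetadata.json and *.csv files.
result = runner.invoke(validate_folder, ["tests/data/success"])
print(result.output)     # per-data-model and per-CSV validation messages
print(result.exit_code)  # 0 when everything validates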
10 changes: 5 additions & 5 deletions mipdb/data_frame_schema.py
@@ -15,12 +15,12 @@ def __init__(

# There is a need to construct a DataFrameSchema with all the constraints that the metadata is imposing
# For each column a pandera Column is created that will contain the constraints for the specific column
for column in columns:
if column not in sql_type_per_column.keys():
raise InvalidDatasetError(
f"The column: '{column}' does not exist in the metadata"
)
if not set(columns) <= set(sql_type_per_column.keys()):
raise InvalidDatasetError(
f"Columns:{set(columns) - set(sql_type_per_column.keys()) - {'row_id'} } are not present in the CDEs"
)

for column in columns:
checks = self._get_pa_checks(
cdes_with_min_max, cdes_with_enumerations, column
)
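For reference, a hedged sketch of the reworked column check: unknown CSV columns are now reported together in a single InvalidDatasetError rather than one error per column. The constructor is called positionally, as in usecases.py, and the one-column metadata is invented for illustration.

from mipdb.data_frame_schema import DataFrameSchema
from mipdb.exceptions import InvalidDatasetError

try:
    # 'var5' is not described by the (made-up) metadata, so the subset check fails.
    DataFrameSchema({"var1": "text"}, {}, {}, ["var1", "var5"])
except InvalidDatasetError as exc:
    print(exc)  # Columns:{'var5'} are not present in the CDEs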
34 changes: 34 additions & 0 deletions mipdb/dataelements.py
@@ -44,6 +44,40 @@ def make_cdes(schema_data):
return cdes


def get_sql_type_per_column(cdes):
return {code: json.loads(cde.metadata)["sql_type"] for code, cde in cdes.items()}


def get_cdes_with_min_max(cdes, columns):
cdes_with_min_max = {}
for code, cde in cdes.items():
if code not in columns:
continue
metadata = json.loads(cde.metadata)
max_value = metadata["max"] if "max" in metadata else None
min_value = metadata["min"] if "min" in metadata else None
if code in columns and min_value or max_value:
cdes_with_min_max[code] = (min_value, max_value)
return cdes_with_min_max


def get_cdes_with_enumerations(cdes, columns):
return {
code: [
enum_code
for enum_code, enum_label in json.loads(cde.metadata)[
"enumerations"
].items()
]
for code, cde in cdes.items()
if json.loads(cde.metadata)["is_categorical"] and code in columns
}


def get_dataset_enums(cdes):
return json.loads(cdes["dataset"].metadata)["enumerations"]


def validate_dataset_present_on_cdes_with_proper_format(cdes):
dataset_cde = [cde for cde in cdes if cde.code == "dataset"]
if not dataset_cde:
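The new module-level helpers all operate on a plain {code: cde} mapping whose metadata attribute is a JSON string. A minimal self-contained sketch, using a namedtuple stand-in and made-up values (the real CDE objects come from make_cdes):

import json
from collections import namedtuple

from mipdb.dataelements import (
    get_sql_type_per_column,
    get_cdes_with_min_max,
    get_cdes_with_enumerations,
    get_dataset_enums,
)

# Stand-in for the real CDE objects: only `code` and a JSON-string `metadata`
# attribute are needed by the helpers.
FakeCDE = namedtuple("FakeCDE", "code metadata")

cdes = {
    "age": FakeCDE("age", json.dumps(
        {"sql_type": "int", "is_categorical": False, "min": 1, "max": 120})),
    "dataset": FakeCDE("dataset", json.dumps(
        {"sql_type": "text", "is_categorical": True,
         "enumerations": {"dataset1": "Dataset 1"}})),
}
columns = ["age", "dataset"]

print(get_sql_type_per_column(cdes))              # {'age': 'int', 'dataset': 'text'}
print(get_cdes_with_min_max(cdes, columns))       # {'age': (1, 120)}
print(get_cdes_with_enumerations(cdes, columns))  # {'dataset': ['dataset1']}
print(get_dataset_enums(cdes))                    # {'dataset1': 'Dataset 1'}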
33 changes: 0 additions & 33 deletions mipdb/tables.py
@@ -343,39 +343,6 @@ def insert_values(self, values, db: Union[DataBase, Connection]):
)
db.execute(query, values)

def get_dataset_enums(self):
return json.loads(self.table["dataset"].metadata)["enumerations"]

def get_sql_type_per_column(self):
return {
code: json.loads(cde.metadata)["sql_type"]
for code, cde in self.table.items()
}

def get_cdes_with_min_max(self, columns):
cdes_with_min_max = {}
for code, cde in self.table.items():
if code not in columns:
continue
metadata = json.loads(cde.metadata)
max_value = metadata["max"] if "max" in metadata else None
min_value = metadata["min"] if "min" in metadata else None
if code in columns and min_value or max_value:
cdes_with_min_max[code] = (min_value, max_value)
return cdes_with_min_max

def get_cdes_with_enumerations(self, columns):
return {
code: [
enum_code
for enum_code, enum_label in json.loads(cde.metadata)[
"enumerations"
].items()
]
for code, cde in self.table.items()
if json.loads(cde.metadata)["is_categorical"] and code in columns
}


class TemporaryTable(Table):
def __init__(self, dataframe_sql_type_per_column, db):
90 changes: 80 additions & 10 deletions mipdb/usecases.py
@@ -5,7 +5,7 @@

import pandas as pd

from mipdb.database import DataBase, Connection
from mipdb.database import DataBase
from mipdb.database import METADATA_SCHEMA
from mipdb.data_frame_schema import DataFrameSchema
from mipdb.exceptions import ForeignKeyError, InvalidDatasetError
@@ -17,6 +17,10 @@
make_cdes,
validate_dataset_present_on_cdes_with_proper_format,
validate_longitudinal_data_model,
get_sql_type_per_column,
get_cdes_with_min_max,
get_cdes_with_enumerations,
get_dataset_enums,
)
from mipdb.tables import (
DataModelTable,
@@ -153,6 +157,14 @@ def _create_metadata_table(self, schema, conn, cdes):
metadata_table.insert_values(values, conn)


class ValidateDataModel(UseCase):
def execute(self, data_model_metadata) -> None:
cdes = make_cdes(copy.deepcopy(data_model_metadata))
validate_dataset_present_on_cdes_with_proper_format(cdes)
if LONGITUDINAL in data_model_metadata and data_model_metadata[LONGITUDINAL]:
validate_longitudinal_data_model(cdes)


class DeleteDataModel(UseCase):
def __init__(self, db: DataBase) -> None:
self.db = db
@@ -226,8 +238,9 @@ def execute(
data_model_code, data_model_version, conn
)
metadata_table = MetadataTable.from_db(data_model, conn)
dataset_enumerations = metadata_table.get_dataset_enums()
sql_type_per_column = metadata_table.get_sql_type_per_column()
cdes = metadata_table.table
dataset_enumerations = get_dataset_enums(cdes)
sql_type_per_column = get_sql_type_per_column(cdes)

if copy_from_file:
imported_datasets = self.import_csv_with_volume(
@@ -406,16 +419,14 @@ def execute(
"The 'dataset' column is required to exist in the csv."
)
metadata_table = MetadataTable.from_db(data_model, conn)
sql_type_per_column = metadata_table.get_sql_type_per_column()
cdes_with_min_max = metadata_table.get_cdes_with_min_max(csv_columns)
cdes_with_enumerations = metadata_table.get_cdes_with_enumerations(
csv_columns
)
dataset_enumerations = metadata_table.get_dataset_enums()
cdes = metadata_table.table
sql_type_per_column = get_sql_type_per_column(cdes)
cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
dataset_enumerations = get_dataset_enums(cdes)
if self.is_data_model_longitudinal(
data_model_code, data_model_version, conn
):
print("is_data_model_longitudinal")
are_data_valid_longitudinal(csv_path)

if copy_from_file:
@@ -509,6 +520,65 @@ def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
)


class ValidateDatasetNoDatabase(UseCase):
"""
We separate the data validation from the importation to make sure that a csv is valid as a whole before committing it to the main table.
In the data validation we use chunking in order to reduce the memory footprint of the process.
Database constraints must NOT be used as part of the validation process since that could result in partially imported csvs.
"""

def execute(self, csv_path, data_model_metadata) -> None:

csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
if DATASET_COLUMN_NAME not in csv_columns:
raise InvalidDatasetError(
"The 'dataset' column is required to exist in the csv."
)
cdes = make_cdes(copy.deepcopy(data_model_metadata))
cdes = {cde.code: cde for cde in cdes}
sql_type_per_column = get_sql_type_per_column(cdes)
cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
dataset_enumerations = get_dataset_enums(cdes)
if "longitudinal" in data_model_metadata:
are_data_valid_longitudinal(csv_path)
validated_datasets = self.validate_csv(
csv_path,
sql_type_per_column,
cdes_with_min_max,
cdes_with_enumerations,
)
self.verify_datasets_exist_in_enumerations(
datasets=validated_datasets,
dataset_enumerations=dataset_enumerations,
)

def validate_csv(
self, csv_path, sql_type_per_column, cdes_with_min_max, cdes_with_enumerations
):
imported_datasets = []

csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
dataframe_schema = DataFrameSchema(
sql_type_per_column, cdes_with_min_max, cdes_with_enumerations, csv_columns
)
with CSVDataFrameReader(csv_path).get_reader() as reader:
for dataset_data in reader:
dataframe = DataFrame(dataset_data)
dataframe_schema.validate_dataframe(dataframe.data)
imported_datasets = set(imported_datasets) | set(dataframe.datasets)
return imported_datasets

def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
non_existing_datasets = [
dataset for dataset in datasets if dataset not in dataset_enumerations
]
if non_existing_datasets:
raise InvalidDatasetError(
f"The values:'{non_existing_datasets}' are not present in the enumerations of the CDE 'dataset'."
)


class DeleteDataset(UseCase):
def __init__(self, db: DataBase) -> None:
self.db = db
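Taken together, the two new use cases reduce folder validation to two calls per data model, which is essentially what the validate_folder command does. The paths below are placeholders.

from mipdb.reader import JsonFileReader
from mipdb.usecases import ValidateDataModel, ValidateDatasetNoDatabase

# Placeholder paths; validate_folder derives them by walking the given folder.
data_model_metadata = JsonFileReader("data_model_v1/CDEsMetadata.json").read()

ValidateDataModel().execute(data_model_metadata)  # metadata-level checks
ValidateDatasetNoDatabase().execute("data_model_v1/demo.csv", data_model_metadata)  # per-CSV checks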
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mipdb"
version = "2.1.0"
version = "2.2.0"
description = ""
authors = ["Your Name <[email protected]>"]

2 changes: 1 addition & 1 deletion tests/test_data_frame_schema.py
@@ -106,7 +106,7 @@ def test_validate_dataframe(
"dataset": ["dataset1", "dataset1"],
}
),
"The column: 'var5' does not exist in the metadata",
"Columns:{'var5'} are not present in the CDEs",
id="text with int/float enumerations(1,2.0) and 1.0 was given",
),
pytest.param(
4 changes: 3 additions & 1 deletion tests/test_usecases.py
@@ -436,7 +436,9 @@ def test_check_duplicate_pairs_success():


def test_check_duplicate_pairs_fail():
df = pd.DataFrame({"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]})
df = pd.DataFrame(
{"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]}
)
expected_output = "Invalid csv: the following visitid and subjectid pairs are duplicated:\n visitid subjectid\n3 3 30\n4 3 30"

with pytest.raises(InvalidDatasetError, match=expected_output):
