Added a DB-less dataset validation.
KFilippopolitis committed Jun 30, 2023
1 parent 3d5344a commit dac748a
Showing 9 changed files with 210 additions and 50 deletions.
2 changes: 2 additions & 0 deletions mipdb/__init__.py
@@ -1,4 +1,5 @@
from mipdb.commands import disable_dataset
from mipdb.commands import validate_dataset_no_db
from mipdb.commands import disable_data_model
from mipdb.commands import enable_dataset
from mipdb.commands import enable_data_model
@@ -21,6 +22,7 @@
"delete_data_model",
"add_dataset",
"validate_dataset",
"validate_dataset_no_db",
"delete_dataset",
"enable_data_model",
"disable_data_model",
14 changes: 13 additions & 1 deletion mipdb/commands.py
@@ -6,7 +6,7 @@

from mipdb.database import MonetDB
from mipdb.reader import JsonFileReader
from mipdb.usecases import AddDataModel, Cleanup
from mipdb.usecases import AddDataModel, Cleanup, ValidateDatasetNoDatabase
from mipdb.usecases import AddPropertyToDataModel
from mipdb.usecases import AddPropertyToDataset
from mipdb.usecases import DeleteDataModel
@@ -200,6 +200,18 @@ def add_dataset(
print(f"CSV '{csv_path}' was successfully added.")


@entry.command()
@cl.argument("cdes_path", required=True)
@cl.argument("csv_path", required=True)
@handle_errors
def validate_dataset_no_db(cdes_path, csv_path):
print(f"Dataset '{csv_path}' is being validated...")
reader = JsonFileReader(cdes_path)
data_model_metadata = reader.read()
ValidateDatasetNoDatabase().execute(csv_path, data_model_metadata)
print(f"Dataset '{csv_path}' has a valid structure.")


@entry.command()
@cl.argument("csv_path", required=True)
@cl.option(
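The new `validate_dataset_no_db` command can be exercised without a MonetDB instance. A minimal sketch using Click's `CliRunner`, the same approach the test suite below takes; the file paths are placeholders, not files shipped with this commit:

```python
# Sketch: run the DB-less validation command programmatically.
# "path/to/CDEsMetadata.json" and "path/to/dataset.csv" are hypothetical paths.
from click.testing import CliRunner

from mipdb import validate_dataset_no_db

runner = CliRunner()
result = runner.invoke(
    validate_dataset_no_db,
    ["path/to/CDEsMetadata.json", "path/to/dataset.csv"],
)
print(result.output)     # progress/success messages printed by the command
print(result.exit_code)  # non-zero when validation raised an error
```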
10 changes: 6 additions & 4 deletions mipdb/data_frame_schema.py
@@ -15,11 +15,13 @@ def __init__(

# There is a need to construct a DataFrameSchema with all the constraints that the metadata is imposing
# For each column a pandera Column is created that will contain the constraints for the specific column
if not set(columns) <= set(sql_type_per_column.keys()):

raise InvalidDatasetError(
f"Columns:{set(columns) - set(sql_type_per_column.keys()) - {'row_id'} } are not present in the CDEs"
)

for column in columns:
if column not in sql_type_per_column.keys():
raise InvalidDatasetError(
f"The column: '{column}' does not exist in the metadata"
)

checks = self._get_pa_checks(
cdes_with_min_max, cdes_with_enumerations, column
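The column check now compares the CSV header against the CDE columns as sets, so every unknown column is reported in a single error instead of failing on the first one. A hand-made illustration of the behaviour (the column names and SQL types are invented for the example; `row_id` is subtracted because it is an internal column):

```python
# Illustrative values only -- not taken from the repository's test data.
from mipdb.exceptions import InvalidDatasetError

columns = ["subjectid", "var1", "non_existing_col", "row_id"]
sql_type_per_column = {"subjectid": "text", "var1": "real", "dataset": "text"}

if not set(columns) <= set(sql_type_per_column):
    missing = set(columns) - set(sql_type_per_column) - {"row_id"}
    raise InvalidDatasetError(f"Columns:{missing} are not present in the CDEs")
# -> InvalidDatasetError: Columns:{'non_existing_col'} are not present in the CDEs
```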
34 changes: 34 additions & 0 deletions mipdb/dataelements.py
@@ -44,6 +44,40 @@ def make_cdes(schema_data):
return cdes


def get_sql_type_per_column(cdes):
return {code: json.loads(cde.metadata)["sql_type"] for code, cde in cdes.items()}


def get_cdes_with_min_max(cdes, columns):
cdes_with_min_max = {}
for code, cde in cdes.items():
if code not in columns:
continue
metadata = json.loads(cde.metadata)
max_value = metadata["max"] if "max" in metadata else None
min_value = metadata["min"] if "min" in metadata else None
if code in columns and min_value or max_value:
cdes_with_min_max[code] = (min_value, max_value)
return cdes_with_min_max


def get_cdes_with_enumerations(cdes, columns):
return {
code: [
enum_code
for enum_code, enum_label in json.loads(cde.metadata)[
"enumerations"
].items()
]
for code, cde in cdes.items()
if json.loads(cde.metadata)["is_categorical"] and code in columns
}


def get_dataset_enums(cdes):
return json.loads(cdes["dataset"].metadata)["enumerations"]


def validate_dataset_present_on_cdes_with_proper_format(cdes):
dataset_cde = [cde for cde in cdes if cde.code == "dataset"]
if not dataset_cde:
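These helpers were lifted out of `MetadataTable` (see the deletions in `mipdb/tables.py` below) so they can run on a plain `{code: cde}` mapping with no database connection. A self-contained sketch of how they behave; the `SimpleNamespace` stand-in for a CDE object produced by `make_cdes` and all metadata values are invented for illustration:

```python
import json
from types import SimpleNamespace

from mipdb.dataelements import (
    get_sql_type_per_column,
    get_cdes_with_min_max,
    get_cdes_with_enumerations,
    get_dataset_enums,
)

def fake_cde(**metadata):
    # Stand-in for a CDE: only the JSON "metadata" string is needed here.
    return SimpleNamespace(metadata=json.dumps(metadata))

cdes = {
    "dataset": fake_cde(sql_type="text", is_categorical=True,
                        enumerations={"dataset1": "Dataset 1"}),
    "var1": fake_cde(sql_type="real", is_categorical=False, min=0, max=100),
    "var2": fake_cde(sql_type="text", is_categorical=True,
                     enumerations={"l1": "Level 1", "l2": "Level 2"}),
}
columns = ["dataset", "var1", "var2"]

print(get_sql_type_per_column(cdes))              # {'dataset': 'text', 'var1': 'real', 'var2': 'text'}
print(get_cdes_with_min_max(cdes, columns))       # {'var1': (0, 100)}
print(get_cdes_with_enumerations(cdes, columns))  # {'dataset': ['dataset1'], 'var2': ['l1', 'l2']}
print(get_dataset_enums(cdes))                    # {'dataset1': 'Dataset 1'}
```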
33 changes: 0 additions & 33 deletions mipdb/tables.py
@@ -343,39 +343,6 @@ def insert_values(self, values, db: Union[DataBase, Connection]):
)
db.execute(query, values)

def get_dataset_enums(self):
return json.loads(self.table["dataset"].metadata)["enumerations"]

def get_sql_type_per_column(self):
return {
code: json.loads(cde.metadata)["sql_type"]
for code, cde in self.table.items()
}

def get_cdes_with_min_max(self, columns):
cdes_with_min_max = {}
for code, cde in self.table.items():
if code not in columns:
continue
metadata = json.loads(cde.metadata)
max_value = metadata["max"] if "max" in metadata else None
min_value = metadata["min"] if "min" in metadata else None
if code in columns and min_value or max_value:
cdes_with_min_max[code] = (min_value, max_value)
return cdes_with_min_max

def get_cdes_with_enumerations(self, columns):
return {
code: [
enum_code
for enum_code, enum_label in json.loads(cde.metadata)[
"enumerations"
].items()
]
for code, cde in self.table.items()
if json.loads(cde.metadata)["is_categorical"] and code in columns
}


class TemporaryTable(Table):
def __init__(self, dataframe_sql_type_per_column, db):
85 changes: 75 additions & 10 deletions mipdb/usecases.py
@@ -5,7 +5,7 @@

import pandas as pd

from mipdb.database import DataBase, Connection
from mipdb.database import DataBase
from mipdb.database import METADATA_SCHEMA
from mipdb.data_frame_schema import DataFrameSchema
from mipdb.exceptions import ForeignKeyError, InvalidDatasetError
@@ -17,6 +17,10 @@
make_cdes,
validate_dataset_present_on_cdes_with_proper_format,
validate_longitudinal_data_model,
get_sql_type_per_column,
get_cdes_with_min_max,
get_cdes_with_enumerations,
get_dataset_enums,
)
from mipdb.tables import (
DataModelTable,
@@ -226,8 +230,9 @@ def execute(
data_model_code, data_model_version, conn
)
metadata_table = MetadataTable.from_db(data_model, conn)
dataset_enumerations = metadata_table.get_dataset_enums()
sql_type_per_column = metadata_table.get_sql_type_per_column()
cdes = metadata_table.table
dataset_enumerations = get_dataset_enums(cdes)
sql_type_per_column = get_sql_type_per_column(cdes)

if copy_from_file:
imported_datasets = self.import_csv_with_volume(
@@ -406,16 +411,14 @@ def execute(
"The 'dataset' column is required to exist in the csv."
)
metadata_table = MetadataTable.from_db(data_model, conn)
sql_type_per_column = metadata_table.get_sql_type_per_column()
cdes_with_min_max = metadata_table.get_cdes_with_min_max(csv_columns)
cdes_with_enumerations = metadata_table.get_cdes_with_enumerations(
csv_columns
)
dataset_enumerations = metadata_table.get_dataset_enums()
cdes = metadata_table.table
sql_type_per_column = get_sql_type_per_column(cdes)
cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
dataset_enumerations = get_dataset_enums(cdes)
if self.is_data_model_longitudinal(
data_model_code, data_model_version, conn
):
print("is_data_model_longitudinal")
are_data_valid_longitudinal(csv_path)

if copy_from_file:
@@ -509,6 +512,68 @@ def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
)


class ValidateDatasetNoDatabase(UseCase):
"""
We separate the data validation from the importation to make sure that a csv is valid as a whole before committing it to the main table.
In the data validation we use chunking in order to reduce the memory footprint of the process.
Database constraints must NOT be used as part of the validation process since that could result in partially imported csvs.
"""

def execute(self, csv_path, data_model_metadata) -> None:

csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
if DATASET_COLUMN_NAME not in csv_columns:
raise InvalidDatasetError(
"The 'dataset' column is required to exist in the csv."
)
cdes = make_cdes(copy.deepcopy(data_model_metadata))
cdes = {cde.code: cde for cde in cdes}
sql_type_per_column = get_sql_type_per_column(cdes)
cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
dataset_enumerations = get_dataset_enums(cdes)
if "longitudinal" in data_model_metadata:
are_data_valid_longitudinal(csv_path)
validated_datasets = self.validate_csv(
csv_path,
sql_type_per_column,
cdes_with_min_max,
cdes_with_enumerations,
)
self.verify_datasets_exist_in_enumerations(
datasets=validated_datasets,
dataset_enumerations=dataset_enumerations,
)

def is_data_model_longitudinal(self, data_model_metadata):
return

def validate_csv(
self, csv_path, sql_type_per_column, cdes_with_min_max, cdes_with_enumerations
):
imported_datasets = []

csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
dataframe_schema = DataFrameSchema(
sql_type_per_column, cdes_with_min_max, cdes_with_enumerations, csv_columns
)
with CSVDataFrameReader(csv_path).get_reader() as reader:
for dataset_data in reader:
dataframe = DataFrame(dataset_data)
dataframe_schema.validate_dataframe(dataframe.data)
imported_datasets = set(imported_datasets) | set(dataframe.datasets)
return imported_datasets

def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
non_existing_datasets = [
dataset for dataset in datasets if dataset not in dataset_enumerations
]
if non_existing_datasets:
raise InvalidDatasetError(
f"The values:'{non_existing_datasets}' are not present in the enumerations of the CDE 'dataset'."
)


class DeleteDataset(UseCase):
def __init__(self, db: DataBase) -> None:
self.db = db
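`ValidateDatasetNoDatabase` mirrors `ValidateDataset` but reads the CDEs straight from the metadata file, so it can also be driven directly from Python. A minimal sketch, assuming placeholder paths; failures surface as exceptions such as `InvalidDatasetError` in the cases exercised by the new tests below:

```python
from mipdb.exceptions import InvalidDatasetError
from mipdb.reader import JsonFileReader
from mipdb.usecases import ValidateDatasetNoDatabase

# Hypothetical input files -- substitute a real CDEs metadata JSON and CSV.
data_model_metadata = JsonFileReader("path/to/CDEsMetadata.json").read()

try:
    ValidateDatasetNoDatabase().execute("path/to/dataset.csv", data_model_metadata)
    print("dataset structure is valid")
except InvalidDatasetError as error:
    print(f"validation failed: {error}")
```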
76 changes: 76 additions & 0 deletions tests/test_commands.py
@@ -18,6 +18,7 @@
from mipdb import list_data_models
from mipdb import list_datasets
from mipdb import validate_dataset
from mipdb import validate_dataset_no_db
from mipdb.exceptions import ExitCode
from tests.conftest import (
DATASET_FILE,
@@ -333,6 +334,81 @@ def test_invalid_dataset_error_cases(data_model, dataset, exception_message, db)
)


dataset_files = [
(
"data_model",
"dataset_exceeds_max.csv",
"""An error occurred while validating the csv on column: 'var3'
index failure_case
0 0 100.0
""",
),
(
"data_model",
"dataset_exceeds_min.csv",
"""An error occurred while validating the csv on column: 'var3'
index failure_case
0 0 0.0""",
),
(
"data_model",
"invalid_enum.csv",
"""An error occurred while validating the csv on column: 'var2'
index failure_case
0 0 l3""",
),
(
"data_model",
"invalid_type1.csv",
"""An error occurred while validating the csv on column: 'var3'
index failure_case
0 1 invalid
""",
),
(
"data_model",
"missing_column_dataset.csv",
"""""",
),
(
"data_model",
"column_not_present_in_cdes.csv",
"Columns:{'non_existing_col'} are not present in the CDEs",
),
(
"data_model_longitudinal",
"dataset.csv",
"""Dataset error: Invalid csv: the following visitid and subjectid pairs are duplicated:
subjectid visitid
1 subjectid2 FL1
2 subjectid2 FL1""",
),
]


@pytest.mark.parametrize("data_model,dataset,exception_message", dataset_files)
def test_invalid_dataset_error_cases_no_db(data_model, dataset, exception_message):
runner = CliRunner()

validation_result = runner.invoke(
validate_dataset_no_db,
[
ABSOLUTE_PATH_FAIL_DATA_FOLDER
+ "/"
+ data_model
+ "_v_1_0/CDEsMetadata.json",
ABSOLUTE_PATH_FAIL_DATA_FOLDER + "/" + data_model + "_v_1_0/" + dataset,
],
)
print(validation_result.stdout)
print("\n")

assert (
validation_result.exception.__str__() == exception_message
or exception_message in validation_result.stdout
)


@pytest.mark.database
@pytest.mark.usefixtures("monetdb_container", "cleanup_db")
def test_validate_dataset(db):
2 changes: 1 addition & 1 deletion tests/test_data_frame_schema.py
@@ -106,7 +106,7 @@ def test_validate_dataframe(
"dataset": ["dataset1", "dataset1"],
}
),
"The column: 'var5' does not exist in the metadata",
"Columns:{'var5'} are not present in the CDEs",
id="text with int/float enumerations(1,2.0) and 1.0 was given",
),
pytest.param(
4 changes: 3 additions & 1 deletion tests/test_usecases.py
@@ -436,7 +436,9 @@ def test_check_duplicate_pairs_success():


def test_check_duplicate_pairs_fail():
df = pd.DataFrame({"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]})
df = pd.DataFrame(
{"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]}
)
expected_output = "Invalid csv: the following visitid and subjectid pairs are duplicated:\n visitid subjectid\n3 3 30\n4 3 30"

with pytest.raises(InvalidDatasetError, match=expected_output):
