From dac748a0bb57f869b46bb8df8fb50a1c0ed06931 Mon Sep 17 00:00:00 2001
From: kfilippopolitis
Date: Fri, 30 Jun 2023 19:30:29 +0300
Subject: [PATCH] Added a db-less dataset validation.

---
 mipdb/__init__.py               |  2 +
 mipdb/commands.py               | 14 +++++-
 mipdb/data_frame_schema.py      | 10 ++--
 mipdb/dataelements.py           | 34 +++++++++++++
 mipdb/tables.py                 | 33 -------------
 mipdb/usecases.py               | 85 +++++++++++++++++++++++++++++----
 tests/test_commands.py          | 76 +++++++++++++++++++++++++++++
 tests/test_data_frame_schema.py |  2 +-
 tests/test_usecases.py          |  4 +-
 9 files changed, 210 insertions(+), 50 deletions(-)

diff --git a/mipdb/__init__.py b/mipdb/__init__.py
index 75d7932..fddcaa4 100644
--- a/mipdb/__init__.py
+++ b/mipdb/__init__.py
@@ -1,4 +1,5 @@
 from mipdb.commands import disable_dataset
+from mipdb.commands import validate_dataset_no_db
 from mipdb.commands import disable_data_model
 from mipdb.commands import enable_dataset
 from mipdb.commands import enable_data_model
@@ -21,6 +22,7 @@
     "delete_data_model",
     "add_dataset",
     "validate_dataset",
+    "validate_dataset_no_db",
     "delete_dataset",
     "enable_data_model",
     "disable_data_model",
diff --git a/mipdb/commands.py b/mipdb/commands.py
index ada16b3..16ef42b 100644
--- a/mipdb/commands.py
+++ b/mipdb/commands.py
@@ -6,7 +6,7 @@
 
 from mipdb.database import MonetDB
 from mipdb.reader import JsonFileReader
-from mipdb.usecases import AddDataModel, Cleanup
+from mipdb.usecases import AddDataModel, Cleanup, ValidateDatasetNoDatabase
 from mipdb.usecases import AddPropertyToDataModel
 from mipdb.usecases import AddPropertyToDataset
 from mipdb.usecases import DeleteDataModel
@@ -200,6 +200,18 @@ def add_dataset(
     print(f"CSV '{csv_path}' was successfully added.")
 
 
+@entry.command()
+@cl.argument("cdes_path", required=True)
+@cl.argument("csv_path", required=True)
+@handle_errors
+def validate_dataset_no_db(cdes_path, csv_path):
+    print(f"Dataset '{csv_path}' is being validated...")
+    reader = JsonFileReader(cdes_path)
+    data_model_metadata = reader.read()
+    ValidateDatasetNoDatabase().execute(csv_path, data_model_metadata)
+    print(f"Dataset '{csv_path}' has a valid structure.")
+
+
 @entry.command()
 @cl.argument("csv_path", required=True)
 @cl.option(
diff --git a/mipdb/data_frame_schema.py b/mipdb/data_frame_schema.py
index 8684894..903264d 100644
--- a/mipdb/data_frame_schema.py
+++ b/mipdb/data_frame_schema.py
@@ -15,11 +15,13 @@ def __init__(
 
         # There is a need to construct a DataFrameSchema with all the constraints that the metadata is imposing
         # For each column a pandera Column is created that will contain the constraints for the specific column
+        if not set(columns) <= set(sql_type_per_column.keys()):
+
+            raise InvalidDatasetError(
+                f"Columns:{set(columns) - set(sql_type_per_column.keys()) - {'row_id'} } are not present in the CDEs"
+            )
+
         for column in columns:
-            if column not in sql_type_per_column.keys():
-                raise InvalidDatasetError(
-                    f"The column: '{column}' does not exist in the metadata"
-                )
 
             checks = self._get_pa_checks(
                 cdes_with_min_max, cdes_with_enumerations, column
diff --git a/mipdb/dataelements.py b/mipdb/dataelements.py
index d117a7d..494323f 100644
--- a/mipdb/dataelements.py
+++ b/mipdb/dataelements.py
@@ -44,6 +44,40 @@ def make_cdes(schema_data):
     return cdes
 
 
+def get_sql_type_per_column(cdes):
+    return {code: json.loads(cde.metadata)["sql_type"] for code, cde in cdes.items()}
+
+
+def get_cdes_with_min_max(cdes, columns):
+    cdes_with_min_max = {}
+    for code, cde in cdes.items():
+        if code not in columns:
+            continue
+        metadata = json.loads(cde.metadata)
+        max_value = metadata["max"] if "max" in metadata else None
+        min_value = metadata["min"] if "min" in metadata else None
+        if code in columns and min_value or max_value:
+            cdes_with_min_max[code] = (min_value, max_value)
+    return cdes_with_min_max
+
+
+def get_cdes_with_enumerations(cdes, columns):
+    return {
+        code: [
+            enum_code
+            for enum_code, enum_label in json.loads(cde.metadata)[
+                "enumerations"
+            ].items()
+        ]
+        for code, cde in cdes.items()
+        if json.loads(cde.metadata)["is_categorical"] and code in columns
+    }
+
+
+def get_dataset_enums(cdes):
+    return json.loads(cdes["dataset"].metadata)["enumerations"]
+
+
 def validate_dataset_present_on_cdes_with_proper_format(cdes):
     dataset_cde = [cde for cde in cdes if cde.code == "dataset"]
     if not dataset_cde:
diff --git a/mipdb/tables.py b/mipdb/tables.py
index 5802392..1f18563 100644
--- a/mipdb/tables.py
+++ b/mipdb/tables.py
@@ -343,39 +343,6 @@ def insert_values(self, values, db: Union[DataBase, Connection]):
         )
         db.execute(query, values)
 
-    def get_dataset_enums(self):
-        return json.loads(self.table["dataset"].metadata)["enumerations"]
-
-    def get_sql_type_per_column(self):
-        return {
-            code: json.loads(cde.metadata)["sql_type"]
-            for code, cde in self.table.items()
-        }
-
-    def get_cdes_with_min_max(self, columns):
-        cdes_with_min_max = {}
-        for code, cde in self.table.items():
-            if code not in columns:
-                continue
-            metadata = json.loads(cde.metadata)
-            max_value = metadata["max"] if "max" in metadata else None
-            min_value = metadata["min"] if "min" in metadata else None
-            if code in columns and min_value or max_value:
-                cdes_with_min_max[code] = (min_value, max_value)
-        return cdes_with_min_max
-
-    def get_cdes_with_enumerations(self, columns):
-        return {
-            code: [
-                enum_code
-                for enum_code, enum_label in json.loads(cde.metadata)[
-                    "enumerations"
-                ].items()
-            ]
-            for code, cde in self.table.items()
-            if json.loads(cde.metadata)["is_categorical"] and code in columns
-        }
-
 
 class TemporaryTable(Table):
     def __init__(self, dataframe_sql_type_per_column, db):
diff --git a/mipdb/usecases.py b/mipdb/usecases.py
index 789a1aa..649f6ba 100644
--- a/mipdb/usecases.py
+++ b/mipdb/usecases.py
@@ -5,7 +5,7 @@
 
 import pandas as pd
 
-from mipdb.database import DataBase, Connection
+from mipdb.database import DataBase
 from mipdb.database import METADATA_SCHEMA
 from mipdb.data_frame_schema import DataFrameSchema
 from mipdb.exceptions import ForeignKeyError, InvalidDatasetError
@@ -17,6 +17,10 @@
     make_cdes,
     validate_dataset_present_on_cdes_with_proper_format,
     validate_longitudinal_data_model,
+    get_sql_type_per_column,
+    get_cdes_with_min_max,
+    get_cdes_with_enumerations,
+    get_dataset_enums,
 )
 from mipdb.tables import (
     DataModelTable,
@@ -226,8 +230,9 @@ def execute(
             data_model_code, data_model_version, conn
         )
         metadata_table = MetadataTable.from_db(data_model, conn)
-        dataset_enumerations = metadata_table.get_dataset_enums()
-        sql_type_per_column = metadata_table.get_sql_type_per_column()
+        cdes = metadata_table.table
+        dataset_enumerations = get_dataset_enums(cdes)
+        sql_type_per_column = get_sql_type_per_column(cdes)
 
         if copy_from_file:
             imported_datasets = self.import_csv_with_volume(
@@ -406,16 +411,14 @@ def execute(
                 "The 'dataset' column is required to exist in the csv."
             )
         metadata_table = MetadataTable.from_db(data_model, conn)
-        sql_type_per_column = metadata_table.get_sql_type_per_column()
-        cdes_with_min_max = metadata_table.get_cdes_with_min_max(csv_columns)
-        cdes_with_enumerations = metadata_table.get_cdes_with_enumerations(
-            csv_columns
-        )
-        dataset_enumerations = metadata_table.get_dataset_enums()
+        cdes = metadata_table.table
+        sql_type_per_column = get_sql_type_per_column(cdes)
+        cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
+        cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
+        dataset_enumerations = get_dataset_enums(cdes)
         if self.is_data_model_longitudinal(
             data_model_code, data_model_version, conn
         ):
-            print("is_data_model_longitudinal")
             are_data_valid_longitudinal(csv_path)
 
         if copy_from_file:
@@ -509,6 +512,68 @@ def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
             )
 
 
+class ValidateDatasetNoDatabase(UseCase):
+    """
+    We separate the data validation from the importation to make sure that a csv is valid as a whole before committing it to the main table.
+    In the data validation we use chunking in order to reduce the memory footprint of the process.
+    Database constraints must NOT be used as part of the validation process since that could result in partially imported csvs.
+    """
+
+    def execute(self, csv_path, data_model_metadata) -> None:
+
+        csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
+        if DATASET_COLUMN_NAME not in csv_columns:
+            raise InvalidDatasetError(
+                "The 'dataset' column is required to exist in the csv."
+            )
+        cdes = make_cdes(copy.deepcopy(data_model_metadata))
+        cdes = {cde.code: cde for cde in cdes}
+        sql_type_per_column = get_sql_type_per_column(cdes)
+        cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
+        cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
+        dataset_enumerations = get_dataset_enums(cdes)
+        if "longitudinal" in data_model_metadata:
+            are_data_valid_longitudinal(csv_path)
+        validated_datasets = self.validate_csv(
+            csv_path,
+            sql_type_per_column,
+            cdes_with_min_max,
+            cdes_with_enumerations,
+        )
+        self.verify_datasets_exist_in_enumerations(
+            datasets=validated_datasets,
+            dataset_enumerations=dataset_enumerations,
+        )
+
+    def is_data_model_longitudinal(self, data_model_metadata):
+        return
+
+    def validate_csv(
+        self, csv_path, sql_type_per_column, cdes_with_min_max, cdes_with_enumerations
+    ):
+        imported_datasets = []
+
+        csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
+        dataframe_schema = DataFrameSchema(
+            sql_type_per_column, cdes_with_min_max, cdes_with_enumerations, csv_columns
+        )
+        with CSVDataFrameReader(csv_path).get_reader() as reader:
+            for dataset_data in reader:
+                dataframe = DataFrame(dataset_data)
+                dataframe_schema.validate_dataframe(dataframe.data)
+                imported_datasets = set(imported_datasets) | set(dataframe.datasets)
+        return imported_datasets
+
+    def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
+        non_existing_datasets = [
+            dataset for dataset in datasets if dataset not in dataset_enumerations
+        ]
+        if non_existing_datasets:
+            raise InvalidDatasetError(
+                f"The values:'{non_existing_datasets}' are not present in the enumerations of the CDE 'dataset'."
+            )
+
+
 class DeleteDataset(UseCase):
     def __init__(self, db: DataBase) -> None:
         self.db = db
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 3ade077..4c3d6b6 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -18,6 +18,7 @@
 from mipdb import list_data_models
 from mipdb import list_datasets
 from mipdb import validate_dataset
+from mipdb import validate_dataset_no_db
 from mipdb.exceptions import ExitCode
 from tests.conftest import (
     DATASET_FILE,
@@ -333,6 +334,81 @@ def test_invalid_dataset_error_cases(data_model, dataset, exception_message, db)
     )
 
 
+dataset_files = [
+    (
+        "data_model",
+        "dataset_exceeds_max.csv",
+        """An error occurred while validating the csv on column: 'var3'
+   index  failure_case
+0      0         100.0
+""",
+    ),
+    (
+        "data_model",
+        "dataset_exceeds_min.csv",
+        """An error occurred while validating the csv on column: 'var3'
+   index  failure_case
+0      0           0.0""",
+    ),
+    (
+        "data_model",
+        "invalid_enum.csv",
+        """An error occurred while validating the csv on column: 'var2'
+   index failure_case
+0      0           l3""",
+    ),
+    (
+        "data_model",
+        "invalid_type1.csv",
+        """An error occurred while validating the csv on column: 'var3'
+   index failure_case
+0      1      invalid
+""",
+    ),
+    (
+        "data_model",
+        "missing_column_dataset.csv",
+        """""",
+    ),
+    (
+        "data_model",
+        "column_not_present_in_cdes.csv",
+        "Columns:{'non_existing_col'} are not present in the CDEs",
+    ),
+    (
+        "data_model_longitudinal",
+        "dataset.csv",
+        """Dataset error: Invalid csv: the following visitid and subjectid pairs are duplicated:
+    subjectid visitid
+1  subjectid2     FL1
+2  subjectid2     FL1""",
+    ),
+]
+
+
+@pytest.mark.parametrize("data_model,dataset,exception_message", dataset_files)
+def test_invalid_dataset_error_cases_no_db(data_model, dataset, exception_message):
+    runner = CliRunner()
+
+    validation_result = runner.invoke(
+        validate_dataset_no_db,
+        [
+            ABSOLUTE_PATH_FAIL_DATA_FOLDER
+            + "/"
+            + data_model
+            + "_v_1_0/CDEsMetadata.json",
+            ABSOLUTE_PATH_FAIL_DATA_FOLDER + "/" + data_model + "_v_1_0/" + dataset,
+        ],
+    )
+    print(validation_result.stdout)
+    print("\n")
+
+    assert (
+        validation_result.exception.__str__() == exception_message
+        or exception_message in validation_result.stdout
+    )
+
+
 @pytest.mark.database
 @pytest.mark.usefixtures("monetdb_container", "cleanup_db")
 def test_validate_dataset(db):
diff --git a/tests/test_data_frame_schema.py b/tests/test_data_frame_schema.py
index ac6905c..c99598e 100644
--- a/tests/test_data_frame_schema.py
+++ b/tests/test_data_frame_schema.py
@@ -106,7 +106,7 @@ def test_validate_dataframe(
                     "dataset": ["dataset1", "dataset1"],
                 }
             ),
-            "The column: 'var5' does not exist in the metadata",
+            "Columns:{'var5'} are not present in the CDEs",
             id="text with int/float enumerations(1,2.0) and 1.0 was given",
         ),
         pytest.param(
diff --git a/tests/test_usecases.py b/tests/test_usecases.py
index 033cd57..2d90828 100644
--- a/tests/test_usecases.py
+++ b/tests/test_usecases.py
@@ -436,7 +436,9 @@ def test_check_duplicate_pairs_success():
 
 
 def test_check_duplicate_pairs_fail():
-    df = pd.DataFrame({"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]})
+    df = pd.DataFrame(
+        {"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]}
+    )
     expected_output = "Invalid csv: the following visitid and subjectid pairs are duplicated:\n   visitid  subjectid\n3        3         30\n4        3         30"
     with pytest.raises(InvalidDatasetError, match=expected_output):
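
A minimal usage sketch of the new validation (file paths are placeholders): the use case added by this patch can also be driven directly from Python, mirroring what the validate_dataset_no_db command does.

    from mipdb.reader import JsonFileReader
    from mipdb.usecases import ValidateDatasetNoDatabase

    # Read the CDEs metadata JSON that describes the data model.
    data_model_metadata = JsonFileReader("CDEsMetadata.json").read()

    # Validate the CSV against the CDEs without a database connection;
    # InvalidDatasetError is raised when the csv violates the metadata.
    ValidateDatasetNoDatabase().execute("dataset.csv", data_model_metadata)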