diff --git a/mipdb/commands.py b/mipdb/commands.py
index ada16b3..25ce429 100644
--- a/mipdb/commands.py
+++ b/mipdb/commands.py
@@ -6,7 +6,12 @@
 from mipdb.database import MonetDB
 from mipdb.reader import JsonFileReader
-from mipdb.usecases import AddDataModel, Cleanup
+from mipdb.usecases import (
+    AddDataModel,
+    Cleanup,
+    ValidateDatasetNoDatabase,
+    ValidateDataModel,
+)
 from mipdb.usecases import AddPropertyToDataModel
 from mipdb.usecases import AddPropertyToDataset
 from mipdb.usecases import DeleteDataModel
@@ -147,6 +152,27 @@ def load_folder(file, copy_from_file, ip, port, username, password, db_name):
         print(f"CSV '{csv_path}' was successfully added.")
 
 
+@entry.command()
+@cl.argument("file", required=True)
+@handle_errors
+def validate_folder(file):
+    for subdir, dirs, files in os.walk(file):
+        if dirs:
+            continue
+        print(f"Data model '{subdir}' is being validated...")
+        metadata_path = os.path.join(subdir, "CDEsMetadata.json")
+        reader = JsonFileReader(metadata_path)
+        data_model_metadata = reader.read()
+        code = data_model_metadata["code"]
+        ValidateDataModel().execute(data_model_metadata)
+        print(f"Data model '{code}' was successfully validated.")
+
+        for csv_path in glob.glob(subdir + "/*.csv"):
+            print(f"CSV '{csv_path}' is being validated...")
+            ValidateDatasetNoDatabase().execute(csv_path, data_model_metadata)
+            print(f"CSV '{csv_path}' was successfully validated.")
+
+
 @entry.command()
 @db_configs_options
 @handle_errors
diff --git a/mipdb/data_frame_schema.py b/mipdb/data_frame_schema.py
index 8684894..fc037e0 100644
--- a/mipdb/data_frame_schema.py
+++ b/mipdb/data_frame_schema.py
@@ -15,12 +15,12 @@ def __init__(
         # There is a need to construct a DataFrameSchema with all the constraints that the metadata is imposing
         # For each column a pandera Column is created that will contain the constraints for the specific column
-        for column in columns:
-            if column not in sql_type_per_column.keys():
-                raise InvalidDatasetError(
-                    f"The column: '{column}' does not exist in the metadata"
-                )
+        if not set(columns) <= set(sql_type_per_column.keys()):
+            raise InvalidDatasetError(
+                f"Columns:{set(columns) - set(sql_type_per_column.keys()) - {'row_id'} } are not present in the CDEs"
+            )
+        for column in columns:
             checks = self._get_pa_checks(
                 cdes_with_min_max, cdes_with_enumerations, column
             )
diff --git a/mipdb/dataelements.py b/mipdb/dataelements.py
index d117a7d..494323f 100644
--- a/mipdb/dataelements.py
+++ b/mipdb/dataelements.py
@@ -44,6 +44,40 @@
     return cdes
 
 
+def get_sql_type_per_column(cdes):
+    return {code: json.loads(cde.metadata)["sql_type"] for code, cde in cdes.items()}
+
+
+def get_cdes_with_min_max(cdes, columns):
+    cdes_with_min_max = {}
+    for code, cde in cdes.items():
+        if code not in columns:
+            continue
+        metadata = json.loads(cde.metadata)
+        max_value = metadata["max"] if "max" in metadata else None
+        min_value = metadata["min"] if "min" in metadata else None
+        if code in columns and min_value or max_value:
+            cdes_with_min_max[code] = (min_value, max_value)
+    return cdes_with_min_max
+
+
+def get_cdes_with_enumerations(cdes, columns):
+    return {
+        code: [
+            enum_code
+            for enum_code, enum_label in json.loads(cde.metadata)[
+                "enumerations"
+            ].items()
+        ]
+        for code, cde in cdes.items()
+        if json.loads(cde.metadata)["is_categorical"] and code in columns
+    }
+
+
+def get_dataset_enums(cdes):
+    return json.loads(cdes["dataset"].metadata)["enumerations"]
+
+
 def validate_dataset_present_on_cdes_with_proper_format(cdes):
     dataset_cde = [cde for cde in cdes if cde.code == "dataset"]
     if not dataset_cde:
diff --git a/mipdb/tables.py b/mipdb/tables.py
index 5802392..1f18563 100644
--- a/mipdb/tables.py
+++ b/mipdb/tables.py
@@ -343,39 +343,6 @@ def insert_values(self, values, db: Union[DataBase, Connection]):
         )
         db.execute(query, values)
 
-    def get_dataset_enums(self):
-        return json.loads(self.table["dataset"].metadata)["enumerations"]
-
-    def get_sql_type_per_column(self):
-        return {
-            code: json.loads(cde.metadata)["sql_type"]
-            for code, cde in self.table.items()
-        }
-
-    def get_cdes_with_min_max(self, columns):
-        cdes_with_min_max = {}
-        for code, cde in self.table.items():
-            if code not in columns:
-                continue
-            metadata = json.loads(cde.metadata)
-            max_value = metadata["max"] if "max" in metadata else None
-            min_value = metadata["min"] if "min" in metadata else None
-            if code in columns and min_value or max_value:
-                cdes_with_min_max[code] = (min_value, max_value)
-        return cdes_with_min_max
-
-    def get_cdes_with_enumerations(self, columns):
-        return {
-            code: [
-                enum_code
-                for enum_code, enum_label in json.loads(cde.metadata)[
-                    "enumerations"
-                ].items()
-            ]
-            for code, cde in self.table.items()
-            if json.loads(cde.metadata)["is_categorical"] and code in columns
-        }
-
 
 class TemporaryTable(Table):
     def __init__(self, dataframe_sql_type_per_column, db):
diff --git a/mipdb/usecases.py b/mipdb/usecases.py
index 789a1aa..8338055 100644
--- a/mipdb/usecases.py
+++ b/mipdb/usecases.py
@@ -5,7 +5,7 @@
 
 import pandas as pd
 
-from mipdb.database import DataBase, Connection
+from mipdb.database import DataBase
 from mipdb.database import METADATA_SCHEMA
 from mipdb.data_frame_schema import DataFrameSchema
 from mipdb.exceptions import ForeignKeyError, InvalidDatasetError
@@ -17,6 +17,10 @@
     make_cdes,
     validate_dataset_present_on_cdes_with_proper_format,
     validate_longitudinal_data_model,
+    get_sql_type_per_column,
+    get_cdes_with_min_max,
+    get_cdes_with_enumerations,
+    get_dataset_enums,
 )
 from mipdb.tables import (
     DataModelTable,
@@ -153,6 +157,14 @@ def _create_metadata_table(self, schema, conn, cdes):
         metadata_table.insert_values(values, conn)
 
 
+class ValidateDataModel(UseCase):
+    def execute(self, data_model_metadata) -> None:
+        cdes = make_cdes(copy.deepcopy(data_model_metadata))
+        validate_dataset_present_on_cdes_with_proper_format(cdes)
+        if LONGITUDINAL in data_model_metadata and data_model_metadata[LONGITUDINAL]:
+            validate_longitudinal_data_model(cdes)
+
+
 class DeleteDataModel(UseCase):
     def __init__(self, db: DataBase) -> None:
         self.db = db
@@ -226,8 +238,9 @@ def execute(
             data_model_code, data_model_version, conn
         )
         metadata_table = MetadataTable.from_db(data_model, conn)
-        dataset_enumerations = metadata_table.get_dataset_enums()
-        sql_type_per_column = metadata_table.get_sql_type_per_column()
+        cdes = metadata_table.table
+        dataset_enumerations = get_dataset_enums(cdes)
+        sql_type_per_column = get_sql_type_per_column(cdes)
 
         if copy_from_file:
             imported_datasets = self.import_csv_with_volume(
@@ -406,16 +419,14 @@ def execute(
                 "The 'dataset' column is required to exist in the csv."
             )
         metadata_table = MetadataTable.from_db(data_model, conn)
-        sql_type_per_column = metadata_table.get_sql_type_per_column()
-        cdes_with_min_max = metadata_table.get_cdes_with_min_max(csv_columns)
-        cdes_with_enumerations = metadata_table.get_cdes_with_enumerations(
-            csv_columns
-        )
-        dataset_enumerations = metadata_table.get_dataset_enums()
+        cdes = metadata_table.table
+        sql_type_per_column = get_sql_type_per_column(cdes)
+        cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
+        cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
+        dataset_enumerations = get_dataset_enums(cdes)
         if self.is_data_model_longitudinal(
             data_model_code, data_model_version, conn
         ):
-            print("is_data_model_longitudinal")
             are_data_valid_longitudinal(csv_path)
 
         if copy_from_file:
@@ -509,6 +520,65 @@ def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
             )
 
 
+class ValidateDatasetNoDatabase(UseCase):
+    """
+    We separate the data validation from the importation to make sure that a csv is valid as a whole before committing it to the main table.
+    In the data validation we use chunking in order to reduce the memory footprint of the process.
+    Database constraints must NOT be used as part of the validation process since that could result in partially imported csvs.
+    """
+
+    def execute(self, csv_path, data_model_metadata) -> None:
+
+        csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
+        if DATASET_COLUMN_NAME not in csv_columns:
+            raise InvalidDatasetError(
+                "The 'dataset' column is required to exist in the csv."
+            )
+        cdes = make_cdes(copy.deepcopy(data_model_metadata))
+        cdes = {cde.code: cde for cde in cdes}
+        sql_type_per_column = get_sql_type_per_column(cdes)
+        cdes_with_min_max = get_cdes_with_min_max(cdes, csv_columns)
+        cdes_with_enumerations = get_cdes_with_enumerations(cdes, csv_columns)
+        dataset_enumerations = get_dataset_enums(cdes)
+        if "longitudinal" in data_model_metadata:
+            are_data_valid_longitudinal(csv_path)
+        validated_datasets = self.validate_csv(
+            csv_path,
+            sql_type_per_column,
+            cdes_with_min_max,
+            cdes_with_enumerations,
+        )
+        self.verify_datasets_exist_in_enumerations(
+            datasets=validated_datasets,
+            dataset_enumerations=dataset_enumerations,
+        )
+
+    def validate_csv(
+        self, csv_path, sql_type_per_column, cdes_with_min_max, cdes_with_enumerations
+    ):
+        imported_datasets = []
+
+        csv_columns = pd.read_csv(csv_path, nrows=0).columns.tolist()
+        dataframe_schema = DataFrameSchema(
+            sql_type_per_column, cdes_with_min_max, cdes_with_enumerations, csv_columns
+        )
+        with CSVDataFrameReader(csv_path).get_reader() as reader:
+            for dataset_data in reader:
+                dataframe = DataFrame(dataset_data)
+                dataframe_schema.validate_dataframe(dataframe.data)
+                imported_datasets = set(imported_datasets) | set(dataframe.datasets)
+        return imported_datasets
+
+    def verify_datasets_exist_in_enumerations(self, datasets, dataset_enumerations):
+        non_existing_datasets = [
+            dataset for dataset in datasets if dataset not in dataset_enumerations
+        ]
+        if non_existing_datasets:
+            raise InvalidDatasetError(
+                f"The values:'{non_existing_datasets}' are not present in the enumerations of the CDE 'dataset'."
+            )
+
+
 class DeleteDataset(UseCase):
     def __init__(self, db: DataBase) -> None:
         self.db = db
diff --git a/pyproject.toml b/pyproject.toml
index b5dd88e..5696dd8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "mipdb"
-version = "2.1.0"
+version = "2.2.0"
 description = ""
 authors = ["Your Name "]
diff --git a/tests/test_data_frame_schema.py b/tests/test_data_frame_schema.py
index ac6905c..c99598e 100644
--- a/tests/test_data_frame_schema.py
+++ b/tests/test_data_frame_schema.py
@@ -106,7 +106,7 @@ def test_validate_dataframe(
                 "dataset": ["dataset1", "dataset1"],
             }
         ),
-        "The column: 'var5' does not exist in the metadata",
+        "Columns:{'var5'} are not present in the CDEs",
         id="text with int/float enumerations(1,2.0) and 1.0 was given",
     ),
     pytest.param(
diff --git a/tests/test_usecases.py b/tests/test_usecases.py
index 033cd57..2d90828 100644
--- a/tests/test_usecases.py
+++ b/tests/test_usecases.py
@@ -436,7 +436,9 @@ def test_check_duplicate_pairs_success():
 
 
 def test_check_duplicate_pairs_fail():
-    df = pd.DataFrame({"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]})
+    df = pd.DataFrame(
+        {"visitid": [1, 2, 3, 3, 3, 4], "subjectid": [10, 20, 20, 30, 30, 40]}
+    )
     expected_output = "Invalid csv: the following visitid and subjectid pairs are duplicated:\n visitid subjectid\n3 3 30\n4 3 30"
     with pytest.raises(InvalidDatasetError, match=expected_output):