diff --git a/src/hipscat_import/verification/arguments.py b/src/hipscat_import/verification/arguments.py index 86c139b1..fb7bee27 100644 --- a/src/hipscat_import/verification/arguments.py +++ b/src/hipscat_import/verification/arguments.py @@ -2,50 +2,51 @@ from __future__ import annotations -from dataclasses import dataclass, field from pathlib import Path -from typing import List, Optional -from hipscat.catalog import Catalog -from hipscat.io.validation import is_valid_catalog +import attrs from upath import UPath -from hipscat_import.runtime_arguments import RuntimeArguments - - -@dataclass -class VerificationArguments(RuntimeArguments): - """Data class for holding verification arguments""" - - ## Input - input_catalog_path: str | Path | UPath | None = None - """Path to an existing catalog that will be inspected.""" - input_catalog: Optional[Catalog] = None - """In-memory representation of a catalog. If not provided, it will be loaded - from the input_catalog_path.""" - - ## Verification options - field_distribution_cols: List[str] = field(default_factory=list) - """List of fields to get the overall distribution for. e.g. ["ra", "dec"]. - Should be valid columns in the parquet files.""" - - def __post_init__(self): - self._check_arguments() - - def _check_arguments(self): - super()._check_arguments() - if not self.input_catalog_path and not self.input_catalog: - raise ValueError("input catalog is required (either input_catalog_path or input_catalog)") - if not self.input_catalog: - if not is_valid_catalog(self.input_catalog_path): - raise ValueError("input_catalog_path not a valid catalog") - self.input_catalog = Catalog.read_from_hipscat(catalog_path=self.input_catalog_path) - if not self.input_catalog_path: - self.input_catalog_path = self.input_catalog.catalog_path - +# from hipscat_import.runtime_arguments import RuntimeArguments + + +def _dir_exists(instance: VerificationArguments, attribute: attrs.Attribute, value: UPath): + """This function will be used as a validator for attributes of VerificationArguments.""" + if not value.is_dir(): + raise ValueError(f"{attribute.name} must be an existing directory") + + +def _path_exists(instance: VerificationArguments, attribute: attrs.Attribute, value: UPath): + """This function will be used as a validator for attributes of VerificationArguments.""" + if not value.exists(): + raise ValueError(f"{attribute.name} must be an existing file or directory") + + +@attrs.define(kw_only=True) +class VerificationArguments: + """Container for verification arguments.""" + + input_catalog_path: str | Path | UPath = attrs.field(converter=UPath, validator=_dir_exists) + """Path to an existing catalog that will be inspected. 
This must be a directory + containing the Parquet dataset and metadata sidecars.""" + output_path: str | Path | UPath = attrs.field(converter=UPath) + """Base path where output files should be written.""" + output_report_filename: str = attrs.field(factory=lambda: "verifier_results.csv") + """Filename for the verification report that will be generated.""" + output_distributions_filename: str = attrs.field(factory=lambda: "field_distributions.csv") + """Filename for the field distributions that will be calculated.""" + truth_total_rows: int | None = attrs.field(default=None) + """Total number of rows expected in this catalog.""" + truth_schema: str | Path | UPath | None = attrs.field( + default=None, + converter=attrs.converters.optional(UPath), + validator=attrs.validators.optional(_path_exists), + ) + """Path to a Parquet file or dataset containing the expected schema. + If you provided the 'use_schema_file' argument when importing the catalog, use the same value here. + If not provided, the catalog's _common_metadata file will be used as the source of truth. + """ + + # [FIXME] Connect this with RuntimeArguments.provenance_info. Even then, does this ever get written to file? def additional_runtime_provenance_info(self) -> dict: - return { - "pipeline": "verification pipeline", - "input_catalog_path": self.input_catalog_path, - "field_distribution_cols": self.field_distribution_cols, - } + return {"pipeline": "verification pipeline", **{k: str(v) for k, v in vars(self).items()}} diff --git a/src/hipscat_import/verification/run_verification.py b/src/hipscat_import/verification/run_verification.py index 2b7d5954..e8e33ea8 100644 --- a/src/hipscat_import/verification/run_verification.py +++ b/src/hipscat_import/verification/run_verification.py @@ -1,14 +1,539 @@ -"""Run pass/fail checks and generate verification report of existing hipscat table.""" +"""Run pass/fail tests and generate verification report of existing hipscat table.""" + +import collections +import datetime +from pathlib import Path + +import attrs +import hipscat.io.validation +import pandas as pd +import pyarrow.dataset from hipscat_import.verification.arguments import VerificationArguments -def run(args): - """Run verification pipeline.""" +def run(args: VerificationArguments, write_mode: str = "a"): + """Create a `Verifier` using `args`, run all tests, and write reports. + + Parameters + ---------- + args : VerificationArguments + Arguments to construct the Verifier. + write_mode : str, optional + Mode to be used when writing output files. + + Returns + ------- + Verifier + The `Verifier` instance used to perform the tests. The `results_df` and + `distributions_df` properties contain the same information as written reports. + + Raises + ------ + TypeError + If `args` is not provided or is not an instance of `VerificationArguments`. + """ if not args: raise TypeError("args is required and should be type VerificationArguments") if not isinstance(args, VerificationArguments): raise TypeError("args must be type VerificationArguments") - # implement everything else. 
- raise NotImplementedError("Verification not yet implemented.") + verifier = Verifier.from_args(args) + verifier.run(write_mode=write_mode) + + return verifier + + +Result = collections.namedtuple( + "Result", ["passed", "test", "target", "description", "affected_files", "datetime"] +) +"""Verification test result.""" + + +def now(): + """Return the current time as a string.""" + return datetime.datetime.now(datetime.timezone.utc).strftime("%Y/%m/%d %H:%M:%S %Z") + + +@attrs.define +class Verifier: + """Class for verification tests. Instantiate using the `from_args` method.""" + + args: VerificationArguments = attrs.field() + """Arguments to use during verification.""" + files_ds: pyarrow.dataset.Dataset = attrs.field() + """Pyarrow dataset, loaded from the actual files on disk.""" + metadata_ds: pyarrow.dataset.Dataset = attrs.field() + """Pyarrow dataset, loaded from the _metadata file.""" + common_ds: pyarrow.dataset.Dataset = attrs.field() + """Pyarrow dataset, loaded from the _common_metadata file.""" + truth_schema: pyarrow.Schema = attrs.field() + """Pyarrow schema to be used as truth. This will be loaded from args.truth_schema + if provided, and then hipscat columns and metadata will be added if not already present. + If args.truth_schema not provided, the catalog's _common_metadata file will be used.""" + truth_src: str = attrs.field() + """'truth_schema' if args.truth_schema was provided, else '_common_metadata'.""" + results: list[Result] = attrs.field(factory=list) + """List of results, one for each test that has been done.""" + _distributions_df: pd.DataFrame | None = attrs.field(default=None) + + @classmethod + def from_args(cls, args: VerificationArguments) -> "Verifier": + """Create a `Verifier` instance from the provided arguments. + + This method initializes the `Verifier` by setting up the necessary datasets + and schemas based on the input arguments. + + Parameters + ---------- + args : VerificationArguments + Arguments for the Verifier. + + Returns + ------- + Verifier : An instance of the `Verifier` class. 
+ """ + # make sure the output directory exists + args.output_path.mkdir(exist_ok=True, parents=True) + + # load a dataset from the actual files on disk + files_ds = pyarrow.dataset.dataset( + args.input_catalog_path, + ignore_prefixes=[ + ".", + "_", + "catalog_info.json", + "partition_info.csv", + "point_map.fits", + "provenance_info.json", + ], + ) + + # load a dataset from the _metadata file + metadata_ds = pyarrow.dataset.parquet_dataset(f"{args.input_catalog_path}/_metadata") + + # load a dataset from the _common_metadata file + common_ds = pyarrow.dataset.parquet_dataset(f"{args.input_catalog_path}/_common_metadata") + + # load the input schema if provided, else use the _common_metadata schema + if args.truth_schema is not None: + truth_schema = pyarrow.dataset.parquet_dataset(args.truth_schema).schema + truth_src = "truth_schema" + else: + truth_schema = common_ds.schema + truth_src = "_common_metadata" + + return cls( + args=args, + files_ds=files_ds, + metadata_ds=metadata_ds, + common_ds=common_ds, + truth_schema=truth_schema, + truth_src=truth_src, + ) + + def run(self, write_mode: str = "a") -> None: + """Run all tests and write reports.""" + self.test_is_valid_catalog() + self.test_file_sets() + self.test_num_rows() + self.test_rowgroup_stats(write_mode=write_mode) + self.test_schemas() + + self.write_results(write_mode=write_mode) + + @property + def results_df(self) -> pd.DataFrame: + """Verifier results as a dataframe.""" + return pd.DataFrame(self.results) + + def truth_schema_plus_common_metadata(self) -> pyarrow.Schema: + """Copy of `truth_schema` with hipscat fields and metadata added from `common_ds.schema`.""" + hipscat_cols = ["Norder", "Dir", "Npix", "_hipscat_index"] + new_fields = [ + self.common_ds.schema.field(fld) for fld in hipscat_cols if fld not in self.truth_schema.names + ] + + # use pandas metadata from common_ds but keep all other metadata from truth_schema + metadata = self.truth_schema.metadata or {} + metadata[b"pandas"] = self.common_ds.schema.metadata[b"pandas"] + + return pyarrow.schema(list(self.truth_schema) + new_fields).with_metadata(metadata) + + def test_file_sets(self) -> bool: + """Test that files in _metadata match the parquet files on disk. Add one `Result` to `results`. + + This is a simple test that can be especially useful to run after copying or moving + the catalog to a different local or cloud-based destination. + + Returns + ------- + bool: True if the file sets match, else False. + """ + test = "file sets" + description = "Test that files in _metadata match files on disk." + test_info = dict(test=test, description=description) + print(f"\nStarting: {description}") + + targets = "_metadata vs files on disk" + base_dir = str(self.args.input_catalog_path) + files_ds_files = [f.removeprefix(base_dir).strip("/") for f in self.files_ds.files] + metadata_ds_files = [f.removeprefix(base_dir).strip("/") for f in self.metadata_ds.files] + failed_files = list(set(files_ds_files).symmetric_difference(metadata_ds_files)) + passed = len(failed_files) == 0 + self._append_result(passed=passed, target=targets, affected_files=failed_files, **test_info) + + print(f"Result: {'PASSED' if passed else 'FAILED'}") + return passed + + def test_is_valid_catalog(self) -> bool: + """Test if the provided catalog is a valid HiPSCat catalog. Add one `Result` to `results`. + + Returns + ------- + bool: True if the catalog is valid, else False. + """ + test = "is valid catalog" + target = self.args.input_catalog_path + # [FIXME] How to get the hipscat version? 
+ description = "Test that this is a valid HiPSCat catalog using hipscat version ." + print(f"\nStarting: {description}") + + passed = hipscat.io.validation.is_valid_catalog(target, strict=True) + self._append_result(test=test, description=description, passed=passed, target=target.name) + print(f"Result: {'PASSED' if passed else 'FAILED'}") + return passed + + def test_num_rows(self) -> bool: + """Test the number of rows in the dataset. Add two `Results` to `results`. + + File footers are compared with _metadata and the user-supplied truth (if provided). + + Returns + ------- + bool: True if all checks pass, else False. + """ + test = "num rows" + description = "Test that number of rows are equal." + test_info = dict(test=test, description=description) + print(f"\nStarting: {description}") + + # get the number of rows in each file, indexed by file path. we treat this as truth. + files_df = self._load_nrows(self.files_ds, explicit_count=True) + + # check _metadata + targets = "_metadata vs file footers" + print(f"\t{targets}") + metadata_df = self._load_nrows(self.metadata_ds) + row_diff = files_df - metadata_df + failed_frags = row_diff.loc[row_diff.num_rows != 0].index.to_list() + passed = len(failed_frags) == 0 + self._append_result(passed=passed, target=targets, affected_files=failed_frags, **test_info) + + # check user-supplied total + if self.args.truth_total_rows is not None: + targets = "user total vs file footers" + print(f"\t{targets}") + _passed = self.args.truth_total_rows == files_df.num_rows.sum() + self._append_result(passed=_passed, target=targets, **test_info) + else: + _passed = True # this test did not fail. this is only needed for the return value. + + all_passed = all([passed, _passed]) + print(f"Result: {'PASSED' if all_passed else 'FAILED'}") + return all_passed + + def _load_nrows(self, dataset: pyarrow.dataset.Dataset, explicit_count: bool = False) -> pd.DataFrame: + """Load the number of rows in each file in the dataset. + + Parameters + ---------- + dataset : pyarrow.dataset.Dataset + The dataset from which to load the number of rows. + explicit_count : bool + If True, explicitly count the rows in each fragment. + + Returns + ------- + pd.DataFrame: A DataFrame with the number of rows per file, indexed by file path. + """ + nrows_df = pd.DataFrame( + columns=["num_rows", "frag_path"], + data=[ + ( + # [TODO] check cpu/ram usage to try to determine if there is a difference here + frag.count_rows() if explicit_count else frag.metadata.num_rows, + frag.path.removeprefix(str(self.args.input_catalog_path)).strip("/"), + ) + for frag in dataset.get_fragments() + ], + ) + nrows_df = nrows_df.set_index("frag_path").sort_index() + return nrows_df + + def test_rowgroup_stats(self, *, write_mode: str = "a") -> bool: + """Test that statistics were recorded for all row groups. Add a `Result` to `results`. + + If the test passes, `distributions_df` is written to file. + + Parameters + ---------- + write_mode : str + Mode to be used when writing the output file. + + Returns + ------- + bool: True if the test passes, else False. + """ + test = "rowgroup stats" + description = "Test that statstistics were recorded for all row groups." 
+ target = "_metadata" + test_info = dict(test=test, description=description, target=target) + print(f"\nStarting: {description}") + + common_truth_schema = self.truth_schema_plus_common_metadata() + self._distributions_df = None # start fresh + try: + assert set(self.distributions_df.index) == set(common_truth_schema.names) + except AssertionError: + passed = False + else: + passed = True + self._append_result(passed=passed, **test_info) + print(f"Result: {'PASSED' if passed else 'FAILED'}") + + if passed: + fout = self.args.output_path / self.args.output_distributions_filename + fout.parent.mkdir(exist_ok=True, parents=True) + header = False if (write_mode == "a" and fout.is_file()) else True + self.distributions_df.to_csv(fout, mode=write_mode, header=header, index=True) + print(f"Distributions written to {fout}") + + return passed + + @property + def distributions_df(self) -> pd.DataFrame: + """Distributions (min and max) for each column in the catalog. + + Returns + ------- + pd.DataFrame: A DataFrame with 'minimum' and 'maximum' indexed by column name. + + Raises + ------ + pyarrow.ArrowTypeError: If a schema mismatch is encountered while gathering statistics. + AssertionError: If the gathered statistics do not contain all expected columns. + """ + if self._distributions_df is not None: + return self._distributions_df + + print("Gathering distributions (min/max) for fields.") + common_truth_schema = self.truth_schema_plus_common_metadata() + + try: + rowgrp_stats = [ + rg.statistics for frag in self.metadata_ds.get_fragments() for rg in frag.row_groups + ] + except pyarrow.ArrowTypeError as exc: + msg = "Distributions failed due to mismatched schemas. Run 'test_schemas' to find problematic files." + raise pyarrow.ArrowTypeError(msg) from exc + + dist = pd.json_normalize(rowgrp_stats) + + # if dist doesn't contain all expected columns, fail now + msg = "Statistics not found" + assert set([c.split(".")[0] for c in dist.columns]) == set(common_truth_schema.names), msg + + min_ = dist[[f"{c}.min" for c in common_truth_schema.names]].min() + min_ = min_.rename(index={name: name.removesuffix(".min") for name in min_.index}) + + max_ = dist[[f"{c}.max" for c in common_truth_schema.names]].max() + max_ = max_.rename(index={name: name.removesuffix(".max") for name in max_.index}) + + self._distributions_df = pd.DataFrame({"minimum": min_, "maximum": max_}).rename_axis(index="field") + return self._distributions_df + + def test_schemas(self) -> bool: + """Test the equality of schemas and their metadata. Add `Result`s to `results`. + + This method performs up to four tests: + 1. Schema metadata includes a correct pandas schema. + 2. _common_metadata matches user-supplied `args.truth_schema` (schema and metadata), if provided. + 3. _metadata matches Verifier `truth_schema` (schema and metadata). + 4. File footers match Verifier `truth_schema` (schema and metadata). + + Returns + ------- + bool: True if all tests pass, else False. 
+ """ + test, testmd = "schema", "schema metadata" + test_info = dict(test=test, description="Test that schemas are equal.") + testmd_info = dict(test=testmd, description="Test that schema metadata is equal.") + print(f"\nStarting: {test_info['description']}") + + passed_cm = self._test_schema__common_metadata(test_info, testmd_info) + passed_md = self._test_schema__metadata(test_info, testmd_info) + passed_ff = self._test_schema_file_footers(test_info, testmd_info) + + all_passed = all([passed_cm, passed_md, passed_ff]) + print(f"Result: {'PASSED' if all_passed else 'FAILED'}") + return all_passed + + def _test_schema__common_metadata(self, test_info: dict, testmd_info: dict) -> bool: + """Test _common_metadata schema and metadata against the truth schema. + + This method performs up to two tests: + 1. Schema metadata includes a correct pandas schema. + 2. _common_metadata matches user-supplied `args.truth_schema` (schema and metadata), if provided. + + Parameters + ---------- + test_info : dict + Information related to the schema test. + testmd_info : dict + Information related to the metadata test. + + Returns + ------- + bool: True if all tests pass, else False. + """ + pandas_passed = self._test_schema__common_metadata_pandas() + + if self.truth_src == "_common_metadata": + # no input schema provided => _common_metadata is being used as truth, so skip the rest + return pandas_passed + + # an input schema was provided as truth, so we need to test _common_metadata against it + targets = f"_common_metadata vs {self.truth_src}" + print(f"\t{targets}") + common_truth_schema = self.truth_schema_plus_common_metadata() + + # check schema and metadata separately because we want to report the results separately + passed = self.common_ds.schema.equals(common_truth_schema, check_metadata=False) + self._append_result(passed=passed, target=targets, **test_info) + passedmd = self.common_ds.schema.metadata == common_truth_schema.metadata + self._append_result(passed=passedmd, target=targets, **testmd_info) + + return all([pandas_passed, passed, passedmd]) + + def _test_schema__common_metadata_pandas(self) -> bool: + """Test that the pandas metadata in _common_metadata matches the actual field names + and types in `truth_schema`. + + Returns + ------- + bool: True if the pandas metadata matches the expected schema and index columns, else False. + """ + test = "schema metadata" + description = "Test that pandas metadata contains correct field names and types." + target = "b'pandas' in _common_metadata" + test_info = dict(test=test, description=description, target=target) + print(f"\t{target}") + + common_truth_schema = self.truth_schema_plus_common_metadata() + base_schema = pyarrow.schema([pyarrow.field(fld.name, fld.type) for fld in common_truth_schema]) + pandas_md = common_truth_schema.pandas_metadata + pfields = [ + pyarrow.field(pcol["name"], pyarrow.from_numpy_dtype(pcol["pandas_type"])) + for pcol in pandas_md["columns"] + ] + pandas_schema = pyarrow.schema(pfields) + + passed = base_schema.equals(pandas_schema) and (pandas_md["index_columns"] == ["_hipscat_index"]) + self._append_result(passed=passed, **test_info) + return passed + + def _test_schema__metadata(self, test_info: dict, testmd_info: dict) -> bool: + """Test _metadata schema and metadata against the truth schema. + + Parameters + ---------- + test_info : dict + Information related to the schema test. + testmd_info : dict + Information related to the metadata test. 
+ + Returns + ------- + bool: True if both schema and metadata match the truth source, else False. + """ + targets = f"_metadata vs {self.truth_src}" + print(f"\t{targets}") + common_truth_schema = self.truth_schema_plus_common_metadata() + + # check schema and metadata separately because we want to report the results separately + passed = self.metadata_ds.schema.equals(common_truth_schema, check_metadata=False) + self._append_result(passed=passed, target=targets, **test_info) + passedmd = self.metadata_ds.schema.metadata == common_truth_schema.metadata + self._append_result(passed=passedmd, target=targets, **testmd_info) + + return all([passed, passedmd]) + + def _test_schema_file_footers(self, test_info: dict, testmd_info: dict) -> bool: + """Test the file footers schema and metadata against the truth schema. + + Parameters + ---------- + test_info : dict + Information related to the test results for schema comparison. + testmd_info : dict + Information related to the test results for metadata comparison. + + Returns + ------- + bool: True if all schema and metadata tests pass, else False. + """ + targets = f"file footers vs {self.truth_src}" + print(f"\t{targets}") + common_truth_schema = self.truth_schema_plus_common_metadata() + + affected_files, affectedmd_files = [], [] + for frag in self.files_ds.get_fragments(): + frag_path = str(Path(frag.path).relative_to(self.args.input_catalog_path)) + # check schema and metadata separately because we want to report the results separately + if not frag.physical_schema.equals(common_truth_schema, check_metadata=False): + affected_files.append(frag_path) + if not frag.physical_schema.metadata == common_truth_schema.metadata: + affectedmd_files.append(frag_path) + + passed = len(affected_files) == 0 + self._append_result(passed=passed, target=targets, affected_files=affected_files, **test_info) + passedmd = len(affectedmd_files) == 0 + self._append_result(passed=passedmd, target=targets, affected_files=affectedmd_files, **testmd_info) + + return all([passed, passedmd]) + + def _append_result( + self, + *, + test: str, + target: str, + description: str, + passed: bool, + affected_files: list[str] | None = None, + ): + """Create a `Result` and append it to `self.results`.""" + self.results.append( + Result( + datetime=now(), + passed=passed, + test=test, + target=target, + description=description, + affected_files=affected_files or [], + ) + ) + + def write_results(self, *, write_mode: str = "a") -> None: + """Write the verification results to a file. + + Parameters + ---------- + write_mode : str + Mode to be used when writing output file. 
+ """ + fout = self.args.output_path / self.args.output_report_filename + fout.parent.mkdir(exist_ok=True, parents=True) + header = False if (write_mode == "a" and fout.is_file()) else True + self.results_df.to_csv(fout, mode=write_mode, header=header, index=False) + print(f"\nVerifier results written to {fout}") diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 1cd8cbf2..6747b874 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -10,6 +10,8 @@ import pytest from hipscat import pixel_math +from tests.hipscat_import.verification.fixture import VerifierFixture + # pylint: disable=missing-function-docstring, redefined-outer-name @@ -300,3 +302,52 @@ def assert_parquet_file_index(file_name, expected_values): npt.assert_array_equal(values, expected_values) return assert_parquet_file_index + + +@pytest.fixture +def malformed_catalog_dirs(test_data_dir): + base_dir = test_data_dir / "malformed_catalogs" + catalog_dirs = {dr.name: dr for dr in base_dir.iterdir() if dr.is_dir()} + # valid_truth dir contains a README pointing to the valid catalog used to generate malformed ones + # resolve the path + catalog_dirs["valid_truth"] = test_data_dir / (catalog_dirs["valid_truth"] / "README").read_text() + return catalog_dirs + + +@pytest.fixture(params=["valid_truth", "wrong_files"]) +def verifier_for_file_sets(request, malformed_catalog_dirs, tmp_path): + return VerifierFixture.from_param(request.param, malformed_catalog_dirs, tmp_path) + + +@pytest.fixture(params=["valid_truth", "no_rowgroup_stats"]) +def verifier_for_is_valid_catalog(request, malformed_catalog_dirs, tmp_path): + return VerifierFixture.from_param(request.param, malformed_catalog_dirs, tmp_path) + + +@pytest.fixture(params=["valid_truth", "wrong_rows"]) +def verifier_for_num_rows(request, malformed_catalog_dirs, tmp_path): + return VerifierFixture.from_param(request.param, malformed_catalog_dirs, tmp_path) + + +@pytest.fixture(params=["valid_truth", "no_rowgroup_stats"]) +def verifier_for_rowgroup_stats(request, malformed_catalog_dirs, tmp_path): + return VerifierFixture.from_param(request.param, malformed_catalog_dirs, tmp_path) + + +@pytest.fixture(params=["valid_truth", "no_rowgroup_stats"]) +def verifier_for_runner(request, malformed_catalog_dirs, tmp_path): + return VerifierFixture.from_param(request.param, malformed_catalog_dirs, tmp_path) + + +@pytest.fixture( + params=[ + "valid_truth", + "schema", + "schema_with_md_truth", + "schema_with_cmd_truth", + "schema_with_import_truth", + "schema_with_no_truth", + ] +) +def verifier_for_schemas(request, malformed_catalog_dirs, tmp_path): + return VerifierFixture.from_param(request.param, malformed_catalog_dirs, tmp_path) diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.extra_column.parquet b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.extra_column.parquet new file mode 100644 index 00000000..f30ed667 Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.extra_column.parquet differ diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.missing_column.parquet b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.missing_column.parquet new file mode 100644 index 00000000..d793a2cd Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.missing_column.parquet differ 
diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.no_metadata.parquet b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.no_metadata.parquet
new file mode 100644
index 00000000..08b1a375
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.no_metadata.parquet differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.parquet b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.parquet
new file mode 100644
index 00000000..e0cb8d94
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.parquet differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.wrong_dtypes.parquet b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.wrong_dtypes.parquet
new file mode 100644
index 00000000..2237419d
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/Norder=0/Dir=0/Npix=11.wrong_dtypes.parquet differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_common_metadata b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_common_metadata
new file mode 100644
index 00000000..a72be7f8
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_common_metadata differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_common_metadata.import b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_common_metadata.import
new file mode 100644
index 00000000..dfc011fa
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_common_metadata.import differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_metadata b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_metadata
new file mode 100644
index 00000000..fce59f1a
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/bad_schemas/_metadata differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/Norder=0/Dir=0/Npix=11.parquet b/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/Norder=0/Dir=0/Npix=11.parquet
new file mode 100644
index 00000000..11e599de
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/Norder=0/Dir=0/Npix=11.parquet differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/_common_metadata b/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/_common_metadata
new file mode 100644
index 00000000..4cf7a744
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/_common_metadata differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/_metadata b/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/_metadata
new file mode 100644
index 00000000..925d3f67
Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/no_rowgroup_stats/_metadata differ
diff --git a/tests/hipscat_import/data/malformed_catalogs/valid_truth/README b/tests/hipscat_import/data/malformed_catalogs/valid_truth/README
new file mode 100644
index 00000000..8dd3ea47
--- /dev/null
+++ b/tests/hipscat_import/data/malformed_catalogs/valid_truth/README
@@ -0,0 +1 @@
+small_sky_object_catalog
\ No newline at end of file
diff --git 
a/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.extra_file.parquet b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.extra_file.parquet new file mode 100644 index 00000000..94585fd9 Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.extra_file.parquet differ diff --git a/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.extra_rows.parquet b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.extra_rows.parquet new file mode 100644 index 00000000..a51234ca Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.extra_rows.parquet differ diff --git a/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.parquet b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.parquet new file mode 100644 index 00000000..e0cb8d94 Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/Norder=0/Dir=0/Npix=11.parquet differ diff --git a/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/_common_metadata b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/_common_metadata new file mode 100644 index 00000000..4cf7a744 Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/_common_metadata differ diff --git a/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/_metadata b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/_metadata new file mode 100644 index 00000000..42c25a75 Binary files /dev/null and b/tests/hipscat_import/data/malformed_catalogs/wrong_files_and_rows/_metadata differ diff --git a/tests/hipscat_import/verification/fixture.py b/tests/hipscat_import/verification/fixture.py new file mode 100644 index 00000000..3a8415de --- /dev/null +++ b/tests/hipscat_import/verification/fixture.py @@ -0,0 +1,115 @@ +"""Run pass/fail tests and generate verification report of existing hipscat table.""" + +from pathlib import Path + +import attrs +import yaml + +from hipscat_import.verification.arguments import VerificationArguments +from hipscat_import.verification.run_verification import Verifier + + +@attrs.define +class VerifierFixture: + """Class to generate pytest fixtures for verification tests. Instantiate using the 'from_param' method.""" + + test_targets: dict[str, list | dict] = attrs.field(validator=attrs.validators.instance_of(dict)) + """Dictionary mapping test names to targets.""" + verifier: Verifier = attrs.field(validator=attrs.validators.instance_of(Verifier)) + """Verifier instance that the fixture will use to run verification tests.""" + assert_passed: bool | dict = attrs.field(validator=attrs.validators.instance_of((bool, dict))) + """Expected result(s) of the test(s) this verifier will run.""" + + @classmethod + def from_param( + cls, fixture_param: str, malformed_catalog_dirs: dict[str, Path], tmp_path: Path + ) -> "VerifierFixture": + """Create a VerifierFixture from the given fixture parameter. + + Fixture definitions, including the expected test outcomes, are defined in fixture_defs.yaml. + + Parameters + ---------- + fixture_param : str + The fixture parameter key to look up fixture definitions. 
+ malformed_catalog_dirs : dict[str, Path] + A mapping of malformed test dataset names to their directories. + tmp_path : Path + A temporary path for output. + + Returns: + VerifierFixture: An instance of VerifierFixture configured with the specified parameters. + """ + with open(Path(__file__).parent / "fixture_defs.yaml", "r") as fin: + fixture_defs = yaml.safe_load(fin) + fixture_def = fixture_defs[fixture_param] + + truth_schema = fixture_def.get("truth_schema") + if truth_schema is not None: + truth_schema = malformed_catalog_dirs[truth_schema.split("/")[0]] / truth_schema.split("/")[1] + + args = VerificationArguments( + input_catalog_path=malformed_catalog_dirs[fixture_def["input_dir"]], + output_path=tmp_path, + truth_schema=truth_schema, + truth_total_rows=fixture_def.get("truth_total_rows"), + ) + + fixture = cls( + test_targets=fixture_defs["test_targets"], + verifier=Verifier.from_args(args), + assert_passed=fixture_def["assert_passed"], + ) + return fixture + + @staticmethod + def unpack_assert_passed( + assert_passed: bool | dict, *, targets: list | None = None + ) -> tuple[bool, list] | dict: + """Unpack assert_passed and return a tuple or dictionary based on the provided targets. + + Parameters + ---------- + assert_passed : bool, or dict + A boolean indicating pass/fail status or a dictionary with target-specific statuses. + targets list, or None + A list of targets that assert_passed should apply to. If None, the return type is a + tuple with a bool indicating whether the test is expected to pass and a list of + parquet file suffixes that are expected to fail. Otherwise, the return type is a dict + with a key for each target and values indicating pass/fail for the given target. + + Returns + ------- + tuple[bool, list] | dict: + - If assert_passed is a boolean: + - If targets is None, returns a tuple (assert_passed, []). + - Else, returns a dict of {target: assert_passed}. + - If assert_passed is a dictionary: + - If targets is None, assert_passed is expected to contain a single item with + key=False and value=list of file suffixes that should have failed. The item + is returned as a tuple. + - Else, assert_passed is expected to have a key for every target. The + assert_passed dict is returned. + + Raises + ------ + AssertionError: If assert_passed is a dict but it does not have the expected key(s). + """ + + if isinstance(assert_passed, bool): + if targets is None: + return assert_passed, [] + return {target: assert_passed for target in targets} + + # assert_passed is a dict + + if targets is None: + # Expecting a single item with key=False, value=list of file suffixes that should have failed. + msg = "Unexpected key. There is probably a bug in the fixture definition." + assert set(assert_passed) == {False}, msg + return False, assert_passed[False] + + # Expecting one key per target + msg = "Unexpected set of targets. There is probably a bug in the fixture definition." + assert set(assert_passed) == set(targets), msg + return assert_passed diff --git a/tests/hipscat_import/verification/fixture_defs.yaml b/tests/hipscat_import/verification/fixture_defs.yaml new file mode 100644 index 00000000..c4333e91 --- /dev/null +++ b/tests/hipscat_import/verification/fixture_defs.yaml @@ -0,0 +1,156 @@ +# region ---- Tests and their targets +# fixture's 'assert_passed' will be coerced to a dict indexed by test and/or target. 
+test_targets: + num_rows: + - _metadata + - user total + schema: + schema: + - _common_metadata + - _metadata + - file footers + 'schema metadata': + - "b'pandas' in _common_metadata" + - _common_metadata + - _metadata + - file footers +# endregion +# region ---- Fixture params and their definitions +# valid_truth should pass all tests +valid_truth: + input_dir: valid_truth + truth_schema: valid_truth/_common_metadata + truth_total_rows: 131 + assert_passed: True +# no_rowgroup_stats is used for test_rowgroup_stats, test_is_valid_catalog, and verification_runner +no_rowgroup_stats: + input_dir: no_rowgroup_stats + assert_passed: False +# schema* is used for test_schemas +schema: + # Case: test bad_schemas catalog given valid_truth schema as truth_schema + input_dir: bad_schemas + truth_schema: valid_truth/_common_metadata + assert_passed: + schema: + _common_metadata: False # _common_metadata has wrong dtypes + _metadata: True + file footers: + False: + - .extra_column.parquet + - .missing_column.parquet + - .wrong_dtypes.parquet + 'schema metadata': + "b'pandas' in _common_metadata": True + _common_metadata: True + _metadata: False # _metadata is missing b'pandas' metadata + file footers: + False: + - .no_metadata.parquet +schema_with_cmd_truth: + # Case: test bad_schemas catalog given a truth_schema that has the wrong dtypes + input_dir: bad_schemas + truth_schema: bad_schemas/_common_metadata + assert_passed: + schema: + _common_metadata: True + _metadata: False # truth_schema has wrong dtypes + file footers: + False: + - .extra_column.parquet + - .missing_column.parquet + - .no_metadata.parquet + - .parquet + 'schema metadata': + "b'pandas' in _common_metadata": False # b'pandas' dtypes != truth_schema dtypes + _common_metadata: True + _metadata: False # _metadata is missing b'pandas' metadata + file footers: + False: + - .no_metadata.parquet +schema_with_import_truth: + # Case: Test bad_schemas catalog given a truth_schema with custom metadata that should be preserved, but + # missing hipscat fields and b'pandas' metadata. This schema could have been used during catalog import. + input_dir: bad_schemas + truth_schema: bad_schemas/_common_metadata.import + assert_passed: + schema: + _common_metadata: False # _common_metadata has wrong dtypes + _metadata: True + file footers: + False: + - .extra_column.parquet + - .missing_column.parquet + - .wrong_dtypes.parquet + 'schema metadata': + "b'pandas' in _common_metadata": True + _common_metadata: False # _common_metadata is missing the custom metadata + _metadata: False # _metadata is missing all metadata + file footers: + False: + # Every files fails because the custom metadata is missing. 
+ - .extra_column.parquet + - .missing_column.parquet + - .no_metadata.parquet + - .parquet + - .wrong_dtypes.parquet +schema_with_no_truth: + # Case: Test bad_schemas catalog given no truth_schema + input_dir: bad_schemas + truth_schema: null # _common_metadata will be used as the source of truth + assert_passed: + schema: + _common_metadata: null # this test should not run + _metadata: False # truth_schema has wrong dtypes + file footers: + False: + - .extra_column.parquet + - .missing_column.parquet + - .no_metadata.parquet + - .parquet + 'schema metadata': + "b'pandas' in _common_metadata": False # b'pandas' dtypes != truth_schema dtypes + _common_metadata: null # this test should not run + _metadata: False # _metadata is missing b'pandas' metadata + file footers: + False: + - .no_metadata.parquet +schema_with_md_truth: + # Case: Test bad_schemas catalog given a truth_schema with no metadata + input_dir: bad_schemas + truth_schema: bad_schemas/_metadata + assert_passed: + schema: + _common_metadata: False # _common_metadata has wrong dtypes + _metadata: True + file footers: + False: + - .extra_column.parquet + - .missing_column.parquet + - .wrong_dtypes.parquet + 'schema metadata': + "b'pandas' in _common_metadata": True + _common_metadata: True + _metadata: False # _metadata is missing b'pandas' metadata + file footers: + False: + - .no_metadata.parquet +# wrong_files is used for test_file_sets +wrong_files: + input_dir: wrong_files_and_rows + assert_passed: + False: + - .missing_file.parquet + - .extra_file.parquet +# wrong_rows is used for test_num_rows +wrong_rows: + input_dir: wrong_files_and_rows + truth_total_rows: 131 + assert_passed: + _metadata: + False: + - .missing_file.parquet + - .extra_file.parquet + - .extra_rows.parquet + 'user total': False +# endregion diff --git a/tests/hipscat_import/verification/generate_malformed_catalogs.py b/tests/hipscat_import/verification/generate_malformed_catalogs.py new file mode 100644 index 00000000..5809fd89 --- /dev/null +++ b/tests/hipscat_import/verification/generate_malformed_catalogs.py @@ -0,0 +1,229 @@ +import random +import shutil +from pathlib import Path + +import attrs +import pyarrow +import pyarrow.dataset +import pyarrow.parquet + +DATA_DIR = Path(__file__).parent.parent.parent.parent / "tests/hipscat_import/data" +VALID_CATALOG_DIR = DATA_DIR / "small_sky_object_catalog" +MALFORMED_CATALOGS_DIR = DATA_DIR / "malformed_catalogs" + + +def run( + valid_catalog_dir: Path = VALID_CATALOG_DIR, malformed_catalogs_dir: Path = MALFORMED_CATALOGS_DIR +) -> None: + """Generate malformed catalogs to be used as test data for verification. + This only needs to be run once unless/until it is desirable to regenerate the dataset. 
+ """ + Generate.run(valid_catalog_dir=valid_catalog_dir, malformed_catalogs_dir=malformed_catalogs_dir) + + +@attrs.define +class ValidBase: + dataset: pyarrow.dataset.Dataset = attrs.field() + frag: pyarrow.dataset.FileFragment = attrs.field() + tbl: pyarrow.Table = attrs.field() + schema: pyarrow.Schema = attrs.field() + valid_catalog_dir: Path = attrs.field() + malformed_catalogs_dir: Path = attrs.field() + insert_dir: str = attrs.field(factory=str) + + @classmethod + def from_dirs(cls, valid_catalog_dir: Path, malformed_catalogs_dir: Path) -> "ValidBase": + valid_ds = pyarrow.dataset.parquet_dataset(valid_catalog_dir / "_metadata") + valid_frag = next(valid_ds.get_fragments()) + valid_tbl = valid_frag.to_table() + return cls( + dataset=valid_ds, + frag=valid_frag, + tbl=valid_tbl, + schema=valid_tbl.schema, + valid_catalog_dir=valid_catalog_dir, + malformed_catalogs_dir=malformed_catalogs_dir, + ) + + @property + def fmeta(self) -> Path: + return self.malformed_catalogs_dir / self.insert_dir / "_metadata" + + @property + def fcmeta(self) -> Path: + return self.malformed_catalogs_dir / self.insert_dir / "_common_metadata" + + @property + def fdata(self) -> Path: + frag_key = Path(self.frag.path).relative_to(self.valid_catalog_dir) + return self.malformed_catalogs_dir / self.insert_dir / frag_key + + +@attrs.define +class Generate: + def run( + self, + valid_catalog_dir: Path = VALID_CATALOG_DIR, + malformed_catalogs_dir: Path = MALFORMED_CATALOGS_DIR, + ) -> None: + """Generate malformed catalogs to be used as test data for verification. + This only needs to be run once unless/until it is desirable to regenerate the dataset. + """ + if malformed_catalogs_dir.is_dir(): + print(f"Output directory exists. Remove it and try again.\n{malformed_catalogs_dir}") + return + print(f"Generating malformed catalogs from valid catalog at {valid_catalog_dir}...") + + valid = ValidBase.from_dirs( + valid_catalog_dir=valid_catalog_dir, malformed_catalogs_dir=malformed_catalogs_dir + ) + generate = Generate() + generate.valid_truth(valid) + generate.bad_schemas(valid) + generate.no_rowgroup_stats(valid) + generate.wrong_files_and_rows(valid) + + def malformed(self, valid: ValidBase) -> None: + """Case: