# Patch summary
#
# Centralises "find the partition metadata in a catalog directory" logic:
#   * PartitionInfo.read_from_dir / PartitionJoinInfo.read_from_dir are added.
#     Each looks for a `_metadata` file first, then the matching CSV file, and
#     raises FileNotFoundError when neither exists.
#   * HealpixDataset._read_args and AssociationCatalog._read_args now delegate
#     to those methods instead of repeating the exists-check inline.
#   * PartitionJoinInfo.write_to_csv is added; it writes both
#     `partition_join_info.csv` and `partition_info.csv`.
#   * PartitionInfo.write_to_file gains an optional `storage_options` argument.
#   * Tests are added for the new read/write paths.
#
# NOTE(review): line breaks and indentation below were reconstructed from a
# whitespace-collapsed copy of this patch. Hunk line counts were checked
# against the @@ headers, but exact indentation (notably inside docstrings)
# should be confirmed against the original commit before applying.
diff --git a/src/hipscat/catalog/association_catalog/association_catalog.py b/src/hipscat/catalog/association_catalog/association_catalog.py
index 9dcad432..a733bbc8 100644
--- a/src/hipscat/catalog/association_catalog/association_catalog.py
+++ b/src/hipscat/catalog/association_catalog/association_catalog.py
@@ -62,16 +62,9 @@ def _read_args(
         cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
     ) -> Tuple[CatalogInfoClass, PixelInputTypes, JoinPixelInputTypes]:  # type: ignore[override]
         args = super()._read_args(catalog_base_dir, storage_options=storage_options)
-        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
-        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
-            partition_join_info = PartitionJoinInfo.read_from_file(
-                metadata_file, storage_options=storage_options
-            )
-        else:
-            partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
-            partition_join_info = PartitionJoinInfo.read_from_csv(
-                partition_join_info_file, storage_options=storage_options
-            )
+        partition_join_info = PartitionJoinInfo.read_from_dir(
+            catalog_base_dir, storage_options=storage_options
+        )
         return args + (partition_join_info,)
 
     @classmethod
diff --git a/src/hipscat/catalog/association_catalog/partition_join_info.py b/src/hipscat/catalog/association_catalog/partition_join_info.py
index a42aa5ab..bbf82a64 100644
--- a/src/hipscat/catalog/association_catalog/partition_join_info.py
+++ b/src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -1,13 +1,14 @@
 """Container class to hold primary-to-join partition metadata"""
+from __future__ import annotations
 
-from typing import Any, Dict, List, Union
+from typing import Dict, List
 
 import numpy as np
 import pandas as pd
 import pyarrow as pa
-from typing_extensions import Self
 
-from hipscat.io import FilePointer, file_io
+from hipscat.catalog.partition_info import PartitionInfo
+from hipscat.io import FilePointer, file_io, paths
 from hipscat.io.parquet_metadata import (
     read_row_group_fragments,
     row_group_stat_single_value,
@@ -91,10 +92,68 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
 
         write_parquet_metadata_for_batches(batches, catalog_path, storage_options)
 
+    def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
+        """Write all partition data to CSV files.
+
+        Two files will be written::
+            - partition_info.csv - covers all primary catalog pixels, and should match the file structure
+            - partition_join_info.csv - covers all pairwise relationships between primary and
+              join catalogs.
+
+        Args:
+            catalog_path: FilePointer to the directory where the
+                `partition_join_info.csv` file will be written
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+        """
+        partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
+        file_io.write_dataframe_to_csv(
+            self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
+        )
+
+        primary_pixels = self.primary_to_join_map().keys()
+        partition_info_pointer = paths.get_partition_info_pointer(catalog_path)
+        partition_info = PartitionInfo.from_healpix(primary_pixels)
+        partition_info.write_to_file(
+            partition_info_file=partition_info_pointer, storage_options=storage_options
+        )
+
+    @classmethod
+    def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionJoinInfo:
+        """Read partition join info from a file within a hipscat directory.
+
+        This will look for a `_metadata` file, and if not found, will look for
+        a `partition_join_info.csv` file.
+
+        Args:
+            catalog_base_dir: path to the root directory of the catalog
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+
+        Returns:
+            A `PartitionJoinInfo` object with the data from the file
+
+        Raises:
+            FileNotFoundError: if neither desired file is found in the catalog_base_dir
+        """
+        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
+        partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
+        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
+            partition_join_info = PartitionJoinInfo.read_from_file(
+                metadata_file, storage_options=storage_options
+            )
+        elif file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
+            partition_join_info = PartitionJoinInfo.read_from_csv(
+                partition_join_info_file, storage_options=storage_options
+            )
+        else:
+            raise FileNotFoundError(
+                f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
+            )
+        return partition_join_info
+
     @classmethod
     def read_from_file(
-        cls, metadata_file: FilePointer, strict=False, storage_options: Union[Dict[Any, Any], None] = None
-    ) -> Self:
+        cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
+    ) -> PartitionJoinInfo:
         """Read partition join info from a `_metadata` file to create an object
 
         Args:
@@ -169,8 +228,8 @@ def read_from_file(
 
     @classmethod
     def read_from_csv(
-        cls, partition_join_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
-    ) -> Self:
+        cls, partition_join_info_file: FilePointer, storage_options: dict = None
+    ) -> PartitionJoinInfo:
         """Read partition join info from a `partition_join_info.csv` file to create an object
 
         Args:
@@ -184,7 +243,7 @@ def read_from_csv(
             partition_join_info_file, storage_options=storage_options
         ):
             raise FileNotFoundError(
-                f"No partition info found where expected: {str(partition_join_info_file)}"
+                f"No partition join info found where expected: {str(partition_join_info_file)}"
             )
 
         data_frame = file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
diff --git a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py
index 18374e37..3eb51bec 100644
--- a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py
+++ b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py
@@ -84,12 +84,7 @@ def _read_args(
         storage_options: Union[Dict[Any, Any], None] = None,
     ) -> Tuple[CatalogInfoClass, PartitionInfo]:
         args = super()._read_args(catalog_base_dir, storage_options=storage_options)
-        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
-        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
-            partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
-        else:
-            partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
-            partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
+        partition_info = PartitionInfo.read_from_dir(catalog_base_dir, storage_options=storage_options)
         return args + (partition_info,)
 
     @classmethod
diff --git a/src/hipscat/catalog/partition_info.py b/src/hipscat/catalog/partition_info.py
index ddccbe10..b8e0602e 100644
--- a/src/hipscat/catalog/partition_info.py
+++ b/src/hipscat/catalog/partition_info.py
@@ -7,7 +7,7 @@
 import pandas as pd
 import pyarrow as pa
 
-from hipscat.io import FilePointer, file_io
+from hipscat.io import FilePointer, file_io, paths
 from hipscat.io.parquet_metadata import (
     read_row_group_fragments,
     row_group_stat_single_value,
@@ -43,14 +43,17 @@ def get_highest_order(self) -> int:
         max_pixel = np.max(self.pixel_list)
         return max_pixel.order
 
-    def write_to_file(self, partition_info_file: FilePointer):
+    def write_to_file(self, partition_info_file: FilePointer, storage_options: dict = None):
         """Write all partition data to CSV file.
 
         Args:
             partition_info_file: FilePointer to where the `partition_info.csv`
                 file will be written
+            storage_options (dict): dictionary that contains abstract filesystem credentials
         """
-        file_io.write_dataframe_to_csv(self.as_dataframe(), partition_info_file, index=False)
+        file_io.write_dataframe_to_csv(
+            self.as_dataframe(), partition_info_file, index=False, storage_options=storage_options
+        )
 
     def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
         """Generate parquet metadata, using the known partitions.
@@ -75,6 +78,35 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
 
         write_parquet_metadata_for_batches(batches, catalog_path, storage_options)
 
+    @classmethod
+    def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
+        """Read partition info from a file within a hipscat directory.
+
+        This will look for a `_metadata` file, and if not found, will look for
+        a `partition_info.csv` file.
+
+        Args:
+            catalog_base_dir: path to the root directory of the catalog
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+
+        Returns:
+            A `PartitionInfo` object with the data from the file
+
+        Raises:
+            FileNotFoundError: if neither desired file is found in the catalog_base_dir
+        """
+        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
+        partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
+        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
+            partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
+        elif file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
+            partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
+        else:
+            raise FileNotFoundError(
+                f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
+            )
+        return partition_info
+
     @classmethod
     def read_from_file(
         cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
diff --git a/tests/hipscat/catalog/association_catalog/test_association_catalog.py b/tests/hipscat/catalog/association_catalog/test_association_catalog.py
index 3a61bf50..1e7e63a7 100644
--- a/tests/hipscat/catalog/association_catalog/test_association_catalog.py
+++ b/tests/hipscat/catalog/association_catalog/test_association_catalog.py
@@ -89,8 +89,35 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_ca
 
     ## Now we create the needed _metadata and everything is right.
     part_info = PartitionJoinInfo(association_catalog_join_pixels)
-    part_info.write_to_metadata_files(
-        catalog_path=catalog_path,
-    )
+    part_info.write_to_metadata_files(catalog_path=catalog_path)
     catalog = AssociationCatalog.read_from_hipscat(catalog_path)
     assert catalog.catalog_name == association_catalog_info_data["catalog_name"]
+
+
+def test_csv_round_trip(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
+    ## Path doesn't exist
+    with pytest.raises(FileNotFoundError):
+        AssociationCatalog.read_from_hipscat(os.path.join("path", "empty"))
+
+    catalog_path = os.path.join(tmp_path, "empty")
+    os.makedirs(catalog_path, exist_ok=True)
+
+    file_name = os.path.join(catalog_path, "catalog_info.json")
+    with open(file_name, "w", encoding="utf-8") as metadata_file:
+        metadata_file.write(json.dumps(association_catalog_info_data))
+
+    with pytest.raises(FileNotFoundError, match="partition"):
+        AssociationCatalog.read_from_hipscat(catalog_path)
+
+    file_name = os.path.join(catalog_path, "partition_info.csv")
+    with open(file_name, "w", encoding="utf-8") as metadata_file:
+        # dump some garbage in there - just needs to exist.
+        metadata_file.write(json.dumps(association_catalog_info_data))
+    with pytest.raises(FileNotFoundError, match="partition"):
+        AssociationCatalog.read_from_hipscat(catalog_path)
+
+    part_info = PartitionJoinInfo(association_catalog_join_pixels)
+    part_info.write_to_csv(catalog_path=catalog_path)
+
+    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
+    pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels)
diff --git a/tests/hipscat/catalog/association_catalog/test_partition_join_info.py b/tests/hipscat/catalog/association_catalog/test_partition_join_info.py
index c135a4cd..80b086da 100644
--- a/tests/hipscat/catalog/association_catalog/test_partition_join_info.py
+++ b/tests/hipscat/catalog/association_catalog/test_partition_join_info.py
@@ -55,6 +55,20 @@ def test_read_from_metadata_fail(tmp_path):
         PartitionJoinInfo.read_from_file(metadata_filename, strict=True)
 
 
+def test_load_partition_join_info_from_dir_fail(tmp_path):
+    empty_dataframe = pd.DataFrame()
+    metadata_filename = os.path.join(tmp_path, "empty_metadata.parquet")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(FileNotFoundError, match="_metadata or partition join info"):
+        PartitionJoinInfo.read_from_dir(tmp_path)
+
+    # The file is there, but doesn't have the required content.
+    metadata_filename = os.path.join(tmp_path, "_metadata")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(ValueError, match="missing columns"):
+        PartitionJoinInfo.read_from_dir(tmp_path)
+
+
 def test_primary_to_join_map(association_catalog_join_pixels):
     info = PartitionJoinInfo(association_catalog_join_pixels)
     pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
@@ -81,6 +95,16 @@ def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
     pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)
 
 
+def test_csv_file_round_trip(association_catalog_join_pixels, tmp_path):
+    info = PartitionJoinInfo(association_catalog_join_pixels)
+    pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
+    info.write_to_csv(tmp_path)
+
+    file_pointer = file_io.get_file_pointer_from_path(os.path.join(tmp_path, "partition_join_info.csv"))
+    new_info = PartitionJoinInfo.read_from_csv(file_pointer)
+    pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)
+
+
 def test_read_from_csv(association_catalog_partition_join_file, association_catalog_join_pixels):
     file_pointer = file_io.get_file_pointer_from_path(association_catalog_partition_join_file)
     info = PartitionJoinInfo.read_from_csv(file_pointer)
diff --git a/tests/hipscat/catalog/test_partition_info.py b/tests/hipscat/catalog/test_partition_info.py
index 5360e3e1..b3768fd5 100644
--- a/tests/hipscat/catalog/test_partition_info.py
+++ b/tests/hipscat/catalog/test_partition_info.py
@@ -55,6 +55,20 @@ def test_load_partition_info_from_metadata_fail(tmp_path):
         PartitionInfo.read_from_file(metadata_filename, strict=True)
 
 
+def test_load_partition_info_from_dir_fail(tmp_path):
+    empty_dataframe = pd.DataFrame()
+    metadata_filename = os.path.join(tmp_path, "empty_metadata.parquet")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(FileNotFoundError, match="_metadata or partition info"):
+        PartitionInfo.read_from_dir(tmp_path)
+
+    # The file is there, but doesn't have the required content.
+    metadata_filename = os.path.join(tmp_path, "_metadata")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(ValueError, match="missing Norder"):
+        PartitionInfo.read_from_dir(tmp_path)
+
+
 def test_load_partition_info_small_sky_order1(small_sky_order1_dir):
     """Instantiate the partition info for catalog with 4 pixels"""
     partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)