Skip to content

Commit

Permalink
Extract file-finding for partition info reading. (#196)
Browse files Browse the repository at this point in the history
* Extract file-finding for partition info reading.

* Add more csv implementation and testing.

* Improve consistency based on review comments.
  • Loading branch information
delucchi-cmu authored Jan 30, 2024
1 parent ffa8352 commit 3a77c01
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 30 deletions.
13 changes: 3 additions & 10 deletions src/hipscat/catalog/association_catalog/association_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,9 @@ def _read_args(
cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
) -> Tuple[CatalogInfoClass, PixelInputTypes, JoinPixelInputTypes]: # type: ignore[override]
args = super()._read_args(catalog_base_dir, storage_options=storage_options)
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
partition_join_info = PartitionJoinInfo.read_from_file(
metadata_file, storage_options=storage_options
)
else:
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
partition_join_info = PartitionJoinInfo.read_from_csv(
partition_join_info_file, storage_options=storage_options
)
partition_join_info = PartitionJoinInfo.read_from_dir(
catalog_base_dir, storage_options=storage_options
)
return args + (partition_join_info,)

@classmethod
Expand Down
75 changes: 67 additions & 8 deletions src/hipscat/catalog/association_catalog/partition_join_info.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Container class to hold primary-to-join partition metadata"""
from __future__ import annotations

from typing import Any, Dict, List, Union
from typing import Dict, List

import numpy as np
import pandas as pd
import pyarrow as pa
from typing_extensions import Self

from hipscat.io import FilePointer, file_io
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
Expand Down Expand Up @@ -91,10 +92,68 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
    """Write all partition data to CSV files.

    Two files will be written::

        - partition_info.csv - covers all primary catalog pixels, and should match the file structure
        - partition_join_info.csv - covers all pairwise relationships between primary and
          join catalogs.

    Args:
        catalog_path: FilePointer to the directory where the
            `partition_join_info.csv` file will be written
        storage_options (dict): dictionary that contains abstract filesystem credentials
    """
    # Persist the full primary-to-join pairing table.
    join_info_pointer = paths.get_partition_join_info_pointer(catalog_path)
    file_io.write_dataframe_to_csv(
        self.data_frame, join_info_pointer, index=False, storage_options=storage_options
    )

    # Derive the primary catalog's pixel list from the join map and write the
    # companion partition_info.csv so the directory matches the file structure.
    primary_info = PartitionInfo.from_healpix(self.primary_to_join_map().keys())
    primary_info.write_to_file(
        partition_info_file=paths.get_partition_info_pointer(catalog_path),
        storage_options=storage_options,
    )

@classmethod
def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionJoinInfo:
    """Read partition join info from a file within a hipscat directory.

    This will look for a `_metadata` file, and if not found, will look for
    a `partition_join_info.csv` file.

    Args:
        catalog_base_dir: path to the root directory of the catalog
        storage_options (dict): dictionary that contains abstract filesystem credentials

    Returns:
        A `PartitionJoinInfo` object with the data from the file

    Raises:
        FileNotFoundError: if neither desired file is found in the catalog_base_dir
    """
    # Prefer the parquet _metadata file; dispatch through `cls` so subclasses
    # get their own reader implementations.
    metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
    if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
        return cls.read_from_file(metadata_file, storage_options=storage_options)

    # Fall back to the plain CSV listing; only compute its pointer when needed.
    partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
    if file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
        return cls.read_from_csv(partition_join_info_file, storage_options=storage_options)

    raise FileNotFoundError(
        f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
    )

@classmethod
def read_from_file(
cls, metadata_file: FilePointer, strict=False, storage_options: Union[Dict[Any, Any], None] = None
) -> Self:
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `_metadata` file to create an object
Args:
Expand Down Expand Up @@ -169,8 +228,8 @@ def read_from_file(

@classmethod
def read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
) -> Self:
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `partition_join_info.csv` file to create an object
Args:
Expand All @@ -184,7 +243,7 @@ def read_from_csv(
partition_join_info_file, storage_options=storage_options
):
raise FileNotFoundError(
f"No partition info found where expected: {str(partition_join_info_file)}"
f"No partition join info found where expected: {str(partition_join_info_file)}"
)

data_frame = file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
Expand Down
7 changes: 1 addition & 6 deletions src/hipscat/catalog/healpix_dataset/healpix_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,7 @@ def _read_args(
storage_options: Union[Dict[Any, Any], None] = None,
) -> Tuple[CatalogInfoClass, PartitionInfo]:
args = super()._read_args(catalog_base_dir, storage_options=storage_options)
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
else:
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
partition_info = PartitionInfo.read_from_dir(catalog_base_dir, storage_options=storage_options)
return args + (partition_info,)

@classmethod
Expand Down
38 changes: 35 additions & 3 deletions src/hipscat/catalog/partition_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pandas as pd
import pyarrow as pa

from hipscat.io import FilePointer, file_io
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
Expand Down Expand Up @@ -43,14 +43,17 @@ def get_highest_order(self) -> int:
max_pixel = np.max(self.pixel_list)
return max_pixel.order

def write_to_file(self, partition_info_file: FilePointer):
def write_to_file(self, partition_info_file: FilePointer, storage_options: dict = None):
"""Write all partition data to CSV file.
Args:
partition_info_file: FilePointer to where the `partition_info.csv`
file will be written
storage_options (dict): dictionary that contains abstract filesystem credentials
"""
file_io.write_dataframe_to_csv(self.as_dataframe(), partition_info_file, index=False)
file_io.write_dataframe_to_csv(
self.as_dataframe(), partition_info_file, index=False, storage_options=storage_options
)

def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
"""Generate parquet metadata, using the known partitions.
Expand All @@ -75,6 +78,35 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

@classmethod
def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
    """Read partition info from a file within a hipscat directory.

    This will look for a `_metadata` file, and if not found, will look for
    a `partition_info.csv` file.

    Args:
        catalog_base_dir: path to the root directory of the catalog
        storage_options (dict): dictionary that contains abstract filesystem credentials

    Returns:
        A `PartitionInfo` object with the data from the file

    Raises:
        FileNotFoundError: if neither desired file is found in the catalog_base_dir
    """
    # Prefer the parquet _metadata file; dispatch through `cls` so subclasses
    # get their own reader implementations.
    metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
    if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
        return cls.read_from_file(metadata_file, storage_options=storage_options)

    # Fall back to the plain CSV listing; only compute its pointer when needed.
    partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
    if file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
        return cls.read_from_csv(partition_info_file, storage_options=storage_options)

    raise FileNotFoundError(
        f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
    )

@classmethod
def read_from_file(
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,35 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_ca

## Now we create the needed _metadata and everything is right.
part_info = PartitionJoinInfo(association_catalog_join_pixels)
part_info.write_to_metadata_files(
catalog_path=catalog_path,
)
part_info.write_to_metadata_files(catalog_path=catalog_path)
catalog = AssociationCatalog.read_from_hipscat(catalog_path)
assert catalog.catalog_name == association_catalog_info_data["catalog_name"]


def test_csv_round_trip(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
    """Association join pixels written via CSV are read back unchanged."""
    ## A path that does not exist fails immediately.
    with pytest.raises(FileNotFoundError):
        AssociationCatalog.read_from_hipscat(os.path.join("path", "empty"))

    catalog_path = os.path.join(tmp_path, "empty")
    os.makedirs(catalog_path, exist_ok=True)

    info_file_name = os.path.join(catalog_path, "catalog_info.json")
    with open(info_file_name, "w", encoding="utf-8") as metadata_file:
        metadata_file.write(json.dumps(association_catalog_info_data))

    ## catalog_info alone is insufficient - join partition data is still missing.
    with pytest.raises(FileNotFoundError, match="partition"):
        AssociationCatalog.read_from_hipscat(catalog_path)

    wrong_file_name = os.path.join(catalog_path, "partition_info.csv")
    with open(wrong_file_name, "w", encoding="utf-8") as metadata_file:
        # dump some garbage in there - just needs to exist.
        metadata_file.write(json.dumps(association_catalog_info_data))
    ## partition_info.csv is not the file an association catalog looks for.
    with pytest.raises(FileNotFoundError, match="partition"):
        AssociationCatalog.read_from_hipscat(catalog_path)

    part_info = PartitionJoinInfo(association_catalog_join_pixels)
    part_info.write_to_csv(catalog_path=catalog_path)

    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
    pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels)
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ def test_read_from_metadata_fail(tmp_path):
PartitionJoinInfo.read_from_file(metadata_filename, strict=True)


def test_load_partition_join_info_from_dir_fail(tmp_path):
    """Missing or content-free metadata should raise descriptive errors."""
    empty_dataframe = pd.DataFrame()

    # A parquet file under the wrong name is invisible to read_from_dir.
    empty_dataframe.to_parquet(os.path.join(tmp_path, "empty_metadata.parquet"))
    with pytest.raises(FileNotFoundError, match="_metadata or partition join info"):
        PartitionJoinInfo.read_from_dir(tmp_path)

    # The file is there, but doesn't have the required content.
    empty_dataframe.to_parquet(os.path.join(tmp_path, "_metadata"))
    with pytest.raises(ValueError, match="missing columns"):
        PartitionJoinInfo.read_from_dir(tmp_path)


def test_primary_to_join_map(association_catalog_join_pixels):
info = PartitionJoinInfo(association_catalog_join_pixels)
pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
Expand All @@ -81,6 +95,16 @@ def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)


def test_csv_file_round_trip(association_catalog_join_pixels, tmp_path):
    """Join partition data survives a write-to-CSV / read-from-CSV cycle."""
    original = PartitionJoinInfo(association_catalog_join_pixels)
    pd.testing.assert_frame_equal(original.data_frame, association_catalog_join_pixels)
    original.write_to_csv(tmp_path)

    csv_pointer = file_io.get_file_pointer_from_path(os.path.join(tmp_path, "partition_join_info.csv"))
    round_tripped = PartitionJoinInfo.read_from_csv(csv_pointer)
    pd.testing.assert_frame_equal(round_tripped.data_frame, association_catalog_join_pixels)


def test_read_from_csv(association_catalog_partition_join_file, association_catalog_join_pixels):
file_pointer = file_io.get_file_pointer_from_path(association_catalog_partition_join_file)
info = PartitionJoinInfo.read_from_csv(file_pointer)
Expand Down
14 changes: 14 additions & 0 deletions tests/hipscat/catalog/test_partition_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ def test_load_partition_info_from_metadata_fail(tmp_path):
PartitionInfo.read_from_file(metadata_filename, strict=True)


def test_load_partition_info_from_dir_fail(tmp_path):
    """Missing or content-free metadata should raise descriptive errors."""
    empty_dataframe = pd.DataFrame()

    # A parquet file under the wrong name is invisible to read_from_dir.
    empty_dataframe.to_parquet(os.path.join(tmp_path, "empty_metadata.parquet"))
    with pytest.raises(FileNotFoundError, match="_metadata or partition info"):
        PartitionInfo.read_from_dir(tmp_path)

    # The file is there, but doesn't have the required content.
    empty_dataframe.to_parquet(os.path.join(tmp_path, "_metadata"))
    with pytest.raises(ValueError, match="missing Norder"):
        PartitionInfo.read_from_dir(tmp_path)


def test_load_partition_info_small_sky_order1(small_sky_order1_dir):
"""Instantiate the partition info for catalog with 4 pixels"""
partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)
Expand Down

0 comments on commit 3a77c01

Please sign in to comment.