Extract file-finding for partition info reading. #196

Merged · 3 commits · Jan 30, 2024
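This change extracts the duplicated file-finding logic out of the catalog `_read_args` methods and into `read_from_dir` classmethods on `PartitionInfo` and `PartitionJoinInfo`: prefer the parquet `_metadata` file, fall back to the catalog's CSV, and raise `FileNotFoundError` if neither exists. A minimal caller-side sketch (the catalog path is hypothetical):

```python
from hipscat.catalog.partition_info import PartitionInfo

# read_from_dir prefers <catalog>/_metadata and falls back to
# <catalog>/partition_info.csv; it raises FileNotFoundError if neither exists.
partition_info = PartitionInfo.read_from_dir(
    "/data/catalogs/small_sky",  # hypothetical catalog root directory
    storage_options=None,        # optional abstract-filesystem credentials
)
print(partition_info.get_highest_order())
```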
13 changes: 3 additions & 10 deletions src/hipscat/catalog/association_catalog/association_catalog.py
@@ -62,16 +62,9 @@ def _read_args(
         cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
     ) -> Tuple[CatalogInfoClass, PixelInputTypes, JoinPixelInputTypes]:  # type: ignore[override]
         args = super()._read_args(catalog_base_dir, storage_options=storage_options)
-        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
-        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
-            partition_join_info = PartitionJoinInfo.read_from_file(
-                metadata_file, storage_options=storage_options
-            )
-        else:
-            partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
-            partition_join_info = PartitionJoinInfo.read_from_csv(
-                partition_join_info_file, storage_options=storage_options
-            )
+        partition_join_info = PartitionJoinInfo.read_from_dir(
+            catalog_base_dir, storage_options=storage_options
+        )
         return args + (partition_join_info,)

@classmethod
Expand Down
75 changes: 67 additions & 8 deletions src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -1,13 +1,14 @@
"""Container class to hold primary-to-join partition metadata"""
from __future__ import annotations

from typing import Any, Dict, List, Union
from typing import Dict, List

import numpy as np
import pandas as pd
import pyarrow as pa
from typing_extensions import Self

from hipscat.io import FilePointer, file_io
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
@@ -91,10 +92,68 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

         write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

+    def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
+        """Write all partition data to CSV files.
+
+        Two files will be written::
+            - partition_info.csv - covers all primary catalog pixels, and should match the file structure
+            - partition_join_info.csv - covers all pairwise relationships between primary and
+              join catalogs.
+
+        Args:
+            catalog_path: FilePointer to the directory where the
+                `partition_join_info.csv` file will be written
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+        """
+        partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
+        file_io.write_dataframe_to_csv(
+            self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
+        )
+
+        primary_pixels = self.primary_to_join_map().keys()
+        partition_info_pointer = paths.get_partition_info_pointer(catalog_path)
+        partition_info = PartitionInfo.from_healpix(primary_pixels)
+        partition_info.write_to_file(
+            partition_info_file=partition_info_pointer, storage_options=storage_options
+        )
+
+    @classmethod
+    def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionJoinInfo:
+        """Read partition join info from a file within a hipscat directory.
+
+        This will look for a `_metadata` file, and if not found, will look for
+        a `partition_join_info.csv` file.
+
+        Args:
+            catalog_base_dir: path to the root directory of the catalog
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+
+        Returns:
+            A `PartitionJoinInfo` object with the data from the file
+
+        Raises:
+            FileNotFoundError: if neither desired file is found in the catalog_base_dir
+        """
+        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
+        partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
+        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
+            partition_join_info = PartitionJoinInfo.read_from_file(
+                metadata_file, storage_options=storage_options
+            )
+        elif file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
+            partition_join_info = PartitionJoinInfo.read_from_csv(
+                partition_join_info_file, storage_options=storage_options
+            )
+        else:
+            raise FileNotFoundError(
+                f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
+            )
+        return partition_join_info
+
     @classmethod
     def read_from_file(
-        cls, metadata_file: FilePointer, strict=False, storage_options: Union[Dict[Any, Any], None] = None
-    ) -> Self:
+        cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
+    ) -> PartitionJoinInfo:
         """Read partition join info from a `_metadata` file to create an object

         Args:
@@ -169,8 +228,8 @@ def read_from_file(

     @classmethod
     def read_from_csv(
-        cls, partition_join_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
-    ) -> Self:
+        cls, partition_join_info_file: FilePointer, storage_options: dict = None
+    ) -> PartitionJoinInfo:
         """Read partition join info from a `partition_join_info.csv` file to create an object

         Args:
@@ -184,7 +243,7 @@ def read_from_csv(
             partition_join_info_file, storage_options=storage_options
         ):
             raise FileNotFoundError(
-                f"No partition info found where expected: {str(partition_join_info_file)}"
+                f"No partition join info found where expected: {str(partition_join_info_file)}"
             )

         data_frame = file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
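The new `write_to_csv` writes both `partition_join_info.csv` and a matching `partition_info.csv`, which `read_from_dir` can then pick up when no `_metadata` file exists. A round-trip sketch, assuming the hipscat join-pixel column convention (`Norder`/`Dir`/`Npix` plus their `join_` counterparts) and a hypothetical output directory:

```python
import pandas as pd

from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo

# One row per primary/join pixel pair; column names assumed per hipscat convention.
join_pixels = pd.DataFrame(
    {
        "Norder": [0],
        "Dir": [0],
        "Npix": [11],
        "join_Norder": [0],
        "join_Dir": [0],
        "join_Npix": [11],
    }
)
info = PartitionJoinInfo(join_pixels)

# Writes partition_join_info.csv plus the primary catalog's partition_info.csv.
info.write_to_csv(catalog_path="/tmp/assoc_catalog")  # hypothetical directory

# No _metadata file was written, so read_from_dir falls back to the CSV.
round_tripped = PartitionJoinInfo.read_from_dir("/tmp/assoc_catalog")
pd.testing.assert_frame_equal(round_tripped.data_frame, join_pixels)
```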
7 changes: 1 addition & 6 deletions src/hipscat/catalog/healpix_dataset/healpix_dataset.py
@@ -84,12 +84,7 @@ def _read_args(
         storage_options: Union[Dict[Any, Any], None] = None,
     ) -> Tuple[CatalogInfoClass, PartitionInfo]:
         args = super()._read_args(catalog_base_dir, storage_options=storage_options)
-        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
-        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
-            partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
-        else:
-            partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
-            partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
+        partition_info = PartitionInfo.read_from_dir(catalog_base_dir, storage_options=storage_options)
         return args + (partition_info,)

     @classmethod
38 changes: 35 additions & 3 deletions src/hipscat/catalog/partition_info.py
@@ -7,7 +7,7 @@
 import pandas as pd
 import pyarrow as pa

-from hipscat.io import FilePointer, file_io
+from hipscat.io import FilePointer, file_io, paths
 from hipscat.io.parquet_metadata import (
     read_row_group_fragments,
     row_group_stat_single_value,
@@ -43,14 +43,17 @@ def get_highest_order(self) -> int:
         max_pixel = np.max(self.pixel_list)
         return max_pixel.order

-    def write_to_file(self, partition_info_file: FilePointer):
+    def write_to_file(self, partition_info_file: FilePointer, storage_options: dict = None):
         """Write all partition data to CSV file.

         Args:
             partition_info_file: FilePointer to where the `partition_info.csv`
                 file will be written
+            storage_options (dict): dictionary that contains abstract filesystem credentials
         """
-        file_io.write_dataframe_to_csv(self.as_dataframe(), partition_info_file, index=False)
+        file_io.write_dataframe_to_csv(
+            self.as_dataframe(), partition_info_file, index=False, storage_options=storage_options
+        )

     def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
         """Generate parquet metadata, using the known partitions.
@@ -75,6 +78,35 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

         write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

+    @classmethod
+    def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
+        """Read partition info from a file within a hipscat directory.
+
+        This will look for a `_metadata` file, and if not found, will look for
+        a `partition_info.csv` file.
+
+        Args:
+            catalog_base_dir: path to the root directory of the catalog
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+
+        Returns:
+            A `PartitionInfo` object with the data from the file
+
+        Raises:
+            FileNotFoundError: if neither desired file is found in the catalog_base_dir
+        """
+        metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
+        partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
+        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
+            partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
+        elif file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
+            partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
+        else:
+            raise FileNotFoundError(
+                f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
+            )
+        return partition_info
+
     @classmethod
     def read_from_file(
         cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
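`write_to_file` now threads `storage_options` through to the CSV write, so partition info can land on remote filesystems as well as local disk. A sketch under stated assumptions: the `HealpixPixel` import location, pixel values, and paths are illustrative:

```python
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.pixel_math import HealpixPixel  # assumed import location

# Build partition info from explicit pixels, as write_to_csv does internally
# via PartitionInfo.from_healpix(primary_pixels).
info = PartitionInfo.from_healpix([HealpixPixel(0, 11), HealpixPixel(1, 44)])

# storage_options flows through to the filesystem layer; None means local disk,
# while e.g. cloud credentials could be passed for remote writes.
info.write_to_file(
    "/tmp/my_catalog/partition_info.csv",  # hypothetical destination
    storage_options=None,
)

# The companion reader prefers _metadata but picks up this CSV as the fallback.
info_again = PartitionInfo.read_from_dir("/tmp/my_catalog")
assert info_again.get_highest_order() == 1
```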
tests/hipscat/catalog/association_catalog/test_association_catalog.py
@@ -89,8 +89,35 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_ca

     ## Now we create the needed _metadata and everything is right.
     part_info = PartitionJoinInfo(association_catalog_join_pixels)
-    part_info.write_to_metadata_files(
-        catalog_path=catalog_path,
-    )
+    part_info.write_to_metadata_files(catalog_path=catalog_path)
     catalog = AssociationCatalog.read_from_hipscat(catalog_path)
     assert catalog.catalog_name == association_catalog_info_data["catalog_name"]
+
+
+def test_csv_round_trip(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
+    ## Path doesn't exist
+    with pytest.raises(FileNotFoundError):
+        AssociationCatalog.read_from_hipscat(os.path.join("path", "empty"))
+
+    catalog_path = os.path.join(tmp_path, "empty")
+    os.makedirs(catalog_path, exist_ok=True)
+
+    file_name = os.path.join(catalog_path, "catalog_info.json")
+    with open(file_name, "w", encoding="utf-8") as metadata_file:
+        metadata_file.write(json.dumps(association_catalog_info_data))
+
+    with pytest.raises(FileNotFoundError, match="partition"):
+        AssociationCatalog.read_from_hipscat(catalog_path)
+
+    file_name = os.path.join(catalog_path, "partition_info.csv")
+    with open(file_name, "w", encoding="utf-8") as metadata_file:
+        # dump some garbage in there - just needs to exist.
+        metadata_file.write(json.dumps(association_catalog_info_data))
+    with pytest.raises(FileNotFoundError, match="partition"):
+        AssociationCatalog.read_from_hipscat(catalog_path)
+
+    part_info = PartitionJoinInfo(association_catalog_join_pixels)
+    part_info.write_to_csv(catalog_path=catalog_path)
+
+    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
+    pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels)
tests/hipscat/catalog/association_catalog/test_partition_join_info.py
@@ -55,6 +55,20 @@ def test_read_from_metadata_fail(tmp_path):
         PartitionJoinInfo.read_from_file(metadata_filename, strict=True)


+def test_load_partition_join_info_from_dir_fail(tmp_path):
+    empty_dataframe = pd.DataFrame()
+    metadata_filename = os.path.join(tmp_path, "empty_metadata.parquet")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(FileNotFoundError, match="_metadata or partition join info"):
+        PartitionJoinInfo.read_from_dir(tmp_path)
+
+    # The file is there, but doesn't have the required content.
+    metadata_filename = os.path.join(tmp_path, "_metadata")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(ValueError, match="missing columns"):
+        PartitionJoinInfo.read_from_dir(tmp_path)
+
+
 def test_primary_to_join_map(association_catalog_join_pixels):
     info = PartitionJoinInfo(association_catalog_join_pixels)
     pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
@@ -81,6 +95,16 @@ def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
     pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)


+def test_csv_file_round_trip(association_catalog_join_pixels, tmp_path):
+    info = PartitionJoinInfo(association_catalog_join_pixels)
+    pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
+    info.write_to_csv(tmp_path)
+
+    file_pointer = file_io.get_file_pointer_from_path(os.path.join(tmp_path, "partition_join_info.csv"))
+    new_info = PartitionJoinInfo.read_from_csv(file_pointer)
+    pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)
+
+
 def test_read_from_csv(association_catalog_partition_join_file, association_catalog_join_pixels):
     file_pointer = file_io.get_file_pointer_from_path(association_catalog_partition_join_file)
     info = PartitionJoinInfo.read_from_csv(file_pointer)
14 changes: 14 additions & 0 deletions tests/hipscat/catalog/test_partition_info.py
@@ -55,6 +55,20 @@ def test_load_partition_info_from_metadata_fail(tmp_path):
         PartitionInfo.read_from_file(metadata_filename, strict=True)


+def test_load_partition_info_from_dir_fail(tmp_path):
+    empty_dataframe = pd.DataFrame()
+    metadata_filename = os.path.join(tmp_path, "empty_metadata.parquet")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(FileNotFoundError, match="_metadata or partition info"):
+        PartitionInfo.read_from_dir(tmp_path)
+
+    # The file is there, but doesn't have the required content.
+    metadata_filename = os.path.join(tmp_path, "_metadata")
+    empty_dataframe.to_parquet(metadata_filename)
+    with pytest.raises(ValueError, match="missing Norder"):
+        PartitionInfo.read_from_dir(tmp_path)
+
+
 def test_load_partition_info_small_sky_order1(small_sky_order1_dir):
     """Instantiate the partition info for catalog with 4 pixels"""
     partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)