Keep directory and use to infer write location (#210)
* Keep directory and use to infer write location

* Give more help for `strict` param.
delucchi-cmu authored Feb 9, 2024
1 parent c2e8bfd commit 684cd83
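In practice, the first change means a catalog loaded with `read_from_dir` remembers its base directory, so later writes can omit the path. A minimal sketch of the new workflow, mirroring the tests added in this commit (the directory names are placeholders):

from hipscat.catalog.partition_info import PartitionInfo

# Reading from a directory records catalog_base_dir on the instance.
info = PartitionInfo.read_from_dir("my_catalog_dir")

# Writes may now omit the destination and fall back to that stored directory...
info.write_to_file()
info.write_to_metadata_files()

# ...or still name an explicit target, as before.
info.write_to_file(catalog_path="other_catalog_dir")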
Showing 4 changed files with 187 additions and 24 deletions.
70 changes: 60 additions & 10 deletions src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -34,8 +34,9 @@ class PartitionJoinInfo:
JOIN_PIXEL_COLUMN_NAME,
]

- def __init__(self, join_info_df: pd.DataFrame) -> None:
+ def __init__(self, join_info_df: pd.DataFrame, catalog_base_dir: str = None) -> None:
self.data_frame = join_info_df
+ self.catalog_base_dir = catalog_base_dir
self._check_column_names()

def _check_column_names(self):
@@ -69,13 +70,21 @@ def primary_to_join_map(self) -> Dict[HealpixPixel, List[HealpixPixel]]:
primary_to_join = dict(primary_to_join)
return primary_to_join

- def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
+ def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Generate parquet metadata, using the known partitions.
Args:
catalog_path (FilePointer): base path for the catalog
storage_options (dict): dictionary that contains abstract filesystem credentials
Raises:
ValueError: if no path is provided and none can be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir

batches = [
[
pa.RecordBatch.from_arrays(
@@ -94,7 +103,7 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

- def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
+ def write_to_csv(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Write all partition data to CSV files.
Two files will be written::
@@ -106,7 +115,15 @@ def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
catalog_path: FilePointer to the directory where the
`partition_join_info.csv` file will be written
storage_options (dict): dictionary that contains abstract filesystem credentials
Raises:
ValueError: if no path is provided and none can be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir

partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
file_io.write_dataframe_to_csv(
self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
@@ -141,29 +158,48 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = No
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
- partition_join_info = PartitionJoinInfo.read_from_csv(
+ pixel_frame = PartitionJoinInfo._read_from_csv(
partition_join_info_file, storage_options=storage_options
)
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
- partition_join_info = PartitionJoinInfo.read_from_file(
+ pixel_frame = PartitionJoinInfo._read_from_metadata_file(
metadata_file, storage_options=storage_options
)
else:
raise FileNotFoundError(
f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
)
- return partition_join_info
+ return cls(pixel_frame, catalog_base_dir)

@classmethod
def read_from_file(
- cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
+ cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `_metadata` file to create an object
Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.
Returns:
A `PartitionJoinInfo` object with the data from the file
"""
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))

@classmethod
def _read_from_metadata_file(
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> pd.DataFrame:
"""Read partition join info from a `_metadata` file to create an object
Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.
Returns:
A `pd.DataFrame` with the data from the file
@@ -229,14 +265,29 @@ def read_from_file(
columns=cls.COLUMN_NAMES,
)

- return cls(pixel_frame)
+ return pixel_frame

@classmethod
def read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `partition_join_info.csv` file to create an object
Args:
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionJoinInfo` object with the data from the file
"""
return cls(cls._read_from_csv(partition_join_info_file, storage_options))

@classmethod
def _read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> pd.DataFrame:
"""Read partition join info from a `partition_join_info.csv` file to create an object
Args:
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -251,5 +302,4 @@ def read_from_csv(
f"No partition join info found where expected: {str(partition_join_info_file)}"
)

- data_frame = file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
- return cls(data_frame)
+ return file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
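The second change gives the `strict` parameter of `read_from_file` a fuller docstring. A hedged sketch of the trade-off it describes (the `_metadata` path is a placeholder):

from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo

metadata_file = "my_catalog_dir/_metadata"

# Default: fast parsing of the _metadata file.
join_info = PartitionJoinInfo.read_from_file(metadata_file)

# Strict parsing is slower, but gives more helpful errors for invalid data.
join_info = PartitionJoinInfo.read_from_file(metadata_file, strict=True)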
89 changes: 75 additions & 14 deletions src/hipscat/catalog/partition_info.py
@@ -25,8 +25,9 @@ class PartitionInfo:
METADATA_DIR_COLUMN_NAME = "Dir"
METADATA_PIXEL_COLUMN_NAME = "Npix"

- def __init__(self, pixel_list: List[HealpixPixel]) -> None:
+ def __init__(self, pixel_list: List[HealpixPixel], catalog_base_dir: str = None) -> None:
self.pixel_list = pixel_list
+ self.catalog_base_dir = catalog_base_dir

def get_healpix_pixels(self) -> List[HealpixPixel]:
"""Get healpix pixel objects for all pixels represented as partitions.
@@ -45,25 +46,55 @@ def get_highest_order(self) -> int:
max_pixel = np.max(self.pixel_list)
return max_pixel.order

- def write_to_file(self, partition_info_file: FilePointer, storage_options: dict = None):
+ def write_to_file(
+     self,
+     partition_info_file: FilePointer = None,
+     catalog_path: FilePointer = None,
+     storage_options: dict = None,
+ ):
"""Write all partition data to CSV file.
If no paths are provided, the catalog base directory from the `read_from_dir` call is used.
Args:
partition_info_file: FilePointer to where the `partition_info.csv`
file will be written.
catalog_path: base directory for a catalog where the `partition_info.csv`
file will be written.
storage_options (dict): dictionary that contains abstract filesystem credentials
Raises:
ValueError: if no path is provided and none can be inferred.
"""
if partition_info_file is None:
if catalog_path is not None:
partition_info_file = paths.get_partition_info_pointer(catalog_path)
elif self.catalog_base_dir is not None:
partition_info_file = paths.get_partition_info_pointer(self.catalog_base_dir)
else:
raise ValueError("partition_info_file is required if info was not loaded from a directory")

file_io.write_dataframe_to_csv(
self.as_dataframe(), partition_info_file, index=False, storage_options=storage_options
)

- def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
+ def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Generate parquet metadata, using the known partitions.
If no catalog_path is provided, the catalog base directory from the `read_from_dir` call is used.
Args:
catalog_path (FilePointer): base path for the catalog
storage_options (dict): dictionary that contains abstract filesystem credentials
Raises:
ValueError: if no path is provided and none can be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir

batches = [
[
pa.RecordBatch.from_arrays(
@@ -102,25 +133,46 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = No
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
- partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
+ pixel_list = PartitionInfo._read_from_csv(partition_info_file, storage_options=storage_options)
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
- partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
+ pixel_list = PartitionInfo._read_from_metadata_file(
+     metadata_file, storage_options=storage_options
+ )
else:
raise FileNotFoundError(
f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
)
- return partition_info
+ return cls(pixel_list, catalog_base_dir)

@classmethod
def read_from_file(
- cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
+ cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> PartitionInfo:
"""Read partition info from a `_metadata` file to create an object
Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.
Returns:
A `PartitionInfo` object with the data from the file
"""
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))

@classmethod
def _read_from_metadata_file(
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> List[HealpixPixel]:
"""Read partition info list from a `_metadata` file.
Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of the _metadata file. This is slower, but
gives more helpful error messages in the case of invalid data.
Returns:
A list of `HealpixPixel` objects with the data from the file
@@ -163,14 +215,25 @@ def read_from_file(
## Remove duplicates, preserving order.
## In the case of association partition join info, we may have multiple entries
## for the primary order/pixels.
- pixel_list = list(dict.fromkeys(pixel_list))

- return cls(pixel_list)
+ return list(dict.fromkeys(pixel_list))

@classmethod
def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
"""Read partition info from a `partition_info.csv` file to create an object
Args:
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionInfo` object with the data from the file
"""
return cls(cls._read_from_csv(partition_info_file, storage_options))

@classmethod
def _read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> List[HealpixPixel]:
"""Read partition info from a `partition_info.csv` file.
Args:
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -183,16 +246,14 @@ def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict =

data_frame = file_io.load_csv_to_pandas(partition_info_file, storage_options=storage_options)

- pixel_list = [
+ return [
HealpixPixel(order, pixel)
for order, pixel in zip(
data_frame[cls.METADATA_ORDER_COLUMN_NAME],
data_frame[cls.METADATA_PIXEL_COLUMN_NAME],
)
]

- return cls(pixel_list)

def as_dataframe(self):
"""Construct a pandas dataframe for the partition info pixels.
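As the new `write_to_file` body above shows, the destination is resolved in priority order: an explicit `partition_info_file`, else a `catalog_path` to derive one from, else the `catalog_base_dir` captured by `read_from_dir`. A short sketch of the three call styles (paths are placeholders):

from hipscat.catalog.partition_info import PartitionInfo

info = PartitionInfo.read_from_dir("my_catalog_dir")

info.write_to_file(partition_info_file="elsewhere/partition_info.csv")  # explicit file wins
info.write_to_file(catalog_path="elsewhere")  # else derived from a catalog base directory
info.write_to_file()  # else falls back to the directory the info was read from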
25 changes: 25 additions & 0 deletions tests/hipscat/catalog/association_catalog/test_partition_join_info.py
@@ -117,3 +117,28 @@ def test_read_from_missing_file(tmp_path):
file_pointer = file_io.get_file_pointer_from_path(wrong_path)
with pytest.raises(FileNotFoundError):
PartitionJoinInfo.read_from_csv(file_pointer)


def test_load_partition_info_from_dir_and_write(tmp_path, association_catalog_join_pixels):
info = PartitionJoinInfo(association_catalog_join_pixels)

## Path arguments are required if the info was not created from a `read_from_dir` call
with pytest.raises(ValueError):
info.write_to_csv()
with pytest.raises(ValueError):
info.write_to_metadata_files()

info.write_to_csv(catalog_path=tmp_path)
info = PartitionJoinInfo.read_from_dir(tmp_path)

## Can write out the partition info CSV by providing:
## - no arguments
## - new catalog directory
info.write_to_csv()
info.write_to_csv(catalog_path=tmp_path)

## Can write out the _metadata file by providing:
## - no arguments
## - new catalog directory
info.write_to_metadata_files()
info.write_to_metadata_files(catalog_path=tmp_path)
27 changes: 27 additions & 0 deletions tests/hipscat/catalog/test_partition_info.py
@@ -132,3 +132,30 @@ def test_write_to_file_sorted(tmp_path, pixel_list_depth_first, pixel_list_bread
new_partition_info = PartitionInfo.read_from_file(partition_info_pointer)

npt.assert_array_equal(pixel_list_breadth_first, new_partition_info.get_healpix_pixels())


def test_load_partition_info_from_dir_and_write(tmp_path, pixel_list_depth_first):
partition_info = PartitionInfo.from_healpix(pixel_list_depth_first)

## Path arguments are required if the info was not created from a `read_from_dir` call
with pytest.raises(ValueError):
partition_info.write_to_file()
with pytest.raises(ValueError):
partition_info.write_to_metadata_files()

partition_info.write_to_file(catalog_path=tmp_path)
info = PartitionInfo.read_from_dir(tmp_path)

## Can write out the partition info CSV by providing:
## - no arguments
## - new catalog directory
## - full path to the csv file
info.write_to_file()
info.write_to_file(catalog_path=tmp_path)
info.write_to_file(partition_info_file=os.path.join(tmp_path, "new_csv.csv"))

## Can write out the _metadata file by providing:
## - no arguments
## - new catalog directory
info.write_to_metadata_files()
info.write_to_metadata_files(catalog_path=tmp_path)
