Prefer reading CSV and issue warning (#203)
* Prefer reading CSV and issue warning

* Update docstrings
delucchi-cmu authored Feb 1, 2024
1 parent 22467ac commit fb575dd
Showing 10 changed files with 54 additions and 21 deletions.
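Note for downstream users: after this change, reading a catalog that has only a `_metadata` file (no partition info CSV) emits a UserWarning. If that fallback is expected, the warning can be silenced with the standard library's warning filters. A minimal sketch, assuming the usual hipscat.catalog.Catalog entry point and a hypothetical catalog path:

import warnings

from hipscat.catalog import Catalog

# Match the start of the new warning message; other warnings still surface.
warnings.filterwarnings("ignore", message="Reading partitions from parquet metadata")

catalog = Catalog.read_from_hipscat("/data/my_catalog")  # hypothetical path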
18 changes: 11 additions & 7 deletions src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -1,6 +1,7 @@
 """Container class to hold primary-to-join partition metadata"""
 from __future__ import annotations

+import warnings
 from typing import Dict, List

 import numpy as np
@@ -121,8 +122,10 @@ def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
     def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionJoinInfo:
         """Read partition join info from a file within a hipscat directory.

-        This will look for a `_metadata` file, and if not found, will look for
-        a `partition_join_info.csv` file.
+        This will look for a `partition_join_info.csv` file, and if not found, will look for
+        a `_metadata` file. The second approach is typically slower for large catalogs;
+        therefore, a warning is issued to the user. In internal testing with large catalogs,
+        the first approach takes less than a second, while the second can take 10-20 seconds.

         Args:
             catalog_base_dir: path to the root directory of the catalog
@@ -136,14 +139,15 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionJoinInfo:
         """
         metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
         partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
-        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
-            partition_join_info = PartitionJoinInfo.read_from_file(
-                metadata_file, storage_options=storage_options
-            )
-        elif file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
+        if file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
             partition_join_info = PartitionJoinInfo.read_from_csv(
                 partition_join_info_file, storage_options=storage_options
             )
+        elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
+            warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
+            partition_join_info = PartitionJoinInfo.read_from_file(
+                metadata_file, storage_options=storage_options
+            )
         else:
             raise FileNotFoundError(
                 f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
18 changes: 10 additions & 8 deletions src/hipscat/catalog/partition_info.py
@@ -1,6 +1,7 @@
 """Container class to hold per-partition metadata"""
 from __future__ import annotations

+import warnings
 from typing import List

 import numpy as np
@@ -82,8 +83,10 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
     def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
         """Read partition info from a file within a hipscat directory.

-        This will look for a `_metadata` file, and if not found, will look for
-        a `partition_info.csv` file.
+        This will look for a `partition_info.csv` file, and if not found, will look for
+        a `_metadata` file. The second approach is typically slower for large catalogs;
+        therefore, a warning is issued to the user. In internal testing with large catalogs,
+        the first approach takes less than a second, while the second can take 10-20 seconds.

         Args:
             catalog_base_dir: path to the root directory of the catalog
@@ -97,10 +100,11 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
         """
         metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
         partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
-        if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
-            partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
-        elif file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
+        if file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
             partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
+        elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
+            warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
+            partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
         else:
             raise FileNotFoundError(
                 f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
@@ -202,9 +206,7 @@ def as_dataframe(self):
         for pixel in self.pixel_list:
             partition_info_dict[PartitionInfo.METADATA_ORDER_COLUMN_NAME].append(pixel.order)
             partition_info_dict[PartitionInfo.METADATA_PIXEL_COLUMN_NAME].append(pixel.pixel)
-            partition_info_dict[PartitionInfo.METADATA_DIR_COLUMN_NAME].append(
-                int(pixel.pixel / 10_000) * 10_000
-            )
+            partition_info_dict[PartitionInfo.METADATA_DIR_COLUMN_NAME].append(pixel.dir)
         return pd.DataFrame.from_dict(partition_info_dict)

     @classmethod
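The as_dataframe change above replaces the inline directory arithmetic with pixel.dir; per the removed code, Dir is just Npix floored to the nearest 10,000. A standalone sketch of the equivalent computation (the helper name is illustrative, not from the codebase):

def pixel_directory_bucket(npix: int) -> int:
    # Equivalent to the removed int(pixel.pixel / 10_000) * 10_000 for non-negative pixels.
    return (npix // 10_000) * 10_000

assert pixel_directory_bucket(11) == 0            # small_sky fixture: Npix 11 -> Dir 0
assert pixel_directory_bucket(187) == 0           # small_sky_source fixture: Npix 187 -> Dir 0
assert pixel_directory_bucket(123_456) == 120_000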
2 changes: 2 additions & 0 deletions tests/data/small_sky/partition_info.csv
@@ -0,0 +1,2 @@
+Norder,Npix,Dir
+0,11,0
5 changes: 5 additions & 0 deletions tests/data/small_sky_order1/partition_info.csv
@@ -0,0 +1,5 @@
+Norder,Npix,Dir
+1,44,0
+1,45,0
+1,46,0
+1,47,0
15 changes: 15 additions & 0 deletions tests/data/small_sky_source/partition_info.csv
@@ -0,0 +1,15 @@
+Norder,Npix,Dir
+0,4,0
+1,47,0
+2,176,0
+2,177,0
+2,178,0
+2,179,0
+2,180,0
+2,181,0
+2,182,0
+2,183,0
+2,184,0
+2,185,0
+2,186,0
+2,187,0
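The three fixture files above share the three-column layout that read_from_csv consumes. A quick sketch of inspecting one with pandas (assumes it is run from the repository root):

import pandas as pd

# Norder: HEALPix order; Npix: pixel index at that order; Dir: directory bucket.
partitions = pd.read_csv("tests/data/small_sky_order1/partition_info.csv")
print(partitions.to_string(index=False))
#  Norder  Npix  Dir
#       1    44    0
#       1    45    0
#       1    46    0
#       1    47    0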
@@ -90,7 +90,8 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
     ## Now we create the needed _metadata and everything is right.
     part_info = PartitionJoinInfo(association_catalog_join_pixels)
     part_info.write_to_metadata_files(catalog_path=catalog_path)
-    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
+    with pytest.warns(UserWarning, match="slow"):
+        catalog = AssociationCatalog.read_from_hipscat(catalog_path)
     assert catalog.catalog_name == association_catalog_info_data["catalog_name"]

@@ -65,8 +65,9 @@ def test_load_partition_join_info_from_dir_fail(tmp_path):
     # The file is there, but doesn't have the required content.
     metadata_filename = os.path.join(tmp_path, "_metadata")
     empty_dataframe.to_parquet(metadata_filename)
-    with pytest.raises(ValueError, match="missing columns"):
-        PartitionJoinInfo.read_from_dir(tmp_path)
+    with pytest.warns(UserWarning, match="slow"):
+        with pytest.raises(ValueError, match="missing columns"):
+            PartitionJoinInfo.read_from_dir(tmp_path)


def test_primary_to_join_map(association_catalog_join_pixels):
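The failure tests above nest the existing pytest.raises block inside pytest.warns, so they assert both that the slow-path warning fires and that the malformed file is still rejected. A self-contained illustration of the nesting, using a toy stand-in function rather than code from this repository:

import warnings

import pytest

def slow_then_fail():
    # Toy stand-in: warns like the slow parquet path, then fails validation.
    warnings.warn("this read is typically slow", UserWarning)
    raise ValueError("missing columns")

def test_warns_and_raises():
    # Both context managers must be satisfied for the test to pass.
    with pytest.warns(UserWarning, match="slow"):
        with pytest.raises(ValueError, match="missing columns"):
            slow_then_fail()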
3 changes: 2 additions & 1 deletion tests/hipscat/catalog/test_catalog.py
@@ -245,7 +245,8 @@ def test_empty_directory(tmp_path):
     part_info = PartitionInfo.from_healpix([HealpixPixel(0, 11)])
     part_info.write_to_metadata_files(catalog_path=catalog_path)

-    catalog = Catalog.read_from_hipscat(catalog_path)
+    with pytest.warns(UserWarning, match="slow"):
+        catalog = Catalog.read_from_hipscat(catalog_path)
     assert catalog.catalog_name == "empty"

5 changes: 3 additions & 2 deletions tests/hipscat/catalog/test_partition_info.py
@@ -65,8 +65,9 @@ def test_load_partition_info_from_dir_fail(tmp_path):
     # The file is there, but doesn't have the required content.
     metadata_filename = os.path.join(tmp_path, "_metadata")
     empty_dataframe.to_parquet(metadata_filename)
-    with pytest.raises(ValueError, match="missing Norder"):
-        PartitionInfo.read_from_dir(tmp_path)
+    with pytest.warns(UserWarning, match="slow"):
+        with pytest.raises(ValueError, match="missing Norder"):
+            PartitionInfo.read_from_dir(tmp_path)


def test_load_partition_info_small_sky_order1(small_sky_order1_dir):
1 change: 1 addition & 0 deletions tests/hipscat/io/file_io/test_file_pointers.py
@@ -98,6 +98,7 @@ def test_get_directory_contents(small_sky_order1_dir, tmp_path):
         "_common_metadata",
         "_metadata",
         "catalog_info.json",
+        "partition_info.csv",
         "point_map.fits",
         "provenance_info.json",
     ]
