Skip to content

Commit 959488a

Browse files
committed
Keep directory and use to infer write location
1 parent c2e8bfd commit 959488a

File tree

4 files changed

+177
-22
lines changed

4 files changed

+177
-22
lines changed

src/hipscat/catalog/association_catalog/partition_join_info.py

+55-9
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ class PartitionJoinInfo:
3434
JOIN_PIXEL_COLUMN_NAME,
3535
]
3636

37-
def __init__(self, join_info_df: pd.DataFrame) -> None:
37+
def __init__(self, join_info_df: pd.DataFrame, catalog_base_dir: str = None) -> None:
3838
self.data_frame = join_info_df
39+
self.catalog_base_dir = catalog_base_dir
3940
self._check_column_names()
4041

4142
def _check_column_names(self):
@@ -69,13 +70,21 @@ def primary_to_join_map(self) -> Dict[HealpixPixel, List[HealpixPixel]]:
6970
primary_to_join = dict(primary_to_join)
7071
return primary_to_join
7172

72-
def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
73+
def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
7374
"""Generate parquet metadata, using the known partitions.
7475
7576
Args:
7677
catalog_path (FilePointer): base path for the catalog
7778
storage_options (dict): dictionary that contains abstract filesystem credentials
79+
80+
Raises:
81+
ValueError: if no path is provided, and could not be inferred.
7882
"""
83+
if catalog_path is None:
84+
if self.catalog_base_dir is None:
85+
raise ValueError("catalog_path is required if info was not loaded from a directory")
86+
catalog_path = self.catalog_base_dir
87+
7988
batches = [
8089
[
8190
pa.RecordBatch.from_arrays(
@@ -94,7 +103,7 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
94103

95104
write_parquet_metadata_for_batches(batches, catalog_path, storage_options)
96105

97-
def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
106+
def write_to_csv(self, catalog_path: FilePointer = None, storage_options: dict = None):
98107
"""Write all partition data to CSV files.
99108
100109
Two files will be written::
@@ -106,7 +115,15 @@ def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
106115
catalog_path: FilePointer to the directory where the
107116
`partition_join_info.csv` file will be written
108117
storage_options (dict): dictionary that contains abstract filesystem credentials
118+
119+
Raises:
120+
ValueError: if no path is provided, and could not be inferred.
109121
"""
122+
if catalog_path is None:
123+
if self.catalog_base_dir is None:
124+
raise ValueError("catalog_path is required if info was not loaded from a directory")
125+
catalog_path = self.catalog_base_dir
126+
110127
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
111128
file_io.write_dataframe_to_csv(
112129
self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
@@ -141,26 +158,41 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = No
141158
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
142159
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
143160
if file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
144-
partition_join_info = PartitionJoinInfo.read_from_csv(
161+
pixel_frame = PartitionJoinInfo._read_from_csv(
145162
partition_join_info_file, storage_options=storage_options
146163
)
147164
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
148165
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
149-
partition_join_info = PartitionJoinInfo.read_from_file(
166+
pixel_frame = PartitionJoinInfo._read_from_metadata_file(
150167
metadata_file, storage_options=storage_options
151168
)
152169
else:
153170
raise FileNotFoundError(
154171
f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
155172
)
156-
return partition_join_info
173+
return cls(pixel_frame, catalog_base_dir)
157174

158175
@classmethod
159176
def read_from_file(
160177
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
161178
) -> PartitionJoinInfo:
162179
"""Read partition join info from a `_metadata` file to create an object
163180
181+
Args:
182+
metadata_file (FilePointer): FilePointer to the `_metadata` file
183+
storage_options (dict): dictionary that contains abstract filesystem credentials
184+
185+
Returns:
186+
A `PartitionJoinInfo` object with the data from the file
187+
"""
188+
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))
189+
190+
@classmethod
191+
def _read_from_metadata_file(
192+
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
193+
) -> pd.DataFrame:
194+
"""Read partition join info from a `_metadata` file to create an object
195+
164196
Args:
165197
metadata_file (FilePointer): FilePointer to the `_metadata` file
166198
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -229,14 +261,29 @@ def read_from_file(
229261
columns=cls.COLUMN_NAMES,
230262
)
231263

232-
return cls(pixel_frame)
264+
return pixel_frame
233265

234266
@classmethod
235267
def read_from_csv(
236268
cls, partition_join_info_file: FilePointer, storage_options: dict = None
237269
) -> PartitionJoinInfo:
238270
"""Read partition join info from a `partition_join_info.csv` file to create an object
239271
272+
Args:
273+
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
274+
storage_options (dict): dictionary that contains abstract filesystem credentials
275+
276+
Returns:
277+
A `PartitionJoinInfo` object with the data from the file
278+
"""
279+
return cls(cls._read_from_csv(partition_join_info_file, storage_options))
280+
281+
@classmethod
282+
def _read_from_csv(
283+
cls, partition_join_info_file: FilePointer, storage_options: dict = None
284+
) -> pd.DataFrame:
285+
"""Read partition join info from a `partition_join_info.csv` file to create an object
286+
240287
Args:
241288
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
242289
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -251,5 +298,4 @@ def read_from_csv(
251298
f"No partition join info found where expected: {str(partition_join_info_file)}"
252299
)
253300

254-
data_frame = file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)
255-
return cls(data_frame)
301+
return file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)

src/hipscat/catalog/partition_info.py

+70-13
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ class PartitionInfo:
2525
METADATA_DIR_COLUMN_NAME = "Dir"
2626
METADATA_PIXEL_COLUMN_NAME = "Npix"
2727

28-
def __init__(self, pixel_list: List[HealpixPixel]) -> None:
28+
def __init__(self, pixel_list: List[HealpixPixel], catalog_base_dir: str = None) -> None:
2929
self.pixel_list = pixel_list
30+
self.catalog_base_dir = catalog_base_dir
3031

3132
def get_healpix_pixels(self) -> List[HealpixPixel]:
3233
"""Get healpix pixel objects for all pixels represented as partitions.
@@ -45,25 +46,55 @@ def get_highest_order(self) -> int:
4546
max_pixel = np.max(self.pixel_list)
4647
return max_pixel.order
4748

48-
def write_to_file(self, partition_info_file: FilePointer, storage_options: dict = None):
49+
def write_to_file(
50+
self,
51+
partition_info_file: FilePointer = None,
52+
catalog_path: FilePointer = None,
53+
storage_options: dict = None,
54+
):
4955
"""Write all partition data to CSV file.
5056
57+
If no paths are provided, the catalog base directory from the `read_from_dir` call is used.
58+
5159
Args:
5260
partition_info_file: FilePointer to where the `partition_info.csv`
53-
file will be written
61+
file will be written.
62+
catalog_path: base directory for a catalog where the `partition_info.csv`
63+
file will be written.
5464
storage_options (dict): dictionary that contains abstract filesystem credentials
65+
66+
Raises:
67+
ValueError: if no path is provided, and could not be inferred.
5568
"""
69+
if partition_info_file is None:
70+
if catalog_path is not None:
71+
partition_info_file = paths.get_partition_info_pointer(catalog_path)
72+
elif self.catalog_base_dir is not None:
73+
partition_info_file = paths.get_partition_info_pointer(self.catalog_base_dir)
74+
else:
75+
raise ValueError("partition_info_file is required if info was not loaded from a directory")
76+
5677
file_io.write_dataframe_to_csv(
5778
self.as_dataframe(), partition_info_file, index=False, storage_options=storage_options
5879
)
5980

60-
def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
81+
def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
6182
"""Generate parquet metadata, using the known partitions.
6283
84+
If no catalog_path is provided, the catalog base directory from the `read_from_dir` call is used.
85+
6386
Args:
6487
catalog_path (FilePointer): base path for the catalog
6588
storage_options (dict): dictionary that contains abstract filesystem credentials
89+
90+
Raises:
91+
ValueError: if no path is provided, and could not be inferred.
6692
"""
93+
if catalog_path is None:
94+
if self.catalog_base_dir is None:
95+
raise ValueError("catalog_path is required if info was not loaded from a directory")
96+
catalog_path = self.catalog_base_dir
97+
6798
batches = [
6899
[
69100
pa.RecordBatch.from_arrays(
@@ -102,22 +133,39 @@ def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = No
102133
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
103134
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
104135
if file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
105-
partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
136+
pixel_list = PartitionInfo._read_from_csv(partition_info_file, storage_options=storage_options)
106137
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
107138
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
108-
partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
139+
pixel_list = PartitionInfo._read_from_metadata_file(
140+
metadata_file, storage_options=storage_options
141+
)
109142
else:
110143
raise FileNotFoundError(
111144
f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
112145
)
113-
return partition_info
146+
return cls(pixel_list, catalog_base_dir)
114147

115148
@classmethod
116149
def read_from_file(
117150
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
118151
) -> PartitionInfo:
119152
"""Read partition info from a `_metadata` file to create an object
120153
154+
Args:
155+
metadata_file (FilePointer): FilePointer to the `_metadata` file
156+
storage_options (dict): dictionary that contains abstract filesystem credentials
157+
158+
Returns:
159+
A `PartitionInfo` object with the data from the file
160+
"""
161+
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))
162+
163+
@classmethod
164+
def _read_from_metadata_file(
165+
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
166+
) -> List[HealpixPixel]:
167+
"""Read partition info list from a `_metadata` file.
168+
121169
Args:
122170
metadata_file (FilePointer): FilePointer to the `_metadata` file
123171
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -163,14 +211,25 @@ def read_from_file(
163211
## Remove duplicates, preserving order.
164212
## In the case of association partition join info, we may have multiple entries
165213
## for the primary order/pixels.
166-
pixel_list = list(dict.fromkeys(pixel_list))
167-
168-
return cls(pixel_list)
214+
return list(dict.fromkeys(pixel_list))
169215

170216
@classmethod
171217
def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
172218
"""Read partition info from a `partition_info.csv` file to create an object
173219
220+
Args:
221+
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
222+
storage_options (dict): dictionary that contains abstract filesystem credentials
223+
224+
Returns:
225+
A `PartitionInfo` object with the data from the file
226+
"""
227+
return cls(cls._read_from_csv(partition_info_file, storage_options))
228+
229+
@classmethod
230+
def _read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
231+
"""Read partition info from a `partition_info.csv` file to create an object
232+
174233
Args:
175234
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
176235
storage_options (dict): dictionary that contains abstract filesystem credentials
@@ -183,16 +242,14 @@ def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict =
183242

184243
data_frame = file_io.load_csv_to_pandas(partition_info_file, storage_options=storage_options)
185244

186-
pixel_list = [
245+
return [
187246
HealpixPixel(order, pixel)
188247
for order, pixel in zip(
189248
data_frame[cls.METADATA_ORDER_COLUMN_NAME],
190249
data_frame[cls.METADATA_PIXEL_COLUMN_NAME],
191250
)
192251
]
193252

194-
return cls(pixel_list)
195-
196253
def as_dataframe(self):
197254
"""Construct a pandas dataframe for the partition info pixels.
198255

tests/hipscat/catalog/association_catalog/test_partition_join_info.py

+25
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,28 @@ def test_read_from_missing_file(tmp_path):
117117
file_pointer = file_io.get_file_pointer_from_path(wrong_path)
118118
with pytest.raises(FileNotFoundError):
119119
PartitionJoinInfo.read_from_csv(file_pointer)
120+
121+
122+
def test_load_partition_info_from_dir_and_write(tmp_path, association_catalog_join_pixels):
123+
info = PartitionJoinInfo(association_catalog_join_pixels)
124+
125+
## Path arguments are required if the info was not created from a `read_from_dir` call
126+
with pytest.raises(ValueError):
127+
info.write_to_csv()
128+
with pytest.raises(ValueError):
129+
info.write_to_metadata_files()
130+
131+
info.write_to_csv(catalog_path=tmp_path)
132+
info = PartitionJoinInfo.read_from_dir(tmp_path)
133+
134+
## Can write out the partition info CSV by providing:
135+
## - no arguments
136+
## - new catalog directory
137+
info.write_to_csv()
138+
info.write_to_csv(catalog_path=tmp_path)
139+
140+
## Can write out the _metadata file by providing:
141+
## - no arguments
142+
## - new catalog directory
143+
info.write_to_metadata_files()
144+
info.write_to_metadata_files(catalog_path=tmp_path)

tests/hipscat/catalog/test_partition_info.py

+27
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,30 @@ def test_write_to_file_sorted(tmp_path, pixel_list_depth_first, pixel_list_bread
132132
new_partition_info = PartitionInfo.read_from_file(partition_info_pointer)
133133

134134
npt.assert_array_equal(pixel_list_breadth_first, new_partition_info.get_healpix_pixels())
135+
136+
137+
def test_load_partition_info_from_dir_and_write(tmp_path, pixel_list_depth_first):
138+
partition_info = PartitionInfo.from_healpix(pixel_list_depth_first)
139+
140+
## Path arguments are required if the info was not created from a `read_from_dir` call
141+
with pytest.raises(ValueError):
142+
partition_info.write_to_file()
143+
with pytest.raises(ValueError):
144+
partition_info.write_to_metadata_files()
145+
146+
partition_info.write_to_file(catalog_path=tmp_path)
147+
info = PartitionInfo.read_from_dir(tmp_path)
148+
149+
## Can write out the partition info CSV by providing:
150+
## - no arguments
151+
## - new catalog directory
152+
## - full path to the csv file
153+
info.write_to_file()
154+
info.write_to_file(catalog_path=tmp_path)
155+
info.write_to_file(partition_info_file=os.path.join(tmp_path, "new_csv.csv"))
156+
157+
## Can write out the _metadata file by providing:
158+
## - no arguments
159+
## - new catalog directory
160+
info.write_to_metadata_files()
161+
info.write_to_metadata_files(catalog_path=tmp_path)

0 commit comments

Comments
 (0)