Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loads of small cleanups #203

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 10 additions & 31 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,16 @@
import pyarrow.parquet as pq
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io, paths
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

# pylint: disable=too-many-locals,too-many-arguments


def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.uint64):
"""Create a path for intermediate pixel data.

This will take the form:

<cache_path>/dir_<directory separator>/pixel_<pixel>

where the directory separator is calculated using integer division:

(pixel/10000)*10000

and exists to mitigate problems on file systems that don't support
more than 10_000 children nodes.
"""
dir_number = int(pixel / 10_000) * 10_000
return file_io.append_paths_to_pointer(
cache_path, f"order_{order}", f"dir_{dir_number}", f"pixel_{pixel}"
)


def _has_named_index(dataframe):
"""Heuristic to determine if a dataframe has some meaningful index.

Expand Down Expand Up @@ -172,7 +154,7 @@ def split_pixels(

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = _get_pixel_directory(cache_shard_path, order, pixel)
pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
Expand Down Expand Up @@ -259,7 +241,8 @@ def reduce_pixel_shards(
).schema.to_arrow_schema()

tables = []
pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number)
healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
Expand All @@ -273,7 +256,7 @@ def reduce_pixel_shards(
if rows_written != destination_pixel_size:
raise ValueError(
"Unexpected number of objects at pixel "
f"({destination_pixel_order}, {destination_pixel_number})."
f"({healpix_pixel})."
f" Expected {destination_pixel_size}, wrote {rows_written}"
)

Expand All @@ -286,13 +269,9 @@ def reduce_pixel_shards(
dataframe[dec_column].values,
)

dataframe["Norder"] = np.full(rows_written, fill_value=destination_pixel_order, dtype=np.uint8)
dataframe["Dir"] = np.full(
rows_written,
fill_value=int(destination_pixel_number / 10_000) * 10_000,
dtype=np.uint64,
)
dataframe["Npix"] = np.full(rows_written, fill_value=destination_pixel_number, dtype=np.uint64)
dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
Expand All @@ -304,7 +283,7 @@ def reduce_pixel_shards(
del dataframe, merged_table, tables

if delete_input_files:
pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)

Expand Down
Empty file.
125 changes: 0 additions & 125 deletions src/hipscat_import/cross_match/macauff_arguments.py

This file was deleted.

106 changes: 0 additions & 106 deletions src/hipscat_import/cross_match/macauff_map_reduce.py

This file was deleted.

Loading