Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address smoke test failure - use new method. #16

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/lsdb_macauff/import_pipeline/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from hipscat.io import file_io, paths
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort
from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file
from hipscat_import.catalog.map_reduce import _iterate_input_file
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

from lsdb_macauff.import_pipeline.resume_plan import MacauffResumePlan

Expand Down Expand Up @@ -62,7 +63,7 @@ def split_associations(

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = _get_pixel_directory(tmp_path, pixel.order, pixel.pixel)
pixel_dir = get_pixel_cache_directory(tmp_path, pixel)
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
Expand All @@ -76,7 +77,7 @@ def split_associations(
def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
"""For all points determined to be in the target left_pixel, map them to the appropriate right_pixel
and aggregate into a single parquet file."""
inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel)
inputs = get_pixel_cache_directory(tmp_path, left_pixel)

if not file_io.directory_has_contents(inputs):
MacauffResumePlan.reducing_key_done(
Expand Down