Fix margin cache generation leaving empty shard dirs #205

Closed
wants to merge 2 commits into from
2 changes: 2 additions & 0 deletions src/hipscat_import/margin_cache/margin_cache.py
@@ -145,5 +145,7 @@ def generate_margin_cache(args, client):
catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info
)
step_progress.update(1)
cache_path = mcmr.get_cache_directory(args.catalog_path)
file_io.remove_directory(args.tmp_path, ignore_errors=True)
file_io.remove_directory(cache_path, ignore_errors=True)
Contributor:

Hm. It looks like the margin cache process doesn't make use of args.tmp_path at all; if it did, the intermediate files would already have been cleaned up by the step above. I created a branch that uses the tmp_path instead (https://github.com/astronomy-commons/hipscat-import/tree/delucchi/margin_tmp). Can you see if that addresses the empty shard dir problem? I'd like users to be able to use a different path for tmp files (see this doc: https://hipscat-import.readthedocs.io/en/latest/catalogs/temp_files.html)
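
A minimal sketch of that approach, assuming the shard-writing step simply takes the temp root as its output location. This is not the code on the delucchi/margin_tmp branch; the names below and the file_io import path are illustrative assumptions:

```python
from types import SimpleNamespace

from hipscat.io import file_io  # import path assumed; matches the file_io calls in this diff

# Stand-in for the pipeline's argument object; real runs supply these values.
args = SimpleNamespace(tmp_path="/tmp/margin_import", catalog_path="/data/margin_catalog")

# Sketch only: write the intermediate margin shards under args.tmp_path, so the
# cleanup of tmp_path at the end of the pipeline also removes the shard dirs.
# "intermediate" is an illustrative name, not taken from that branch.
shard_root = file_io.append_paths_to_pointer(args.tmp_path, "intermediate")

# ... the map/reduce workers would write their per-pixel shards under shard_root
# instead of under args.catalog_path ...

# The finish step already removes tmp_path, which would then also clear the shards:
file_io.remove_directory(args.tmp_path, ignore_errors=True)
```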

Contributor Author:

Ah I see! That makes sense, and yeah I tested with that branch and it solves the issue. I can close this PR and we can go with that solution if you think that's better.

Contributor:

Sounds good. Sending you the PR.

step_progress.update(1)
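
For context on the two added lines above: before this change, the per-pixel shard directories were written directly under the catalog output path and only args.tmp_path was removed at the end, so the (often empty) shard directories stayed behind inside the finished catalog. Grouping the shards under a single cache/ subdirectory lets one extra remove_directory call clear them. A rough sketch of the cleanup semantics on a local filesystem; the path is illustrative and shutil stands in for the file_io helper:

```python
import os
import shutil

# Illustrative output location; real pipelines get this from the margin cache arguments.
catalog_path = "/data/my_margin_catalog"

# With this change, intermediate shards are grouped under "<catalog_path>/cache", e.g.
#   /data/my_margin_catalog/cache/order_1/dir_0/pixel_21/Norder=1/Dir=0/Npix=0.parquet
# (layout as asserted by the updated tests further down).
cache_path = os.path.join(catalog_path, "cache")

# The added cleanup is a best-effort recursive delete of that whole subtree;
# shutil.rmtree stands in for file_io.remove_directory(cache_path, ignore_errors=True).
shutil.rmtree(cache_path, ignore_errors=True)
```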
11 changes: 9 additions & 2 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -55,7 +55,8 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column):

if len(margin_data):
# generate a file name for our margin shard, that uses both sets of Norder/Npix
partition_dir = get_pixel_cache_directory(output_path, HealpixPixel(order, pix))
cache_path = get_cache_directory(output_path)
partition_dir = get_pixel_cache_directory(cache_path, HealpixPixel(order, pix))
shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix)

file_io.make_directory(shard_dir, exist_ok=True)
@@ -96,9 +97,15 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column):
del data, margin_data, final_df


def get_cache_directory(output_path):
"""Generate the base path to store the pixel shards under"""
return file_io.append_paths_to_pointer(output_path, "cache")


def reduce_margin_shards(output_path, partition_order, partition_pixel):
"""Reduce all partition pixel directories into a single file"""
shard_dir = get_pixel_cache_directory(output_path, HealpixPixel(partition_order, partition_pixel))
cache_path = get_cache_directory(output_path)
shard_dir = get_pixel_cache_directory(cache_path, HealpixPixel(partition_order, partition_pixel))

if file_io.does_file_or_directory_exist(shard_dir):
data = ds.dataset(shard_dir, format="parquet")
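
A short usage sketch of the new helper and how it composes with get_pixel_cache_directory; the output path is made up, and the expected layout is the one the tests below assert:

```python
from hipscat.pixel_math.healpix_pixel import HealpixPixel

from hipscat_import.margin_cache.margin_cache_map_reduce import get_cache_directory
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

output_path = "/data/my_margin_catalog"        # illustrative path
cache_path = get_cache_directory(output_path)  # expected: "<output_path>/cache"

# Working directory for the margin pixel at order 1, pixel 21; the tests below
# exercise this same composition, asserting shards land under
# "<output_path>/cache/order_1/dir_0/pixel_21/...".
partition_dir = get_pixel_cache_directory(cache_path, HealpixPixel(1, 21))
print(partition_dir)
```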
Changes to the margin cache map reduce tests
@@ -6,6 +6,7 @@
from hipscat.pixel_math.healpix_pixel import HealpixPixel

from hipscat_import.margin_cache import margin_cache_map_reduce
from hipscat_import.margin_cache.margin_cache_map_reduce import get_cache_directory
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

keep_cols = ["weird_ra", "weird_dec"]
@@ -43,7 +44,9 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):
dec_column="weird_dec",
)

path = os.path.join(tmp_path, "order_1", "dir_0", "pixel_21", "Norder=1", "Dir=0", "Npix=0.parquet")
cache_path = get_cache_directory(tmp_path)

path = os.path.join(cache_path, "order_1", "dir_0", "pixel_21", "Norder=1", "Dir=0", "Npix=0.parquet")

assert os.path.exists(path)

@@ -60,7 +63,9 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
dec_column="weird_dec",
)

path = os.path.join(tmp_path, "order_2", "dir_0", "pixel_15", "Norder=2", "Dir=0", "Npix=0.parquet")
cache_path = get_cache_directory(tmp_path)

path = os.path.join(cache_path, "order_2", "dir_0", "pixel_15", "Norder=2", "Dir=0", "Npix=0.parquet")

assert os.path.exists(path)

@@ -69,7 +74,8 @@

@pytest.mark.dask
def test_reduce_margin_shards(tmp_path, basic_data_shard_df):
partition_dir = get_pixel_cache_directory(tmp_path, HealpixPixel(1, 21))
cache_path = get_cache_directory(tmp_path)
partition_dir = get_pixel_cache_directory(cache_path, HealpixPixel(1, 21))
shard_dir = paths.pixel_directory(partition_dir, 1, 21)

os.makedirs(shard_dir)