From 3dbfd341566580fbdef1f652ce6d178fdd86b9a7 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Thu, 1 Feb 2024 15:34:01 -0500 Subject: [PATCH] Use tmp_path in margin generation (#207) * Use tmp_path in margin generation * Lint issue. --- src/hipscat_import/margin_cache/margin_cache.py | 3 ++- src/hipscat_import/margin_cache/margin_cache_map_reduce.py | 7 ++++--- .../margin_cache/test_margin_cache_map_reduce.py | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 6f30551e..84705202 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -59,7 +59,7 @@ def _map_to_margin_shards(client, args, partition_pixels, margin_pairs): partition_file=partition_file, margin_pairs=mp_future, margin_threshold=args.margin_threshold, - output_path=args.catalog_path, + output_path=args.tmp_path, margin_order=args.margin_order, ra_column=args.catalog.catalog_info.ra_column, dec_column=args.catalog.catalog_info.dec_column, @@ -83,6 +83,7 @@ def _reduce_margin_shards(client, args, partition_pixels): futures.append( client.submit( mcmr.reduce_margin_shards, + intermediate_directory=args.tmp_path, output_path=args.catalog_path, partition_order=pix.order, partition_pixel=pix.pixel, diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index 4eb6f194..8ede0e6e 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -96,10 +96,11 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): del data, margin_data, final_df -def reduce_margin_shards(output_path, partition_order, partition_pixel): +def reduce_margin_shards(intermediate_directory, output_path, partition_order, partition_pixel): """Reduce all partition pixel directories into a single file""" - shard_dir = get_pixel_cache_directory(output_path, HealpixPixel(partition_order, partition_pixel)) - + shard_dir = get_pixel_cache_directory( + intermediate_directory, HealpixPixel(partition_order, partition_pixel) + ) if file_io.does_file_or_directory_exist(shard_dir): data = ds.dataset(shard_dir, format="parquet") full_df = data.to_table().to_pandas() diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index 5f150d57..deec432c 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -69,7 +69,8 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): @pytest.mark.dask def test_reduce_margin_shards(tmp_path, basic_data_shard_df): - partition_dir = get_pixel_cache_directory(tmp_path, HealpixPixel(1, 21)) + intermediate_dir = os.path.join(tmp_path, "intermediate") + partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21)) shard_dir = paths.pixel_directory(partition_dir, 1, 21) os.makedirs(shard_dir) @@ -82,7 +83,7 @@ def test_reduce_margin_shards(tmp_path, basic_data_shard_df): shard_df.to_parquet(first_shard_path) shard_df.to_parquet(second_shard_path) - margin_cache_map_reduce.reduce_margin_shards(tmp_path, 1, 21) + margin_cache_map_reduce.reduce_margin_shards(intermediate_dir, tmp_path, 1, 21) result_path = paths.pixel_catalog_file(tmp_path, 1, 21)