From b082078effc50d6660b874f6f5d07df17d97a7f9 Mon Sep 17 00:00:00 2001 From: Sean McGuire Date: Tue, 30 Jan 2024 18:39:51 -0500 Subject: [PATCH 1/2] fix margin cache generation leaving shard dirs --- src/hipscat_import/margin_cache/margin_cache.py | 2 ++ .../margin_cache/margin_cache_map_reduce.py | 10 ++++++++-- .../margin_cache/test_margin_cache_map_reduce.py | 12 +++++++++--- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 6f30551e..d0a00515 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -145,5 +145,7 @@ def generate_margin_cache(args, client): catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info ) step_progress.update(1) + cache_path = mcmr.get_cache_directory(args.catalog_path) file_io.remove_directory(args.tmp_path, ignore_errors=True) + file_io.remove_directory(cache_path, ignore_errors=True) step_progress.update(1) diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index 4eb6f194..c0b2a871 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -55,7 +55,8 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): if len(margin_data): # generate a file name for our margin shard, that uses both sets of Norder/Npix - partition_dir = get_pixel_cache_directory(output_path, HealpixPixel(order, pix)) + cache_path = get_cache_directory(output_path) + partition_dir = get_pixel_cache_directory(cache_path, HealpixPixel(order, pix)) shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix) file_io.make_directory(shard_dir, exist_ok=True) @@ -96,9 +97,14 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): del data, margin_data, final_df +def get_cache_directory(output_path): + return file_io.append_paths_to_pointer(output_path, "cache") + + def reduce_margin_shards(output_path, partition_order, partition_pixel): """Reduce all partition pixel directories into a single file""" - shard_dir = get_pixel_cache_directory(output_path, HealpixPixel(partition_order, partition_pixel)) + cache_path = get_cache_directory(output_path) + shard_dir = get_pixel_cache_directory(cache_path, HealpixPixel(partition_order, partition_pixel)) if file_io.does_file_or_directory_exist(shard_dir): data = ds.dataset(shard_dir, format="parquet") diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index 5f150d57..260c878e 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -6,6 +6,7 @@ from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat_import.margin_cache import margin_cache_map_reduce +from hipscat_import.margin_cache.margin_cache_map_reduce import get_cache_directory from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory keep_cols = ["weird_ra", "weird_dec"] @@ -43,7 +44,9 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df): dec_column="weird_dec", ) - path = os.path.join(tmp_path, "order_1", "dir_0", "pixel_21", "Norder=1", "Dir=0", "Npix=0.parquet") + cache_path = get_cache_directory(tmp_path) + + path = os.path.join(cache_path, "order_1", "dir_0", "pixel_21", "Norder=1", "Dir=0", "Npix=0.parquet") assert os.path.exists(path) @@ -60,7 +63,9 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): dec_column="weird_dec", ) - path = os.path.join(tmp_path, "order_2", "dir_0", "pixel_15", "Norder=2", "Dir=0", "Npix=0.parquet") + cache_path = get_cache_directory(tmp_path) + + path = os.path.join(cache_path, "order_2", "dir_0", "pixel_15", "Norder=2", "Dir=0", "Npix=0.parquet") assert os.path.exists(path) @@ -69,7 +74,8 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): @pytest.mark.dask def test_reduce_margin_shards(tmp_path, basic_data_shard_df): - partition_dir = get_pixel_cache_directory(tmp_path, HealpixPixel(1, 21)) + cache_path = get_cache_directory(tmp_path) + partition_dir = get_pixel_cache_directory(cache_path, HealpixPixel(1, 21)) shard_dir = paths.pixel_directory(partition_dir, 1, 21) os.makedirs(shard_dir) From a641c799e8de03f2c31eeb0d97b983d2b4f9c0b0 Mon Sep 17 00:00:00 2001 From: Sean McGuire Date: Tue, 30 Jan 2024 19:00:00 -0500 Subject: [PATCH 2/2] add docstring --- src/hipscat_import/margin_cache/margin_cache_map_reduce.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index c0b2a871..f9a82428 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -98,6 +98,7 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): def get_cache_directory(output_path): + """Generate the base path to store the pixel shards under""" return file_io.append_paths_to_pointer(output_path, "cache")