From d02353d08279402ed13eadb39ebe300b5b68f7a7 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Date: Wed, 7 Feb 2024 14:55:36 -0500
Subject: [PATCH] Create partition info csvs on catalog creation. (#209)

* Create partition info csvs on catalog creation.

* Require recent hipscat library.
---
 pyproject.toml                                  |  2 +-
 src/hipscat_import/catalog/run_import.py        |  5 ++++-
 src/hipscat_import/margin_cache/margin_cache.py | 13 +++++++++++--
 src/hipscat_import/soap/run_soap.py             |  7 +++++++
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index d0263a6c..dce2156e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,7 +19,7 @@ dependencies = [
     "dask[distributed]",
     "deprecated",
     "healpy",
-    "hipscat >= 0.2.0",
+    "hipscat >= 0.2.4",
     "ipykernel", # Support for Jupyter notebooks
     "pandas < 2.1.0",
     "pyarrow",
diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py
index 729d2a20..a3ce5d3c 100644
--- a/src/hipscat_import/catalog/run_import.py
+++ b/src/hipscat_import/catalog/run_import.py
@@ -9,6 +9,7 @@
 import numpy as np
 from hipscat import pixel_math
 from hipscat.catalog import PartitionInfo
+from hipscat.io import paths
 from hipscat.io.parquet_metadata import write_parquet_metadata
 from tqdm import tqdm
 
@@ -168,10 +169,12 @@ def run(args, client):
             storage_options=args.output_storage_options,
         )
         step_progress.update(1)
+        partition_info = PartitionInfo.from_healpix(destination_pixel_map.keys())
+        partition_info_file = paths.get_partition_info_pointer(args.catalog_path)
+        partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options)
         if not args.debug_stats_only:
             write_parquet_metadata(args.catalog_path, storage_options=args.output_storage_options)
         else:
-            partition_info = PartitionInfo.from_healpix(destination_pixel_map.keys())
             partition_info.write_to_metadata_files(
                 args.catalog_path, storage_options=args.output_storage_options
             )
diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py
index 84705202..8688ed54 100644
--- a/src/hipscat_import/margin_cache/margin_cache.py
+++ b/src/hipscat_import/margin_cache/margin_cache.py
@@ -1,6 +1,7 @@
 import pandas as pd
 from dask.distributed import as_completed
 from hipscat import pixel_math
+from hipscat.catalog import PartitionInfo
 from hipscat.io import file_io, parquet_metadata, paths, write_metadata
 from tqdm import tqdm
 
@@ -130,10 +131,19 @@ def generate_margin_cache(args, client):
     with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress:
         parquet_metadata.write_parquet_metadata(args.catalog_path)
+        step_progress.update(1)
         total_rows = 0
         metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
-        for row_group in parquet_metadata.read_row_group_fragments(metadata_path):
+        for row_group in parquet_metadata.read_row_group_fragments(
+            metadata_path, storage_options=args.output_storage_options
+        ):
             total_rows += row_group.num_rows
+        partition_info = PartitionInfo.read_from_file(
+            metadata_path, storage_options=args.output_storage_options
+        )
+        partition_info_file = paths.get_partition_info_pointer(args.catalog_path)
+        partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options)
+        step_progress.update(1)
         margin_catalog_info = args.to_catalog_info(int(total_rows))
         write_metadata.write_provenance_info(
             catalog_base_dir=args.catalog_path,
             dataset_info=margin_catalog_info,
             tool_args=args.provenance_info(),
         )
-        step_progress.update(1)
         write_metadata.write_catalog_info(
             catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info
         )
diff --git a/src/hipscat_import/soap/run_soap.py b/src/hipscat_import/soap/run_soap.py
index 321eb011..67349c7c 100644
--- a/src/hipscat_import/soap/run_soap.py
+++ b/src/hipscat_import/soap/run_soap.py
@@ -3,6 +3,7 @@
 The actual logic of the map reduce is in the `map_reduce.py` file.
 """
 
+from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
 from hipscat.io import parquet_metadata, paths, write_metadata
 from tqdm import tqdm
 
@@ -57,6 +58,12 @@ def run(args, client):
             metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
             for row_group in parquet_metadata.read_row_group_fragments(metadata_path):
                 total_rows += row_group.num_rows
+            partition_join_info = PartitionJoinInfo.read_from_file(
+                metadata_path, storage_options=args.output_storage_options
+            )
+            partition_join_info.write_to_csv(
+                catalog_path=args.catalog_path, storage_options=args.output_storage_options
+            )
         else:
             total_rows = combine_partial_results(args.tmp_path, args.catalog_path)
         step_progress.update(1)
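
Usage note: catalogs and association catalogs imported before this change have a `_metadata` file but no partition info CSV. The same hipscat calls used in the patch can backfill the CSVs after the fact. A minimal sketch, assuming hipscat >= 0.2.4, a local filesystem (`storage_options` omitted), and hypothetical catalog paths:

    # Backfill the partition info CSV for an existing object catalog,
    # reading the pixel list back from the catalog's _metadata file.
    from hipscat.catalog import PartitionInfo
    from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
    from hipscat.io import paths

    catalog_path = "/data/my_catalog"  # hypothetical catalog location
    metadata_path = paths.get_parquet_metadata_pointer(catalog_path)
    partition_info = PartitionInfo.read_from_file(metadata_path)
    partition_info.write_to_file(paths.get_partition_info_pointer(catalog_path))

    # Backfill the partition join info CSV for an existing association catalog.
    association_path = "/data/my_association_catalog"  # hypothetical location
    join_metadata_path = paths.get_parquet_metadata_pointer(association_path)
    partition_join_info = PartitionJoinInfo.read_from_file(join_metadata_path)
    partition_join_info.write_to_csv(catalog_path=association_path)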