From f7c6408719fb9c57192afc906e9c760607071da6 Mon Sep 17 00:00:00 2001 From: "Derek T. Jones" Date: Fri, 27 Dec 2024 21:08:14 +0000 Subject: [PATCH 1/5] Avoid overwriting valid catalogs. Strong refusal without options, since it's easier and safer to move the catalog out of the way. --- src/hats_import/catalog/run_import.py | 9 ++++++++- src/hats_import/margin_cache/margin_cache.py | 7 +++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/hats_import/catalog/run_import.py b/src/hats_import/catalog/run_import.py index 745a367a..62beaabb 100644 --- a/src/hats_import/catalog/run_import.py +++ b/src/hats_import/catalog/run_import.py @@ -1,16 +1,18 @@ """Import a set of non-hats files using dask for parallelization -Methods in this file set up a dask pipeline using futures. +Methods in this file set up a dask pipeline using futures. The actual logic of the map reduce is in the `map_reduce.py` file. """ import os +from pathlib import Path import pickle import hats.io.file_io as io from hats.catalog import PartitionInfo from hats.io import paths from hats.io.parquet_metadata import write_parquet_metadata +from hats.io.validation import is_valid_catalog import hats_import.catalog.map_reduce as mr from hats_import.catalog.arguments import ImportArguments @@ -24,6 +26,11 @@ def run(args, client): if not isinstance(args, ImportArguments): raise ValueError("args must be type ImportArguments") + # Verify that the planned output path is not occupied by a valid catalog + potential_path = Path(args.output_path) / args.output_artifact_name + if is_valid_catalog(potential_path): + raise ValueError(f"Output path {potential_path} already contains a valid catalog") + resume_plan = ResumePlan(import_args=args) pickled_reader_file = os.path.join(resume_plan.tmp_path, "reader.pickle") diff --git a/src/hats_import/margin_cache/margin_cache.py b/src/hats_import/margin_cache/margin_cache.py index ace4af4f..1ca9f784 100644 --- a/src/hats_import/margin_cache/margin_cache.py +++ b/src/hats_import/margin_cache/margin_cache.py @@ -1,8 +1,10 @@ +from pathlib import Path from hats.catalog import PartitionInfo from hats.io import file_io, parquet_metadata, paths import hats_import.margin_cache.margin_cache_map_reduce as mcmr from hats_import.margin_cache.margin_cache_resume_plan import MarginCachePlan +from hats.io.validation import is_valid_catalog # pylint: disable=too-many-locals,too-many-arguments @@ -15,6 +17,11 @@ def generate_margin_cache(args, client): args (MarginCacheArguments): A valid `MarginCacheArguments` object. client (dask.distributed.Client): A dask distributed client object. """ + potential_path = Path(args.output_path) / args.output_artifact_name + # Verify that the planned output path is not occupied by a valid catalog + if is_valid_catalog(potential_path): + raise ValueError(f"Output path {potential_path} already contains a valid catalog") + resume_plan = MarginCachePlan(args) original_catalog_metadata = paths.get_common_metadata_pointer(args.input_catalog_path) From e79297069c718d4be85aa75b6da21de076393e23 Mon Sep 17 00:00:00 2001 From: Derek Jones Date: Tue, 31 Dec 2024 12:19:02 -0800 Subject: [PATCH 2/5] Add unit tests to capture catalog overwrite protection. --- tests/hats_import/catalog/test_run_import.py | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/hats_import/catalog/test_run_import.py b/tests/hats_import/catalog/test_run_import.py index 3070de72..d1057ab2 100644 --- a/tests/hats_import/catalog/test_run_import.py +++ b/tests/hats_import/catalog/test_run_import.py @@ -17,6 +17,9 @@ from hats_import.catalog.file_readers import CsvReader from hats_import.catalog.resume_plan import ResumePlan +import hats_import.margin_cache.margin_cache as margin_runner +from hats_import.margin_cache.margin_cache_arguments import MarginCacheArguments + def test_empty_args(): """Runner should fail with empty arguments""" @@ -31,6 +34,34 @@ def test_bad_args(): runner.run(args, None) +def test_no_import_overwrite(small_sky_object_catalog, parquet_shards_dir): + """Runner should refuse to overwrite a valid catalog""" + catalog_dir = small_sky_object_catalog.parent + catalog_name = small_sky_object_catalog.name + args = ImportArguments( + input_path=parquet_shards_dir, + output_path=catalog_dir, + output_artifact_name=catalog_name, + file_reader="parquet", + ) + with pytest.raises(ValueError, match="already contains a valid catalog"): + runner.run(args, None) + + +def test_no_margin_cache_overwrite(small_sky_object_catalog): + """Runner should refuse to generate margin cache which overwrites valid catalog""" + catalog_dir = small_sky_object_catalog.parent + catalog_name = small_sky_object_catalog.name + args = MarginCacheArguments( + input_catalog_path=small_sky_object_catalog, + output_path=catalog_dir, + margin_threshold=10.0, + output_artifact_name=catalog_name, + ) + with pytest.raises(ValueError, match="already contains a valid catalog"): + margin_runner.generate_margin_cache(args, None) + + @pytest.mark.dask def test_resume_dask_runner( dask_client, From 422662c579084a18617ac70845eeaf6487164fc9 Mon Sep 17 00:00:00 2001 From: Derek Jones Date: Tue, 31 Dec 2024 14:16:07 -0800 Subject: [PATCH 3/5] Fix formatting --- src/hats_import/catalog/run_import.py | 2 +- src/hats_import/margin_cache/margin_cache.py | 3 ++- tests/hats_import/catalog/test_run_import.py | 3 +-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/hats_import/catalog/run_import.py b/src/hats_import/catalog/run_import.py index 62beaabb..e8ce5b4c 100644 --- a/src/hats_import/catalog/run_import.py +++ b/src/hats_import/catalog/run_import.py @@ -5,8 +5,8 @@ """ import os -from pathlib import Path import pickle +from pathlib import Path import hats.io.file_io as io from hats.catalog import PartitionInfo diff --git a/src/hats_import/margin_cache/margin_cache.py b/src/hats_import/margin_cache/margin_cache.py index 1ca9f784..4beddfbc 100644 --- a/src/hats_import/margin_cache/margin_cache.py +++ b/src/hats_import/margin_cache/margin_cache.py @@ -1,10 +1,11 @@ from pathlib import Path + from hats.catalog import PartitionInfo from hats.io import file_io, parquet_metadata, paths +from hats.io.validation import is_valid_catalog import hats_import.margin_cache.margin_cache_map_reduce as mcmr from hats_import.margin_cache.margin_cache_resume_plan import MarginCachePlan -from hats.io.validation import is_valid_catalog # pylint: disable=too-many-locals,too-many-arguments diff --git a/tests/hats_import/catalog/test_run_import.py b/tests/hats_import/catalog/test_run_import.py index d1057ab2..1e895f6d 100644 --- a/tests/hats_import/catalog/test_run_import.py +++ b/tests/hats_import/catalog/test_run_import.py @@ -13,11 +13,10 @@ from hats.pixel_math.sparse_histogram import SparseHistogram import hats_import.catalog.run_import as runner +import hats_import.margin_cache.margin_cache as margin_runner from hats_import.catalog.arguments import ImportArguments from hats_import.catalog.file_readers import CsvReader from hats_import.catalog.resume_plan import ResumePlan - -import hats_import.margin_cache.margin_cache as margin_runner from hats_import.margin_cache.margin_cache_arguments import MarginCacheArguments From c3fb952e961fafae298bc6fa491f9ced402b48d4 Mon Sep 17 00:00:00 2001 From: Derek Jones Date: Tue, 31 Dec 2024 14:23:57 -0800 Subject: [PATCH 4/5] Reorganize to limit lines per function, per pylint. --- src/hats_import/catalog/run_import.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/hats_import/catalog/run_import.py b/src/hats_import/catalog/run_import.py index e8ce5b4c..6db80cd5 100644 --- a/src/hats_import/catalog/run_import.py +++ b/src/hats_import/catalog/run_import.py @@ -19,18 +19,27 @@ from hats_import.catalog.resume_plan import ResumePlan -def run(args, client): - """Run catalog creation pipeline.""" +def _validate_arguments(args): + """ + Verify that the args for run are valid: they exist, are of the appropriate type, + and do not specify an output which is a valid catalog. + + Raises ValueError if they are invalid. + """ if not args: raise ValueError("args is required and should be type ImportArguments") if not isinstance(args, ImportArguments): raise ValueError("args must be type ImportArguments") - # Verify that the planned output path is not occupied by a valid catalog potential_path = Path(args.output_path) / args.output_artifact_name if is_valid_catalog(potential_path): raise ValueError(f"Output path {potential_path} already contains a valid catalog") + +def run(args, client): + """Run catalog creation pipeline.""" + _validate_arguments(args) + resume_plan = ResumePlan(import_args=args) pickled_reader_file = os.path.join(resume_plan.tmp_path, "reader.pickle") From 1aa15a814d8665987784896090a863aba4754501 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Mon, 6 Jan 2025 10:18:12 -0500 Subject: [PATCH 5/5] Add description for pypi (#467) --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 660d0fed..4c241b1f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,7 @@ [project] name = "hats-import" license = {file = "LICENSE"} +description = "Utility for ingesting large survey data into HATS structure" readme = "README.md" authors = [ { name = "LINCC Frameworks", email = "lincc-frameworks-team@lists.lsst.org" }