Remove margin fine filtering, and healpy dependency. #442

Merged 6 commits on Nov 26, 2024
3 changes: 2 additions & 1 deletion docs/index.rst
@@ -28,7 +28,8 @@ need to install or upgrade versions of dependencies to work with hats-import.
.. tip::
Installing on Mac

``healpy`` is a very necessary dependency for hats libraries at this time, but
``healpy`` is an optional dependency for hats-import (included in the ``full`` extra)
to support converting from older HiPSCat catalogs, but
native prebuilt binaries for healpy on Apple Silicon Macs
`do not yet exist <https://healpy.readthedocs.io/en/latest/install.html#binary-installation-with-pip-recommended-for-most-other-python-users>`_,
so it's recommended to install via conda before proceeding to hats-import.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -42,6 +42,7 @@ full = [
"fsspec[full]", # complete file system specs.
"ipykernel", # Support for Jupyter notebooks
"ipywidgets", # useful for tqdm in notebooks.
"healpy", # used only in hipscat conversion
]

[build-system]
16 changes: 6 additions & 10 deletions src/hats_import/catalog/map_reduce.py
@@ -60,20 +60,16 @@ def _iterate_input_file(
else:
# Set up the pixel data
if isinstance(data, pd.DataFrame):
mapped_pixels = hp.ang2pix(
2**highest_order,
mapped_pixels = hp.radec2pix(
highest_order,
data[ra_column].to_numpy(copy=False, dtype=float),
data[dec_column].to_numpy(copy=False, dtype=float),
lonlat=True,
nest=True,
)
else:
mapped_pixels = hp.ang2pix(
2**highest_order,
mapped_pixels = hp.radec2pix(
highest_order,
data[ra_column].to_numpy(),
data[dec_column].to_numpy(),
lonlat=True,
nest=True,
)
yield chunk_number, data, mapped_pixels

@@ -299,8 +295,8 @@ def reduce_pixel_shards(
SPATIAL_INDEX_COLUMN,
[
pixel_math.compute_spatial_index(
merged_table[ra_column].to_numpy(),
merged_table[dec_column].to_numpy(),
merged_table[ra_column].to_numpy().astype(np.float64),
merged_table[dec_column].to_numpy().astype(np.float64),
)
],
).sort_by(SPATIAL_INDEX_COLUMN)
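Reviewer note: for anyone tracing the healpix shim change above, a minimal sketch of the assumed equivalence between the new `radec2pix` call and the removed `ang2pix` call follows. The helper name `radec2pix_sketch` is hypothetical; only the healpy call it wraps appears in the removed lines.

import healpy
import numpy as np

def radec2pix_sketch(order: int, ra: np.ndarray, dec: np.ndarray) -> np.ndarray:
    """Map RA/Dec in degrees to nested HEALPix pixels at `order` (assumed radec2pix behavior)."""
    return healpy.ang2pix(2**order, ra, dec, nest=True, lonlat=True)

ra = np.array([45.0, 280.5])
dec = np.array([-30.0, 12.25])
pixels = radec2pix_sketch(19, ra, dec)  # same pattern as the removed ang2pix(2**19, ..., lonlat=True, nest=True)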
21 changes: 9 additions & 12 deletions src/hats_import/hipscat_conversion/run_conversion.py
@@ -4,7 +4,7 @@
import tempfile
from typing import no_type_check

import hats.pixel_math.healpix_shim as hp
import hats.pixel_math.healpix_shim as healpix
import numpy as np
import pyarrow.parquet as pq
from dask.distributed import as_completed, get_worker
@@ -128,12 +128,10 @@ def _convert_partition_file(pixel, args, schema, ra_column, dec_column):
0,
"_healpix_29",
[
hp.ang2pix(
2**29,
healpix.radec2pix(
29,
table[ra_column].to_numpy(),
table[dec_column].to_numpy(),
nest=True,
lonlat=True,
)
],
)
@@ -155,7 +153,11 @@ def _convert_partition_file(pixel, args, schema, ra_column, dec_column):
raise exception


# pylint: disable=import-outside-toplevel
def _write_nested_fits_map(input_dir, output_dir):
# Healpy is an optional dependency, used only for reads of legacy fits files.
import healpy as hp

input_file = input_dir / "point_map.fits"
if not input_file.exists():
return
@@ -169,11 +171,6 @@ def _write_nested_fits_map(input_dir, output_dir):
map_fits_image = hp.read_map(_tmp_file.name)
else:
map_fits_image = map_fits_image[0]
map_fits_image = map_fits_image.astype(np.int32)

output_file = output_dir / "point_map.fits"
with tempfile.NamedTemporaryFile() as _tmp_file:
with output_file.open("wb") as _map_file:
hp.write_map(
_tmp_file.name, map_fits_image, overwrite=True, dtype=np.int32, nest=True, coord="CEL"
)
_map_file.write(_tmp_file.read())
file_io.write_fits_image(map_fits_image, output_dir / "point_map.fits")
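
Note for pipeline users: healpy is now imported lazily, only when a legacy point_map.fits needs to be read during hipscat conversion. A rough pre-flight check a caller could add, assuming only that healpy may be absent (the install hints mirror the pyproject and docs changes above):

try:
    import healpy  # noqa: F401  -- only needed to read legacy point_map.fits files
except ImportError as err:
    raise ImportError(
        "healpy is required for hipscat conversion; install it via "
        "`pip install hats-import[full]` or `conda install healpy`"
    ) from err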
9 changes: 5 additions & 4 deletions src/hats_import/margin_cache/margin_cache_arguments.py
@@ -28,7 +28,7 @@
order of healpix partitioning in the source catalog. if `margin_order` is left
default or set to -1, then the `margin_order` will be set dynamically to the
highest partition order plus 1."""
fine_filtering: bool = True
fine_filtering: bool = False
"""should we perform the precise boundary checking? if false, some results may be
greater than `margin_threshold` away from the border (but within `margin_order`)."""

@@ -54,6 +54,9 @@
if len(self.catalog.get_healpix_pixels()) == 0:
raise ValueError("debug_filter_pixel_list has created empty catalog")

if self.fine_filtering:
raise NotImplementedError("Fine filtering temporarily removed.")

Codecov / codecov/patch warning on margin_cache_arguments.py line 58: added line not covered by tests.

highest_order = int(self.catalog.partition_info.get_highest_order())

if self.margin_order < 0:
@@ -64,9 +67,7 @@
"margin_order must be of a higher order than the highest order catalog partition pixel."
)

margin_pixel_nside = hp.order2nside(self.margin_order)
margin_pixel_avgsize = hp.nside2resol(margin_pixel_nside, arcmin=True)
margin_pixel_mindist = hp.avgsize2mindist(margin_pixel_avgsize)
margin_pixel_mindist = hp.order2mindist(self.margin_order)
if margin_pixel_mindist * 60.0 < self.margin_threshold:
raise ValueError("margin pixels must be larger than margin_threshold")

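For reviewers tracking the shim consolidation: the single `order2mindist` call is assumed to collapse the three removed calls into one. A sketch of the legacy computation, using only calls that appear in the removed lines (whether those helpers remain exported by the shim after this change is an assumption):

import hats.pixel_math.healpix_shim as hp

def order2mindist_legacy(margin_order: int) -> float:
    """Assumed equivalent of hp.order2mindist(margin_order); the result is what
    gets compared against margin_threshold after the * 60.0 conversion above."""
    margin_pixel_nside = hp.order2nside(margin_order)
    margin_pixel_avgsize = hp.nside2resol(margin_pixel_nside, arcmin=True)
    return hp.avgsize2mindist(margin_pixel_avgsize)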
26 changes: 8 additions & 18 deletions src/hats_import/margin_cache/margin_cache_map_reduce.py
@@ -3,15 +3,14 @@
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
from hats import pixel_math
from hats.io import file_io, paths
from hats.pixel_math.healpix_pixel import HealpixPixel

from hats_import.margin_cache.margin_cache_resume_plan import MarginCachePlan
from hats_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure


# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments, unused-argument
def map_pixel_shards(
partition_file,
mapping_key,
@@ -26,6 +25,9 @@ ):
):
"""Creates margin cache shards from a source partition file."""
try:
if fine_filtering:
raise NotImplementedError("Fine filtering temporarily removed.")

schema = file_io.read_parquet_metadata(original_catalog_metadata).schema.to_arrow_schema()
data = file_io.read_parquet_file_to_pandas(partition_file, schema=schema)
source_pixel = HealpixPixel(data["Norder"].iloc[0], data["Npix"].iloc[0])
Expand All @@ -41,12 +43,10 @@ def map_pixel_shards(
f"margin_pixel >= {margin_pixel_range_start} and margin_pixel < {margin_pixel_range_end}"
)

margin_pixel_list = hp.ang2pix(
2**margin_order,
margin_pixel_list = hp.radec2pix(
margin_order,
data[ra_column].values,
data[dec_column].values,
lonlat=True,
nest=True,
)
margin_pixel_filter = pd.DataFrame(
{"margin_pixel": margin_pixel_list, "filter_value": np.arange(0, len(margin_pixel_list))}
@@ -78,6 +78,7 @@ def map_pixel_shards(
raise exception


# pylint: disable=too-many-arguments, unused-argument
def _to_pixel_shard(
filtered_data,
pixel,
Expand All @@ -89,18 +90,7 @@ def _to_pixel_shard(
fine_filtering,
):
"""Do boundary checking for the cached partition and then output remaining data."""
if fine_filtering:
margin_check = pixel_math.check_margin_bounds(
filtered_data[ra_column].values,
filtered_data[dec_column].values,
pixel.order,
pixel.pixel,
margin_threshold,
)

margin_data = filtered_data.iloc[margin_check]
else:
margin_data = filtered_data
margin_data = filtered_data

num_rows = len(margin_data)
if num_rows:
5 changes: 2 additions & 3 deletions src/hats_import/soap/resume_plan.py
@@ -7,7 +7,7 @@

import hats.pixel_math.healpix_shim as hp
import numpy as np
from hats import read_hats
from hats import pixel_math, read_hats
from hats.catalog import Catalog
from hats.io import file_io
from hats.pixel_math.healpix_pixel import HealpixPixel
@@ -193,8 +193,7 @@ def source_to_object_map(object_catalog, source_catalog):

for source, objects in source_to_object.items():
# get all neighboring pixels
nside = hp.order2nside(source.order)
neighbors = hp.get_all_neighbours(nside, source.pixel, nest=True)
neighbors = pixel_math.get_margin(source.order, source.pixel, 0)

## get rid of -1s and normalize to max order
explosion_factor = 4 ** (max_order - source.order)
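On the neighbor lookup swap in `source_to_object_map`: `pixel_math.get_margin(source.order, source.pixel, 0)` is assumed to return the same ring of neighboring pixels the removed healpy call produced, minus the -1 placeholders that the following lines strip out. A sketch of the legacy lookup, using only the removed call:

import healpy

def neighbors_legacy(order: int, pixel: int):
    """Nested-scheme neighbors of `pixel` at `order`; -1 marks missing neighbors."""
    nside = 2**order  # hp.order2nside(order) in the removed line
    return healpy.get_all_neighbours(nside, pixel, nest=True)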
6 changes: 2 additions & 4 deletions tests/hats_import/catalog/test_map_reduce.py
@@ -436,12 +436,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
file2_data.to_parquet(shard_dir / "file_2_shard_1.parquet")

combined_data = pd.concat([file1_data, file2_data])
combined_data["norder19_healpix"] = hp.ang2pix(
2**19,
combined_data["norder19_healpix"] = hp.radec2pix(
19,
combined_data["ra"].values,
combined_data["dec"].values,
lonlat=True,
nest=True,
)
## Use this to prune generated columns like Norder, Npix, and _healpix_29
comparison_columns = ["source_id", "object_id", "time", "ra", "dec"]
11 changes: 11 additions & 0 deletions tests/hats_import/hipscat_conversion/test_run_conversion.py
@@ -24,6 +24,16 @@ def test_bad_args():
runner.run(args, None)


# pylint: disable=unused-import
try:
import healpy as hp

HAVE_HEALPY = True
except ImportError:
HAVE_HEALPY = False


@pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
@pytest.mark.dask
def test_run_conversion_object(
test_data_dir,
@@ -88,6 +98,7 @@ def test_run_conversion_object(
assert data.index.name is None


@pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
@pytest.mark.dask
def test_run_conversion_source(
test_data_dir,
2 changes: 1 addition & 1 deletion tests/hats_import/margin_cache/test_margin_cache.py
@@ -35,7 +35,7 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):

data = pd.read_parquet(test_file)

assert len(data) == 13
assert len(data) == 88

assert all(data[paths.PARTITION_ORDER] == norder)
assert all(data[paths.PARTITION_PIXEL] == npix)
@@ -53,7 +53,7 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):

assert os.path.exists(path)

validate_result_dataframe(path, 2)
validate_result_dataframe(path, 360)


@pytest.mark.timeout(5)
@@ -79,7 +79,7 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
def test_map_pixel_shards_error(tmp_path, capsys):
"""Test error behavior on reduce stage. e.g. by not creating the original
catalog parquet files."""
with pytest.raises(FileNotFoundError):
with pytest.raises(NotImplementedError):
margin_cache_map_reduce.map_pixel_shards(
paths.pixel_catalog_file(tmp_path, HealpixPixel(1, 0)),
mapping_key="1_21",
Expand All @@ -94,9 +94,10 @@ def test_map_pixel_shards_error(tmp_path, capsys):
)

captured = capsys.readouterr()
assert "Parquet file does not exist" in captured.out
assert "Fine filtering temporarily removed" in captured.out


@pytest.mark.skip()
@pytest.mark.timeout(30)
def test_map_pixel_shards_fine(tmp_path, test_data_dir, small_sky_source_catalog):
"""Test basic mapping behavior, with fine filtering enabled."""
@@ -210,7 +211,7 @@ def test_reduce_margin_shards(tmp_path):
hats_indexes = pixel_math.compute_spatial_index(ras, dec)
margin_order = np.full(360, 0)
margin_dir = np.full(360, 0)
margin_pixels = hp.ang2pix(2**3, ras, dec, lonlat=True, nest=True)
margin_pixels = hp.radec2pix(3, ras, dec)

test_df = pd.DataFrame(
data=zip(hats_indexes, ras, dec, norder, ndir, npix, margin_order, margin_dir, margin_pixels),
4 changes: 2 additions & 2 deletions tests/hats_import/margin_cache/test_margin_round_trip.py
@@ -72,7 +72,7 @@ def test_margin_import_gaia_minimum(

data = pd.read_parquet(test_file)

assert len(data) == 1
assert len(data) == 4


@pytest.mark.dask(timeout=180)
Expand Down Expand Up @@ -117,7 +117,7 @@ def test_margin_import_mixed_schema_csv(
catalog = read_hats(args.catalog_path)
assert catalog.on_disk
assert catalog.catalog_path == args.catalog_path
assert len(catalog.get_healpix_pixels()) == 5
assert len(catalog.get_healpix_pixels()) == 19

norder = 2
npix = 187