Skip to content

Commit

Permalink
Adds fine filtering on read_hipscat (#350)
Browse files Browse the repository at this point in the history
* Add fine filtering to read_hipscat

* Update quickstart notebook

* Move catalog types to separate file
  • Loading branch information
camposandro authored Jun 21, 2024
1 parent 029c5de commit 79d4cb3
Show file tree
Hide file tree
Showing 18 changed files with 61 additions and 38 deletions.
9 changes: 7 additions & 2 deletions docs/tutorials/quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"\n",
"gaia_path = \"https://epyc.astro.washington.edu/~lincc-frameworks/hipscat_surveys/gaia_dr3/gaia\"\n",
"\n",
"gaia = lsdb.read_hipscat(gaia_path, search_filter=ConeSearch(ra=180, dec=10, radius_arcsec=0.5 * 3600))\n",
"gaia = lsdb.read_hipscat(gaia_path, search_filter=ConeSearch(ra=180, dec=10, radius_arcsec=0.6 * 3600))\n",
"gaia"
]
},
Expand Down Expand Up @@ -181,11 +181,16 @@
"metadata": {},
"outputs": [],
"source": [
"result.to_hipscat(\"lsdb_catalogs\", catalog_name=\"my_object_catalog_x_gaia\")"
"result.to_hipscat(\"./my_object_catalog_x_gaia\", catalog_name=\"my_object_catalog_x_gaia\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "lsdb-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
Expand Down
6 changes: 2 additions & 4 deletions src/lsdb/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,8 @@ def search(self, search: AbstractSearch):
A new Catalog containing the points filtered to those matching the search parameters.
"""
filtered_hc_structure = search.filter_hc_catalog(self.hc_structure)
ddf_partition_map, search_ddf = self._perform_search(
filtered_hc_structure, filtered_hc_structure.get_healpix_pixels(), search
)
margin = self.margin.search(filtered_hc_structure, search) if self.margin is not None else None
ddf_partition_map, search_ddf = self._perform_search(filtered_hc_structure, search)
margin = self.margin.search(search) if self.margin is not None else None
return Catalog(search_ddf, ddf_partition_map, filtered_hc_structure, margin=margin)

def merge(
Expand Down
11 changes: 6 additions & 5 deletions src/lsdb/catalog/dataset/healpix_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,23 @@ def query(self, expr: str) -> Self:

def _perform_search(
self,
metadata: hc.catalog.Catalog,
filtered_pixels: List[HealpixPixel],
metadata: hc.catalog.Catalog | hc.catalog.MarginCatalog,
search: AbstractSearch,
):
) -> Tuple[dict, dd.core.DataFrame]:
"""Performs a search on the catalog from a list of pixels to search in
Args:
metadata (hc.catalog.Catalog): The metadata of the hipscat catalog.
filtered_pixels (List[HealpixPixel]): List of pixels in the catalog to be searched.
metadata (hc.catalog.Catalog | hc.catalog.MarginCatalog): The metadata of
the hipscat catalog after the coarse filtering is applied. The partitions
it contains are only those that overlap with the spatial region.
search (AbstractSearch): Instance of AbstractSearch.
Returns:
A tuple containing a dictionary mapping pixel to partition index and a dask dataframe
containing the search results
"""
partitions = self._ddf.to_delayed()
filtered_pixels = metadata.get_healpix_pixels()
targeted_partitions = [partitions[self._ddf_pixel_map[pixel]] for pixel in filtered_pixels]
filtered_partitions = (
[search.search_points(partition, metadata.catalog_info) for partition in targeted_partitions]
Expand Down
11 changes: 3 additions & 8 deletions src/lsdb/catalog/margin_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,18 @@ def __init__(
):
super().__init__(ddf, ddf_pixel_map, hc_structure)

def search(self, metadata: hc.catalog.Catalog, search: AbstractSearch):
def search(self, search: AbstractSearch):
"""Find rows by reusable search algorithm.
Filters partitions in the catalog to those that match some rough criteria and their neighbors.
Filters to points that match some finer criteria.
Args:
metadata (hc.catalog.Catalog): The metadata of the hipscat catalog corresponding to the margin.
search (AbstractSearch): Instance of AbstractSearch.
Returns:
A new Catalog containing the points filtered to those matching the search parameters.
"""
# if the margin size is greater than the size of a pixel, this is an invalid search
margin_search_moc = metadata.pixel_tree.to_moc()
filtered_hc_structure = self.hc_structure.filter_by_moc(margin_search_moc)
ddf_partition_map, search_ddf = self._perform_search(
metadata, filtered_hc_structure.get_healpix_pixels(), search
)
filtered_hc_structure = search.filter_hc_catalog(self.hc_structure)
ddf_partition_map, search_ddf = self._perform_search(filtered_hc_structure, search)
return self.__class__(search_ddf, ddf_partition_map, filtered_hc_structure)
4 changes: 3 additions & 1 deletion src/lsdb/core/search/abstract_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from mocpy import MOC

if TYPE_CHECKING:
from lsdb.loaders.hipscat.abstract_catalog_loader import HCCatalogTypeVar
from lsdb.types import HCCatalogTypeVar


# pylint: disable=too-many-instance-attributes, too-many-arguments
Expand All @@ -27,6 +27,8 @@ def __init__(self, fine: bool = True):

def filter_hc_catalog(self, hc_structure: HCCatalogTypeVar) -> HCCatalogTypeVar:
"""Filters the hispcat catalog object to the partitions included in the search"""
if len(hc_structure.get_healpix_pixels()) == 0:
return hc_structure
max_order = hc_structure.get_max_coverage_order()
search_moc = self.generate_search_moc(max_order)
return hc_structure.filter_by_moc(search_moc)
Expand Down
2 changes: 1 addition & 1 deletion src/lsdb/core/search/index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from lsdb.core.search.abstract_search import AbstractSearch

if TYPE_CHECKING:
from lsdb.loaders.hipscat.abstract_catalog_loader import HCCatalogTypeVar
from lsdb.types import HCCatalogTypeVar


class IndexSearch(AbstractSearch):
Expand Down
2 changes: 1 addition & 1 deletion src/lsdb/core/search/order_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from lsdb.core.search.abstract_search import AbstractSearch

if TYPE_CHECKING:
from lsdb.loaders.hipscat.abstract_catalog_loader import HCCatalogTypeVar
from lsdb.types import HCCatalogTypeVar


class OrderSearch(AbstractSearch):
Expand Down
5 changes: 2 additions & 3 deletions src/lsdb/core/search/pixel_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
from typing import TYPE_CHECKING, List, Tuple

import pandas as pd
from hipscat.catalog.catalog_info import CatalogInfo
from hipscat.pixel_math import HealpixPixel

from lsdb.core.search.abstract_search import AbstractSearch

if TYPE_CHECKING:
from lsdb.loaders.hipscat.abstract_catalog_loader import HCCatalogTypeVar
from lsdb.types import HCCatalogTypeVar


class PixelSearch(AbstractSearch):
Expand All @@ -26,5 +25,5 @@ def __init__(self, pixels: List[Tuple[int, int]]):
def filter_hc_catalog(self, hc_structure: HCCatalogTypeVar) -> HCCatalogTypeVar:
return hc_structure.filter_from_pixel_list(self.pixels)

def search_points(self, frame: pd.DataFrame, metadata: CatalogInfo) -> pd.DataFrame:
def search_points(self, frame: pd.DataFrame, _) -> pd.DataFrame:
return frame
7 changes: 2 additions & 5 deletions src/lsdb/loaders/hipscat/abstract_catalog_loader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Generic, List, Tuple, Type, TypeVar
from typing import Generic, List, Tuple, Type

import dask.dataframe as dd
import hipscat as hc
Expand All @@ -13,12 +13,9 @@
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort

from lsdb.catalog.catalog import DaskDFPixelMap
from lsdb.catalog.dataset.dataset import Dataset
from lsdb.dask.divisions import get_pixels_divisions
from lsdb.loaders.hipscat.hipscat_loading_config import HipscatLoadingConfig

CatalogTypeVar = TypeVar("CatalogTypeVar", bound=Dataset)
HCCatalogTypeVar = TypeVar("HCCatalogTypeVar", bound=HCHealpixDataset)
from lsdb.types import CatalogTypeVar, HCCatalogTypeVar


class AbstractCatalogLoader(Generic[CatalogTypeVar]):
Expand Down
12 changes: 7 additions & 5 deletions src/lsdb/loaders/hipscat/hipscat_catalog_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ def load_catalog(self) -> Catalog:
hc_catalog = self._load_hipscat_catalog(hc.catalog.Catalog)
filtered_hc_catalog = self._filter_hipscat_catalog(hc_catalog)
dask_df, dask_df_pixel_map = self._load_dask_df_and_map(filtered_hc_catalog)
return Catalog(
dask_df, dask_df_pixel_map, filtered_hc_catalog, self._load_margin_catalog(filtered_hc_catalog)
)
catalog = Catalog(dask_df, dask_df_pixel_map, filtered_hc_catalog)
if self.config.search_filter is not None:
catalog = catalog.search(self.config.search_filter)
catalog.margin = self._load_margin_catalog()
return catalog

def _filter_hipscat_catalog(self, hc_catalog: hc.catalog.Catalog) -> hc.catalog.Catalog:
"""Filter the catalog pixels according to the spatial filter provided at loading time.
Expand All @@ -41,15 +43,15 @@ def _filter_hipscat_catalog(self, hc_catalog: hc.catalog.Catalog) -> hc.catalog.
storage_options=hc_catalog.storage_options,
)

def _load_margin_catalog(self, metadata: hc.catalog.Catalog) -> MarginCatalog | None:
def _load_margin_catalog(self) -> MarginCatalog | None:
"""Load the margin catalog. It can be provided using a margin catalog
instance or a path to the catalog on disk."""
margin_catalog = None
if isinstance(self.config.margin_cache, MarginCatalog):
margin_catalog = self.config.margin_cache
if self.config.search_filter is not None:
# pylint: disable=protected-access
margin_catalog = margin_catalog.search(metadata, self.config.search_filter)
margin_catalog = margin_catalog.search(self.config.search_filter)
elif isinstance(self.config.margin_cache, str):
margin_catalog = lsdb.read_hipscat(
path=self.config.margin_cache,
Expand Down
5 changes: 4 additions & 1 deletion src/lsdb/loaders/hipscat/margin_catalog_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ def load_catalog(self) -> MarginCatalog | None:
hc_catalog = self._load_hipscat_catalog(hc.catalog.MarginCatalog)
filtered_hc_catalog = self._filter_hipscat_catalog(hc_catalog)
dask_df, dask_df_pixel_map = self._load_dask_df_and_map(filtered_hc_catalog)
return MarginCatalog(dask_df, dask_df_pixel_map, filtered_hc_catalog)
margin = MarginCatalog(dask_df, dask_df_pixel_map, filtered_hc_catalog)
if self.config.search_filter is not None:
margin = margin.search(self.config.search_filter)
return margin

def _filter_hipscat_catalog(self, hc_catalog: hc.catalog.MarginCatalog) -> hc.catalog.MarginCatalog:
"""Filter the catalog pixels according to the spatial filter provided at loading time.
Expand Down
8 changes: 7 additions & 1 deletion src/lsdb/types.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, TypeVar

from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset
from hipscat.pixel_math import HealpixPixel
from typing_extensions import TypeAlias

from lsdb.catalog.dataset.dataset import Dataset

# Compute pixel map returns a tuple. The first element is
# the number of data points within the HEALPix pixel, the
# second element is the list of pixels it contains.
HealpixInfo: TypeAlias = Tuple[int, List[int]]

DaskDFPixelMap = Dict[HealpixPixel, int]

CatalogTypeVar = TypeVar("CatalogTypeVar", bound=Dataset)
HCCatalogTypeVar = TypeVar("HCCatalogTypeVar", bound=HCHealpixDataset)
3 changes: 3 additions & 0 deletions tests/data/small_sky_order1_source_margin/catalog_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"catalog_name": "small_sky_order1_source_margin",
"catalog_type": "margin",
"total_rows": 3948,
"epoch": "J2000",
"ra_column": "source_ra",
"dec_column": "source_dec",
"primary_catalog": "small_sky_order1_source",
"margin_threshold": 7200
}
3 changes: 3 additions & 0 deletions tests/data/small_sky_order3_source_margin/catalog_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"catalog_name": "small_sky_order3_source_margin",
"catalog_type": "margin",
"total_rows": 1025,
"epoch": "J2000",
"ra_column": "source_ra",
"dec_column": "source_dec",
"primary_catalog": "small_sky_order3_source",
"margin_threshold": 300
}
3 changes: 3 additions & 0 deletions tests/data/small_sky_source_margin/catalog_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"catalog_name": "small_sky_source_margin",
"catalog_type": "margin",
"total_rows": 395,
"epoch": "J2000",
"ra_column": "source_ra",
"dec_column": "source_dec",
"primary_catalog": "../../../hipscat-import/tests/hipscat_import/data/small_sky_source_catalog",
"margin_threshold": 180
}
3 changes: 3 additions & 0 deletions tests/data/small_sky_xmatch_margin/catalog_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"catalog_name": "small_sky_xmatch_margin",
"catalog_type": "margin",
"total_rows": 26,
"epoch": "J2000",
"ra_column": "ra",
"dec_column": "dec",
"primary_catalog": "small_sky_xmatch",
"margin_threshold": 7200
}
2 changes: 1 addition & 1 deletion tests/lsdb/catalog/test_cone_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_invalid_dec_and_negative_radius(small_sky_order1_catalog):

def test_empty_cone_search_with_margin(small_sky_order1_source_with_margin):
ra = 100
dec = -80
dec = 80
radius = 60
cone = small_sky_order1_source_with_margin.cone_search(ra, dec, radius, fine=False)
assert len(cone._ddf_pixel_map) == 0
Expand Down
3 changes: 3 additions & 0 deletions tests/lsdb/loaders/hipscat/test_read_hipscat.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def test_read_hipscat_subset_with_cone_search(small_sky_order1_dir, small_sky_or
assert isinstance(cone_search_catalog_2, lsdb.Catalog)
# The partitions of the catalogs are equivalent
assert cone_search_catalog.get_healpix_pixels() == cone_search_catalog_2.get_healpix_pixels()
pd.testing.assert_frame_equal(cone_search_catalog.compute(), cone_search_catalog_2.compute())


def test_read_hipscat_subset_with_box_search(small_sky_order1_dir, small_sky_order1_catalog):
Expand All @@ -117,6 +118,7 @@ def test_read_hipscat_subset_with_box_search(small_sky_order1_dir, small_sky_ord
assert isinstance(box_search_catalog_2, lsdb.Catalog)
# The partitions of the catalogs are equivalent
assert box_search_catalog.get_healpix_pixels() == box_search_catalog_2.get_healpix_pixels()
pd.testing.assert_frame_equal(box_search_catalog.compute(), box_search_catalog_2.compute())


def test_read_hipscat_subset_with_polygon_search(small_sky_order1_dir, small_sky_order1_catalog):
Expand All @@ -129,6 +131,7 @@ def test_read_hipscat_subset_with_polygon_search(small_sky_order1_dir, small_sky
assert isinstance(polygon_search_catalog_2, lsdb.Catalog)
# The partitions of the catalogs are equivalent
assert polygon_search_catalog.get_healpix_pixels() == polygon_search_catalog_2.get_healpix_pixels()
pd.testing.assert_frame_equal(polygon_search_catalog.compute(), polygon_search_catalog_2.compute())


def test_read_hipscat_subset_with_index_search(
Expand Down

0 comments on commit 79d4cb3

Please sign in to comment.