diff --git a/pyproject.toml b/pyproject.toml index 116b688d..504bc2ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,11 +20,13 @@ dependencies = [ # dask distributed eases the creation of parallel dask clients. # dask diagnostics is required to spin up the dashboard for profiling. "dask[complete]", + "deprecated", "hipscat>=0.3.8", + "lsst-sphgeom", # To handle spherical sky polygons + "nested-dask", + "nested-pandas", "pyarrow", - "deprecated", "scipy", # kdtree - "lsst-sphgeom", # To handle spherical sky polygons ] [project.urls] diff --git a/src/lsdb/catalog/association_catalog.py b/src/lsdb/catalog/association_catalog.py index ffbab67b..50d0d1fb 100644 --- a/src/lsdb/catalog/association_catalog.py +++ b/src/lsdb/catalog/association_catalog.py @@ -1,5 +1,5 @@ -import dask.dataframe as dd import hipscat as hc +import nested_dask as nd from lsdb.catalog.dataset.healpix_dataset import HealpixDataset from lsdb.types import DaskDFPixelMap @@ -18,7 +18,7 @@ class AssociationCatalog(HealpixDataset): def __init__( self, - ddf: dd.DataFrame, + ddf: nd.NestedFrame, ddf_pixel_map: DaskDFPixelMap, hc_structure: hc.catalog.AssociationCatalog, ): diff --git a/src/lsdb/catalog/catalog.py b/src/lsdb/catalog/catalog.py index 48b10786..5c00e6bd 100644 --- a/src/lsdb/catalog/catalog.py +++ b/src/lsdb/catalog/catalog.py @@ -3,8 +3,9 @@ import dataclasses from typing import List, Tuple, Type -import dask.dataframe as dd import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import pandas as pd from hipscat.catalog.index.index_catalog import IndexCatalog as HCIndexCatalog from hipscat.pixel_math.polygon_filter import SphericalCoordinates @@ -18,7 +19,11 @@ from lsdb.core.search.abstract_search import AbstractSearch from lsdb.core.search.pixel_search import PixelSearch from lsdb.dask.crossmatch_catalog_data import crossmatch_catalog_data -from lsdb.dask.join_catalog_data import join_catalog_data_on, join_catalog_data_through +from 
lsdb.dask.join_catalog_data import ( + join_catalog_data_nested, + join_catalog_data_on, + join_catalog_data_through, +) from lsdb.dask.partition_indexer import PartitionIndexer from lsdb.io.schema import get_arrow_schema from lsdb.types import DaskDFPixelMap @@ -38,7 +43,7 @@ class Catalog(HealpixDataset): def __init__( self, - ddf: dd.DataFrame, + ddf: nd.NestedFrame, ddf_pixel_map: DaskDFPixelMap, hc_structure: hc.catalog.Catalog, margin: MarginCatalog | None = None, @@ -61,7 +66,7 @@ def partitions(self): """Returns the partitions of the catalog""" return PartitionIndexer(self) - def head(self, n: int = 5) -> pd.DataFrame: + def head(self, n: int = 5) -> npd.NestedFrame: """Returns a few rows of data for previewing purposes. Args: @@ -80,7 +85,7 @@ def head(self, n: int = 5) -> pd.DataFrame: dfs.append(partition_head) remaining_rows -= len(partition_head) if len(dfs) > 0: - return pd.concat(dfs) + return npd.NestedFrame(pd.concat(dfs)) return self._ddf._meta def query(self, expr: str) -> Catalog: @@ -154,8 +159,8 @@ def crossmatch( The class will have been initialized with the following parameters, which the crossmatch function should use: - - left: pd.DataFrame, - - right: pd.DataFrame, + - left: npd.NestedFrame, + - right: npd.NestedFrame, - left_order: int, - left_pixel: int, - right_order: int, @@ -315,9 +320,9 @@ def search(self, search: AbstractSearch): A new Catalog containing the points filtered to those matching the search parameters. 
""" filtered_hc_structure = search.filter_hc_catalog(self.hc_structure) - ddf_partition_map, search_ddf = self._perform_search(filtered_hc_structure, search) + ddf_partition_map, search_ndf = self._perform_search(filtered_hc_structure, search) margin = self.margin.search(search) if self.margin is not None else None - return Catalog(search_ddf, ddf_partition_map, filtered_hc_structure, margin=margin) + return Catalog(search_ndf, ddf_partition_map, filtered_hc_structure, margin=margin) def merge( self, @@ -329,7 +334,7 @@ def merge( left_index: bool = False, right_index: bool = False, suffixes: Tuple[str, str] | None = None, - ) -> dd.DataFrame: + ) -> nd.NestedFrame: """Performs the merge of two catalog Dataframes More information about pandas merge is available @@ -444,3 +449,60 @@ def join( ) hc_catalog = hc.catalog.Catalog(new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf)) return Catalog(ddf, ddf_map, hc_catalog) + + def join_nested( + self, + other: Catalog, + left_on: str | None = None, + right_on: str | None = None, + nested_column_name: str | None = None, + output_catalog_name: str | None = None, + ) -> Catalog: + """Perform a spatial join to another catalog by adding the other catalog as a nested column + + Joins two catalogs together on a shared column value, merging rows where they match. + + The result is added as a nested dataframe column using + `nested-dask `__, where the right catalog's columns + are encoded within a column in the resulting dataframe. For more information, view the + `nested-dask documentation `__. + + The operation only joins data from matching partitions and their margin caches, and does not join rows + that have a matching column value but are in separate partitions in the sky. For a more general join, + see the `merge` function. 
+ + Args: + other (Catalog): the right catalog to join to + left_on (str): the name of the column in the left catalog to join on + right_on (str): the name of the column in the right catalog to join on + nested_column_name (str): the name of the nested column in the resulting dataframe storing the + joined columns of the right catalog. (Default: name of right catalog) + output_catalog_name (str): The name of the resulting catalog to be stored in metadata. + (Default: name of the left catalog) + + Returns: + A new catalog with the columns of the left catalog and an additional nested column holding + the matching rows of the right catalog, joined on the specified columns. + """ + + if left_on is None or right_on is None: + raise ValueError("Both of left_on and right_on must be set") + + if left_on not in self._ddf.columns: + raise ValueError("left_on must be a column in the left catalog") + + if right_on not in other._ddf.columns: + raise ValueError("right_on must be a column in the right catalog") + + ddf, ddf_map, alignment = join_catalog_data_nested( + self, other, left_on, right_on, nested_column_name=nested_column_name + ) + + if output_catalog_name is None: + output_catalog_name = self.hc_structure.catalog_info.catalog_name + + new_catalog_info = dataclasses.replace( + self.hc_structure.catalog_info, + catalog_name=output_catalog_name, + ) + hc_catalog = hc.catalog.Catalog(new_catalog_info, alignment.pixel_tree) + return Catalog(ddf, ddf_map, hc_catalog) diff --git a/src/lsdb/catalog/dataset/dataset.py b/src/lsdb/catalog/dataset/dataset.py index 34d7dd33..70d9d91e 100644 --- a/src/lsdb/catalog/dataset/dataset.py +++ b/src/lsdb/catalog/dataset/dataset.py @@ -1,8 +1,8 @@ from typing import List -import dask.dataframe as dd import hipscat as hc -import pandas as pd +import nested_dask as nd +import nested_pandas as npd from dask.delayed import Delayed @@ -11,7 +11,7 @@ class Dataset: def __init__( self, - ddf: dd.DataFrame, + ddf: nd.NestedFrame, hc_structure: hc.catalog.Dataset, ): """Initialise a Catalog object. 
@@ -34,7 +34,7 @@ def _repr_html_(self): data = self._ddf._repr_data().to_html(max_rows=5, show_dimensions=False, notebook=True) return f"
lsdb Catalog {self.name}:
{data}" - def compute(self) -> pd.DataFrame: + def compute(self) -> npd.NestedFrame: """Compute dask distributed dataframe to pandas dataframe""" return self._ddf.compute() diff --git a/src/lsdb/catalog/dataset/healpix_dataset.py b/src/lsdb/catalog/dataset/healpix_dataset.py index a5374252..56f7b603 100644 --- a/src/lsdb/catalog/dataset/healpix_dataset.py +++ b/src/lsdb/catalog/dataset/healpix_dataset.py @@ -7,6 +7,8 @@ import dask.dataframe as dd import healpy as hp import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd from dask.delayed import Delayed, delayed @@ -38,7 +40,7 @@ class HealpixDataset(Dataset): def __init__( self, - ddf: dd.DataFrame, + ddf: nd.NestedFrame, ddf_pixel_map: DaskDFPixelMap, hc_structure: HCHealpixDataset, ): @@ -57,7 +59,7 @@ def __init__( def __getitem__(self, item): result = self._ddf.__getitem__(item) - if isinstance(result, dd.DataFrame): + if isinstance(result, nd.NestedFrame): return self.__class__(result, self._ddf_pixel_map, self.hc_structure) return result @@ -79,7 +81,7 @@ def get_ordered_healpix_pixels(self) -> List[HealpixPixel]: pixels = self.get_healpix_pixels() return np.array(pixels)[get_pixel_argsort(pixels)] - def get_partition(self, order: int, pixel: int) -> dd.DataFrame: + def get_partition(self, order: int, pixel: int) -> nd.NestedFrame: """Get the dask partition for a given HEALPix pixel Args: @@ -124,14 +126,14 @@ def query(self, expr: str) -> Self: A catalog that contains the data from the original catalog that complies with the query expression """ - ddf = self._ddf.query(expr) - return self.__class__(ddf, self._ddf_pixel_map, self.hc_structure) + ndf = self._ddf.query(expr) + return self.__class__(ndf, self._ddf_pixel_map, self.hc_structure) def _perform_search( self, metadata: hc.catalog.Catalog | hc.catalog.MarginCatalog, search: AbstractSearch, - ) -> Tuple[DaskDFPixelMap, dd.DataFrame]: + ) -> Tuple[DaskDFPixelMap, nd.NestedFrame]: """Performs a 
search on the catalog from a list of pixels to search in Args: @@ -146,7 +148,7 @@ def _perform_search( """ filtered_pixels = metadata.get_healpix_pixels() if len(filtered_pixels) == 0: - return {}, dd.from_pandas(self._ddf._meta) + return {}, nd.NestedFrame.from_pandas(self._ddf._meta) target_partitions_indices = [self._ddf_pixel_map[pixel] for pixel in filtered_pixels] filtered_partitions_ddf = self._ddf.partitions[target_partitions_indices] if search.fine: @@ -161,7 +163,7 @@ def _perform_search( def map_partitions( self, - func: Callable[..., pd.DataFrame], + func: Callable[..., npd.NestedFrame], *args, meta: pd.DataFrame | pd.Series | Dict | Iterable | Tuple | None = None, include_pixel: bool = False, @@ -173,11 +175,11 @@ def map_partitions( Args: func (Callable): The function applied to each partition, which will be called with: - `func(partition: pd.DataFrame, *args, **kwargs)` with the additional args and kwargs passed to - the `map_partitions` function. If the `include_pixel` parameter is set, the function will be - called with the `healpix_pixel` as the second positional argument set to the healpix pixel + `func(partition: npd.NestedFrame, *args, **kwargs)` with the additional args and kwargs passed + to the `map_partitions` function. If the `include_pixel` parameter is set, the function will + be called with the `healpix_pixel` as the second positional argument set to the healpix pixel of the partition as - `func(partition: pd.DataFrame, healpix_pixel: HealpixPixel, *args, **kwargs)` + `func(partition: npd.NestedFrame, healpix_pixel: HealpixPixel, *args, **kwargs)` *args: Additional positional arguments to call `func` with. meta (pd.DataFrame | pd.Series | Dict | Iterable | Tuple | None): An empty pandas DataFrame that has columns matching the output of the function applied to a partition. 
Other types are @@ -222,8 +224,10 @@ def apply_func(df, *args, partition_info=None, **kwargs): else: output_ddf = self._ddf.map_partitions(func, *args, meta=meta, **kwargs) - if isinstance(output_ddf, dd.DataFrame): - return self.__class__(output_ddf, self._ddf_pixel_map, self.hc_structure) + if isinstance(output_ddf, nd.NestedFrame) | isinstance(output_ddf, dd.DataFrame): + return self.__class__( + nd.NestedFrame.from_dask_dataframe(output_ddf), self._ddf_pixel_map, self.hc_structure + ) warnings.warn( "output of the function must be a DataFrame to generate an LSDB `Catalog`. `map_partitions` " "will return a dask object instead of a Catalog.", @@ -247,7 +251,7 @@ def prune_empty_partitions(self, persist: bool = False) -> Self: search_ddf = ( self._ddf.partitions[non_empty_partitions] if len(non_empty_partitions) > 0 - else dd.from_pandas(self._ddf._meta, npartitions=1) + else nd.NestedFrame.from_pandas(self._ddf._meta, npartitions=1) ) ddf_partition_map = {pixel: i for i, pixel in enumerate(non_empty_pixels)} filtered_hc_structure = self.hc_structure.filter_from_pixel_list(non_empty_pixels) @@ -276,7 +280,7 @@ def _get_non_empty_partitions(self) -> Tuple[List[HealpixPixel], np.ndarray]: def skymap_data( self, - func: Callable[[pd.DataFrame, HealpixPixel], Any], + func: Callable[[npd.NestedFrame, HealpixPixel], Any], order: int | None = None, default_value: Any = 0.0, **kwargs, @@ -284,7 +288,7 @@ def skymap_data( """Perform a function on each partition of the catalog, returning a dict of values for each pixel. Args: - func (Callable[[pd.DataFrame, HealpixPixel], Any]): A function that takes a pandas + func (Callable[[npd.NestedFrame, HealpixPixel], Any]): A function that takes a pandas DataFrame with the data in a partition, the HealpixPixel of the partition, and any other keyword arguments and returns an aggregated value order (int | None): The HEALPix order to compute the skymap at. 
If None (default), @@ -321,7 +325,7 @@ def skymap_data( def skymap_histogram( self, - func: Callable[[pd.DataFrame, HealpixPixel], Any], + func: Callable[[npd.NestedFrame, HealpixPixel], Any], order: int | None = None, default_value: Any = 0.0, **kwargs, @@ -330,8 +334,8 @@ def skymap_histogram( a given order Args: - func (Callable[[pd.DataFrame], HealpixPixel, Any]): A function that takes a pandas DataFrame and - the HealpixPixel the partition is from and returns a value + func (Callable[[npd.NestedFrame, HealpixPixel], Any]): A function that takes a pandas DataFrame + and the HealpixPixel the partition is from and returns a value order (int | None): The HEALPix order to compute the skymap at. If None (default), will compute for each partition in the catalog at their own orders. If a value other than None, each partition will be grouped by pixel number at the order @@ -357,7 +361,7 @@ def skymap_histogram( def skymap( self, - func: Callable[[pd.DataFrame, HealpixPixel], Any], + func: Callable[[npd.NestedFrame, HealpixPixel], Any], order: int | None = None, default_value: Any = hp.pixelfunc.UNSEEN, projection="moll", @@ -367,8 +371,8 @@ def skymap( """Plot a skymap of an aggregate function applied over each partition Args: - func (Callable[[pd.DataFrame], HealpixPixel, Any]): A function that takes a pandas DataFrame and - the HealpixPixel the partition is from and returns a value + func (Callable[[npd.NestedFrame, HealpixPixel], Any]): A function that takes a pandas DataFrame + and the HealpixPixel the partition is from and returns a value order (int | None): The HEALPix order to compute the skymap at. If None (default), will compute for each partition in the catalog at their own orders. 
If a value other than None, each partition will be grouped by pixel number at the order diff --git a/src/lsdb/catalog/margin_catalog.py b/src/lsdb/catalog/margin_catalog.py index 1e18e96d..7c390c63 100644 --- a/src/lsdb/catalog/margin_catalog.py +++ b/src/lsdb/catalog/margin_catalog.py @@ -1,5 +1,5 @@ -import dask.dataframe as dd import hipscat as hc +import nested_dask as nd from lsdb.catalog.dataset.healpix_dataset import HealpixDataset from lsdb.core.search.abstract_search import AbstractSearch @@ -19,7 +19,7 @@ class MarginCatalog(HealpixDataset): def __init__( self, - ddf: dd.DataFrame, + ddf: nd.NestedFrame, ddf_pixel_map: DaskDFPixelMap, hc_structure: hc.catalog.MarginCatalog, ): @@ -38,5 +38,5 @@ def search(self, search: AbstractSearch): A new Catalog containing the points filtered to those matching the search parameters. """ filtered_hc_structure = search.filter_hc_catalog(self.hc_structure) - ddf_partition_map, search_ddf = self._perform_search(filtered_hc_structure, search) - return self.__class__(search_ddf, ddf_partition_map, filtered_hc_structure) + ddf_partition_map, search_ndf = self._perform_search(filtered_hc_structure, search) + return self.__class__(search_ndf, ddf_partition_map, filtered_hc_structure) diff --git a/src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py b/src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py index 8b723823..5d8a6d8e 100644 --- a/src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py +++ b/src/lsdb/core/crossmatch/abstract_crossmatch_algorithm.py @@ -3,6 +3,7 @@ from abc import ABC from typing import TYPE_CHECKING, Tuple +import nested_pandas as npd import numpy as np import numpy.typing as npt import pandas as pd @@ -34,8 +35,8 @@ class AbstractCrossmatchAlgorithm(ABC): The class will have been initialized with the following parameters, which the crossmatch function should use: - - left: pd.DataFrame, - - right: pd.DataFrame, + - left: npd.NestedFrame, + - right: npd.NestedFrame, - left_order: int, - 
left_pixel: int, - right_order: int, @@ -56,8 +57,8 @@ class AbstractCrossmatchAlgorithm(ABC): def __init__( self, - left: pd.DataFrame, - right: pd.DataFrame, + left: npd.NestedFrame, + right: npd.NestedFrame, left_order: int, left_pixel: int, right_order: int, @@ -97,7 +98,7 @@ def __init__( self.right_margin_catalog_info = right_margin_catalog_info self.suffixes = suffixes - def crossmatch(self, **kwargs) -> pd.DataFrame: + def crossmatch(self, **kwargs) -> npd.NestedFrame: """Perform a crossmatch""" l_inds, r_inds, extra_cols = self.perform_crossmatch(**kwargs) if not len(l_inds) == len(r_inds) == len(extra_cols): @@ -159,7 +160,7 @@ def _rename_columns_with_suffix(dataframe, suffix): dataframe.rename(columns=columns_renamed, inplace=True) @classmethod - def _append_extra_columns(cls, dataframe: pd.DataFrame, extra_columns: pd.DataFrame | None = None): + def _append_extra_columns(cls, dataframe: npd.NestedFrame, extra_columns: pd.DataFrame | None = None): """Adds crossmatch extra columns to the resulting Dataframe.""" if cls.extra_columns is None: return @@ -187,7 +188,7 @@ def _create_crossmatch_df( left_idx: npt.NDArray[np.int64], right_idx: npt.NDArray[np.int64], extra_cols: pd.DataFrame, - ) -> pd.DataFrame: + ) -> npd.NestedFrame: """Creates a df containing the crossmatch result from matching indices and additional columns Args: @@ -216,4 +217,4 @@ def _create_crossmatch_df( out.set_index(HIPSCAT_ID_COLUMN, inplace=True) extra_cols.index = out.index self._append_extra_columns(out, extra_cols) - return out + return npd.NestedFrame(out) diff --git a/src/lsdb/core/plotting/skymap.py b/src/lsdb/core/plotting/skymap.py index 11cae196..8bd4035c 100644 --- a/src/lsdb/core/plotting/skymap.py +++ b/src/lsdb/core/plotting/skymap.py @@ -3,16 +3,16 @@ from typing import Any, Callable, Dict import healpy as hp +import nested_pandas as npd import numpy as np -import pandas as pd from dask import delayed from hipscat.pixel_math import HealpixPixel, hipscat_id_to_healpix 
@delayed def perform_inner_skymap( - partition: pd.DataFrame, - func: Callable[[pd.DataFrame, HealpixPixel], Any], + partition: npd.NestedFrame, + func: Callable[[npd.NestedFrame, HealpixPixel], Any], pixel: HealpixPixel, target_order: int, default_value: Any = 0, diff --git a/src/lsdb/core/search/abstract_search.py b/src/lsdb/core/search/abstract_search.py index d4243cd8..4237f517 100644 --- a/src/lsdb/core/search/abstract_search.py +++ b/src/lsdb/core/search/abstract_search.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING -import pandas as pd +import nested_pandas as npd from hipscat.catalog.catalog_info import CatalogInfo from mocpy import MOC @@ -40,5 +40,5 @@ def generate_search_moc(self, max_order: int) -> MOC: ) @abstractmethod - def search_points(self, frame: pd.DataFrame, metadata: CatalogInfo) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, metadata: CatalogInfo) -> npd.NestedFrame: """Determine the search results within a data frame""" diff --git a/src/lsdb/core/search/box_search.py b/src/lsdb/core/search/box_search.py index c516e5b8..db594fbd 100644 --- a/src/lsdb/core/search/box_search.py +++ b/src/lsdb/core/search/box_search.py @@ -2,8 +2,8 @@ from typing import Tuple +import nested_pandas as npd import numpy as np -import pandas as pd from hipscat.catalog.catalog_info import CatalogInfo from hipscat.pixel_math.box_filter import generate_box_moc, wrap_ra_angles from hipscat.pixel_math.validators import validate_box_search @@ -34,21 +34,21 @@ def __init__( def generate_search_moc(self, max_order: int) -> MOC: return generate_box_moc(self.ra, self.dec, max_order) - def search_points(self, frame: pd.DataFrame, metadata: CatalogInfo) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, metadata: CatalogInfo) -> npd.NestedFrame: """Determine the search results within a data frame""" return box_filter(frame, self.ra, self.dec, metadata) def box_filter( - data_frame: pd.DataFrame, + 
data_frame: npd.NestedFrame, ra: Tuple[float, float] | None, dec: Tuple[float, float] | None, metadata: CatalogInfo, -): +) -> npd.NestedFrame: """Filters a dataframe to only include points within the specified box region. Args: - data_frame (pd.DataFrame): DataFrame containing points in the sky + data_frame (npd.NestedFrame): DataFrame containing points in the sky ra (Tuple[float, float]): Right ascension range, in degrees dec (Tuple[float, float]): Declination range, in degrees metadata (hc.catalog.Catalog): hipscat `Catalog` with catalog_info that matches `data_frame` diff --git a/src/lsdb/core/search/cone_search.py b/src/lsdb/core/search/cone_search.py index e728a61c..d37734c8 100644 --- a/src/lsdb/core/search/cone_search.py +++ b/src/lsdb/core/search/cone_search.py @@ -1,4 +1,4 @@ -import pandas as pd +import nested_pandas as npd from astropy.coordinates import SkyCoord from hipscat.catalog.catalog_info import CatalogInfo from hipscat.pixel_math.cone_filter import generate_cone_moc @@ -26,16 +26,16 @@ def __init__(self, ra: float, dec: float, radius_arcsec: float, fine: bool = Tru def generate_search_moc(self, max_order: int) -> MOC: return generate_cone_moc(self.ra, self.dec, self.radius_arcsec, max_order) - def search_points(self, frame: pd.DataFrame, metadata: CatalogInfo) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, metadata: CatalogInfo) -> npd.NestedFrame: """Determine the search results within a data frame""" return cone_filter(frame, self.ra, self.dec, self.radius_arcsec, metadata) -def cone_filter(data_frame: pd.DataFrame, ra, dec, radius_arcsec, metadata: CatalogInfo): +def cone_filter(data_frame: npd.NestedFrame, ra, dec, radius_arcsec, metadata: CatalogInfo): """Filters a dataframe to only include points within the specified cone Args: - data_frame (pd.DataFrame): DataFrame containing points in the sky + data_frame (npd.NestedFrame): DataFrame containing points in the sky ra (float): Right Ascension of the center of the cone 
in degrees dec (float): Declination of the center of the cone in degrees radius_arcsec (float): Radius of the cone in arcseconds diff --git a/src/lsdb/core/search/index_search.py b/src/lsdb/core/search/index_search.py index e4d3b477..cc2f0c8d 100644 --- a/src/lsdb/core/search/index_search.py +++ b/src/lsdb/core/search/index_search.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING -import pandas as pd +import nested_pandas as npd from hipscat.catalog.index.index_catalog import IndexCatalog from lsdb.core.search.abstract_search import AbstractSearch @@ -29,6 +29,6 @@ def filter_hc_catalog(self, hc_structure: HCCatalogTypeVar) -> HCCatalogTypeVar: healpix_pixels = self.catalog_index.loc_partitions(self.ids) return hc_structure.filter_from_pixel_list(healpix_pixels) - def search_points(self, frame: pd.DataFrame, _) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, _) -> npd.NestedFrame: """Determine the search results within a data frame""" return frame[frame[self.catalog_index.catalog_info.indexing_column].isin(self.ids)] diff --git a/src/lsdb/core/search/order_search.py b/src/lsdb/core/search/order_search.py index 3a3ed0b1..e0a013e7 100644 --- a/src/lsdb/core/search/order_search.py +++ b/src/lsdb/core/search/order_search.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING -import pandas as pd +import nested_pandas as npd from lsdb.core.search.abstract_search import AbstractSearch @@ -32,6 +32,6 @@ def filter_hc_catalog(self, hc_structure: HCCatalogTypeVar) -> HCCatalogTypeVar: pixels = [p for p in hc_structure.get_healpix_pixels() if self.min_order <= p.order <= max_order] return hc_structure.filter_from_pixel_list(pixels) - def search_points(self, frame: pd.DataFrame, _) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, _) -> npd.NestedFrame: """Determine the search results within a data frame.""" return frame diff --git a/src/lsdb/core/search/pixel_search.py b/src/lsdb/core/search/pixel_search.py index 3126f044..1bb6abde 100644 
--- a/src/lsdb/core/search/pixel_search.py +++ b/src/lsdb/core/search/pixel_search.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, List, Tuple -import pandas as pd +import nested_pandas as npd from hipscat.pixel_math import HealpixPixel from lsdb.core.search.abstract_search import AbstractSearch @@ -25,5 +25,5 @@ def __init__(self, pixels: List[Tuple[int, int]]): def filter_hc_catalog(self, hc_structure: HCCatalogTypeVar) -> HCCatalogTypeVar: return hc_structure.filter_from_pixel_list(self.pixels) - def search_points(self, frame: pd.DataFrame, _) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, _) -> npd.NestedFrame: return frame diff --git a/src/lsdb/core/search/polygon_search.py b/src/lsdb/core/search/polygon_search.py index d08cac69..52905723 100644 --- a/src/lsdb/core/search/polygon_search.py +++ b/src/lsdb/core/search/polygon_search.py @@ -1,8 +1,8 @@ from typing import List, Tuple import healpy as hp +import nested_pandas as npd import numpy as np -import pandas as pd from hipscat.catalog.catalog_info import CatalogInfo from hipscat.pixel_math.polygon_filter import CartesianCoordinates, SphericalCoordinates, generate_polygon_moc from hipscat.pixel_math.validators import validate_declination_values, validate_polygon @@ -29,16 +29,18 @@ def __init__(self, vertices: List[SphericalCoordinates], fine: bool = True): def generate_search_moc(self, max_order: int) -> MOC: return generate_polygon_moc(self.vertices_xyz, max_order) - def search_points(self, frame: pd.DataFrame, metadata: CatalogInfo) -> pd.DataFrame: + def search_points(self, frame: npd.NestedFrame, metadata: CatalogInfo) -> npd.NestedFrame: """Determine the search results within a data frame""" return polygon_filter(frame, self.polygon, metadata) -def polygon_filter(data_frame: pd.DataFrame, polygon: ConvexPolygon, metadata: CatalogInfo): +def polygon_filter( + data_frame: npd.NestedFrame, polygon: ConvexPolygon, metadata: CatalogInfo +) -> npd.NestedFrame: """Filters a dataframe 
to only include points within the specified polygon. Args: - data_frame (pd.DataFrame): DataFrame containing points in the sky + data_frame (npd.NestedFrame): DataFrame containing points in the sky polygon (ConvexPolygon): Convex spherical polygon of interest, used to filter points metadata (hc.catalog.Catalog): hipscat `Catalog` with catalog_info that matches `dataframe` diff --git a/src/lsdb/dask/crossmatch_catalog_data.py b/src/lsdb/dask/crossmatch_catalog_data.py index fd745586..cd90b070 100644 --- a/src/lsdb/dask/crossmatch_catalog_data.py +++ b/src/lsdb/dask/crossmatch_catalog_data.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Tuple, Type import dask -import dask.dataframe as dd +import nested_dask as nd from hipscat.pixel_tree import PixelAlignment from lsdb.core.crossmatch.abstract_crossmatch_algorithm import AbstractCrossmatchAlgorithm @@ -81,7 +81,7 @@ def crossmatch_catalog_data( Type[AbstractCrossmatchAlgorithm] | BuiltInCrossmatchAlgorithm ) = BuiltInCrossmatchAlgorithm.KD_TREE, **kwargs, -) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]: +) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]: """Cross-matches the data from two catalogs Args: diff --git a/src/lsdb/dask/join_catalog_data.py b/src/lsdb/dask/join_catalog_data.py index 303a9ca5..045d3fbd 100644 --- a/src/lsdb/dask/join_catalog_data.py +++ b/src/lsdb/dask/join_catalog_data.py @@ -6,14 +6,15 @@ from typing import TYPE_CHECKING, List, Tuple import dask -import dask.dataframe as dd -import pandas as pd +import nested_dask as nd +import nested_pandas as npd from hipscat.catalog.association_catalog import AssociationCatalogInfo from hipscat.catalog.catalog_info import CatalogInfo from hipscat.catalog.margin_cache import MarginCacheCatalogInfo from hipscat.pixel_math import HealpixPixel from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN from hipscat.pixel_tree import PixelAlignment +from nested_pandas.series.packer import pack_flat from 
lsdb.catalog.association_catalog import AssociationCatalog from lsdb.dask.merge_catalog_functions import ( @@ -23,6 +24,7 @@ construct_catalog_args, filter_by_hipscat_index_to_pixel, generate_meta_df_for_joined_tables, + generate_meta_df_for_nested_tables, get_healpix_pixels_from_alignment, ) from lsdb.types import DaskDFPixelMap @@ -34,12 +36,12 @@ NON_JOINING_ASSOCIATION_COLUMNS = ["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix"] -def rename_columns_with_suffixes(left: pd.DataFrame, right: pd.DataFrame, suffixes: Tuple[str, str]): +def rename_columns_with_suffixes(left: npd.NestedFrame, right: npd.NestedFrame, suffixes: Tuple[str, str]): """Renames two dataframes with the suffixes specified Args: - left (pd.DataFrame): the left dataframe to apply the first suffix to - right (pd.DataFrame): the right dataframe to apply the second suffix to + left (npd.NestedFrame): the left dataframe to apply the first suffix to + right (npd.NestedFrame): the right dataframe to apply the second suffix to suffixes (Tuple[str, str]): the pair of suffixes to apply to the dataframes Returns: @@ -55,9 +57,9 @@ def rename_columns_with_suffixes(left: pd.DataFrame, right: pd.DataFrame, suffix # pylint: disable=too-many-arguments, unused-argument @dask.delayed def perform_join_on( - left: pd.DataFrame, - right: pd.DataFrame, - right_margin: pd.DataFrame, + left: npd.NestedFrame, + right: npd.NestedFrame, + right_margin: npd.NestedFrame, left_pixel: HealpixPixel, right_pixel: HealpixPixel, right_margin_pixel: HealpixPixel, @@ -72,9 +74,9 @@ def perform_join_on( """Performs a join on two catalog partitions Args: - left (pd.DataFrame): the left partition to merge - right (pd.DataFrame): the right partition to merge - right_margin (pd.DataFrame): the right margin partition to merge + left (npd.NestedFrame): the left partition to merge + right (npd.NestedFrame): the right partition to merge + right_margin (npd.NestedFrame): the right margin partition to merge left_pixel 
(HealpixPixel): the HEALPix pixel of the left partition right_pixel (HealpixPixel): the HEALPix pixel of the right partition right_margin_pixel (HealpixPixel): the HEALPix pixel of the right margin partition @@ -102,13 +104,63 @@ def perform_join_on( return merged +# pylint: disable=too-many-arguments, unused-argument +@dask.delayed +def perform_join_nested( + left: npd.NestedFrame, + right: npd.NestedFrame, + right_margin: npd.NestedFrame, + left_pixel: HealpixPixel, + right_pixel: HealpixPixel, + right_margin_pixel: HealpixPixel, + left_catalog_info: CatalogInfo, + right_catalog_info: CatalogInfo, + right_margin_catalog_info: MarginCacheCatalogInfo, + left_on: str, + right_on: str, + right_columns: List[str], + right_name: str, +): + """Performs a join on two catalog partitions by adding the right catalog as a nested column using + nested-pandas + + Args: + left (npd.NestedFrame): the left partition to merge + right (npd.NestedFrame): the right partition to merge + right_margin (npd.NestedFrame): the right margin partition to merge + left_pixel (HealpixPixel): the HEALPix pixel of the left partition + right_pixel (HealpixPixel): the HEALPix pixel of the right partition + right_margin_pixel (HealpixPixel): the HEALPix pixel of the right margin partition + left_catalog_info (hc.CatalogInfo): the catalog info of the left catalog + right_catalog_info (hc.CatalogInfo): the catalog info of the right catalog + right_margin_catalog_info (hc.MarginCacheCatalogInfo): the catalog info of the right margin catalog + left_on (str): the column to join on from the left partition + right_on (str): the column to join on from the right partition + right_columns (List[str]): the columns to include from the right margin partition + right_name (str): the name of the nested column in the resulting df to join the right catalog into + + Returns: + A dataframe with the result of merging the left and right partitions on the specified columns + """ + if right_pixel.order > left_pixel.order: + 
left = filter_by_hipscat_index_to_pixel(left, right_pixel.order, right_pixel.pixel) + + right_joined_df = concat_partition_and_margin(right, right_margin, right_columns) + + right_joined_df = pack_flat(npd.NestedFrame(right_joined_df.set_index(right_on))).rename(right_name) + + merged = left.reset_index().merge(right_joined_df, left_on=left_on, right_index=True) + merged.set_index(HIPSCAT_ID_COLUMN, inplace=True) + return merged + + # pylint: disable=too-many-arguments, unused-argument @dask.delayed def perform_join_through( - left: pd.DataFrame, - right: pd.DataFrame, - right_margin: pd.DataFrame, - through: pd.DataFrame, + left: npd.NestedFrame, + right: npd.NestedFrame, + right_margin: npd.NestedFrame, + through: npd.NestedFrame, left_pixel: HealpixPixel, right_pixel: HealpixPixel, right_margin_pixel: HealpixPixel, @@ -123,18 +175,19 @@ def perform_join_through( """Performs a join on two catalog partitions through an association catalog Args: - left (pd.DataFrame): the left partition to merge - right (pd.DataFrame): the right partition to merge - right_margin (pd.DataFrame): the right margin partition to merge - through (pd.DataFrame): the association column partition to merge with + left (npd.NestedFrame): the left partition to merge + right (npd.NestedFrame): the right partition to merge + right_margin (npd.NestedFrame): the right margin partition to merge + through (npd.NestedFrame): the association column partition to merge with left_pixel (HealpixPixel): the HEALPix pixel of the left partition right_pixel (HealpixPixel): the HEALPix pixel of the right partition right_margin_pixel (HealpixPixel): the HEALPix pixel of the right margin partition through_pixel (HealpixPixel): the HEALPix pixel of the association partition - left_catalog (hc.Catalog): the hipscat structure of the left catalog - right_catalog (hc.Catalog): the hipscat structure of the right catalog - right_margin_catalog (hc.Catalog): the hipscat structure of the right margin catalog - 
assoc_catalog (hc.AssociationCatalog): the hipscat structure of the association catalog + left_catalog_info (hc.CatalogInfo): the hipscat structure of the left catalog + right_catalog_info (hc.CatalogInfo): the hipscat structure of the right catalog + right_margin_catalog_info (hc.MarginCacheCatalogInfo): the hipscat structure of the right margin + catalog + assoc_catalog_info (hc.AssociationCatalogInfo): the hipscat structure of the association catalog suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names right_columns (List[str]): the columns to include from the right margin partition @@ -177,7 +230,7 @@ def perform_join_through( def join_catalog_data_on( left: Catalog, right: Catalog, left_on: str, right_on: str, suffixes: Tuple[str, str] -) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]: +) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]: """Joins two catalogs spatially on a specified column Args: @@ -216,9 +269,59 @@ def join_catalog_data_on( return construct_catalog_args(joined_partitions, meta_df, alignment) +def join_catalog_data_nested( + left: Catalog, + right: Catalog, + left_on: str, + right_on: str, + nested_column_name: str | None = None, +) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]: + """Joins two catalogs spatially on a specified column, adding the right as a nested column with nested + dask + + Args: + left (lsdb.Catalog): the left catalog to join + right (lsdb.Catalog): the right catalog to join + left_on (str): the column to join on from the left partition + right_on (str): the column to join on from the right partition + nested_column_name (str): the name of the nested column in the final output, if None, defaults to + name of the right catalog + + Returns: + A tuple of the dask dataframe with the result of the join, the pixel map from HEALPix + pixel to partition index within the dataframe, and the PixelAlignment of the two input + catalogs. 
+ """ + if right.margin is None: + warnings.warn( + "Right catalog does not have a margin cache. Results may be incomplete and/or inaccurate.", + RuntimeWarning, + ) + + if nested_column_name is None: + nested_column_name = right.name + + alignment = align_catalogs(left, right) + + left_pixels, right_pixels = get_healpix_pixels_from_alignment(alignment) + + joined_partitions = align_and_apply( + [(left, left_pixels), (right, right_pixels), (right.margin, right_pixels)], + perform_join_nested, + left_on, + right_on, + right.columns, + nested_column_name, + ) + + meta_df = generate_meta_df_for_nested_tables([left], right, nested_column_name, join_column_name=right_on) + + return construct_catalog_args(joined_partitions, meta_df, alignment) + + def join_catalog_data_through( left: Catalog, right: Catalog, association: AssociationCatalog, suffixes: Tuple[str, str] -) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]: +) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]: """Joins two catalogs with an association table Args: diff --git a/src/lsdb/dask/merge_catalog_functions.py b/src/lsdb/dask/merge_catalog_functions.py index 6441b48f..632fa1ea 100644 --- a/src/lsdb/dask/merge_catalog_functions.py +++ b/src/lsdb/dask/merge_catalog_functions.py @@ -1,9 +1,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Callable, List, Sequence, Tuple, cast +from typing import TYPE_CHECKING, Callable, List, Sequence, Tuple -import dask.dataframe as dd import healpy as hp +import nested_dask as nd +import nested_pandas as npd import numpy as np import numpy.typing as npt import pandas as pd @@ -24,13 +25,13 @@ def concat_partition_and_margin( - partition: pd.DataFrame, margin: pd.DataFrame | None, right_columns: List[str] -) -> pd.DataFrame: + partition: npd.NestedFrame, margin: npd.NestedFrame | None, right_columns: List[str] +) -> npd.NestedFrame: """Concatenates a partition and margin dataframe together Args: - partition (pd.DataFrame): The 
partition dataframe - margin (pd.DataFrame): The margin dataframe + partition (npd.NestedFrame): The partition dataframe + margin (npd.NestedFrame): The margin dataframe Returns: The concatenated dataframe with the partition on top and the margin on the bottom @@ -54,7 +55,7 @@ def concat_partition_and_margin( margin_renamed = margin[margin_columns_no_hive].rename(columns=rename_columns) margin_filtered = margin_renamed[right_columns] joined_df = pd.concat([partition, margin_filtered]) if margin_filtered is not None else partition - return joined_df + return npd.NestedFrame(joined_df) def align_catalogs(left: Catalog, right: Catalog, add_right_margin: bool = True) -> PixelAlignment: @@ -158,11 +159,11 @@ def apply_func(*partitions_and_pixels): return resulting_partitions -def filter_by_hipscat_index_to_pixel(dataframe: pd.DataFrame, order: int, pixel: int) -> pd.DataFrame: +def filter_by_hipscat_index_to_pixel(dataframe: npd.NestedFrame, order: int, pixel: int) -> npd.NestedFrame: """Filters a catalog dataframe to the points within a specified HEALPix pixel using the hipscat index Args: - dataframe (pd.DataFrame): The dataframe to filter + dataframe (npd.NestedFrame): The dataframe to filter order (int): The order of the HEALPix pixel to filter to pixel (int): The pixel number in NESTED numbering of the HEALPix pixel to filter to @@ -176,13 +177,13 @@ def filter_by_hipscat_index_to_pixel(dataframe: pd.DataFrame, order: int, pixel: def construct_catalog_args( - partitions: List[Delayed], meta_df: pd.DataFrame, alignment: PixelAlignment -) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]: + partitions: List[Delayed], meta_df: npd.NestedFrame, alignment: PixelAlignment +) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]: """Constructs the arguments needed to create a catalog from a list of delayed partitions Args: partitions (List[Delayed]): The list of delayed partitions to create the catalog from - meta_df (pd.DataFrame): The dask meta schema for the 
partitions + meta_df (npd.NestedFrame): The dask meta schema for the partitions alignment (PixelAlignment): The alignment used to create the delayed partitions Returns: @@ -193,9 +194,8 @@ def construct_catalog_args( partition_map = get_partition_map_from_alignment_pixels(alignment.pixel_mapping) # create dask df from delayed partitions divisions = get_pixels_divisions(list(partition_map.keys())) - partitions = partitions if len(partitions) > 0 else [delayed(pd.DataFrame([]))] - ddf = dd.from_delayed(partitions, meta=meta_df, divisions=divisions) - ddf = cast(dd.DataFrame, ddf) + partitions = partitions if len(partitions) > 0 else [delayed(meta_df.copy())] + ddf = nd.NestedFrame.from_delayed(partitions, meta=meta_df, divisions=divisions, verify_meta=True) return ddf, partition_map, alignment @@ -229,7 +229,7 @@ def generate_meta_df_for_joined_tables( extra_columns: pd.DataFrame | None = None, index_name: str = HIPSCAT_ID_COLUMN, index_type: npt.DTypeLike = np.uint64, -) -> pd.DataFrame: +) -> npd.NestedFrame: """Generates a Dask meta DataFrame that would result from joining two catalogs Creates an empty dataframe with the columns of each catalog appended with a suffix. Allows specifying @@ -256,7 +256,56 @@ def generate_meta_df_for_joined_tables( meta.update(extra_columns) index = pd.Index(pd.Series(dtype=index_type), name=index_name) meta_df = pd.DataFrame(meta, index) - return meta_df + return npd.NestedFrame(meta_df) + + +def generate_meta_df_for_nested_tables( + catalogs: Sequence[Catalog], + nested_catalog: Catalog, + nested_column_name: str, + join_column_name: str, + extra_columns: pd.DataFrame | None = None, + index_name: str = HIPSCAT_ID_COLUMN, + index_type: npt.DTypeLike = np.uint64, +) -> npd.NestedFrame: + """Generates a Dask meta DataFrame that would result from joining two catalogs, adding the right as a + nested frame + + Creates an empty dataframe with the columns of the left catalog, and a nested column with the right + catalog. 
Allows specifying extra columns that should also be added, and the name of the index of the + resulting dataframe. + + Args: + catalogs (Sequence[lsdb.Catalog]): The catalogs to merge together + nested_catalog (Catalog): The catalog to add as a nested column + nested_column_name (str): The name of the nested column + join_column_name (str): The name of the column in the right catalog to join on + extra_columns (pd.Dataframe): Any additional columns to the merged catalogs + index_name (str): The name of the index in the resulting DataFrame + index_type (npt.DTypeLike): The type of the index in the resulting DataFrame + + Returns: + An empty dataframe with the right catalog joined to the left as a nested column, and any extra + columns specified, with the index name set. + """ + meta = {} + # Construct meta for crossmatched catalog columns + for table in catalogs: + for name, col_type in table.dtypes.items(): + meta[name] = pd.Series(dtype=col_type) + # Construct meta for crossmatch result columns + if extra_columns is not None: + meta.update(extra_columns) + index = pd.Index(pd.Series(dtype=index_type), name=index_name) + meta_df = pd.DataFrame(meta, index) + + # make an empty copy of the nested catalog, removing the column that will be joined on (and removed from + # the eventual dataframe) + # pylint: disable=protected-access + nested_catalog_meta = nested_catalog._ddf._meta.copy().iloc[:0].drop(join_column_name, axis=1) + + # Use nested-pandas to make the resulting meta with the nested catalog meta as a nested column + return npd.NestedFrame(meta_df).add_nested(nested_catalog_meta, nested_column_name) def get_partition_map_from_alignment_pixels(join_pixels: pd.DataFrame) -> DaskDFPixelMap: diff --git a/src/lsdb/io/to_hipscat.py b/src/lsdb/io/to_hipscat.py index db60b081..8293a43e 100644 --- a/src/lsdb/io/to_hipscat.py +++ b/src/lsdb/io/to_hipscat.py @@ -7,7 +7,7 @@ import dask import hipscat as hc -import pandas as pd +import nested_pandas as npd from 
hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset from hipscat.io import FilePointer from hipscat.pixel_math import HealpixPixel @@ -20,7 +20,7 @@ @dask.delayed def perform_write( - df: pd.DataFrame, + df: npd.NestedFrame, hp_pixel: HealpixPixel, base_catalog_dir: FilePointer, storage_options: dict | None = None, @@ -31,7 +31,7 @@ def perform_write( To be used as a dask delayed method as part of a dask task graph. Args: - df (pd.DataFrame): dataframe to write to file + df (npd.NestedFrame): dataframe to write to file hp_pixel (HealpixPixel): HEALPix pixel of file to be written base_catalog_dir (FilePointer): Location of the base catalog directory to write to storage_options (dict): fsspec storage options diff --git a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py index 7bb54c88..e9a86590 100644 --- a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py +++ b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py @@ -6,8 +6,9 @@ from typing import Dict, List, Tuple import astropy.units as u -import dask.dataframe as dd import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pyarrow as pa @@ -67,7 +68,7 @@ def __init__( automatically inferred from the provided DataFrame using `pa.Schema.from_pandas`. **kwargs: Arguments to pass to the creation of the catalog info. 
""" - self.dataframe = dataframe + self.dataframe = npd.NestedFrame(dataframe) self.lowest_order = lowest_order self.highest_order = highest_order self.drop_empty_siblings = drop_empty_siblings @@ -177,7 +178,7 @@ def _compute_pixel_list(self) -> List[HealpixPixel]: def _generate_dask_df_and_map( self, pixel_list: List[HealpixPixel] - ) -> Tuple[dd.DataFrame, DaskDFPixelMap, int]: + ) -> Tuple[nd.NestedFrame, DaskDFPixelMap, int]: """Load Dask DataFrame from HEALPix pixel Dataframes and generate a mapping of HEALPix pixels to HEALPix Dataframes @@ -189,7 +190,7 @@ def _generate_dask_df_and_map( to the respective Pandas Dataframes and the total number of rows. """ # Dataframes for each destination HEALPix pixel - pixel_dfs: List[pd.DataFrame] = [] + pixel_dfs: List[npd.NestedFrame] = [] # Mapping HEALPix pixels to the respective Dataframe indices ddf_pixel_map: Dict[HealpixPixel, int] = {} diff --git a/src/lsdb/loaders/dataframe/from_dataframe_utils.py b/src/lsdb/loaders/dataframe/from_dataframe_utils.py index 4895456b..1e584cd0 100644 --- a/src/lsdb/loaders/dataframe/from_dataframe_utils.py +++ b/src/lsdb/loaders/dataframe/from_dataframe_utils.py @@ -1,6 +1,7 @@ from typing import List, Tuple -import dask.dataframe as dd +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pyarrow as pa @@ -13,12 +14,12 @@ def _generate_dask_dataframe( - pixel_dfs: List[pd.DataFrame], pixels: List[HealpixPixel], use_pyarrow_types: bool = True -) -> Tuple[dd.DataFrame, int]: + pixel_dfs: List[npd.NestedFrame], pixels: List[HealpixPixel], use_pyarrow_types: bool = True +) -> Tuple[nd.NestedFrame, int]: """Create the Dask Dataframe from the list of HEALPix pixel Dataframes Args: - pixel_dfs (List[pd.DataFrame]): The list of HEALPix pixel Dataframes + pixel_dfs (List[npd.NestedFrame]): The list of HEALPix pixel Dataframes pixels (List[HealpixPixel]): The list of HEALPix pixels in the catalog use_pyarrow_types (bool): If True, use pyarrow 
types. Defaults to True. @@ -29,8 +30,7 @@ def _generate_dask_dataframe( schema = pixel_dfs[0].iloc[:0, :].copy() if len(pixels) > 0 else [] delayed_dfs = [delayed(df) for df in pixel_dfs] divisions = get_pixels_divisions(pixels) - ddf = dd.from_delayed(delayed_dfs, meta=schema, divisions=divisions) - ddf = ddf if isinstance(ddf, dd.DataFrame) else ddf.to_frame() + ddf = nd.NestedFrame.from_delayed(delayed_dfs, meta=schema, divisions=divisions) return ddf, len(ddf) @@ -53,7 +53,9 @@ def _convert_dtypes_to_pyarrow(df: pd.DataFrame) -> pd.DataFrame: return pd.DataFrame(new_series, index=df_index, copy=False) -def _append_partition_information_to_dataframe(dataframe: pd.DataFrame, pixel: HealpixPixel) -> pd.DataFrame: +def _append_partition_information_to_dataframe( + dataframe: npd.NestedFrame, pixel: HealpixPixel +) -> npd.NestedFrame: """Append partitioning information to a HEALPix dataframe Args: @@ -77,7 +79,7 @@ def _append_partition_information_to_dataframe(dataframe: pd.DataFrame, pixel: H return _order_partition_dataframe_columns(dataframe) -def _format_margin_partition_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame: +def _format_margin_partition_dataframe(dataframe: npd.NestedFrame) -> npd.NestedFrame: """Finalizes the dataframe for a margin catalog partition Args: @@ -111,7 +113,7 @@ def _format_margin_partition_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame: return _order_partition_dataframe_columns(dataframe) -def _order_partition_dataframe_columns(dataframe: pd.DataFrame) -> pd.DataFrame: +def _order_partition_dataframe_columns(dataframe: npd.NestedFrame) -> npd.NestedFrame: """Reorder columns of a partition dataframe so that pixel information always appears in the same sequence diff --git a/src/lsdb/loaders/dataframe/margin_catalog_generator.py b/src/lsdb/loaders/dataframe/margin_catalog_generator.py index 29775341..e22c4904 100644 --- a/src/lsdb/loaders/dataframe/margin_catalog_generator.py +++ 
b/src/lsdb/loaders/dataframe/margin_catalog_generator.py @@ -2,9 +2,10 @@ from typing import Dict, List, Tuple -import dask.dataframe as dd import healpy as hp import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd from hipscat import pixel_math @@ -39,7 +40,7 @@ def __init__( margin_threshold (float): The size of the margin cache boundary, in arcseconds use_pyarrow_types (bool): If True, use pyarrow types. Defaults to True. """ - self.dataframe = catalog.compute().copy() + self.dataframe: npd.NestedFrame = catalog.compute().copy() self.hc_structure = catalog.hc_structure self.margin_threshold = margin_threshold self.margin_order = self._set_margin_order(margin_order) @@ -83,7 +84,7 @@ def create_catalog(self) -> MarginCatalog | None: margin_structure = hc.catalog.MarginCatalog(margin_catalog_info, margin_pixels, schema=self.schema) return MarginCatalog(ddf, ddf_pixel_map, margin_structure) - def _get_margins(self) -> Tuple[List[HealpixPixel], List[pd.DataFrame]]: + def _get_margins(self) -> Tuple[List[HealpixPixel], List[npd.NestedFrame]]: """Generates the list of pixels that have margin data, and the dataframes with the margin data for each partition @@ -101,7 +102,7 @@ def _get_margins(self) -> Tuple[List[HealpixPixel], List[pd.DataFrame]]: def _generate_dask_df_and_map( self, pixels: List[HealpixPixel], partitions: List[pd.DataFrame] - ) -> Tuple[dd.DataFrame, Dict[HealpixPixel, int], int]: + ) -> Tuple[nd.NestedFrame, Dict[HealpixPixel, int], int]: """Create the Dask Dataframe containing the data points in the margins for the catalog as well as the mapping of those HEALPix to Dataframes @@ -163,7 +164,7 @@ def _create_margins(self, margin_pairs_df: pd.DataFrame) -> Dict[HealpixPixel, p Returns: A dictionary mapping each margin pixel to the respective DataFrame. 
""" - margin_pixel_df_map: Dict[HealpixPixel, pd.DataFrame] = {} + margin_pixel_df_map: Dict[HealpixPixel, npd.NestedFrame] = {} self.dataframe["margin_pixel"] = hp.ang2pix( 2**self.margin_order, self.dataframe[self.hc_structure.catalog_info.ra_column].to_numpy(), @@ -183,7 +184,9 @@ def _create_margins(self, margin_pairs_df: pd.DataFrame) -> Dict[HealpixPixel, p margin_pixel_df_map[margin_pixel] = df return margin_pixel_df_map - def _get_data_in_margin(self, partition_df: pd.DataFrame, margin_pixel: HealpixPixel) -> pd.DataFrame: + def _get_data_in_margin( + self, partition_df: npd.NestedFrame, margin_pixel: HealpixPixel + ) -> npd.NestedFrame: """Calculate the margin boundaries for the HEALPix and include the points on the margin according to the specified threshold diff --git a/src/lsdb/loaders/hipscat/abstract_catalog_loader.py b/src/lsdb/loaders/hipscat/abstract_catalog_loader.py index 2d886909..de66214a 100644 --- a/src/lsdb/loaders/hipscat/abstract_catalog_loader.py +++ b/src/lsdb/loaders/hipscat/abstract_catalog_loader.py @@ -3,10 +3,10 @@ from abc import abstractmethod from typing import Generic, List, Tuple, Type -import dask.dataframe as dd import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import numpy as np -import pandas as pd import pyarrow as pa from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset from hipscat.io.file_io import file_io @@ -54,7 +54,7 @@ def _load_hipscat_catalog(self, catalog_type: Type[HCCatalogTypeVar]) -> HCCatal ) return hc_catalog - def _load_dask_df_and_map(self, catalog: HCHealpixDataset) -> Tuple[dd.DataFrame, DaskDFPixelMap]: + def _load_dask_df_and_map(self, catalog: HCHealpixDataset) -> Tuple[nd.NestedFrame, DaskDFPixelMap]: """Load Dask DF from parquet files and make dict of HEALPix pixel to partition index""" pixels = catalog.get_healpix_pixels() ordered_pixels = np.array(pixels)[get_pixel_argsort(pixels)] @@ -77,10 +77,10 @@ def _get_paths_from_pixels( 
def _load_df_from_paths( self, catalog: HCHealpixDataset, paths: List[hc.io.FilePointer], divisions: Tuple[int, ...] | None - ) -> dd.DataFrame: + ) -> nd.NestedFrame: dask_meta_schema = self._create_dask_meta_schema(catalog.schema) if len(paths) > 0: - return dd.from_map( + return nd.NestedFrame.from_map( file_io.read_parquet_file_to_pandas, paths, columns=self.config.columns, @@ -90,14 +90,14 @@ def _load_df_from_paths( storage_options=self.storage_options, **self._get_kwargs(), ) - return dd.from_pandas(dask_meta_schema, npartitions=1) + return nd.NestedFrame.from_pandas(dask_meta_schema, npartitions=1) - def _create_dask_meta_schema(self, schema: pa.Schema) -> pd.DataFrame: + def _create_dask_meta_schema(self, schema: pa.Schema) -> npd.NestedFrame: """Creates the Dask meta DataFrame from the HiPSCat catalog schema.""" dask_meta_schema = schema.empty_table().to_pandas(types_mapper=self.config.get_dtype_mapper()) if self.config.columns is not None: dask_meta_schema = dask_meta_schema[self.config.columns] - return dask_meta_schema + return npd.NestedFrame(dask_meta_schema) def _get_kwargs(self) -> dict: """Constructs additional arguments for the `read_parquet` call""" diff --git a/src/lsdb/loaders/hipscat/association_catalog_loader.py b/src/lsdb/loaders/hipscat/association_catalog_loader.py index e283d0e2..a91eac4e 100644 --- a/src/lsdb/loaders/hipscat/association_catalog_loader.py +++ b/src/lsdb/loaders/hipscat/association_catalog_loader.py @@ -1,5 +1,5 @@ -import dask.dataframe as dd import hipscat as hc +import nested_dask as nd from lsdb.catalog.association_catalog import AssociationCatalog from lsdb.loaders.hipscat.abstract_catalog_loader import AbstractCatalogLoader @@ -23,5 +23,5 @@ def load_catalog(self) -> AssociationCatalog: def _load_empty_dask_df_and_map(self, hc_catalog): dask_meta_schema = self._create_dask_meta_schema(hc_catalog.schema) - ddf = dd.from_pandas(dask_meta_schema, npartitions=1) + ddf = nd.NestedFrame.from_pandas(dask_meta_schema, 
npartitions=1) return ddf, {} diff --git a/tests/lsdb/catalog/test_association_catalog.py b/tests/lsdb/catalog/test_association_catalog.py index b57ba826..6303a89e 100644 --- a/tests/lsdb/catalog/test_association_catalog.py +++ b/tests/lsdb/catalog/test_association_catalog.py @@ -1,4 +1,5 @@ import hipscat as hc +import nested_dask as nd import pandas as pd import lsdb @@ -8,6 +9,7 @@ def test_load_association(small_sky_to_xmatch_dir): small_sky_to_xmatch = lsdb.read_hipscat(small_sky_to_xmatch_dir) assert isinstance(small_sky_to_xmatch, AssociationCatalog) + assert isinstance(small_sky_to_xmatch._ddf, nd.NestedFrame) assert small_sky_to_xmatch.get_healpix_pixels() == small_sky_to_xmatch.hc_structure.get_healpix_pixels() assert repr(small_sky_to_xmatch) == repr(small_sky_to_xmatch._ddf) for healpix_pixel in small_sky_to_xmatch.get_healpix_pixels(): @@ -26,4 +28,5 @@ def test_load_association(small_sky_to_xmatch_dir): def test_load_soft_association(small_sky_to_xmatch_soft_dir): small_sky_to_xmatch_soft = lsdb.read_hipscat(small_sky_to_xmatch_soft_dir) assert isinstance(small_sky_to_xmatch_soft, AssociationCatalog) + assert isinstance(small_sky_to_xmatch_soft._ddf, nd.NestedFrame) assert len(small_sky_to_xmatch_soft.compute()) == 0 diff --git a/tests/lsdb/catalog/test_box_search.py b/tests/lsdb/catalog/test_box_search.py index 4e776517..f40f1f8a 100644 --- a/tests/lsdb/catalog/test_box_search.py +++ b/tests/lsdb/catalog/test_box_search.py @@ -1,3 +1,5 @@ +import nested_dask as nd +import nested_pandas as npd import numpy as np import pytest from hipscat.pixel_math.validators import ValidatorsErrors @@ -5,7 +7,9 @@ def test_box_search_ra_filters_correct_points(small_sky_order1_catalog, assert_divisions_are_correct): ra_search_catalog = small_sky_order1_catalog.box_search(ra=(280, 300)) + assert isinstance(ra_search_catalog._ddf, nd.NestedFrame) ra_search_df = ra_search_catalog.compute() + assert isinstance(ra_search_df, npd.NestedFrame) ra_values = 
ra_search_df[small_sky_order1_catalog.hc_structure.catalog_info.ra_column] assert len(ra_search_df) < len(small_sky_order1_catalog.compute()) assert all(280 <= ra <= 300 for ra in ra_values) diff --git a/tests/lsdb/catalog/test_catalog.py b/tests/lsdb/catalog/test_catalog.py index a3bed877..183011fd 100644 --- a/tests/lsdb/catalog/test_catalog.py +++ b/tests/lsdb/catalog/test_catalog.py @@ -3,6 +3,8 @@ import dask.array as da import dask.dataframe as dd import healpy as hp +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pytest @@ -46,13 +48,16 @@ def test_get_catalog_partition_gets_correct_partition(small_sky_order1_catalog): pixel = HealpixPixel(order=hp_order, pixel=hp_pixel) partition_index = small_sky_order1_catalog._ddf_pixel_map[pixel] ddf_partition = small_sky_order1_catalog._ddf.partitions[partition_index] - dd.utils.assert_eq(partition, ddf_partition) + assert isinstance(partition, nd.NestedFrame) + assert isinstance(partition.compute(), npd.NestedFrame) + pd.testing.assert_frame_equal(partition.compute(), ddf_partition.compute()) def test_head(small_sky_order1_catalog): # By default, head returns 5 rows expected_df = small_sky_order1_catalog._ddf.partitions[0].compute()[:5] head_df = small_sky_order1_catalog.head() + assert isinstance(head_df, npd.NestedFrame) assert len(head_df) == 5 pd.testing.assert_frame_equal(expected_df, head_df) # But we can also specify the number of rows we desire @@ -66,8 +71,8 @@ def test_head_rows_less_than_requested(small_sky_order1_catalog): schema = small_sky_order1_catalog.dtypes two_rows = small_sky_order1_catalog._ddf.partitions[0].compute()[:2] tiny_df = pd.DataFrame(data=two_rows, columns=schema.index, dtype=schema.to_numpy()) - altered_ddf = dd.from_pandas(tiny_df, npartitions=1) - catalog = lsdb.Catalog(altered_ddf, {}, small_sky_order1_catalog.hc_structure) + altered_ndf = nd.NestedFrame.from_pandas(tiny_df, npartitions=1) + catalog = lsdb.Catalog(altered_ndf, {}, 
small_sky_order1_catalog.hc_structure) # The head only contains two values assert len(catalog.head()) == 2 @@ -77,8 +82,8 @@ def test_head_first_partition_is_empty(small_sky_order1_catalog): schema = small_sky_order1_catalog.dtypes empty_df = pd.DataFrame(columns=schema.index, dtype=schema.to_numpy()) empty_ddf = dd.from_pandas(empty_df, npartitions=1) - altered_ddf = dd.concat([empty_ddf, small_sky_order1_catalog._ddf]) - catalog = lsdb.Catalog(altered_ddf, {}, small_sky_order1_catalog.hc_structure) + altered_ndf = nd.NestedFrame.from_dask_dataframe(dd.concat([empty_ddf, small_sky_order1_catalog._ddf])) + catalog = lsdb.Catalog(altered_ndf, {}, small_sky_order1_catalog.hc_structure) # The first partition is empty first_partition_df = catalog._ddf.partitions[0].compute() assert len(first_partition_df) == 0 @@ -101,24 +106,22 @@ def test_query(small_sky_order1_catalog): ] # Simple query, with no value injection or backticks result_catalog = small_sky_order1_catalog.query("ra > 300 and dec < -50") + assert isinstance(result_catalog._ddf, nd.NestedFrame) pd.testing.assert_frame_equal(result_catalog._ddf.compute(), expected_ddf.compute()) # Query with value injection ra, dec = 300, -50 result_catalog = small_sky_order1_catalog.query(f"ra > {ra} and dec < {dec}") + assert isinstance(result_catalog._ddf, nd.NestedFrame) pd.testing.assert_frame_equal(result_catalog._ddf.compute(), expected_ddf.compute()) # Query with backticks (for invalid Python variables names) new_columns = {"ra": "right ascension"} expected_ddf = expected_ddf.rename(columns=new_columns) small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.rename(columns=new_columns) result_catalog = small_sky_order1_catalog.query("`right ascension` > 300 and dec < -50") + assert isinstance(result_catalog._ddf, nd.NestedFrame) pd.testing.assert_frame_equal(result_catalog._ddf.compute(), expected_ddf.compute()) -def test_query_no_arguments(small_sky_order1_catalog): - with pytest.raises(ValueError): - 
small_sky_order1_catalog.query(None) - - def test_query_margin(small_sky_xmatch_with_margin): expected_ddf = small_sky_xmatch_with_margin._ddf.copy()[ (small_sky_xmatch_with_margin._ddf["ra"] > 300) & (small_sky_xmatch_with_margin._ddf["dec"] < -50) @@ -132,11 +135,13 @@ def test_query_margin(small_sky_xmatch_with_margin): assert result_catalog.margin is not None pd.testing.assert_frame_equal(result_catalog.compute(), expected_ddf.compute()) pd.testing.assert_frame_equal(result_catalog.margin.compute(), expected_margin_ddf.compute()) + assert isinstance(result_catalog.margin._ddf, nd.NestedFrame) def test_assign_no_arguments(small_sky_order1_catalog): result_catalog = small_sky_order1_catalog.assign() pd.testing.assert_frame_equal(result_catalog._ddf.compute(), small_sky_order1_catalog._ddf.compute()) + assert isinstance(result_catalog._ddf, nd.NestedFrame) def test_assign_with_callable(small_sky_order1_catalog): @@ -145,6 +150,7 @@ def test_assign_with_callable(small_sky_order1_catalog): expected_ddf = small_sky_order1_catalog._ddf.copy() expected_ddf["squared_ra_err"] = expected_ddf["ra_error"] ** 2 pd.testing.assert_frame_equal(result_catalog.compute(), expected_ddf.compute()) + assert isinstance(result_catalog._ddf, nd.NestedFrame) def test_assign_with_series(small_sky_order1_catalog): @@ -155,6 +161,7 @@ def test_assign_with_series(small_sky_order1_catalog): expected_ddf = small_sky_order1_catalog._ddf.copy() expected_ddf["new_column"] = squared_ra_err pd.testing.assert_frame_equal(result_catalog.compute(), expected_ddf.compute()) + assert isinstance(result_catalog._ddf, nd.NestedFrame) def test_assign_with_multiple_columns(small_sky_order1_catalog): @@ -260,6 +267,8 @@ def test_prune_empty_partitions(small_sky_order1_catalog): _, non_empty_partitions = pruned_catalog._get_non_empty_partitions() assert pruned_catalog._ddf.npartitions - len(non_empty_partitions) == 0 pd.testing.assert_frame_equal(catalog.compute(), pruned_catalog.compute()) + assert 
isinstance(pruned_catalog._ddf, nd.NestedFrame) + assert isinstance(pruned_catalog.compute(), npd.NestedFrame) def test_prune_empty_partitions_with_none_to_remove(small_sky_order1_catalog): @@ -467,6 +476,8 @@ def test_square_bracket_columns(small_sky_order1_catalog): column_subset = small_sky_order1_catalog[columns] assert all(column_subset.columns == columns) assert isinstance(column_subset, Catalog) + assert isinstance(column_subset._ddf, nd.NestedFrame) + assert isinstance(column_subset.compute(), npd.NestedFrame) pd.testing.assert_frame_equal(column_subset.compute(), small_sky_order1_catalog.compute()[columns]) assert np.all( column_subset.compute().index.to_numpy() == small_sky_order1_catalog.compute().index.to_numpy() @@ -484,6 +495,8 @@ def test_square_bracket_column(small_sky_order1_catalog): def test_square_bracket_filter(small_sky_order1_catalog): filtered_id = small_sky_order1_catalog[small_sky_order1_catalog["id"] > 750] assert isinstance(filtered_id, Catalog) + assert isinstance(filtered_id._ddf, nd.NestedFrame) + assert isinstance(filtered_id.compute(), npd.NestedFrame) ss_computed = small_sky_order1_catalog.compute() pd.testing.assert_frame_equal(filtered_id.compute(), ss_computed[ss_computed["id"] > 750]) assert np.all( @@ -498,9 +511,11 @@ def add_col(df): mapped = small_sky_order1_catalog.map_partitions(add_col) assert isinstance(mapped, Catalog) + assert isinstance(mapped._ddf, nd.NestedFrame) assert "a" in mapped.columns assert mapped.dtypes["a"] == mapped.dtypes["ra"] mapcomp = mapped.compute() + assert isinstance(mapcomp, npd.NestedFrame) assert np.all(mapcomp["a"] == mapcomp["ra"] + 1) @@ -562,10 +577,14 @@ def test_square_bracket_single_partition(small_sky_order1_catalog): index = 1 subset = small_sky_order1_catalog.partitions[index] assert isinstance(subset, Catalog) + assert isinstance(subset._ddf, nd.NestedFrame) assert 1 == len(subset._ddf_pixel_map) pixel = subset.get_healpix_pixels()[0] assert index == 
small_sky_order1_catalog.get_partition_index(pixel.order, pixel.pixel) - dd.assert_eq(small_sky_order1_catalog._ddf.partitions[index], subset._ddf) + pd.testing.assert_frame_equal( + small_sky_order1_catalog._ddf.partitions[index].compute(), subset._ddf.compute() + ) + assert isinstance(subset.compute(), npd.NestedFrame) def test_square_bracket_multiple_partitions(small_sky_order1_catalog): @@ -577,7 +596,7 @@ def test_square_bracket_multiple_partitions(small_sky_order1_catalog): original_index = small_sky_order1_catalog.get_partition_index(pixel.order, pixel.pixel) original_partition = small_sky_order1_catalog._ddf.partitions[original_index] subset_partition = subset._ddf.partitions[partition_index] - dd.assert_eq(original_partition, subset_partition) + pd.testing.assert_frame_equal(original_partition.compute(), subset_partition.compute()) def test_square_bracket_slice_partitions(small_sky_order1_catalog): @@ -586,8 +605,8 @@ def test_square_bracket_slice_partitions(small_sky_order1_catalog): assert 2 == len(subset._ddf_pixel_map) subset_2 = small_sky_order1_catalog.partitions[0:2] assert isinstance(subset, Catalog) - dd.assert_eq(subset_2._ddf, subset._ddf) + pd.testing.assert_frame_equal(subset_2.compute(), subset.compute()) assert subset_2.get_healpix_pixels() == subset.get_healpix_pixels() subset_3 = small_sky_order1_catalog.partitions[0:2:1] assert subset_3.get_healpix_pixels() == subset.get_healpix_pixels() - dd.assert_eq(subset_3._ddf, subset._ddf) + pd.testing.assert_frame_equal(subset_3.compute(), subset.compute()) diff --git a/tests/lsdb/catalog/test_cone_search.py b/tests/lsdb/catalog/test_cone_search.py index 0a18daa9..188d556d 100644 --- a/tests/lsdb/catalog/test_cone_search.py +++ b/tests/lsdb/catalog/test_cone_search.py @@ -1,3 +1,5 @@ +import nested_dask as nd +import nested_pandas as npd import pandas as pd import pytest from astropy.coordinates import SkyCoord @@ -11,7 +13,9 @@ def test_cone_search_filters_correct_points(small_sky_order1_catalog, 
assert_div radius = radius_degrees * 3600 center_coord = SkyCoord(ra, dec, unit="deg") cone_search_catalog = small_sky_order1_catalog.cone_search(ra, dec, radius) + assert isinstance(cone_search_catalog._ddf, nd.NestedFrame) cone_search_df = cone_search_catalog.compute() + assert isinstance(cone_search_df, npd.NestedFrame) for _, row in small_sky_order1_catalog.compute().iterrows(): row_ra = row[small_sky_order1_catalog.hc_structure.catalog_info.ra_column] row_dec = row[small_sky_order1_catalog.hc_structure.catalog_info.dec_column] diff --git a/tests/lsdb/catalog/test_crossmatch.py b/tests/lsdb/catalog/test_crossmatch.py index 648d1611..c942ea84 100644 --- a/tests/lsdb/catalog/test_crossmatch.py +++ b/tests/lsdb/catalog/test_crossmatch.py @@ -1,3 +1,5 @@ +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pyarrow as pa @@ -17,9 +19,12 @@ class TestCrossmatch: @staticmethod def test_kdtree_crossmatch(algo, small_sky_catalog, small_sky_xmatch_catalog, xmatch_correct): with pytest.warns(RuntimeWarning, match="Results may be incomplete and/or inaccurate"): - xmatched = small_sky_catalog.crossmatch( + xmatched_cat = small_sky_catalog.crossmatch( small_sky_xmatch_catalog, algorithm=algo, radius_arcsec=0.01 * 3600 - ).compute() + ) + assert isinstance(xmatched_cat._ddf, nd.NestedFrame) + xmatched = xmatched_cat.compute() + assert isinstance(xmatched, npd.NestedFrame) assert len(xmatched) == len(xmatch_correct) for _, correct_row in xmatch_correct.iterrows(): assert correct_row["ss_id"] in xmatched["id_small_sky"].to_numpy() diff --git a/tests/lsdb/catalog/test_index_search.py b/tests/lsdb/catalog/test_index_search.py index 1fddafe5..20481703 100644 --- a/tests/lsdb/catalog/test_index_search.py +++ b/tests/lsdb/catalog/test_index_search.py @@ -1,3 +1,5 @@ +import nested_dask as nd +import nested_pandas as npd from hipscat.catalog.index.index_catalog import IndexCatalog @@ -5,7 +7,9 @@ def 
test_index_search(small_sky_order1_catalog, small_sky_order1_id_index_dir, a catalog_index = IndexCatalog.read_from_hipscat(small_sky_order1_id_index_dir) # Searching for an object that does not exist index_search_catalog = small_sky_order1_catalog.index_search([900], catalog_index) + assert isinstance(index_search_catalog._ddf, nd.NestedFrame) index_search_df = index_search_catalog.compute() + assert isinstance(index_search_df, npd.NestedFrame) assert len(index_search_df) == 0 assert_divisions_are_correct(index_search_catalog) # Searching for an object that exists diff --git a/tests/lsdb/catalog/test_join.py b/tests/lsdb/catalog/test_join.py index 56e87357..46b964bd 100644 --- a/tests/lsdb/catalog/test_join.py +++ b/tests/lsdb/catalog/test_join.py @@ -1,3 +1,5 @@ +import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pytest @@ -12,6 +14,7 @@ def test_small_sky_join_small_sky_order1( joined = small_sky_catalog.join( small_sky_order1_catalog, left_on="id", right_on="id", suffixes=suffixes ) + assert isinstance(joined._ddf, nd.NestedFrame) for col_name, dtype in small_sky_catalog.dtypes.items(): assert (col_name + suffixes[0], dtype) in joined.dtypes.items() for col_name, dtype in small_sky_order1_catalog.dtypes.items(): @@ -20,6 +23,7 @@ def test_small_sky_join_small_sky_order1( assert joined._ddf.index.dtype == np.uint64 joined_compute = joined.compute() + assert isinstance(joined_compute, npd.NestedFrame) small_sky_compute = small_sky_catalog.compute() small_sky_order1_compute = small_sky_order1_catalog.compute() assert len(joined_compute) == len(small_sky_compute) @@ -68,8 +72,10 @@ def test_join_association(small_sky_catalog, small_sky_xmatch_catalog, small_sky joined = small_sky_catalog.join( small_sky_xmatch_catalog, through=small_sky_to_xmatch_catalog, suffixes=suffixes ) + assert isinstance(joined._ddf, nd.NestedFrame) assert joined._ddf.npartitions == 
len(small_sky_to_xmatch_catalog.hc_structure.join_info.data_frame) joined_data = joined.compute() + assert isinstance(joined_data, npd.NestedFrame) association_data = small_sky_to_xmatch_catalog.compute() assert len(joined_data) == len(association_data) @@ -179,3 +185,32 @@ def test_join_source_margin_soft( suffixes=suffixes, ) pd.testing.assert_frame_equal(joined.compute(), joined_on.compute()) + + +def test_join_nested(small_sky_catalog, small_sky_order1_source_with_margin, assert_divisions_are_correct): + joined = small_sky_catalog.join_nested( + small_sky_order1_source_with_margin, + left_on="id", + right_on="object_id", + nested_column_name="sources", + ) + for col_name, dtype in small_sky_catalog.dtypes.items(): + assert (col_name, dtype) in joined.dtypes.items() + for col_name, dtype in small_sky_order1_source_with_margin.dtypes.items(): + if col_name != "object_id": + assert (col_name, dtype.pyarrow_dtype) in joined["sources"].dtypes.fields.items() + assert_divisions_are_correct(joined) + joined_compute = joined.compute() + source_compute = small_sky_order1_source_with_margin.compute() + assert isinstance(joined_compute, npd.NestedFrame) + for _, row in joined_compute.iterrows(): + row_id = row["id"] + pd.testing.assert_frame_equal( + row["sources"].sort_values("source_ra").reset_index(drop=True), + pd.DataFrame(source_compute[source_compute["object_id"] == row_id].set_index("object_id")) + .sort_values("source_ra") + .reset_index(drop=True), + check_dtype=False, + check_column_type=False, + check_index_type=False, + ) diff --git a/tests/lsdb/catalog/test_margin_catalog.py b/tests/lsdb/catalog/test_margin_catalog.py index 8b9716ca..98d1cff2 100644 --- a/tests/lsdb/catalog/test_margin_catalog.py +++ b/tests/lsdb/catalog/test_margin_catalog.py @@ -1,6 +1,7 @@ from pathlib import Path import hipscat as hc +import nested_dask as nd import pandas as pd import lsdb @@ -10,6 +11,7 @@ def test_read_margin_catalog(small_sky_xmatch_margin_dir): margin = 
lsdb.read_hipscat(small_sky_xmatch_margin_dir) assert isinstance(margin, MarginCatalog) + assert isinstance(margin._ddf, nd.NestedFrame) hc_margin = hc.catalog.MarginCatalog.read_from_hipscat(small_sky_xmatch_margin_dir) assert margin.hc_structure.catalog_info == hc_margin.catalog_info assert margin.hc_structure.get_healpix_pixels() == hc_margin.get_healpix_pixels() diff --git a/tests/lsdb/catalog/test_merge.py b/tests/lsdb/catalog/test_merge.py index adf79def..51336b37 100644 --- a/tests/lsdb/catalog/test_merge.py +++ b/tests/lsdb/catalog/test_merge.py @@ -1,4 +1,4 @@ -import dask.dataframe as dd +import nested_dask as nd import pandas as pd import pytest @@ -10,10 +10,10 @@ def test_catalog_merge_on_indices(small_sky_catalog, small_sky_order1_catalog, h small_sky_catalog._ddf = small_sky_catalog._ddf.set_index("id") small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.set_index("id") # The wrapper outputs the same result as the underlying pandas merge - merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) - assert isinstance(merged_ddf, dd.DataFrame) + merged_ndf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) + assert isinstance(merged_ndf, nd.NestedFrame) expected_df = small_sky_catalog._ddf.merge(small_sky_order1_catalog._ddf, **kwargs) - pd.testing.assert_frame_equal(expected_df.compute(), merged_ddf.compute()) + pd.testing.assert_frame_equal(expected_df.compute(), merged_ndf.compute()) @pytest.mark.parametrize("how", ["left", "right", "inner", "outer"]) @@ -24,7 +24,7 @@ def test_catalog_merge_on_columns(small_sky_catalog, small_sky_order1_catalog, h small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.reset_index() # The wrapper outputs the same result as the underlying pandas merge merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) - assert isinstance(merged_ddf, dd.DataFrame) + assert isinstance(merged_ddf, nd.NestedFrame) expected_df = 
small_sky_catalog._ddf.merge(small_sky_order1_catalog._ddf, **kwargs) pd.testing.assert_frame_equal(expected_df.compute(), merged_ddf.compute()) @@ -38,7 +38,7 @@ def test_catalog_merge_on_index_and_column(small_sky_catalog, small_sky_order1_c small_sky_order1_catalog._ddf = small_sky_order1_catalog._ddf.reset_index() # The wrapper outputs the same result as the underlying pandas merge merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, **kwargs) - assert isinstance(merged_ddf, dd.DataFrame) + assert isinstance(merged_ddf, nd.NestedFrame) expected_df = small_sky_catalog._ddf.merge(small_sky_order1_catalog._ddf, **kwargs) pd.testing.assert_frame_equal(expected_df.compute(), merged_ddf.compute()) @@ -52,7 +52,7 @@ def test_catalog_merge_invalid_suffixes(small_sky_catalog, small_sky_order1_cata def test_catalog_merge_no_suffixes(small_sky_catalog, small_sky_order1_catalog): merged_ddf = small_sky_catalog.merge(small_sky_order1_catalog, how="inner", on="id") - assert isinstance(merged_ddf, dd.DataFrame) + assert isinstance(merged_ddf, nd.NestedFrame) # Get the columns with the same name in both catalogs non_join_columns_left = small_sky_catalog._ddf.columns.drop("id") non_join_columns_right = small_sky_order1_catalog._ddf.columns.drop("id") diff --git a/tests/lsdb/catalog/test_order_search.py b/tests/lsdb/catalog/test_order_search.py index 7a56245e..f53aa9c9 100644 --- a/tests/lsdb/catalog/test_order_search.py +++ b/tests/lsdb/catalog/test_order_search.py @@ -1,3 +1,4 @@ +import nested_dask as nd import pandas as pd import pytest @@ -6,6 +7,7 @@ def test_order_search_filters_correct_pixels(small_sky_source_catalog, assert_divisions_are_correct): order_search_catalog = small_sky_source_catalog.order_search(min_order=1, max_order=1) + assert isinstance(order_search_catalog._ddf, nd.NestedFrame) pixel_orders = [pixel.order for pixel in order_search_catalog.get_healpix_pixels()] assert all(order == 1 for order in pixel_orders) 
assert_divisions_are_correct(order_search_catalog) diff --git a/tests/lsdb/catalog/test_pixel_search.py b/tests/lsdb/catalog/test_pixel_search.py index 2dc4bba4..41c4e1ba 100644 --- a/tests/lsdb/catalog/test_pixel_search.py +++ b/tests/lsdb/catalog/test_pixel_search.py @@ -1,3 +1,4 @@ +import nested_dask as nd import pandas as pd from hipscat.pixel_math import HealpixPixel @@ -7,6 +8,7 @@ def test_pixel_search(small_sky_catalog, small_sky_order1_catalog): # Searching for pixels at a higher order catalog = small_sky_catalog.pixel_search([(1, 44), (1, 45)]) + assert isinstance(catalog._ddf, nd.NestedFrame) assert 1 == len(catalog._ddf_pixel_map) assert [HealpixPixel(0, 11)] == catalog.get_healpix_pixels() # Searching for pixels at a lower order diff --git a/tests/lsdb/catalog/test_polygon_search.py b/tests/lsdb/catalog/test_polygon_search.py index 4cd7add3..4dd775c9 100644 --- a/tests/lsdb/catalog/test_polygon_search.py +++ b/tests/lsdb/catalog/test_polygon_search.py @@ -1,3 +1,5 @@ +import nested_dask as nd +import nested_pandas as npd import numpy as np import numpy.testing as npt import pytest @@ -10,7 +12,9 @@ def test_polygon_search_filters_correct_points(small_sky_order1_catalog, assert_ vertices = [(300, -50), (300, -55), (272, -55), (272, -50)] polygon, _ = get_cartesian_polygon(vertices) polygon_search_catalog = small_sky_order1_catalog.polygon_search(vertices) + assert isinstance(polygon_search_catalog._ddf, nd.NestedFrame) polygon_search_df = polygon_search_catalog.compute() + assert isinstance(polygon_search_df, npd.NestedFrame) ra_values_radians = np.radians( polygon_search_df[small_sky_order1_catalog.hc_structure.catalog_info.ra_column] ) diff --git a/tests/lsdb/loaders/dataframe/test_from_dataframe.py b/tests/lsdb/loaders/dataframe/test_from_dataframe.py index df33749e..048878d5 100644 --- a/tests/lsdb/loaders/dataframe/test_from_dataframe.py +++ b/tests/lsdb/loaders/dataframe/test_from_dataframe.py @@ -3,6 +3,8 @@ import astropy.units as u import 
healpy as hp import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import numpy as np import numpy.testing as npt import pandas as pd @@ -40,6 +42,7 @@ def test_from_dataframe( # Read CSV file for the small sky order 1 catalog catalog = lsdb.from_dataframe(small_sky_order1_df, margin_threshold=None, **kwargs) assert isinstance(catalog, lsdb.Catalog) + assert isinstance(catalog._ddf, nd.NestedFrame) # Catalogs have the same information assert catalog.hc_structure.catalog_info == small_sky_order1_catalog.hc_structure.catalog_info # Index is set to hipscat index @@ -54,6 +57,8 @@ def test_from_dataframe( expected_schema = hc.read_from_hipscat(small_sky_order1_dir).schema assert catalog.hc_structure.schema.equals(expected_schema) + assert isinstance(catalog.compute(), npd.NestedFrame) + def test_from_dataframe_catalog_of_invalid_type(small_sky_order1_df, small_sky_order1_catalog): """Tests that an exception is thrown if the catalog is not of type OBJECT or SOURCE""" @@ -219,6 +224,7 @@ def test_from_dataframe_small_sky_source_with_margins(small_sky_source_df, small assert catalog.margin is not None assert isinstance(catalog.margin, MarginCatalog) + assert isinstance(catalog.margin._ddf, nd.NestedFrame) assert catalog.margin.get_healpix_pixels() == small_sky_source_margin_catalog.get_healpix_pixels() # The points of this margin catalog are present in one partition only @@ -228,6 +234,7 @@ def test_from_dataframe_small_sky_source_with_margins(small_sky_source_df, small small_sky_source_margin_catalog.compute().sort_index(), check_like=True, ) + assert isinstance(catalog.margin.compute(), npd.NestedFrame) # The margin and main catalog's schemas are the same assert catalog.margin.hc_structure.schema is catalog.hc_structure.schema diff --git a/tests/lsdb/loaders/hipscat/test_read_hipscat.py b/tests/lsdb/loaders/hipscat/test_read_hipscat.py index fe0df769..6916bd2a 100644 --- a/tests/lsdb/loaders/hipscat/test_read_hipscat.py +++ 
b/tests/lsdb/loaders/hipscat/test_read_hipscat.py @@ -1,6 +1,8 @@ from pathlib import Path import hipscat as hc +import nested_dask as nd +import nested_pandas as npd import numpy as np import numpy.testing as npt import pandas as pd @@ -17,9 +19,11 @@ def test_read_hipscat(small_sky_order1_dir, small_sky_order1_hipscat_catalog, assert_divisions_are_correct): catalog = lsdb.read_hipscat(small_sky_order1_dir) assert isinstance(catalog, lsdb.Catalog) + assert isinstance(catalog._ddf, nd.NestedFrame) assert catalog.hc_structure.catalog_base_dir == small_sky_order1_hipscat_catalog.catalog_base_dir assert catalog.get_healpix_pixels() == small_sky_order1_hipscat_catalog.get_healpix_pixels() assert len(catalog.compute().columns) == 8 + assert isinstance(catalog.compute(), npd.NestedFrame) assert_divisions_are_correct(catalog) @@ -65,9 +69,11 @@ def test_parquet_data_in_partitions_match_files(small_sky_order1_dir, small_sky_ def test_read_hipscat_specify_catalog_type(small_sky_catalog, small_sky_dir): catalog = lsdb.read_hipscat(small_sky_dir, catalog_type=lsdb.Catalog) assert isinstance(catalog, lsdb.Catalog) + assert isinstance(catalog._ddf, nd.NestedFrame) pd.testing.assert_frame_equal(catalog.compute(), small_sky_catalog.compute()) assert catalog.get_healpix_pixels() == small_sky_catalog.get_healpix_pixels() assert catalog.hc_structure.catalog_info == small_sky_catalog.hc_structure.catalog_info + assert isinstance(catalog.compute(), npd.NestedFrame) def test_read_hipscat_specify_wrong_catalog_type(small_sky_dir): @@ -79,7 +85,9 @@ def test_catalog_with_margin_object(small_sky_xmatch_dir, small_sky_xmatch_margi catalog = lsdb.read_hipscat(small_sky_xmatch_dir, margin_cache=small_sky_xmatch_margin_catalog) assert isinstance(catalog, lsdb.Catalog) assert isinstance(catalog.margin, lsdb.MarginCatalog) + assert isinstance(catalog._ddf, nd.NestedFrame) assert catalog.margin is small_sky_xmatch_margin_catalog + assert isinstance(catalog.margin._ddf, nd.NestedFrame) def 
test_catalog_with_margin_file_pointer( @@ -89,6 +97,8 @@ def test_catalog_with_margin_file_pointer( catalog = lsdb.read_hipscat(small_sky_xmatch_dir, margin_cache=small_sky_xmatch_margin_fp) assert isinstance(catalog, lsdb.Catalog) assert isinstance(catalog.margin, lsdb.MarginCatalog) + assert isinstance(catalog._ddf, nd.NestedFrame) + assert isinstance(catalog.margin._ddf, nd.NestedFrame) assert ( catalog.margin.hc_structure.catalog_info == small_sky_xmatch_margin_catalog.hc_structure.catalog_info ) @@ -103,6 +113,8 @@ def test_catalog_with_margin_path( catalog = lsdb.read_hipscat(small_sky_xmatch_dir, margin_cache=small_sky_xmatch_margin_dir) assert isinstance(catalog, lsdb.Catalog) assert isinstance(catalog.margin, lsdb.MarginCatalog) + assert isinstance(catalog._ddf, nd.NestedFrame) + assert isinstance(catalog.margin._ddf, nd.NestedFrame) assert ( catalog.margin.hc_structure.catalog_info == small_sky_xmatch_margin_catalog.hc_structure.catalog_info ) @@ -221,6 +233,9 @@ def test_read_hipscat_with_backend(small_sky_dir): catalog = lsdb.read_hipscat(small_sky_dir, dtype_backend=None) assert all(isinstance(col_type, np.dtype) for col_type in catalog.dtypes) + assert isinstance(catalog._ddf, nd.NestedFrame) + assert isinstance(catalog.compute(), npd.NestedFrame) + def test_read_hipscat_with_invalid_backend(small_sky_dir): with pytest.raises(ValueError, match="data type backend must be either"):