From 7d40abe00cbc5c90b499c86b7cd445d33a88d5d2 Mon Sep 17 00:00:00 2001
From: Sean McGuire
Date: Fri, 23 Aug 2024 16:40:47 -0400
Subject: [PATCH 1/2] Add merge_asof function

---
 src/lsdb/catalog/catalog.py        | 48 ++++++++++++++++++
 src/lsdb/dask/join_catalog_data.py | 79 +++++++++++++++++++++++++++++-
 tests/lsdb/catalog/test_join.py    | 33 ++++++++++++-
 3 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/src/lsdb/catalog/catalog.py b/src/lsdb/catalog/catalog.py
index 5c00e6bd..34830739 100644
--- a/src/lsdb/catalog/catalog.py
+++ b/src/lsdb/catalog/catalog.py
@@ -23,6 +23,7 @@
     join_catalog_data_nested,
     join_catalog_data_on,
     join_catalog_data_through,
+    merge_asof_catalog_data,
 )
 from lsdb.dask.partition_indexer import PartitionIndexer
 from lsdb.io.schema import get_arrow_schema
@@ -378,6 +379,53 @@ def merge(
             suffixes=suffixes,
         )
 
+    def merge_asof(
+        self,
+        other: Catalog,
+        direction: str = "backward",
+        suffixes: Tuple[str, str] | None = None,
+        output_catalog_name: str | None = None,
+    ):
+        """Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys
+
+        The merge is performed along the catalog indices and does not include margin caches, so results may
+        be incomplete for points near partition boundaries.
+
+        This function is intended for use in special cases such as Dust Map Catalogs; for general merges,
+        the `crossmatch` and `join` functions should be used.
+
+        Args:
+            other (lsdb.Catalog): the right catalog to merge to
+            direction (str): the direction to perform the merge_asof ("backward", "forward", or "nearest")
+            suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
+
+        Returns:
+            A new catalog with the columns from each of the input catalogs with their respective suffixes
+            added, and the rows merged using merge_asof on the catalog indices.
+        """
+        if suffixes is None:
+            suffixes = (f"_{self.name}", f"_{other.name}")
+
+        if len(suffixes) != 2:
+            raise ValueError("`suffixes` must be a tuple with two strings")
+
+        ddf, ddf_map, alignment = merge_asof_catalog_data(self, other, suffixes=suffixes, direction=direction)
+
+        if output_catalog_name is None:
+            output_catalog_name = (
+                f"{self.hc_structure.catalog_info.catalog_name} merge_asof "
+                f"{other.hc_structure.catalog_info.catalog_name}"
+            )
+
+        new_catalog_info = dataclasses.replace(
+            self.hc_structure.catalog_info,
+            catalog_name=output_catalog_name,
+            ra_column=self.hc_structure.catalog_info.ra_column + suffixes[0],
+            dec_column=self.hc_structure.catalog_info.dec_column + suffixes[0],
+        )
+        hc_catalog = hc.catalog.Catalog(new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf))
+        return Catalog(ddf, ddf_map, hc_catalog)
+
     def join(
         self,
         other: Catalog,
diff --git a/src/lsdb/dask/join_catalog_data.py b/src/lsdb/dask/join_catalog_data.py
index 045d3fbd..6ef1d7e8 100644
--- a/src/lsdb/dask/join_catalog_data.py
+++ b/src/lsdb/dask/join_catalog_data.py
@@ -1,5 +1,4 @@
 # pylint: disable=duplicate-code
-
 from __future__ import annotations
 
 import warnings
@@ -8,6 +7,7 @@
 import dask
 import nested_dask as nd
 import nested_pandas as npd
+import pandas as pd
 from hipscat.catalog.association_catalog import AssociationCatalogInfo
 from hipscat.catalog.catalog_info import CatalogInfo
 from hipscat.catalog.margin_cache import MarginCacheCatalogInfo
@@ -228,6 +228,42 @@ def perform_join_through(
     return merged
 
 
+# pylint: disable=too-many-arguments, unused-argument
+@dask.delayed
+def perform_merge_asof(
+    left: npd.NestedFrame,
+    right: npd.NestedFrame,
+    left_pixel: HealpixPixel,
+    right_pixel: HealpixPixel,
+    left_catalog_info: CatalogInfo,
+    right_catalog_info: CatalogInfo,
+    suffixes: Tuple[str, str],
+    direction: str,
+):
+    """Performs a merge_asof on two catalog partitions
+
+    Args:
+        left (npd.NestedFrame): the left partition to merge
+        right (npd.NestedFrame): the right partition to merge
+        left_pixel (HealpixPixel): the HEALPix pixel of the left partition
+        right_pixel (HealpixPixel): the HEALPix pixel of the right partition
+        left_catalog_info (hc.CatalogInfo): the catalog info of the left catalog
+        right_catalog_info (hc.CatalogInfo): the catalog info of the right catalog
+        suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
+        direction (str): the direction to perform the merge_asof
+
+    Returns:
+        A dataframe with the result of merging the left and right partitions on their indices with
+        `merge_asof`
+    """
+    if right_pixel.order > left_pixel.order:
+        left = filter_by_hipscat_index_to_pixel(left, right_pixel.order, right_pixel.pixel)
+
+    left, right = rename_columns_with_suffixes(left, right, suffixes)
+    merged = pd.merge_asof(left, right, left_index=True, right_index=True, direction=direction)
+    return merged
+
+
 def join_catalog_data_on(
     left: Catalog, right: Catalog, left_on: str, right_on: str, suffixes: Tuple[str, str]
 ) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
@@ -382,3 +418,44 @@ def join_catalog_data_through(
     meta_df = generate_meta_df_for_joined_tables([left, extra_df, right], [suffixes[0], "", suffixes[1]])
 
     return construct_catalog_args(joined_partitions, meta_df, alignment)
+
+
+def merge_asof_catalog_data(
+    left: Catalog, right: Catalog, suffixes: Tuple[str, str], direction: str = "backward"
+) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
+    """Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys
+
+    The merge is performed along the catalog indices and does not include margin caches, so results may be
+    incomplete for points near partition boundaries.
+
+    This function is intended for use in special cases such as Dust Map Catalogs; for general merges,
+    the `crossmatch` and `join` functions should be used.
+
+    Args:
+        left (lsdb.Catalog): the left catalog to join
+        right (lsdb.Catalog): the right catalog to join
+        suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
+        direction (str): the direction to perform the merge_asof ("backward", "forward", or "nearest")
+
+    Returns:
+        A tuple of the dask dataframe with the result of the join, the pixel map from HEALPix
+        pixel to partition index within the dataframe, and the PixelAlignment of the two input
+        catalogs.
+    """
+    if right.margin is None:
+        warnings.warn(
+            "Right catalog does not have a margin cache. Results may be incomplete and/or inaccurate.",
+            RuntimeWarning,
+        )
+
+    alignment = align_catalogs(left, right)
+
+    left_pixels, right_pixels = get_healpix_pixels_from_alignment(alignment)
+
+    joined_partitions = align_and_apply(
+        [(left, left_pixels), (right, right_pixels)], perform_merge_asof, suffixes, direction
+    )
+
+    meta_df = generate_meta_df_for_joined_tables([left, right], suffixes)
+
+    return construct_catalog_args(joined_partitions, meta_df, alignment)
diff --git a/tests/lsdb/catalog/test_join.py b/tests/lsdb/catalog/test_join.py
index 46b964bd..a147142e 100644
--- a/tests/lsdb/catalog/test_join.py
+++ b/tests/lsdb/catalog/test_join.py
@@ -3,7 +3,7 @@
 import numpy as np
 import pandas as pd
 import pytest
-from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
+from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix
 
 
 def test_small_sky_join_small_sky_order1(
@@ -214,3 +214,34 @@ def test_join_nested(small_sky_catalog, small_sky_order1_source_with_margin, ass
         check_column_type=False,
         check_index_type=False,
     )
+
+
+def test_merge_asof(small_sky_catalog, small_sky_xmatch_catalog, assert_divisions_are_correct):
+    suffixes = ("_a", "_b")
+    for direction in ["backward", "forward", "nearest"]:
+        joined = small_sky_catalog.merge_asof(
+            small_sky_xmatch_catalog, direction=direction, suffixes=suffixes
+        )
+        assert isinstance(joined._ddf, nd.NestedFrame)
+        assert_divisions_are_correct(joined)
+        joined_compute = joined.compute()
+        assert isinstance(joined_compute, npd.NestedFrame)
+        small_sky_compute = small_sky_catalog.compute().rename(
+            columns={c: c + suffixes[0] for c in small_sky_catalog.columns}
+        )
+        order_1_partition = hipscat_id_to_healpix(small_sky_compute.index.to_numpy(), 1)
+        left_partitions = [
+            small_sky_compute[order_1_partition == p.pixel]
+            for p in small_sky_xmatch_catalog.get_healpix_pixels()
+        ]
+        small_sky_order1_partitions = [
+            p.compute().rename(columns={c: c + suffixes[1] for c in small_sky_xmatch_catalog.columns})
+            for p in small_sky_xmatch_catalog.partitions
+        ]
+        correct_result = pd.concat(
+            [
+                pd.merge_asof(lp, rp, direction=direction, left_index=True, right_index=True)
+                for lp, rp in zip(left_partitions, small_sky_order1_partitions)
+            ]
+        )
+        pd.testing.assert_frame_equal(joined_compute, correct_result)

From 55ede314005eaab6f69b90cf29225bfdf7f16bf5 Mon Sep 17 00:00:00 2001
From: Sean McGuire
Date: Mon, 26 Aug 2024 18:40:03 -0400
Subject: [PATCH 2/2] pr comments

---
 src/lsdb/catalog/catalog.py        | 2 +-
 src/lsdb/dask/join_catalog_data.py | 7 ++-----
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/lsdb/catalog/catalog.py b/src/lsdb/catalog/catalog.py
index 34830739..ee9a2b55 100644
--- a/src/lsdb/catalog/catalog.py
+++ b/src/lsdb/catalog/catalog.py
@@ -413,7 +413,7 @@ def merge_asof(
 
         if output_catalog_name is None:
             output_catalog_name = (
-                f"{self.hc_structure.catalog_info.catalog_name} merge_asof "
+                f"{self.hc_structure.catalog_info.catalog_name}_merge_asof_"
                 f"{other.hc_structure.catalog_info.catalog_name}"
             )
 
diff --git a/src/lsdb/dask/join_catalog_data.py b/src/lsdb/dask/join_catalog_data.py
index 6ef1d7e8..b3db46fc 100644
--- a/src/lsdb/dask/join_catalog_data.py
+++ b/src/lsdb/dask/join_catalog_data.py
@@ -260,6 +260,8 @@ def perform_merge_asof(
         left = filter_by_hipscat_index_to_pixel(left, right_pixel.order, right_pixel.pixel)
 
     left, right = rename_columns_with_suffixes(left, right, suffixes)
+    left.sort_index(inplace=True)
+    right.sort_index(inplace=True)
     merged = pd.merge_asof(left, right, left_index=True, right_index=True, direction=direction)
     return merged
 
@@ -442,11 +444,6 @@ def merge_asof_catalog_data(
         pixel to partition index within the dataframe, and the PixelAlignment of the two input
         catalogs.
     """
-    if right.margin is None:
-        warnings.warn(
-            "Right catalog does not have a margin cache. Results may be incomplete and/or inaccurate.",
-            RuntimeWarning,
-        )
 
     alignment = align_catalogs(left, right)
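
Below is a minimal usage sketch of the `Catalog.merge_asof` API these patches add. The catalog paths, variable names, and suffix values are hypothetical placeholders, and `lsdb.read_hipscat` is assumed to be the loader available in this version of lsdb; the sketch illustrates only the call pattern, not a verified workflow.

import lsdb

# Hypothetical catalog locations -- replace with real HiPSCat directories.
objects = lsdb.read_hipscat("path/to/object_catalog")
dust_map = lsdb.read_hipscat("path/to/dust_map_catalog")

# Merge along the shared spatial (hipscat) index. No margin cache is used,
# so results for points near partition boundaries may be incomplete.
merged = objects.merge_asof(dust_map, direction="nearest", suffixes=("_obj", "_dust"))

# merged is a lazy lsdb Catalog; compute() materializes the merged rows.
result = merged.compute()

Here direction="nearest" pairs each left-hand row with the closest index key in the right-hand catalog, which suits a dust-map style lookup; "backward" (the default) and "forward" restrict the match to keys at or before, or at or after, the left index value, following pandas `merge_asof` semantics.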