Add merge_asof function to catalog #409

Merged · 2 commits · Aug 27, 2024

48 changes: 48 additions & 0 deletions src/lsdb/catalog/catalog.py
@@ -23,6 +23,7 @@
join_catalog_data_nested,
join_catalog_data_on,
join_catalog_data_through,
merge_asof_catalog_data,
)
from lsdb.dask.partition_indexer import PartitionIndexer
from lsdb.io.schema import get_arrow_schema
@@ -378,6 +379,53 @@
suffixes=suffixes,
)

def merge_asof(
self,
other: Catalog,
direction: str = "backward",
suffixes: Tuple[str, str] | None = None,
output_catalog_name: str | None = None,
):
"""Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

Must be along catalog indices, and does not include margin caches, meaning results may be incomplete
for merging points.

This function is intended for use in special cases such as Dust Map Catalogs, for general merges,
the `crossmatch`and `join` functions should be used.

Args:
other (lsdb.Catalog): the right catalog to merge to
direction (str): the direction to perform the merge_asof ("backward", "forward", or "nearest")
suffixes (Tuple[str, str]): the suffixes to apply to each catalog's column names
output_catalog_name (str): the name of the resulting catalog; defaults to
"<left catalog name>_merge_asof_<right catalog name>"

Returns:
A new catalog with the columns from each of the input catalogs with their respective suffixes
added, and the rows merged using `merge_asof` on the catalog indices.
"""
if suffixes is None:
suffixes = (f"_{self.name}", f"_{other.name}")

Codecov / codecov/patch: added line src/lsdb/catalog/catalog.py#L407 was not covered by tests

if len(suffixes) != 2:
raise ValueError("`suffixes` must be a tuple with two strings")

Codecov / codecov/patch: added line src/lsdb/catalog/catalog.py#L410 was not covered by tests

ddf, ddf_map, alignment = merge_asof_catalog_data(self, other, suffixes=suffixes, direction=direction)

if output_catalog_name is None:
output_catalog_name = (
f"{self.hc_structure.catalog_info.catalog_name}_merge_asof_"
f"{other.hc_structure.catalog_info.catalog_name}"
)

new_catalog_info = dataclasses.replace(
self.hc_structure.catalog_info,
catalog_name=output_catalog_name,
ra_column=self.hc_structure.catalog_info.ra_column + suffixes[0],
dec_column=self.hc_structure.catalog_info.dec_column + suffixes[0],
)
hc_catalog = hc.catalog.Catalog(new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf))
return Catalog(ddf, ddf_map, hc_catalog)
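
For context, a minimal usage sketch of the new method (not part of this diff). The `lsdb.read_hipscat` loader is the one available at the time of this PR; the catalog paths, variable names, and output name are illustrative placeholders:

import lsdb

# Hypothetical catalog paths; any two HiPSCat catalogs with overlapping
# sky coverage would work here.
objects = lsdb.read_hipscat("path/to/object_catalog")
dust_map = lsdb.read_hipscat("path/to/dust_map_catalog")

# Match each object row to the nearest dust-map row by spatial index;
# margin caches are not used, so boundary results may be incomplete.
merged = objects.merge_asof(
    dust_map,
    direction="nearest",
    suffixes=("_obj", "_dust"),
    output_catalog_name="objects_with_dust",
)
result = merged.compute()  # materialize the Dask-backed result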

def join(
self,
other: Catalog,
76 changes: 75 additions & 1 deletion src/lsdb/dask/join_catalog_data.py
@@ -1,5 +1,4 @@
# pylint: disable=duplicate-code

from __future__ import annotations

import warnings
@@ -8,6 +7,7 @@
import dask
import nested_dask as nd
import nested_pandas as npd
import pandas as pd
from hipscat.catalog.association_catalog import AssociationCatalogInfo
from hipscat.catalog.catalog_info import CatalogInfo
from hipscat.catalog.margin_cache import MarginCacheCatalogInfo
@@ -228,6 +228,44 @@ def perform_join_through(
return merged


# pylint: disable=too-many-arguments, unused-argument
@dask.delayed
def perform_merge_asof(
left: npd.NestedFrame,
right: npd.NestedFrame,
left_pixel: HealpixPixel,
right_pixel: HealpixPixel,
left_catalog_info: CatalogInfo,
right_catalog_info: CatalogInfo,
suffixes: Tuple[str, str],
direction: str,
):
"""Performs a merge_asof on two catalog partitions

Args:
left (npd.NestedFrame): the left partition to merge
right (npd.NestedFrame): the right partition to merge
left_pixel (HealpixPixel): the HEALPix pixel of the left partition
right_pixel (HealpixPixel): the HEALPix pixel of the right partition
left_catalog_info (hc.CatalogInfo): the catalog info of the left catalog
right_catalog_info (hc.CatalogInfo): the catalog info of the right catalog
suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
direction (str): The direction to perform the merge_asof

Returns:
A dataframe with the result of merging the left and right partitions on the specified columns with
`merge_asof`
"""
if right_pixel.order > left_pixel.order:
left = filter_by_hipscat_index_to_pixel(left, right_pixel.order, right_pixel.pixel)

left, right = rename_columns_with_suffixes(left, right, suffixes)
left.sort_index(inplace=True)
right.sort_index(inplace=True)
merged = pd.merge_asof(left, right, left_index=True, right_index=True, direction=direction)
return merged
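
For reference, the pandas behavior this partition function relies on, as a self-contained sketch: with `left_index=True`/`right_index=True`, `pd.merge_asof` matches each left row to the nearest right index key in the given direction, which is why both partitions are sorted by index first.

import pandas as pd

left = pd.DataFrame({"flux": [1.0, 2.0, 3.0]}, index=[10, 20, 30])
right = pd.DataFrame({"ebv": [0.1, 0.2]}, index=[12, 28])

# "backward" takes the last right key <= each left key; left key 10 has
# no earlier right key, so its ebv is NaN. The left index is preserved.
print(pd.merge_asof(left, right, left_index=True, right_index=True, direction="backward"))
#     flux  ebv
# 10   1.0  NaN
# 20   2.0  0.1
# 30   3.0  0.2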


def join_catalog_data_on(
left: Catalog, right: Catalog, left_on: str, right_on: str, suffixes: Tuple[str, str]
) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
@@ -382,3 +420,39 @@ def join_catalog_data_through(
meta_df = generate_meta_df_for_joined_tables([left, extra_df, right], [suffixes[0], "", suffixes[1]])

return construct_catalog_args(joined_partitions, meta_df, alignment)


def merge_asof_catalog_data(
left: Catalog, right: Catalog, suffixes: Tuple[str, str], direction: str = "backward"
) -> Tuple[nd.NestedFrame, DaskDFPixelMap, PixelAlignment]:
"""Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

Must be along catalog indices, and does not include margin caches, meaning results may be incomplete for
merging points.

This function is intended for use in special cases such as Dust Map Catalogs, for general merges,
the `crossmatch`and `join` functions should be used.

Args:
left (lsdb.Catalog): the left catalog to join
right (lsdb.Catalog): the right catalog to join
suffixes (Tuple[str,str]): the suffixes to apply to each partition's column names
direction (str): the direction to perform the merge_asof

Returns:
A tuple of the dask dataframe with the result of the join, the pixel map from HEALPix
pixel to partition index within the dataframe, and the PixelAlignment of the two input
catalogs.
"""

alignment = align_catalogs(left, right)

left_pixels, right_pixels = get_healpix_pixels_from_alignment(alignment)

joined_partitions = align_and_apply(
[(left, left_pixels), (right, right_pixels)], perform_merge_asof, suffixes, direction
)

meta_df = generate_meta_df_for_joined_tables([left, right], suffixes)

return construct_catalog_args(joined_partitions, meta_df, alignment)
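
Schematically, the delayed per-pixel pattern used above works as in the following sketch. This is a simplified stand-in for `align_and_apply`, not its actual implementation, and `merge_pair`/`schedule_merges` are hypothetical names: the aligned partition lists are zipped and one lazy merge task is scheduled per pixel pair.

import dask
import pandas as pd

@dask.delayed
def merge_pair(left_part: pd.DataFrame, right_part: pd.DataFrame, direction: str) -> pd.DataFrame:
    # Simplified per-pixel merge; the real perform_merge_asof also filters
    # the left side by pixel order and applies column suffixes.
    return pd.merge_asof(
        left_part.sort_index(),
        right_part.sort_index(),
        left_index=True,
        right_index=True,
        direction=direction,
    )

def schedule_merges(left_parts, right_parts, direction="backward"):
    # One delayed task per aligned pixel pair; nothing executes until
    # dask.compute() (or a downstream from_delayed) is called.
    return [merge_pair(l, r, direction) for l, r in zip(left_parts, right_parts)]
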
33 changes: 32 additions & 1 deletion tests/lsdb/catalog/test_join.py
@@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
import pytest
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix


def test_small_sky_join_small_sky_order1(
@@ -214,3 +214,34 @@ def test_join_nested(small_sky_catalog, small_sky_order1_source_with_margin, ass
check_column_type=False,
check_index_type=False,
)


def test_merge_asof(small_sky_catalog, small_sky_xmatch_catalog, assert_divisions_are_correct):
suffixes = ("_a", "_b")
for direction in ["backward", "forward", "nearest"]:
joined = small_sky_catalog.merge_asof(
small_sky_xmatch_catalog, direction=direction, suffixes=suffixes
)
assert isinstance(joined._ddf, nd.NestedFrame)
assert_divisions_are_correct(joined)
joined_compute = joined.compute()
assert isinstance(joined_compute, npd.NestedFrame)
small_sky_compute = small_sky_catalog.compute().rename(
columns={c: c + suffixes[0] for c in small_sky_catalog.columns}
)
order_1_partition = hipscat_id_to_healpix(small_sky_compute.index.to_numpy(), 1)
left_partitions = [
small_sky_compute[order_1_partition == p.pixel]
for p in small_sky_xmatch_catalog.get_healpix_pixels()
]
small_sky_order1_partitions = [
p.compute().rename(columns={c: c + suffixes[1] for c in small_sky_xmatch_catalog.columns})
for p in small_sky_xmatch_catalog.partitions
]
correct_result = pd.concat(
[
pd.merge_asof(lp, rp, direction=direction, left_index=True, right_index=True)
for lp, rp in zip(left_partitions, small_sky_order1_partitions)
]
)
pd.testing.assert_frame_equal(joined_compute, correct_result)