Skip to content

Commit

Permalink
feat: Add join_asof support for pandas and dask (#911)
Browse files Browse the repository at this point in the history
  • Loading branch information
raisadz authored Sep 6, 2024
1 parent fdc8f88 commit 029f590
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/api-reference/dataframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- item
- iter_rows
- join
- join_asof
- lazy
- null_count
- pipe
Expand Down
1 change: 1 addition & 0 deletions docs/api-reference/lazyframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- group_by
- head
- join
- join_asof
- lazy
- pipe
- rename
Expand Down
11 changes: 11 additions & 0 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,17 @@ def join(
),
)

def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> Self:
msg = "join_asof is not yet supported on PyArrow tables"
raise NotImplementedError(msg)

def drop(self: Self, columns: list[str], strict: bool) -> Self: # noqa: FBT001
to_drop = parse_columns_to_drop(
compliant_frame=self, columns=columns, strict=strict
Expand Down
20 changes: 20 additions & 0 deletions narwhals/_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,26 @@ def join(
),
)

def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
plx.merge_asof(
self._native_frame,
other._native_frame,
left_on=left_on,
right_on=right_on,
direction=strategy,
suffixes=("", "_right"),
),
)

def group_by(self, *by: str) -> DaskLazyGroupBy:
from narwhals._dask.group_by import DaskLazyGroupBy

Expand Down
20 changes: 20 additions & 0 deletions narwhals/_pandas_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,26 @@ def join(
),
)

def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
plx.merge_asof(
self._native_frame,
other._native_frame,
left_on=left_on,
right_on=right_on,
direction=strategy,
suffixes=("", "_right"),
),
)

# --- partial reduction ---

def head(self, n: int) -> Self:
Expand Down
202 changes: 202 additions & 0 deletions narwhals/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,29 @@ def gather_every(self: Self, n: int, offset: int = 0) -> Self:
self._compliant_frame.gather_every(n=n, offset=offset)
)

def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> Self:
_supported_strategies = ("backward", "forward", "nearest")

if strategy not in _supported_strategies:
msg = f"Only the following strategies are supported: {_supported_strategies}; found '{strategy}'."
raise NotImplementedError(msg)

return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
left_on=left_on,
right_on=right_on,
strategy=strategy,
)
)


class DataFrame(BaseFrame[FrameT]):
"""
Expand Down Expand Up @@ -1839,6 +1862,96 @@ def join(
"""
return super().join(other, how=how, left_on=left_on, right_on=right_on)

def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> Self:
"""
Perform an asof join.
This is similar to a left-join except that we match on nearest key rather than equal keys.
Both DataFrames must be sorted by the asof_join key.
Arguments:
other: DataFrame to join with.
left_on: Name(s) of the left join column(s).
right_on: Name(s) of the right join column(s).
strategy: Join strategy. The default is "backward".
* *backward*: selects the last row in the right DataFrame whose "on" key is less than or equal to the left's key.
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
* *nearest*: search selects the last row in the right DataFrame whose value is nearest to the left's key.
Returns:
A new joined DataFrame
Examples:
>>> from datetime import datetime
>>> import narwhals as nw
>>> import pandas as pd
>>> import polars as pl
>>> data_gdp = {
... "datetime": [
... datetime(2016, 1, 1),
... datetime(2017, 1, 1),
... datetime(2018, 1, 1),
... datetime(2019, 1, 1),
... datetime(2020, 1, 1),
... ],
... "gdp": [4164, 4411, 4566, 4696, 4827],
... }
>>> data_population = {
... "datetime": [
... datetime(2016, 3, 1),
... datetime(2018, 8, 1),
... datetime(2019, 1, 1),
... ],
... "population": [82.19, 82.66, 83.12],
... }
>>> gdp_pd = pd.DataFrame(data_gdp)
>>> population_pd = pd.DataFrame(data_population)
>>> gdp_pl = pl.DataFrame(data_gdp).sort("datetime")
>>> population_pl = pl.DataFrame(data_population).sort("datetime")
Let's define a dataframe-agnostic function in which we join over "datetime" column:
>>> @nw.narwhalify
... def join_asof_date(df, other_any, strategy):
... return df.join_asof(
... other_any, left_on="datetime", right_on="datetime", strategy=strategy
... )
>>> # We can now pass either pandas or Polars to the function:
>>> join_asof_date(population_pd, gdp_pd, strategy="backward")
datetime population gdp
0 2016-03-01 82.19 4164
1 2018-08-01 82.66 4566
2 2019-01-01 83.12 4696
>>> join_asof_date(population_pl, gdp_pl, strategy="backward")
shape: (3, 3)
┌─────────────────────┬────────────┬──────┐
│ datetime ┆ population ┆ gdp │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ f64 ┆ i64 │
╞═════════════════════╪════════════╪══════╡
│ 2016-03-01 00:00:00 ┆ 82.19 ┆ 4164 │
│ 2018-08-01 00:00:00 ┆ 82.66 ┆ 4566 │
│ 2019-01-01 00:00:00 ┆ 83.12 ┆ 4696 │
└─────────────────────┴────────────┴──────┘
"""
return super().join_asof(
other, left_on=left_on, right_on=right_on, strategy=strategy
)

# --- descriptive ---
def is_duplicated(self: Self) -> Series:
r"""
Expand Down Expand Up @@ -3378,6 +3491,95 @@ def join(
"""
return super().join(other, how=how, left_on=left_on, right_on=right_on)

def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
strategy: Literal["backward", "forward", "nearest"] = "backward",
) -> Self:
"""
Perform an asof join.
This is similar to a left-join except that we match on nearest key rather than equal keys.
Both DataFrames must be sorted by the asof_join key.
Arguments:
other: DataFrame to join with.
left_on: Name(s) of the left join column(s).
right_on: Name(s) of the right join column(s).
strategy: Join strategy. The default is "backward".
* *backward*: selects the last row in the right DataFrame whose "on" key is less than or equal to the left's key.
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
* *nearest*: search selects the last row in the right DataFrame whose value is nearest to the left's key.
Returns:
A new joined DataFrame
Examples:
>>> from datetime import datetime
>>> import narwhals as nw
>>> import pandas as pd
>>> import polars as pl
>>> data_gdp = {
... "datetime": [
... datetime(2016, 1, 1),
... datetime(2017, 1, 1),
... datetime(2018, 1, 1),
... datetime(2019, 1, 1),
... datetime(2020, 1, 1),
... ],
... "gdp": [4164, 4411, 4566, 4696, 4827],
... }
>>> data_population = {
... "datetime": [
... datetime(2016, 3, 1),
... datetime(2018, 8, 1),
... datetime(2019, 1, 1),
... ],
... "population": [82.19, 82.66, 83.12],
... }
>>> gdp_pd = pd.DataFrame(data_gdp)
>>> population_pd = pd.DataFrame(data_population)
>>> gdp_pl = pl.LazyFrame(data_gdp).sort("datetime")
>>> population_pl = pl.LazyFrame(data_population).sort("datetime")
Let's define a dataframe-agnostic function in which we join over "datetime" column:
>>> @nw.narwhalify
... def join_asof_date(df, other_any, strategy):
... return df.join_asof(
... other_any, left_on="datetime", right_on="datetime", strategy=strategy
... )
>>> # We can now pass either pandas or Polars to the function:
>>> join_asof_date(population_pd, gdp_pd, strategy="backward")
datetime population gdp
0 2016-03-01 82.19 4164
1 2018-08-01 82.66 4566
2 2019-01-01 83.12 4696
>>> join_asof_date(population_pl, gdp_pl, strategy="backward").collect()
shape: (3, 3)
┌─────────────────────┬────────────┬──────┐
│ datetime ┆ population ┆ gdp │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ f64 ┆ i64 │
╞═════════════════════╪════════════╪══════╡
│ 2016-03-01 00:00:00 ┆ 82.19 ┆ 4164 │
│ 2018-08-01 00:00:00 ┆ 82.66 ┆ 4566 │
│ 2019-01-01 00:00:00 ┆ 83.12 ┆ 4696 │
└─────────────────────┴────────────┴──────┘
"""
return super().join_asof(
other, left_on=left_on, right_on=right_on, strategy=strategy
)

def clone(self) -> Self:
r"""
Create a copy of this DataFrame.
Expand Down
Loading

0 comments on commit 029f590

Please sign in to comment.