Skip to content

Commit

Permalink
Add Column.serialize to cudf-polars (#17990)
Browse files Browse the repository at this point in the history
It will be useful to serialize individual columns during multi-GPU cudf-polars execution. For example, the `Expr`-decomposition approach proposed in #17941 may "require" `Column` serialization (or an ugly workaround).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Matthew Murray (https://github.com/Matt711)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #17990
  • Loading branch information
rjzamora authored Feb 13, 2025
1 parent 53eee38 commit 3fa56d0
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 21 deletions.
61 changes: 61 additions & 0 deletions python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import polars as pl

from cudf_polars.typing import ColumnHeader, ColumnOptions

__all__: list[str] = ["Column"]


Expand Down Expand Up @@ -55,6 +57,65 @@ def __init__(
self.name = name
self.set_sorted(is_sorted=is_sorted, order=order, null_order=null_order)

@classmethod
def deserialize(
    cls, header: ColumnHeader, frames: tuple[memoryview, plc.gpumemoryview]
) -> Self:
    """
    Reconstruct a Column from the output of `.serialize()`.

    Parameters
    ----------
    header
        The (unpickled) metadata describing how to rebuild the column.
    frames
        Two-tuple holding the packed metadata (a memoryview) and the
        packed device data (a gpumemoryview).

    Returns
    -------
    Column
        The deserialized Column.
    """
    metadata, gpu_data = frames
    # Unpack the single-column table that `.serialize()` produced.
    unpacked = plc.contiguous_split.unpack_from_memoryviews(metadata, gpu_data)
    (column,) = unpacked.columns()
    return cls(column, **header["column_kwargs"])

def serialize(
    self,
) -> tuple[ColumnHeader, tuple[memoryview, plc.gpumemoryview]]:
    """
    Serialize the Column into header and frames.

    Follows the Dask serialization scheme with a picklable header (dict) and
    a tuple of frames (in this case a contiguous host and device buffer).

    To enable dask support, dask serializers must be registered

    >>> from cudf_polars.experimental.dask_serialize import register
    >>> register()

    Returns
    -------
    header
        A dict containing any picklable metadata required to reconstruct the object.
    frames
        Two-tuple of frames suitable for passing to
        `plc.contiguous_split.unpack_from_memoryviews`
    """
    # Keyword arguments needed to re-run `Column.__init__` on the
    # deserializing side.
    options: ColumnOptions = {
        "is_sorted": self.is_sorted,
        "order": self.order,
        "null_order": self.null_order,
        "name": self.name,
    }
    header: ColumnHeader = {
        "column_kwargs": options,
        "frame_count": 2,
    }
    # Pack the single column as a contiguous (host metadata, device data) pair.
    packed = plc.contiguous_split.pack(plc.Table([self.obj]))
    return header, packed.release()

@functools.cached_property
def obj_scalar(self) -> plc.Scalar:
"""
Expand Down
23 changes: 11 additions & 12 deletions python/cudf_polars/cudf_polars/containers/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""A dataframe, with some properties."""

from __future__ import annotations

import pickle
from functools import cached_property
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, cast

import pyarrow as pa

Expand All @@ -23,6 +22,8 @@

from typing_extensions import Self

from cudf_polars.typing import ColumnOptions, DataFrameHeader


__all__: list[str] = ["DataFrame"]

Expand Down Expand Up @@ -150,7 +151,7 @@ def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self:

@classmethod
def deserialize(
cls, header: Mapping[str, Any], frames: tuple[memoryview, plc.gpumemoryview]
cls, header: DataFrameHeader, frames: tuple[memoryview, plc.gpumemoryview]
) -> Self:
"""
Create a DataFrame from a serialized representation returned by `.serialize()`.
Expand Down Expand Up @@ -178,7 +179,7 @@ def deserialize(

def serialize(
self,
) -> tuple[Mapping[str, Any], tuple[memoryview, plc.gpumemoryview]]:
) -> tuple[DataFrameHeader, tuple[memoryview, plc.gpumemoryview]]:
"""
Serialize the table into header and frames.
Expand All @@ -187,20 +188,20 @@ def serialize(
To enable dask support, dask serializers must be registered
>>> from cudf_polars.experimental.dask_serialize import register
>>> register()
>>> from cudf_polars.experimental.dask_serialize import register
>>> register()
Returns
-------
header
A dict containing any picklable metadata required to reconstruct the object.
frames
Two-tuple of frames suitable for passing to `unpack_from_memoryviews`
Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews`
"""
packed = plc.contiguous_split.pack(self.table)

# Keyword arguments for `Column.__init__`.
columns_kwargs = [
columns_kwargs: list[ColumnOptions] = [
{
"is_sorted": col.is_sorted,
"order": col.order,
Expand All @@ -209,10 +210,8 @@ def serialize(
}
for col in self.columns
]
header = {
header: DataFrameHeader = {
"columns_kwargs": columns_kwargs,
# Dask Distributed uses "type-serialized" to dispatch deserialization
"type-serialized": pickle.dumps(type(self)),
"frame_count": 2,
}
return header, packed.release()
Expand Down
26 changes: 20 additions & 6 deletions python/cudf_polars/cudf_polars/experimental/dask_serialize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Dask serialization."""
Expand All @@ -12,16 +12,16 @@
import pylibcudf as plc
import rmm

from cudf_polars.containers import DataFrame
from cudf_polars.containers import Column, DataFrame

__all__ = ["register"]


def register() -> None:
"""Register dask serialization routines for DataFrames."""

@cuda_serialize.register(DataFrame)
def _(x: DataFrame):
@cuda_serialize.register((Column, DataFrame))
def _(x: DataFrame | Column):
with log_errors():
header, frames = x.serialize()
return header, list(frames) # Dask expect a list of frames
Expand All @@ -32,8 +32,14 @@ def _(header, frames):
assert len(frames) == 2
return DataFrame.deserialize(header, tuple(frames))

@dask_serialize.register(DataFrame)
def _(x: DataFrame):
@cuda_deserialize.register(Column)
def _(header, frames):
with log_errors():
assert len(frames) == 2
return Column.deserialize(header, tuple(frames))

@dask_serialize.register((Column, DataFrame))
def _(x: DataFrame | Column):
with log_errors():
header, (metadata, gpudata) = x.serialize()

Expand All @@ -57,3 +63,11 @@ def _(header, frames) -> DataFrame:
# Copy the second frame (the gpudata in host memory) back to the gpu
frames = frames[0], plc.gpumemoryview(rmm.DeviceBuffer.to_device(frames[1]))
return DataFrame.deserialize(header, frames)

@dask_deserialize.register(Column)
def _(header, frames) -> Column:
with log_errors():
assert len(frames) == 2
# Copy the second frame (the gpudata in host memory) back to the gpu
frames = frames[0], plc.gpumemoryview(rmm.DeviceBuffer.to_device(frames[1]))
return Column.deserialize(header, frames)
33 changes: 31 additions & 2 deletions python/cudf_polars/cudf_polars/typing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Typing utilities for cudf_polars."""

from __future__ import annotations

from collections.abc import Hashable, Mapping
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, TypedDict, Union

from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir

Expand Down Expand Up @@ -145,3 +145,32 @@ def state(self) -> Mapping[str, Any]:

IRTransformer: TypeAlias = GenericTransformer["ir.IR", "ir.IR"]
"""Protocol for transformation of IR nodes."""


class ColumnOptions(TypedDict):
    """
    Column constructor options.

    These are passed as keyword arguments to ``Column.__init__`` when a
    serialized Column or DataFrame is reconstructed.

    Notes
    -----
    Used to serialize Column and DataFrame containers.
    """

    # Sortedness flag recorded at serialization time.
    is_sorted: plc.types.Sorted
    # Sort order of the column's values.
    order: plc.types.Order
    # Placement of nulls within the sort order.
    null_order: plc.types.NullOrder
    # Column name; may be absent (None).
    name: str | None


class ColumnHeader(TypedDict):
    """Column serialization header (picklable metadata for `Column.serialize`)."""

    # Keyword arguments re-applied to ``Column.__init__`` during
    # deserialization.
    column_kwargs: ColumnOptions
    # Number of frames accompanying the header (two: packed host
    # metadata and packed GPU data).
    frame_count: int


class DataFrameHeader(TypedDict):
    """DataFrame serialization header (picklable metadata for `DataFrame.serialize`)."""

    # Per-column keyword arguments, one entry per column of the table,
    # re-applied to ``Column.__init__`` during deserialization.
    columns_kwargs: list[ColumnOptions]
    # Number of frames accompanying the header (two: packed host
    # metadata and packed GPU data).
    frame_count: int
11 changes: 10 additions & 1 deletion python/cudf_polars/tests/experimental/test_dask_serialize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations
Expand Down Expand Up @@ -38,3 +38,12 @@ def test_dask_serialization_roundtrip(arrow_tbl, protocol):
res = deserialize(header, frames, deserializers=[protocol])

assert_frame_equal(df.to_polars(), res.to_polars())

# Check that we can serialize individual columns
for column in df.columns:
expect = DataFrame([column])

header, frames = serialize(column, on_error="raise", serializers=[protocol])
res = deserialize(header, frames, deserializers=[protocol])

assert_frame_equal(expect.to_polars(), DataFrame([res]).to_polars())

0 comments on commit 3fa56d0

Please sign in to comment.