diff --git a/python/cudf_polars/cudf_polars/containers/column.py b/python/cudf_polars/cudf_polars/containers/column.py index 2c83e05fe9c..f296b2dc828 100644 --- a/python/cudf_polars/cudf_polars/containers/column.py +++ b/python/cudf_polars/cudf_polars/containers/column.py @@ -26,6 +26,8 @@ import polars as pl + from cudf_polars.typing import ColumnHeader, ColumnOptions + __all__: list[str] = ["Column"] @@ -55,6 +57,65 @@ def __init__( self.name = name self.set_sorted(is_sorted=is_sorted, order=order, null_order=null_order) + @classmethod + def deserialize( + cls, header: ColumnHeader, frames: tuple[memoryview, plc.gpumemoryview] + ) -> Self: + """ + Create a Column from a serialized representation returned by `.serialize()`. + + Parameters + ---------- + header + The (unpickled) metadata required to reconstruct the object. + frames + Two-tuple of frames (a memoryview and a gpumemoryview). + + Returns + ------- + Column + The deserialized Column. + """ + packed_metadata, packed_gpu_data = frames + (plc_column,) = plc.contiguous_split.unpack_from_memoryviews( + packed_metadata, packed_gpu_data + ).columns() + return cls(plc_column, **header["column_kwargs"]) + + def serialize( + self, + ) -> tuple[ColumnHeader, tuple[memoryview, plc.gpumemoryview]]: + """ + Serialize the Column into header and frames. + + Follows the Dask serialization scheme with a picklable header (dict) and + a tuple of frames (in this case a contiguous host and device buffer). + + To enable dask support, dask serializers must be registered + + >>> from cudf_polars.experimental.dask_serialize import register + >>> register() + + Returns + ------- + header + A dict containing any picklable metadata required to reconstruct the object. + frames + Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews` + """ + packed = plc.contiguous_split.pack(plc.Table([self.obj])) + column_kwargs: ColumnOptions = { + "is_sorted": self.is_sorted, + "order": self.order, + "null_order": self.null_order, + "name": self.name, + } + header: ColumnHeader = { + "column_kwargs": column_kwargs, + "frame_count": 2, + } + return header, packed.release() + @functools.cached_property def obj_scalar(self) -> plc.Scalar: """ diff --git a/python/cudf_polars/cudf_polars/containers/dataframe.py b/python/cudf_polars/cudf_polars/containers/dataframe.py index 36e0fbe370e..a605b476197 100644 --- a/python/cudf_polars/cudf_polars/containers/dataframe.py +++ b/python/cudf_polars/cudf_polars/containers/dataframe.py @@ -1,13 +1,12 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """A dataframe, with some properties.""" from __future__ import annotations -import pickle from functools import cached_property -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, cast import pyarrow as pa @@ -23,6 +22,8 @@ from typing_extensions import Self + from cudf_polars.typing import ColumnOptions, DataFrameHeader + __all__: list[str] = ["DataFrame"] @@ -150,7 +151,7 @@ def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self: @classmethod def deserialize( - cls, header: Mapping[str, Any], frames: tuple[memoryview, plc.gpumemoryview] + cls, header: DataFrameHeader, frames: tuple[memoryview, plc.gpumemoryview] ) -> Self: """ Create a DataFrame from a serialized representation returned by `.serialize()`. @@ -178,7 +179,7 @@ def deserialize( def serialize( self, - ) -> tuple[Mapping[str, Any], tuple[memoryview, plc.gpumemoryview]]: + ) -> tuple[DataFrameHeader, tuple[memoryview, plc.gpumemoryview]]: """ Serialize the table into header and frames. @@ -187,20 +188,20 @@ def serialize( To enable dask support, dask serializers must be registered - >>> from cudf_polars.experimental.dask_serialize import register - >>> register() + >>> from cudf_polars.experimental.dask_serialize import register + >>> register() Returns ------- header A dict containing any picklable metadata required to reconstruct the object. frames - Two-tuple of frames suitable for passing to `unpack_from_memoryviews` + Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews` """ packed = plc.contiguous_split.pack(self.table) # Keyword arguments for `Column.__init__`. - columns_kwargs = [ + columns_kwargs: list[ColumnOptions] = [ { "is_sorted": col.is_sorted, "order": col.order, @@ -209,10 +210,8 @@ def serialize( } for col in self.columns ] - header = { + header: DataFrameHeader = { "columns_kwargs": columns_kwargs, - # Dask Distributed uses "type-serialized" to dispatch deserialization - "type-serialized": pickle.dumps(type(self)), "frame_count": 2, } return header, packed.release() diff --git a/python/cudf_polars/cudf_polars/experimental/dask_serialize.py b/python/cudf_polars/cudf_polars/experimental/dask_serialize.py index aae78e07690..09a9556bb31 100644 --- a/python/cudf_polars/cudf_polars/experimental/dask_serialize.py +++ b/python/cudf_polars/cudf_polars/experimental/dask_serialize.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """Dask serialization.""" @@ -12,7 +12,7 @@ import pylibcudf as plc import rmm -from cudf_polars.containers import DataFrame +from cudf_polars.containers import Column, DataFrame __all__ = ["register"] @@ -20,8 +20,8 @@ def register() -> None: """Register dask serialization routines for DataFrames.""" - @cuda_serialize.register(DataFrame) - def _(x: DataFrame): + @cuda_serialize.register((Column, DataFrame)) + def _(x: DataFrame | Column): with log_errors(): header, frames = x.serialize() return header, list(frames) # Dask expect a list of frames @@ -32,8 +32,14 @@ def _(header, frames): assert len(frames) == 2 return DataFrame.deserialize(header, tuple(frames)) - @dask_serialize.register(DataFrame) - def _(x: DataFrame): + @cuda_deserialize.register(Column) + def _(header, frames): + with log_errors(): + assert len(frames) == 2 + return Column.deserialize(header, tuple(frames)) + + @dask_serialize.register((Column, DataFrame)) + def _(x: DataFrame | Column): with log_errors(): header, (metadata, gpudata) = x.serialize() @@ -57,3 +63,11 @@ def _(header, frames) -> DataFrame: # Copy the second frame (the gpudata in host memory) back to the gpu frames = frames[0], plc.gpumemoryview(rmm.DeviceBuffer.to_device(frames[1])) return DataFrame.deserialize(header, frames) + + @dask_deserialize.register(Column) + def _(header, frames) -> Column: + with log_errors(): + assert len(frames) == 2 + # Copy the second frame (the gpudata in host memory) back to the gpu + frames = frames[0], plc.gpumemoryview(rmm.DeviceBuffer.to_device(frames[1])) + return Column.deserialize(header, frames) diff --git a/python/cudf_polars/cudf_polars/typing/__init__.py b/python/cudf_polars/cudf_polars/typing/__init__.py index 52be130ab90..7a5795867ca 100644 --- a/python/cudf_polars/cudf_polars/typing/__init__.py +++ b/python/cudf_polars/cudf_polars/typing/__init__.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """Typing utilities for cudf_polars.""" @@ -6,7 +6,7 @@ from __future__ import annotations from collections.abc import Hashable, Mapping -from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union +from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, TypedDict, Union from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir @@ -145,3 +145,32 @@ def state(self) -> Mapping[str, Any]: IRTransformer: TypeAlias = GenericTransformer["ir.IR", "ir.IR"] """Protocol for transformation of IR nodes.""" + + +class ColumnOptions(TypedDict): + """ + Column constructor options. + + Notes + ----- + Used to serialize Column and DataFrame containers. + """ + + is_sorted: plc.types.Sorted + order: plc.types.Order + null_order: plc.types.NullOrder + name: str | None + + +class ColumnHeader(TypedDict): + """Column serialization header.""" + + column_kwargs: ColumnOptions + frame_count: int + + +class DataFrameHeader(TypedDict): + """DataFrame serialization header.""" + + columns_kwargs: list[ColumnOptions] + frame_count: int diff --git a/python/cudf_polars/tests/experimental/test_dask_serialize.py b/python/cudf_polars/tests/experimental/test_dask_serialize.py index e556b7e4445..e0da2e834fc 100644 --- a/python/cudf_polars/tests/experimental/test_dask_serialize.py +++ b/python/cudf_polars/tests/experimental/test_dask_serialize.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations @@ -38,3 +38,12 @@ def test_dask_serialization_roundtrip(arrow_tbl, protocol): res = deserialize(header, frames, deserializers=[protocol]) assert_frame_equal(df.to_polars(), res.to_polars()) + + # Check that we can serialize individual columns + for column in df.columns: + expect = DataFrame([column]) + + header, frames = serialize(column, on_error="raise", serializers=[protocol]) + res = deserialize(header, frames, deserializers=[protocol]) + + assert_frame_equal(expect.to_polars(), DataFrame([res]).to_polars())