Skip to content

Use official to_backend API for GPU/CPU backend conversion #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions merlin/core/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
if isinstance(x, dd.DataFrame):
# If input is a dask_cudf collection, convert
# to a pandas-backed Dask collection
if hasattr(x, "to_backend"):
# Requires dask>=2023.1.1
return x.to_backend("pandas")
if cudf is None or not isinstance(x, dask_cudf.DataFrame):
# Already a Pandas-backed collection
return x
Expand All @@ -676,6 +679,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
return dd.from_pandas(_x, sort=False, npartitions=npartitions) if to_collection else _x
elif cudf and dask_cudf:
if isinstance(x, dd.DataFrame):
if hasattr(x, "to_backend"):
# Requires dask>=2023.1.1
return x.to_backend("cudf")
# If input is a Dask collection, convert to dask_cudf
if isinstance(x, dask_cudf.DataFrame):
# Already a cudf-backed Dask collection
Expand Down
24 changes: 21 additions & 3 deletions merlin/io/dataframe_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ def to_ddf(self, columns=None, cpu=None):
cpu = self.cpu if cpu is None else cpu

# Move data from gpu to cpu if necessary
_ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf
if hasattr(self._ddf, "to_backend"):
# Requires dask>=2023.1.1
if cpu:
_ddf = self._ddf.to_backend("pandas")
elif cpu is False:
_ddf = self._ddf.to_backend("cudf")
else:
_ddf = self._ddf
else:
_ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf

if isinstance(columns, list):
return _ddf[columns]
Expand All @@ -49,14 +58,22 @@ def to_ddf(self, columns=None, cpu=None):
def to_cpu(self):
    """Switch this engine's Dask collection to the pandas (CPU) backend in place.

    Does nothing when ``self.cpu`` is already truthy. Otherwise the
    collection stored in ``self._ddf`` is converted exactly once, ``self.cpu``
    is set to ``True`` and ``self.moved_collection`` is toggled.

    Returns
    -------
    None
    """
    if self.cpu:
        # Already CPU-backed; nothing to convert.
        return
    if hasattr(self._ddf, "to_backend"):
        # Requires dask>=2023.1.1
        self._ddf = self._ddf.to_backend("pandas")
    else:
        # Collections without ``to_backend`` use the legacy mover.
        self._ddf = self._move_ddf("cpu")
    self.cpu = True
    # Every successful move flips the flag, so a move followed by the
    # opposite move restores its original value.
    self.moved_collection = not self.moved_collection

def to_gpu(self):
    """Switch this engine's Dask collection to the cudf (GPU) backend in place.

    Does nothing when ``self.cpu`` is already falsy. Otherwise the
    collection stored in ``self._ddf`` is converted exactly once, ``self.cpu``
    is set to ``False`` and ``self.moved_collection`` is toggled.

    Returns
    -------
    None
    """
    if not self.cpu:
        # Already GPU-backed; nothing to convert.
        return
    if hasattr(self._ddf, "to_backend"):
        # Requires dask>=2023.1.1
        self._ddf = self._ddf.to_backend("cudf")
    else:
        # Collections without ``to_backend`` use the legacy mover.
        self._ddf = self._move_ddf("gpu")
    self.cpu = False
    # Every successful move flips the flag, so a move followed by the
    # opposite move restores its original value.
    self.moved_collection = not self.moved_collection

Expand All @@ -66,6 +83,7 @@ def num_rows(self):

def _move_ddf(self, destination):
"""Move the collection between cpu and gpu memory."""
# TODO: Remove this method when we pin to dask>=2023.1.1
_ddf = self._ddf
if (
self.moved_collection
Expand Down
Loading