Skip to content

Commit

Permalink
Implement Python drop_duplicates with cudf::stable_distinct. (#11656)
Browse files Browse the repository at this point in the history
Depends on #13392.

Closes #11638
Closes #12449
Closes #11230
Closes #5286

This PR re-implements Python's `DataFrame.drop_duplicates` / `Series.drop_duplicates` to use the `stable_distinct` algorithm.

This fixes a large number of correctness issues (ordering the same way as pandas) and also improves performance by eliminating a sorting step.

As a consequence of changing the behavior of `drop_duplicates`, a lot of refactoring was needed. The `drop_duplicates` function was used to implement `unique()`, which cascaded into changes for several groupby functions, one-hot encoding, `np.unique` array function dispatches, and more. Those downstream functions relied on the sorting order of `drop_duplicates` and `unique`, which is _not_ promised by pandas.

Authors:
  - https://github.com/brandon-b-miller
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Matthew Roeschke (https://github.com/mroeschke)
  - Nghia Truong (https://github.com/ttnghia)

URL: #11656
  • Loading branch information
brandon-b-miller authored May 23, 2023
1 parent 94bdca6 commit 4fdb60d
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 144 deletions.
16 changes: 9 additions & 7 deletions python/cudf/cudf/_lib/cpp/stream_compaction.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
Expand All @@ -20,6 +20,7 @@ from cudf._lib.cpp.types cimport (
cdef extern from "cudf/stream_compaction.hpp" namespace "cudf" \
nogil:
ctypedef enum duplicate_keep_option:
KEEP_ANY 'cudf::duplicate_keep_option::KEEP_ANY'
KEEP_FIRST 'cudf::duplicate_keep_option::KEEP_FIRST'
KEEP_LAST 'cudf::duplicate_keep_option::KEEP_LAST'
KEEP_NONE 'cudf::duplicate_keep_option::KEEP_NONE'
Expand All @@ -33,13 +34,14 @@ cdef extern from "cudf/stream_compaction.hpp" namespace "cudf" \
column_view boolean_mask
) except +

cdef unique_ptr[table] unique(
table_view source_table,
vector[size_type] keys,
duplicate_keep_option keep,
null_equality nulls_equal) except +

cdef size_type distinct_count(
column_view source_table,
null_policy null_handling,
nan_policy nan_handling) except +

cdef unique_ptr[table] stable_distinct(
table_view input,
vector[size_type] keys,
duplicate_keep_option keep,
null_equality nulls_equal,
) except +
39 changes: 4 additions & 35 deletions python/cudf/cudf/_lib/stream_compaction.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from cudf.core.buffer import acquire_spill_lock

Expand All @@ -9,22 +9,19 @@ from libcpp.vector cimport vector

from cudf._lib.column cimport Column
from cudf._lib.cpp.column.column_view cimport column_view
from cudf._lib.cpp.sorting cimport stable_sort_by_key as cpp_stable_sort_by_key
from cudf._lib.cpp.stream_compaction cimport (
apply_boolean_mask as cpp_apply_boolean_mask,
distinct_count as cpp_distinct_count,
drop_nulls as cpp_drop_nulls,
duplicate_keep_option,
unique as cpp_unique,
stable_distinct as cpp_stable_distinct,
)
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport (
nan_policy,
null_equality,
null_order,
null_policy,
order,
size_type,
)
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns
Expand Down Expand Up @@ -145,41 +142,13 @@ def drop_duplicates(list columns,
if nulls_are_equal
else null_equality.UNEQUAL
)

cdef vector[order] column_order = (
vector[order](
cpp_keys.size(),
order.ASCENDING
)
)
cdef vector[null_order] null_precedence = (
vector[null_order](
cpp_keys.size(),
null_order.BEFORE
)
)

cdef table_view source_table_view = table_view_from_columns(columns)
cdef table_view keys_view = source_table_view.select(cpp_keys)
cdef unique_ptr[table] sorted_source_table
cdef unique_ptr[table] c_result

with nogil:
# cudf::unique keeps unique rows in each consecutive group of
# equivalent rows. To match the behavior of pandas.DataFrame.
# drop_duplicates, users need to stable sort the input first
# and then invoke cudf::unique.
sorted_source_table = move(
cpp_stable_sort_by_key(
source_table_view,
keys_view,
column_order,
null_precedence
)
)
c_result = move(
cpp_unique(
sorted_source_table.get().view(),
cpp_stable_distinct(
source_table_view,
cpp_keys,
cpp_keep_option,
cpp_nulls_equal
Expand Down
7 changes: 6 additions & 1 deletion python/cudf/cudf/core/_base_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,12 @@ def __array_function__(self, func, types, args, kwargs):
if cudf_func is func:
return NotImplemented
else:
return cudf_func(*args, **kwargs)
result = cudf_func(*args, **kwargs)
if fname == "unique":
# NumPy expects a sorted result for `unique`, which is not
# guaranteed by cudf.Index.unique.
result = result.sort_values()
return result

else:
return NotImplemented
Expand Down
20 changes: 11 additions & 9 deletions python/cudf/cudf/core/column/categorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,8 +1042,15 @@ def data_array_view(
) -> cuda.devicearray.DeviceNDArray:
return self.codes.data_array_view(mode=mode)

def unique(self, preserve_order=False) -> CategoricalColumn:
codes = self.as_numerical.unique(preserve_order=preserve_order)
def unique(self, preserve_order=True) -> CategoricalColumn:
if preserve_order is not True:
warnings.warn(
"The preserve_order argument is deprecated. It will be "
"removed in a future version. As of now, unique always "
"preserves order regardless of the argument's value.",
FutureWarning,
)
codes = self.as_numerical.unique()
return column.build_categorical_column(
categories=self.categories,
codes=column.build_column(codes.base_data, dtype=codes.dtype),
Expand Down Expand Up @@ -1397,9 +1404,7 @@ def _concat(
head = next((obj for obj in objs if obj.valid_count), objs[0])

# Combine and de-dupe the categories
cats = column.concat_columns([o.categories for o in objs]).unique(
preserve_order=True
)
cats = column.concat_columns([o.categories for o in objs]).unique()
objs = [o._set_categories(cats, is_unique=True) for o in objs]
codes = [o.codes for o in objs]

Expand Down Expand Up @@ -1538,10 +1543,7 @@ def _set_categories(

# Ensure new_categories is unique first
if not (is_unique or new_cats.is_unique):
# drop_duplicates() instead of unique() to preserve order
new_cats = cudf.Series(new_cats)._column.unique(
preserve_order=True
)
new_cats = cudf.Series(new_cats)._column.unique()

cur_codes = self.codes
max_cat_size = (
Expand Down
29 changes: 10 additions & 19 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,17 +1021,16 @@ def as_categorical_column(self, dtype, **kwargs) -> ColumnBase:
ordered=dtype.ordered,
)

cats = self.unique().astype(self.dtype)
# Categories must be unique and sorted in ascending order.
cats = self.unique().sort_by_values()[0].astype(self.dtype)
label_dtype = min_unsigned_type(len(cats))
labels = self._label_encoding(
cats=cats, dtype=label_dtype, na_sentinel=cudf.Scalar(1)
)

# columns include null index in factorization; remove:
if self.has_nulls():
cats = cats.dropna(drop_nan=False)
min_type = min_unsigned_type(len(cats), 8)
labels = labels - 1
if cudf.dtype(min_type).itemsize < labels.dtype.itemsize:
labels = labels.astype(min_type)

Expand Down Expand Up @@ -1132,25 +1131,17 @@ def searchsorted(
values, side, ascending=ascending, na_position=na_position
)

def unique(self, preserve_order=False) -> ColumnBase:
def unique(self, preserve_order=True) -> ColumnBase:
"""
Get unique values in the data
"""
# TODO: We could avoid performing `drop_duplicates` for
# columns with values that already are unique.
# Few things to note before we can do this optimization is
# the following issue resolved:
# https://github.com/rapidsai/cudf/issues/5286
if preserve_order:
ind = as_column(cupy.arange(0, len(self)))

# dedup based on the column of data only
ind, col = drop_duplicates([ind, self], keys=[1])

# sort col based on ind
map = ind.argsort()
return col.take(map)

if preserve_order is not True:
warnings.warn(
"The preserve_order argument is deprecated. It will be "
"removed in a future version. As of now, unique always "
"preserves order regardless of the argument's value.",
FutureWarning,
)
return drop_duplicates([self], keep="first")[0]

def serialize(self) -> Tuple[dict, list]:
Expand Down
56 changes: 33 additions & 23 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3165,34 +3165,46 @@ def diff(self, periods=1, axis=0):

@_cudf_nvtx_annotate
def drop_duplicates(
self, subset=None, keep="first", inplace=False, ignore_index=False
self,
subset=None,
keep="first",
inplace=False,
ignore_index=False,
):
"""
Return DataFrame with duplicate rows removed, optionally only
considering certain subset of columns.
Return DataFrame with duplicate rows removed.
Considering certain columns is optional. Indexes, including time
indexes are ignored.
Parameters
----------
subset : column label or sequence of labels, optional
Only consider certain columns for identifying duplicates, by
default use all of the columns.
keep : {'first', 'last', False}, default 'first'
keep : {'first', 'last', ``False``}, default 'first'
Determines which duplicates (if any) to keep.
- ``first`` : Drop duplicates except for the first occurrence.
- ``last`` : Drop duplicates except for the last occurrence.
- False : Drop all duplicates.
inplace : bool, default False
- 'first' : Drop duplicates except for the first occurrence.
- 'last' : Drop duplicates except for the last occurrence.
- ``False`` : Drop all duplicates.
inplace : bool, default ``False``
Whether to drop duplicates in place or to return a copy.
ignore_index : bool, default False
        If True, the resulting axis will be labeled 0, 1, ..., n - 1.
ignore_index : bool, default ``False``
If True, the resulting axis will be labeled 0, 1, ..., n - 1.
Returns
-------
DataFrame or None
DataFrame with duplicates removed or None if ``inplace=True``.
See Also
--------
DataFrame.value_counts: Count unique combinations of columns.
Examples
--------
Consider a dataset containing ramen ratings.
>>> import cudf
>>> df = cudf.DataFrame({
... 'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
Expand All @@ -3207,36 +3219,34 @@ def drop_duplicates(
3 Indomie pack 15.0
4 Indomie pack 5.0
By default, it removes duplicate rows based
on all columns. Note that order of
the rows being returned is not guaranteed
to be sorted.
By default, it removes duplicate rows based on all columns.
>>> df.drop_duplicates()
brand style rating
0 Yum Yum cup 4.0
2 Indomie cup 3.5
4 Indomie pack 5.0
3 Indomie pack 15.0
0 Yum Yum cup 4.0
4 Indomie pack 5.0
To remove duplicates on specific column(s),
use `subset`.
To remove duplicates on specific column(s), use ``subset``.
>>> df.drop_duplicates(subset=['brand'])
brand style rating
2 Indomie cup 3.5
0 Yum Yum cup 4.0
2 Indomie cup 3.5
To remove duplicates and keep last occurrences, use `keep`.
To remove duplicates and keep last occurrences, use ``keep``.
>>> df.drop_duplicates(subset=['brand', 'style'], keep='last')
brand style rating
1 Yum Yum cup 4.0
2 Indomie cup 3.5
4 Indomie pack 5.0
1 Yum Yum cup 4.0
""" # noqa: E501
outdf = super().drop_duplicates(
subset=subset, keep=keep, ignore_index=ignore_index
subset=subset,
keep=keep,
ignore_index=ignore_index,
)

return self._mimic_inplace(outdf, inplace=inplace)
Expand Down Expand Up @@ -7693,7 +7703,7 @@ def _find_common_dtypes_and_categories(non_null_columns, dtypes):
# Combine and de-dupe the categories
categories[idx] = cudf.Series(
concat_columns([col.categories for col in cols])
)._column.unique(preserve_order=True)
)._column.unique()
# Set the column dtype to the codes' dtype. The categories
# will be re-assigned at the end
dtypes[idx] = min_scalar_type(len(categories[idx]))
Expand Down
26 changes: 14 additions & 12 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def dtypes(self):
2 object int64
3 object int64
"""
index = self.grouping.keys.unique().to_pandas()
index = self.grouping.keys.unique().sort_values().to_pandas()
return pd.DataFrame(
{
name: [self.obj._dtypes[name]] * len(index)
Expand Down Expand Up @@ -864,25 +864,27 @@ def ngroup(self, ascending=True):
5 0
dtype: int64
"""
num_groups = len(index := self.grouping.keys.unique())
index = self.grouping.keys.unique().sort_values()
num_groups = len(index)
_, has_null_group = bitmask_or([*index._columns])

if ascending:
if has_null_group:
group_ids = cudf.Series._from_data(
{None: cp.arange(-1, num_groups - 1)}
)
else:
group_ids = cudf.Series._from_data(
{None: cp.arange(num_groups)}
)
# Count ascending from 0 to num_groups - 1
group_ids = cudf.Series._from_data({None: cp.arange(num_groups)})
elif has_null_group:
# Count descending from num_groups - 1 to 0, but subtract one more
# for the null group making it num_groups - 2 to -1.
group_ids = cudf.Series._from_data(
{None: cp.arange(num_groups - 2, -2, -1)}
)
else:
# Count descending from num_groups - 1 to 0
group_ids = cudf.Series._from_data(
{None: cp.arange(num_groups - 1, -1, -1)}
)

if has_null_group:
group_ids.iloc[0] = cudf.NA
group_ids.iloc[-1] = cudf.NA

group_ids._index = index
return self._broadcast(group_ids)
Expand Down Expand Up @@ -1065,7 +1067,7 @@ def _grouped(self):
column_names=self.obj._column_names,
index_names=self.obj._index_names,
)
group_names = grouped_keys.unique()
group_names = grouped_keys.unique().sort_values()
return (group_names, offsets, grouped_keys, grouped_values)

def _normalize_aggs(
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/core/reshape.py
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ def _get_unique(column, dummy_na):
if isinstance(column, cudf.core.column.CategoricalColumn):
unique = column.categories
else:
unique = column.unique()
unique = column.unique().sort_by_values()[0]
if not dummy_na:
if np.issubdtype(unique.dtype, np.floating):
unique = unique.nans_to_nulls()
Expand Down
Loading

0 comments on commit 4fdb60d

Please sign in to comment.