Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#4909: Properly implement map operator #5118

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

noloerino
Copy link
Collaborator

@noloerino noloerino commented Oct 12, 2022

What do these changes do?

This PR cleans up the interfaces of the various map, apply, and broadcast dataframe and partition manager methods.

Since reduce and treereduce both use these methods, these are also affected by the aforementioned changes. The changes also incidentally address #4912 and (partially) #5094, but those changes can be separated out fairly easily if this PR is too large.

Overall, the following changes have been made to the dataframe API (the partition manager changes are very similar):

Old method New method
map
and
broadcast_apply
map_partitions
apply_full_axis
and
broadcast_apply_full_axis
map_partition_full_axis
apply_select_indices
and
broadcast_apply_select_indices
map_select_indices
apply_func_to_indices_both_axis
map_select_indices_both_axes

A lot of logic that used to be in separate functions got moved into nested if/else chains with this refactor: suggestions on how to clean up the code would be appreciated.

Microbenchmarks

All tests were run on an EC2 t2.2xlarge instance (8 CPUs, 32 GiB RAM, 128 GB disk, Ubuntu Jammy AMD64) with the Ray backend, with int64 dataframes of size 2^16 x 2^14. Each test was run 5 times and averaged.

These benchmarks seem to indicate no appreciable performance difference on datasets of this size.

abs

The abs function is changed to map across rows rather than cell-wise.

  • PR (f5ef6f9e): 0.0354s
  • master (c070b65): 0.0352s

apply

The test ran df.apply(np.sum, axis=0).

  • PR: 9.0078s
  • master: 9.0166

describe

  • PR: 32.3454s
  • master: 32.0462

@codecov
Copy link

codecov bot commented Oct 12, 2022

Codecov Report

Merging #5118 (f5ef6f9) into master (abcf1e9) will increase coverage by 4.51%.
The diff coverage is 94.87%.

@@            Coverage Diff             @@
##           master    #5118      +/-   ##
==========================================
+ Coverage   84.56%   89.08%   +4.51%     
==========================================
  Files         256      257       +1     
  Lines       19349    19613     +264     
==========================================
+ Hits        16363    17472    +1109     
+ Misses       2986     2141     -845     
Impacted Files Coverage Δ
...ns/pandas_on_ray/partitioning/partition_manager.py 71.73% <ø> (+4.34%) ⬆️
modin/core/dataframe/base/dataframe/dataframe.py 95.34% <60.00%> (-4.66%) ⬇️
...dataframe/pandas/partitioning/partition_manager.py 88.88% <91.50%> (+1.98%) ⬆️
modin/core/dataframe/pandas/dataframe/dataframe.py 95.18% <94.49%> (-0.06%) ⬇️
modin/core/dataframe/algebra/binary.py 100.00% <100.00%> (ø)
modin/core/dataframe/base/dataframe/utils.py 100.00% <100.00%> (ø)
...me/pandas/interchange/dataframe_protocol/column.py 93.33% <100.00%> (ø)
...pandas/interchange/dataframe_protocol/dataframe.py 96.72% <100.00%> (ø)
...odin/core/storage_formats/pandas/query_compiler.py 96.34% <100.00%> (+0.40%) ⬆️
...ecution/ray/implementations/pandas_on_ray/io/io.py 93.33% <100.00%> (ø)
... and 51 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@lgtm-com
Copy link

lgtm-com bot commented Oct 12, 2022

This pull request introduces 1 alert when merging 5c0478cfb1740123b64fd58ccdd8b2a8604dd2ef into 88f7b27 - view on LGTM.com

new alerts:

  • 1 for Wrong name for an argument in a class instantiation

@noloerino noloerino marked this pull request as ready for review October 12, 2022 20:32
@noloerino noloerino requested a review from a team as a code owner October 12, 2022 20:32
Copy link
Collaborator

@dchigarev dchigarev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments.

BTW, do we really need to combine all of the map functions into a single one? IMO some of them became really huge, complicated, and hard to read. Especially PartitionManager.map_select_indices and PandasDataframe._map_axis.

I would suggest either refactoring them somehow to relax the complexity or splitting some of them into separate methods.

Comment on lines +45 to +48
AxisInt = Literal[0, 1]
"""Type for the two possible integer values of an axis argument (0 or 1)."""


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? I mean, why would we extend internal dataframe API to also be able to accept AxisInt when we already have Axis enum?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of the codebase (mostly the query compiler) is written to call dataframe methods with a literal int rather than the Axis enum. I think it would be easier to re-wrap the axis with the enum from within dataframe methods (as is done now) than to go through and fix every instance where relevant dataframe methods are called to use the enum instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need this Axis enum then. I really don't like this mixing of Axis, AxisInt, and actual integers for an axis value. I think we should pick only one of the ways of interpreting an axis and then really stick to this, not introducing a variety of axis types in order to cover an existing zoo of value types.

Comment on lines +123 to +117
copy_dtypes : bool, default: False
If True, the dtypes of the resulting dataframe are copied from the original,
and the ``dtypes`` argument is ignored.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we offer a copy_dtypes option only for the map operator but not for reduce and tree_reduce?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure, though my guess is that the frequently dimension-reducing nature of reduce/tree-reduce makes the argument less relevant for those cases. Here, I introduced copy_dtypes as a replacement for dtypes="copy", which is a little hacky.

axis: Optional[Union[int, Axis]] = None,
dtypes: Optional[str] = None,
dtypes: Optional[Union[pandas.Series, type]] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, there was a discussion regarding limiting the usage of pandas entities in the base classes of Modin internals. Some executions may not require pandas at all and wouldn't like to deal with handling breaking changes introduced by some pandas updates.

May we define the dtypes type as something abstract like collections.abc.Mapping so every execution would use whatever container they like?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that makes senes. Is there some other generic container that would accept pandas.Series though? It seems like it's not a subclass of Mapping.

modin/core/dataframe/pandas/dataframe/dataframe.py Outdated Show resolved Hide resolved
modin/core/dataframe/pandas/dataframe/dataframe.py Outdated Show resolved Hide resolved
modin/core/dataframe/base/dataframe/dataframe.py Outdated Show resolved Hide resolved
new_columns=new_columns,
)
return self.__constructor__(new_modin_frame)

# Map partitions operations
# These operations are operations that apply a function to every partition.
abs = Map.register(pandas.DataFrame.abs, dtypes="copy")
# Though all these operations are element-wise, some can be sped up by mapping across a row/col
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate on how the speed-up is achieved?

IMO the cell-wise execution should be beneficial in the general case against row/col-wise.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My quick and dirty micro-benchmarks show no difference between specifying an axis vs. applying cell-wise, so perhaps it's best to revert back to cell-wise operations. The hope is that for certain operators, being able to apply across a whole axis rather than having to examine each cell would provide a speedup. I will see if any other benchmarks would justify this theory.

@noloerino noloerino force-pushed the map-operator branch 3 times, most recently from 02a1619 to 363dcfd Compare November 1, 2022 19:42
@noloerino
Copy link
Collaborator Author

noloerino commented Nov 1, 2022

Updated benchmarks for this PR (02a16191, slightly older version before a rebase) vs. current master (6f0ff79).

df.abs() (2^16 x 2^14)

  • 02a16191 - 0.0352s
  • 6f0ff79 - 0.0354s

df.apply(np.sum, axis=0) (2^16 x 2^14)

  • 02a16191 - 6.531s
  • 6f0ff79 - 8.072s

df1 + df2 (2^15 x 2^14 each)

  • 02a16191 - 0.0218s
  • 6f0ff79 - 0.0212s

df.describe() (2^16 x 2^14)

df.isna().any() (2^16 x 2^14)

  • 02a16191 - 0.0782s
  • 6f0ff79 - 0.0858s

I haven't yet check the sources of speedup (e.g. whether they're from shorter code paths/less partition overhead, or from changing maps to be axis-wise).

Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
Signed-off-by: Jonathan Shi <[email protected]>
@noloerino
Copy link
Collaborator Author

CI should be passing now (I ran it on my own repository before pushing here).

Copy link
Collaborator

@dchigarev dchigarev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I kindly ask, what was the original idea of the PR? It seems that this PR tries to solve too many problems in one piece. It's really hard to review for me and to make the changes here for you.

I feel that the PR covers the following distinct topics:

  1. Align how we use axis argument in low-level dataframe
  2. Introduce new logic for working with dtypes/copy_dtypes parameters
  3. Combine map and broadcast_apply into map_partitions
  4. Combine apply_full_axis and broadcast_apply_full_axis into map_partition_full_axis
  5. Combine apply_select_indices and broadcast_apply_select_indices into map_select_indices
  6. Rework apply_func_to_indices_both_axis into map_select_indices_both_axes

All of these may be solved with small different PRs (rather than one huge). They're probably un-doable in parallel as some of them may block each other, however, I think the changes would make much more sense when introduced by small iterations.

Comment on lines +45 to +48
AxisInt = Literal[0, 1]
"""Type for the two possible integer values of an axis argument (0 or 1)."""


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need this Axis enum then. I really don't like this mixing of Axis, AxisInt, and actual integers for an axis value. I think we should pick only one of the ways of interpreting an axis and then really stick to this, not introducing a variety of axis types in order to cover an existing zoo of value types.

Comment on lines +1893 to +1894
join_type : str, default: "left"
Type of join to apply.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a special enum for this, let's use it

class JoinType(Enum): # noqa: PR01

Comment on lines +1039 to +1043
axis=0,
other_partitions=None,
full_axis=False,
apply_indices=[0],
other_apply_indices=None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want these parameters to be specified? it seems that they just duplicate default values

Suggested change
axis=0,
other_partitions=None,
full_axis=False,
apply_indices=[0],
other_apply_indices=None,
axis=0,
apply_indices=[0],

the partitions will be concatenated together before the function is called, and then re-split
after it returns.
join_type : str, default: "left"
Type of join to apply.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please elaborate? something like this is expected:

Suggested change
Type of join to apply.
Type of join to apply if the concatenation of `self` and `other` would be required.

dtypes=dtypes,
)
if axis == Axis.CELL_WISE:
return self._map_cellwise(func, dtypes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does cell-wise map ignore all other parameters?

Comment on lines +1914 to +1916
new_partitions = self._partition_mgr_cls.map_partitions(
self._partitions, func
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we ignore axis here? why does the .map_partitions call is inside of _map_axis that's supposed to call function axis-wise only?

*,
axis: Optional[Union[AxisInt, Axis]] = None,
other: Optional["PandasDataframe"] = None,
full_axis=False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this parameter if we have a separate method for this (map_full_axis)?

kw = self._make_init_labels_args(new_partitions, new_index, new_columns)
if copy_dtypes:
kw["dtypes"] = self._dtypes
elif isinstance(dtypes, type):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

judging by the method's signature we are only supposed to allow pandas.Series to be a dtype parameter, why is this logic here then? Let's either change the signature or adapt the logic somehow

Comment on lines +2080 to +2081
apply_indices=None,
numeric_indices=None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want these two parameters to exist at the same time? we can easily end-up in an ambiguous situation with this set of parameters:

md_df.map_select_indices(
    apply_indices=["a", "b"],
    numeric_indices=[1, 2, 3, 4, 5], ...
) # what's the method supposed to do?


def rename(
def window(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we're adding this here if there's no implementation? Shouldn't it be located in the base class then?

@noloerino
Copy link
Collaborator Author

Thanks for taking the time to review @dchigarev. Broadly speaking, the purpose of this PR is to make calling the various partition application methods more uniform, and remove misleading "broadcast" nomenclature from the codebase (my understanding is that when the functions were originally written, the intent was for the functions to broadcast arguments to match dimensions like in some numpy functions).

I'll see if I can split this into several smaller PRs; your suggestions for how to break it down makes sense, although this fragmentation might cause some inconsistencies between how different mapping methods are used. I'll double check with @RehanSD (who assigned me to this task) if this is a viable approach.

@noloerino
Copy link
Collaborator Author

I've decided to split this into smaller parts as you suggested, starting with #5426 and #5427. Thanks again for the advice @dchigarev.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FEAT: Properly implement map operator
2 participants