FEAT-#4909: Properly implement map operator #5118
base: master
Conversation
Codecov Report
@@ Coverage Diff @@
## master #5118 +/- ##
==========================================
+ Coverage 84.56% 89.08% +4.51%
==========================================
Files 256 257 +1
Lines 19349 19613 +264
==========================================
+ Hits 16363 17472 +1109
+ Misses 2986 2141 -845
This pull request introduces 1 alert when merging 5c0478cfb1740123b64fd58ccdd8b2a8604dd2ef into 88f7b27; view the new alerts on LGTM.com.
Left some comments.
BTW, do we really need to combine all of the map functions into a single one? IMO some of them became really huge, complicated, and hard to read, especially PartitionManager.map_select_indices and PandasDataframe._map_axis.
I would suggest either refactoring them somehow to relax the complexity or splitting some of them into separate methods.
AxisInt = Literal[0, 1]
"""Type for the two possible integer values of an axis argument (0 or 1)."""
Why is this needed? I mean, why would we extend the internal dataframe API to also accept AxisInt when we already have the Axis enum?
A lot of the codebase (mostly the query compiler) is written to call dataframe methods with a literal int rather than the Axis enum. I think it would be easier to re-wrap the axis with the enum from within dataframe methods (as is done now) than to go through and fix every instance where relevant dataframe methods are called to use the enum instead.
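The re-wrapping described here could look something like the following minimal sketch. The `Axis` class below is a simplified stand-in for Modin's enum, and `normalize_axis` is a hypothetical helper name, not code from the PR:

```python
from enum import Enum
from typing import Union


class Axis(Enum):
    # Simplified stand-in for Modin's Axis enum.
    ROW_WISE = 0
    COL_WISE = 1


def normalize_axis(axis: Union[int, Axis]) -> Axis:
    # Accept either a literal int (0/1) or the enum, and always return
    # the enum, so methods internally deal with a single axis type.
    return axis if isinstance(axis, Axis) else Axis(axis)
```

With this pattern, callers that pass `0` or `1` keep working while the dataframe internals only ever see the enum.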
I don't see why we need this Axis enum then. I really don't like this mixing of Axis, AxisInt, and actual integers for an axis value. I think we should pick only one of the ways of interpreting an axis and then really stick to this, not introducing a variety of axis types in order to cover an existing zoo of value types.
copy_dtypes : bool, default: False
    If True, the dtypes of the resulting dataframe are copied from the original,
    and the ``dtypes`` argument is ignored.
why do we offer a copy_dtypes option only for the map operator but not for reduce and tree_reduce?
I'm not really sure, though my guess is that the frequently dimension-reducing nature of reduce/tree-reduce makes the argument less relevant for those cases. Here, I introduced copy_dtypes as a replacement for dtypes="copy", which is a little hacky.
  axis: Optional[Union[int, Axis]] = None,
- dtypes: Optional[str] = None,
+ dtypes: Optional[Union[pandas.Series, type]] = None,
If I remember correctly, there was a discussion regarding limiting the usage of pandas entities in the base classes of Modin internals. Some executions may not require pandas at all and wouldn't like to deal with handling breaking changes introduced by some pandas updates.
May we define the dtypes type as something abstract like collections.abc.Mapping so every execution would use whatever container they like?
Sure, that makes sense. Is there some other generic container that would accept pandas.Series though? It seems like it's not a subclass of Mapping.
        new_columns=new_columns,
    )
    return self.__constructor__(new_modin_frame)

# Map partitions operations
# These operations are operations that apply a function to every partition.
abs = Map.register(pandas.DataFrame.abs, dtypes="copy")
# Though all these operations are element-wise, some can be sped up by mapping across a row/col
Could you elaborate on how the speed-up is achieved?
IMO the cell-wise execution should be beneficial in the general case compared to row/col-wise.
My quick and dirty micro-benchmarks show no difference between specifying an axis vs. applying cell-wise, so perhaps it's best to revert back to cell-wise operations. The hope is that for certain operators, being able to apply across a whole axis rather than having to examine each cell would provide a speedup. I will see if any other benchmarks would justify this theory.
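The kind of comparison being made above can be illustrated with a toy stdlib-only timing sketch. This is not the actual Modin benchmark or its data; plain lists stand in for a partition, and the two functions mirror cell-wise vs. row-wise application:

```python
import timeit

# Toy "partition": a list of rows of ints (stands in for a real dataframe).
data = [list(range(-50, 50)) for _ in range(100)]


def cell_wise(rows):
    # Apply the function one cell at a time.
    return [[abs(x) for x in row] for row in rows]


def row_wise(rows):
    # Apply the function one whole row at a time.
    return [list(map(abs, row)) for row in rows]


t_cell = timeit.timeit(lambda: cell_wise(data), number=20)
t_row = timeit.timeit(lambda: row_wise(data), number=20)
print(f"cell-wise: {t_cell:.4f}s, row-wise: {t_row:.4f}s")
```

Both produce identical results; any speedup from the axis-wise form would come from amortizing per-cell overhead, which is exactly what the micro-benchmarks above tried to measure.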
Updated benchmarks for this PR (02a16191, a slightly older version before a rebase) vs. current master (6f0ff79).
I haven't yet checked the sources of speedup (e.g. whether they're from shorter code paths/less partition overhead, or from changing maps to be axis-wise).
Signed-off-by: Jonathan Shi <[email protected]>
CI should be passing now (I ran it on my own repository before pushing here).
May I kindly ask, what was the original idea of the PR? It seems that this PR tries to solve too many problems in one piece. It's really hard for me to review and for you to make changes here.
I feel that the PR covers the following distinct topics:
- Align how we use the axis argument in the low-level dataframe
- Introduce new logic for working with the dtypes/copy_dtypes parameters
- Combine map and broadcast_apply into map_partitions
- Combine apply_full_axis and broadcast_apply_full_axis into map_partition_full_axis
- Combine apply_select_indices and broadcast_apply_select_indices into map_select_indices
- Rework apply_func_to_indices_both_axis into map_select_indices_both_axes
All of these could be solved with small separate PRs (rather than one huge one). They're probably not doable in parallel, as some of them may block each other; however, I think the changes would make much more sense when introduced in small iterations.
join_type : str, default: "left"
    Type of join to apply.
we have a special enum for this, let's use it:
class JoinType(Enum):  # noqa: PR01
axis=0,
other_partitions=None,
full_axis=False,
apply_indices=[0],
other_apply_indices=None,
do we really want these parameters to be specified? it seems that they just duplicate default values
Suggested change:
- axis=0,
- other_partitions=None,
- full_axis=False,
- apply_indices=[0],
- other_apply_indices=None,
+ axis=0,
+ apply_indices=[0],
the partitions will be concatenated together before the function is called, and then re-split
after it returns.
join_type : str, default: "left"
    Type of join to apply.
can you please elaborate? something like this is expected:
- Type of join to apply.
+ Type of join to apply if the concatenation of `self` and `other` would be required.
    dtypes=dtypes,
)
if axis == Axis.CELL_WISE:
    return self._map_cellwise(func, dtypes)
why does cell-wise map ignore all other parameters?
new_partitions = self._partition_mgr_cls.map_partitions(
    self._partitions, func
)
why do we ignore axis here? why is the .map_partitions call inside of _map_axis, which is supposed to call the function axis-wise only?
*,
axis: Optional[Union[AxisInt, Axis]] = None,
other: Optional["PandasDataframe"] = None,
full_axis=False,
why do we need this parameter if we have a separate method for this (map_full_axis)?
kw = self._make_init_labels_args(new_partitions, new_index, new_columns)
if copy_dtypes:
    kw["dtypes"] = self._dtypes
elif isinstance(dtypes, type):
judging by the method's signature we are only supposed to allow pandas.Series as the dtypes parameter, so why is this logic here? Let's either change the signature or adapt the logic somehow.
apply_indices=None, | ||
numeric_indices=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really want these two parameters to exist at the same time? we can easily end-up in an ambiguous situation with this set of parameters:
md_df.map_select_indices(
    apply_indices=["a", "b"],
    numeric_indices=[1, 2, 3, 4, 5], ...
)  # what's the method supposed to do?
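A simple guard would make the ambiguous call fail loudly. The sketch below is hypothetical (the parameter names come from the review comment, and the return value is a placeholder to show which selection path was taken):

```python
def map_select_indices(apply_indices=None, numeric_indices=None):
    # Reject the ambiguous case where both selectors are given.
    if apply_indices is not None and numeric_indices is not None:
        raise ValueError(
            "pass either apply_indices or numeric_indices, not both"
        )
    # Otherwise select by labels or by positions, whichever was provided.
    if apply_indices is not None:
        return ("labels", apply_indices)
    return ("positions", numeric_indices)
```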
def rename(
def window(
why are we adding this here if there's no implementation? Shouldn't it be located in the base class then?
Thanks for taking the time to review @dchigarev. Broadly speaking, the purpose of this PR is to make calling the various partition application methods more uniform, and remove misleading "broadcast" nomenclature from the codebase (my understanding is that when the functions were originally written, the intent was for the functions to broadcast arguments to match dimensions like in some numpy functions). I'll see if I can split this into several smaller PRs; your suggestions for how to break it down make sense, although this fragmentation might cause some inconsistencies between how different mapping methods are used. I'll double check with @RehanSD (who assigned me to this task) if this is a viable approach.
I've decided to split this into smaller parts as you suggested, starting with #5426 and #5427. Thanks again for the advice @dchigarev.
What do these changes do?
This PR cleans up the interfaces of the various map, apply, and broadcast dataframe and partition manager methods. Since reduce and tree-reduce both use these methods, they are also affected by the aforementioned changes. The changes also incidentally address #4912 and (partially) #5094, but those changes can be separated out fairly easily if this PR is too large.
Overall, the following changes have been made to the dataframe API (the partition manager changes are very similar):
A lot of logic that used to be in separate functions got moved into nested if/else chains with this refactor: suggestions on how to clean up the code would be appreciated.
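One common way to relax such nested if/else chains is a dispatch table keyed on the mapping mode. The following is a hypothetical sketch on toy list data, not Modin code, just to illustrate the shape of the refactor:

```python
def map_cell_wise(data, func):
    # Apply func to every individual cell.
    return [[func(x) for x in row] for row in data]


def map_row_wise(data, func):
    # Apply func to each whole row.
    return [func(row) for row in data]


# Dispatch table: one small function per mode instead of a nested chain.
DISPATCH = {
    "cell": map_cell_wise,
    "row": map_row_wise,
}


def map_frame(data, func, mode="cell"):
    try:
        return DISPATCH[mode](data, func)
    except KeyError:
        raise ValueError(f"unknown mode: {mode}") from None
```

Each branch of the original chain becomes a short, separately testable function, which also addresses the reviewer's readability concern.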
Microbenchmarks
All tests were run on an EC2 t2.2xlarge instance (8 CPUs, 32 GiB RAM, 128 GB disk, Ubuntu Jammy AMD64) with the Ray backend, with int64 dataframes of size 2^16 x 2^14. Each test was run 5 times and averaged.
These benchmarks seem to indicate no appreciable performance difference on datasets of this size.
abs
The abs function is changed to map across rows rather than cell-wise.
apply
The test ran df.apply(np.sum, axis=0).
.describe
- [ ] passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
- [ ] passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
- [ ] signed commit with git commit -s
- [ ] resolves FEAT: Properly implement map operator #4909, BUG: first_valid_index errors on dataframe with only None/NaN values #4912, BUG: Passing string as axis argument leads to incorrect behavior #5094
- [ ] docs/development/architecture.rst is up-to-date