
PERF-#6433: Implement '.dropna()' using map-reduce pattern #6472

Merged
merged 4 commits into from
Aug 21, 2023

Conversation

dchigarev
Collaborator

@dchigarev dchigarev commented Aug 8, 2023

This PR replaces the full-column implementation of .dropna() with a chain of two row-axis/map functions.

At the first stage, a row-axis function is submitted for each row partition; it evaluates whether each column should be dropped based on that partition's data and builds a boolean mask. At the second stage, those per-partition masks are merged and the flagged columns are dropped.
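The two-stage pattern can be sketched in plain pandas terms (a simplified illustration with hypothetical helper names; Modin's actual kernels run on distributed row partitions):

```python
import numpy as np
import pandas as pd

def build_drop_mask(row_partition, how="any"):
    # Map stage (sketch): for one row partition, flag each column that,
    # judged by this partition alone, contains a NaN ("any") or is
    # entirely NaN ("all").
    if how == "any":
        return row_partition.isna().any(axis=0)
    return row_partition.isna().all(axis=0)

def reduce_masks(masks, how="any"):
    # Reduce stage (sketch): merge the per-partition masks into a single
    # column mask. "any": one NaN in any partition is enough to drop the
    # column; "all": the column must be all-NaN in every partition.
    stacked = pd.concat(masks, axis=1)
    return stacked.any(axis=1) if how == "any" else stacked.all(axis=1)

# Toy usage: a frame split into two row partitions.
df = pd.DataFrame({"a": [1.0, np.nan, 3.0, 4.0], "b": [1.0, 2.0, 3.0, 4.0]})
parts = [df.iloc[:2], df.iloc[2:]]
mask = reduce_masks([build_drop_mask(p, "any") for p in parts], "any")
result = df.loc[:, ~mask]  # matches df.dropna(axis=1, how="any")
```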

perf results

Here's the comparison for df.dropna(axis=1, how="any") between these two implementations:

code to measure
import modin.pandas as pd
import modin.config as cfg
import numpy as np
from timeit import default_timer as timer

cfg.BenchmarkMode.put(True)

# start all the workers
pd.DataFrame([np.arange(cfg.MinPartitionSize.get()) for _ in range(cfg.NPartitions.get() ** 2)]).to_numpy()

shapes = [
    [5000, 5000],
    [1_000_000, 32],
    [10_000_000, 32],
    [10_000_000, 256]
]

for how in ("any",):
    for shape in shapes:
        # data-gen was copied from asv TimeDropna benchmark
        row, col = shape
        df = pd.DataFrame(np.random.randn(row, col))
        df.iloc[row // 20 : row // 10, col // 3 : col // 2] = np.nan
        df["foo"] = "bar"

        t1 = timer()
        df.dropna(axis=1, how=how)
        print(f"{shape=}, {how=}: {timer() - t1}")

[image: benchmark timings comparing the old and new implementations for the shapes above]

What do these changes do?

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Reimplement .dropna() using MapReduce pattern #6433
  • tests are passing
  • module layout described at docs/development/architecture.rst is up-to-date

Comment on lines 2839 to 2841
    processable_amount_of_partitions = (
        np.prod(self._modin_frame._partitions.shape) < CpuCount.get() * 32
    )
Collaborator Author

@dchigarev dchigarev Aug 9, 2023


Otherwise I was getting ~60s for the case of a square-like frame with shape (5000, 5000), when normally it should take about 1-2 seconds.
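The intent of that guard can be illustrated as a standalone heuristic (a sketch: `factor=32` comes from the snippet above, and `os.cpu_count()` stands in for Modin's `CpuCount` config):

```python
import os
import numpy as np

def map_reduce_dropna_pays_off(partitions_shape, cpu_count=None, factor=32):
    # Sketch of the guard above: take the map-reduce path only while the
    # total partition count stays under cpu_count * factor; beyond that
    # the per-partition submission overhead starts to dominate (per the
    # comment above, a (5000, 5000) square frame regressed from ~1-2s to
    # ~60s without this cap).
    cpu_count = cpu_count or os.cpu_count()
    return int(np.prod(partitions_shape)) < cpu_count * factor

print(map_reduce_dropna_pays_off((8, 8), cpu_count=8))    # 64 < 256 -> True
print(map_reduce_dropna_pays_off((16, 16), cpu_count=8))  # 256 < 256 -> False
```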

@@ -2868,13 +2909,19 @@ def _pick_axis(get_axis, sizes_cache):
         if axis == 0:
             # Pass shape caches instead of values in order to not trigger shape computation.
             new_index, new_row_lengths = _pick_axis(
-                self._get_index, self._row_lengths_cache
+                self.copy_index_cache, self._row_lengths_cache
Collaborator Author


.copy_index_cache() doesn't trigger index materialization, unlike ._get_index(), which does.
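The distinction can be sketched with a hypothetical minimal class (not Modin's actual PandasDataframe; names and counter are illustrative only):

```python
import pandas as pd

class LazyIndexFrame:
    # Hypothetical sketch: the index is computed lazily and cached.
    def __init__(self):
        self._index_cache = None
        self.materializations = 0  # counts expensive index computations

    def _get_index(self):
        # Triggers (potentially expensive) index materialization on a cache miss.
        if self._index_cache is None:
            self.materializations += 1
            self._index_cache = pd.RangeIndex(10)
        return self._index_cache

    def copy_index_cache(self):
        # Returns the cached value (possibly None) WITHOUT materializing,
        # so downstream code can keep deferring the computation.
        return None if self._index_cache is None else self._index_cache.copy()

frame = LazyIndexFrame()
cache = frame.copy_index_cache()  # None; materializations still 0
index = frame._get_index()        # materializes exactly once
```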

@dchigarev dchigarev marked this pull request as ready for review August 9, 2023 13:40
@dchigarev dchigarev requested a review from a team as a code owner August 9, 2023 13:40
Comment on lines 2839 to 2841
    processable_amount_of_partitions = (
        np.prod(self._modin_frame._partitions.shape) < CpuCount.get() * 32
    )
Collaborator


Let's not mix abstraction levels and separate this condition into a dataframe method.

Collaborator Author


Let's separate this condition into a dataframe method.

Well, that would be quite hard...

Here I want to dispatch between two different implementations of .dropna(). Since the dataframe level doesn't define dropna, I have to do this dispatching at the QC level.

The idea of a "general dispatcher" at the dataframe level was discussed in #5394, but the suggested approaches didn't really pan out, so there's a lot of work to be done in this regard.

Collaborator


Just hide the implementation details, something like this:

        processable_amount_of_partitions = self._modin_frame.processable_amount_of_partitions_for_dropna

Collaborator Author


Instead of a .processable_amount_of_partitions_for_dropna field, I simply added a .num_parts property and kept the deciding logic at the QC level, so the line now looks like:

        processable_amount_of_partitions = self._modin_frame.num_parts < CpuCount.get() * 32

It feels wrong to add a .processable_amount_of_partitions_for_dropna field, as the modin_frame layer should know nothing about dropna and its implementation's partitioning requirements.
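A minimal sketch of what that separation boils down to (a hypothetical `Frame` class, not Modin's actual dataframe): the frame exposes only a generic partition count, and any operation-specific threshold lives one layer up.

```python
import numpy as np

class Frame:
    # Hypothetical sketch: the frame knows its partition grid, nothing
    # about dropna; callers apply their own thresholds to num_parts.
    def __init__(self, partitions):
        self._partitions = partitions  # 2D grid of partition objects

    @property
    def num_parts(self):
        # Total number of partitions across both axes of the grid.
        return int(np.prod(self._partitions.shape))

frame = Frame(np.empty((4, 8), dtype=object))
print(frame.num_parts)  # 32
```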

Signed-off-by: Dmitry Chigarev <[email protected]>
Collaborator

@anmyachev anmyachev left a comment


LGTM!

@YarShev YarShev merged commit cfee6b4 into modin-project:master Aug 21, 2023
37 checks passed