Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6990: Implement lazy execution for the Ray virtual partitions. #6991

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

AndreyPavlenko
Copy link
Collaborator

What do these changes do?

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Implement lazy execution for the Ray virtual partitions. #6990
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

@AndreyPavlenko AndreyPavlenko force-pushed the issue-6990 branch 2 times, most recently from bf2943d to ea540cc Compare March 1, 2024 20:38
@AndreyPavlenko AndreyPavlenko force-pushed the issue-6990 branch 13 times, most recently from 128509d to 8ce0b34 Compare March 20, 2024 20:17
Copy link
Collaborator

@anmyachev anmyachev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Judging by the annotations, we need to write a lot more tests to cover most of the changes.

from .utils import initialize_ray

__all__ = [
"initialize_ray",
"RayWrapper",
"MaterializationHook",
"SignalActor",
"RayObjectRefTypes",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This item has been deleted

@@ -214,7 +214,7 @@ def wait(cls, obj_ids, num_returns=None):
num_returns : int, optional
"""
if not isinstance(obj_ids, Sequence):
obj_ids = list(obj_ids)
obj_ids = list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be deleted.

@@ -868,7 +868,7 @@ class LazyExecution(EnvironmentVariable, type=str):
"""

varname = "MODIN_LAZY_EXECUTION"
choices = ("Auto", "On", "Off")
choices = ("Auto", "On", "Off", "Axis")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why introduce a new mode?

Comment on lines +456 to +459
try:
ref = ray.get(ref, timeout=0)
except ray.exceptions.GetTimeoutError:
return False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an object has been calculated and placed in distributed storage, will materialization occur here?

If this approach can be effective, then it is worth considering the possibility of using it in other places.

@@ -419,7 +424,7 @@ def eager_exec(self, func, *args, length=None, width=None, **kwargs):
LazyExecution.subscribe(_configure_lazy_exec)


class SlicerHook(MaterializationHook):
class SlicerHook(MaterializationHook, DeferredExecution):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the idea behind this change?

from .partition import PandasOnRayDataframePartition


class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not such inheritance?

Suggested change
class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition):
class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):

@@ -42,6 +54,82 @@ class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager):
_execution_wrapper = RayWrapper
materialize_futures = RayWrapper.materialize

if LazyExecution.get() in ("On", "Axis"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether to use this function or not is determined during the first import without the possibility of further replacement. As far as I remember, in all other places, functions are defined on each call.


@classmethod
@_inherit_docstrings(GenericRayDataframePartitionManager.get_indices)
def get_indices(cls, axis, partitions, index_func=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried making lazy changes to the already existing get_indices? (without overriding)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we call this get_indices, do we trigger the entire lazy execution tree? If so, do we keep the result the consumers depend on?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g., if we had a lazy apply and computed indices, would we keep the result of the apply?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What this function is trying to do is to avoid the partitions concatenation. It could be possible in the case when all the partitions are the result of a deferred split operation. Look at the description of the find_non_split_block() function. There is an example of such an execution tree. If we can find in the tree the non-split partition, we can just get the index out of there and, thus, avoid the concatenation.

Comment on lines +36 to +38
PandasOnRayDataframeColumnPartition,
PandasOnRayDataframeRowPartition,
PandasOnRayDataframeVirtualPartition,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried making changes to existing classes?

axis = 0

@remote_function
def _remote_concat(dfs): # pragma: no cover # noqa: GL08
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that the concat works as intended, given the message about the naming of the function arguments?

@@ -29,13 +29,20 @@

import pandas
import ray
import ray.exceptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that a lot of the changes in this file are not directly affected by this pull request and therefore it would be great to move them into a separate pull request.

@@ -12,24 +12,36 @@
# governing permissions and limitations under the License.

"""Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray."""
import math
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import math
import math

PandasOnRayDataframeRowPartition,
)

if LazyExecution.get() in ("On", "Axis"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should probably be placed in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/init.py.

# governing permissions and limitations under the License.

"""Module houses classes responsible for storing a virtual partition and applying a function to it."""
import math
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import math
import math

"""

partition_type = PandasOnRayDataframePartition
instance_type = ray.ObjectRef
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
instance_type = ray.ObjectRef

@anmyachev, can this be removed?

list of lengths or None
Estimated chunk lengths, that could be different form the real ones.
bool
Whether the specified partitions represent the full block or just the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate a little on this?

manual_partition=False,
**kwargs,
) -> Union[List[PandasOnRayDataframePartition], PandasOnRayDataframePartition]:
if not manual_partition:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this parameter have effect only in case of False? Should we copy the related logic from the base class?

lengths: Union[List[Union[ObjectRefType, int]], None],
):
self.num_splits = num_splits
self.skip_chunks = set()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put a comment what this is for.

Comment on lines +229 to +231
PandasOnRayDataframeColumnPartition
if self.axis
else PandasOnRayDataframeRowPartition
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PandasOnRayDataframeColumnPartition
if self.axis
else PandasOnRayDataframeRowPartition
PandasOnRayDataframeRowPartition
if self.axis
else PandasOnRayDataframeColumnPartition

Should this be so?

@@ -391,22 +408,24 @@ def _deconstruct_list(
"""
for obj in lst:
if isinstance(obj, DeferredExecution):
if out_pos := getattr(obj, "out_pos", None):
if obj.has_result:
obj = obj.data
Copy link
Collaborator

@arunjose696 arunjose696 Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
obj = obj.data
out_append(obj.data)

I think it would be better to append obj.data in this if branch and remove the continue statements in all the else statements.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If obj.data is a list, we need to deconstruct it either. Thus, we assign it to obj and go to the if isinstance(obj, ListOrTuple) check.

if obj.subscribers == 0:
output[out_pos + 1] = 0
result_consumers.remove(obj)
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
continue

else:
out_append(_Tag.CHAIN)
yield cls._deconstruct_chain(obj, output, stack, result_consumers)
out_append(_Tag.END)
elif isinstance(obj, ListOrTuple):
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
continue

elif isinstance(obj, ListOrTuple):
continue

if isinstance(obj, ListOrTuple):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(obj, ListOrTuple):
elif isinstance(obj, ListOrTuple):

Comment on lines +415 to +420
out_append(_Tag.REF)
out_append(out_pos)
output[out_pos] = out_pos
if obj.subscribers == 0:
output[out_pos + 1] = 0
result_consumers.remove(obj)
Copy link
Collaborator

@arunjose696 arunjose696 Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this code is duplicated

if de.subscribers == 0:
# We may have subscribed to the same node multiple times.
# It could happen, for example, if it's passed to the args
# multiple times, or it's one of the parent nodes and also
# passed to the args. In this case, there are no multiple
# subscribers, and we don't need to return the result.
output[out_pos + 1] = 0
result_consumers.remove(de)
and we have reason for this deconstruct_chain, could it be reused?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense to create a separate function just in order to reuse 3 lines of trivial code. Besides, it will cost a function call. Probably, a comment should be added here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah a comment should be sufficent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement lazy execution for the Ray virtual partitions.
4 participants