FEAT-#6990: Implement lazy execution for the Ray virtual partitions. #6991
Judging by the annotations, we need to write a lot more tests to cover most of the changes.
from .utils import initialize_ray | ||
|
||
__all__ = [ | ||
"initialize_ray", | ||
"RayWrapper", | ||
"MaterializationHook", | ||
"SignalActor", | ||
"RayObjectRefTypes", |
This item has been deleted
@@ -214,7 +214,7 @@ def wait(cls, obj_ids, num_returns=None): | |||
num_returns : int, optional | |||
""" | |||
if not isinstance(obj_ids, Sequence): | |||
obj_ids = list(obj_ids) | |||
obj_ids = list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids] |
Can be deleted.
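For reference, the one-line change in the hunk above can be tried in isolation. This is only a minimal sketch: `normalize_ids` is a hypothetical helper written for illustration, not a function from Modin or Ray, and a plain `object()` stands in for a single object reference.

```python
from collections.abc import Iterable, Sequence


def normalize_ids(obj_ids):
    """Return ``obj_ids`` as a sequence, wrapping a single non-iterable item."""
    if not isinstance(obj_ids, Sequence):
        # Generators, sets and other iterables are expanded into a list;
        # anything else (e.g. one object reference) is wrapped in a list.
        obj_ids = list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids]
    return obj_ids


single = object()  # stand-in for a single object reference
print(normalize_ids(single) == [single])
print(normalize_ids(x for x in (1, 2)))
print(normalize_ids((1, 2)))
```

Without the `Iterable` check, passing a single non-iterable object would raise a `TypeError` inside `list(...)`.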
@@ -868,7 +868,7 @@ class LazyExecution(EnvironmentVariable, type=str): | |||
""" | |||
|
|||
varname = "MODIN_LAZY_EXECUTION" | |||
choices = ("Auto", "On", "Off") | |||
choices = ("Auto", "On", "Off", "Axis") |
Why introduce a new mode?
try: | ||
ref = ray.get(ref, timeout=0) | ||
except ray.exceptions.GetTimeoutError: | ||
return False |
If an object has been calculated and placed in distributed storage, will materialization occur here?
If this approach can be effective, then it is worth considering the possibility of using it in other places.
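The distinction behind this question can be sketched with the standard library. The example below is an analogy only: it uses `concurrent.futures.Future` in place of a Ray object reference, and both helper names are hypothetical. One probe asks for the value with a zero timeout (which hands the value to the caller when it is ready), the other only checks the state. In Ray, a zero-timeout `ray.wait` call is, as far as I know, the closest counterpart to the second probe, but whether it suits this code path is for the PR author to confirm.

```python
import concurrent.futures as cf


def is_ready_by_fetching(future):
    """Probe readiness by requesting the value with a zero timeout."""
    try:
        future.result(timeout=0)  # hands back the value if it is available
    except cf.TimeoutError:
        return False
    return True


def is_ready_without_fetching(future):
    """Probe readiness by looking at the state only."""
    return future.done()


pending, finished = cf.Future(), cf.Future()
finished.set_result("payload")

for fut in (pending, finished):
    print(is_ready_by_fetching(fut), is_ready_without_fetching(fut))
```

Both probes agree on readiness; they differ in whether the value is brought to the caller as a side effect.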
@@ -419,7 +424,7 @@ def eager_exec(self, func, *args, length=None, width=None, **kwargs): | |||
LazyExecution.subscribe(_configure_lazy_exec) | |||
|
|||
|
|||
class SlicerHook(MaterializationHook): | |||
class SlicerHook(MaterializationHook, DeferredExecution): |
What is the idea behind this change?
from .partition import PandasOnRayDataframePartition | ||
|
||
|
||
class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition): |
Why not inherit like this?
class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition): | |
class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition): |
@@ -42,6 +54,82 @@ class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager): | |||
_execution_wrapper = RayWrapper | |||
materialize_futures = RayWrapper.materialize | |||
|
|||
if LazyExecution.get() in ("On", "Axis"): |
Whether to use this function or not is determined during the first import without the possibility of further replacement. As far as I remember, in all other places, functions are defined on each call.
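To make the concern concrete, here is a small sketch of the three ways such a switch can be wired. The `Option` class is a hypothetical stand-in for a config parameter with `get`, `put` and `subscribe`. It is not Modin's implementation, and the assumption that `subscribe` invokes the callback immediately is mine.

```python
class Option:
    """Hypothetical config option that notifies subscribers on change."""

    def __init__(self, value):
        self._value = value
        self._callbacks = []

    def get(self):
        return self._value

    def put(self, value):
        self._value = value
        for callback in self._callbacks:
            callback(self)

    def subscribe(self, callback):
        self._callbacks.append(callback)
        callback(self)  # assumption: apply the current value right away


lazy_execution = Option("Off")

# Variant 1: chosen once, when this module-level code first runs.
if lazy_execution.get() in ("On", "Axis"):
    def get_indices_static():
        return "lazy"
else:
    def get_indices_static():
        return "eager"


# Variant 2: re-evaluated on every call.
def get_indices_dynamic():
    return "lazy" if lazy_execution.get() in ("On", "Axis") else "eager"


# Variant 3: re-selected by a subscriber whenever the option changes.
selected = {}


def _configure(option):
    selected["impl"] = "lazy" if option.get() in ("On", "Axis") else "eager"


lazy_execution.subscribe(_configure)
lazy_execution.put("Axis")
print(get_indices_static(), get_indices_dynamic(), selected["impl"])
```

After the option changes, only the first variant keeps using the implementation picked at import time.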
|
||
@classmethod | ||
@_inherit_docstrings(GenericRayDataframePartitionManager.get_indices) | ||
def get_indices(cls, axis, partitions, index_func=None): |
Have you tried making lazy changes to the already existing `get_indices`? (without overriding)
When we call this get_indices, do we trigger the entire lazy execution tree? If so, do we keep the result the consumers depend on?
E.g., if we had a lazy apply and computed indices, would we keep the result of the apply?
This function tries to avoid concatenating the partitions. That is possible when all the partitions are the result of a deferred split operation. See the description of the `find_non_split_block()` function, which gives an example of such an execution tree. If we can find the non-split partition in the tree, we can take the index from there and thus avoid the concatenation.
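The idea described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: `Block`, `Split`, `find_common_parent` and `get_index` are hypothetical names, and a tuple stands in for a pandas index.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class Block:
    """Hypothetical unsplit block; only its index matters here."""

    index: tuple


@dataclass(frozen=True)
class Split:
    """Hypothetical deferred split: chunk ``pos`` of ``num_splits``."""

    parent: Block
    pos: int
    num_splits: int


def find_common_parent(parts: List[object]) -> Optional[Block]:
    """Return the parent block if ``parts`` are all of its chunks, in order."""
    if not parts or not all(isinstance(p, Split) for p in parts):
        return None
    first = parts[0]
    if len(parts) != first.num_splits:
        return None  # some chunks are missing, the parent covers more rows
    for pos, part in enumerate(parts):
        if part.parent is not first.parent or part.pos != pos:
            return None
    return first.parent


def get_index(parts: List[object], concat_index: Callable) -> tuple:
    """Read the index from the unsplit parent, else fall back to concat."""
    parent = find_common_parent(parts)
    return parent.index if parent is not None else concat_index(parts)


block = Block(index=(0, 1, 2, 3))
chunks = [Split(block, pos, 2) for pos in range(2)]
fallback_calls = []


def concat_index(parts):
    fallback_calls.append(len(parts))  # record that the slow path ran
    return ("concatenated",)


print(get_index(chunks, concat_index), fallback_calls)
```

When only some of the chunks are passed in, the parent's index no longer matches, so the sketch falls back to the concatenating path.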
PandasOnRayDataframeColumnPartition, | ||
PandasOnRayDataframeRowPartition, | ||
PandasOnRayDataframeVirtualPartition, |
Have you tried making changes to existing classes?
axis = 0 | ||
|
||
@remote_function | ||
def _remote_concat(dfs): # pragma: no cover # noqa: GL08 |
Are you sure that the concat works as intended, given the message about the naming of the function arguments?
@@ -29,13 +29,20 @@ | |||
|
|||
import pandas | |||
import ray | |||
import ray.exceptions |
It seems that many of the changes in this file are not directly related to this pull request, so it would be great to move them into a separate pull request.
@@ -12,24 +12,36 @@ | |||
# governing permissions and limitations under the License. | |||
|
|||
"""Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray.""" | |||
import math |
import math | |
import math |
PandasOnRayDataframeRowPartition, | ||
) | ||
|
||
if LazyExecution.get() in ("On", "Axis"): |
This logic should probably be placed in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/__init__.py.
# governing permissions and limitations under the License. | ||
|
||
"""Module houses classes responsible for storing a virtual partition and applying a function to it.""" | ||
import math |
import math | |
import math |
""" | ||
|
||
partition_type = PandasOnRayDataframePartition | ||
instance_type = ray.ObjectRef |
instance_type = ray.ObjectRef |
@anmyachev, can this be removed?
list of lengths or None | ||
Estimated chunk lengths, that could be different from the real ones. | ||
bool | ||
Whether the specified partitions represent the full block or just the |
Can you elaborate a little on this?
manual_partition=False, | ||
**kwargs, | ||
) -> Union[List[PandasOnRayDataframePartition], PandasOnRayDataframePartition]: | ||
if not manual_partition: |
Why does this parameter take effect only when it is False? Should we copy the related logic from the base class?
lengths: Union[List[Union[ObjectRefType, int]], None], | ||
): | ||
self.num_splits = num_splits | ||
self.skip_chunks = set() |
Let's add a comment explaining what this is for.
PandasOnRayDataframeColumnPartition | ||
if self.axis | ||
else PandasOnRayDataframeRowPartition |
PandasOnRayDataframeColumnPartition | |
if self.axis | |
else PandasOnRayDataframeRowPartition | |
PandasOnRayDataframeRowPartition | |
if self.axis | |
else PandasOnRayDataframeColumnPartition |
Should this be so?
@@ -391,22 +408,24 @@ def _deconstruct_list( | |||
""" | |||
for obj in lst: | |||
if isinstance(obj, DeferredExecution): | |||
if out_pos := getattr(obj, "out_pos", None): | |||
if obj.has_result: | |||
obj = obj.data |
obj = obj.data | |
out_append(obj.data) |
I think it would be better to append obj.data in this if branch and remove the continue statements in all the else statements.
If `obj.data` is a list, we need to deconstruct it as well. Thus, we assign it to `obj` and fall through to the `if isinstance(obj, ListOrTuple)` check.
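A stripped-down sketch of this control flow may help. It is not the code from `deferred_execution.py`: `Deferred` and `deconstruct` are hypothetical, and string tags stand in for the real tag values. It only illustrates why a computed result is rebound to `obj` rather than appended directly.

```python
class Deferred:
    """Hypothetical deferred node that may already hold a computed result."""

    def __init__(self, data=None, has_result=False):
        self.data = data
        self.has_result = has_result


def deconstruct(items, out):
    """Flatten ``items`` into ``out``, unpacking nested lists and tuples."""
    for obj in items:
        if isinstance(obj, Deferred):
            if obj.has_result:
                # Rebind instead of appending: the result may itself be a
                # list, which the check below still has to unpack.
                obj = obj.data
            else:
                out.append("<deferred>")
                continue
        # A plain ``if`` (not ``elif``) so that rebound results reach it.
        if isinstance(obj, (list, tuple)):
            out.append("<list>")
            deconstruct(obj, out)
            out.append("<end>")
        else:
            out.append(obj)
    return out


print(deconstruct([1, Deferred([2, 3], has_result=True), Deferred()], []))
```

With `out.append(obj.data)` in the first branch, the nested list `[2, 3]` would be emitted as a single item instead of being unpacked.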
if obj.subscribers == 0: | ||
output[out_pos + 1] = 0 | ||
result_consumers.remove(obj) | ||
continue |
continue | |
else: | ||
out_append(_Tag.CHAIN) | ||
yield cls._deconstruct_chain(obj, output, stack, result_consumers) | ||
out_append(_Tag.END) | ||
elif isinstance(obj, ListOrTuple): | ||
continue |
continue | |
elif isinstance(obj, ListOrTuple): | ||
continue | ||
|
||
if isinstance(obj, ListOrTuple): |
if isinstance(obj, ListOrTuple): | |
elif isinstance(obj, ListOrTuple): |
out_append(_Tag.REF) | ||
out_append(out_pos) | ||
output[out_pos] = out_pos | ||
if obj.subscribers == 0: | ||
output[out_pos + 1] = 0 | ||
result_consumers.remove(obj) |
As this code is duplicated
modin/modin/core/execution/ray/common/deferred_execution.py
Lines 326 to 333 in 92fe2f7
if de.subscribers == 0: | |
# We may have subscribed to the same node multiple times. | |
# It could happen, for example, if it's passed to the args | |
# multiple times, or it's one of the parent nodes and also | |
# passed to the args. In this case, there are no multiple | |
# subscribers, and we don't need to return the result. | |
output[out_pos + 1] = 0 | |
result_consumers.remove(de) |
I don't think it makes sense to create a separate function just in order to reuse 3 lines of trivial code. Besides, it will cost a function call. Probably, a comment should be added here.
Yeah, a comment should be sufficient.
What do these changes do?
- passes `flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py`
- passes `black --check modin/ asv_bench/benchmarks scripts/doc_checker.py`
- signed commit with `git commit -s`
- module layout described at `docs/development/architecture.rst` is up-to-date