FIX-#7346: Handle execution on Dask workers to avoid creating conflicting Clients #7347

Open
data-makerman wants to merge 7 commits into main from Fix-#7346--Handle-execution-on-Dask-workers-to-avoid-creating-conflicting-Clients

Conversation

data-makerman

What do these changes do?

Check if execution is happening on a Dask worker node and, if so, avoid creating conflicting clients. Worker nodes are not allowed to make additional clients.
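A rough sketch of the approach (illustrative only, not the exact diff; the function name and defaults below are stand-ins for Modin's Dask initialization code):

from distributed.client import Client, default_client
from distributed.worker import get_worker


def initialize_dask_sketch():
    # If get_worker() succeeds, this code is already running inside a Dask
    # worker process. Workers are daemonic and must not create their own
    # Client, so bail out instead of starting a new one.
    try:
        get_worker()
        return
    except ValueError:
        pass

    # Otherwise reuse the existing client or start one, as before.
    try:
        return default_client()
    except ValueError:
        return Client()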

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves BUG: Apply on axis=1 causes "daemonic processes are not allowed to have children" on some operations on Dask engine, or launches Ray instance #7346
  • tests added and passing
    • After setting up a conda env using environment-dev.yml, the full pytest suite fails reporting missing packages. pytest modin/tests/core/storage_formats/pandas/test_internals.py is passing.
  • module layout described at docs/development/architecture.rst is up-to-date

FIX-#7346: Handle execution on Dask workers to avoid creating conflicting Clients

Signed-off-by: Michael Akerman <[email protected]>
Collaborator

@devin-petersohn left a comment


Thanks @data-makerman, great patch! Would you be able to add a new test with the breaking code from your issue #7346 to the end of modin/tests/core/storage_formats/pandas/test_internals.py? I tested this and it works in Ray, so we shouldn't need to exclude any engine from the test.

@@ -30,6 +30,17 @@
def initialize_dask():
    """Initialize Dask environment."""
    from distributed.client import default_client
    from distributed.worker import get_worker

    try:
Collaborator


I wonder why initialize_dask is called in a worker? Could you verify if this is the case?

Author


I definitely don't fully understand it. I can verify that this PR stops the issue and prevents multiple instances of a Dask cluster from starting under any of the operations where I was observing the behavior. Since it occurs in an apply context on a remote worker, it was beyond my available time (or existing technical skill) to debug exactly what was happening on the Dask worker. It seems possible that there's some other root cause leading to a call to initialize_dask.

I can verify by inference that initialize_dask is being called inside a worker, because it appears to be the only place in Modin 0.31 where the distributed.Client class is ever instantiated, and I can observe in the stdout that multiple Clients are being created as daemonic processes on Dask during the apply operation demonstrated in #7346, but only when working with Modin (not with the equivalent operation in Pandas).

I can hazard a partial guess as to what might be happening that would require further study based on some very confusing behavior I observed: sometimes while attempting to use client.submit(lambda x: x.apply(foo), pandas_df) directly on a Pandas dataframe (not Modin), I saw the same error, but only if Modin had been imported using import modin.pandas as pd. It made me wonder if Dask was calling a pd function while pd had been masked in the worker's namespace by Modin?
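A minimal sketch of that experiment (assuming a local Dask cluster; names like foo are just placeholders, and whether it reproduces seems to depend on the environment):

import pandas
from distributed import Client

import modin.pandas as pd  # importing Modin is part of the observation


def foo(x):
    return x * 2


if __name__ == "__main__":
    client = Client()
    pandas_df = pandas.DataFrame({"a": [1, 2, 3]})

    # A plain pandas apply submitted directly to the Dask client; the
    # "daemonic processes" error only showed up after Modin had been imported.
    future = client.submit(lambda df: df.apply(foo), pandas_df)
    print(future.result())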

I think I can probably create a working example of that if I have enough time later, which might help find the root cause.

Collaborator


I think I have a decent understanding of what is going on, but there's still something weird happening that I can't explain.

Modin is never fully initialized on the workers. Modin's initialization code is never run on a worker; unless a worker runs a task that requires subscribing to an engine, it will never have this problem. Link:

class FactoryDispatcher(object):
    """
    Class that routes IO-work to the factories.

    This class is responsible for keeping selected factory up-to-date and dispatching
    calls of IO-functions to its actual execution-specific implementations.
    """

    __factory: factories.BaseFactory = None

    @classmethod
    def get_factory(cls) -> factories.BaseFactory:
        """Get current factory."""
        if cls.__factory is None:
            from modin.pandas import _update_engine

            Engine.subscribe(_update_engine)
            Engine.subscribe(cls._update_factory)
            StorageFormat.subscribe(cls._update_factory)
        return cls.__factory

The Series constructor ends up in this path via from_pandas. So when you call pd.Series(...) from within an apply function, cloudpickle will serialize all of the dependencies of that call and then unpack it within a worker. This could potentially explain why @data-makerman is seeing this happening with pandas after using Modin (but that still seems like a bug to me and is worth investigating more).
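For illustration, a minimal sketch of that call path (assuming the Dask engine is selected; this is the shape of the code from #7346, not a verbatim copy):

import modin.config as cfg

cfg.Engine.put("Dask")

import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# The applied function builds a Modin Series, which goes through from_pandas
# and FactoryDispatcher.get_factory(). Before this patch, hitting that path
# inside a Dask worker could try to initialize a second engine there.
result = df.apply(lambda row: pd.Series({"total": row.sum()}), axis=1)
print(result)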

Now, what I don't understand is that in ipython and jupyter, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster:

(Screenshot, 2024-07-22: Modin starting a Ray cluster inside the Dask cluster.)

I don't really even understand why it's different, because if I use %run issue-7346.py within ipython I get the same issue as before with Dask:

(Screenshot, 2024-07-22: the same Dask error as before when running the script via %run.)

So I think this patch is absolutely correct in detecting that we are in a worker and avoiding initializing a second Dask cluster, but I will follow on with a patch for this weird ipython issue.

Also, the reason this doesn't happen with Ray is that Ray uses a global variable shared by all workers to ensure that one and only one Ray cluster is initialized in the same client/worker. We bypass initialization if Ray is already initialized here:

if not ray.is_initialized() or override_is_cluster:

Ray will always be initialized from within a worker.
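For example, that guard can be checked from both sides (a small illustration, not Modin code):

import ray

# On the driver this is False until ray.init() runs; inside a Ray worker
# process it is already True, so the check above skips re-initialization.
print(ray.is_initialized())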

Collaborator


Now, what I don't understand is that in ipython and jupyter, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster:

This probably has to do with the fact that workers know nothing about Modin configs set in the main process.

Collaborator


@YarShev I think that is correct, but do you know why it would be different in ipython vs the python script?

Collaborator


I would guess there is a different strategy for importing modules in ipython.

@data-makerman force-pushed the Fix-#7346--Handle-execution-on-Dask-workers-to-avoid-creating-conflicting-Clients branch from a360f93 to 3d17773 on July 22, 2024 at 17:51
@data-makerman
Author

Thanks @data-makerman, great patch! Would you be able to add a new test with the breaking code from your issue #7346 to the end of modin/tests/core/storage_formats/pandas/test_internals.py? I tested this and it works in Ray, so we shouldn't need to exclude any engine from the test.

Added, although I'm not 100% sure I matched the desired test format and standard so please let me know if any changes are needed! The new test is passing on my branch.
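Roughly, the new test looks like this (a sketch; the actual test on the branch may differ in name and asserts):

def test_apply_does_not_create_conflicting_dask_client():
    # Regression test for #7346: applying a function that builds a Modin
    # object used to start a second engine inside a Dask worker.
    import modin.pandas as pd

    df = pd.DataFrame({"a": [1, 2, 3]})
    result = df.apply(lambda row: pd.Series({"doubled": row["a"] * 2}), axis=1)
    assert len(result) == 3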

That said, I don't understand how to test that the test properly fails. If I revert my fix commit and run it as-is with the pytest -s option, I can see that it's starting Ray (and therefore NOT failing). The test DOES fail if I uninstall ray-core, though, so I can tell it's properly failing without the fix if Dask is the only option. Please let me know if you have any advice!

@devin-petersohn
Collaborator

@data-makerman you can use MODIN_ENGINE=dask python -m pytest ...; we have a config environment variable that should make it easy to test.

@data-makerman
Author

At this point the pipeline is failing on tests which don't seem to have anything to do with changes in this PR, and I'm frankly a bit at a loss as to what they might mean. I have merged the current state of the main branch of Modin into this feature branch and restarted the checks, but if they still fail I would appreciate any input on fixes!

The failures are variations on:

botocore.exceptions.ClientError: An error occurred (InvalidLocationConstraint) when calling the CreateBucket operation: The specified location-constraint is not valid
