From b5f50b9e65b97c197b2923939b154b9b6d09e70b Mon Sep 17 00:00:00 2001 From: Michael Akerman Date: Fri, 19 Jul 2024 16:58:53 +0200 Subject: [PATCH 1/6] FIX-#7346: Handle execution on Dask workers to avoid creating conflicting Clients Signed-off-by: Michael Akerman --- .../execution/dask/common/engine_wrapper.py | 25 ++++++++++++++++--- modin/core/execution/dask/common/utils.py | 11 ++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/modin/core/execution/dask/common/engine_wrapper.py b/modin/core/execution/dask/common/engine_wrapper.py index c79f83e7d68..2ae7afb783b 100644 --- a/modin/core/execution/dask/common/engine_wrapper.py +++ b/modin/core/execution/dask/common/engine_wrapper.py @@ -19,6 +19,25 @@ from dask.distributed import wait from distributed import Future from distributed.client import default_client +from distributed.worker import get_worker + + +def get_dask_client(): + """ + Get the Dask client, reusing the worker's client if execution is on a Dask worker. + + Returns + ------- + distributed.Client + The Dask client. + """ + try: + client = default_client() + except ValueError: + # We ought to be in a worker process + worker = get_worker() + client = worker.client + return client def _deploy_dask_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover @@ -83,7 +102,7 @@ def deploy( list The result of ``func`` split into parts in accordance with ``num_returns``. """ - client = default_client() + client = get_dask_client() args = [] if f_args is None else f_args kwargs = {} if f_kwargs is None else f_kwargs if callable(func): @@ -137,7 +156,7 @@ def materialize(cls, future): Any An object(s) from the distributed memory. """ - client = default_client() + client = get_dask_client() return client.gather(future) @classmethod @@ -164,7 +183,7 @@ def put(cls, data, **kwargs): # {'sep': , \ # 'delimiter': ... data = UserDict(data) - client = default_client() + client = get_dask_client() return client.scatter(data, **kwargs) @classmethod diff --git a/modin/core/execution/dask/common/utils.py b/modin/core/execution/dask/common/utils.py index 067a94fcdf0..1d8d9425733 100644 --- a/modin/core/execution/dask/common/utils.py +++ b/modin/core/execution/dask/common/utils.py @@ -30,6 +30,17 @@ def initialize_dask(): """Initialize Dask environment.""" from distributed.client import default_client + from distributed.worker import get_worker + + try: + # Check if running within a Dask worker process + get_worker() + # If the above line does not raise an error, we are in a worker process + # and should not create a new client + return + except ValueError: + # Not in a Dask worker, proceed to check for or create a client + pass try: client = default_client() From 3d17773f1061ccdd899200d6e8a2b6686d687a71 Mon Sep 17 00:00:00 2001 From: Michael Akerman Date: Mon, 22 Jul 2024 19:26:25 +0200 Subject: [PATCH 2/6] Add tests for FIX-#7346 Signed-off-by: Michael Akerman --- .../storage_formats/pandas/test_internals.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 4113f3ce0ed..2cf8d5843ae 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2784,3 +2784,23 @@ def dataframe_test_default_property(df): match="> is not currently supported", ): pd.DataFrame([[1]]).dataframe_test_default_property + +def test_daemonic_worker_protection(): + # Test for issue #7346, wherein some operations on Dask cause a second submission of a task to + # the Dask client from the worker scope, which should not cause a new client to be created + + def submission_triggering_row_operation(row): + row_to_dict = row.to_dict() + dict_to_row = pd.Series(row_to_dict) + return dict_to_row + + df = pd.DataFrame( + { + "A": ["a", "b", "c", "d"], + "B": [1, 2, 3, 4], + "C": [1, 2, 3, 4], + "D": [1, 2, 3, 4], + } + ) + + df = df.apply(submission_triggering_row_operation, axis=1) \ No newline at end of file From 6eb544242aa57511083c8054911f9ce476abcd21 Mon Sep 17 00:00:00 2001 From: Michael Akerman Date: Mon, 29 Jul 2024 10:32:05 -0400 Subject: [PATCH 3/6] Update modin/tests/core/storage_formats/pandas/test_internals.py Co-authored-by: Iaroslav Igoshev --- modin/tests/core/storage_formats/pandas/test_internals.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 2cf8d5843ae..613f430adf9 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2785,6 +2785,14 @@ def dataframe_test_default_property(df): ): pd.DataFrame([[1]]).dataframe_test_default_property +@pytest.mark.parametrize( + "modify_config", + [ + {Engine: "Ray"}, + {Engine: "Dask"}, + ], + indirect=True, +) def test_daemonic_worker_protection(): # Test for issue #7346, wherein some operations on Dask cause a second submission of a task to # the Dask client from the worker scope, which should not cause a new client to be created From c6eb9000cf9750e7d54d3c6b03c9453ca774a035 Mon Sep 17 00:00:00 2001 From: Michael Akerman Date: Mon, 29 Jul 2024 16:36:03 +0200 Subject: [PATCH 4/6] Fixed linting issues Signed-off-by: Michael Akerman --- modin/tests/core/storage_formats/pandas/test_internals.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 613f430adf9..99326b26ea9 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2785,6 +2785,7 @@ def dataframe_test_default_property(df): ): pd.DataFrame([[1]]).dataframe_test_default_property + @pytest.mark.parametrize( "modify_config", [ @@ -2801,7 +2802,7 @@ def submission_triggering_row_operation(row): row_to_dict = row.to_dict() dict_to_row = pd.Series(row_to_dict) return dict_to_row - + df = pd.DataFrame( { "A": ["a", "b", "c", "d"], @@ -2811,4 +2812,4 @@ def submission_triggering_row_operation(row): } ) - df = df.apply(submission_triggering_row_operation, axis=1) \ No newline at end of file + df = df.apply(submission_triggering_row_operation, axis=1) From 36e10360cd75e10434dcfa2cf3249ac2a6ad0e20 Mon Sep 17 00:00:00 2001 From: Michael Akerman Date: Thu, 1 Aug 2024 20:27:42 +0200 Subject: [PATCH 5/6] Include fixture in test function for Engine selection Signed-off-by: Michael Akerman --- modin/tests/core/storage_formats/pandas/test_internals.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 99326b26ea9..1d250b0b17c 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2794,10 +2794,10 @@ def dataframe_test_default_property(df): ], indirect=True, ) -def test_daemonic_worker_protection(): +def test_daemonic_worker_protection(modify_config): # Test for issue #7346, wherein some operations on Dask cause a second submission of a task to # the Dask client from the worker scope, which should not cause a new client to be created - + def submission_triggering_row_operation(row): row_to_dict = row.to_dict() dict_to_row = pd.Series(row_to_dict) @@ -2812,4 +2812,4 @@ def submission_triggering_row_operation(row): } ) - df = df.apply(submission_triggering_row_operation, axis=1) + df.apply(submission_triggering_row_operation, axis=1) From 036b91ece6f2ac3fc707609fee4fa6390fdb1c83 Mon Sep 17 00:00:00 2001 From: Michael Akerman Date: Tue, 20 Aug 2024 15:19:59 +0200 Subject: [PATCH 6/6] Fix linting error due to whitespace Signed-off-by: Michael Akerman --- modin/tests/core/storage_formats/pandas/test_internals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 1d250b0b17c..0ebc9f4eef8 100644 --- a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -2797,7 +2797,7 @@ def dataframe_test_default_property(df): def test_daemonic_worker_protection(modify_config): # Test for issue #7346, wherein some operations on Dask cause a second submission of a task to # the Dask client from the worker scope, which should not cause a new client to be created - + def submission_triggering_row_operation(row): row_to_dict = row.to_dict() dict_to_row = pd.Series(row_to_dict)