From 655869015ef461e923d2fbc3b5bd30ca2c1e562b Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 20 Feb 2025 13:20:29 -0800
Subject: [PATCH] Revert "Register serialization at import"

This reverts commit e47cd6f84f5b06f57038ba929031020be1efa475.
---
NOTE(review): after this revert, get_client() calls _register_serialize()
before entering its try block. The comment removed below says importing
dask_serialize raises ImportError when Distributed is not installed, so that
error would presumably escape get_client() instead of reaching the
`from dask import get` fallback -- confirm this is intended.

 .../cudf_polars/experimental/parallel.py | 26 +++++++++++--------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py
index a3191c49094..51cdf9b765a 100644
--- a/python/cudf_polars/cudf_polars/experimental/parallel.py
+++ b/python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -28,16 +28,6 @@
     from cudf_polars.experimental.dispatch import LowerIRTransformer
 
 
-try:
-    from cudf_polars.experimental.dask_serialize import register
-
-    register()
-except ImportError:
-    # The dask_serialize module has the necessary Distributed imports, if Distributed
-    # is not installed that will raise ImportError.
-    pass
-
-
 @lower_ir_node.register(IR)
 def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
     # Default logic - Requires single partition
@@ -137,16 +127,30 @@ def task_graph(
     return graph, (key_name, 0)
 
 
+def _register_serialize() -> None:
+    """Register Dask/cudf-polars serializers in calling process."""
+    from cudf_polars.experimental.dask_serialize import register
+
+    register()
+
+
 def get_client():
     """Get appropriate Dask client or scheduler."""
+    _register_serialize()
+
     try:  # pragma: no cover; block depends on executor type and Distributed cluster
         from distributed import get_client
 
-        return get_client().get
+        client = get_client()
     except (ImportError, ValueError):
         from dask import get
 
         return get
+    else:
+        client.run(_register_serialize)
+        client.run_on_scheduler(_register_serialize)
+
+        return client.get
 
 
 def evaluate_dask(ir: IR) -> DataFrame: