Skip to content

Commit

Permalink
Revert "Register serialization at import"
Browse files Browse the repository at this point in the history
This reverts commit e47cd6f.
  • Loading branch information
pentschev committed Feb 20, 2025
1 parent 54f7dcb commit 6558690
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@
from cudf_polars.experimental.dispatch import LowerIRTransformer


try:
from cudf_polars.experimental.dask_serialize import register

register()
except ImportError:
# The dask_serialize module has the necessary Distributed imports, if Distributed
# is not installed that will raise ImportError.
pass


@lower_ir_node.register(IR)
def _(ir: IR, rec: LowerIRTransformer) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
# Default logic - Requires single partition
Expand Down Expand Up @@ -137,16 +127,30 @@ def task_graph(
return graph, (key_name, 0)


def _register_serialize() -> None:
"""Register Dask/cudf-polars serializers in calling process."""
from cudf_polars.experimental.dask_serialize import register

register()


def get_client():
"""Get appropriate Dask client or scheduler."""
_register_serialize()

try: # pragma: no cover; block depends on executor type and Distributed cluster
from distributed import get_client

return get_client().get
client = get_client()
except (ImportError, ValueError):
from dask import get

return get
else:
client.run(_register_serialize)
client.run_on_scheduler(_register_serialize)

return client.get


def evaluate_dask(ir: IR) -> DataFrame:
Expand Down

0 comments on commit 6558690

Please sign in to comment.