diff --git a/queries/polars/utils.py b/queries/polars/utils.py index 200dfef..bbc4910 100644 --- a/queries/polars/utils.py +++ b/queries/polars/utils.py @@ -130,24 +130,67 @@ def run_query(query_number: int, lf: pl.LazyFrame) -> None: new_streaming = settings.run.polars_new_streaming eager = settings.run.polars_eager gpu = settings.run.polars_gpu + cloud = settings.run.polars_cloud - if sum([eager, streaming, new_streaming, gpu]) > 1: - msg = "Please specify at most one of eager, streaming, new_streaming or gpu" + if sum([eager, streaming, new_streaming, gpu, cloud]) > 1: + msg = "Please specify at most one of eager, streaming, new_streaming, cloud or gpu" raise ValueError(msg) if settings.run.polars_show_plan: - print(lf.explain(streaming=streaming, new_streaming=new_streaming, optimized=eager)) + print( + lf.explain( + streaming=streaming, new_streaming=new_streaming, optimized=eager + ) + ) engine = obtain_engine_config() # Eager load engine backend, so we don't time that. _preload_engine(engine) - query = partial( - lf.collect, streaming=streaming, new_streaming=new_streaming, no_optimization=eager, engine=engine - ) + + if cloud: + import os + + import polars_cloud as pc + + os.environ["POLARS_SKIP_CLIENT_CHECK"] = "1" + + class PatchedComputeContext(pc.ComputeContext): + def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] + self._interactive = True + self._compute_address = "localhost:5051" + self._compute_public_key = b"" + + def get_status(self: pc.ComputeContext) -> pc.ComputeContextStatus: + """Get the status of the compute cluster.""" + return pc.ComputeContextStatus.RUNNING + + pc.ComputeContext.__init__ = PatchedComputeContext.__init__ + pc.ComputeContext.get_status = PatchedComputeContext.get_status + + def query(): + result = pc.spawn(lf, dst="file:///tmp/dst/", distributed=True).await_result() + + if settings.run.show_results: + print(result.plan()) + return ( + result + .lazy() + .collect() + ) + else: + query = partial( + lf.collect, + streaming=streaming, + new_streaming=new_streaming, + no_optimization=eager, + engine=engine, + ) if gpu: library_name = f"polars-gpu-{settings.run.use_rmm_mr}" elif eager: library_name = "polars-eager" + elif cloud: + library_name = "polars-cloud" else: library_name = "polars" diff --git a/settings.py b/settings.py index c6e142e..a526bf4 100644 --- a/settings.py +++ b/settings.py @@ -7,6 +7,7 @@ IoType: TypeAlias = Literal["skip", "parquet", "feather", "csv"] +# Set via PATH_ class Paths(BaseSettings): answers: Path = Path("data/answers") tables: Path = Path("data/tables") @@ -21,6 +22,7 @@ class Paths(BaseSettings): ) +# Set via RUN_ class Run(BaseSettings): io_type: IoType = "parquet" @@ -32,6 +34,7 @@ class Run(BaseSettings): polars_eager: bool = False polars_streaming: bool = False polars_new_streaming: bool = False + polars_cloud: bool = False polars_gpu: bool = False # Use GPU engine? polars_gpu_device: int = 0 # The GPU device to run on for polars GPU # Which style of GPU memory resource to use