Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add polars-cloud runs #141

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions queries/polars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,67 @@ def run_query(query_number: int, lf: pl.LazyFrame) -> None:
new_streaming = settings.run.polars_new_streaming
eager = settings.run.polars_eager
gpu = settings.run.polars_gpu
cloud = settings.run.polars_cloud

if sum([eager, streaming, new_streaming, gpu]) > 1:
msg = "Please specify at most one of eager, streaming, new_streaming or gpu"
if sum([eager, streaming, new_streaming, gpu, cloud]) > 1:
msg = "Please specify at most one of eager, streaming, new_streaming, cloud or gpu"
raise ValueError(msg)
if settings.run.polars_show_plan:
print(lf.explain(streaming=streaming, new_streaming=new_streaming, optimized=eager))
print(
lf.explain(
streaming=streaming, new_streaming=new_streaming, optimized=eager
)
)

engine = obtain_engine_config()
# Eagerly load the engine backend so that loading it is not included in the timed run.
_preload_engine(engine)
query = partial(
lf.collect, streaming=streaming, new_streaming=new_streaming, no_optimization=eager, engine=engine
)

if cloud:
import os

import polars_cloud as pc

os.environ["POLARS_SKIP_CLIENT_CHECK"] = "1"

class PatchedComputeContext(pc.ComputeContext):
def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
self._interactive = True
self._compute_address = "localhost:5051"
self._compute_public_key = b""

def get_status(self: pc.ComputeContext) -> pc.ComputeContextStatus:
"""Get the status of the compute cluster."""
return pc.ComputeContextStatus.RUNNING

pc.ComputeContext.__init__ = PatchedComputeContext.__init__
pc.ComputeContext.get_status = PatchedComputeContext.get_status

def query():
result = pc.spawn(lf, dst="file:///tmp/dst/", distributed=True).await_result()

if settings.run.show_results:
print(result.plan())
return (
result
.lazy()
.collect()
)
else:
query = partial(
lf.collect,
streaming=streaming,
new_streaming=new_streaming,
no_optimization=eager,
engine=engine,
)

if gpu:
library_name = f"polars-gpu-{settings.run.use_rmm_mr}"
elif eager:
library_name = "polars-eager"
elif cloud:
library_name = "polars-cloud"
else:
library_name = "polars"

Expand Down
3 changes: 3 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
IoType: TypeAlias = Literal["skip", "parquet", "feather", "csv"]


# Set via PATH_<NAME>
class Paths(BaseSettings):
answers: Path = Path("data/answers")
tables: Path = Path("data/tables")
Expand All @@ -21,6 +22,7 @@ class Paths(BaseSettings):
)


# Set via RUN_<NAME>
class Run(BaseSettings):
io_type: IoType = "parquet"

Expand All @@ -32,6 +34,7 @@ class Run(BaseSettings):
polars_eager: bool = False
polars_streaming: bool = False
polars_new_streaming: bool = False
polars_cloud: bool = False
polars_gpu: bool = False # Use GPU engine?
polars_gpu_device: int = 0 # The GPU device to run on for polars GPU
# Which style of GPU memory resource to use
Expand Down
Loading