Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tunable run configuration #91

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ __pycache__/
target/
Cargo.lock

# Environment
.env*

# Project
/data/tables/
plots/
/output/
187 changes: 131 additions & 56 deletions queries/common_utils.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,152 @@
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Any

from linetimer import CodeTimer

from queries.settings import Library, Settings

# Single settings instance shared by all benchmark runners in this module.
settings = Settings()

# Directory holding the generated TPC-H tables for the configured scale factor.
DATASET_BASE_DIR = settings.paths.tables / f"scale-{settings.scale_factor}"
# Name of the CSV file (inside settings.paths.timings) collecting query timings.
TIMINGS_FILE = "timings.csv"


def append_row(solution: str, version: str, query_number: int, time: float) -> None:
    """Append one timing result to the timings CSV file.

    Creates the timings directory if needed and writes the CSV header when
    the file is empty.

    Args:
        solution: Name of the library/solution that ran the query.
        version: Version of the library.
        query_number: TPC-H query number.
        time: Query duration in seconds.
    """
    settings.paths.timings.mkdir(exist_ok=True, parents=True)
    with (settings.paths.timings / TIMINGS_FILE).open("a") as f:
        # f.tell() == 0 in append mode means the file is empty: write header.
        if f.tell() == 0:
            f.write("solution,version,query_number,duration[s]\n")

        line = ",".join([solution, version, str(query_number), str(time)]) + "\n"
        f.write(line)
def run_all_queries_all_libs() -> None:
    """Run the full query suite for every library configured in the settings."""
    for library in settings.libraries:
        run_all_queries(library)

return helper

def run_query(
lib_name: str,
query_number: int,
args: list[str] | None = None,
executable: str = sys.executable,
) -> None:
"""Run a single query for the specified library."""
module = f"queries.{lib_name}.q{query_number}"
command = [executable, "-m", module]
if args:
command += args

subprocess.run(command)


def run_all_queries(lib: Library) -> None:
    """Run every implemented query for `lib` inside its own virtual environment."""
    print(settings.model_dump_json())

    python_executable = _set_up_venv(lib)
    cli_args = _parameters_to_cli_args(lib.parameters)
    numbers = get_query_numbers(lib.name)

    with CodeTimer(name=f"Overall execution of ALL {lib.name} queries", unit="s"):
        for number in numbers:
            run_query(lib.name, number, args=cli_args, executable=str(python_executable))


def _set_up_venv(lib: Library) -> Path:
    """Create a virtual environment for `lib` and install the library into it.

    Returns the path to the environment's Python executable.
    """
    env_root = _create_venv(lib)
    _install_lib(lib, env_root)
    return env_root / "bin" / "python"


def _create_venv(lib: Library) -> Path:
    """Create a virtual environment for the given library.

    The environment is named after the library and, if pinned, its version
    (e.g. ``.venv-polars-0.20.0``).

    Returns the path to the virtual environment root.
    """
    venv_name = f".venv-{lib.name}"
    if lib.version is not None:
        venv_name += f"-{lib.version}"
    venv_path = settings.paths.venvs / venv_name
    subprocess.run([sys.executable, "-m", "uv", "venv", str(venv_path)])
    # Fixed return annotation: this is a Path, not a str — the caller joins
    # further path components onto it (`venv_path / "bin" / "python"`).
    return venv_path


def _install_lib(lib: Library, venv_path: Path) -> None:
    """Install the library (and benchmark dependencies) in the given venv.

    Temporarily points `uv` at the target environment via the VIRTUAL_ENV
    variable; the original VIRTUAL_ENV / CONDA_PREFIX values are restored
    afterwards, even if installation raises.
    """
    # Prepare environment: stash the caller's env so it can be restored.
    current_venv = os.environ.pop("VIRTUAL_ENV", None)
    current_conda = os.environ.pop("CONDA_PREFIX", None)
    os.environ["VIRTUAL_ENV"] = str(venv_path)

    try:
        # Install the library under benchmark.
        pip_spec = _get_pip_specifier(lib)
        subprocess.run([sys.executable, "-m", "uv", "pip", "install", pip_spec])
        # TODO: Clean up installing dependencies
        subprocess.run(
            [
                sys.executable,
                "-m",
                "uv",
                "pip",
                "install",
                "linetimer",
                "pydantic",
                "pydantic-settings",
            ]
        )
    finally:
        # Restore environment unconditionally so a failed install does not
        # leave VIRTUAL_ENV/CONDA_PREFIX clobbered for the rest of the run.
        os.environ.pop("VIRTUAL_ENV", None)
        if current_venv is not None:
            os.environ["VIRTUAL_ENV"] = current_venv
        if current_conda is not None:
            os.environ["CONDA_PREFIX"] = current_conda


def _get_pip_specifier(lib: Library) -> str:
    """Return the pip requirement string for `lib`, pinned when a version is set."""
    if lib.version is None:
        return lib.name
    return f"{lib.name}=={lib.version}"


def _parameters_to_cli_args(params: dict[str, Any] | None) -> list[str]:
if params is None:
return []
args = []
for name, value in params.items():
name = name.replace("_", "-")
if value is True:
command = f"--{name}"
elif value is False:
command = f"--no-{name}"
else:
command = f"--{name}={value}"
args.append(command)
return args


def get_query_numbers(library_name: str) -> list[int]:
    """Get the query numbers that are implemented for the given library.

    Scans the library's package directory (next to this module) for files
    named ``q<number>.py`` and returns the numbers in ascending order.
    """
    query_numbers = []

    path = Path(__file__).parent / library_name
    # Escape the dot so names like `q1xpy` cannot match.
    expr = re.compile(r"q(\d+)\.py$")

    for file in path.iterdir():
        match = expr.search(str(file))
        if match is not None:
            query_numbers.append(int(match.group(1)))

    return sorted(query_numbers)
19 changes: 9 additions & 10 deletions queries/duckdb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
from polars.testing import assert_frame_equal

from queries.common_utils import (
ANSWERS_BASE_DIR,
DATASET_BASE_DIR,
FILE_TYPE,
INCLUDE_IO,
LOG_TIMINGS,
SHOW_RESULTS,
append_row,
settings,
)


Expand All @@ -40,15 +39,15 @@ def _scan_ds(path: Path) -> str:
return path_str


def test_results(query_number: int, result: pl.DataFrame) -> None:
    """Assert that `result` matches the stored answer for the given query.

    Dtypes are not compared (check_dtype=False).
    """
    answer = _get_query_answer(query_number)
    assert_frame_equal(result, answer, check_dtype=False)


def _get_query_answer(query_number: int) -> pl.DataFrame:
    """Load the reference answer for the query from the answers directory."""
    file_name = f"q{query_number}.parquet"
    file_path = settings.paths.answers / file_name
    return pl.read_parquet(file_path)


def get_line_item_ds(base_dir: Path = DATASET_BASE_DIR) -> str:
Expand Down Expand Up @@ -100,7 +99,7 @@ def query() -> None:
else:
test_results(q_num, result)

if SHOW_RESULTS:
if settings.print_query_output:
print(result)

query()
48 changes: 23 additions & 25 deletions queries/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,21 @@
import pandas as pd
from linetimer import CodeTimer, linetimer
from pandas.api.types import is_string_dtype
from pandas.core.frame import DataFrame as PandasDF
from pandas.testing import assert_series_equal

from queries.common_utils import (
ANSWERS_BASE_DIR,
DATASET_BASE_DIR,
FILE_TYPE,
LOG_TIMINGS,
SHOW_RESULTS,
append_row,
on_second_call,
settings,
)

pd.options.mode.copy_on_write = True


def _read_ds(path: Path) -> PandasDF:
def _read_ds(path: Path) -> pd.DataFrame:
path_str = f"{path}.{FILE_TYPE}"
if FILE_TYPE == "parquet":
return pd.read_parquet(path_str, dtype_backend="pyarrow")
Expand All @@ -33,62 +31,62 @@ def _read_ds(path: Path) -> PandasDF:
raise ValueError(msg)


def test_results(query_number: int, result_df: pd.DataFrame) -> None:
    """Assert that `result_df` matches the stored answer, column by column.

    String columns in the result are stripped of surrounding whitespace
    before comparison; index and dtypes are not compared.
    """
    answer = _get_query_answer(query_number)

    for c, t in answer.dtypes.items():
        s1 = result_df[c]
        s2 = answer[c]

        # Fixed-width string data may carry padding; strip before comparing.
        if is_string_dtype(t):
            s1 = s1.apply(lambda x: x.strip())

        assert_series_equal(left=s1, right=s2, check_index=False, check_dtype=False)


def _get_query_answer(query_number: int) -> pd.DataFrame:
    """Load the reference answer for the query from the answers directory."""
    file_name = f"q{query_number}.parquet"
    file_path = settings.paths.answers / file_name
    return pd.read_parquet(file_path, dtype_backend="pyarrow")


def get_line_item_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H lineitem table from `base_dir`."""
    return _read_ds(base_dir / "lineitem")


def get_orders_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H orders table from `base_dir`."""
    return _read_ds(base_dir / "orders")


def get_customer_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H customer table from `base_dir`."""
    return _read_ds(base_dir / "customer")


def get_region_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H region table from `base_dir`."""
    return _read_ds(base_dir / "region")


def get_nation_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H nation table from `base_dir`."""
    return _read_ds(base_dir / "nation")


def get_supplier_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H supplier table from `base_dir`."""
    return _read_ds(base_dir / "supplier")


def get_part_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H part table from `base_dir`."""
    return _read_ds(base_dir / "part")


def get_part_supp_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame:
    """Load the TPC-H partsupp table from `base_dir`."""
    return _read_ds(base_dir / "partsupp")


Expand All @@ -107,7 +105,7 @@ def run() -> None:
else:
test_results(q_num, result)

if SHOW_RESULTS:
if settings.print_query_output:
print(result)

run()
6 changes: 6 additions & 0 deletions queries/polars/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Entry point: `python -m queries.polars` runs all implemented polars queries.
from queries.common_utils import run_all_queries
from queries.polars.utils import parse_lib_settings

if __name__ == "__main__":
    # NOTE(review): parse_lib_settings presumably builds the Library config
    # (name/version/parameters) from CLI args or the environment — confirm
    # against queries/polars/utils.py.
    lib = parse_lib_settings()
    run_all_queries(lib)
4 changes: 0 additions & 4 deletions queries/polars/executor.py

This file was deleted.

Loading
Loading