From f4bc6f93fa29e3fee9d5c3a05729815940a250c9 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 4 Mar 2024 11:47:17 +0100 Subject: [PATCH 1/9] Ignore .env file --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 64f56db..89e9750 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,9 @@ __pycache__/ target/ Cargo.lock +# Environment +.env* + # Project /data/tables/ plots/ From 957169450cd53141fd0a85d86103707ac0bb4946 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 4 Mar 2024 12:17:24 +0100 Subject: [PATCH 2/9] Add pydantic settings --- queries/settings.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ requirements.in | 2 ++ 2 files changed, 47 insertions(+) create mode 100644 queries/settings.py diff --git a/queries/settings.py b/queries/settings.py new file mode 100644 index 0000000..ed689f4 --- /dev/null +++ b/queries/settings.py @@ -0,0 +1,45 @@ +from pathlib import Path +from typing import Any, Literal + +from pydantic import BaseModel +from pydantic_settings import BaseSettings, SettingsConfigDict + +type FILE_FORMAT = Literal["skip", "parquet", "feather"] + + +class Library(BaseModel): + name: str + version: str = "latest" + parameters: dict[str, Any] = {} + + +class Paths(BaseSettings): + dataset: Path = Path("data/tables") + answers: Path = Path("data/answers") + + timings: Path | None = None + plots: Path | None = None + + model_config = SettingsConfigDict( + env_prefix="tpch_path_", env_file=".env", extra="ignore" + ) + + +class Settings(BaseSettings): + scale_factor: int = 1 + file_formats: set[FILE_FORMAT] = {"skip"} + + print_query_output: bool = False + + paths: Paths = Paths() + + libraries: list[Library] = [ + Library(name="polars"), + Library(name="duckdb"), + Library(name="pandas"), + Library(name="pyspark"), + ] + + model_config = SettingsConfigDict( + env_prefix="tpch_", env_file=".env", extra="ignore" + ) diff --git a/requirements.in b/requirements.in index 890b244..aaa514c 100644 --- a/requirements.in +++ b/requirements.in @@ -10,3 +10,5 @@ setuptools # Required by pyspark linetimer plotnine plotly +pydantic +pydantic-settings From 9ed09c514924a1d3e5c30ed0fabdcd4c404e85cd Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 4 Mar 2024 13:42:16 +0100 Subject: [PATCH 3/9] Remove spark log level env var --- queries/common_utils.py | 1 - queries/pyspark/utils.py | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/queries/common_utils.py b/queries/common_utils.py index 9c180e6..7efb243 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -13,7 +13,6 @@ SCALE_FACTOR = os.environ.get("SCALE_FACTOR", "1") WRITE_PLOT = bool(os.environ.get("WRITE_PLOT", False)) FILE_TYPE = os.environ.get("FILE_TYPE", "parquet") -SPARK_LOG_LEVEL = os.environ.get("SPARK_LOG_LEVEL", "ERROR") print("include io:", INCLUDE_IO) print("show results:", SHOW_RESULTS) diff --git a/queries/pyspark/utils.py b/queries/pyspark/utils.py index a5657c5..2446840 100644 --- a/queries/pyspark/utils.py +++ b/queries/pyspark/utils.py @@ -11,19 +11,16 @@ DATASET_BASE_DIR, LOG_TIMINGS, SHOW_RESULTS, - SPARK_LOG_LEVEL, append_row, on_second_call, ) -print("SPARK_LOG_LEVEL:", SPARK_LOG_LEVEL) - def get_or_create_spark() -> SparkSession: spark = ( SparkSession.builder.appName("spark_queries").master("local[*]").getOrCreate() ) - spark.sparkContext.setLogLevel(SPARK_LOG_LEVEL) + spark.sparkContext.setLogLevel("ERROR") return spark From 08c8dd4b030ff44a3fe6ded262ab7605850b511c Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 4 Mar 2024 14:05:52 +0100 Subject: [PATCH 4/9] Replace SHOW_RESULTS with settings --- queries/common_utils.py | 7 +++++-- queries/duckdb/utils.py | 4 ++-- queries/pandas/utils.py | 4 ++-- queries/polars/utils.py | 4 ++-- queries/pyspark/utils.py | 4 ++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/queries/common_utils.py b/queries/common_utils.py index 7efb243..1a34abe 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -7,15 +7,18 @@ from linetimer import CodeTimer +from queries.settings import Settings + +settings = Settings() +print(settings.model_dump_json()) + INCLUDE_IO = bool(os.environ.get("INCLUDE_IO", False)) -SHOW_RESULTS = bool(os.environ.get("SHOW_RESULTS", False)) LOG_TIMINGS = bool(os.environ.get("LOG_TIMINGS", False)) SCALE_FACTOR = os.environ.get("SCALE_FACTOR", "1") WRITE_PLOT = bool(os.environ.get("WRITE_PLOT", False)) FILE_TYPE = os.environ.get("FILE_TYPE", "parquet") print("include io:", INCLUDE_IO) -print("show results:", SHOW_RESULTS) print("log timings:", LOG_TIMINGS) print("file type:", FILE_TYPE) diff --git a/queries/duckdb/utils.py b/queries/duckdb/utils.py index 539d082..c9e1cb2 100644 --- a/queries/duckdb/utils.py +++ b/queries/duckdb/utils.py @@ -14,8 +14,8 @@ FILE_TYPE, INCLUDE_IO, LOG_TIMINGS, - SHOW_RESULTS, append_row, + settings, ) @@ -100,7 +100,7 @@ def query() -> None: else: test_results(q_num, result) - if SHOW_RESULTS: + if settings.print_query_output: print(result) query() diff --git a/queries/pandas/utils.py b/queries/pandas/utils.py index 7b790ad..862f924 100644 --- a/queries/pandas/utils.py +++ b/queries/pandas/utils.py @@ -14,9 +14,9 @@ DATASET_BASE_DIR, FILE_TYPE, LOG_TIMINGS, - SHOW_RESULTS, append_row, on_second_call, + settings, ) pd.options.mode.copy_on_write = True @@ -107,7 +107,7 @@ def run() -> None: else: test_results(q_num, result) - if SHOW_RESULTS: + if settings.print_query_output: print(result) run() diff --git a/queries/polars/utils.py b/queries/polars/utils.py index f375d56..9015af4 100644 --- a/queries/polars/utils.py +++ b/queries/polars/utils.py @@ -12,8 +12,8 @@ FILE_TYPE, INCLUDE_IO, LOG_TIMINGS, - SHOW_RESULTS, append_row, + settings, ) SHOW_PLAN = bool(os.environ.get("SHOW_PLAN", False)) @@ -95,7 +95,7 @@ def query() -> None: else: test_results(q_num, result) - if SHOW_RESULTS: + if settings.print_query_output: print(result) query() diff --git a/queries/pyspark/utils.py b/queries/pyspark/utils.py index 2446840..fa41b4c 100644 --- a/queries/pyspark/utils.py +++ b/queries/pyspark/utils.py @@ -10,9 +10,9 @@ ANSWERS_BASE_DIR, DATASET_BASE_DIR, LOG_TIMINGS, - SHOW_RESULTS, append_row, on_second_call, + settings, ) @@ -126,7 +126,7 @@ def run() -> None: else: test_results(q_num, pdf) - if SHOW_RESULTS: + if settings.print_query_output: print(pdf) run() From 5e23fdeb8dcc17844fb48f5c665177786b037657 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 4 Mar 2024 14:28:46 +0100 Subject: [PATCH 5/9] scale factor --- queries/common_utils.py | 3 +-- run.sh | 6 +++--- scripts/prepare_data.py | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/queries/common_utils.py b/queries/common_utils.py index 1a34abe..997baa6 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -14,7 +14,6 @@ INCLUDE_IO = bool(os.environ.get("INCLUDE_IO", False)) LOG_TIMINGS = bool(os.environ.get("LOG_TIMINGS", False)) -SCALE_FACTOR = os.environ.get("SCALE_FACTOR", "1") WRITE_PLOT = bool(os.environ.get("WRITE_PLOT", False)) FILE_TYPE = os.environ.get("FILE_TYPE", "parquet") @@ -25,7 +24,7 @@ CWD = Path(__file__).parent ROOT = CWD.parent -DATASET_BASE_DIR = ROOT / "data" / "tables" / f"scale-{SCALE_FACTOR}" +DATASET_BASE_DIR = ROOT / "data" / "tables" / f"scale-{settings.scale_factor}" ANSWERS_BASE_DIR = ROOT / "data" / "answers" TIMINGS_FILE = ROOT / os.environ.get("TIMINGS_FILE", "timings.csv") DEFAULT_PLOTS_DIR = ROOT / "plots" diff --git a/run.sh b/run.sh index 3e06a04..16f5c9d 100755 --- a/run.sh +++ b/run.sh @@ -1,6 +1,6 @@ -export LOG_TIMINGS=1 -export WRITE_PLOT=1 -export SCALE_FACTOR=1 +export TPCH_PATH_TIMINGS=results/timings.csv +export TPCH_PATH_PLOTS=results/plots +export TPCH_SCALE_FACTOR=1 echo run with cached IO make tables diff --git a/scripts/prepare_data.py b/scripts/prepare_data.py index c63c5c8..f44b8f6 100644 --- a/scripts/prepare_data.py +++ b/scripts/prepare_data.py @@ -1,12 +1,11 @@ -import sys from pathlib import Path import polars as pl -SCALE_FACTOR = int(sys.argv[1]) +from queries.common_utils import settings ROOT_DIR = Path(__file__).parent.parent -TABLES_DIR = ROOT_DIR / "data" / "tables" / f"scale-{SCALE_FACTOR}" +TABLES_DIR = ROOT_DIR / "data" / "tables" / f"scale-{settings.scale_factor}" table_columns = { "customer": [ From c921448fa8948a73ed2b2f1ac5c523cda54d23f0 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Wed, 6 Mar 2024 17:08:00 +0100 Subject: [PATCH 6/9] WIP --- .gitignore | 2 +- queries/common_utils.py | 103 +++++++++++++++-------------- queries/duckdb/utils.py | 15 ++--- queries/pandas/utils.py | 44 ++++++------- queries/polars/executor.py | 4 +- queries/polars/utils.py | 128 +++++++++++++++++++------------------ queries/pyspark/utils.py | 40 ++++++------ queries/settings.py | 2 +- scripts/plot_results.py | 9 +-- scripts/prepare_data.py | 2 +- 10 files changed, 171 insertions(+), 178 deletions(-) diff --git a/.gitignore b/.gitignore index 89e9750..dcedc2b 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,4 @@ Cargo.lock # Project /data/tables/ -plots/ +/output/ diff --git a/queries/common_utils.py b/queries/common_utils.py index 997baa6..8c67556 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -1,78 +1,77 @@ -import os import re +import subprocess import sys from pathlib import Path -from subprocess import run -from typing import Any from linetimer import CodeTimer -from queries.settings import Settings +from queries.settings import Library, Settings settings = Settings() print(settings.model_dump_json()) -INCLUDE_IO = bool(os.environ.get("INCLUDE_IO", False)) -LOG_TIMINGS = bool(os.environ.get("LOG_TIMINGS", False)) -WRITE_PLOT = bool(os.environ.get("WRITE_PLOT", False)) -FILE_TYPE = os.environ.get("FILE_TYPE", "parquet") -print("include io:", INCLUDE_IO) -print("log timings:", LOG_TIMINGS) -print("file type:", FILE_TYPE) +DATASET_BASE_DIR = settings.paths.tables / f"scale-{settings.scale_factor}" +TIMINGS_FILE = "timings.csv" -CWD = Path(__file__).parent -ROOT = CWD.parent -DATASET_BASE_DIR = ROOT / "data" / "tables" / f"scale-{settings.scale_factor}" -ANSWERS_BASE_DIR = ROOT / "data" / "answers" -TIMINGS_FILE = ROOT / os.environ.get("TIMINGS_FILE", "timings.csv") -DEFAULT_PLOTS_DIR = ROOT / "plots" +def append_row(solution: str, version: str, query_number: int, time: float) -> None: + settings.paths.timings.mkdir(exist_ok=True, parents=True) + with (settings.paths.timings / TIMINGS_FILE).open("a") as f: + if f.tell() == 0: + f.write("solution,version,query_number,duration[s]\n") + line = ",".join([solution, version, str(query_number), str(time)]) + "\n" + f.write(line) -def append_row( - solution: str, q: str, secs: float, version: str, success: bool = True -) -> None: - with TIMINGS_FILE.open("a") as f: - if f.tell() == 0: - f.write("solution,version,query_no,duration[s],include_io,success\n") - f.write(f"{solution},{version},{q},{secs},{INCLUDE_IO},{success}\n") +# def on_second_call(func: Any) -> Any: +# def helper(*args: Any, **kwargs: Any) -> Any: +# helper.calls += 1 # type: ignore[attr-defined] + +# # first call is outside the function +# # this call must set the result +# if helper.calls == 1: # type: ignore[attr-defined] +# # include IO will compute the result on the 2nd call +# if not INCLUDE_IO: +# helper.result = func(*args, **kwargs) # type: ignore[attr-defined] +# return helper.result # type: ignore[attr-defined] + +# # second call is in the query, now we set the result +# if INCLUDE_IO and helper.calls == 2: # type: ignore[attr-defined] +# helper.result = func(*args, **kwargs) # type: ignore[attr-defined] -def on_second_call(func: Any) -> Any: - def helper(*args: Any, **kwargs: Any) -> Any: - helper.calls += 1 # type: ignore[attr-defined] +# return helper.result # type: ignore[attr-defined] - # first call is outside the function - # this call must set the result - if helper.calls == 1: # type: ignore[attr-defined] - # include IO will compute the result on the 2nd call - if not INCLUDE_IO: - helper.result = func(*args, **kwargs) # type: ignore[attr-defined] - return helper.result # type: ignore[attr-defined] +# helper.calls = 0 # type: ignore[attr-defined] +# helper.result = None # type: ignore[attr-defined] - # second call is in the query, now we set the result - if INCLUDE_IO and helper.calls == 2: # type: ignore[attr-defined] - helper.result = func(*args, **kwargs) # type: ignore[attr-defined] +# return helper - return helper.result # type: ignore[attr-defined] - helper.calls = 0 # type: ignore[attr-defined] - helper.result = None # type: ignore[attr-defined] +def execute_all(library: Library) -> None: + """Run all queries for the given library.""" + query_numbers = get_query_numbers(library.name) - return helper + with CodeTimer(name=f"Overall execution of ALL {library.name} queries", unit="s"): + for i in query_numbers: + run_query(library.name, i) -def execute_all(solution: str) -> None: - package_name = f"{solution}" +def get_query_numbers(library_name: str) -> list[int]: + """Get the query numbers that are implemented for the given library.""" + query_numbers = [] + path = Path(__file__).parent / library_name expr = re.compile(r"q(\d+).py$") - num_queries = 0 - for file in (CWD / package_name).iterdir(): - g = expr.search(str(file)) - if g is not None: - num_queries = max(int(g.group(1)), num_queries) - - with CodeTimer(name=f"Overall execution of ALL {solution} queries", unit="s"): - for i in range(1, num_queries + 1): - run([sys.executable, "-m", f"queries.{package_name}.q{i}"]) + + for file in path.iterdir(): + match = expr.search(str(file)) + if match is not None: + query_numbers.append(int(match.group(1))) + + return sorted(query_numbers) + + +def run_query(library_name: str, query_number: int) -> None: + subprocess.run([sys.executable, "-m", f"queries.{library_name}.q{query_number}"]) diff --git a/queries/duckdb/utils.py b/queries/duckdb/utils.py index c9e1cb2..764314a 100644 --- a/queries/duckdb/utils.py +++ b/queries/duckdb/utils.py @@ -9,7 +9,6 @@ from polars.testing import assert_frame_equal from queries.common_utils import ( - ANSWERS_BASE_DIR, DATASET_BASE_DIR, FILE_TYPE, INCLUDE_IO, @@ -40,15 +39,15 @@ def _scan_ds(path: Path) -> str: return path_str -def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> pl.LazyFrame: - path = base_dir / f"q{query}.parquet" - return pl.scan_parquet(path) +def test_results(query_number: int, result: pl.DataFrame) -> None: + answer = _get_query_answer(query_number) + assert_frame_equal(result, answer, check_dtype=False) -def test_results(q_num: int, result_df: pl.DataFrame) -> None: - with CodeTimer(name=f"Testing result of duckdb Query {q_num}", unit="s"): - answer = get_query_answer(q_num).collect() - assert_frame_equal(left=result_df, right=answer, check_dtype=False) +def _get_query_answer(query_number: int) -> pl.DataFrame: + file_name = f"q{query_number}.parquet" + file_path = settings.paths.answers / file_name + return pl.read_parquet(file_path) def get_line_item_ds(base_dir: Path = DATASET_BASE_DIR) -> str: diff --git a/queries/pandas/utils.py b/queries/pandas/utils.py index 862f924..f6c8dc9 100644 --- a/queries/pandas/utils.py +++ b/queries/pandas/utils.py @@ -6,11 +6,9 @@ import pandas as pd from linetimer import CodeTimer, linetimer from pandas.api.types import is_string_dtype -from pandas.core.frame import DataFrame as PandasDF from pandas.testing import assert_series_equal from queries.common_utils import ( - ANSWERS_BASE_DIR, DATASET_BASE_DIR, FILE_TYPE, LOG_TIMINGS, @@ -22,7 +20,7 @@ pd.options.mode.copy_on_write = True -def _read_ds(path: Path) -> PandasDF: +def _read_ds(path: Path) -> pd.DataFrame: path_str = f"{path}.{FILE_TYPE}" if FILE_TYPE == "parquet": return pd.read_parquet(path_str, dtype_backend="pyarrow") @@ -33,62 +31,62 @@ def _read_ds(path: Path) -> PandasDF: raise ValueError(msg) -def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> PandasDF: - path = base_dir / f"q{query}.parquet" - return pd.read_parquet(path, dtype_backend="pyarrow") +def test_results(query_number: int, result_df: pd.DataFrame) -> None: + answer = _get_query_answer(query_number) + for c, t in answer.dtypes.items(): + s1 = result_df[c] + s2 = answer[c] -def test_results(q_num: int, result_df: PandasDF) -> None: - with CodeTimer(name=f"Testing result of pandas Query {q_num}", unit="s"): - answer = get_query_answer(q_num) + if is_string_dtype(t): + s1 = s1.apply(lambda x: x.strip()) - for c, t in answer.dtypes.items(): - s1 = result_df[c] - s2 = answer[c] + assert_series_equal(left=s1, right=s2, check_index=False, check_dtype=False) - if is_string_dtype(t): - s1 = s1.apply(lambda x: x.strip()) - assert_series_equal(left=s1, right=s2, check_index=False, check_dtype=False) +def _get_query_answer(query_number: int) -> pd.DataFrame: + file_name = f"q{query_number}.parquet" + file_path = settings.paths.answers / file_name + return pd.read_parquet(file_path, dtype_backend="pyarrow") @on_second_call -def get_line_item_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_line_item_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "lineitem") @on_second_call -def get_orders_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_orders_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "orders") @on_second_call -def get_customer_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_customer_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "customer") @on_second_call -def get_region_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_region_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "region") @on_second_call -def get_nation_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_nation_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "nation") @on_second_call -def get_supplier_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_supplier_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "supplier") @on_second_call -def get_part_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_part_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "part") @on_second_call -def get_part_supp_ds(base_dir: Path = DATASET_BASE_DIR) -> PandasDF: +def get_part_supp_ds(base_dir: Path = DATASET_BASE_DIR) -> pd.DataFrame: return _read_ds(base_dir / "partsupp") diff --git a/queries/polars/executor.py b/queries/polars/executor.py index 0fc97f5..ad083df 100644 --- a/queries/polars/executor.py +++ b/queries/polars/executor.py @@ -1,4 +1,6 @@ from queries.common_utils import execute_all +from queries.settings import Library if __name__ == "__main__": - execute_all("polars") + lib = Library(name="polars") + execute_all(lib) diff --git a/queries/polars/utils.py b/queries/polars/utils.py index 9015af4..5bb695f 100644 --- a/queries/polars/utils.py +++ b/queries/polars/utils.py @@ -1,101 +1,103 @@ -import os -import timeit -from pathlib import Path - import polars as pl -from linetimer import CodeTimer, linetimer +from linetimer import CodeTimer from polars.testing import assert_frame_equal from queries.common_utils import ( - ANSWERS_BASE_DIR, DATASET_BASE_DIR, - FILE_TYPE, - INCLUDE_IO, - LOG_TIMINGS, append_row, settings, ) +from queries.settings import FILE_FORMAT -SHOW_PLAN = bool(os.environ.get("SHOW_PLAN", False)) -STREAMING = bool(os.environ.get("STREAMING", False)) +def check_result(result: pl.DataFrame, query_number: int) -> None: + """Check the result of a query against the pre-defined answers.""" + answer = _get_query_answer(query_number) + assert_frame_equal(result, answer, check_dtype=False) -def _scan_ds(path: Path) -> pl.LazyFrame: - path_str = f"{path}.{FILE_TYPE}" - if FILE_TYPE == "parquet": - scan = pl.scan_parquet(path_str) - elif FILE_TYPE == "feather": - scan = pl.scan_ipc(path_str) - else: - msg = f"file type: {FILE_TYPE} not expected" - raise ValueError(msg) - if INCLUDE_IO: - return scan - return scan.collect().rechunk().lazy() + +def _get_query_answer(query_number: int) -> pl.DataFrame: + """Load the query answer from a Parquet file.""" + file_name = f"q{query_number}.parquet" + file_path = settings.paths.answers / file_name + return pl.read_parquet(file_path) -def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> pl.LazyFrame: - return pl.scan_parquet(base_dir / f"q{query}.parquet") +def _scan_table(table_name: str, file_format: FILE_FORMAT) -> pl.LazyFrame: + preload_data = file_format == "skip" + if preload_data: + file_format = "parquet" + file_name = f"{table_name}.{file_format}" + file_path = DATASET_BASE_DIR / file_name + + if file_format == "parquet": + scan = pl.scan_parquet(file_path) + elif file_format == "feather": + scan = pl.scan_ipc(file_path) + else: + msg = f"unexpected file format: {file_format}" + raise ValueError(msg) -def test_results(q_num: int, result_df: pl.DataFrame) -> None: - with CodeTimer(name=f"Testing result of polars Query {q_num}", unit="s"): - answer = get_query_answer(q_num).collect() - assert_frame_equal(left=result_df, right=answer, check_dtype=False) + if preload_data: + return scan.collect().rechunk().lazy() + return scan -def get_line_item_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "lineitem") +def get_customer_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("customer", file_format) -def get_orders_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "orders") +def get_line_item_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("lineitem", file_format) -def get_customer_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "customer") +def get_nation_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("nation", file_format) -def get_region_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "region") +def get_orders_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("orders", file_format) -def get_nation_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "nation") +def get_part_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("part", file_format) -def get_supplier_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "supplier") +def get_part_supp_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("partsupp", file_format) -def get_part_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "part") +def get_region_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("region", file_format) -def get_part_supp_ds(base_dir: Path = DATASET_BASE_DIR) -> pl.LazyFrame: - return _scan_ds(base_dir / "partsupp") +def get_supplier_ds(file_format: FILE_FORMAT = "skip") -> pl.LazyFrame: + return _scan_table("supplier", file_format) -def run_query(q_num: int, lp: pl.LazyFrame) -> None: - @linetimer(name=f"Overall execution of polars Query {q_num}", unit="s") # type: ignore[misc] - def query() -> None: - if SHOW_PLAN: - print(lp.explain()) - with CodeTimer(name=f"Get result of polars Query {q_num}", unit="s"): - t0 = timeit.default_timer() - result = lp.collect(streaming=STREAMING) +def run_query( + query_number: int, + query_plan: pl.LazyFrame, + print_query_plan: bool = False, + streaming: bool = False, +) -> None: + if print_query_plan: + print(query_plan.explain()) - secs = timeit.default_timer() - t0 + with CodeTimer(name=f"Execute Polars query {query_number}", unit="s") as timer: + result = query_plan.collect(streaming=streaming) - if LOG_TIMINGS: - append_row( - solution="polars", version=pl.__version__, q=f"q{q_num}", secs=secs - ) - else: - test_results(q_num, result) + if settings.print_query_output: + print(result) - if settings.print_query_output: - print(result) + check_result(result, query_number) - query() + if settings.paths.timings is not None: + append_row( + solution="polars", + version=pl.__version__, + query_number=query_number, + time=timer.took, + ) diff --git a/queries/pyspark/utils.py b/queries/pyspark/utils.py index fa41b4c..dc0157b 100644 --- a/queries/pyspark/utils.py +++ b/queries/pyspark/utils.py @@ -1,13 +1,13 @@ import timeit from pathlib import Path +import pandas as pd from linetimer import CodeTimer, linetimer -from pandas.core.frame import DataFrame as PandasDF +from pandas.testing import assert_series_equal from pyspark.sql import DataFrame as SparkDF from pyspark.sql import SparkSession from queries.common_utils import ( - ANSWERS_BASE_DIR, DATASET_BASE_DIR, LOG_TIMINGS, append_row, @@ -31,32 +31,28 @@ def _read_parquet_ds(path: Path, table_name: str) -> SparkDF: return df -def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> PandasDF: - import pandas as pd +def test_results(query_number: int, result_df: pd.DataFrame) -> None: + answer = _get_query_answer(query_number) - path = base_dir / f"q{query}.parquet" - return pd.read_parquet(path) + for c, t in answer.dtypes.items(): + s1 = result_df[c] + s2 = answer[c] + if t.name == "object": + s1 = s1.astype("string").apply(lambda x: x.strip()) + s2 = s2.astype("string").apply(lambda x: x.strip()) -def test_results(q_num: int, result_df: PandasDF) -> None: - import pandas as pd + elif t.name.startswith("int"): + s1 = s1.astype("int64") + s2 = s2.astype("int64") - with CodeTimer(name=f"Testing result of PySpark Query {q_num}", unit="s"): - answer = get_query_answer(q_num) + assert_series_equal(left=s1, right=s2, check_index=False, check_dtype=False) - for c, t in answer.dtypes.items(): - s1 = result_df[c] - s2 = answer[c] - if t.name == "object": - s1 = s1.astype("string").apply(lambda x: x.strip()) - s2 = s2.astype("string").apply(lambda x: x.strip()) - - elif t.name.startswith("int"): - s1 = s1.astype("int64") - s2 = s2.astype("int64") - - pd.testing.assert_series_equal(left=s1, right=s2, check_index=False) +def _get_query_answer(query_number: int) -> pd.DataFrame: + file_name = f"q{query_number}.parquet" + file_path = settings.paths.answers / file_name + return pd.read_parquet(file_path, dtype_backend="pyarrow") @on_second_call diff --git a/queries/settings.py b/queries/settings.py index ed689f4..9abc30e 100644 --- a/queries/settings.py +++ b/queries/settings.py @@ -14,7 +14,7 @@ class Library(BaseModel): class Paths(BaseSettings): - dataset: Path = Path("data/tables") + tables: Path = Path("data/tables") answers: Path = Path("data/answers") timings: Path | None = None diff --git a/scripts/plot_results.py b/scripts/plot_results.py index 5cc28ae..19e5e6b 100644 --- a/scripts/plot_results.py +++ b/scripts/plot_results.py @@ -7,13 +7,12 @@ ``` """ -from pathlib import Path from typing import Any import plotly.express as px import polars as pl -from queries.common_utils import DEFAULT_PLOTS_DIR, INCLUDE_IO, TIMINGS_FILE, WRITE_PLOT +from queries.common_utils import INCLUDE_IO, TIMINGS_FILE, settings # colors for each bar COLORS = { @@ -101,7 +100,7 @@ def add_annotations(fig: Any, limit: int, df: pl.DataFrame) -> None: def write_plot_image(fig: Any) -> None: - path = Path(DEFAULT_PLOTS_DIR) + path = settings.paths.plots if not path.exists(): path.mkdir() @@ -164,7 +163,7 @@ def plot( add_annotations(fig, limit, df) - if WRITE_PLOT: + if settings.paths.plots is not None: write_plot_image(fig) # display the object using available environment context @@ -172,8 +171,6 @@ def plot( if __name__ == "__main__": - print("write plot:", WRITE_PLOT) - e = pl.lit(True) max_query = 8 diff --git a/scripts/prepare_data.py b/scripts/prepare_data.py index f44b8f6..243241f 100644 --- a/scripts/prepare_data.py +++ b/scripts/prepare_data.py @@ -5,7 +5,7 @@ from queries.common_utils import settings ROOT_DIR = Path(__file__).parent.parent -TABLES_DIR = ROOT_DIR / "data" / "tables" / f"scale-{settings.scale_factor}" +TABLES_DIR = settings.paths.tables / f"scale-{settings.scale_factor}" table_columns = { "customer": [ From d5cf588dd5cb2a17dd6bfed5e59c5ee0db25f933 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Wed, 6 Mar 2024 21:36:35 +0100 Subject: [PATCH 7/9] Add venv creation capability --- queries/common_utils.py | 45 +++++++++++++++++++++++++++++++++++++++++ queries/settings.py | 3 ++- requirements.in | 14 +++++++------ requirements.txt | 14 +++++++++++++ 4 files changed, 69 insertions(+), 7 deletions(-) diff --git a/queries/common_utils.py b/queries/common_utils.py index 8c67556..302877c 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -1,3 +1,4 @@ +import os import re import subprocess import sys @@ -75,3 +76,47 @@ def get_query_numbers(library_name: str) -> list[int]: def run_query(library_name: str, query_number: int) -> None: subprocess.run([sys.executable, "-m", f"queries.{library_name}.q{query_number}"]) + + +def set_up_venv(lib: Library) -> str: + """Set up a virtual environment for the given library. + + Returns the path to the Python executable. + """ + venv_path = _create_venv(lib) + _install_lib(lib, venv_path) + return venv_path / "bin" / "python" + + +def _create_venv(lib: Library) -> str: + """Create a virtual environment for the given library. + + Returns the path to the virtual environment root. + """ + venv_name = f".venv-{lib.name}-{lib.version}" + venv_path = settings.paths.venvs / venv_name + subprocess.run([sys.executable, "-m", "uv", "venv", str(venv_path)]) + return venv_path + + +def _install_lib(lib: Library, venv_path: Path) -> None: + """Install the library in the given virtual environment.""" + current_venv = os.environ.pop("VIRTUAL_ENV", None) + current_conda = os.environ.pop("CONDA_PREFIX", None) + + os.environ["VIRTUAL_ENV"] = str(venv_path) + pip_spec = _get_pip_specifier(lib) + subprocess.run([sys.executable, "-m", "uv", "pip", "install", pip_spec]) + os.environ.pop("VIRTUAL_ENV") + + if current_venv is not None: + os.environ["VIRTUAL_ENV"] = current_venv + if current_conda is not None: + os.environ["CONDA_PREFIX"] = current_conda + + +def _get_pip_specifier(lib: Library) -> str: + if lib.version is None: + return lib.name + else: + return f"{lib.name}=={lib.version}" diff --git a/queries/settings.py b/queries/settings.py index 9abc30e..997ed36 100644 --- a/queries/settings.py +++ b/queries/settings.py @@ -9,13 +9,14 @@ class Library(BaseModel): name: str - version: str = "latest" + version: str | None = None parameters: dict[str, Any] = {} class Paths(BaseSettings): tables: Path = Path("data/tables") answers: Path = Path("data/answers") + venvs: Path = Path("venvs") timings: Path | None = None plots: Path | None = None diff --git a/requirements.in b/requirements.in index aaa514c..523dfcf 100644 --- a/requirements.in +++ b/requirements.in @@ -1,3 +1,11 @@ +linetimer +pydantic +pydantic-settings +uv + +plotnine +plotly + duckdb pandas>=2.0 polars @@ -6,9 +14,3 @@ pyspark pyarrow # Required by duckdb/pandas fastparquet # Required by pandas setuptools # Required by pyspark - -linetimer -plotnine -plotly -pydantic -pydantic-settings diff --git a/requirements.txt b/requirements.txt index ba36c2c..647deab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ # This file was autogenerated by uv via the following command: # uv pip compile requirements.in +annotated-types==0.6.0 + # via pydantic contourpy==1.2.0 # via matplotlib cramjam==2.8.2 @@ -53,6 +55,11 @@ polars==0.20.14 py4j==0.10.9.7 # via pyspark pyarrow==15.0.0 +pydantic==2.6.3 + # via pydantic-settings +pydantic-core==2.16.3 + # via pydantic +pydantic-settings==2.2.1 pyparsing==3.1.2 # via matplotlib pyspark==3.5.1 @@ -60,6 +67,8 @@ python-dateutil==2.9.0.post0 # via # matplotlib # pandas +python-dotenv==1.0.1 + # via pydantic-settings pytz==2024.1 # via pandas scipy==1.12.0 @@ -76,5 +85,10 @@ statsmodels==0.14.1 # via plotnine tenacity==8.2.3 # via plotly +typing-extensions==4.10.0 + # via + # pydantic + # pydantic-core tzdata==2024.1 # via pandas +uv==0.1.15 From f0e98f0998ca871422ab6bbb8536264353ae2f1b Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 7 Mar 2024 00:58:39 +0100 Subject: [PATCH 8/9] Parametrize polars queries --- queries/common_utils.py | 101 +++++++++++++++++++++++++++++----------- queries/polars/q1.py | 13 ++++-- queries/polars/q10.py | 12 +++-- queries/polars/q11.py | 12 +++-- queries/polars/q12.py | 12 +++-- queries/polars/q13.py | 12 +++-- queries/polars/q14.py | 12 +++-- queries/polars/q15.py | 12 +++-- queries/polars/q16.py | 12 +++-- queries/polars/q17.py | 12 +++-- queries/polars/q18.py | 12 +++-- queries/polars/q19.py | 12 +++-- queries/polars/q2.py | 12 +++-- queries/polars/q20.py | 12 +++-- queries/polars/q21.py | 12 +++-- queries/polars/q22.py | 12 +++-- queries/polars/q3.py | 12 +++-- queries/polars/q4.py | 12 +++-- queries/polars/q5.py | 12 +++-- queries/polars/q6.py | 12 +++-- queries/polars/q7.py | 12 +++-- queries/polars/q8.py | 12 +++-- queries/polars/q9.py | 12 +++-- queries/polars/utils.py | 32 +++++++++++++ 24 files changed, 305 insertions(+), 93 deletions(-) diff --git a/queries/common_utils.py b/queries/common_utils.py index 302877c..58fcc25 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -3,6 +3,7 @@ import subprocess import sys from pathlib import Path +from typing import Any from linetimer import CodeTimer @@ -50,35 +51,18 @@ def append_row(solution: str, version: str, query_number: int, time: float) -> N # return helper -def execute_all(library: Library) -> None: - """Run all queries for the given library.""" - query_numbers = get_query_numbers(library.name) - - with CodeTimer(name=f"Overall execution of ALL {library.name} queries", unit="s"): - for i in query_numbers: - run_query(library.name, i) - - -def get_query_numbers(library_name: str) -> list[int]: - """Get the query numbers that are implemented for the given library.""" - query_numbers = [] - - path = Path(__file__).parent / library_name - expr = re.compile(r"q(\d+).py$") - - for file in path.iterdir(): - match = expr.search(str(file)) - if match is not None: - query_numbers.append(int(match.group(1))) - - return sorted(query_numbers) +def run_all_queries(lib: Library) -> None: + executable = _set_up_venv(lib) + args = _parameters_to_cli_args(lib.parameters) + query_numbers = get_query_numbers(lib.name) -def run_query(library_name: str, query_number: int) -> None: - subprocess.run([sys.executable, "-m", f"queries.{library_name}.q{query_number}"]) + with CodeTimer(name=f"Overall execution of ALL {lib.name} queries", unit="s"): + for i in query_numbers: + run_query(lib.name, i, args=args, executable=str(executable)) -def set_up_venv(lib: Library) -> str: +def _set_up_venv(lib: Library) -> Path: """Set up a virtual environment for the given library. Returns the path to the Python executable. @@ -101,14 +85,30 @@ def _create_venv(lib: Library) -> str: def _install_lib(lib: Library, venv_path: Path) -> None: """Install the library in the given virtual environment.""" + # Prepare environment current_venv = os.environ.pop("VIRTUAL_ENV", None) current_conda = os.environ.pop("CONDA_PREFIX", None) - os.environ["VIRTUAL_ENV"] = str(venv_path) + + # Install pip_spec = _get_pip_specifier(lib) subprocess.run([sys.executable, "-m", "uv", "pip", "install", pip_spec]) + # TODO: Clean up installing dependencies + subprocess.run( + [ + sys.executable, + "-m", + "uv", + "pip", + "install", + "linetimer", + "pydantic", + "pydantic-settings", + ] + ) + + # Restore environment os.environ.pop("VIRTUAL_ENV") - if current_venv is not None: os.environ["VIRTUAL_ENV"] = current_venv if current_conda is not None: @@ -120,3 +120,50 @@ def _get_pip_specifier(lib: Library) -> str: return lib.name else: return f"{lib.name}=={lib.version}" + + +def _parameters_to_cli_args(params: dict[str, Any] | None) -> list[str]: + if params is None: + return [] + args = [] + for name, value in params.items(): + name = name.replace("_", "-") + if value is True: + command = f"--{name}" + elif value is False: + command = f"--no-{name}" + else: + command = f"--{name}={value}" + args.append(command) + return args + + +def get_query_numbers(library_name: str) -> list[int]: + """Get the query numbers that are implemented for the given library.""" + query_numbers = [] + + path = Path(__file__).parent / library_name + expr = re.compile(r"q(\d+).py$") + + for file in path.iterdir(): + match = expr.search(str(file)) + if match is not None: + query_numbers.append(int(match.group(1))) + + return sorted(query_numbers) + + +def run_query( + library_name: str, + query_number: int, + args: list[str] | None = None, + executable: str = sys.executable, +) -> None: + """Run a single query for the specified library.""" + module = f"queries.{library_name}.q{query_number}" + command = [executable, "-m", module] + if args: + command += args + + print(command) + subprocess.run(command) diff --git a/queries/polars/q1.py b/queries/polars/q1.py index 37f8afe..9759ae3 100644 --- a/queries/polars/q1.py +++ b/queries/polars/q1.py @@ -7,9 +7,10 @@ Q_NUM = 1 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = date(1998, 9, 2) q = utils.get_line_item_ds() + q_final = ( q.filter(pl.col("l_shipdate") <= var_1) .group_by("l_returnflag", "l_linestatus") @@ -34,8 +35,14 @@ def q() -> None: .sort("l_returnflag", "l_linestatus") ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q10.py b/queries/polars/q10.py index f17c548..6c92664 100644 --- a/queries/polars/q10.py +++ b/queries/polars/q10.py @@ -7,7 +7,7 @@ Q_NUM = 10 -def q() -> None: +def q() -> pl.LazyFrame: customer_ds = utils.get_customer_ds() orders_ds = utils.get_orders_ds() line_item_ds = utils.get_line_item_ds() @@ -54,8 +54,14 @@ def q() -> None: .limit(20) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q11.py b/queries/polars/q11.py index 0493d2c..d708d22 100644 --- a/queries/polars/q11.py +++ b/queries/polars/q11.py @@ -5,7 +5,7 @@ Q_NUM = 11 -def q() -> None: +def q() -> pl.LazyFrame: supplier_ds = utils.get_supplier_ds() part_supp_ds = utils.get_part_supp_ds() nation_ds = utils.get_nation_ds() @@ -38,8 +38,14 @@ def q() -> None: .sort("value", descending=True) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q12.py b/queries/polars/q12.py index 5f9fd14..1446bcc 100644 --- a/queries/polars/q12.py +++ b/queries/polars/q12.py @@ -7,7 +7,7 @@ Q_NUM = 12 -def q() -> None: +def q() -> pl.LazyFrame: line_item_ds = utils.get_line_item_ds() orders_ds = utils.get_orders_ds() @@ -37,8 +37,14 @@ def q() -> None: .sort("l_shipmode") ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q13.py b/queries/polars/q13.py index 26061f3..1a43416 100644 --- a/queries/polars/q13.py +++ b/queries/polars/q13.py @@ -5,7 +5,7 @@ Q_NUM = 13 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = "special" var_2 = "requests" @@ -25,8 +25,14 @@ def q() -> None: .sort(by=["custdist", "c_count"], descending=[True, True]) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q14.py b/queries/polars/q14.py index 2c4600c..bddaa19 100644 --- a/queries/polars/q14.py +++ b/queries/polars/q14.py @@ -7,7 +7,7 @@ Q_NUM = 14 -def q() -> None: +def q() -> pl.LazyFrame: line_item_ds = utils.get_line_item_ds() part_ds = utils.get_part_ds() @@ -31,8 +31,14 @@ def q() -> None: ) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q15.py b/queries/polars/q15.py index 908bf8e..48cb223 100644 --- a/queries/polars/q15.py +++ b/queries/polars/q15.py @@ -7,7 +7,7 @@ Q_NUM = 15 -def q() -> None: +def q() -> pl.LazyFrame: line_item_ds = utils.get_line_item_ds() supplier_ds = utils.get_supplier_ds() @@ -35,8 +35,14 @@ def q() -> None: .sort("s_suppkey") ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q16.py b/queries/polars/q16.py index 2826bef..0ad2d62 100644 --- a/queries/polars/q16.py +++ b/queries/polars/q16.py @@ -5,7 +5,7 @@ Q_NUM = 16 -def q() -> None: +def q() -> pl.LazyFrame: part_supp_ds = utils.get_part_supp_ds() part_ds = utils.get_part_ds() supplier_ds = ( @@ -31,8 +31,14 @@ def q() -> None: ) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q17.py b/queries/polars/q17.py index f658605..96bf4f3 100644 --- a/queries/polars/q17.py +++ b/queries/polars/q17.py @@ -5,7 +5,7 @@ Q_NUM = 17 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = "Brand#23" var_2 = "MED BOX" @@ -27,8 +27,14 @@ def q() -> None: .select((pl.col("l_extendedprice").sum() / 7.0).round(2).alias("avg_yearly")) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q18.py b/queries/polars/q18.py index 478e370..4f1671e 100644 --- a/queries/polars/q18.py +++ b/queries/polars/q18.py @@ -5,7 +5,7 @@ Q_NUM = 18 -def q() -> None: +def q() -> pl.LazyFrame: customer_ds = utils.get_customer_ds() line_item_ds = utils.get_line_item_ds() orders_ds = utils.get_orders_ds() @@ -34,8 +34,14 @@ def q() -> None: .limit(100) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q19.py b/queries/polars/q19.py index a1f281f..05f35df 100644 --- a/queries/polars/q19.py +++ b/queries/polars/q19.py @@ -5,7 +5,7 @@ Q_NUM = 19 -def q() -> None: +def q() -> pl.LazyFrame: line_item_ds = utils.get_line_item_ds() part_ds = utils.get_part_ds() @@ -47,8 +47,14 @@ def q() -> None: ) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q2.py b/queries/polars/q2.py index 4646ba7..cf99ba5 100644 --- a/queries/polars/q2.py +++ b/queries/polars/q2.py @@ -5,7 +5,7 @@ Q_NUM = 2 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = 15 var_2 = "BRASS" var_3 = "EUROPE" @@ -50,8 +50,14 @@ def q() -> None: .with_columns(pl.col(pl.String).str.strip_chars().name.keep()) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q20.py b/queries/polars/q20.py index b35cc55..19dac2d 100644 --- a/queries/polars/q20.py +++ b/queries/polars/q20.py @@ -7,7 +7,7 @@ Q_NUM = 20 -def q() -> None: +def q() -> pl.LazyFrame: line_item_ds = utils.get_line_item_ds() nation_ds = utils.get_nation_ds() supplier_ds = utils.get_supplier_ds() @@ -46,8 +46,14 @@ def q() -> None: .sort("s_name") ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q21.py b/queries/polars/q21.py index 6ef7f76..38eefd2 100644 --- a/queries/polars/q21.py +++ b/queries/polars/q21.py @@ -5,7 +5,7 @@ Q_NUM = 21 -def q() -> None: +def q() -> pl.LazyFrame: line_item_ds = utils.get_line_item_ds() supplier_ds = utils.get_supplier_ds() nation_ds = utils.get_nation_ds() @@ -39,8 +39,14 @@ def q() -> None: .limit(100) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q22.py b/queries/polars/q22.py index 1af6ff7..dcd7155 100644 --- a/queries/polars/q22.py +++ b/queries/polars/q22.py @@ -5,7 +5,7 @@ Q_NUM = 22 -def q() -> None: +def q() -> pl.LazyFrame: orders_ds = utils.get_orders_ds() customer_ds = utils.get_customer_ds() @@ -39,8 +39,14 @@ def q() -> None: .sort("cntrycode") ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q3.py b/queries/polars/q3.py index 7f77b0d..3d3f35a 100644 --- a/queries/polars/q3.py +++ b/queries/polars/q3.py @@ -7,7 +7,7 @@ Q_NUM = 3 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = var_2 = date(1995, 3, 15) var_3 = "BUILDING" @@ -36,8 +36,14 @@ def q() -> None: .limit(10) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q4.py b/queries/polars/q4.py index 62556da..91ad8ae 100644 --- a/queries/polars/q4.py +++ b/queries/polars/q4.py @@ -7,7 +7,7 @@ Q_NUM = 4 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = date(1993, 7, 1) var_2 = date(1993, 10, 1) @@ -25,8 +25,14 @@ def q() -> None: .with_columns(pl.col("order_count").cast(pl.Int64)) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q5.py b/queries/polars/q5.py index 79df48b..1d95f70 100644 --- a/queries/polars/q5.py +++ b/queries/polars/q5.py @@ -7,7 +7,7 @@ Q_NUM = 5 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = "ASIA" var_2 = date(1994, 1, 1) var_3 = date(1995, 1, 1) @@ -39,8 +39,14 @@ def q() -> None: .sort(by="revenue", descending=True) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q6.py b/queries/polars/q6.py index 819be70..26484ec 100644 --- a/queries/polars/q6.py +++ b/queries/polars/q6.py @@ -7,7 +7,7 @@ Q_NUM = 6 -def q() -> None: +def q() -> pl.LazyFrame: var_1 = date(1994, 1, 1) var_2 = date(1995, 1, 1) var_3 = 24 @@ -26,8 +26,14 @@ def q() -> None: .select(pl.sum("revenue")) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q7.py b/queries/polars/q7.py index 1dd025d..6e8d837 100644 --- a/queries/polars/q7.py +++ b/queries/polars/q7.py @@ -7,7 +7,7 @@ Q_NUM = 7 -def q() -> None: +def q() -> pl.LazyFrame: nation_ds = utils.get_nation_ds() customer_ds = utils.get_customer_ds() line_item_ds = utils.get_line_item_ds() @@ -52,8 +52,14 @@ def q() -> None: .sort(by=["supp_nation", "cust_nation", "l_year"]) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q8.py b/queries/polars/q8.py index 663d7ac..ff66fc6 100644 --- a/queries/polars/q8.py +++ b/queries/polars/q8.py @@ -7,7 +7,7 @@ Q_NUM = 8 -def q() -> None: +def q() -> pl.LazyFrame: part_ds = utils.get_part_ds() supplier_ds = utils.get_supplier_ds() line_item_ds = utils.get_line_item_ds() @@ -46,8 +46,14 @@ def q() -> None: .sort("o_year") ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/q9.py b/queries/polars/q9.py index b954ef2..cba43b5 100644 --- a/queries/polars/q9.py +++ b/queries/polars/q9.py @@ -5,7 +5,7 @@ Q_NUM = 9 -def q() -> None: +def q() -> pl.LazyFrame: part_ds = utils.get_part_ds() supplier_ds = utils.get_supplier_ds() line_item_ds = utils.get_line_item_ds() @@ -37,8 +37,14 @@ def q() -> None: .sort(by=["nation", "o_year"], descending=[False, True]) ) - utils.run_query(Q_NUM, q_final) + return q_final + + +def main() -> None: + args = utils.parse_parameters() + query_plan = q() + utils.run_query(Q_NUM, query_plan, **vars(args)) if __name__ == "__main__": - q() + main() diff --git a/queries/polars/utils.py b/queries/polars/utils.py index 5bb695f..a613e87 100644 --- a/queries/polars/utils.py +++ b/queries/polars/utils.py @@ -1,3 +1,5 @@ +import argparse + import polars as pl from linetimer import CodeTimer from polars.testing import assert_frame_equal @@ -101,3 +103,33 @@ def run_query( query_number=query_number, time=timer.took, ) + + +def parse_parameters() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Process library-specific parameters") + parser.add_argument( + "--streaming", + action=argparse.BooleanOptionalAction, + default=False, + help="Run the query in streaming mode.", + ) + parser.add_argument( + "--print-query-plan", + action=argparse.BooleanOptionalAction, + default=False, + help="Print the query plan before executing the query.", + ) + return parser.parse_args() + + +def str2bool(v): + if isinstance(v, bool): + print("SUP BRO") + return v + if v.lower() in ("yes", "true", "t", "y", "1"): + return True + elif v.lower() in ("no", "false", "f", "n", "0"): + return False + else: + msg = "Boolean value expected." + raise argparse.ArgumentTypeError(msg) From a85502bb73cd682a073a191efee2a27baf08ede6 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 7 Mar 2024 01:44:25 +0100 Subject: [PATCH 9/9] More stuff --- queries/common_utils.py | 57 +++++++++++++------------------------- queries/polars/__main__.py | 6 ++++ queries/polars/executor.py | 6 ---- queries/polars/utils.py | 31 ++++++++++++--------- 4 files changed, 44 insertions(+), 56 deletions(-) create mode 100644 queries/polars/__main__.py delete mode 100644 queries/polars/executor.py diff --git a/queries/common_utils.py b/queries/common_utils.py index 58fcc25..1da4571 100644 --- a/queries/common_utils.py +++ b/queries/common_utils.py @@ -10,7 +10,6 @@ from queries.settings import Library, Settings settings = Settings() -print(settings.model_dump_json()) DATASET_BASE_DIR = settings.paths.tables / f"scale-{settings.scale_factor}" @@ -27,31 +26,29 @@ def append_row(solution: str, version: str, query_number: int, time: float) -> N f.write(line) -# def on_second_call(func: Any) -> Any: -# def helper(*args: Any, **kwargs: Any) -> Any: -# helper.calls += 1 # type: ignore[attr-defined] +def run_all_queries_all_libs() -> None: + for lib in settings.libraries: + run_all_queries(lib) -# # first call is outside the function -# # this call must set the result -# if helper.calls == 1: # type: ignore[attr-defined] -# # include IO will compute the result on the 2nd call -# if not INCLUDE_IO: -# helper.result = func(*args, **kwargs) # type: ignore[attr-defined] -# return helper.result # type: ignore[attr-defined] -# # second call is in the query, now we set the result -# if INCLUDE_IO and helper.calls == 2: # type: ignore[attr-defined] -# helper.result = func(*args, **kwargs) # type: ignore[attr-defined] - -# return helper.result # type: ignore[attr-defined] - -# helper.calls = 0 # type: ignore[attr-defined] -# helper.result = None # type: ignore[attr-defined] +def run_query( + lib_name: str, + query_number: int, + args: list[str] | None = None, + executable: str = sys.executable, +) -> None: + """Run a single query for the specified library.""" + module = f"queries.{lib_name}.q{query_number}" + command = [executable, "-m", module] + if args: + command += args -# return helper + subprocess.run(command) def run_all_queries(lib: Library) -> None: + print(settings.model_dump_json()) + executable = _set_up_venv(lib) args = _parameters_to_cli_args(lib.parameters) @@ -77,7 +74,9 @@ def _create_venv(lib: Library) -> str: Returns the path to the virtual environment root. """ - venv_name = f".venv-{lib.name}-{lib.version}" + venv_name = f".venv-{lib.name}" + if lib.version is not None: + venv_name += f"-{lib.version}" venv_path = settings.paths.venvs / venv_name subprocess.run([sys.executable, "-m", "uv", "venv", str(venv_path)]) return venv_path @@ -151,19 +150,3 @@ def get_query_numbers(library_name: str) -> list[int]: query_numbers.append(int(match.group(1))) return sorted(query_numbers) - - -def run_query( - library_name: str, - query_number: int, - args: list[str] | None = None, - executable: str = sys.executable, -) -> None: - """Run a single query for the specified library.""" - module = f"queries.{library_name}.q{query_number}" - command = [executable, "-m", module] - if args: - command += args - - print(command) - subprocess.run(command) diff --git a/queries/polars/__main__.py b/queries/polars/__main__.py new file mode 100644 index 0000000..cf6f081 --- /dev/null +++ b/queries/polars/__main__.py @@ -0,0 +1,6 @@ +from queries.common_utils import run_all_queries +from queries.polars.utils import parse_lib_settings + +if __name__ == "__main__": + lib = parse_lib_settings() + run_all_queries(lib) diff --git a/queries/polars/executor.py b/queries/polars/executor.py deleted file mode 100644 index ad083df..0000000 --- a/queries/polars/executor.py +++ /dev/null @@ -1,6 +0,0 @@ -from queries.common_utils import execute_all -from queries.settings import Library - -if __name__ == "__main__": - lib = Library(name="polars") - execute_all(lib) diff --git a/queries/polars/utils.py b/queries/polars/utils.py index a613e87..fa6117d 100644 --- a/queries/polars/utils.py +++ b/queries/polars/utils.py @@ -9,7 +9,7 @@ append_row, settings, ) -from queries.settings import FILE_FORMAT +from queries.settings import FILE_FORMAT, Library def check_result(result: pl.DataFrame, query_number: int) -> None: @@ -106,6 +106,11 @@ def run_query( def parse_parameters() -> argparse.Namespace: + parser = _set_up_arg_parser() + return parser.parse_args() + + +def _set_up_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Process library-specific parameters") parser.add_argument( "--streaming", @@ -119,17 +124,17 @@ def parse_parameters() -> argparse.Namespace: default=False, help="Print the query plan before executing the query.", ) - return parser.parse_args() + return parser -def str2bool(v): - if isinstance(v, bool): - print("SUP BRO") - return v - if v.lower() in ("yes", "true", "t", "y", "1"): - return True - elif v.lower() in ("no", "false", "f", "n", "0"): - return False - else: - msg = "Boolean value expected." - raise argparse.ArgumentTypeError(msg) +def parse_lib_settings() -> argparse.Namespace: + parser = _set_up_arg_parser() + parser.add_argument( + "--version", + help="The library version to use. Defaults to the latest published version.", + ) + args = parser.parse_args() + + args_dict = vars(args) + version = args_dict.pop("version", None) + return Library(name="polars", version=version, parameters=args_dict)