diff --git a/queries/dask/q1.py b/queries/dask/q1.py
index 665d72e..7298033 100644
--- a/queries/dask/q1.py
+++ b/queries/dask/q1.py
@@ -10,36 +10,30 @@
 
 
 def q() -> None:
-    VAR1 = date(1998, 9, 2)
-
-    lineitem = utils.get_line_item_ds
+    line_item_ds = utils.get_line_item_ds
 
     # first call one time to cache in case we don't include the IO times
-    lineitem()
+    line_item_ds()
 
     def query() -> pd.DataFrame:
-        nonlocal lineitem
-        lineitem = lineitem()
+        nonlocal line_item_ds
+        line_item_ds = line_item_ds()
+
+        var1 = date(1998, 9, 2)
 
-        sel = lineitem.l_shipdate <= VAR1
-        lineitem_filtered = lineitem[sel]
+        filt = line_item_ds[line_item_ds["l_shipdate"] <= var1]
 
         # This is lenient towards pandas as normally an optimizer should decide
        # that this could be computed before the groupby aggregation.
         # Other implementations don't enjoy this benefit.
-        lineitem_filtered["disc_price"] = lineitem_filtered.l_extendedprice * (
-            1 - lineitem_filtered.l_discount
-        )
-        lineitem_filtered["charge"] = (
-            lineitem_filtered.l_extendedprice
-            * (1 - lineitem_filtered.l_discount)
-            * (1 + lineitem_filtered.l_tax)
+        filt["disc_price"] = filt.l_extendedprice * (1.0 - filt.l_discount)
+        filt["charge"] = (
+            filt.l_extendedprice * (1.0 - filt.l_discount) * (1.0 + filt.l_tax)
         )
 
         # `groupby(as_index=False)` is not yet implemented by Dask:
         # https://github.com/dask/dask/issues/5834
-        gb = lineitem_filtered.groupby(["l_returnflag", "l_linestatus"])
-
-        total = gb.agg(
+        gb = filt.groupby(["l_returnflag", "l_linestatus"])
+        agg = gb.agg(
             sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"),
             sum_base_price=pd.NamedAgg(column="l_extendedprice", aggfunc="sum"),
             sum_disc_price=pd.NamedAgg(column="disc_price", aggfunc="sum"),
@@ -48,9 +42,9 @@ def query() -> pd.DataFrame:
             avg_price=pd.NamedAgg(column="l_extendedprice", aggfunc="mean"),
             avg_disc=pd.NamedAgg(column="l_discount", aggfunc="mean"),
             count_order=pd.NamedAgg(column="l_orderkey", aggfunc="size"),
-        )
+        ).reset_index()
 
-        result_df = total.reset_index().sort_values(["l_returnflag", "l_linestatus"])
+        result_df = agg.sort_values(["l_returnflag", "l_linestatus"])
 
         return result_df.compute()  # type: ignore[no-any-return]
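Note (q1): the rewrite keeps the named-aggregation form, `gb.agg(out=pd.NamedAgg(column=..., aggfunc=...))`, and folds `reset_index()` into the aggregation so the group keys come back as ordinary columns before the sort. A minimal sketch of that pattern on made-up rows, using plain pandas (the diff itself shows Dask's `groupby().agg()` accepting the same `pd.NamedAgg` spec):

```python
import pandas as pd

# Toy stand-in for the filtered lineitem frame; only the column names
# are taken from the query above, the rows are invented.
filt = pd.DataFrame(
    {
        "l_returnflag": ["A", "A", "N"],
        "l_linestatus": ["F", "F", "O"],
        "l_quantity": [17, 36, 8],
        "l_extendedprice": [100.0, 200.0, 50.0],
        "l_discount": [0.05, 0.10, 0.00],
        "l_orderkey": [1, 2, 3],
    }
)
filt["disc_price"] = filt.l_extendedprice * (1.0 - filt.l_discount)

agg = (
    filt.groupby(["l_returnflag", "l_linestatus"])
    .agg(
        sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"),
        sum_disc_price=pd.NamedAgg(column="disc_price", aggfunc="sum"),
        avg_disc=pd.NamedAgg(column="l_discount", aggfunc="mean"),
        count_order=pd.NamedAgg(column="l_orderkey", aggfunc="size"),
    )
    .reset_index()  # group keys become columns, ready for sort_values
)
print(agg)
```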
diff --git a/queries/dask/q2.py b/queries/dask/q2.py
index 7179d91..b85b90b 100644
--- a/queries/dask/q2.py
+++ b/queries/dask/q2.py
@@ -11,10 +11,6 @@
 
 
 def q() -> None:
-    var1 = 15
-    var2 = "BRASS"
-    var3 = "EUROPE"
-
     region_ds = utils.get_region_ds
     nation_ds = utils.get_nation_ds
     supplier_ds = utils.get_supplier_ds
@@ -40,99 +36,26 @@ def query() -> pd.DataFrame:
         part_ds = part_ds()
         part_supp_ds = part_supp_ds()
 
-        nation_filtered = nation_ds.loc[:, ["n_nationkey", "n_name", "n_regionkey"]]
-        region_filtered = region_ds[(region_ds["r_name"] == var3)]
-        region_filtered = region_filtered.loc[:, ["r_regionkey"]]
-        r_n_merged = nation_filtered.merge(
-            region_filtered, left_on="n_regionkey", right_on="r_regionkey", how="inner"
-        )
-        r_n_merged = r_n_merged.loc[:, ["n_nationkey", "n_name"]]
-        supplier_filtered = supplier_ds.loc[
-            :,
-            [
-                "s_suppkey",
-                "s_name",
-                "s_address",
-                "s_nationkey",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-            ],
-        ]
-        s_r_n_merged = r_n_merged.merge(
-            supplier_filtered,
-            left_on="n_nationkey",
-            right_on="s_nationkey",
-            how="inner",
-        )
-        s_r_n_merged = s_r_n_merged.loc[
-            :,
-            [
-                "n_name",
-                "s_suppkey",
-                "s_name",
-                "s_address",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-            ],
-        ]
-        partsupp_filtered = part_supp_ds.loc[
-            :, ["ps_partkey", "ps_suppkey", "ps_supplycost"]
-        ]
-        ps_s_r_n_merged = s_r_n_merged.merge(
-            partsupp_filtered, left_on="s_suppkey", right_on="ps_suppkey", how="inner"
-        )
-        ps_s_r_n_merged = ps_s_r_n_merged.loc[
-            :,
-            [
-                "n_name",
-                "s_name",
-                "s_address",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-                "ps_partkey",
-                "ps_supplycost",
-            ],
-        ]
-        part_filtered = part_ds.loc[:, ["p_partkey", "p_mfgr", "p_size", "p_type"]]
-        part_filtered = part_filtered[
-            (part_filtered["p_size"] == var1)
-            # & (part_filtered["p_type"].astype(str).str.endswith(var2))
-            & (part_filtered["p_type"].str.endswith(var2))
-        ]
-        part_filtered = part_filtered.loc[:, ["p_partkey", "p_mfgr"]]
-        merged_df = part_filtered.merge(
-            ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner"
+        var1 = 15
+        var2 = "BRASS"
+        var3 = "EUROPE"
+
+        jn = (
+            part_ds.merge(part_supp_ds, left_on="p_partkey", right_on="ps_partkey")
+            .merge(supplier_ds, left_on="ps_suppkey", right_on="s_suppkey")
+            .merge(nation_ds, left_on="s_nationkey", right_on="n_nationkey")
+            .merge(region_ds, left_on="n_regionkey", right_on="r_regionkey")
         )
-        merged_df = merged_df.loc[
-            :,
-            [
-                "n_name",
-                "s_name",
-                "s_address",
-                "s_phone",
-                "s_acctbal",
-                "s_comment",
-                "ps_supplycost",
-                "p_partkey",
-                "p_mfgr",
-            ],
-        ]
-        # `groupby(as_index=False)` is not yet implemented by Dask:
-        # https://github.com/dask/dask/issues/5834
-        min_values = merged_df.groupby("p_partkey")["ps_supplycost"].min().reset_index()
+        jn = jn[jn["p_size"] == var1]
+        jn = jn[jn["p_type"].str.endswith(var2)]
+        jn = jn[jn["r_name"] == var3]
 
-        min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
-        merged_df = merged_df.merge(
-            min_values,
-            left_on=["p_partkey", "ps_supplycost"],
-            right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
-            how="inner",
-        )
-        result_df = merged_df.loc[
+        gb = jn.groupby("p_partkey")
+        agg = gb["ps_supplycost"].min().reset_index()
+        jn2 = agg.merge(jn, on=["p_partkey", "ps_supplycost"])
+
+        sel = jn2.loc[
             :,
             [
                 "s_acctbal",
@@ -145,22 +68,14 @@ def query() -> pd.DataFrame:
                 "s_comment",
             ],
         ]
-        result_df = result_df.sort_values(
-            by=[
-                "s_acctbal",
-                "n_name",
-                "s_name",
-                "p_partkey",
-            ],
-            ascending=[
-                False,
-                True,
-                True,
-                True,
-            ],
-        ).head(100, compute=False)
-
-        return result_df.compute()  # type: ignore[no-any-return]
+        sort = sel.sort_values(
+            by=["s_acctbal", "n_name", "s_name", "p_partkey"],
+            ascending=[False, True, True, True],
+        )
+        result_df = sort.head(100)
+
+        return result_df  # type: ignore[no-any-return]
 
     utils.run_query(Q_NUM, query)
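Note (q2): the removed `min_values`/`P_PARTKEY_CPY` detour and the new `gb.min()` plus merge-back implement the same idea: the query's correlated subquery (minimum supply cost per part) becomes a group-wise minimum that is inner-joined back against the full frame, so only rows attaining the minimum survive. A toy pandas sketch of the trick (invented rows; the Dask calls are identical):

```python
import pandas as pd

jn = pd.DataFrame(
    {
        "p_partkey": [1, 1, 2, 2],
        "ps_supplycost": [10.0, 7.5, 3.0, 3.0],
        "s_name": ["s1", "s2", "s3", "s4"],
    }
)

# Per-part minimum cost, then join back on both key and cost.
agg = jn.groupby("p_partkey")["ps_supplycost"].min().reset_index()
jn2 = agg.merge(jn, on=["p_partkey", "ps_supplycost"])
# jn2 keeps s2 for part 1 and BOTH s3 and s4 for part 2: ties at the
# minimum all survive, which is why the query still sorts and takes
# head(100) afterwards.
print(jn2)
```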
right_on="l_orderkey") + var1 = "BUILDING" + var2 = date(1995, 3, 15) + + fcustomer = customer_ds[customer_ds["c_mktsegment"] == var1] + + jn1 = fcustomer.merge(orders_ds, left_on="c_custkey", right_on="o_custkey") + jn2 = jn1.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey") + + jn2 = jn2[jn2["o_orderdate"] < var2] + jn2 = jn2[jn2["l_shipdate"] > var2] jn2["revenue"] = jn2.l_extendedprice * (1 - jn2.l_discount) - # `groupby(as_index=False)` is not yet implemented by Dask: - # https://github.com/dask/dask/issues/5834 - total = ( - jn2.groupby(["l_orderkey", "o_orderdate", "o_shippriority"])["revenue"] - .sum() - .reset_index() - .sort_values(["revenue"], ascending=False) - ) - result_df = total.head(10, compute=False).loc[ - :, ["l_orderkey", "revenue", "o_orderdate", "o_shippriority"] - ] - return result_df.compute() # type: ignore[no-any-return] + gb = jn2.groupby(["o_orderkey", "o_orderdate", "o_shippriority"]) + agg = gb["revenue"].sum().reset_index() + + sel = agg.loc[:, ["o_orderkey", "revenue", "o_orderdate", "o_shippriority"]] + sel = sel.rename(columns={"o_orderkey": "l_orderkey"}) + + sorted = sel.sort_values(by=["revenue", "o_orderdate"], ascending=[False, True]) + result_df = sorted.head(10) + + return result_df # type: ignore[no-any-return] utils.run_query(Q_NUM, query) diff --git a/queries/dask/q4.py b/queries/dask/q4.py index 4252efe..7bc85a3 100644 --- a/queries/dask/q4.py +++ b/queries/dask/q4.py @@ -1,24 +1,19 @@ from __future__ import annotations from datetime import date -from typing import TYPE_CHECKING -from queries.dask import utils +import pandas as pd -if TYPE_CHECKING: - import pandas as pd +from queries.dask import utils Q_NUM = 4 def q() -> None: - date1 = date(1993, 10, 1) - date2 = date(1993, 7, 1) - line_item_ds = utils.get_line_item_ds orders_ds = utils.get_orders_ds - # First call one time to cache in case we don't include the IO times + # first call one time to cache in case we don't include the IO times line_item_ds() orders_ds() @@ -28,27 +23,23 @@ def query() -> pd.DataFrame: line_item_ds = line_item_ds() orders_ds = orders_ds() - lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate - osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2) - flineitem = line_item_ds[lsel] - forders = orders_ds[osel] - - # `isin(Series)` is not yet implemented by Dask. 
diff --git a/queries/dask/q4.py b/queries/dask/q4.py
index 4252efe..7bc85a3 100644
--- a/queries/dask/q4.py
+++ b/queries/dask/q4.py
@@ -1,24 +1,19 @@
 from __future__ import annotations
 
 from datetime import date
-from typing import TYPE_CHECKING
 
-from queries.dask import utils
+import pandas as pd
 
-if TYPE_CHECKING:
-    import pandas as pd
+from queries.dask import utils
 
 Q_NUM = 4
 
 
 def q() -> None:
-    date1 = date(1993, 10, 1)
-    date2 = date(1993, 7, 1)
-
     line_item_ds = utils.get_line_item_ds
     orders_ds = utils.get_orders_ds
 
-    # First call one time to cache in case we don't include the IO times
+    # first call one time to cache in case we don't include the IO times
     line_item_ds()
     orders_ds()
@@ -28,27 +23,23 @@ def query() -> pd.DataFrame:
         line_item_ds = line_item_ds()
         orders_ds = orders_ds()
 
-        lsel = line_item_ds.l_commitdate < line_item_ds.l_receiptdate
-        osel = (orders_ds.o_orderdate < date1) & (orders_ds.o_orderdate >= date2)
-        flineitem = line_item_ds[lsel]
-        forders = orders_ds[osel]
-
-        # `isin(Series)` is not yet implemented by Dask.
-        # https://github.com/dask/dask/issues/4227
-        forders = forders[["o_orderkey", "o_orderpriority"]]
-        jn = forders.merge(
-            flineitem, left_on="o_orderkey", right_on="l_orderkey"
-        ).drop_duplicates(subset=["o_orderkey"])[["o_orderpriority", "o_orderkey"]]
-
-        # `groupby(as_index=False)` is not yet implemented by Dask:
-        # https://github.com/dask/dask/issues/5834
-        result_df = (
-            jn.groupby("o_orderpriority")["o_orderkey"]
-            .count()
-            .reset_index()
-            .sort_values(["o_orderpriority"])
-            .rename(columns={"o_orderkey": "order_count"})
-        )
+        var1 = date(1993, 7, 1)
+        var2 = date(1993, 10, 1)
+
+        jn = line_item_ds.merge(orders_ds, left_on="l_orderkey", right_on="o_orderkey")
+
+        jn = jn[(jn["o_orderdate"] >= var1) & (jn["o_orderdate"] < var2)]
+        jn = jn[jn["l_commitdate"] < jn["l_receiptdate"]]
+
+        jn = jn.drop_duplicates(subset=["o_orderpriority", "l_orderkey"])
+
+        gb = jn.groupby("o_orderpriority")
+        agg = gb.agg(
+            order_count=pd.NamedAgg(column="o_orderkey", aggfunc="count")
+        ).reset_index()
+
+        result_df = agg.sort_values(["o_orderpriority"])
+
         return result_df.compute()  # type: ignore[no-any-return]
 
     utils.run_query(Q_NUM, query)
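Note (q4): Q4 is an EXISTS semi-join: each order counts once if at least one of its lineitems was received after its commit date. Because `isin(Series)` was unavailable (the dask#4227 issue cited in the removed comment), the rewrite joins and then deduplicates per order before counting. A toy pandas illustration of why the `drop_duplicates` matters (invented rows):

```python
import pandas as pd

orders = pd.DataFrame(
    {"o_orderkey": [1, 2], "o_orderpriority": ["1-URGENT", "2-HIGH"]}
)
# Order 1 has two qualifying lineitems, order 2 has one.
lineitem = pd.DataFrame({"l_orderkey": [1, 1, 2]})

jn = lineitem.merge(orders, left_on="l_orderkey", right_on="o_orderkey")
jn = jn.drop_duplicates(subset=["o_orderpriority", "l_orderkey"])

counts = (
    jn.groupby("o_orderpriority")
    .agg(order_count=pd.NamedAgg(column="o_orderkey", aggfunc="count"))
    .reset_index()
)
# Without the dedup, order 1 would be counted twice.
print(counts)
```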
diff --git a/queries/dask/q5.py b/queries/dask/q5.py
index 223fa29..0c71c1f 100644
--- a/queries/dask/q5.py
+++ b/queries/dask/q5.py
@@ -12,9 +12,6 @@
 
 
 def q() -> None:
-    date1 = date(1994, 1, 1)
-    date2 = date(1995, 1, 1)
-
     region_ds = utils.get_region_ds
     nation_ds = utils.get_nation_ds
     customer_ds = utils.get_customer_ds
@@ -37,7 +34,6 @@ def query() -> pd.DataFrame:
         nonlocal line_item_ds
         nonlocal orders_ds
         nonlocal supplier_ds
-
         region_ds = region_ds()
         nation_ds = nation_ds()
         customer_ds = customer_ds()
@@ -45,27 +41,25 @@ def query() -> pd.DataFrame:
         orders_ds = orders_ds()
         supplier_ds = supplier_ds()
 
+        var1 = "ASIA"
+        var2 = date(1994, 1, 1)
+        var3 = date(1995, 1, 1)
+
         jn1 = region_ds.merge(nation_ds, left_on="r_regionkey", right_on="n_regionkey")
         jn2 = jn1.merge(customer_ds, left_on="n_nationkey", right_on="c_nationkey")
         jn3 = jn2.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
         jn4 = jn3.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
-        jn5 = supplier_ds.merge(
-            jn4,
-            left_on=["s_suppkey", "s_nationkey"],
-            right_on=["l_suppkey", "n_nationkey"],
+        jn5 = jn4.merge(
+            supplier_ds,
+            left_on=["l_suppkey", "n_nationkey"],
+            right_on=["s_suppkey", "s_nationkey"],
         )
 
-        jn5["revenue"] = jn5.l_extendedprice * (1.0 - jn5.l_discount)
-        jn5 = jn5[
-            (jn5.o_orderdate >= date1)
-            & (jn5.o_orderdate < date2)
-            & (jn5.r_name == "ASIA")
-        ]
+        jn5 = jn5[jn5["r_name"] == var1]
+        jn5 = jn5[(jn5["o_orderdate"] >= var2) & (jn5["o_orderdate"] < var3)]
+        jn5["revenue"] = jn5.l_extendedprice * (1.0 - jn5.l_discount)
 
-        # `groupby(as_index=False)` is not yet implemented by Dask:
-        # https://github.com/dask/dask/issues/5834
         gb = jn5.groupby("n_name")["revenue"].sum().reset_index()
-
         result_df = gb.sort_values("revenue", ascending=False)
 
         return result_df.compute()  # type: ignore[no-any-return]
diff --git a/queries/dask/q6.py b/queries/dask/q6.py
index 419cc3e..527ada8 100644
--- a/queries/dask/q6.py
+++ b/queries/dask/q6.py
@@ -10,10 +10,6 @@
 
 
 def q() -> None:
-    date1 = date(1994, 1, 1)
-    date2 = date(1995, 1, 1)
-    var3 = 24
-
     line_item_ds = utils.get_line_item_ds
 
     # first call one time to cache in case we don't include the IO times
@@ -23,22 +19,24 @@ def query() -> pd.DataFrame:
         nonlocal line_item_ds
         line_item_ds = line_item_ds()
 
-        lineitem_filtered = line_item_ds.loc[
-            :, ["l_quantity", "l_extendedprice", "l_discount", "l_shipdate"]
-        ]
-        sel = (
-            (lineitem_filtered.l_shipdate >= date1)
-            & (lineitem_filtered.l_shipdate < date2)
-            & (lineitem_filtered.l_discount >= 0.05)
-            & (lineitem_filtered.l_discount <= 0.07)
-            & (lineitem_filtered.l_quantity < var3)
-        )
+        var1 = date(1994, 1, 1)
+        var2 = date(1995, 1, 1)
+        var3 = 0.05
+        var4 = 0.07
+        var5 = 24
 
-        flineitem = lineitem_filtered[sel]
+        flineitem = line_item_ds[
+            (line_item_ds["l_shipdate"] >= var1) & (line_item_ds["l_shipdate"] < var2)
+        ]
+        flineitem = flineitem[
+            (flineitem["l_discount"] >= var3) & (flineitem["l_discount"] <= var4)
+        ]
+        flineitem = flineitem[flineitem["l_quantity"] < var5]
 
         result_value = (
-            (flineitem.l_extendedprice * flineitem.l_discount).sum().compute()
+            (flineitem["l_extendedprice"] * flineitem["l_discount"]).sum().compute()
        )
         result_df = pd.DataFrame({"revenue": [result_value]})
+
         return result_df
 
     utils.run_query(Q_NUM, query)
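Note (q6): the three filters above have to chain on `flineitem` (as fixed here); re-filtering `line_item_ds` each time would silently discard the earlier predicates and overstate the revenue. Equivalently, the predicates can be combined into a single boolean mask; a toy pandas check with invented rows (the Dask expression is the same):

```python
import pandas as pd

line_item_ds = pd.DataFrame(
    {
        "l_shipdate": pd.to_datetime(["1994-03-01", "1994-06-01", "1995-02-01"]),
        "l_discount": [0.06, 0.04, 0.06],
        "l_quantity": [10, 10, 10],
        "l_extendedprice": [100.0, 100.0, 100.0],
    }
)

mask = (
    (line_item_ds["l_shipdate"] >= pd.Timestamp("1994-01-01"))
    & (line_item_ds["l_shipdate"] < pd.Timestamp("1995-01-01"))
    & (line_item_ds["l_discount"] >= 0.05)
    & (line_item_ds["l_discount"] <= 0.07)
    & (line_item_ds["l_quantity"] < 24)
)
flineitem = line_item_ds[mask]
# Only the first row passes all three predicates: revenue == 100.0 * 0.06
revenue = (flineitem["l_extendedprice"] * flineitem["l_discount"]).sum()
```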
diff --git a/queries/dask/q7.py b/queries/dask/q7.py
index d6eb702..3bea051 100644
--- a/queries/dask/q7.py
+++ b/queries/dask/q7.py
@@ -2,7 +2,8 @@
 
 import warnings
 from datetime import date
-from typing import TYPE_CHECKING
+
+import pandas as pd
 
 from queries.dask import utils
 
@@ -10,16 +11,11 @@
 warnings.filterwarnings("ignore", category=DeprecationWarning)
 
 import dask.dataframe as dd
 
-if TYPE_CHECKING:
-    import pandas as pd
 
 Q_NUM = 7
 
 
 def q() -> None:
-    var1 = date(1995, 1, 1)
-    var2 = date(1997, 1, 1)
-
     nation_ds = utils.get_nation_ds
     customer_ds = utils.get_customer_ds
     line_item_ds = utils.get_line_item_ds
@@ -39,102 +35,50 @@ def query() -> pd.DataFrame:
         nonlocal line_item_ds
         nonlocal orders_ds
         nonlocal supplier_ds
-
         nation_ds = nation_ds()
         customer_ds = customer_ds()
         line_item_ds = line_item_ds()
         orders_ds = orders_ds()
         supplier_ds = supplier_ds()
 
-        lineitem_filtered = line_item_ds[
-            (line_item_ds["l_shipdate"] >= var1) & (line_item_ds["l_shipdate"] < var2)
-        ]
-        lineitem_filtered["l_year"] = lineitem_filtered["l_shipdate"].dt.year
-        lineitem_filtered["revenue"] = lineitem_filtered["l_extendedprice"] * (
-            1.0 - lineitem_filtered["l_discount"]
-        )
-        lineitem_filtered = lineitem_filtered.loc[
-            :, ["l_orderkey", "l_suppkey", "l_year", "revenue"]
-        ]
-        supplier_filtered = supplier_ds.loc[:, ["s_suppkey", "s_nationkey"]]
-        orders_filtered = orders_ds.loc[:, ["o_orderkey", "o_custkey"]]
-        customer_filtered = customer_ds.loc[:, ["c_custkey", "c_nationkey"]]
-        n1 = nation_ds[(nation_ds["n_name"] == "FRANCE")].loc[
-            :, ["n_nationkey", "n_name"]
-        ]
-        n2 = nation_ds[(nation_ds["n_name"] == "GERMANY")].loc[
-            :, ["n_nationkey", "n_name"]
-        ]
-
-        # ----- do nation 1 -----
-        N1_C = customer_filtered.merge(
-            n1, left_on="c_nationkey", right_on="n_nationkey", how="inner"
-        )
-        N1_C = N1_C.drop(columns=["c_nationkey", "n_nationkey"]).rename(
-            columns={"n_name": "cust_nation"}
-        )
-        N1_C_O = N1_C.merge(
-            orders_filtered, left_on="c_custkey", right_on="o_custkey", how="inner"
-        )
-        N1_C_O = N1_C_O.drop(columns=["c_custkey", "o_custkey"])
-
-        N2_S = supplier_filtered.merge(
-            n2, left_on="s_nationkey", right_on="n_nationkey", how="inner"
-        )
-        N2_S = N2_S.drop(columns=["s_nationkey", "n_nationkey"]).rename(
-            columns={"n_name": "supp_nation"}
-        )
-        N2_S_L = N2_S.merge(
-            lineitem_filtered, left_on="s_suppkey", right_on="l_suppkey", how="inner"
-        )
-        N2_S_L = N2_S_L.drop(columns=["s_suppkey", "l_suppkey"])
-
-        total1 = N1_C_O.merge(
-            N2_S_L, left_on="o_orderkey", right_on="l_orderkey", how="inner"
-        )
-        total1 = total1.drop(columns=["o_orderkey", "l_orderkey"])
-
-        # ----- do nation 2 ----- (same as nation 1 section but with nation 2)
-        N2_C = customer_filtered.merge(
-            n2, left_on="c_nationkey", right_on="n_nationkey", how="inner"
-        )
-        N2_C = N2_C.drop(columns=["c_nationkey", "n_nationkey"]).rename(
-            columns={"n_name": "cust_nation"}
-        )
-        N2_C_O = N2_C.merge(
-            orders_filtered, left_on="c_custkey", right_on="o_custkey", how="inner"
-        )
-        N2_C_O = N2_C_O.drop(columns=["c_custkey", "o_custkey"])
-
-        N1_S = supplier_filtered.merge(
-            n1, left_on="s_nationkey", right_on="n_nationkey", how="inner"
-        )
-        N1_S = N1_S.drop(columns=["s_nationkey", "n_nationkey"]).rename(
-            columns={"n_name": "supp_nation"}
-        )
-        N1_S_L = N1_S.merge(
-            lineitem_filtered, left_on="s_suppkey", right_on="l_suppkey", how="inner"
-        )
-        N1_S_L = N1_S_L.drop(columns=["s_suppkey", "l_suppkey"])
-
-        total2 = N2_C_O.merge(
-            N1_S_L, left_on="o_orderkey", right_on="l_orderkey", how="inner"
-        )
-        total2 = total2.drop(columns=["o_orderkey", "l_orderkey"])
-
-        # concat results
-        total = dd.concat([total1, total2])  # type: ignore[attr-defined,no-untyped-call]
-
-        result_df = (
-            total.groupby(["supp_nation", "cust_nation", "l_year"])
-            .revenue.agg("sum")
-            .reset_index()
-        )
-        result_df.columns = ["supp_nation", "cust_nation", "l_year", "revenue"]
-
-        result_df = result_df.sort_values(
-            by=["supp_nation", "cust_nation", "l_year"],
-            ascending=[True, True, True],
-        )
+        var1 = "FRANCE"
+        var2 = "GERMANY"
+        var3 = date(1995, 1, 1)
+        var4 = date(1996, 12, 31)
+
+        n1 = nation_ds[(nation_ds["n_name"] == var1)]
+        n2 = nation_ds[(nation_ds["n_name"] == var2)]
+
+        # Part 1
+        jn1 = customer_ds.merge(n1, left_on="c_nationkey", right_on="n_nationkey")
+        jn2 = jn1.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
+        jn2 = jn2.rename(columns={"n_name": "cust_nation"})
+        jn3 = jn2.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
+        jn4 = jn3.merge(supplier_ds, left_on="l_suppkey", right_on="s_suppkey")
+        jn5 = jn4.merge(n2, left_on="s_nationkey", right_on="n_nationkey")
+        df1 = jn5.rename(columns={"n_name": "supp_nation"})
+
+        # Part 2
+        jn1 = customer_ds.merge(n2, left_on="c_nationkey", right_on="n_nationkey")
+        jn2 = jn1.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
+        jn2 = jn2.rename(columns={"n_name": "cust_nation"})
+        jn3 = jn2.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
+        jn4 = jn3.merge(supplier_ds, left_on="l_suppkey", right_on="s_suppkey")
+        jn5 = jn4.merge(n1, left_on="s_nationkey", right_on="n_nationkey")
+        df2 = jn5.rename(columns={"n_name": "supp_nation"})
+
+        # Combine
+        total = dd.concat([df1, df2])  # type: ignore[attr-defined,no-untyped-call]
+
+        total = total[(total["l_shipdate"] >= var3) & (total["l_shipdate"] <= var4)]
+        total["volume"] = total["l_extendedprice"] * (1.0 - total["l_discount"])
+        total["l_year"] = total["l_shipdate"].dt.year
+
+        gb = total.groupby(["supp_nation", "cust_nation", "l_year"])
+        agg = gb.agg(revenue=pd.NamedAgg(column="volume", aggfunc="sum")).reset_index()
+
+        result_df = agg.sort_values(by=["supp_nation", "cust_nation", "l_year"])
+
+        return result_df.compute()  # type: ignore[no-any-return]
 
     utils.run_query(Q_NUM, query)
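Note (q7): the rewrite builds the FRANCE/GERMANY and GERMANY/FRANCE pairings separately, unions them with `dd.concat`, and then runs one shared filter/derive/groupby pass instead of duplicating that work per pairing. A toy pandas equivalent of the post-concat tail (invented rows; `dd.concat` mirrors `pd.concat` here, and the diff itself shows `.dt.year` working on a Dask series):

```python
import pandas as pd

df1 = pd.DataFrame(
    {
        "supp_nation": ["FRANCE"],
        "cust_nation": ["GERMANY"],
        "l_shipdate": pd.to_datetime(["1995-06-01"]),
        "l_extendedprice": [100.0],
        "l_discount": [0.1],
    }
)
df2 = df1.assign(supp_nation="GERMANY", cust_nation="FRANCE")

total = pd.concat([df1, df2])
total = total[
    (total["l_shipdate"] >= pd.Timestamp("1995-01-01"))
    & (total["l_shipdate"] <= pd.Timestamp("1996-12-31"))
]
total["volume"] = total["l_extendedprice"] * (1.0 - total["l_discount"])
total["l_year"] = total["l_shipdate"].dt.year

agg = (
    total.groupby(["supp_nation", "cust_nation", "l_year"])
    .agg(revenue=pd.NamedAgg(column="volume", aggfunc="sum"))
    .reset_index()
    .sort_values(by=["supp_nation", "cust_nation", "l_year"])
)
```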