
Commit d71c436

Add examples from TPC-H (apache#666)
* Update location of docker image
* Initial commit for queries 1-3
* Commit queries 4-7 of TPC-H in examples
* Add required license text
* Add additional text around why to use a case statement in the example
* Add market share example
* Add example for product type profit measure
* Initial commit of returned item report
* Linting
* Initial commit of q11 example
* Initial commit of q12 from TPC-H
* Initial commit for customer distribution example
* Initial commit of promotion effect example
* Initial commit of q15 in TPC-H, top supplier
* Initial commit of q16 in TPC-H, part supplier relationship
* Initial commit of q17 in TPC-H, small quantity order
* Initial commit of q18 in TPC-H, large volume customer
* Initial commit of q19 in TPC-H, discounted revenue
* Initial commit of q20 in TPC-H, potential part promotion
* Initial commit of q21 in TPC-H, supplier who kept order waiting
* Initial commit of q22 in TPC-H, global sales opportunity
* Adding readme information and marking text as copyrighted
* Minimum part cost must be identified per part, not across all parts that match the filters
* Change ordering of output rows to match spec
* Set parameter to match spec
* Set parameter to match spec
* Setting values to match spec
* Linting
* Expand on readme to link to examples within tpch folder
* Minor typo
1 parent 7d4a40c commit d71c436

27 files changed: +2420 −3 lines changed

benchmarks/tpch/tpch-gen.sh

+3 −3

```diff
@@ -29,7 +29,7 @@ FILE=./data/supplier.tbl
 if test -f "$FILE"; then
   echo "$FILE exists."
 else
-  docker run -v `pwd`/data:/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s $1
+  docker run -v `pwd`/data:/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s $1

   # workaround for https://github.com/apache/arrow-datafusion/issues/6147
   mv data/customer.tbl data/customer.csv
@@ -49,5 +49,5 @@ FILE=./data/answers/q1.out
 if test -f "$FILE"; then
   echo "$FILE exists."
 else
-  docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
-fi
+  docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
+fi
```

examples/README.md

+64

```diff
@@ -52,3 +52,67 @@ Here is a direct link to the file used in the examples:
 - [Executing SQL on Polars](./sql-on-polars.py)
 - [Executing SQL on Pandas](./sql-on-pandas.py)
 - [Executing SQL on cuDF](./sql-on-cudf.py)
+
+## TPC-H Examples
+
+Within the subdirectory `tpch` there are 22 examples that reproduce queries in
+the TPC-H specification. These include realistic data that can be generated at
+arbitrary scale and allow the user to see use cases for a variety of data frame
+operations.
+
+In the list below we describe which new operations can be found in the examples.
+The queries are designed to be of increasing complexity, so it is recommended to
+review them in order. For brevity, the following list does not include operations
+found in previous examples.
+
+- [Convert CSV to Parquet](./tpch/convert_data_to_parquet.py)
+  - Read from CSV files where the delimiter is something other than a comma
+  - Specify a schema during CSV reading
+  - Write to a Parquet file
+- [Pricing Summary Report](./tpch/q01_pricing_summary_report.py)
+  - Aggregation computing the maximum value, average, sum, and number of entries
+  - Filter data by date and interval
+  - Sorting
+- [Minimum Cost Supplier](./tpch/q02_minimum_cost_supplier.py)
+  - Window operation to find the minimum
+  - Sorting in descending order
+- [Shipping Priority](./tpch/q03_shipping_priority.py)
+- [Order Priority Checking](./tpch/q04_order_priority_checking.py)
+  - Aggregating multiple times in one data frame
+- [Local Supplier Volume](./tpch/q05_local_supplier_volume.py)
+- [Forecasting Revenue Change](./tpch/q06_forecasting_revenue_change.py)
+  - Using collect and extracting values as a Python object
+- [Volume Shipping](./tpch/q07_volume_shipping.py)
+  - Finding multiple distinct and mutually exclusive values within one data frame
+  - Using `case` and `when` statements
+- [Market Share](./tpch/q08_market_share.py)
+  - The operations in this query are similar to those in the prior examples, but
+    it is a more complex example of using filters, joins, and aggregates
+  - Using left outer joins
+- [Product Type Profit Measure](./tpch/q09_product_type_profit_measure.py)
+  - Extract year from a date
+- [Returned Item Reporting](./tpch/q10_returned_item_reporting.py)
+- [Important Stock Identification](./tpch/q11_important_stock_identification.py)
+- [Shipping Modes and Order Priority](./tpch/q12_ship_mode_order_priority.py)
+  - Finding non-null values using a boolean operation in a filter
+  - Case statement with a default value
+- [Customer Distribution](./tpch/q13_customer_distribution.py)
+- [Promotion Effect](./tpch/q14_promotion_effect.py)
+- [Top Supplier](./tpch/q15_top_supplier.py)
+- [Parts/Supplier Relationship](./tpch/q16_part_supplier_relationship.py)
+  - Using anti joins
+  - Using regular expressions (regex)
+  - Creating arrays of literal values
+  - Determining whether an element exists within an array
+- [Small-Quantity-Order Revenue](./tpch/q17_small_quantity_order.py)
+- [Large Volume Customer](./tpch/q18_large_volume_customer.py)
+- [Discounted Revenue](./tpch/q19_discounted_revenue.py)
+  - Creating a user defined function (UDF)
+  - Converting a pyarrow Array to Python values
+  - Filtering based on a UDF
+- [Potential Part Promotion](./tpch/q20_potential_part_promotion.py)
+  - Extracting part of a string using substr
+- [Suppliers Who Kept Orders Waiting](./tpch/q21_suppliers_kept_orders_waiting.py)
+  - Using array aggregation
+  - Determining the size of array elements
+- [Global Sales Opportunity](./tpch/q22_global_sales_opportunity.py)
```
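Several of the operations listed above, such as `case`/`when` expressions, can be tried on a small in-memory table without generating the TPC-H data. The following is a minimal sketch, not taken from the commit itself: it assumes the `datafusion` package's `SessionContext.from_pydict`, `functions.case`, and `DataFrame.with_column` APIs, so treat the exact calls as assumptions and check them against the library documentation.

```python
# Sketch only: a `case`/`when` expression in the style of the Volume
# Shipping (q07) example. The table and values here are invented for
# illustration; `from_pydict` and `functions.case` are assumed APIs.
from datafusion import SessionContext, col, lit, functions as F

ctx = SessionContext()
df = ctx.from_pydict(
    {"nation": ["FRANCE", "GERMANY", "FRANCE"], "revenue": [100.0, 200.0, 50.0]},
    name="shipping",
)

# Bucket each row by nation, with a default value for everything else
df = df.with_column(
    "bucket",
    F.case(col("nation")).when(lit("FRANCE"), lit("france")).otherwise(lit("other")),
)

# Aggregate revenue per bucket, as several of the examples above do
df.aggregate([col("bucket")], [F.sum(col("revenue")).alias("revenue")]).show()
```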

examples/tpch/.gitignore

+2

```diff
@@ -0,0 +1,2 @@
+data
+
```
examples/tpch/README.md

+57

````diff
@@ -0,0 +1,57 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataFusion Python Examples for TPC-H
+
+These examples reproduce the problems listed in the Transaction Processing
+Performance Council TPC-H benchmark. The purpose of these examples is to
+demonstrate how to use different aspects of DataFusion; they are not necessarily
+geared towards creating the most performant queries possible. Within each
+example is a description of the problem. Users who are familiar with SQL-style
+commands can compare the approaches in these examples with those in the specification.
+
+- https://www.tpc.org/tpch/
+
+The examples provided are based on version 2.18.0 of the TPC-H specification.
+
+## Data Setup
+
+To run these examples, you must first generate a dataset. The `dbgen` tool
+provided by TPC can create datasets of arbitrary scale. For testing, it is
+typically sufficient to create a 1 gigabyte dataset. For convenience, this
+repository has a script which uses Docker to create this dataset. From the
+`benchmarks/tpch` directory, execute the following script.
+
+```bash
+./tpch-gen.sh 1
+```
+
+The examples provided use Parquet files for the tables generated by `dbgen`.
+A Python script is provided to convert the text files from `dbgen` into Parquet
+files expected by the examples. From the `examples/tpch` directory, you can
+execute the following command to create the necessary Parquet files.
+
+```bash
+python convert_data_to_parquet.py
+```
+
+## Description of Examples
+
+For easier access, a description of the techniques demonstrated in each file
+is in the README.md file in the `examples` directory.
````
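Once the conversion script has run, a quick sanity check is to open one of the generated Parquet files and look at a few rows. This is a minimal sketch rather than part of the commit; it assumes the `data/lineitem.parquet` path that the examples themselves use.

```python
# Sanity-check sketch: read one converted table and display a few rows.
# Assumes this is run from the examples/tpch directory after
# convert_data_to_parquet.py has produced data/lineitem.parquet.
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_parquet("data/lineitem.parquet")
df.limit(5).show()
```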
examples/tpch/convert_data_to_parquet.py

+142

```diff
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is a utility script that will consume the data generated by dbgen from TPC-H and convert
+it into a parquet file with the column names as expected by the TPC-H specification. It assumes
+the generated data resides in a path ../../benchmarks/tpch/data relative to the current file,
+as will be generated by the script provided in this repository.
+"""
+
+import os
+import pyarrow
+import datafusion
+
+ctx = datafusion.SessionContext()
+
+all_schemas = {}
+
+all_schemas["customer"] = [
+    ("C_CUSTKEY", pyarrow.int32()),
+    ("C_NAME", pyarrow.string()),
+    ("C_ADDRESS", pyarrow.string()),
+    ("C_NATIONKEY", pyarrow.int32()),
+    ("C_PHONE", pyarrow.string()),
+    ("C_ACCTBAL", pyarrow.float32()),
+    ("C_MKTSEGMENT", pyarrow.string()),
+    ("C_COMMENT", pyarrow.string()),
+]
+
+all_schemas["lineitem"] = [
+    ("L_ORDERKEY", pyarrow.int32()),
+    ("L_PARTKEY", pyarrow.int32()),
+    ("L_SUPPKEY", pyarrow.int32()),
+    ("L_LINENUMBER", pyarrow.int32()),
+    ("L_QUANTITY", pyarrow.float32()),
+    ("L_EXTENDEDPRICE", pyarrow.float32()),
+    ("L_DISCOUNT", pyarrow.float32()),
+    ("L_TAX", pyarrow.float32()),
+    ("L_RETURNFLAG", pyarrow.string()),
+    ("L_LINESTATUS", pyarrow.string()),
+    ("L_SHIPDATE", pyarrow.date32()),
+    ("L_COMMITDATE", pyarrow.date32()),
+    ("L_RECEIPTDATE", pyarrow.date32()),
+    ("L_SHIPINSTRUCT", pyarrow.string()),
+    ("L_SHIPMODE", pyarrow.string()),
+    ("L_COMMENT", pyarrow.string()),
+]
+
+all_schemas["nation"] = [
+    ("N_NATIONKEY", pyarrow.int32()),
+    ("N_NAME", pyarrow.string()),
+    ("N_REGIONKEY", pyarrow.int32()),
+    ("N_COMMENT", pyarrow.string()),
+]
+
+all_schemas["orders"] = [
+    ("O_ORDERKEY", pyarrow.int32()),
+    ("O_CUSTKEY", pyarrow.int32()),
+    ("O_ORDERSTATUS", pyarrow.string()),
+    ("O_TOTALPRICE", pyarrow.float32()),
+    ("O_ORDERDATE", pyarrow.date32()),
+    ("O_ORDERPRIORITY", pyarrow.string()),
+    ("O_CLERK", pyarrow.string()),
+    ("O_SHIPPRIORITY", pyarrow.int32()),
+    ("O_COMMENT", pyarrow.string()),
+]
+
+all_schemas["part"] = [
+    ("P_PARTKEY", pyarrow.int32()),
+    ("P_NAME", pyarrow.string()),
+    ("P_MFGR", pyarrow.string()),
+    ("P_BRAND", pyarrow.string()),
+    ("P_TYPE", pyarrow.string()),
+    ("P_SIZE", pyarrow.int32()),
+    ("P_CONTAINER", pyarrow.string()),
+    ("P_RETAILPRICE", pyarrow.float32()),
+    ("P_COMMENT", pyarrow.string()),
+]
+
+all_schemas["partsupp"] = [
+    ("PS_PARTKEY", pyarrow.int32()),
+    ("PS_SUPPKEY", pyarrow.int32()),
+    ("PS_AVAILQTY", pyarrow.int32()),
+    ("PS_SUPPLYCOST", pyarrow.float32()),
+    ("PS_COMMENT", pyarrow.string()),
+]
+
+all_schemas["region"] = [
+    ("R_REGIONKEY", pyarrow.int32()),
+    ("R_NAME", pyarrow.string()),
+    ("R_COMMENT", pyarrow.string()),
+]
+
+all_schemas["supplier"] = [
+    ("S_SUPPKEY", pyarrow.int32()),
+    ("S_NAME", pyarrow.string()),
+    ("S_ADDRESS", pyarrow.string()),
+    ("S_NATIONKEY", pyarrow.int32()),
+    ("S_PHONE", pyarrow.string()),
+    ("S_ACCTBAL", pyarrow.float32()),
+    ("S_COMMENT", pyarrow.string()),
+]
+
+curr_dir = os.path.dirname(os.path.abspath(__file__))
+for filename, curr_schema in all_schemas.items():
+
+    # For convenience, convert the schema column names to lowercase
+    curr_schema = [(s[0].lower(), s[1]) for s in curr_schema]
+
+    # Pre-collect the output columns so we can ignore the null field we add
+    # to handle the trailing | in the file
+    output_cols = [r[0] for r in curr_schema]
+
+    # The trailing | in each row requires an extra field during processing
+    curr_schema.append(("some_null", pyarrow.null()))
+
+    schema = pyarrow.schema(curr_schema)
+
+    source_file = os.path.abspath(
+        os.path.join(curr_dir, f"../../benchmarks/tpch/data/{filename}.csv")
+    )
+    dest_file = os.path.abspath(os.path.join(curr_dir, f"./data/{filename}.parquet"))
+
+    df = ctx.read_csv(source_file, schema=schema, has_header=False, delimiter="|")
+
+    df = df.select_columns(*output_cols)
+
+    df.write_parquet(dest_file, compression="snappy")
```
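The extra `some_null` column exists because `dbgen` terminates every row with a trailing `|`, so a delimiter-based reader finds one more (empty) field than the table has columns. The snippet below, with an invented row for illustration, shows the effect:

```python
# Illustration only: a dbgen-style row ends with '|', so splitting on the
# delimiter yields one empty trailing field beyond the schema's columns.
row = "0|ALGERIA|0|final deposits detect slyly|"  # hypothetical nation row
fields = row.split("|")
print(len(fields))  # 5 fields for the 4-column nation schema; the last is ''
```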
examples/tpch/q01_pricing_summary_report.py

+90

```diff
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 1:
+
+The Pricing Summary Report Query provides a summary pricing report for all lineitems shipped as of
+a given date. The date is within 60 - 120 days of the greatest ship date contained in the database.
+The query lists totals for extended price, discounted extended price, discounted extended price
+plus tax, average quantity, average extended price, and average discount. These aggregates are
+grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS.
+A count of the number of lineitems in each group is included.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+ctx = SessionContext()
+
+df = ctx.read_parquet("data/lineitem.parquet")
+
+# It may be that the date could be hard coded, based on the examples shown.
+# This approach will work with any date range in the provided data set.
+
+greatest_ship_date = df.aggregate(
+    [], [F.max(col("l_shipdate")).alias("shipdate")]
+).collect()[0]["shipdate"][0]
+
+# From the given problem, this is how close to the last date in the database we
+# want to report results for. It should be between 60 and 120 days before the end.
+DAYS_BEFORE_FINAL = 68
+
+# Note: this is a hack for setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), type=pa.month_day_nano_interval())
+
+print("Final date in database:", greatest_ship_date)
+
+# Filter data to the dates of interest
+df = df.filter(col("l_shipdate") <= lit(greatest_ship_date) - lit(interval))
+
+# Aggregate the results
+
+df = df.aggregate(
+    [col("l_returnflag"), col("l_linestatus")],
+    [
+        F.sum(col("l_quantity")).alias("sum_qty"),
+        F.sum(col("l_extendedprice")).alias("sum_base_price"),
+        F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias(
+            "sum_disc_price"
+        ),
+        F.sum(
+            col("l_extendedprice")
+            * (lit(1.0) - col("l_discount"))
+            * (lit(1.0) + col("l_tax"))
+        ).alias("sum_charge"),
+        F.avg(col("l_quantity")).alias("avg_qty"),
+        F.avg(col("l_extendedprice")).alias("avg_price"),
+        F.avg(col("l_discount")).alias("avg_disc"),
+        F.count(col("l_returnflag")).alias(
+            "count_order"
+        ),  # Counting any column should return the same result
+    ],
+)
+
+# Sort per the expected result
+
+df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort())
+
+# Note: There appears to be a discrepancy between what is returned here and what is in the generated
+# answers file for the case of return flag N and line status O, but I did not investigate further.
+
+df.show()
```
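For comparison with the SQL in the specification, the same kind of report can be produced through `SessionContext.sql` after registering the Parquet file as a table. The sketch below is not part of the commit: the cutoff date is hardcoded purely for illustration (the example above derives it from the data instead), and the query text is abbreviated rather than the full spec SQL.

```python
# Sketch: a spec-style SQL rendering of part of Q1, for comparison with
# the data frame API above. The cutoff date and column list are
# illustrative, not the full TPC-H query text.
from datafusion import SessionContext

ctx = SessionContext()
ctx.register_parquet("lineitem", "data/lineitem.parquet")
ctx.sql(
    """
    SELECT l_returnflag, l_linestatus,
           SUM(l_quantity) AS sum_qty,
           AVG(l_discount) AS avg_disc,
           COUNT(*) AS count_order
    FROM lineitem
    WHERE l_shipdate <= DATE '1998-09-02'
    GROUP BY l_returnflag, l_linestatus
    ORDER BY l_returnflag, l_linestatus
    """
).show()
```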
