Skip to content

Commit

Permalink
Column Iteration Benchmarks (#328)
Browse files Browse the repository at this point in the history
  • Loading branch information
stanbrub authored Aug 12, 2024
1 parent f877fc6 commit 74ef153
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/resources/compare-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ default.data.distribution=random
generator.pause.per.row=0 millis

# Compression used for generating and storing records (SNAPPY, ZSTD, LZ4, LZO, GZIP, NONE)
record.compression=LZO
record.compression=SNAPPY

# Row count to scale tests (Tests can override but typically do not)
scale.row.count=70000000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.tests.compare.iterate;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import io.deephaven.benchmark.tests.compare.CompareTestRunner;

/**
* Product comparison tests for iterating and summing table columns. Tests read the same parquet data. To avoid an
* unfair advantage where some products may partition or group data during the read, parquet read time is included in
* the benchmark results.
* <p>
* Each test produces a table result containing one row with one column that is the total of the result of the sum of
* two columns for each row. ex. sum((r1c1 + r1c2)..(rNc1 + rNc2)). This is achieved without creating an extra column to
* hold the column sums.
* <p>
* Data generation only happens in the first test, the Deephaven test. Tests can be run individually, but only after the
* desired data has been generated.
*/
@TestMethodOrder(OrderAnnotation.class)
public class RowIteratorTest {
    final CompareTestRunner runner = new CompareTestRunner(this);

    /**
     * Iterate rows with Deephaven's {@code iter_tuple} and sum two integer columns into a
     * one-row/one-column result table. Runs first ({@code @Order(1)}) because it also generates
     * the shared parquet source data used by the other product tests.
     */
    @Test
    @Order(1)
    public void deephavenRowIterator() {
        runner.initDeephaven(2, "source", null, "int250", "int640");
        var setup = "from deephaven.parquet import read";
        var op = """
        source = read('/data/source.parquet').select()
        result = new_table([
            long_col('total', [sum(row.int250 + row.int640 for row in source.iter_tuple())])
        ])
        """;
        var msize = "source.size";
        var rsize = "result.size";
        runner.test("Deephaven Row Iterator", setup, op, msize, rsize);
    }

    /**
     * Iterate rows with PyArrow and sum two integer columns. PyArrow has no per-row iterator, so
     * {@code iterdicts} batches the table (1024 rows at a time) and yields one dict per row.
     */
    @Test
    @Order(2)
    public void pyarrowRowIterator() {
        runner.initPython("pyarrow");
        var setup = """
        import pyarrow as pa
        import pyarrow.dataset as ds
        import pyarrow.compute as pc
        def iterdicts(table, cols=[]):
            for batch in table.to_batches(1024):
                d = batch.to_pydict()
                int250 = d['int250']
                int640 = d['int640']
                for i in range(len(int250)):
                    row = {'int250':int250[i],'int640':int640[i]}
                    yield row
        """;
        var op = """
        source = ds.dataset('/data/source.parquet', format="parquet").to_table()
        rsum = sum(row['int250'] + row['int640'] for row in iterdicts(source))
        result = pa.Table.from_pydict({'total':[rsum]})
        """;
        var msize = "source.num_rows";
        var rsize = "result.num_rows";
        runner.test("PyArrow Row Iterator", setup, op, msize, rsize);
    }

    /**
     * Iterate rows with Pandas {@code itertuples} and sum two integer columns into a one-row
     * DataFrame result.
     */
    @Test
    @Order(3)
    public void pandasRowIterator() {  // renamed from misspelled "pandasRowIteratior"
        runner.initPython("fastparquet", "pandas");
        var setup = "import pandas as pd";
        var op = """
        source = pd.read_parquet('/data/source.parquet')
        rsum = sum(row.int250 + row.int640 for row in source.itertuples())
        result = pd.DataFrame([[rsum]], columns=['total'])
        """;
        var msize = "len(source)";
        var rsize = "len(result)";
        runner.test("Pandas Row Iterator", setup, op, msize, rsize);
    }

    /**
     * Iterate rows with DuckDb and sum two integer columns. DuckDb relations are consumed through
     * {@code fetchmany}, so {@code iterdicts} adapts 1024-row batches into per-row dicts. The sum
     * is written to a results table so sizes can be queried after the relation is consumed.
     */
    @Test
    @Order(4)
    public void duckdbRowIterator() {
        runner.initPython("duckdb");
        var setup = """
        import duckdb as db
        def iterdicts(table):
            while batch := table.fetchmany(1024):
                for row in batch:
                    r = {'int250':row[0],'int640':row[1]}
                    yield r
        """;
        var op = """
        source = db.sql("SELECT * FROM '/data/source.parquet'")
        db.sql("CREATE TABLE results(total INT)")
        rsum = sum(row['int250'] + row['int640'] for row in iterdicts(source))
        db.sql("INSERT INTO results VALUES(" + str(rsum) + ")")
        sourceLen = db.sql("SELECT count(*) FROM source").fetchone()[0]
        resultLen = db.sql("SELECT count(*) FROM results").fetchone()[0]
        """;
        var msize = "sourceLen";
        var rsize = "resultLen";
        runner.test("DuckDb Row Iterator", setup, op, msize, rsize);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.tests.standard.formula;

import org.junit.jupiter.api.*;
import io.deephaven.benchmark.tests.standard.StandardTestRunner;

/**
* Standard tests for iterating through tables to access column values directly. These benchmarks iterate through the
* same columns and do the same sums.
*/
public class RowIteratorTest {
    final StandardTestRunner runner = new StandardTestRunner(this);

    // Common per-test configuration: scale the generated "source" table by the given
    // factor and disable static/incremental scaling multipliers.
    void setup(int rowFactor) {
        runner.setRowFactor(rowFactor);
        runner.tables("source");
        runner.setScaleFactors(1, 0);
    }

    /** Sum two double columns per row via the dict-based iterator ({@code iter_dict}). */
    @Test
    void iterDict2Cols() {
        setup(2);
        var query = """
        new_table([
            double_col('total', [sum(row['num1'] + row['num2'] for row in source.iter_dict())])
        ])
        """;
        runner.test("Row-IterDict- Sum 2 Double Cols", 1, query, "num1", "num2");
    }

    /** Sum two double columns per row via the tuple-based iterator ({@code iter_tuple}). */
    @Test
    void iterTuple2Cols() {
        setup(4);
        var query = """
        new_table([
            double_col('total', [sum(row.num1 + row.num2 for row in source.iter_tuple())])
        ])
        """;
        runner.test("Row-IterTuple- Sum 2 Double Cols", 1, query, "num1", "num2");
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ default.data.distribution=random
generator.pause.per.row=0 millis

# Compression used for generating and storing records (ZSTD, LZ4, LZO, GZIP, SNAPPY, NONE)
record.compression=LZO
record.compression=SNAPPY

# Row count to scale tests (Tests can override but typically do not)
scale.row.count=100000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending
#
# Supporting Deephaven queries to use the benchmark_snippet to investigate product comparisons.
# - Generate a table that shows the comparisons between products for each benchmark
# Requirements: Deephaven 0.23.0 or greater

from urllib.request import urlopen; import os

# Benchmark set to compare (actor/set-label); actor and label are split below into filters
benchmark_set_arg = 'stanbrub/full-set-140M'

# Use local benchmark data when present, otherwise fall back to the public GCS bucket
root = 'file:///data' if os.path.exists('/data/deephaven-benchmark') else 'https://storage.googleapis.com'
with urlopen(root + '/deephaven-benchmark/benchmark_tables.dh.py') as r:
    benchmark_storage_uri_arg = root + '/deephaven-benchmark'
    benchmark_category_arg = 'adhoc'
    benchmark_actor_filter_arg = os.path.dirname(benchmark_set_arg)
    benchmark_set_filter_arg = os.path.basename(benchmark_set_arg)
    # Executes the downloaded snippet, which defines bench_results_sets and friends
    exec(r.read().decode(), globals(), locals())


# Split "Product Benchmark Name" into separate Product and Benchmark columns
product_compare = bench_results_sets.view([
    'Product=benchmark_name.replaceAll(`[ ].*$`,``)', 'Benchmark=benchmark_name.replaceAll(`^[^ ]+[ ]`,``)',
    'Rate=op_rate'
]).sort(['Benchmark','Product'])

from deephaven import numpy as dhnp
products = dhnp.to_numpy(product_compare.select_distinct(['Product']))
products = [str(prod[0]) for prod in products]

# Pivot: one row per Benchmark with a rate column per product.
# Fixed: original referenced undefined 'product_result' here (NameError).
product_compare = product_compare.group_by(['Benchmark']).view(['Benchmark'] + [
    products[i] + '=Rate[' + str(i) + ']' for i in range(len(products))
])

# Release intermediate tables so the IDE does not display them
bench_results = bench_metrics = bench_platforms = bench_results_sets = bench_results_change = None

0 comments on commit 74ef153

Please sign in to comment.