-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbench_polars.py
74 lines (56 loc) · 2.41 KB
/
bench_polars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright (C) 2023 Adam Lugowski. All rights reserved.
# Use of this source code is governed by the BSD 2-clause license found in the LICENSE.txt file.
# SPDX-License-Identifier: BSD-2-Clause
from pathlib import Path
import google_benchmark as benchmark
from google_benchmark import Counter
import fast_matrix_market as fmm
import polars as pl
from common import *
@benchmark.register(name="op:read/impl:Polars/format:Parquet")
@benchmark.option.use_real_time()
@benchmark.option.unit(benchmark.kSecond)
@benchmark.option.dense_range(0, get_num_problems() - 1, step=1)
@benchmark.option.iterations(num_iterations)
def polars_read_parquet(state):
    """Benchmark ``pl.read_parquet`` on a Parquet copy of one problem matrix.

    The problem selected by ``state.range(0)`` is loaded from its Matrix
    Market file, its ``col``/``row``/``data`` arrays are packed into a Polars
    DataFrame, and that frame is written to a Parquet file in
    ``temp_write_dir``. The ``while state`` loop then only re-reads that file.

    Args:
        state: The google_benchmark state object; it drives the measurement
            loop and receives the throughput counters.
    """
    prob = get_problem(state.range(0))
    mat = fmm.mmread(prob["mm_path"])

    # Build the Parquet fixture that the measured loop will read back.
    df = pl.DataFrame(dict(col=mat.col, row=mat.row, data=mat.data))
    del mat

    tmp_path = temp_write_dir / f"write_{prob['name']}.pqt"
    df.write_parquet(tmp_path)
    # The frame is not needed once it is on disk; drop it (like `mat` above)
    # so the reads below do not run with a redundant in-memory copy alive.
    del df

    while state:
        _ = pl.read_parquet(tmp_path)

    state.bytes_processed = state.iterations * tmp_path.stat().st_size

    # Read speed expressed against the size of the original Matrix Market
    # file rather than the size of the Parquet file.
    state.counters["MM_equivalent_bytes_per_second"] = Counter(
        state.iterations * prob["mm_path"].stat().st_size,
        Counter.kIsRate)
    # Store the problem index in a counter keyed by the problem's name.
    state.counters[prob['name']] = Counter(state.range(0))

    if delete_written_files_on_finish:
        tmp_path.unlink()
@benchmark.register(name="op:write/impl:Polars/format:Parquet")
@benchmark.option.use_real_time()
@benchmark.option.unit(benchmark.kSecond)
@benchmark.option.dense_range(0, get_num_problems() - 1, step=1)
@benchmark.option.iterations(num_iterations)
def polars_write_parquet(state):
    """Benchmark ``DataFrame.write_parquet`` for one problem matrix.

    The problem selected by ``state.range(0)`` is loaded from its Matrix
    Market file and its ``col``/``row``/``data`` arrays are packed into a
    Polars DataFrame. The ``while state`` loop repeatedly writes that frame
    to the same Parquet file in ``temp_write_dir``.

    Args:
        state: The google_benchmark state object; it drives the measurement
            loop and receives the throughput counters.
    """
    problem = get_problem(state.range(0))
    mm_path = problem["mm_path"]
    matrix = fmm.mmread(mm_path)

    # Assemble the frame up front; only the writes happen inside the loop.
    frame = pl.DataFrame(
        {"col": matrix.col, "row": matrix.row, "data": matrix.data}
    )
    del matrix

    parquet_path = temp_write_dir / f"write_{problem['name']}.pqt"

    while state:
        frame.write_parquet(parquet_path)

    # The file only exists after the loop has run, so measure it here.
    written_size = parquet_path.stat().st_size
    state.bytes_processed = state.iterations * written_size

    # Write speed expressed against the size of the original Matrix Market
    # file rather than the size of the Parquet file.
    mm_size = mm_path.stat().st_size
    state.counters["MM_equivalent_bytes_per_second"] = Counter(
        state.iterations * mm_size, Counter.kIsRate
    )
    # Store the problem index in a counter keyed by the problem's name.
    state.counters[problem["name"]] = Counter(state.range(0))

    if delete_written_files_on_finish:
        parquet_path.unlink()
# Run the registered benchmarks only when this file is executed as a script.
if __name__ == "__main__":
    benchmark.main()