
Commit e7c1bff

more metrics

1 parent e15f087

5 files changed, +108 -18 lines changed

benchmarks/Cargo.toml (+1)

@@ -52,6 +52,7 @@ structopt = { version = "0.3", default-features = false }
 test-utils = { path = "../test-utils/", version = "0.1.0" }
 tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
 url.workspace = true
+pprof = { version = "0.13", features = ["flamegraph"] }
 
 [dev-dependencies]
 datafusion-proto = { workspace = true }
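The new pprof dependency (with its "flamegraph" feature) is what drives the profiling support added to clickbench.rs below. As a minimal standalone sketch of how the crate is typically used, assuming only this dependency (the workload and output file name are illustrative, not from this commit):

use std::fs::File;

fn main() {
    // Sample the process 1000 times per second, skipping common runtime frames.
    let guard = pprof::ProfilerGuardBuilder::default()
        .frequency(1000)
        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
        .build()
        .unwrap();

    // Stand-in workload to profile.
    let total: u64 = (0..50_000_000u64).map(|i| i % 7).sum();
    std::hint::black_box(total);

    // Render the collected samples as an SVG flamegraph.
    if let Ok(report) = guard.report().build() {
        let file = File::create("flamegraph.svg").unwrap();
        report.flamegraph(file).unwrap();
    }
}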

benchmarks/src/clickbench.rs (+54 -12)

@@ -23,15 +23,20 @@ use std::sync::Arc;
 
 use arrow::ipc::reader::FileReader;
 use arrow::util::pretty;
+use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
+use datafusion::execution::cache::cache_unit::Cache37;
+use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::prelude::ParquetReadOptions;
 use datafusion::{
     error::{DataFusionError, Result},
     prelude::SessionContext,
 };
 use datafusion_common::exec_datafusion_err;
 use datafusion_common::instant::Instant;
 use object_store::aws::AmazonS3Builder;
+use object_store::ObjectStore;
 use parquet::arrow::builder::ArrowArrayCache;
 use structopt::StructOpt;
 use url::Url;

@@ -89,6 +94,10 @@ pub struct RunOpt {
     /// Check the answers against the stored answers
     #[structopt(long)]
     skip_answers: bool,
+
+    /// Generate a flamegraph
+    #[structopt(parse(from_os_str), long)]
+    flamegraph: Option<PathBuf>,
 }
 
 struct AllQueries {

@@ -140,6 +149,7 @@ impl AllQueries {
         self.queries.len() - 1
     }
 }
+
 impl RunOpt {
     pub async fn run(self) -> Result<()> {
         println!("Running benchmarks with the following options: {self:?}");

@@ -168,8 +178,18 @@ impl RunOpt {
             println!("Q{query_id}: {sql}");
 
             for i in 0..iterations {
+                let profiler_guard = if self.flamegraph.is_some() && i == iterations - 1 {
+                    Some(
+                        pprof::ProfilerGuardBuilder::default()
+                            .frequency(1000)
+                            .blocklist(&["libc", "libgcc", "pthread", "vdso"])
+                            .build()
+                            .unwrap(),
+                    )
+                } else {
+                    None
+                };
                 let start = Instant::now();
-                // let results = ctx.sql(sql).await?.collect().await?;
                 let plan = ctx.sql(sql).await?;
                 let (state, plan) = plan.into_parts();
 
@@ -252,28 +272,50 @@ impl RunOpt {
                     println!("Query {} iteration {} answer not checked", query_id, i);
                 }
 
+                if let Some(guard) = profiler_guard {
+                    let flamegraph_path = self.flamegraph.as_ref().unwrap();
+                    if let Ok(report) = guard.report().build() {
+                        let file = File::create(flamegraph_path).unwrap();
+                        report.flamegraph(file).unwrap();
+                    }
+                }
+
                 benchmark_run.write_iter(elapsed, row_count);
             }
         }
+
        benchmark_run.set_cache_stats(ArrowArrayCache::get().stats());
+       benchmark_run.set_parquet_cache_size(Cache37::memory_usage());
        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
        Ok(())
    }
 
    /// Registers the `hits.parquet` as a table named `hits`
    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
-       let options = Default::default();
        let path = self.path.as_os_str().to_str().unwrap();
-       let url = Url::parse(&"minio://parquet-oo").unwrap();
-       let object_store = AmazonS3Builder::new()
-           .with_bucket_name("parquet-oo")
-           .with_endpoint("http://c220g5-110910.wisc.cloudlab.us:9000")
-           .with_allow_http(true)
-           .with_region("us-east-1")
-           .with_access_key_id(env::var("MINIO_ACCESS_KEY_ID").unwrap())
-           .with_secret_access_key(env::var("MINIO_SECRET_ACCESS_KEY").unwrap())
-           .build()?;
-       ctx.register_object_store(&url, Arc::new(object_store));
+
+       let object_store: Arc<dyn ObjectStore> = if path.starts_with("minio://") {
+           let url = Url::parse(path).unwrap();
+           let bucket_name = url.host_str().unwrap_or("parquet-oo");
+           let object_store = AmazonS3Builder::new()
+               .with_bucket_name(bucket_name)
+               .with_endpoint("http://c220g5-110910.wisc.cloudlab.us:9000")
+               .with_allow_http(true)
+               .with_region("us-east-1")
+               .with_access_key_id(env::var("MINIO_ACCESS_KEY_ID").unwrap())
+               .with_secret_access_key(env::var("MINIO_SECRET_ACCESS_KEY").unwrap())
+               .build()?;
+           let object_store = Arc::new(object_store);
+           ctx.register_object_store(&url, object_store.clone());
+           object_store
+       } else {
+           let url = ObjectStoreUrl::local_filesystem();
+           let object_store = ctx.runtime_env().object_store(url).unwrap();
+           Arc::new(object_store)
+       };
+
+       let mut options: ParquetReadOptions<'_> = Default::default();
+       options.reader = Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
 
        ctx.register_parquet("hits", &path, options)
            .await
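Two details worth noting in this change: the profiler guard is only created on the final iteration, so warm-up runs that populate the caches stay out of the flamegraph; and register_hits no longer hard-codes the bucket, instead parsing it from the minio:// URL's authority with "parquet-oo" as the fallback. A small self-contained illustration of that URL parsing (the bucket and key below are made up):

use url::Url;

fn main() {
    // For a path written as `minio://<bucket>/<key>`, the bucket name lands in
    // the URL authority, which host_str() exposes.
    let url = Url::parse("minio://my-bucket/hits.parquet").unwrap();
    assert_eq!(url.host_str(), Some("my-bucket"));
    println!("bucket = {}", url.host_str().unwrap_or("parquet-oo"));
}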

benchmarks/src/util/run.rs (+27 -6)

@@ -16,7 +16,7 @@
 // under the License.
 
 use datafusion::{error::Result, DATAFUSION_VERSION};
-use parquet::arrow::builder::CacheStatistics;
+use parquet::arrow::builder::ArrowCacheStatistics;
 use serde::{Serialize, Serializer};
 use serde_json::Value;
 use std::{

@@ -98,7 +98,8 @@ pub struct BenchmarkRun {
     context: RunContext,
     queries: Vec<BenchQuery>,
     current_case: Option<usize>,
-    cache_stats: Option<CacheStatistics>,
+    arrow_cache_stats: Option<ArrowCacheStatistics>,
+    parquet_cache_size: Option<usize>,
 }
 
 impl Default for BenchmarkRun {

@@ -114,7 +115,8 @@ impl BenchmarkRun {
             context: RunContext::new(),
             queries: vec![],
             current_case: None,
-            cache_stats: None,
+            arrow_cache_stats: None,
+            parquet_cache_size: None,
         }
     }
     /// begin a new case. iterations added after this will be included in the new case

@@ -141,15 +143,34 @@ impl BenchmarkRun {
         }
     }
 
-    pub fn set_cache_stats(&mut self, cache_stats: CacheStatistics) {
-        self.cache_stats = Some(cache_stats);
+    pub fn set_cache_stats(&mut self, cache_stats: ArrowCacheStatistics) {
+        self.arrow_cache_stats = Some(cache_stats);
+    }
+
+    pub fn set_parquet_cache_size(&mut self, size: usize) {
+        self.parquet_cache_size = Some(size);
     }
 
     /// Stringify data into formatted json
     pub fn to_json(&self) -> String {
         let mut output = HashMap::<&str, Value>::new();
         output.insert("context", serde_json::to_value(&self.context).unwrap());
         output.insert("queries", serde_json::to_value(&self.queries).unwrap());
+        output.insert(
+            "parquet_cache_size",
+            serde_json::to_value(&self.parquet_cache_size.unwrap_or(0)).unwrap(),
+        );
+        output.insert(
+            "arrow_cache_size",
+            serde_json::to_value(
+                &self
+                    .arrow_cache_stats
+                    .as_ref()
+                    .map(|x| x.memory_usage())
+                    .unwrap_or(0),
+            )
+            .unwrap(),
+        );
         serde_json::to_string_pretty(&output).unwrap()
     }
 
@@ -164,7 +185,7 @@ impl BenchmarkRun {
         // stats is too large so we write into parquet
         if let Some(path) = maybe_path {
             // same filename but parquet extension
-            let stats = self.cache_stats.take();
+            let stats = self.arrow_cache_stats.take();
             if let Some(stats) = stats {
                 let path = path.as_ref().with_extension("parquet");
                 let mut writer = File::create(path)?;
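For reference, the two new top-level keys surface in the benchmark's JSON output alongside "context" and "queries". A sketch of the resulting shape, built the same way to_json does but with made-up byte counts:

use serde_json::{json, Value};
use std::collections::HashMap;

fn main() {
    // Mirror the map construction in BenchmarkRun::to_json with dummy values.
    let mut output = HashMap::<&str, Value>::new();
    output.insert("parquet_cache_size", json!(123_456_789)); // bytes, hypothetical
    output.insert("arrow_cache_size", json!(42_000_000)); // bytes, hypothetical
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}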

datafusion/core/src/datasource/file_format/options.rs (+9)

@@ -25,6 +25,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::listing::ListingTableUrl;
+use crate::datasource::physical_plan::ParquetFileReaderFactory;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
     listing::ListingOptions,

@@ -241,6 +242,9 @@ pub struct ParquetReadOptions<'a> {
     pub schema: Option<&'a Schema>,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<SortExpr>>,
+
+    /// A custom ParquetFileReaderFactory to use for reading Parquet files.
+    pub reader: Option<Arc<dyn ParquetFileReaderFactory>>,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {

@@ -252,6 +256,7 @@
             skip_metadata: None,
             schema: None,
             file_sort_order: vec![],
+            reader: None,
         }
     }
 }

@@ -558,6 +563,10 @@
     ) -> ListingOptions {
         let mut file_format = ParquetFormat::new().with_options(table_options.parquet);
 
+        if let Some(reader) = &self.reader {
+            file_format = file_format.with_reader(reader.clone());
+        }
+
         if let Some(parquet_pruning) = self.parquet_pruning {
             file_format = file_format.with_enable_pruning(parquet_pruning)
         }
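Combined with the benchmark change above, the new field gives callers a hook to swap out the Parquet I/O path per table. A usage sketch; note it only compiles against this branch, since both the `reader` field and Parquet7FileReaderFactory are introduced by this commit, and the table name and path are placeholders:

use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

async fn register_with_custom_reader(ctx: &SessionContext, path: &str) -> Result<()> {
    // Reuse the object store already registered for the local filesystem.
    let url = ObjectStoreUrl::local_filesystem();
    let object_store = ctx.runtime_env().object_store(url)?;

    // Route all reads for this table through the custom reader factory.
    let mut options: ParquetReadOptions<'_> = Default::default();
    options.reader = Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));

    ctx.register_parquet("hits", path, options).await
}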

datafusion/execution/src/cache/cache_unit.rs (+17)

@@ -184,6 +184,23 @@ impl Cache37 {
     pub fn bytes_cache() -> &'static RwLock<HashMap<(Path, Range<usize>), Arc<Bytes>>> {
         &CACHE.bytes_map
     }
+
+    pub fn memory_usage() -> usize {
+        let cache = &CACHE;
+        let metadata_map = cache.metadata_map.read().unwrap();
+        let mut total_size = 0;
+
+        for i in metadata_map.iter() {
+            total_size += i.1.memory_size();
+        }
+
+        let bytes_map = cache.bytes_map.read().unwrap();
+        for i in bytes_map.iter() {
+            total_size += i.1.len();
+        }
+
+        total_size
+    }
 }
 
 #[cfg(test)]
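The accounting walks both cache maps under their read locks: metadata entries report their memory_size(), and cached byte ranges contribute their lengths. A self-contained sketch of the same pattern with simplified stand-in types (not the real Cache37 internals):

use std::collections::HashMap;
use std::ops::Range;
use std::sync::RwLock;

// Simplified stand-ins for the two cache maps.
struct Cache {
    metadata_map: RwLock<HashMap<String, Vec<u8>>>,
    bytes_map: RwLock<HashMap<(String, Range<usize>), Vec<u8>>>,
}

impl Cache {
    fn memory_usage(&self) -> usize {
        let mut total = 0;
        // Take each read lock in turn and sum the entry sizes.
        for value in self.metadata_map.read().unwrap().values() {
            total += value.len();
        }
        for value in self.bytes_map.read().unwrap().values() {
            total += value.len();
        }
        total
    }
}

fn main() {
    let cache = Cache {
        metadata_map: RwLock::new(HashMap::new()),
        bytes_map: RwLock::new(HashMap::new()),
    };
    cache
        .bytes_map
        .write()
        .unwrap()
        .insert(("hits.parquet".into(), 0..4), vec![0; 4]);
    assert_eq!(cache.memory_usage(), 4);
}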
