Skip to content

Commit 39d9323

Browse files
ExecutionPlan visitor example documentation (apache#10286)
* Parquet exec visitor * Cleanup * Update examples README * Feedback
1 parent 3b77b6b commit 39d9323

File tree

2 files changed

+111
-0
lines changed

2 files changed

+111
-0
lines changed

datafusion-examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ cargo run --example csv_sql
6262
- [`memtable.rs`](examples/memtable.rs): Create and query data in memory using SQL and `RecordBatch`es
6363
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
6464
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
65+
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an `ExecutionPlan` after execution
6566
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
6667
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
6768
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion::datasource::file_format::parquet::ParquetFormat;
21+
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
22+
use datafusion::datasource::physical_plan::ParquetExec;
23+
use datafusion::execution::context::SessionContext;
24+
use datafusion::physical_plan::metrics::MetricValue;
25+
use datafusion::physical_plan::{
26+
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
27+
};
28+
use futures::StreamExt;
29+
30+
/// Example of collecting metrics after execution by visiting the `ExecutionPlan`
31+
#[tokio::main]
32+
async fn main() {
33+
let ctx = SessionContext::new();
34+
35+
let test_data = datafusion::test_util::parquet_test_data();
36+
37+
// Configure listing options
38+
let file_format = ParquetFormat::default().with_enable_pruning(true);
39+
let listing_options = ListingOptions::new(Arc::new(file_format));
40+
41+
// First example were we use an absolute path, which requires no additional setup.
42+
let _ = ctx
43+
.register_listing_table(
44+
"my_table",
45+
&format!("file://{test_data}/alltypes_plain.parquet"),
46+
listing_options.clone(),
47+
None,
48+
None,
49+
)
50+
.await;
51+
52+
let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
53+
let plan = df.create_physical_plan().await.unwrap();
54+
55+
// Create empty visitor
56+
let mut visitor = ParquetExecVisitor {
57+
file_groups: None,
58+
bytes_scanned: None,
59+
};
60+
61+
// Make sure you execute the plan to collect actual execution statistics.
62+
// For example, in this example the `file_scan_config` is known without executing
63+
// but the `bytes_scanned` would be None if we did not execute.
64+
let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
65+
while let Some(batch) = batch_stream.next().await {
66+
println!("Batch rows: {}", batch.unwrap().num_rows());
67+
}
68+
69+
visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();
70+
71+
println!(
72+
"ParquetExecVisitor bytes_scanned: {:?}",
73+
visitor.bytes_scanned
74+
);
75+
println!(
76+
"ParquetExecVisitor file_groups: {:?}",
77+
visitor.file_groups.unwrap()
78+
);
79+
}
80+
81+
/// Define a struct with fields to hold the execution information you want to
/// collect. In this case, I want information on how many bytes were scanned
/// and `file_groups` from the FileScanConfig.
#[derive(Debug)]
struct ParquetExecVisitor {
    // Partitioned file groups read by the ParquetExec; copied from
    // `ParquetExec::base_config()` when the node is visited.
    file_groups: Option<Vec<Vec<PartitionedFile>>>,
    // Sum of the "bytes_scanned" metric across partitions; stays None if the
    // plan was not executed (metrics only exist after execution) or if no
    // ParquetExec node was found.
    bytes_scanned: Option<MetricValue>,
}
89+
90+
impl ExecutionPlanVisitor for ParquetExecVisitor {
91+
type Error = datafusion_common::DataFusionError;
92+
93+
/// This function is called once for every node in the tree.
94+
/// Based on your needs implement either `pre_visit` (visit each node before its children/inputs)
95+
/// or `post_visit` (visit each node after its children/inputs)
96+
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
97+
// If needed match on a specific `ExecutionPlan` node type
98+
let maybe_parquet_exec = plan.as_any().downcast_ref::<ParquetExec>();
99+
if let Some(parquet_exec) = maybe_parquet_exec {
100+
self.file_groups = Some(parquet_exec.base_config().file_groups.clone());
101+
102+
let metrics = match parquet_exec.metrics() {
103+
None => return Ok(true),
104+
Some(metrics) => metrics,
105+
};
106+
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
107+
}
108+
Ok(true)
109+
}
110+
}

0 commit comments

Comments
 (0)