
Commit 2834d58

Rename DataSource and FileSource fields for consistency

1 parent ea0686b commit 2834d58

19 files changed, +133 -110 lines changed
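For callers outside this repository the change is a mechanical rename: `DataSourceExec::source()` becomes `data_source()`, `DataSourceExec::with_source()` becomes `with_data_source()`, and `FileSource` parameters (e.g. in `partitioned_csv_config`) are now named `file_source`. Below is a minimal sketch of the new accessor pattern, modeled on the visitor in `parquet_exec_visitor.rs`; the import paths and the helper name are assumptions and may differ between DataFusion releases.

// Sketch only: import paths below are assumptions and may vary by DataFusion version.
use std::sync::Arc;

use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: returns the metrics of `plan` if it is a Parquet-backed `DataSourceExec`.
fn parquet_scan_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
    // Renamed in this commit: `data_source()` replaces `DataSourceExec::source()`.
    let data_source_exec = plan.as_any().downcast_ref::<DataSourceExec>()?;
    let data_source = data_source_exec.data_source();
    let file_config = data_source.as_any().downcast_ref::<FileScanConfig>()?;
    // The `FileSource` accessor on `FileScanConfig` keeps its `file_source()` name.
    if file_config
        .file_source()
        .as_any()
        .downcast_ref::<ParquetSource>()
        .is_some()
    {
        data_source_exec.metrics()
    } else {
        None
    }
}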

datafusion-examples/examples/parquet_exec_visitor.rs

+6-4
@@ -97,9 +97,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
     /// or `post_visit` (visit each node after its children/inputs)
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
         // If needed match on a specific `ExecutionPlan` node type
-        if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
-            let source = data_source.source();
-            if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
+        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(file_config) =
+                data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
                 if file_config
                     .file_source()
                     .as_any()
@@ -108,7 +110,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
                 {
                     self.file_groups = Some(file_config.file_groups.clone());

-                    let metrics = match data_source.metrics() {
+                    let metrics = match data_source_exec.metrics() {
                         None => return Ok(true),
                         Some(metrics) => metrics,
                     };

datafusion/core/src/datasource/physical_plan/arrow_file.rs

+4-5
@@ -84,17 +84,16 @@ impl ArrowExec {
     }

     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
             .clone()
     }

     fn json_source(&self) -> JsonSource {
-        let source = self.file_scan_config();
-        source
+        self.file_scan_config()
             .file_source()
             .as_any()
             .downcast_ref::<JsonSource>()
@@ -130,7 +129,7 @@ impl ArrowExec {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
-        self.inner = self.inner.with_source(Arc::new(file_source));
+        self.inner = self.inner.with_data_source(Arc::new(file_source));
         self
     }
 }

datafusion/core/src/datasource/physical_plan/csv.rs

+3-3
@@ -254,8 +254,8 @@ impl CsvExec {
     }

     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
@@ -316,7 +316,7 @@ impl CsvExec {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
-        self.inner = self.inner.with_source(Arc::new(file_source));
+        self.inner = self.inner.with_data_source(Arc::new(file_source));
         self
     }
 }

datafusion/core/src/datasource/physical_plan/json.rs

+3-3
@@ -103,8 +103,8 @@ impl NdJsonExec {
     }

     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
@@ -148,7 +148,7 @@ impl NdJsonExec {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
-        self.inner = self.inner.with_source(Arc::new(file_source));
+        self.inner = self.inner.with_data_source(Arc::new(file_source));
         self
     }
 }

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

+9-10
@@ -292,17 +292,16 @@ impl ParquetExec {
         }
     }
     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
             .clone()
     }

     fn parquet_source(&self) -> ParquetSource {
-        let source = self.file_scan_config();
-        source
+        self.file_scan_config()
             .file_source()
             .as_any()
             .downcast_ref::<ParquetSource>()
@@ -343,7 +342,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
         self
     }
@@ -366,7 +365,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.schema_adapter_factory = Some(schema_adapter_factory);
         self
     }
@@ -380,7 +379,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.table_parquet_options.global.pushdown_filters = pushdown_filters;
         self
     }
@@ -404,7 +403,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.table_parquet_options.global.reorder_filters = reorder_filters;
         self
     }
@@ -463,7 +462,7 @@ impl ParquetExec {
     ) -> Self {
         let mut config = self.file_scan_config();
         config.file_groups = file_groups;
-        self.inner = self.inner.with_source(Arc::new(config));
+        self.inner = self.inner.with_data_source(Arc::new(config));
         self
     }
 }
@@ -1469,7 +1468,7 @@ mod tests {
         ])
         .build();
         let partition_count = parquet_exec
-            .source()
+            .data_source()
             .output_partitioning()
             .partition_count();
         assert_eq!(partition_count, 1);

datafusion/core/src/datasource/physical_plan/parquet/source.rs

+2-2
@@ -164,8 +164,8 @@ use object_store::ObjectStore;
 /// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
 /// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
 /// let exec = parquet_exec();
-/// let source = exec.source();
-/// let base_config = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+/// let data_source = exec.data_source();
+/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
 /// let existing_file_groups = &base_config.file_groups;
 /// let new_execs = existing_file_groups
 ///     .iter()

datafusion/core/src/test/mod.rs

+2-2
@@ -190,9 +190,9 @@ pub fn partitioned_file_groups(
 pub fn partitioned_csv_config(
     schema: SchemaRef,
     file_groups: Vec<Vec<PartitionedFile>>,
-    source: Arc<dyn FileSource>,
+    file_source: Arc<dyn FileSource>,
 ) -> FileScanConfig {
-    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
+    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
         .with_file_groups(file_groups)
 }

datafusion/core/src/test_util/parquet.rs

+5-4
@@ -200,17 +200,18 @@ impl TestParquetFile {
     /// Recursively searches for DataSourceExec and returns the metrics
     /// on the first one it finds
     pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
-        if let Some(maybe_file) = plan.as_any().downcast_ref::<DataSourceExec>() {
-            let source = maybe_file.source();
-            if let Some(maybe_parquet) = source.as_any().downcast_ref::<FileScanConfig>()
+        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(maybe_parquet) =
+                data_source.as_any().downcast_ref::<FileScanConfig>()
             {
                 if maybe_parquet
                     .file_source()
                     .as_any()
                     .downcast_ref::<ParquetSource>()
                     .is_some()
                 {
-                    return maybe_file.metrics();
+                    return data_source_exec.metrics();
                 }
             }
         }

datafusion/core/tests/parquet/file_statistics.rs

+18-9
@@ -150,9 +150,12 @@ async fn list_files_with_session_level_cache() {
     //Session 1 first time list files
     assert_eq!(get_list_file_cache_size(&state1), 0);
     let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
-    let data_source = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let parquet1 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let parquet1 = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();

     assert_eq!(get_list_file_cache_size(&state1), 1);
     let fg = &parquet1.file_groups;
@@ -163,9 +166,12 @@ async fn list_files_with_session_level_cache() {
     //check session 1 cache result not show in session 2
     assert_eq!(get_list_file_cache_size(&state2), 0);
     let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
-    let data_source = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let parquet2 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let parquet2 = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();

     assert_eq!(get_list_file_cache_size(&state2), 1);
     let fg2 = &parquet2.file_groups;
@@ -176,9 +182,12 @@ async fn list_files_with_session_level_cache() {
     //check session 1 cache result not show in session 2
     assert_eq!(get_list_file_cache_size(&state1), 1);
     let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
-    let data_source = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let parquet3 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let parquet3 = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();

     assert_eq!(get_list_file_cache_size(&state1), 1);
     let fg = &parquet3.file_groups;

datafusion/core/tests/parquet/utils.rs

+6-4
@@ -47,16 +47,18 @@ impl MetricsFinder {
 impl ExecutionPlanVisitor for MetricsFinder {
     type Error = std::convert::Infallible;
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
-        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
-            let source = exec.source();
-            if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
+        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(file_config) =
+                data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
                 if file_config
                     .file_source()
                     .as_any()
                     .downcast_ref::<ParquetSource>()
                     .is_some()
                 {
-                    self.metrics = exec.metrics();
+                    self.metrics = data_source_exec.metrics();
                 }
             }
         }

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

+1-1
@@ -460,7 +460,7 @@ fn test_memory_after_projection() -> Result<()> {
         .as_any()
         .downcast_ref::<DataSourceExec>()
         .unwrap()
-        .source()
+        .data_source()
         .as_any()
         .downcast_ref::<MemorySourceConfig>()
         .unwrap()

datafusion/core/tests/sql/path_partition.rs

+6-3
@@ -85,9 +85,12 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
         Expr::gt(col("id"), lit(1)),
     ];
     let exec = table.scan(&ctx.state(), None, &filters, None).await?;
-    let data_source = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let file_source = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let file_source = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();
     let parquet_config = file_source
         .file_source()
         .as_any()
