Commit 5e3577f

Author: Devdutt Shenoi
feat: query parquet files still in staging (#1199)
Signed-off-by: Devdutt Shenoi <[email protected]>
1 parent 3e02f29 commit 5e3577f


src/query/stream_schema_provider.rs

Lines changed: 88 additions & 41 deletions
```diff
@@ -16,25 +16,16 @@
  *
  */
 
-use crate::catalog::manifest::File;
-use crate::hottier::HotTierManager;
-use crate::option::Mode;
-use crate::parseable::STREAM_EXISTS;
-use crate::{
-    catalog::snapshot::{self, Snapshot},
-    storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
-};
+use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
+
 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc};
-use datafusion::catalog::Session;
-use datafusion::common::stats::Precision;
-use datafusion::logical_expr::utils::conjunction;
-use datafusion::physical_expr::LexOrdering;
 use datafusion::{
-    catalog::SchemaProvider,
+    catalog::{SchemaProvider, Session},
     common::{
+        stats::Precision,
         tree_node::{TreeNode, TreeNodeRecursion},
         ToDFSchema,
     },
```
```diff
@@ -46,32 +37,36 @@ use datafusion::{
     },
     error::{DataFusionError, Result as DataFusionResult},
     execution::{context::SessionState, object_store::ObjectStoreUrl},
-    logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType},
-    physical_expr::{create_physical_expr, PhysicalSortExpr},
-    physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
+    logical_expr::{
+        utils::conjunction, BinaryExpr, Operator, TableProviderFilterPushDown, TableType,
+    },
+    physical_expr::{create_physical_expr, expressions::col, LexOrdering, PhysicalSortExpr},
+    physical_plan::{empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
     prelude::Expr,
     scalar::ScalarValue,
 };
-
 use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ObjectStore};
 use relative_path::RelativePathBuf;
-use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 use url::Url;
 
 use crate::{
     catalog::{
-        self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile,
+        column::{Column, TypedStatistics},
+        manifest::{File, Manifest},
+        snapshot::{ManifestItem, Snapshot},
+        ManifestFile, Snapshot as CatalogSnapshot,
     },
     event::DEFAULT_TIMESTAMP_KEY,
+    hottier::HotTierManager,
     metrics::QUERY_CACHE_HIT,
-    parseable::PARSEABLE,
-    storage::ObjectStorage,
+    option::Mode,
+    parseable::{PARSEABLE, STREAM_EXISTS},
+    storage::{ObjectStorage, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
 };
 
 use super::listing_table_builder::ListingTableBuilder;
-use crate::catalog::Snapshot as CatalogSnapshot;
 
 // schema provider for stream based on global data
 #[derive(Debug)]
```
```diff
@@ -142,9 +137,9 @@ impl StandardTableProvider {
 
         let sort_expr = PhysicalSortExpr {
             expr: if let Some(time_partition) = time_partition {
-                physical_plan::expressions::col(&time_partition, &self.schema)?
+                col(&time_partition, &self.schema)?
             } else {
-                physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)?
+                col(DEFAULT_TIMESTAMP_KEY, &self.schema)?
             },
             options: SortOptions {
                 descending: true,
```
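This hunk only shortens the path to `col` after the import reshuffle, but the surrounding logic is the interesting part: the provider sorts descending by the stream's time-partition column when one is configured, falling back to the default timestamp column. A minimal sketch of that construction, under the assumption that `"p_timestamp"` stands in for `DEFAULT_TIMESTAMP_KEY` (the diff does not show its value):

```rust
use arrow_schema::{Schema, SortOptions};
use datafusion::{
    error::Result,
    physical_expr::{expressions::col, PhysicalSortExpr},
};

/// Build the descending sort key used for staging scans: the configured
/// time-partition column if present, else the default timestamp column.
/// "p_timestamp" is an assumption standing in for DEFAULT_TIMESTAMP_KEY.
fn sort_key(schema: &Schema, time_partition: Option<&String>) -> Result<PhysicalSortExpr> {
    let expr = match time_partition {
        Some(column) => col(column, schema)?,
        None => col("p_timestamp", schema)?,
    };
    Ok(PhysicalSortExpr {
        expr,
        options: SortOptions {
            descending: true, // newest events first
            nulls_first: true, // nulls placement is a guess; the hunk cuts off here
        },
    })
}
```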
```diff
@@ -223,6 +218,59 @@ impl StandardTableProvider {
         Ok(())
     }
 
+    /// Create an execution plan over the records in arrows and parquet that are still in staging, awaiting push to object storage
+    async fn get_staging_execution_plan(
+        &self,
+        execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+        state: &dyn Session,
+        time_partition: Option<&String>,
+    ) -> Result<(), DataFusionError> {
+        let Ok(staging) = PARSEABLE.get_stream(&self.stream) else {
+            return Ok(());
+        };
+
+        // Staging arrow execution plan
+        let records = staging.recordbatches_cloned(&self.schema);
+        let arrow_exec = reversed_mem_table(records, self.schema.clone())?
+            .scan(state, projection, filters, limit)
+            .await?;
+        execution_plans.push(arrow_exec);
+
+        // Get a list of parquet files still in staging, ordered by filename
+        let mut parquet_files = staging.parquet_files();
+        parquet_files.sort_by(|a, b| a.cmp(b).reverse());
+
+        // NOTE: We don't partition among CPUs to ensure consistent results,
+        // i.e. we were seeing inconsistent ordering when querying over parquets in staging.
+        let mut partitioned_files = Vec::with_capacity(parquet_files.len());
+        for file_path in parquet_files {
+            let Ok(file_meta) = file_path.metadata() else {
+                continue;
+            };
+            let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
+            partitioned_files.push(file)
+        }
+
+        // NOTE: There is the possibility of a parquet file being pushed to object store
+        // and deleted from staging in the time it takes for datafusion to get to it.
+        // Staging parquet execution plan
+        self.create_parquet_physical_plan(
+            execution_plans,
+            ObjectStoreUrl::parse("file:///").unwrap(),
+            vec![partitioned_files],
+            Statistics::new_unknown(&self.schema),
+            projection,
+            filters,
+            limit,
+            state,
+            time_partition.cloned(),
+        )
+        .await
+    }
+
     #[allow(clippy::too_many_arguments)]
     async fn legacy_listing_table(
         &self,
```
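The heart of the new method is its ordering discipline: staging parquet files sort newest-first by filename, and they are deliberately kept in a single DataFusion partition so the result order stays deterministic. A minimal sketch of just that step, assuming only `std` and DataFusion's `PartitionedFile` (the `staging_partition` helper and its `paths` argument are hypothetical stand-ins for `staging.parquet_files()`):

```rust
use std::path::PathBuf;

use datafusion::datasource::listing::PartitionedFile;

/// Order staging parquet files newest-first by filename and wrap them in a
/// single partition, mirroring the single-partition layout chosen above.
fn staging_partition(mut paths: Vec<PathBuf>) -> Vec<Vec<PartitionedFile>> {
    // Reverse lexicographic sort; staging filenames are assumed to embed
    // timestamps, so this yields newest-first.
    paths.sort_by(|a, b| a.cmp(b).reverse());

    let mut files = Vec::with_capacity(paths.len());
    for path in paths {
        // A file can be shipped to object storage and deleted from staging
        // before we open it; skipping on a metadata error covers that race.
        let Ok(meta) = path.metadata() else { continue };
        files.push(PartitionedFile::new(path.display().to_string(), meta.len()));
    }

    // One partition rather than one per CPU keeps ordering consistent.
    vec![files]
}
```

Skipping files whose metadata can no longer be read is what the second NOTE is about: the window between upload and local deletion is unavoidable, so the scan degrades gracefully instead of erroring.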
```diff
@@ -277,20 +325,19 @@ impl StandardTableProvider {
 
     fn partitioned_files(
         &self,
-        manifest_files: Vec<catalog::manifest::File>,
+        manifest_files: Vec<File>,
     ) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
         let target_partition = num_cpus::get();
         let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
-        let mut column_statistics =
-            HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
+        let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
         let mut count = 0;
         for (index, file) in manifest_files
             .into_iter()
             .enumerate()
             .map(|(x, y)| (x % target_partition, y))
         {
             #[allow(unused_mut)]
-            let catalog::manifest::File {
+            let File {
                 mut file_path,
                 num_rows,
                 columns,
```
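For contrast with the single staging partition above, `partitioned_files` spreads manifest files across one bucket per CPU using `x % target_partition`. A self-contained sketch of that round-robin distribution (the `round_robin` helper is illustrative, not part of the codebase):

```rust
/// Round-robin items across `n` buckets, as `partitioned_files` does with
/// one bucket per CPU: bucket i receives items i, i+n, i+2n, ...
fn round_robin<T>(items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let mut buckets: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (index, item) in items.into_iter().enumerate() {
        buckets[index % n].push(item);
    }
    buckets
}

fn main() {
    // Seven files over three "CPUs": [[0, 3, 6], [1, 4], [2, 5]]
    let buckets = round_robin((0..7).collect::<Vec<i32>>(), 3);
    println!("{buckets:?}");
}
```

This is exactly the parallelism the staging path opts out of, trading CPU fan-out for a stable result order.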
```diff
@@ -357,12 +404,12 @@ impl StandardTableProvider {
     }
 
     async fn collect_from_snapshot(
-        snapshot: &catalog::snapshot::Snapshot,
+        snapshot: &Snapshot,
         time_filters: &[PartialTimeFilter],
         object_store: Arc<dyn ObjectStore>,
         filters: &[Expr],
         limit: Option<usize>,
-    ) -> Result<Vec<catalog::manifest::File>, DataFusionError> {
+    ) -> Result<Vec<File>, DataFusionError> {
         let items = snapshot.manifests(time_filters);
         let manifest_files = collect_manifest_files(
             object_store,
```
```diff
@@ -443,17 +490,17 @@ impl TableProvider for StandardTableProvider {
         }
 
         if is_within_staging_window(&time_filters) {
-            if let Ok(staging) = PARSEABLE.get_stream(&self.stream) {
-                let records = staging.recordbatches_cloned(&self.schema);
-                let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
-
-                let memory_exec = reversed_mem_table
-                    .scan(state, projection, filters, limit)
-                    .await?;
-                execution_plans.push(memory_exec);
-            }
+            self.get_staging_execution_plan(
+                &mut execution_plans,
+                projection,
+                filters,
+                limit,
+                state,
+                time_partition.as_ref(),
+            )
+            .await?;
         };
-        let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
+        let mut merged_snapshot = Snapshot::default();
         if PARSEABLE.options.mode == Mode::Query {
             let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
             let obs = glob_storage
```
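The diff never shows how `execution_plans` is finally consumed, but the `EmptyExec`/`UnionExec` imports suggest the collected plans (staging arrow, staging parquet, object-store parquet) are unioned downstream. A speculative sketch of that pattern (the `finalize` helper is hypothetical, and `EmptyExec::new` taking only a schema assumes a recent DataFusion):

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion::physical_plan::{empty::EmptyExec, union::UnionExec, ExecutionPlan};

/// Collapse the per-source plans into one: an empty scan if nothing matched,
/// the plan itself if there is exactly one, otherwise a union of all.
fn finalize(plans: Vec<Arc<dyn ExecutionPlan>>, schema: Arc<Schema>) -> Arc<dyn ExecutionPlan> {
    match plans.len() {
        0 => Arc::new(EmptyExec::new(schema)),
        1 => plans.into_iter().next().expect("len == 1"),
        _ => Arc::new(UnionExec::new(plans)),
    }
}
```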
```diff
@@ -848,7 +895,7 @@ pub fn extract_primary_filter(
 }
 
 trait ManifestExt: ManifestFile {
-    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&catalog::column::Column> {
+    fn find_matching_column(&self, partial_filter: &Expr) -> Option<&Column> {
         let name = match partial_filter {
             Expr::BinaryExpr(binary_expr) => {
                 let Expr::Column(col) = binary_expr.left.as_ref() else {
```
