
Commit 2dbb0a8

feat: upgrade dependencies (#950)
also update row group size
1 parent: 5d7cd68

10 files changed: +787 −422 lines


Cargo.lock

Lines changed: 679 additions & 356 deletions
Some generated files are not rendered by default.

server/Cargo.toml

Lines changed: 22 additions & 22 deletions
@@ -10,18 +10,18 @@ build = "build.rs"
 [dependencies]
 ### apache arrow/datafusion dependencies
 # arrow = "51.0.0"
-arrow-schema = { version = "52.1.0", features = ["serde"] }
-arrow-array = { version = "52.1.0" }
-arrow-json = "52.1.0"
-arrow-ipc = { version = "52.1.0", features = ["zstd"] }
-arrow-select = "52.1.0"
-datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
-object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
-parquet = "52.1.0"
-arrow-flight = { version = "52.1.0", features = [ "tls" ] }
-tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
-tonic-web = "0.11.0"
-tower-http = { version = "0.4.4", features = ["cors"] }
+arrow-schema = { version = "53.0.0", features = ["serde"] }
+arrow-array = { version = "53.0.0" }
+arrow-json = "53.0.0"
+arrow-ipc = { version = "53.0.0", features = ["zstd"] }
+arrow-select = "53.0.0"
+datafusion = "42.0.0"
+object_store = { version = "0.11.0", features = ["cloud", "aws"] }
+parquet = "53.0.0"
+arrow-flight = { version = "53.0.0", features = [ "tls" ] }
+tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
+tonic-web = "0.12.3"
+tower-http = { version = "0.6.1", features = ["cors"] }
 
 ### actix dependencies
 actix-web-httpauth = "0.8"
@@ -53,8 +53,8 @@ clap = { version = "4.1", default-features = false, features = [
     "error-context",
 ] }
 clokwerk = "0.4"
-crossterm = "0.27.0"
-derive_more = "0.99"
+crossterm = "0.28.1"
+derive_more = "0.99.18"
 env_logger = "0.11.3"
 fs_extra = "1.3"
 futures = "0.3"
@@ -68,7 +68,7 @@ log = "0.4"
 num_cpus = "1.15"
 once_cell = "1.17.1"
 prometheus = { version = "0.13", features = ["process"] }
-rand = "0.8"
+rand = "0.8.5"
 regex = "1.7.3"
 relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default-features = false, features = [
@@ -81,8 +81,8 @@ semver = "1.0"
 serde = { version = "1.0", features = ["rc", "derive"] }
 serde_json = "1.0"
 static-files = "0.2"
-sysinfo = "0.30.11"
-thiserror = "1"
+sysinfo = "0.31.4"
+thiserror = "1.0.64"
 thread-priority = "1.0.0"
 tokio = { version = "1.28", default-features = false, features = [
     "sync",
@@ -97,13 +97,13 @@ xz2 = { version = "*", features = ["static"] }
 nom = "7.1.3"
 humantime = "2.1.0"
 human-size = "0.4"
-openid = { version = "0.14.0", default-features = false, features = ["rustls"] }
+openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
 url = "2.4.0"
 http-auth-basic = "0.3.3"
 serde_repr = "0.1.17"
 hashlru = { version = "0.11.0", features = ["serde"] }
 path-clean = "1.0.1"
-prost = "0.12.3"
+prost = "0.13.3"
 prometheus-parse = "0.2.5"
 sha2 = "0.10.8"
 
@@ -113,13 +113,13 @@ sha1_smol = { version = "1.0", features = ["std"] }
 static-files = "0.2"
 ureq = "2.6"
 vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] }
-zip = { version = "1.1.1", default-features = false, features = ["deflate"] }
+zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
 url = "2.4.0"
-prost-build = "0.12.3"
+prost-build = "0.13.3"
 
 [dev-dependencies]
 maplit = "1.0"
-rstest = "0.19.0"
+rstest = "0.23.0"
 
 [package.metadata.parseable_ui]
 assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.6/build.zip"

server/src/catalog/column.rs

Lines changed: 33 additions & 17 deletions
@@ -136,44 +136,60 @@ pub struct Column {
 impl TryFrom<&Statistics> for TypedStatistics {
     type Error = parquet::errors::ParquetError;
     fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
-        if !value.has_min_max_set() {
+        if value.min_bytes_opt().is_none() || value.max_bytes_opt().is_none() {
             return Err(parquet::errors::ParquetError::General(
                 "min max is not set".to_string(),
             ));
         }
 
         let res = match value {
             Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
-                min: *stats.min(),
-                max: *stats.max(),
+                min: *stats.min_opt().expect("Boolean stats min not set"),
+                max: *stats.max_opt().expect("Boolean stats max not set"),
             }),
             Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
-                min: *stats.min() as i64,
-                max: *stats.max() as i64,
+                min: *stats.min_opt().expect("Int32 stats min not set") as i64,
+                max: *stats.max_opt().expect("Int32 stats max not set") as i64,
             }),
             Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
-                min: *stats.min(),
-                max: *stats.max(),
+                min: *stats.min_opt().expect("Int64 stats min not set"),
+                max: *stats.max_opt().expect("Int64 stats max not set"),
             }),
             Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
-                min: stats.min().to_i64(),
-                max: stats.max().to_i64(),
+                min: stats.min_opt().expect("Int96 stats min not set").to_i64(),
+                max: stats.max_opt().expect("Int96 stats max not set").to_i64(),
             }),
             Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
-                min: *stats.min() as f64,
-                max: *stats.max() as f64,
+                min: *stats.min_opt().expect("Float32 stats min not set") as f64,
+                max: *stats.max_opt().expect("Float32 stats max not set") as f64,
            }),
             Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
-                min: *stats.min(),
-                max: *stats.max(),
+                min: *stats.min_opt().expect("Float64 stats min not set"),
+                max: *stats.max_opt().expect("Float64 stats max not set"),
             }),
             Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
-                min: stats.min().as_utf8()?.to_owned(),
-                max: stats.max().as_utf8()?.to_owned(),
+                min: stats
+                    .min_opt()
+                    .expect("Utf8 stats min not set")
+                    .as_utf8()?
+                    .to_owned(),
+                max: stats
+                    .max_opt()
+                    .expect("Utf8 stats max not set")
+                    .as_utf8()?
+                    .to_owned(),
             }),
             Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
-                min: stats.min().as_utf8()?.to_owned(),
-                max: stats.max().as_utf8()?.to_owned(),
+                min: stats
+                    .min_opt()
+                    .expect("Utf8 stats min not set")
+                    .as_utf8()?
+                    .to_owned(),
+                max: stats
+                    .max_opt()
+                    .expect("Utf8 stats max not set")
+                    .as_utf8()?
+                    .to_owned(),
             }),
         };
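
For context: parquet 53 drops `has_min_max_set()` and the panicking `min()`/`max()` accessors in favour of `Option`-returning `min_opt()`/`max_opt()` (plus byte-level `min_bytes_opt()`/`max_bytes_opt()` on the `Statistics` enum), which is what drives the churn above. A minimal sketch of the new shape, not taken from the commit:

use parquet::file::statistics::Statistics;

// Returns (min, max) for an Int64 column, or None when the writer did not
// record statistics - the case the TryFrom impl above rejects with an error.
fn int64_bounds(stats: &Statistics) -> Option<(i64, i64)> {
    match stats {
        Statistics::Int64(s) => Some((*s.min_opt()?, *s.max_opt()?)),
        _ => None,
    }
}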

server/src/cli.rs

Lines changed: 4 additions & 1 deletion
@@ -421,12 +421,15 @@ impl Cli {
             .help("Set a fixed memory limit for query"),
         )
         .arg(
+            // RowGroupSize controls the number of rows present in one row group
+            // More rows = better compression but HIGHER Memory consumption during read/write
+            // 1048576 is the default value for DataFusion
             Arg::new(Self::ROW_GROUP_SIZE)
                 .long(Self::ROW_GROUP_SIZE)
                 .env("P_PARQUET_ROW_GROUP_SIZE")
                 .value_name("NUMBER")
                 .required(false)
-                .default_value("16384")
+                .default_value("1048576")
                 .value_parser(value_parser!(usize))
                 .help("Number of rows in a row group"),
         ).arg(
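
The new default matches the 1048576 rows DataFusion uses, per the comment in the diff. As a sketch of where such a value typically lands when writing parquet with the parquet crate (the helper below is illustrative, not code from this commit):

use parquet::file::properties::WriterProperties;

// More rows per row group generally means better compression but higher
// memory consumption during both read and write.
fn writer_props(row_group_size: usize) -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(row_group_size)
        .build()
}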

server/src/query.rs

Lines changed: 28 additions & 3 deletions
@@ -26,9 +26,9 @@ use datafusion::arrow::record_batch::RecordBatch;
 
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::error::DataFusionError;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::SessionStateBuilder;
 use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
 use datafusion::prelude::*;
 use itertools::Itertools;
@@ -81,12 +81,37 @@ impl Query {
         let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
         let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
 
-        let config = SessionConfig::default()
+        let mut config = SessionConfig::default()
             .with_parquet_pruning(true)
             .with_prefer_existing_sort(true)
             .with_round_robin_repartition(true);
 
-        let state = SessionState::new_with_config_rt(config, runtime);
+        // For more details refer https://datafusion.apache.org/user-guide/configs.html
+
+        // Reduce the number of rows read (if possible)
+        config.options_mut().execution.parquet.enable_page_index = true;
+
+        // Pushdown filters allows DF to push the filters as far down in the plan as possible
+        // and thus, reducing the number of rows decoded
+        config.options_mut().execution.parquet.pushdown_filters = true;
+
+        // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
+        config.options_mut().execution.parquet.reorder_filters = true;
+
+        // Enable StringViewArray
+        // https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
+        config
+            .options_mut()
+            .execution
+            .parquet
+            .schema_force_view_types = true;
+
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config)
+            .with_runtime_env(runtime)
+            .build();
+
         let schema_provider = Arc::new(GlobalSchemaProvider {
             storage: storage.get_object_store(),
         });
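
`SessionState::new_with_config_rt` is gone in DataFusion 42; `SessionStateBuilder` with `with_default_features()` is the replacement used above. A hedged usage sketch (assumed wiring, not from this commit) of how such a state can back a query context:

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;

async fn run(sql: &str) -> datafusion::error::Result<()> {
    // with_default_features registers the default catalog, planners,
    // and optimizer rules that new_with_config_rt used to set up.
    let state = SessionStateBuilder::new()
        .with_default_features()
        .build();
    let ctx = SessionContext::new_with_state(state);
    ctx.sql(sql).await?.show().await?;
    Ok(())
}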

server/src/query/listing_table_builder.rs

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    logical_expr::{col, Expr},
+    logical_expr::{col, SortExpr},
 };
 use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
 use itertools::Itertools;
@@ -188,7 +188,7 @@ impl ListingTableBuilder {
         if self.listing.is_empty() {
             return Ok(None);
         }
-        let file_sort_order: Vec<Vec<Expr>>;
+        let file_sort_order: Vec<Vec<SortExpr>>;
         let file_format = ParquetFormat::default().with_enable_pruning(true);
         if let Some(time_partition) = time_partition {
             file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
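
In DataFusion 42, sort keys are no longer plain expressions: `col(...).sort(asc, nulls_first)` yields a dedicated `SortExpr`, hence the changed type annotation. A one-line sketch (the column name is illustrative):

use datafusion::logical_expr::{col, SortExpr};

// Ascending with nulls last - mirrors the time-partition ordering above.
fn sort_order(column: &str) -> Vec<Vec<SortExpr>> {
    vec![vec![col(column).sort(true, false)]]
}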

server/src/query/stream_schema_provider.rs

Lines changed: 14 additions & 16 deletions
@@ -27,10 +27,11 @@ use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use datafusion::catalog::Session;
 use datafusion::common::stats::Precision;
 use datafusion::logical_expr::utils::conjunction;
 use datafusion::{
-    catalog::schema::SchemaProvider,
+    catalog::SchemaProvider,
     common::{
         tree_node::{TreeNode, TreeNodeRecursion},
         ToDFSchema,
@@ -122,7 +123,7 @@ async fn create_parquet_physical_plan(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
-    state: &SessionState,
+    state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let filters = if let Some(expr) = conjunction(filters.to_vec()) {
@@ -149,7 +150,7 @@
     // create the execution plan
     let plan = file_format
         .create_physical_plan(
-            state,
+            state.as_any().downcast_ref::<SessionState>().unwrap(), // Remove this when ParquetFormat catches up
             FileScanConfig {
                 object_store_url,
                 file_schema: schema.clone(),
@@ -216,8 +217,8 @@ async fn collect_from_snapshot(
 fn partitioned_files(
     manifest_files: Vec<catalog::manifest::File>,
     table_schema: &Schema,
-    target_partition: usize,
 ) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
+    let target_partition = num_cpus::get();
     let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
     let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
     let mut count = 0;
@@ -288,7 +289,7 @@ impl TableProvider for StandardTableProvider {
 
     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
@@ -435,7 +436,7 @@
         );
     }
 
-    let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
+    let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
     let remote_exec = create_parquet_physical_plan(
         ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
         partitioned_files,
@@ -496,7 +497,7 @@ async fn get_cache_exectuion_plan(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
-    state: &SessionState,
+    state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
     let (cached, remainder) = cache_manager
@@ -519,7 +520,7 @@
         })
         .collect();
 
-    let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1);
+    let (partitioned_files, statistics) = partitioned_files(cached, &schema);
     let plan = create_parquet_physical_plan(
         ObjectStoreUrl::parse("file:///").unwrap(),
         partitioned_files,
@@ -545,7 +546,7 @@ async fn get_hottier_exectuion_plan(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
-    state: &SessionState,
+    state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
     let (hot_tier_files, remainder) = hot_tier_manager
@@ -570,7 +571,7 @@
         })
         .collect();
 
-    let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1);
+    let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema);
     let plan = create_parquet_physical_plan(
         ObjectStoreUrl::parse("file:///").unwrap(),
         partitioned_files,
@@ -594,7 +595,7 @@ async fn legacy_listing_table(
     object_store: Arc<dyn ObjectStore>,
     time_filters: &[PartialTimeFilter],
     schema: Arc<Schema>,
-    state: &SessionState,
+    state: &dyn Session,
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
@@ -868,10 +869,7 @@ fn extract_timestamp_bound(
     binexpr: BinaryExpr,
     time_partition: Option<String>,
 ) -> Option<(Operator, NaiveDateTime)> {
-    Some((
-        binexpr.op.clone(),
-        extract_from_lit(binexpr, time_partition)?,
-    ))
+    Some((binexpr.op, extract_from_lit(binexpr, time_partition)?))
 }
 
 async fn collect_manifest_files(
@@ -942,7 +940,7 @@ trait ManifestExt: ManifestFile {
             return None;
         };
         /* `BinaryExp` doesn't implement `Copy` */
-        Some((expr.op.clone(), value))
+        Some((expr.op, value))
     }
 
     let Some(col) = self.find_matching_column(partial_filter) else {
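
The thread running through these hunks: DataFusion 42 narrows `TableProvider::scan` and related helpers to the `&dyn Session` trait, while `ParquetFormat::create_physical_plan` still takes a concrete `SessionState`, hence the temporary downcast above. The pattern in isolation (the helper function is an assumption, not in the diff):

use datafusion::catalog::Session;
use datafusion::execution::context::SessionState;

// The &dyn Session handed to scan() concretely wraps a SessionState today,
// so the downcast succeeds; it can go once upstream accepts &dyn Session.
fn as_session_state(state: &dyn Session) -> Option<&SessionState> {
    state.as_any().downcast_ref::<SessionState>()
}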

server/src/utils/arrow/flight.rs

Lines changed: 2 additions & 2 deletions
@@ -39,8 +39,8 @@ use futures::{stream, TryStreamExt};
 use tonic::{Request, Response, Status};
 
 use arrow_flight::FlightClient;
-use http::Uri;
-use tonic::transport::Channel;
+// use http::Uri;
+use tonic::transport::{Channel, Uri};
 
 pub type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;
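
tonic 0.12 re-exports `Uri` under `tonic::transport`, so the separate `http` import is no longer needed to build a channel. A small connection sketch (the endpoint address is illustrative):

use tonic::transport::{Channel, Uri};

// Build a channel to a Flight endpoint; the address is hypothetical.
async fn connect() -> Result<Channel, tonic::transport::Error> {
    let uri: Uri = "http://localhost:8001".parse().expect("valid uri");
    Channel::builder(uri).connect().await
}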
