Skip to content

Commit 7fb0594

Browse files
committed
Configure datafusion to prefer existing sort
This ensures queries with filter clauses return data in order. apache/datafusion#10572 (comment)
1 parent 4191bd0 commit 7fb0594

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

nautilus_core/persistence/src/backend/session.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ impl DataBackendSession {
7171
.enable_all()
7272
.build()
7373
.unwrap();
74-
let session_cfg =
75-
SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
74+
let session_cfg = SessionConfig::new()
75+
.set_str("datafusion.optimizer.repartition_file_scans", "false")
76+
.set_str("datafusion.optimizer.prefer_existing_sort", "true");
7677
let session_ctx = SessionContext::new_with_config(session_cfg);
7778
Self {
7879
session_ctx,
@@ -119,7 +120,7 @@ impl DataBackendSession {
119120
file_sort_order: vec![vec![Expr::Sort(Sort {
120121
expr: Box::new(col("ts_init")),
121122
asc: true,
122-
nulls_first: true,
123+
nulls_first: false,
123124
})]],
124125
..Default::default()
125126
};
@@ -129,7 +130,7 @@ impl DataBackendSession {
129130
parquet_options,
130131
))?;
131132

132-
let default_query = format!("SELECT * FROM {}", &table_name);
133+
let default_query = format!("SELECT * FROM {} ORDER BY ts_init", &table_name);
133134
let sql_query = sql_query.unwrap_or(&default_query);
134135
let query = self.runtime.block_on(self.session_ctx.sql(sql_query))?;
135136

nautilus_trader/persistence/catalog/parquet.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,7 @@ def _build_query(
590590
if conditions:
591591
query += f" WHERE {' AND '.join(conditions)}"
592592

593+
query += " ORDER BY ts_init"
593594
return query
594595

595596
@staticmethod

0 commit comments

Comments
 (0)