Skip to content

Commit fba5cc0

Browse files
Streaming CLI support (#8651)
* Streaming CLI support
* Update Cargo.toml
* Remove duplications
* Clean up
* Stream test will be added
* Update print_format.rs
* Address feedback
* Final fix

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent 1737d49 commit fba5cc0

File tree

8 files changed

+295
-161
lines changed

8 files changed

+295
-161
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] }
3636
arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] }
3737
arrow-buffer = { version = "49.0.0", default-features = false }
3838
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
39-
arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
39+
arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] }
4040
arrow-ord = { version = "49.0.0", default-features = false }
4141
arrow-schema = { version = "49.0.0", default-features = false }
4242
async-trait = "0.1.73"

datafusion-cli/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr
3838
datafusion-common = { path = "../datafusion/common" }
3939
dirs = "4.0.0"
4040
env_logger = "0.9"
41+
futures = "0.3"
4142
mimalloc = { version = "0.1", default-features = false }
4243
object_store = { version = "0.8.0", features = ["aws", "gcp"] }
4344
parking_lot = { version = "0.12" }

datafusion-cli/src/exec.rs

+34-32
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
//! Execution functions
1919
20+
use std::io::prelude::*;
21+
use std::io::BufReader;
22+
use std::time::Instant;
23+
use std::{fs::File, sync::Arc};
24+
25+
use crate::print_format::PrintFormat;
2026
use crate::{
2127
command::{Command, OutputFormat},
2228
helper::{unescape_input, CliHelper},
@@ -26,21 +32,19 @@ use crate::{
2632
},
2733
print_options::{MaxRows, PrintOptions},
2834
};
29-
use datafusion::common::plan_datafusion_err;
35+
36+
use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
37+
use datafusion::datasource::listing::ListingTableUrl;
38+
use datafusion::datasource::physical_plan::is_plan_streaming;
39+
use datafusion::error::{DataFusionError, Result};
40+
use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
41+
use datafusion::physical_plan::{collect, execute_stream};
42+
use datafusion::prelude::SessionContext;
3043
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
31-
use datafusion::{
32-
datasource::listing::ListingTableUrl,
33-
error::{DataFusionError, Result},
34-
logical_expr::{CreateExternalTable, DdlStatement},
35-
};
36-
use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext};
44+
3745
use object_store::ObjectStore;
3846
use rustyline::error::ReadlineError;
3947
use rustyline::Editor;
40-
use std::io::prelude::*;
41-
use std::io::BufReader;
42-
use std::time::Instant;
43-
use std::{fs::File, sync::Arc};
4448
use url::Url;
4549

4650
/// run and execute SQL statements and commands, against a context with the given print options
@@ -125,8 +129,6 @@ pub async fn exec_from_repl(
125129
)));
126130
rl.load_history(".history").ok();
127131

128-
let mut print_options = print_options.clone();
129-
130132
loop {
131133
match rl.readline("❯ ") {
132134
Ok(line) if line.starts_with('\\') => {
@@ -138,9 +140,7 @@ pub async fn exec_from_repl(
138140
Command::OutputFormat(subcommand) => {
139141
if let Some(subcommand) = subcommand {
140142
if let Ok(command) = subcommand.parse::<OutputFormat>() {
141-
if let Err(e) =
142-
command.execute(&mut print_options).await
143-
{
143+
if let Err(e) = command.execute(print_options).await {
144144
eprintln!("{e}")
145145
}
146146
} else {
@@ -154,7 +154,7 @@ pub async fn exec_from_repl(
154154
}
155155
}
156156
_ => {
157-
if let Err(e) = cmd.execute(ctx, &mut print_options).await {
157+
if let Err(e) = cmd.execute(ctx, print_options).await {
158158
eprintln!("{e}")
159159
}
160160
}
@@ -165,7 +165,7 @@ pub async fn exec_from_repl(
165165
}
166166
Ok(line) => {
167167
rl.add_history_entry(line.trim_end())?;
168-
match exec_and_print(ctx, &print_options, line).await {
168+
match exec_and_print(ctx, print_options, line).await {
169169
Ok(_) => {}
170170
Err(err) => eprintln!("{err}"),
171171
}
@@ -198,7 +198,6 @@ async fn exec_and_print(
198198
sql: String,
199199
) -> Result<()> {
200200
let now = Instant::now();
201-
202201
let sql = unescape_input(&sql)?;
203202
let task_ctx = ctx.task_ctx();
204203
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
@@ -227,18 +226,24 @@ async fn exec_and_print(
227226
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
228227
create_external_table(ctx, cmd).await?;
229228
}
229+
230230
let df = ctx.execute_logical_plan(plan).await?;
231-
let results = df.collect().await?;
231+
let physical_plan = df.create_physical_plan().await?;
232232

233-
let print_options = if should_ignore_maxrows {
234-
PrintOptions {
235-
maxrows: MaxRows::Unlimited,
236-
..print_options.clone()
237-
}
233+
if is_plan_streaming(&physical_plan)? {
234+
let stream = execute_stream(physical_plan, task_ctx.clone())?;
235+
print_options.print_stream(stream, now).await?;
238236
} else {
239-
print_options.clone()
240-
};
241-
print_options.print_batches(&results, now)?;
237+
let mut print_options = print_options.clone();
238+
if should_ignore_maxrows {
239+
print_options.maxrows = MaxRows::Unlimited;
240+
}
241+
if print_options.format == PrintFormat::Automatic {
242+
print_options.format = PrintFormat::Table;
243+
}
244+
let results = collect(physical_plan, task_ctx.clone()).await?;
245+
print_options.print_batches(&results, now)?;
246+
}
242247
}
243248

244249
Ok(())
@@ -272,10 +277,7 @@ async fn create_external_table(
272277
.object_store_registry
273278
.get_store(url)
274279
.map_err(|_| {
275-
DataFusionError::Execution(format!(
276-
"Unsupported object store scheme: {}",
277-
scheme
278-
))
280+
exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
279281
})?
280282
}
281283
};

datafusion-cli/src/main.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use clap::Parser;
18+
use std::collections::HashMap;
19+
use std::env;
20+
use std::path::Path;
21+
use std::str::FromStr;
22+
use std::sync::{Arc, OnceLock};
23+
1924
use datafusion::error::{DataFusionError, Result};
2025
use datafusion::execution::context::SessionConfig;
2126
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
@@ -29,12 +34,9 @@ use datafusion_cli::{
2934
print_options::{MaxRows, PrintOptions},
3035
DATAFUSION_CLI_VERSION,
3136
};
37+
38+
use clap::Parser;
3239
use mimalloc::MiMalloc;
33-
use std::collections::HashMap;
34-
use std::env;
35-
use std::path::Path;
36-
use std::str::FromStr;
37-
use std::sync::{Arc, OnceLock};
3840

3941
#[global_allocator]
4042
static GLOBAL: MiMalloc = MiMalloc;
@@ -111,7 +113,7 @@ struct Args {
111113
)]
112114
rc: Option<Vec<String>>,
113115

114-
#[clap(long, arg_enum, default_value_t = PrintFormat::Table)]
116+
#[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)]
115117
format: PrintFormat,
116118

117119
#[clap(
@@ -331,9 +333,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
331333

332334
#[cfg(test)]
333335
mod tests {
334-
use datafusion::assert_batches_eq;
335-
336336
use super::*;
337+
use datafusion::assert_batches_eq;
337338

338339
fn assert_conversion(input: &str, expected: Result<usize, String>) {
339340
let result = extract_memory_pool_size(input);

0 commit comments

Comments
 (0)