Skip to content

Commit f40e0db

Browse files
blaginin and alamb
authored
Add FormatOptions to Config (#15793)
* Add `FormatOptions` to Config
* Fix `output_with_header`
* Add cli test
* Add `to_string`
* Prettify
* Prettify
* Preserve the initial `NULL` logic
* Cleanup
* Remove `lt` as no longer needed
* Format assert
* Fix sqllogictest
* Fix tests
* Set formatting params for dates / times
* Lowercase `duration_format`

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent b90e2a0 commit f40e0db

File tree

15 files changed

+250
-63
lines changed

15 files changed

+250
-63
lines changed

datafusion-cli/src/command.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,15 @@ impl Command {
6464
let command_batch = all_commands_info();
6565
let schema = command_batch.schema();
6666
let num_rows = command_batch.num_rows();
67-
print_options.print_batches(schema, &[command_batch], now, num_rows)
67+
let task_ctx = ctx.task_ctx();
68+
let config = &task_ctx.session_config().options().format;
69+
print_options.print_batches(
70+
schema,
71+
&[command_batch],
72+
now,
73+
num_rows,
74+
config,
75+
)
6876
}
6977
Self::ListTables => {
7078
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await

datafusion-cli/src/exec.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ pub(super) async fn exec_and_print(
216216
) -> Result<()> {
217217
let now = Instant::now();
218218
let task_ctx = ctx.task_ctx();
219-
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
219+
let options = task_ctx.session_config().options();
220+
let dialect = &options.sql_parser.dialect;
220221
let dialect = dialect_from_str(dialect).ok_or_else(|| {
221222
plan_datafusion_err!(
222223
"Unsupported SQL dialect: {dialect}. Available dialects: \
@@ -250,7 +251,9 @@ pub(super) async fn exec_and_print(
250251
// As the input stream comes, we can generate results.
251252
// However, memory safety is not guaranteed.
252253
let stream = execute_stream(physical_plan, task_ctx.clone())?;
253-
print_options.print_stream(stream, now).await?;
254+
print_options
255+
.print_stream(stream, now, &options.format)
256+
.await?;
254257
} else {
255258
// Bounded stream; collected results size is limited by the maxrows option
256259
let schema = physical_plan.schema();
@@ -273,9 +276,13 @@ pub(super) async fn exec_and_print(
273276
}
274277
row_count += curr_num_rows;
275278
}
276-
adjusted
277-
.into_inner()
278-
.print_batches(schema, &results, now, row_count)?;
279+
adjusted.into_inner().print_batches(
280+
schema,
281+
&results,
282+
now,
283+
row_count,
284+
&options.format,
285+
)?;
279286
reservation.free();
280287
}
281288
}

datafusion-cli/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> {
265265
config_options.explain.format = String::from("tree");
266266
}
267267

268+
// In the CLI, we want to show NULL values rather than empty strings
269+
if env::var_os("DATAFUSION_FORMAT_NULL").is_none() {
270+
config_options.format.null = String::from("NULL");
271+
}
272+
268273
let session_config =
269274
SessionConfig::from(config_options).with_information_schema(true);
270275
Ok(session_config)

datafusion-cli/src/print_format.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
2626
use arrow::json::{ArrayWriter, LineDelimitedWriter};
2727
use arrow::record_batch::RecordBatch;
2828
use arrow::util::pretty::pretty_format_batches_with_options;
29-
use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
29+
use datafusion::config::FormatOptions;
3030
use datafusion::error::Result;
3131

3232
/// Allow records to be printed in different formats
@@ -110,7 +110,10 @@ fn format_batches_with_maxrows<W: std::io::Write>(
110110
writer: &mut W,
111111
batches: &[RecordBatch],
112112
maxrows: MaxRows,
113+
format_options: &FormatOptions,
113114
) -> Result<()> {
115+
let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116+
114117
match maxrows {
115118
MaxRows::Limited(maxrows) => {
116119
// Filter batches to meet the maxrows condition
@@ -131,10 +134,8 @@ fn format_batches_with_maxrows<W: std::io::Write>(
131134
}
132135
}
133136

134-
let formatted = pretty_format_batches_with_options(
135-
&filtered_batches,
136-
&DEFAULT_CLI_FORMAT_OPTIONS,
137-
)?;
137+
let formatted =
138+
pretty_format_batches_with_options(&filtered_batches, &options)?;
138139
if over_limit {
139140
let mut formatted_str = format!("{}", formatted);
140141
formatted_str = keep_only_maxrows(&formatted_str, maxrows);
@@ -144,8 +145,7 @@ fn format_batches_with_maxrows<W: std::io::Write>(
144145
}
145146
}
146147
MaxRows::Unlimited => {
147-
let formatted =
148-
pretty_format_batches_with_options(batches, &DEFAULT_CLI_FORMAT_OPTIONS)?;
148+
let formatted = pretty_format_batches_with_options(batches, &options)?;
149149
writeln!(writer, "{}", formatted)?;
150150
}
151151
}
@@ -162,6 +162,7 @@ impl PrintFormat {
162162
batches: &[RecordBatch],
163163
maxrows: MaxRows,
164164
with_header: bool,
165+
format_options: &FormatOptions,
165166
) -> Result<()> {
166167
// filter out any empty batches
167168
let batches: Vec<_> = batches
@@ -170,7 +171,7 @@ impl PrintFormat {
170171
.cloned()
171172
.collect();
172173
if batches.is_empty() {
173-
return self.print_empty(writer, schema);
174+
return self.print_empty(writer, schema, format_options);
174175
}
175176

176177
match self {
@@ -182,7 +183,7 @@ impl PrintFormat {
182183
if maxrows == MaxRows::Limited(0) {
183184
return Ok(());
184185
}
185-
format_batches_with_maxrows(writer, &batches, maxrows)
186+
format_batches_with_maxrows(writer, &batches, maxrows, format_options)
186187
}
187188
Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
188189
Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
@@ -194,15 +195,17 @@ impl PrintFormat {
194195
&self,
195196
writer: &mut W,
196197
schema: SchemaRef,
198+
format_options: &FormatOptions,
197199
) -> Result<()> {
198200
match self {
199201
// Print column headers for Table format
200202
Self::Table if !schema.fields().is_empty() => {
203+
let format_options: arrow::util::display::FormatOptions =
204+
format_options.try_into()?;
205+
201206
let empty_batch = RecordBatch::new_empty(schema);
202-
let formatted = pretty_format_batches_with_options(
203-
&[empty_batch],
204-
&DEFAULT_CLI_FORMAT_OPTIONS,
205-
)?;
207+
let formatted =
208+
pretty_format_batches_with_options(&[empty_batch], &format_options)?;
206209
writeln!(writer, "{}", formatted)?;
207210
}
208211
_ => {}
@@ -644,6 +647,7 @@ mod tests {
644647
&self.batches,
645648
self.maxrows,
646649
with_header,
650+
&FormatOptions::default(),
647651
)
648652
.unwrap();
649653
String::from_utf8(buffer).unwrap()

datafusion-cli/src/print_options.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion::common::DataFusionError;
2929
use datafusion::error::Result;
3030
use datafusion::physical_plan::RecordBatchStream;
3131

32+
use datafusion::config::FormatOptions;
3233
use futures::StreamExt;
3334

3435
#[derive(Debug, Clone, PartialEq, Copy)]
@@ -103,12 +104,19 @@ impl PrintOptions {
103104
batches: &[RecordBatch],
104105
query_start_time: Instant,
105106
row_count: usize,
107+
format_options: &FormatOptions,
106108
) -> Result<()> {
107109
let stdout = std::io::stdout();
108110
let mut writer = stdout.lock();
109111

110-
self.format
111-
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;
112+
self.format.print_batches(
113+
&mut writer,
114+
schema,
115+
batches,
116+
self.maxrows,
117+
true,
118+
format_options,
119+
)?;
112120

113121
let formatted_exec_details = get_execution_details_formatted(
114122
row_count,
@@ -132,6 +140,7 @@ impl PrintOptions {
132140
&self,
133141
mut stream: Pin<Box<dyn RecordBatchStream>>,
134142
query_start_time: Instant,
143+
format_options: &FormatOptions,
135144
) -> Result<()> {
136145
if self.format == PrintFormat::Table {
137146
return Err(DataFusionError::External(
@@ -154,6 +163,7 @@ impl PrintOptions {
154163
&[batch],
155164
MaxRows::Unlimited,
156165
with_header,
166+
format_options,
157167
)?;
158168
with_header = false;
159169
}

datafusion-cli/tests/cli_integration.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ fn init() {
6969
// can choose the old explain format too
7070
["--command", "EXPLAIN FORMAT indent SELECT 123"],
7171
)]
72+
#[case::change_format_version(
73+
"change_format_version",
74+
["--file", "tests/sql/types_format.sql", "-q"],
75+
)]
7276
#[test]
7377
fn cli_quick_test<'a>(
7478
#[case] snapshot_name: &'a str,
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--file"
7+
- tests/sql/types_format.sql
8+
- "-q"
9+
---
10+
success: true
11+
exit_code: 0
12+
----- stdout -----
13+
+-----------+
14+
| Int64(54) |
15+
| Int64 |
16+
+-----------+
17+
| 54 |
18+
+-----------+
19+
20+
----- stderr -----
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
set datafusion.format.types_info to true;
2+
3+
select 54

datafusion/common/src/config.rs

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,16 @@
1717

1818
//! Runtime configuration, via [`ConfigOptions`]
1919
20+
use crate::error::_config_err;
21+
use crate::parsers::CompressionTypeVariant;
22+
use crate::utils::get_available_parallelism;
23+
use crate::{DataFusionError, Result};
2024
use std::any::Any;
2125
use std::collections::{BTreeMap, HashMap};
2226
use std::error::Error;
2327
use std::fmt::{self, Display};
2428
use std::str::FromStr;
2529

26-
use crate::error::_config_err;
27-
use crate::parsers::CompressionTypeVariant;
28-
use crate::utils::get_available_parallelism;
29-
use crate::{DataFusionError, Result};
30-
3130
/// A macro that wraps a configuration struct and automatically derives
3231
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
3332
/// in the [`ConfigOptions`] configuration tree.
@@ -759,6 +758,59 @@ impl ExecutionOptions {
759758
}
760759
}
761760

761+
config_namespace! {
762+
/// Options controlling the format of output when printing record batches
763+
/// Copies [`arrow::util::display::FormatOptions`]
764+
pub struct FormatOptions {
765+
/// If set to `true`, any formatting errors will be written to the output
766+
/// instead of being converted into a [`std::fmt::Error`]
767+
pub safe: bool, default = true
768+
/// Format string for nulls
769+
pub null: String, default = "".into()
770+
/// Date format for date arrays
771+
pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
772+
/// Format for DateTime arrays
773+
pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
774+
/// Timestamp format for timestamp arrays
775+
pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
776+
/// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
777+
pub timestamp_tz_format: Option<String>, default = None
778+
/// Time format for time arrays
779+
pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
780+
/// Duration format. Can be either `"pretty"` or `"ISO8601"`
781+
pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
782+
/// Show types in visual representation batches
783+
pub types_info: bool, default = false
784+
}
785+
}
786+
787+
impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
788+
type Error = DataFusionError;
789+
fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
790+
let duration_format = match self.duration_format.as_str() {
791+
"pretty" => arrow::util::display::DurationFormat::Pretty,
792+
"iso8601" => arrow::util::display::DurationFormat::ISO8601,
793+
_ => {
794+
return _config_err!(
795+
"Invalid duration format: {}. Valid values are pretty or iso8601",
796+
self.duration_format
797+
)
798+
}
799+
};
800+
801+
Ok(arrow::util::display::FormatOptions::new()
802+
.with_display_error(self.safe)
803+
.with_null(&self.null)
804+
.with_date_format(self.date_format.as_deref())
805+
.with_datetime_format(self.datetime_format.as_deref())
806+
.with_timestamp_format(self.timestamp_format.as_deref())
807+
.with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
808+
.with_time_format(self.time_format.as_deref())
809+
.with_duration_format(duration_format)
810+
.with_types_info(self.types_info))
811+
}
812+
}
813+
762814
/// A key value pair, with a corresponding description
763815
#[derive(Debug)]
764816
pub struct ConfigEntry {
@@ -788,6 +840,8 @@ pub struct ConfigOptions {
788840
pub explain: ExplainOptions,
789841
/// Optional extensions registered using [`Extensions::insert`]
790842
pub extensions: Extensions,
843+
/// Formatting options when printing batches
844+
pub format: FormatOptions,
791845
}
792846

793847
impl ConfigField for ConfigOptions {
@@ -800,6 +854,7 @@ impl ConfigField for ConfigOptions {
800854
"optimizer" => self.optimizer.set(rem, value),
801855
"explain" => self.explain.set(rem, value),
802856
"sql_parser" => self.sql_parser.set(rem, value),
857+
"format" => self.format.set(rem, value),
803858
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
804859
}
805860
}
@@ -810,6 +865,7 @@ impl ConfigField for ConfigOptions {
810865
self.optimizer.visit(v, "datafusion.optimizer", "");
811866
self.explain.visit(v, "datafusion.explain", "");
812867
self.sql_parser.visit(v, "datafusion.sql_parser", "");
868+
self.format.visit(v, "datafusion.format", "");
813869
}
814870
}
815871

@@ -2004,11 +2060,11 @@ config_namespace! {
20042060
}
20052061
}
20062062

2007-
pub trait FormatOptionsExt: Display {}
2063+
pub trait OutputFormatExt: Display {}
20082064

20092065
#[derive(Debug, Clone, PartialEq)]
20102066
#[allow(clippy::large_enum_variant)]
2011-
pub enum FormatOptions {
2067+
pub enum OutputFormat {
20122068
CSV(CsvOptions),
20132069
JSON(JsonOptions),
20142070
#[cfg(feature = "parquet")]
@@ -2017,15 +2073,15 @@ pub enum FormatOptions {
20172073
ARROW,
20182074
}
20192075

2020-
impl Display for FormatOptions {
2076+
impl Display for OutputFormat {
20212077
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20222078
let out = match self {
2023-
FormatOptions::CSV(_) => "csv",
2024-
FormatOptions::JSON(_) => "json",
2079+
OutputFormat::CSV(_) => "csv",
2080+
OutputFormat::JSON(_) => "json",
20252081
#[cfg(feature = "parquet")]
2026-
FormatOptions::PARQUET(_) => "parquet",
2027-
FormatOptions::AVRO => "avro",
2028-
FormatOptions::ARROW => "arrow",
2082+
OutputFormat::PARQUET(_) => "parquet",
2083+
OutputFormat::AVRO => "avro",
2084+
OutputFormat::ARROW => "arrow",
20292085
};
20302086
write!(f, "{}", out)
20312087
}

0 commit comments

Comments
 (0)