diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 1fc949593512..0d5e0614d26f 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -23,8 +23,10 @@ use crate::print_options::MaxRows; use arrow::csv::writer::WriterBuilder; use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; use arrow::json::{ArrayWriter, LineDelimitedWriter}; use arrow::record_batch::RecordBatch; +use arrow::util::display::{ArrayFormatter, ValueFormatter}; use arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS; use datafusion::error::Result; @@ -153,6 +155,164 @@ fn format_batches_with_maxrows( Ok(()) } +/// The state and methods for displaying output +pub struct OutputStreamState<'a> { + pub preview_batches: Vec, + pub preview_row_count: usize, + pub preview_limit: usize, + pub precomputed_widths: Option>, + pub header_printed: bool, + pub writer: &'a mut dyn std::io::Write, + pub format: PrintFormat, +} + +impl<'a> OutputStreamState<'a> { + /// Create a new OutputStreamState + pub fn new( + writer: &'a mut dyn std::io::Write, + format: PrintFormat, + preview_limit: usize, + ) -> Self { + Self { + preview_batches: Vec::new(), + preview_row_count: 0, + preview_limit, + precomputed_widths: None, + header_printed: false, + writer, + format, + } + } + + /// Process a single batch of data + pub fn process_batch( + &mut self, + batch: &RecordBatch, + schema: SchemaRef, + ) -> Result<()> { + if self.precomputed_widths.is_none() { + self.preview_batches.push(batch.clone()); + self.preview_row_count += batch.num_rows(); + if self.preview_row_count >= self.preview_limit { + let widths = + self.compute_column_widths(&self.preview_batches, schema.clone())?; + self.precomputed_widths = Some(widths.clone()); + self.print_header(&schema, &widths)?; + self.header_printed = true; + let drained_batches: Vec<_> = self.preview_batches.drain(..).collect(); + for preview_batch in drained_batches { + self.print_batch_with_widths(&preview_batch, &widths)?; + } + } + } else { + let widths = self.precomputed_widths.clone().unwrap(); + if !self.header_printed { + self.print_header(&schema, &widths)?; + self.header_printed = true; + } + self.print_batch_with_widths(batch, &widths)?; + } + Ok(()) + } + + /// Compute the widths of each column for display + pub fn compute_column_widths( + &self, + batches: &Vec, + schema: SchemaRef, + ) -> Result> { + let mut widths: Vec = + schema.fields().iter().map(|f| f.name().len()).collect(); + for batch in batches { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + for (i, formatter) in formatters.iter().enumerate() { + let cell = formatter.value(row); + widths[i] = widths[i].max(cell.to_string().len()); + } + } + } + Ok(widths) + } + + /// Print the header of the table + pub fn print_header(&mut self, schema: &SchemaRef, widths: &[usize]) -> Result<()> { + Self::print_border(widths, self.writer)?; + + let header: Vec = schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| Self::pad_cell(field.name(), widths[i])) + .collect(); + writeln!(self.writer, "| {} |", header.join(" | "))?; + + Self::print_border(widths, self.writer)?; + Ok(()) + } + + /// Print a batch with pre-computed column widths + pub fn print_batch_with_widths( + &mut self, + batch: &RecordBatch, + widths: &[usize], + ) -> Result<()> { + let formatters = batch + .columns() + .iter() + .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)) + .collect::, ArrowError>>()?; + for row in 0..batch.num_rows() { + let cells: Vec = formatters + .iter() + .enumerate() + .map(|(i, formatter)| Self::pad_value(&formatter.value(row), widths[i])) + .collect(); + writeln!(self.writer, "| {} |", cells.join(" | "))?; + } + Ok(()) + } + + /// Print a dotted line indicating truncated output + pub fn print_dotted_line(&mut self, widths: &[usize]) -> Result<()> { + let cells: Vec = widths + .iter() + .map(|&w| format!(" {: Result<()> { + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); + writeln!(self.writer, "+{}+", cells.join("+"))?; + Ok(()) + } + + /// Print a horizontal border line + fn print_border(widths: &[usize], writer: &mut dyn std::io::Write) -> Result<()> { + let cells: Vec = widths.iter().map(|&w| "-".repeat(w + 2)).collect(); + writeln!(writer, "+{}+", cells.join("+"))?; + Ok(()) + } + + /// Pad a cell to fit the required width + fn pad_cell(cell: &str, width: usize) -> String { + format!("{: String { + let s = formatter.try_to_string().unwrap_or_default(); + format!("{:( @@ -539,6 +699,238 @@ mod tests { .run(); } + #[test] + fn test_compute_column_widths() { + let schema = three_column_schema(); + let batches = vec![three_column_batch()]; + let mut writer = Vec::new(); + let state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + let widths = state + .compute_column_widths(&batches, schema.clone()) + .unwrap(); + assert_eq!(widths, vec![1, 1, 1]); + + let schema = one_column_schema(); + let batches = vec![one_column_batch()]; + let mut writer = Vec::new(); + let state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + let widths = state + .compute_column_widths(&batches, schema.clone()) + .unwrap(); + assert_eq!(widths, vec![1]); + + let schema = three_column_schema(); + let batches = vec![three_column_batch_with_widths()]; + let mut writer = Vec::new(); + let state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + let widths = state + .compute_column_widths(&batches, schema.clone()) + .unwrap(); + assert_eq!(widths, [7, 5, 6]); + } + + #[test] + fn test_print_header() { + let schema = three_column_schema(); + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + state.print_header(&schema, &widths).unwrap(); + let expected = &["+---+---+---+", "| a | b | c |", "+---+---+---+"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batch_with_same_widths() { + let batch = three_column_batch(); + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + state.print_batch_with_widths(&batch, &widths).unwrap(); + let expected = &["| 1 | 4 | 7 |", "| 2 | 5 | 8 |", "| 3 | 6 | 9 |"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batch_with_different_widths() { + let batch = three_column_batch_with_widths(); + let widths = vec![7, 5, 6]; + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + state.print_batch_with_widths(&batch, &widths).unwrap(); + let expected = &[ + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_dotted_line() { + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + state.print_dotted_line(&widths).unwrap(); + let expected = &["| . | . | . |"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_bottom_border() { + let widths = vec![1, 1, 1]; + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 10); + state.print_bottom_border(&widths).unwrap(); + let expected = &["+---+---+---+"]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_maxrows() { + let batch = one_column_batch(); + let schema = one_column_schema(); + let format = PrintFormat::Table; + + // should print out entire output with no truncation if unlimited or + // limit greater than number of batches or equal to the number of batches + for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] { + let mut writer = Vec::new(); + format + .print_batches( + &mut writer, + schema.clone(), + &[batch.clone()], + max_rows, + true, + ) + .unwrap(); + let expected = &[ + "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + // should truncate output if limit is less than number of batches + let mut writer = Vec::new(); + format + .print_batches( + &mut writer, + schema.clone(), + &[batch.clone()], + MaxRows::Limited(1), + true, + ) + .unwrap(); + let expected = &[ + "+---+", "| a |", "+---+", "| 1 |", "| . |", "| . |", "| . |", "+---+", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + // test process_batch with different batch widths + // and preview count is less than the first batch + #[test] + fn test_print_batches_with_preview_count_less_than_first_batch() { + let batch = three_column_batch_with_widths(); + let schema = three_column_schema(); + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 2); + + state.process_batch(&batch, schema.clone()).unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_and_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + // preview limit is less than the first batch + // so the second batch if it's width is greater than the first batch, it will be unformatted + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 2); + + state.process_batch(&batch1, schema.clone()).unwrap(); + state.process_batch(&batch2, schema.clone()).unwrap(); + state.process_batch(&batch1, schema.clone()).unwrap(); + + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + + #[test] + fn test_print_batches_with_preview_cover_later_batches() { + let batch1 = three_column_batch(); + let batch2 = three_column_batch_with_widths(); + let schema = three_column_schema(); + // preview limit is greater than the first batch + let mut writer = Vec::new(); + let mut state = OutputStreamState::new(&mut writer, PrintFormat::Table, 4); + + state.process_batch(&batch1, schema.clone()).unwrap(); + state.process_batch(&batch2, schema.clone()).unwrap(); + state.process_batch(&batch1, schema.clone()).unwrap(); + + let expected = &[ + "+---------+-------+--------+", + "| a | b | c |", + "+---------+-------+--------+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + "| 1 | 42222 | 7 |", + "| 2222222 | 5 | 8 |", + "| 3 | 6 | 922222 |", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "| 3 | 6 | 9 |", + ]; + let binding = String::from_utf8(writer.clone()).unwrap(); + let actual: Vec<_> = binding.trim_end().split('\n').collect(); + assert_eq!(actual, expected); + } + #[derive(Debug)] struct PrintBatchesTest { format: PrintFormat, @@ -672,6 +1064,19 @@ mod tests { .unwrap() } + /// Return a batch with three columns and three rows, but with different widths + fn three_column_batch_with_widths() -> RecordBatch { + RecordBatch::try_new( + three_column_schema(), + vec![ + Arc::new(Int32Array::from(vec![1, 2222222, 3])), + Arc::new(Int32Array::from(vec![42222, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 922222])), + ], + ) + .unwrap() + } + /// Return a schema with one column fn one_column_schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))