From e4a96d0bcb58abfbbbaeb4dbd56a5ee26803228b Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 17:33:28 -0400 Subject: [PATCH 1/7] feat: vortex CLI convert parquet to Vortex --- Cargo.lock | 5 +++ vortex-tui/Cargo.toml | 7 ++- vortex-tui/src/convert.rs | 93 +++++++++++++++++++++++++++++++++++++++ vortex-tui/src/main.rs | 9 +++- vortex-tui/src/tree.rs | 4 +- 5 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 vortex-tui/src/convert.rs diff --git a/Cargo.lock b/Cargo.lock index 60ce53e571..e5e754a93b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6162,9 +6162,14 @@ dependencies = [ name = "vortex-tui" version = "0.25.2" dependencies = [ + "arrow-array", "async-trait", "clap", "crossterm", + "futures-util", + "indicatif", + "parquet", + "pin-project", "ratatui", "tokio", "vortex", diff --git a/vortex-tui/Cargo.toml b/vortex-tui/Cargo.toml index 0778718c5a..a4006b71dd 100644 --- a/vortex-tui/Cargo.toml +++ b/vortex-tui/Cargo.toml @@ -14,12 +14,17 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +arrow-array = { workspace = true } async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } crossterm = { workspace = true } +futures-util = { workspace = true } +indicatif = { workspace = true } +parquet = { workspace = true, features = ["arrow", "async"] } +pin-project = { workspace = true } ratatui = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } -vortex = { workspace = true, features = ["tokio"] } +vortex = { workspace = true, features = ["tokio", "parquet"] } vortex-layout = { workspace = true } [lints] diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs new file mode 100644 index 0000000000..6c12f47759 --- /dev/null +++ b/vortex-tui/src/convert.rs @@ -0,0 +1,93 @@ +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use arrow_array::StructArray as ArrowStructArray; +use futures_util::Stream; +use indicatif::ProgressBar; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use pin_project::pin_project; +use tokio::fs::File; +use vortex::arrays::ChunkedArray; +use vortex::arrow::FromArrowArray; +use vortex::dtype::DType; +use vortex::error::{VortexExpect, VortexResult}; +use vortex::file::VortexWriteOptions; +use vortex::stream::{ArrayStream, ArrayStreamArrayExt}; +use vortex::{Array, ArrayRef}; + +/// Convert Parquet files to Vortex. +pub async fn exec_convert(input_path: impl AsRef) -> VortexResult<()> { + println!( + "Converting input Parquet file: {}", + input_path.as_ref().display() + ); + + let output_path = input_path.as_ref().with_extension("vortex"); + let file = File::open(input_path).await?; + let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?; + let mut chunks = Vec::new(); + + while let Some(mut reader) = reader.next_row_group().await? { + for batch in reader.by_ref() { + let batch = ArrowStructArray::from(batch?); + let next_chunk = ArrayRef::from_arrow(&batch, true); + chunks.push(next_chunk); + } + } + + let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone(); + let chunked_array = ChunkedArray::try_new(chunks, dtype)?; + + let writer = VortexWriteOptions::default(); + + let pb = ProgressBar::new(chunked_array.nchunks() as u64); + let stream = ProgressArrayStream { + inner: chunked_array.to_array_stream(), + progress: pb, + }; + + println!( + "Writing {} chunks to new Vortex file {}", + chunked_array.nchunks(), + output_path.display() + ); + let output_file = File::create(output_path).await?; + writer.write(output_file, stream).await?; + + Ok(()) +} + +#[pin_project] +struct ProgressArrayStream { + #[pin] + inner: S, + progress: ProgressBar, +} + +impl Stream for ProgressArrayStream +where + S: Stream>, +{ + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.inner.poll_next(cx) { + Poll::Ready(inner) => { + this.progress.inc(1); + Poll::Ready(inner) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl ArrayStream for ProgressArrayStream +where + S: ArrayStream, +{ + fn dtype(&self) -> &DType { + self.inner.dtype() + } +} diff --git a/vortex-tui/src/main.rs b/vortex-tui/src/main.rs index 91199c77ac..e4078fb2e2 100644 --- a/vortex-tui/src/main.rs +++ b/vortex-tui/src/main.rs @@ -1,5 +1,6 @@ #![allow(clippy::expect_used)] mod browse; +mod convert; mod tree; use std::path::PathBuf; @@ -10,6 +11,8 @@ use clap::Parser; use tokio::runtime::Runtime; use tree::exec_tree; +use crate::convert::exec_convert; + static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().expect("Tokio Runtime::new()")); @@ -22,13 +25,17 @@ struct Cli { #[derive(Debug, clap::Subcommand)] enum Commands { Tree { file: PathBuf }, + Convert { file: PathBuf }, Browse { file: PathBuf }, } fn main() { let cli = Cli::parse(); match cli.command { - Commands::Tree { file } => exec_tree(file).expect("exec_tre"), + Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tre"), + Commands::Convert { file } => TOKIO_RUNTIME + .block_on(exec_convert(file)) + .expect("exec_convert"), Commands::Browse { file } => exec_tui(file).expect("exec_tui"), } } diff --git a/vortex-tui/src/tree.rs b/vortex-tui/src/tree.rs index 765d188af3..c0a6e4d82f 100644 --- a/vortex-tui/src/tree.rs +++ b/vortex-tui/src/tree.rs @@ -4,9 +4,7 @@ use vortex::error::VortexResult; use vortex::file::VortexOpenOptions; use vortex::io::TokioFile; -use crate::TOKIO_RUNTIME; - -pub fn exec_tree(file: impl AsRef) -> VortexResult<()> { +pub async fn exec_tree(file: impl AsRef) -> VortexResult<()> { let opened = TokioFile::open(file)?; let full = TOKIO_RUNTIME.block_on(async move { From 1ba3e6f02559894fba8ec29df5103da9867c919e Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 17:39:52 -0400 Subject: [PATCH 2/7] log times --- vortex-tui/src/convert.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index 6c12f47759..f067f73e59 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -1,6 +1,7 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Instant; use arrow_array::StructArray as ArrowStructArray; use futures_util::Stream; @@ -23,6 +24,8 @@ pub async fn exec_convert(input_path: impl AsRef) -> VortexResult<()> { input_path.as_ref().display() ); + let wall_start = Instant::now(); + let output_path = input_path.as_ref().with_extension("vortex"); let file = File::open(input_path).await?; let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?; @@ -36,6 +39,12 @@ pub async fn exec_convert(input_path: impl AsRef) -> VortexResult<()> { } } + let read_complete = Instant::now(); + println!( + "Read Parquet file in {:?}", + read_complete.duration_since(wall_start) + ); + let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone(); let chunked_array = ChunkedArray::try_new(chunks, dtype)?; @@ -55,6 +64,11 @@ pub async fn exec_convert(input_path: impl AsRef) -> VortexResult<()> { let output_file = File::create(output_path).await?; writer.write(output_file, stream).await?; + println!( + "Wrote Vortex in {:?}", + Instant::now().duration_since(read_complete) + ); + Ok(()) } From 83bcf7ce214173507bcd9120bb3362605ccd8b97 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 17:56:02 -0400 Subject: [PATCH 3/7] log times --- vortex-tui/src/convert.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index f067f73e59..d250dd01a4 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -1,3 +1,5 @@ +#![allow(clippy::use_debug)] + use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; From 547b5002f84013267dbbad070e2b3cb7cc9ab871 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Mar 2025 21:25:22 -0400 Subject: [PATCH 4/7] add `-q` flag for silent conversion --- vortex-tui/src/convert.rs | 69 +++++++++++++++++++++++++-------------- vortex-tui/src/main.rs | 18 +++++++--- 2 files changed, 58 insertions(+), 29 deletions(-) diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index d250dd01a4..641a3336b1 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -19,12 +19,19 @@ use vortex::file::VortexWriteOptions; use vortex::stream::{ArrayStream, ArrayStreamArrayExt}; use vortex::{Array, ArrayRef}; +#[derive(Default)] +pub struct Flags { + pub quiet: bool, +} + /// Convert Parquet files to Vortex. -pub async fn exec_convert(input_path: impl AsRef) -> VortexResult<()> { - println!( - "Converting input Parquet file: {}", - input_path.as_ref().display() - ); +pub async fn exec_convert(input_path: impl AsRef, flags: Flags) -> VortexResult<()> { + if !flags.quiet { + println!( + "Converting input Parquet file: {}", + input_path.as_ref().display() + ); + } let wall_start = Instant::now(); @@ -42,34 +49,46 @@ pub async fn exec_convert(input_path: impl AsRef) -> VortexResult<()> { } let read_complete = Instant::now(); - println!( - "Read Parquet file in {:?}", - read_complete.duration_since(wall_start) - ); + + if !flags.quiet { + println!( + "Read Parquet file in {:?}", + read_complete.duration_since(wall_start) + ); + + println!( + "Writing {} chunks to new Vortex file {}", + chunks.len(), + output_path.display() + ); + } let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone(); let chunked_array = ChunkedArray::try_new(chunks, dtype)?; let writer = VortexWriteOptions::default(); - let pb = ProgressBar::new(chunked_array.nchunks() as u64); - let stream = ProgressArrayStream { - inner: chunked_array.to_array_stream(), - progress: pb, - }; - - println!( - "Writing {} chunks to new Vortex file {}", - chunked_array.nchunks(), - output_path.display() - ); let output_file = File::create(output_path).await?; - writer.write(output_file, stream).await?; - println!( - "Wrote Vortex in {:?}", - Instant::now().duration_since(read_complete) - ); + if !flags.quiet { + let pb = ProgressBar::new(chunked_array.nchunks() as u64); + let stream = ProgressArrayStream { + inner: chunked_array.to_array_stream(), + progress: pb, + }; + writer.write(output_file, stream).await?; + } else { + writer + .write(output_file, chunked_array.to_array_stream()) + .await?; + } + + if !flags.quiet { + println!( + "Wrote Vortex in {:?}", + Instant::now().duration_since(read_complete) + ); + } Ok(()) } diff --git a/vortex-tui/src/main.rs b/vortex-tui/src/main.rs index e4078fb2e2..ccee3c3eda 100644 --- a/vortex-tui/src/main.rs +++ b/vortex-tui/src/main.rs @@ -11,7 +11,7 @@ use clap::Parser; use tokio::runtime::Runtime; use tree::exec_tree; -use crate::convert::exec_convert; +use crate::convert::{Flags, exec_convert}; static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().expect("Tokio Runtime::new()")); @@ -24,8 +24,18 @@ struct Cli { #[derive(Debug, clap::Subcommand)] enum Commands { + /// Print the encoding tree of a Vortex file. Tree { file: PathBuf }, - Convert { file: PathBuf }, + /// Convert a Parquet file to a Vortex file. Chunking occurs on Parquet RowGroup boundaries. + Convert { + /// Path to the Parquet file on disk to convert to Vortex + file: PathBuf, + + /// Execute quietly. No output will be printed. + #[arg(short, long)] + quiet: bool, + }, + /// Interactively browse the Vortex file. Browse { file: PathBuf }, } @@ -33,8 +43,8 @@ fn main() { let cli = Cli::parse(); match cli.command { Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tre"), - Commands::Convert { file } => TOKIO_RUNTIME - .block_on(exec_convert(file)) + Commands::Convert { file, quiet } => TOKIO_RUNTIME + .block_on(exec_convert(file, Flags { quiet })) .expect("exec_convert"), Commands::Browse { file } => exec_tui(file).expect("exec_tui"), } From 8adef64d7ebfe958cebb2115d2f5b06be42724bc Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 12 Mar 2025 20:35:59 -0400 Subject: [PATCH 5/7] eprintln --- vortex-tui/src/convert.rs | 8 ++++---- vortex-tui/src/tree.rs | 14 ++++++-------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index 641a3336b1..97ea301a04 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -27,7 +27,7 @@ pub struct Flags { /// Convert Parquet files to Vortex. pub async fn exec_convert(input_path: impl AsRef, flags: Flags) -> VortexResult<()> { if !flags.quiet { - println!( + eprintln!( "Converting input Parquet file: {}", input_path.as_ref().display() ); @@ -51,12 +51,12 @@ pub async fn exec_convert(input_path: impl AsRef, flags: Flags) -> VortexR let read_complete = Instant::now(); if !flags.quiet { - println!( + eprintln!( "Read Parquet file in {:?}", read_complete.duration_since(wall_start) ); - println!( + eprintln!( "Writing {} chunks to new Vortex file {}", chunks.len(), output_path.display() @@ -84,7 +84,7 @@ pub async fn exec_convert(input_path: impl AsRef, flags: Flags) -> VortexR } if !flags.quiet { - println!( + eprintln!( "Wrote Vortex in {:?}", Instant::now().duration_since(read_complete) ); diff --git a/vortex-tui/src/tree.rs b/vortex-tui/src/tree.rs index c0a6e4d82f..ac6232a540 100644 --- a/vortex-tui/src/tree.rs +++ b/vortex-tui/src/tree.rs @@ -7,14 +7,12 @@ use vortex::io::TokioFile; pub async fn exec_tree(file: impl AsRef) -> VortexResult<()> { let opened = TokioFile::open(file)?; - let full = TOKIO_RUNTIME.block_on(async move { - VortexOpenOptions::file(opened) - .open() - .await? - .scan() - .read_all() - .await - })?; + let full = VortexOpenOptions::file(opened) + .open() + .await? + .scan() + .read_all() + .await?; println!("{}", full.tree_display()); From 13ca5c2675fc784e1984d6bbb749b6eabe321c5b Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 12 Mar 2025 20:43:48 -0400 Subject: [PATCH 6/7] tree --- vortex-tui/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-tui/src/main.rs b/vortex-tui/src/main.rs index ccee3c3eda..fe3579c808 100644 --- a/vortex-tui/src/main.rs +++ b/vortex-tui/src/main.rs @@ -42,7 +42,7 @@ enum Commands { fn main() { let cli = Cli::parse(); match cli.command { - Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tre"), + Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tree"), Commands::Convert { file, quiet } => TOKIO_RUNTIME .block_on(exec_convert(file, Flags { quiet })) .expect("exec_convert"), From 185135878fdc48a64427a2735d68c8f5ce8ae5a5 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 12 Mar 2025 21:04:03 -0400 Subject: [PATCH 7/7] more correct root-level null checking --- vortex-tui/src/convert.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs index 97ea301a04..88b12ab3f0 100644 --- a/vortex-tui/src/convert.rs +++ b/vortex-tui/src/convert.rs @@ -9,6 +9,7 @@ use arrow_array::StructArray as ArrowStructArray; use futures_util::Stream; use indicatif::ProgressBar; use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use pin_project::pin_project; use tokio::fs::File; use vortex::arrays::ChunkedArray; @@ -36,14 +37,19 @@ pub async fn exec_convert(input_path: impl AsRef, flags: Flags) -> VortexR let wall_start = Instant::now(); let output_path = input_path.as_ref().with_extension("vortex"); - let file = File::open(input_path).await?; + let mut file = File::open(input_path).await?; + + let metadata = + ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?; + let has_root_level_nulls = metadata.parquet_schema().root_schema().is_optional(); + let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?; let mut chunks = Vec::new(); while let Some(mut reader) = reader.next_row_group().await? { for batch in reader.by_ref() { let batch = ArrowStructArray::from(batch?); - let next_chunk = ArrayRef::from_arrow(&batch, true); + let next_chunk = ArrayRef::from_arrow(&batch, has_root_level_nulls); chunks.push(next_chunk); } }