diff --git a/Cargo.lock b/Cargo.lock index 60ce53e571..e5e754a93b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6162,9 +6162,14 @@ dependencies = [ name = "vortex-tui" version = "0.25.2" dependencies = [ + "arrow-array", "async-trait", "clap", "crossterm", + "futures-util", + "indicatif", + "parquet", + "pin-project", "ratatui", "tokio", "vortex", diff --git a/vortex-tui/Cargo.toml b/vortex-tui/Cargo.toml index 0778718c5a..a4006b71dd 100644 --- a/vortex-tui/Cargo.toml +++ b/vortex-tui/Cargo.toml @@ -14,12 +14,17 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +arrow-array = { workspace = true } async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } crossterm = { workspace = true } +futures-util = { workspace = true } +indicatif = { workspace = true } +parquet = { workspace = true, features = ["arrow", "async"] } +pin-project = { workspace = true } ratatui = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } -vortex = { workspace = true, features = ["tokio"] } +vortex = { workspace = true, features = ["tokio", "parquet"] } vortex-layout = { workspace = true } [lints] diff --git a/vortex-tui/src/convert.rs b/vortex-tui/src/convert.rs new file mode 100644 index 0000000000..88b12ab3f0 --- /dev/null +++ b/vortex-tui/src/convert.rs @@ -0,0 +1,134 @@ +#![allow(clippy::use_debug)] + +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use arrow_array::StructArray as ArrowStructArray; +use futures_util::Stream; +use indicatif::ProgressBar; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; +use pin_project::pin_project; +use tokio::fs::File; +use vortex::arrays::ChunkedArray; +use vortex::arrow::FromArrowArray; +use vortex::dtype::DType; +use vortex::error::{VortexExpect, VortexResult}; +use vortex::file::VortexWriteOptions; +use vortex::stream::{ArrayStream, ArrayStreamArrayExt}; +use vortex::{Array, ArrayRef}; + +#[derive(Default)] +pub struct Flags { + pub quiet: bool, +} + +/// Convert Parquet files to Vortex. +pub async fn exec_convert(input_path: impl AsRef, flags: Flags) -> VortexResult<()> { + if !flags.quiet { + eprintln!( + "Converting input Parquet file: {}", + input_path.as_ref().display() + ); + } + + let wall_start = Instant::now(); + + let output_path = input_path.as_ref().with_extension("vortex"); + let mut file = File::open(input_path).await?; + + let metadata = + ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?; + let has_root_level_nulls = metadata.parquet_schema().root_schema().is_optional(); + + let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?; + let mut chunks = Vec::new(); + + while let Some(mut reader) = reader.next_row_group().await? { + for batch in reader.by_ref() { + let batch = ArrowStructArray::from(batch?); + let next_chunk = ArrayRef::from_arrow(&batch, has_root_level_nulls); + chunks.push(next_chunk); + } + } + + let read_complete = Instant::now(); + + if !flags.quiet { + eprintln!( + "Read Parquet file in {:?}", + read_complete.duration_since(wall_start) + ); + + eprintln!( + "Writing {} chunks to new Vortex file {}", + chunks.len(), + output_path.display() + ); + } + + let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone(); + let chunked_array = ChunkedArray::try_new(chunks, dtype)?; + + let writer = VortexWriteOptions::default(); + + let output_file = File::create(output_path).await?; + + if !flags.quiet { + let pb = ProgressBar::new(chunked_array.nchunks() as u64); + let stream = ProgressArrayStream { + inner: chunked_array.to_array_stream(), + progress: pb, + }; + writer.write(output_file, stream).await?; + } else { + writer + .write(output_file, chunked_array.to_array_stream()) + .await?; + } + + if !flags.quiet { + eprintln!( + "Wrote Vortex in {:?}", + Instant::now().duration_since(read_complete) + ); + } + + Ok(()) +} + +#[pin_project] +struct ProgressArrayStream { + #[pin] + inner: S, + progress: ProgressBar, +} + +impl Stream for ProgressArrayStream +where + S: Stream>, +{ + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.inner.poll_next(cx) { + Poll::Ready(inner) => { + this.progress.inc(1); + Poll::Ready(inner) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl ArrayStream for ProgressArrayStream +where + S: ArrayStream, +{ + fn dtype(&self) -> &DType { + self.inner.dtype() + } +} diff --git a/vortex-tui/src/main.rs b/vortex-tui/src/main.rs index 91199c77ac..fe3579c808 100644 --- a/vortex-tui/src/main.rs +++ b/vortex-tui/src/main.rs @@ -1,5 +1,6 @@ #![allow(clippy::expect_used)] mod browse; +mod convert; mod tree; use std::path::PathBuf; @@ -10,6 +11,8 @@ use clap::Parser; use tokio::runtime::Runtime; use tree::exec_tree; +use crate::convert::{Flags, exec_convert}; + static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().expect("Tokio Runtime::new()")); @@ -21,14 +24,28 @@ struct Cli { #[derive(Debug, clap::Subcommand)] enum Commands { + /// Print the encoding tree of a Vortex file. Tree { file: PathBuf }, + /// Convert a Parquet file to a Vortex file. Chunking occurs on Parquet RowGroup boundaries. + Convert { + /// Path to the Parquet file on disk to convert to Vortex + file: PathBuf, + + /// Execute quietly. No output will be printed. + #[arg(short, long)] + quiet: bool, + }, + /// Interactively browse the Vortex file. Browse { file: PathBuf }, } fn main() { let cli = Cli::parse(); match cli.command { - Commands::Tree { file } => exec_tree(file).expect("exec_tre"), + Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tree"), + Commands::Convert { file, quiet } => TOKIO_RUNTIME + .block_on(exec_convert(file, Flags { quiet })) + .expect("exec_convert"), Commands::Browse { file } => exec_tui(file).expect("exec_tui"), } } diff --git a/vortex-tui/src/tree.rs b/vortex-tui/src/tree.rs index 765d188af3..ac6232a540 100644 --- a/vortex-tui/src/tree.rs +++ b/vortex-tui/src/tree.rs @@ -4,19 +4,15 @@ use vortex::error::VortexResult; use vortex::file::VortexOpenOptions; use vortex::io::TokioFile; -use crate::TOKIO_RUNTIME; - -pub fn exec_tree(file: impl AsRef) -> VortexResult<()> { +pub async fn exec_tree(file: impl AsRef) -> VortexResult<()> { let opened = TokioFile::open(file)?; - let full = TOKIO_RUNTIME.block_on(async move { - VortexOpenOptions::file(opened) - .open() - .await? - .scan() - .read_all() - .await - })?; + let full = VortexOpenOptions::file(opened) + .open() + .await? + .scan() + .read_all() + .await?; println!("{}", full.tree_display());