Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: vortex CLI convert parquet to Vortex #2649

Merged
merged 7 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion vortex-tui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ rust-version = { workspace = true }
version = { workspace = true }

[dependencies]
arrow-array = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
crossterm = { workspace = true }
futures-util = { workspace = true }
indicatif = { workspace = true }
parquet = { workspace = true, features = ["arrow", "async"] }
pin-project = { workspace = true }
ratatui = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
vortex = { workspace = true, features = ["tokio"] }
vortex = { workspace = true, features = ["tokio", "parquet"] }
vortex-layout = { workspace = true }

[lints]
Expand Down
134 changes: 134 additions & 0 deletions vortex-tui/src/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#![allow(clippy::use_debug)]

use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use arrow_array::StructArray as ArrowStructArray;
use futures_util::Stream;
use indicatif::ProgressBar;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use pin_project::pin_project;
use tokio::fs::File;
use vortex::arrays::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::dtype::DType;
use vortex::error::{VortexExpect, VortexResult};
use vortex::file::VortexWriteOptions;
use vortex::stream::{ArrayStream, ArrayStreamArrayExt};
use vortex::{Array, ArrayRef};

#[derive(Default)]
pub struct Flags {
pub quiet: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it usually the other way around?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think usually you want the print. See curl or wget for example

}

/// Convert Parquet files to Vortex.
pub async fn exec_convert(input_path: impl AsRef<Path>, flags: Flags) -> VortexResult<()> {
if !flags.quiet {
eprintln!(
"Converting input Parquet file: {}",
input_path.as_ref().display()
);
}

let wall_start = Instant::now();

let output_path = input_path.as_ref().with_extension("vortex");
let mut file = File::open(input_path).await?;

let metadata =
ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?;
let has_root_level_nulls = metadata.parquet_schema().root_schema().is_optional();

let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
let mut chunks = Vec::new();

while let Some(mut reader) = reader.next_row_group().await? {
for batch in reader.by_ref() {
let batch = ArrowStructArray::from(batch?);
let next_chunk = ArrayRef::from_arrow(&batch, has_root_level_nulls);
chunks.push(next_chunk);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to just stream these straight into the writer without buffering everything in memory. using the row group count as your progress indicator

}
}

let read_complete = Instant::now();

if !flags.quiet {
eprintln!(
"Read Parquet file in {:?}",
read_complete.duration_since(wall_start)
);

eprintln!(
"Writing {} chunks to new Vortex file {}",
chunks.len(),
output_path.display()
);
}

let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone();
let chunked_array = ChunkedArray::try_new(chunks, dtype)?;

let writer = VortexWriteOptions::default();

let output_file = File::create(output_path).await?;

if !flags.quiet {
let pb = ProgressBar::new(chunked_array.nchunks() as u64);
let stream = ProgressArrayStream {
inner: chunked_array.to_array_stream(),
progress: pb,
};
writer.write(output_file, stream).await?;
} else {
writer
.write(output_file, chunked_array.to_array_stream())
.await?;
}

if !flags.quiet {
eprintln!(
"Wrote Vortex in {:?}",
Instant::now().duration_since(read_complete)
);
}

Ok(())
}

#[pin_project]
struct ProgressArrayStream<S> {
#[pin]
inner: S,
progress: ProgressBar,
}

impl<S> Stream for ProgressArrayStream<S>
where
S: Stream<Item = VortexResult<ArrayRef>>,
{
type Item = VortexResult<ArrayRef>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.inner.poll_next(cx) {
Poll::Ready(inner) => {
this.progress.inc(1);
Poll::Ready(inner)
}
Poll::Pending => Poll::Pending,
}
}
}

impl<S> ArrayStream for ProgressArrayStream<S>
where
S: ArrayStream,
{
fn dtype(&self) -> &DType {
self.inner.dtype()
}
}
19 changes: 18 additions & 1 deletion vortex-tui/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::expect_used)]
mod browse;
mod convert;
mod tree;

use std::path::PathBuf;
Expand All @@ -10,6 +11,8 @@ use clap::Parser;
use tokio::runtime::Runtime;
use tree::exec_tree;

use crate::convert::{Flags, exec_convert};

static TOKIO_RUNTIME: LazyLock<Runtime> =
LazyLock::new(|| Runtime::new().expect("Tokio Runtime::new()"));

Expand All @@ -21,14 +24,28 @@ struct Cli {

#[derive(Debug, clap::Subcommand)]
enum Commands {
/// Print the encoding tree of a Vortex file.
Tree { file: PathBuf },
/// Convert a Parquet file to a Vortex file. Chunking occurs on Parquet RowGroup boundaries.
Convert {
/// Path to the Parquet file on disk to convert to Vortex
file: PathBuf,

/// Execute quietly. No output will be printed.
#[arg(short, long)]
quiet: bool,
},
/// Interactively browse the Vortex file.
Browse { file: PathBuf },
}

fn main() {
let cli = Cli::parse();
match cli.command {
Commands::Tree { file } => exec_tree(file).expect("exec_tre"),
Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tree"),
Commands::Convert { file, quiet } => TOKIO_RUNTIME
.block_on(exec_convert(file, Flags { quiet }))
.expect("exec_convert"),
Commands::Browse { file } => exec_tui(file).expect("exec_tui"),
}
}
18 changes: 7 additions & 11 deletions vortex-tui/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ use vortex::error::VortexResult;
use vortex::file::VortexOpenOptions;
use vortex::io::TokioFile;

use crate::TOKIO_RUNTIME;

pub fn exec_tree(file: impl AsRef<Path>) -> VortexResult<()> {
pub async fn exec_tree(file: impl AsRef<Path>) -> VortexResult<()> {
let opened = TokioFile::open(file)?;

let full = TOKIO_RUNTIME.block_on(async move {
VortexOpenOptions::file(opened)
.open()
.await?
.scan()
.read_all()
.await
})?;
let full = VortexOpenOptions::file(opened)
.open()
.await?
.scan()
.read_all()
.await?;

println!("{}", full.tree_display());

Expand Down
Loading