Skip to content

Commit 273de98

Browse files
authored
feat: vortex CLI convert parquet to Vortex (#2649)
1 parent 8b13f5d commit 273de98

File tree

5 files changed

+170
-13
lines changed

5 files changed

+170
-13
lines changed

Cargo.lock

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-tui/Cargo.toml

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@ rust-version = { workspace = true }
1414
version = { workspace = true }
1515

1616
[dependencies]
17+
arrow-array = { workspace = true }
1718
async-trait = { workspace = true }
1819
clap = { workspace = true, features = ["derive"] }
1920
crossterm = { workspace = true }
21+
futures-util = { workspace = true }
22+
indicatif = { workspace = true }
23+
parquet = { workspace = true, features = ["arrow", "async"] }
24+
pin-project = { workspace = true }
2025
ratatui = { workspace = true }
2126
tokio = { workspace = true, features = ["rt-multi-thread"] }
22-
vortex = { workspace = true, features = ["tokio"] }
27+
vortex = { workspace = true, features = ["tokio", "parquet"] }
2328
vortex-layout = { workspace = true }
2429

2530
[lints]

vortex-tui/src/convert.rs

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#![allow(clippy::use_debug)]
2+
3+
use std::path::Path;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
use std::time::Instant;
7+
8+
use arrow_array::StructArray as ArrowStructArray;
9+
use futures_util::Stream;
10+
use indicatif::ProgressBar;
11+
use parquet::arrow::ParquetRecordBatchStreamBuilder;
12+
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
13+
use pin_project::pin_project;
14+
use tokio::fs::File;
15+
use vortex::arrays::ChunkedArray;
16+
use vortex::arrow::FromArrowArray;
17+
use vortex::dtype::DType;
18+
use vortex::error::{VortexExpect, VortexResult};
19+
use vortex::file::VortexWriteOptions;
20+
use vortex::stream::{ArrayStream, ArrayStreamArrayExt};
21+
use vortex::{Array, ArrayRef};
22+
23+
/// Options controlling how `exec_convert` reports on its work.
///
/// The default value has every flag turned off, i.e. normal (non-quiet) output.
#[derive(Debug, Default, Clone, Copy)]
pub struct Flags {
    /// When `true`, suppress the status messages and the progress bar that
    /// `exec_convert` would otherwise emit.
    pub quiet: bool,
}
27+
28+
/// Convert Parquet files to Vortex.
29+
pub async fn exec_convert(input_path: impl AsRef<Path>, flags: Flags) -> VortexResult<()> {
30+
if !flags.quiet {
31+
eprintln!(
32+
"Converting input Parquet file: {}",
33+
input_path.as_ref().display()
34+
);
35+
}
36+
37+
let wall_start = Instant::now();
38+
39+
let output_path = input_path.as_ref().with_extension("vortex");
40+
let mut file = File::open(input_path).await?;
41+
42+
let metadata =
43+
ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?;
44+
let has_root_level_nulls = metadata.parquet_schema().root_schema().is_optional();
45+
46+
let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
47+
let mut chunks = Vec::new();
48+
49+
while let Some(mut reader) = reader.next_row_group().await? {
50+
for batch in reader.by_ref() {
51+
let batch = ArrowStructArray::from(batch?);
52+
let next_chunk = ArrayRef::from_arrow(&batch, has_root_level_nulls);
53+
chunks.push(next_chunk);
54+
}
55+
}
56+
57+
let read_complete = Instant::now();
58+
59+
if !flags.quiet {
60+
eprintln!(
61+
"Read Parquet file in {:?}",
62+
read_complete.duration_since(wall_start)
63+
);
64+
65+
eprintln!(
66+
"Writing {} chunks to new Vortex file {}",
67+
chunks.len(),
68+
output_path.display()
69+
);
70+
}
71+
72+
let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone();
73+
let chunked_array = ChunkedArray::try_new(chunks, dtype)?;
74+
75+
let writer = VortexWriteOptions::default();
76+
77+
let output_file = File::create(output_path).await?;
78+
79+
if !flags.quiet {
80+
let pb = ProgressBar::new(chunked_array.nchunks() as u64);
81+
let stream = ProgressArrayStream {
82+
inner: chunked_array.to_array_stream(),
83+
progress: pb,
84+
};
85+
writer.write(output_file, stream).await?;
86+
} else {
87+
writer
88+
.write(output_file, chunked_array.to_array_stream())
89+
.await?;
90+
}
91+
92+
if !flags.quiet {
93+
eprintln!(
94+
"Wrote Vortex in {:?}",
95+
Instant::now().duration_since(read_complete)
96+
);
97+
}
98+
99+
Ok(())
100+
}
101+
102+
#[pin_project]
103+
struct ProgressArrayStream<S> {
104+
#[pin]
105+
inner: S,
106+
progress: ProgressBar,
107+
}
108+
109+
impl<S> Stream for ProgressArrayStream<S>
110+
where
111+
S: Stream<Item = VortexResult<ArrayRef>>,
112+
{
113+
type Item = VortexResult<ArrayRef>;
114+
115+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116+
let this = self.project();
117+
match this.inner.poll_next(cx) {
118+
Poll::Ready(inner) => {
119+
this.progress.inc(1);
120+
Poll::Ready(inner)
121+
}
122+
Poll::Pending => Poll::Pending,
123+
}
124+
}
125+
}
126+
127+
impl<S> ArrayStream for ProgressArrayStream<S>
128+
where
129+
S: ArrayStream,
130+
{
131+
fn dtype(&self) -> &DType {
132+
self.inner.dtype()
133+
}
134+
}

vortex-tui/src/main.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![allow(clippy::expect_used)]
22
mod browse;
3+
mod convert;
34
mod tree;
45

56
use std::path::PathBuf;
@@ -10,6 +11,8 @@ use clap::Parser;
1011
use tokio::runtime::Runtime;
1112
use tree::exec_tree;
1213

14+
use crate::convert::{Flags, exec_convert};
15+
1316
static TOKIO_RUNTIME: LazyLock<Runtime> =
1417
LazyLock::new(|| Runtime::new().expect("Tokio Runtime::new()"));
1518

@@ -21,14 +24,28 @@ struct Cli {
2124

2225
#[derive(Debug, clap::Subcommand)]
2326
enum Commands {
27+
/// Print the encoding tree of a Vortex file.
2428
Tree { file: PathBuf },
29+
/// Convert a Parquet file to a Vortex file. Chunking occurs on Parquet RowGroup boundaries.
30+
Convert {
31+
/// Path to the Parquet file on disk to convert to Vortex
32+
file: PathBuf,
33+
34+
/// Execute quietly. No output will be printed.
35+
#[arg(short, long)]
36+
quiet: bool,
37+
},
38+
/// Interactively browse the Vortex file.
2539
Browse { file: PathBuf },
2640
}
2741

2842
fn main() {
2943
let cli = Cli::parse();
3044
match cli.command {
31-
Commands::Tree { file } => exec_tree(file).expect("exec_tre"),
45+
Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tree"),
46+
Commands::Convert { file, quiet } => TOKIO_RUNTIME
47+
.block_on(exec_convert(file, Flags { quiet }))
48+
.expect("exec_convert"),
3249
Commands::Browse { file } => exec_tui(file).expect("exec_tui"),
3350
}
3451
}

vortex-tui/src/tree.rs

+7-11
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@ use vortex::error::VortexResult;
44
use vortex::file::VortexOpenOptions;
55
use vortex::io::TokioFile;
66

7-
use crate::TOKIO_RUNTIME;
8-
9-
pub fn exec_tree(file: impl AsRef<Path>) -> VortexResult<()> {
7+
pub async fn exec_tree(file: impl AsRef<Path>) -> VortexResult<()> {
108
let opened = TokioFile::open(file)?;
119

12-
let full = TOKIO_RUNTIME.block_on(async move {
13-
VortexOpenOptions::file(opened)
14-
.open()
15-
.await?
16-
.scan()
17-
.read_all()
18-
.await
19-
})?;
10+
let full = VortexOpenOptions::file(opened)
11+
.open()
12+
.await?
13+
.scan()
14+
.read_all()
15+
.await?;
2016

2117
println!("{}", full.tree_display());
2218

0 commit comments

Comments
 (0)