Skip to content

Commit 45d9857

Browse files
committed
feat: vortex CLI convert parquet to Vortex
1 parent dd86be7 commit 45d9857

File tree

5 files changed

+114
-13
lines changed

5 files changed

+114
-13
lines changed

Cargo.lock

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-tui/Cargo.toml

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@ rust-version = { workspace = true }
1414
version = { workspace = true }
1515

1616
[dependencies]
17+
arrow-array = { workspace = true }
1718
async-trait = { workspace = true }
1819
clap = { workspace = true, features = ["derive"] }
1920
crossterm = { workspace = true }
21+
futures-util = { workspace = true }
22+
indicatif = { workspace = true }
23+
parquet = { workspace = true, features = ["arrow", "async"] }
24+
pin-project = { workspace = true }
2025
ratatui = { workspace = true }
2126
tokio = { workspace = true, features = ["rt-multi-thread"] }
22-
vortex = { workspace = true, features = ["tokio"] }
27+
vortex = { workspace = true, features = ["tokio", "parquet"] }
2328
vortex-layout = { workspace = true }
2429

2530
[lints]

vortex-tui/src/convert.rs

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use std::path::Path;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use arrow_array::StructArray as ArrowStructArray;
6+
use futures_util::Stream;
7+
use indicatif::ProgressBar;
8+
use parquet::arrow::ParquetRecordBatchStreamBuilder;
9+
use pin_project::pin_project;
10+
use tokio::fs::File;
11+
use vortex::arrays::ChunkedArray;
12+
use vortex::arrow::FromArrowArray;
13+
use vortex::dtype::DType;
14+
use vortex::error::{VortexExpect, VortexResult};
15+
use vortex::file::VortexWriteOptions;
16+
use vortex::stream::{ArrayStream, ArrayStreamArrayExt};
17+
use vortex::{Array, ArrayRef};
18+
19+
/// Convert Parquet files to Vortex, and Vortex to Parquet.
20+
pub async fn exec_convert(input_path: impl AsRef<Path>) -> VortexResult<()> {
21+
let output_path = input_path.as_ref().with_extension("vortex");
22+
let file = File::open(input_path).await?;
23+
let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
24+
25+
// Create a new ChunkedArray and push a new chunk in for every RowGroup.
26+
27+
let mut chunks = Vec::new();
28+
29+
while let Some(mut reader) = reader.next_row_group().await? {
30+
while let Some(batch) = reader.next() {
31+
let batch = ArrowStructArray::from(batch?);
32+
let next_chunk = ArrayRef::from_arrow(&batch, true);
33+
chunks.push(next_chunk);
34+
}
35+
}
36+
37+
let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone();
38+
let chunked_array = ChunkedArray::try_new(chunks, dtype)?;
39+
40+
let writer = VortexWriteOptions::default();
41+
42+
let pb = ProgressBar::new(chunked_array.nchunks() as u64);
43+
let stream = ProgressArrayStream {
44+
inner: chunked_array.to_array_stream(),
45+
progress: pb,
46+
};
47+
48+
let output_file = File::create(output_path).await?;
49+
writer.write(output_file, stream).await?;
50+
51+
Ok(())
52+
}
53+
54+
#[pin_project]
55+
struct ProgressArrayStream<S> {
56+
#[pin]
57+
inner: S,
58+
progress: ProgressBar,
59+
}
60+
61+
impl<S> Stream for ProgressArrayStream<S>
62+
where
63+
S: Stream<Item = VortexResult<ArrayRef>>,
64+
{
65+
type Item = VortexResult<ArrayRef>;
66+
67+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68+
let this = self.project();
69+
match this.inner.poll_next(cx) {
70+
Poll::Ready(inner) => {
71+
this.progress.inc(1);
72+
Poll::Ready(inner)
73+
}
74+
Poll::Pending => {
75+
return Poll::Pending;
76+
}
77+
}
78+
}
79+
}
80+
81+
impl<S> ArrayStream for ProgressArrayStream<S>
82+
where
83+
S: ArrayStream,
84+
{
85+
fn dtype(&self) -> &DType {
86+
self.inner.dtype()
87+
}
88+
}

vortex-tui/src/main.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![allow(clippy::expect_used)]
22
mod browse;
3+
mod convert;
34
mod tree;
45

56
use std::path::PathBuf;
@@ -10,6 +11,8 @@ use clap::Parser;
1011
use tokio::runtime::Runtime;
1112
use tree::exec_tree;
1213

14+
use crate::convert::exec_convert;
15+
1316
static TOKIO_RUNTIME: LazyLock<Runtime> =
1417
LazyLock::new(|| Runtime::new().expect("Tokio Runtime::new()"));
1518

@@ -22,13 +25,17 @@ struct Cli {
2225
#[derive(Debug, clap::Subcommand)]
2326
enum Commands {
2427
Tree { file: PathBuf },
28+
Convert { file: PathBuf },
2529
Browse { file: PathBuf },
2630
}
2731

2832
fn main() {
2933
let cli = Cli::parse();
3034
match cli.command {
31-
Commands::Tree { file } => exec_tree(file).expect("exec_tre"),
35+
Commands::Tree { file } => TOKIO_RUNTIME.block_on(exec_tree(file)).expect("exec_tre"),
36+
Commands::Convert { file } => TOKIO_RUNTIME
37+
.block_on(exec_convert(file))
38+
.expect("exec_convert"),
3239
Commands::Browse { file } => exec_tui(file).expect("exec_tui"),
3340
}
3441
}

vortex-tui/src/tree.rs

+7-11
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@ use vortex::error::VortexResult;
44
use vortex::file::VortexOpenOptions;
55
use vortex::io::TokioFile;
66

7-
use crate::TOKIO_RUNTIME;
8-
9-
pub fn exec_tree(file: impl AsRef<Path>) -> VortexResult<()> {
7+
pub async fn exec_tree(file: impl AsRef<Path>) -> VortexResult<()> {
108
let opened = TokioFile::open(file)?;
119

12-
let full = TOKIO_RUNTIME.block_on(async move {
13-
VortexOpenOptions::file(opened)
14-
.open()
15-
.await?
16-
.scan()
17-
.into_array()
18-
.await
19-
})?;
10+
let full = VortexOpenOptions::file(opened)
11+
.open()
12+
.await?
13+
.scan()
14+
.into_array()
15+
.await?;
2016

2117
println!("{}", full.tree_display());
2218

0 commit comments

Comments
 (0)