-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: vortex CLI convert parquet to Vortex #2649
Changes from all commits
e4a96d0
1ba3e6f
83bcf7c
547b500
8adef64
13ca5c2
1851358
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
#![allow(clippy::use_debug)] | ||
|
||
use std::path::Path; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
use std::time::Instant; | ||
|
||
use arrow_array::StructArray as ArrowStructArray; | ||
use futures_util::Stream; | ||
use indicatif::ProgressBar; | ||
use parquet::arrow::ParquetRecordBatchStreamBuilder; | ||
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; | ||
use pin_project::pin_project; | ||
use tokio::fs::File; | ||
use vortex::arrays::ChunkedArray; | ||
use vortex::arrow::FromArrowArray; | ||
use vortex::dtype::DType; | ||
use vortex::error::{VortexExpect, VortexResult}; | ||
use vortex::file::VortexWriteOptions; | ||
use vortex::stream::{ArrayStream, ArrayStreamArrayExt}; | ||
use vortex::{Array, ArrayRef}; | ||
|
||
#[derive(Default)] | ||
pub struct Flags { | ||
pub quiet: bool, | ||
} | ||
|
||
/// Convert Parquet files to Vortex. | ||
pub async fn exec_convert(input_path: impl AsRef<Path>, flags: Flags) -> VortexResult<()> { | ||
if !flags.quiet { | ||
eprintln!( | ||
"Converting input Parquet file: {}", | ||
input_path.as_ref().display() | ||
); | ||
} | ||
|
||
let wall_start = Instant::now(); | ||
|
||
let output_path = input_path.as_ref().with_extension("vortex"); | ||
let mut file = File::open(input_path).await?; | ||
|
||
let metadata = | ||
ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?; | ||
let has_root_level_nulls = metadata.parquet_schema().root_schema().is_optional(); | ||
|
||
let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?; | ||
let mut chunks = Vec::new(); | ||
|
||
while let Some(mut reader) = reader.next_row_group().await? { | ||
for batch in reader.by_ref() { | ||
let batch = ArrowStructArray::from(batch?); | ||
let next_chunk = ArrayRef::from_arrow(&batch, has_root_level_nulls); | ||
chunks.push(next_chunk); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to just stream these straight into the writer without buffering everything in memory. using the row group count as your progress indicator |
||
} | ||
} | ||
|
||
let read_complete = Instant::now(); | ||
|
||
if !flags.quiet { | ||
eprintln!( | ||
"Read Parquet file in {:?}", | ||
read_complete.duration_since(wall_start) | ||
); | ||
|
||
eprintln!( | ||
"Writing {} chunks to new Vortex file {}", | ||
chunks.len(), | ||
output_path.display() | ||
); | ||
} | ||
|
||
let dtype = chunks.first().vortex_expect("empty chunks").dtype().clone(); | ||
let chunked_array = ChunkedArray::try_new(chunks, dtype)?; | ||
|
||
let writer = VortexWriteOptions::default(); | ||
|
||
let output_file = File::create(output_path).await?; | ||
|
||
if !flags.quiet { | ||
let pb = ProgressBar::new(chunked_array.nchunks() as u64); | ||
let stream = ProgressArrayStream { | ||
inner: chunked_array.to_array_stream(), | ||
progress: pb, | ||
}; | ||
writer.write(output_file, stream).await?; | ||
} else { | ||
writer | ||
.write(output_file, chunked_array.to_array_stream()) | ||
.await?; | ||
} | ||
|
||
if !flags.quiet { | ||
eprintln!( | ||
"Wrote Vortex in {:?}", | ||
Instant::now().duration_since(read_complete) | ||
); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
#[pin_project] | ||
struct ProgressArrayStream<S> { | ||
#[pin] | ||
inner: S, | ||
progress: ProgressBar, | ||
} | ||
|
||
impl<S> Stream for ProgressArrayStream<S> | ||
where | ||
S: Stream<Item = VortexResult<ArrayRef>>, | ||
{ | ||
type Item = VortexResult<ArrayRef>; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let this = self.project(); | ||
match this.inner.poll_next(cx) { | ||
Poll::Ready(inner) => { | ||
this.progress.inc(1); | ||
Poll::Ready(inner) | ||
} | ||
Poll::Pending => Poll::Pending, | ||
} | ||
} | ||
} | ||
|
||
impl<S> ArrayStream for ProgressArrayStream<S> | ||
where | ||
S: ArrayStream, | ||
{ | ||
fn dtype(&self) -> &DType { | ||
self.inner.dtype() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it usually the other way around?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think usually you want the print. See curl or wget for example