Commit ac759d5

alamb and etseidl authored

Minor: Improve parallel parquet encoding example (#7323)

* Minor: Improve parallel parquet encoding example
* Apply suggestions from code review

Co-authored-by: Ed Seidl <[email protected]>

1 parent: ac00928
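
The example this commit improves documents a fan-out/fan-in pattern: one worker thread per column, each paired with the sending half of an mpsc channel, where dropping the sender signals the worker to finish and the result comes back through the thread handle. A minimal, std-only sketch of that shape (illustrative only; no arrow-rs types, all names hypothetical):

use std::sync::mpsc;
use std::thread;

fn main() {
    // Fan out: one (thread handle, send channel) pair per worker,
    // the same tuple shape the improved doc comment describes
    let workers: Vec<_> = (0..2)
        .map(|id| {
            let (send, recv) = mpsc::channel::<i32>();
            let handle = thread::spawn(move || {
                // Drain the channel until the sender is dropped
                recv.into_iter().map(|v| v * (id + 1)).sum::<i32>()
            });
            (handle, send)
        })
        .collect();

    // Send work to each worker
    for (_, send) in &workers {
        for v in 1..=3 {
            send.send(v).unwrap();
        }
    }

    // Fan in: drop each sender to signal completion, then join
    for (handle, send) in workers {
        drop(send);
        println!("worker result: {}", handle.join().unwrap());
    }
}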

File tree

  • parquet/src/arrow/arrow_writer/mod.rs

1 file changed: +27 −14 lines

parquet/src/arrow/arrow_writer/mod.rs
27 additions, 14 deletions

@@ -530,8 +530,9 @@ impl ArrowColumnChunk {
 
 /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
 ///
-/// Note: This is a low-level interface for applications that require fine-grained control
-/// of encoding, see [`ArrowWriter`] for a higher-level interface
+/// Note: This is a low-level interface for applications that require
+/// fine-grained control of encoding (e.g. encoding using multiple threads),
+/// see [`ArrowWriter`] for a higher-level interface
 ///
 /// # Example: Encoding two Arrow Array's in Parallel
 /// ```
@@ -540,9 +541,9 @@ impl ArrowColumnChunk {
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
 /// # use parquet::arrow::ArrowSchemaConverter;
-/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
 /// # use parquet::file::properties::WriterProperties;
-/// # use parquet::file::writer::SerializedFileWriter;
+/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 /// #
 /// let schema = Arc::new(Schema::new(vec![
 ///     Field::new("i32", DataType::Int32, false),
@@ -560,15 +561,20 @@ impl ArrowColumnChunk {
 /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
 ///
 /// // Spawn a worker thread for each column
-/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
+/// //
+/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
+/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
 /// let mut workers: Vec<_> = col_writers
 ///     .into_iter()
 ///     .map(|mut col_writer| {
 ///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
 ///         let handle = std::thread::spawn(move || {
+///             // receive Arrays to encode via the channel
 ///             for col in recv {
 ///                 col_writer.write(&col)?;
 ///             }
+///             // once the input is complete, close the writer
+///             // to return the newly created ArrowColumnChunk
 ///             col_writer.close()
 ///         });
 ///         (handle, send)
@@ -577,33 +583,40 @@ impl ArrowColumnChunk {
 ///
 /// // Create parquet writer
 /// let root_schema = parquet_schema.root_schema_ptr();
-/// let mut out = Vec::with_capacity(1024); // This could be a File
-/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+/// // write to memory in the example, but this could be a File
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
+///     .unwrap();
 ///
 /// // Start row group
-/// let mut row_group = writer.next_row_group().unwrap();
+/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
+///     .next_row_group()
+///     .unwrap();
 ///
-/// // Columns to encode
+/// // Create some example input columns to encode
 /// let to_write = vec![
 ///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
 ///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
 /// ];
 ///
-/// // Spawn work to encode columns
+/// // Send the input columns to the workers
 /// let mut worker_iter = workers.iter_mut();
 /// for (arr, field) in to_write.iter().zip(&schema.fields) {
 ///     for leaves in compute_leaves(field, arr).unwrap() {
 ///         worker_iter.next().unwrap().1.send(leaves).unwrap();
 ///     }
 /// }
 ///
-/// // Finish up parallel column encoding
+/// // Wait for the workers to complete encoding, and append
+/// // the resulting column chunks to the row group (and the file)
 /// for (handle, send) in workers {
 ///     drop(send); // Drop send side to signal termination
-///     let chunk = handle.join().unwrap().unwrap();
-///     chunk.append_to_row_group(&mut row_group).unwrap();
+///     // wait for the worker to send the completed chunk
+///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
+///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
 /// }
-/// row_group.close().unwrap();
+/// // Close the row group which writes to the underlying file
+/// row_group_writer.close().unwrap();
 ///
 /// let metadata = writer.close().unwrap();
 /// assert_eq!(metadata.num_rows, 3);
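
For reference, here is roughly what the full doc example reads like after this change, assembled from the hunks above into a standalone program. The context lines the diff elides (writer properties, Arrow-to-parquet schema conversion) are reconstructed from the imports shown and may differ slightly from the published arrow-rs docs (which may, for instance, also configure type coercion on the converter); treat it as a sketch, not the exact source.

use std::sync::Arc;
use arrow_array::*;
use arrow_schema::*;
use parquet::arrow::ArrowSchemaConverter;
use parquet::arrow::arrow_writer::{
    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowLeafColumn,
};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("i32", DataType::Int32, false),
        Field::new("f32", DataType::Float32, false),
    ]));

    // Compute the parquet schema from the Arrow schema (reconstructed step)
    let props = Arc::new(WriterProperties::default());
    let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();

    // Create an encoder for each leaf column
    let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();

    // Spawn a worker thread per column, fed over an mpsc channel
    let mut workers: Vec<_> = col_writers
        .into_iter()
        .map(|mut col_writer| {
            let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
            let handle = std::thread::spawn(move || {
                // receive Arrays to encode via the channel
                for col in recv {
                    col_writer.write(&col)?;
                }
                // once the input is complete, close the writer
                // to return the newly created ArrowColumnChunk
                col_writer.close()
            });
            (handle, send)
        })
        .collect();

    // Write to memory in this sketch, but this could be a File
    let root_schema = parquet_schema.root_schema_ptr();
    let mut out = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
    let mut row_group_writer: SerializedRowGroupWriter<'_, _> =
        writer.next_row_group().unwrap();

    // Some example input columns to encode
    let to_write: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
        Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
    ];

    // Fan the input columns out to the workers, one leaf per worker
    let mut worker_iter = workers.iter_mut();
    for (arr, field) in to_write.iter().zip(&schema.fields) {
        for leaves in compute_leaves(field, arr).unwrap() {
            worker_iter.next().unwrap().1.send(leaves).unwrap();
        }
    }

    // Fan back in: join each worker and append its chunk to the row group
    for (handle, send) in workers {
        drop(send); // Drop send side to signal termination
        let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
        chunk.append_to_row_group(&mut row_group_writer).unwrap();
    }
    // Close the row group, which writes to the underlying file
    row_group_writer.close().unwrap();

    let metadata = writer.close().unwrap();
    assert_eq!(metadata.num_rows, 3);
}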
