@@ -228,7 +228,7 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
- /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+ /// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
///
/// In particular, this handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
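The column and row-group selection this doc comment refers to happens on the builder before `build()` is called. A minimal sketch of that flow using the builder's existing selection APIs (`ProjectionMask`, `with_projection`, `with_row_groups`); the `read_subset` helper and its use of a `tokio::fs::File` are illustrative, not part of this diff:

```rust
use futures::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::Result;

// Hypothetical helper: read only the first leaf column of row group 0
async fn read_subset(file: tokio::fs::File) -> Result<()> {
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    // Select columns by leaf index using the parquet schema from the metadata
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let mut stream = builder
        .with_projection(mask)
        .with_row_groups(vec![0])
        .with_batch_size(1024)
        .build()?;
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```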
@@ -239,6 +239,37 @@ pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use std::fs::metadata;
+ /// # use std::sync::Arc;
+ /// # use bytes::Bytes;
+ /// # use arrow_array::{Int32Array, RecordBatch};
+ /// # use arrow_schema::{DataType, Field, Schema};
+ /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+ /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+ /// # use tempfile::tempfile;
+ /// # use futures::StreamExt;
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # let mut file = tempfile().unwrap();
+ /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+ /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+ /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+ /// # writer.write(&batch).unwrap();
+ /// # writer.close().unwrap();
+ /// // Open async file containing parquet data
+ /// let mut file = tokio::fs::File::from_std(file);
+ /// // Construct the reader
+ /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
+ ///     .await.unwrap().build().unwrap();
+ /// // Read a batch from the stream
+ /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+ /// # }
+ /// ```
pub async fn new(input: T) -> Result<Self> {
    Self::new_with_options(input, Default::default()).await
}
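The `Default::default()` here is the default `ArrowReaderOptions`; callers that need more control can call `new_with_options` directly. A minimal sketch, not part of this diff, that additionally requests the page index when loading metadata (the `builder_with_page_index` helper is illustrative):

```rust
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;

// Hypothetical helper: like `new`, but also loads the page index,
// enabling finer-grained row selection later on
async fn builder_with_page_index(
    file: tokio::fs::File,
) -> Result<ParquetRecordBatchStreamBuilder<tokio::fs::File>> {
    let options = ArrowReaderOptions::new().with_page_index(true);
    ParquetRecordBatchStreamBuilder::new_with_options(file, options).await
}
```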
@@ -253,7 +284,9 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
///
/// This allows loading metadata once and using it to create multiple builders with
- /// potentially different settings
+ /// potentially different settings, whose streams can then be read in parallel.
+ ///
+ /// # Example of reading from multiple streams in parallel
///
/// ```
/// # use std::fs::metadata;
@@ -268,23 +301,29 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// let mut file = tempfile().unwrap();
+ /// # let mut file = tempfile().unwrap();
/// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
/// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// # writer.write(&batch).unwrap();
/// # writer.close().unwrap();
- /// #
+ /// // open file with parquet data
/// let mut file = tokio::fs::File::from_std(file);
+ /// // load metadata once
/// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
+ /// // create two readers, a and b, from the same underlying file
+ /// // without reading the metadata again
/// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
/// file.try_clone().await.unwrap(),
/// meta.clone()
/// ).build().unwrap();
/// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
///
- /// // Should be able to read from both in parallel
- /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap());
+ /// // Can read batches from both readers in parallel
+ /// assert_eq!(
+ /// a.next().await.unwrap().unwrap(),
+ /// b.next().await.unwrap().unwrap(),
+ /// );
/// # }
/// ```
pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
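Because `file.try_clone()` in the example above gives each stream an independent handle to the same file, the two readers can also be driven on separate tasks. A minimal sketch assuming a multi-threaded tokio runtime; the `drain` and `read_both` helpers are illustrative, not part of this diff:

```rust
use futures::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStream;

// Hypothetical helper: consume a stream to completion, counting rows
async fn drain(mut stream: ParquetRecordBatchStream<tokio::fs::File>) -> usize {
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch.unwrap().num_rows();
    }
    rows
}

// Spawn both readers so they make progress concurrently
async fn read_both(
    a: ParquetRecordBatchStream<tokio::fs::File>,
    b: ParquetRecordBatchStream<tokio::fs::File>,
) -> (usize, usize) {
    let ta = tokio::spawn(drain(a));
    let tb = tokio::spawn(drain(b));
    (ta.await.unwrap(), tb.await.unwrap())
}
```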