Skip to content

Commit fa2fbfd

Browse files
authored
added a flush method to IPC writers (#6108)
While the writers expose `get_ref` and `get_mut` to access the underlying `io::Write` writer, there is an internal layer of a `BufWriter` that is not accessible. Because of that, there is no way to ensure that all messages written thus far to the `StreamWriter` or `FileWriter` have actually been passed to the underlying writer. Here we expose a `flush` method that flushes the internal buffer and the underlying writer. See #6099 for the discussion.
1 parent 49e714d commit fa2fbfd

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

arrow-ipc/src/writer.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,14 @@ impl<W: Write> FileWriter<W> {
982982
self.writer.get_mut()
983983
}
984984

985+
/// Flush the underlying writer.
986+
///
987+
/// Both the BufWriter and the underlying writer are flushed.
988+
pub fn flush(&mut self) -> Result<(), ArrowError> {
989+
self.writer.flush()?;
990+
Ok(())
991+
}
992+
985993
/// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
986994
/// writer
987995
///
@@ -1097,6 +1105,14 @@ impl<W: Write> StreamWriter<W> {
10971105
self.writer.get_mut()
10981106
}
10991107

1108+
/// Flush the underlying writer.
1109+
///
1110+
/// Both the BufWriter and the underlying writer are flushed.
1111+
pub fn flush(&mut self) -> Result<(), ArrowError> {
1112+
self.writer.flush()?;
1113+
Ok(())
1114+
}
1115+
11001116
/// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
11011117
/// writer
11021118
///
@@ -2615,4 +2631,51 @@ mod tests {
26152631
offset from expected alignment of 16 by 8"
26162632
);
26172633
}
2634+
2635+
#[test]
2636+
fn test_flush() {
2637+
// We write a schema which is small enough to fit into a buffer and not get flushed,
2638+
// and then force the write with .flush().
2639+
let num_cols = 2;
2640+
let mut fields = Vec::new();
2641+
let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
2642+
for i in 0..num_cols {
2643+
let field = Field::new(&format!("col_{}", i), DataType::Decimal128(38, 10), true);
2644+
fields.push(field);
2645+
}
2646+
let schema = Schema::new(fields);
2647+
let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
2648+
let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
2649+
let mut stream_writer =
2650+
StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
2651+
.unwrap();
2652+
let mut file_writer =
2653+
FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
2654+
2655+
let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
2656+
let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
2657+
stream_writer.flush().unwrap();
2658+
file_writer.flush().unwrap();
2659+
let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
2660+
let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
2661+
let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
2662+
// Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
2663+
// and then a length of 0 (4 bytes) for a total of 8 bytes.
2664+
// Everything before that should have been flushed in the .flush() call.
2665+
let expected_stream_flushed_bytes = stream_out.len() - 8;
2666+
// A file write is the same as the stream write except for the leading magic string
2667+
// ARROW1 plus padding, which is 8 bytes.
2668+
let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
2669+
2670+
assert!(
2671+
stream_bytes_written_on_new < stream_bytes_written_on_flush,
2672+
"this test makes no sense if flush is not actually required"
2673+
);
2674+
assert!(
2675+
file_bytes_written_on_new < file_bytes_written_on_flush,
2676+
"this test makes no sense if flush is not actually required"
2677+
);
2678+
assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
2679+
assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
2680+
}
26182681
}

0 commit comments

Comments
 (0)