Skip to content

Commit

Permalink
clippy warnings/fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
c-peters committed Apr 14, 2024
1 parent 4655a3c commit f31e799
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 147 deletions.
2 changes: 2 additions & 0 deletions crates/polars-arrow/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub(super) fn read_ipc_message_from_block<'a, R: Read + Seek>(
}

/// Read an encapsulated IPC Message from the reader
#[allow(clippy::needless_lifetimes)]
pub(super) fn read_ipc_message<'a, R: Read>(
reader: &mut R,
scratch: &'a mut Vec<u8>,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ async fn read_ipc_message_from_block<'a, R: AsyncRead + AsyncSeek + Unpin>(
read_ipc_message(reader, scratch).await
}

#[allow(clippy::needless_lifetimes)]
async fn read_ipc_message<'a, R>(
mut reader: R,
data: &'a mut Vec<u8>,
Expand Down
10 changes: 1 addition & 9 deletions crates/polars-arrow/src/io/ipc/read/flight_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@ use crate::io::ipc::read::schema::deserialize_stream_metadata;
use crate::io::ipc::read::{read_dictionary, read_record_batch, Dictionaries, StreamMetadata};
use crate::io::ipc::write::common::EncodedData;
use crate::record_batch::RecordBatch;
#[derive(Default)]
pub struct FlightStreamReader {
metadata: Option<StreamMetadata>,
dictionaries: Dictionaries,
}

impl Default for FlightStreamReader {
fn default() -> Self {
Self {
metadata: None,
dictionaries: Default::default(),
}
}
}

impl FlightStreamReader {
pub fn get_metadata(&self) -> PolarsResult<&ArrowSchema> {
self.metadata
Expand Down
138 changes: 0 additions & 138 deletions crates/polars-arrow/src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use super::common::*;
use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata};
use crate::array::Array;
use crate::datatypes::ArrowSchema;
use crate::io::ipc::read::file;
use crate::record_batch::RecordBatch;

/// An iterator of [`RecordBatch`]s from an Arrow IPC file.
Expand Down Expand Up @@ -136,140 +135,3 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
Some(chunk)
}
}

/// An iterator of raw bytes from an Arrow IPC file.
///
/// Returns the raw header and body of each IPC message without parsing it.
/// Useful when efficiently sending data over the wire.
#[cfg(feature = "io_flight")]
pub struct FlightFileReader<R: Read + Seek> {
/// Source the footer and the message blocks are read from.
reader: R,
/// Set once the footer has been read and the schema message has been produced.
has_read_footer: bool,
/// Record-batch blocks listed in the footer, consumed in order.
record_batch_blocks: std::vec::IntoIter<arrow_format::ipc::Block>,
/// Dictionary blocks listed in the footer, if any; reset to `None` once exhausted.
dictionaries_blocks: Option<std::vec::IntoIter<arrow_format::ipc::Block>>,
/// Set once all record-batch blocks have been consumed.
finished: bool,
}

/// Raw, unparsed bytes of a single IPC message, split into header and body.
#[cfg(feature = "io_flight")]
pub struct IPCRawMessage {
/// Bytes of the message header.
pub data_header: Vec<u8>,
/// Bytes of the message body; empty for the schema message.
pub data_body: Vec<u8>,
}

#[cfg(feature = "io_flight")]
impl<R: Read + Seek> FlightFileReader<R> {
    /// Create a reader over `reader`.
    ///
    /// Nothing is read from `reader` here; the footer is read when the first
    /// message is requested (or by calling [`Self::read_footer`] directly).
    pub fn new(reader: R) -> Self {
        Self {
            reader,
            has_read_footer: false,
            record_batch_blocks: Vec::new().into_iter(),
            dictionaries_blocks: None,
            finished: false,
        }
    }

    /// Check if the stream is finished, i.e. every record-batch block has been returned.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Read in the footer data of the IPC file, returning the schema.
    ///
    /// We need to read in the footer data because the dictionaries do not
    /// necessarily come before the batches, which is required for streaming data.
    /// As a side effect, the record-batch and dictionary block lists of this
    /// reader are replaced by the ones found in the footer.
    ///
    /// # Errors
    ///
    /// Returns an error if the footer cannot be read or deserialized.
    pub fn read_footer(&mut self) -> PolarsResult<arrow_format::ipc::Schema> {
        let (_, footer_len) = file::read_footer_len(&mut self.reader)?;
        let footer_data = file::read_footer(&mut self.reader, footer_len)?;
        let footer = file::deserialize_footer_ref(&footer_data)?;

        self.record_batch_blocks =
            file::deserialize_record_batch_blocks_from_footer(footer)?.into_iter();
        self.dictionaries_blocks =
            file::deserialize_dictionary_blocks_from_footer(footer)?.map(|b| b.into_iter());

        // Get the schema from the footer.
        let schema_ref = file::deserialize_schema_ref_from_footer(footer)?;
        let schema: arrow_format::ipc::Schema = schema_ref.try_into()?;
        Ok(schema)
    }

    /// Convert an IPC schema into an IPC raw message.
    ///
    /// The schema comes from the footer and does not have the message format,
    /// so it is wrapped into a `Message` with a zero-length body first.
    fn schema_to_raw_message(&self, schema: arrow_format::ipc::Schema) -> IPCRawMessage {
        // Turn the IPC schema into an encapsulated message.
        let message = arrow_format::ipc::Message {
            version: arrow_format::ipc::MetadataVersion::V5,
            header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),
            body_length: 0,
            custom_metadata: None, // todo: allow writing custom metadata
        };
        let mut builder = arrow_format::ipc::planus::Builder::new();
        let header = builder.finish(&message, None).to_vec();
        IPCRawMessage {
            data_header: header,
            data_body: Vec::new(),
        }
    }

    /// Read the message stored at `block` and return its raw header and body bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the message header cannot be read, if its body length
    /// is invalid or negative, if reading the body fails, or if the source ends
    /// before the declared number of body bytes could be read.
    fn block_to_raw_message(
        &mut self,
        block: arrow_format::ipc::Block,
    ) -> PolarsResult<IPCRawMessage> {
        let mut header = vec![];
        let message = read_ipc_message_from_block(&mut self.reader, &block, &mut header)?;

        let block_length: u64 = message
            .body_length()
            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?
            .try_into()
            .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

        let mut body = vec![];
        let bytes_read = self
            .reader
            .by_ref()
            .take(block_length)
            .read_to_end(&mut body)?;

        // `take(..).read_to_end(..)` stops quietly at end-of-file; a short read
        // means the file is truncated and the body must not be passed on as-is.
        if bytes_read as u64 != block_length {
            return Err(polars_err!(
                ComputeError: "IPC message body is truncated: expected {} bytes, read {}",
                block_length,
                bytes_read
            ));
        }

        Ok(IPCRawMessage {
            data_header: header,
            data_body: body,
        })
    }

    /// Return the next message.
    ///
    /// Messages are produced in this order: the schema (read from the footer),
    /// then every dictionary block, then every record-batch block.
    /// If the reader is finished, return `None`.
    fn maybe_next(&mut self) -> PolarsResult<Option<IPCRawMessage>> {
        if self.finished {
            return Ok(None);
        }

        // Schema as the first message.
        if !self.has_read_footer {
            let schema = self.read_footer()?;
            self.has_read_footer = true;
            return Ok(Some(self.schema_to_raw_message(schema)));
        }

        // Second, send all the dictionaries.
        if let Some(blocks) = self.dictionaries_blocks.as_mut() {
            match blocks.next() {
                Some(block) => return self.block_to_raw_message(block).map(Some),
                // Exhausted: drop the iterator and fall through to the record batches.
                None => self.dictionaries_blocks = None,
            }
        }

        // Finally, send the record batches.
        match self.record_batch_blocks.next() {
            Some(block) => self.block_to_raw_message(block).map(Some),
            None => {
                self.finished = true;
                Ok(None)
            },
        }
    }
}

#[cfg(feature = "io_flight")]
impl<R: Read + Seek> Iterator for FlightFileReader<R> {
    type Item = PolarsResult<IPCRawMessage>;

    /// Yield the next raw message, an error, or `None` once the reader is finished.
    fn next(&mut self) -> Option<Self::Item> {
        // Turn `Result<Option<_>>` into `Option<Result<_>>`.
        match self.maybe_next() {
            Ok(Some(raw_message)) => Some(Ok(raw_message)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}

0 comments on commit f31e799

Please sign in to comment.