Skip to content

Commit

Permalink
Some review reformatting.
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored and coastalwhite committed Nov 13, 2024
1 parent a3deeb9 commit 8ff9766
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions crates/polars-stream/src/nodes/io_sources/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};

const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
"\
IPC file produces more than pow(2, 32) rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
or set 'streaming'",
IPC file produces more than 2^32 rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python)",
));

pub struct IpcSourceNode {
Expand All @@ -64,7 +63,7 @@ pub struct IpcSourceNodeConfig {
}

pub struct IpcSourceNodeState {
seq: u64,
morsel_seq: u64,
row_idx_offset: IdxSize,

slice: Range<usize>,
Expand Down Expand Up @@ -131,7 +130,7 @@ impl IpcSourceNode {
.map(|p| prepare_projection(&first_metadata.schema, p.clone()));

let state = IpcSourceNodeState {
seq: 0,
morsel_seq: 0,
row_idx_offset: row_index.as_ref().map_or(0, |ri| ri.offset),

// Always create a slice. If no slice was given, just make the biggest slice possible.
Expand Down Expand Up @@ -218,15 +217,15 @@ impl ComputeNode for IpcSourceNode {
fn spawn<'env, 's>(
&'env mut self,
scope: &'s TaskScope<'s, 'env>,
recv: &mut [Option<RecvPort<'_>>],
send: &mut [Option<SendPort<'_>>],
recv_ports: &mut [Option<RecvPort<'_>>],
send_ports: &mut [Option<SendPort<'_>>],
_state: &'s ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
assert!(recv.is_empty());
assert_eq!(send.len(), 1);
assert!(recv_ports.is_empty());
assert_eq!(send_ports.len(), 1);

// Split size for morsels
// Split size for morsels.
let max_morsel_size = get_max_morsel_size();
let source_token = SourceToken::new();

Expand All @@ -235,27 +234,27 @@ impl ComputeNode for IpcSourceNode {
let sources = &self.sources;
let state = &mut self.state;

/// Messages sent from Walker task to Decoder tasks
/// Messages sent from Walker task to Decoder tasks.
struct BatchMessage {
memslice: Arc<MemSlice>,
metadata: Arc<FileMetadata>,
file_path: Option<Arc<str>>,
row_idx_offset: IdxSize,
slice: Range<usize>,
block_range: Range<usize>,
seq: u64,
morsel_seq_base: u64,
}

// Walker task -> Decoder tasks
// Walker task -> Decoder tasks.
let (mut batch_tx, batch_rxs) =
distributor_channel::<BatchMessage>(num_pipelines, DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
// Decoder tasks -> Distributor task
// Decoder tasks -> Distributor task.
let (mut decoded_rx, decoded_tx) = Linearizer::<Priority<Reverse<MorselSeq>, Morsel>>::new(
num_pipelines,
DEFAULT_LINEARIZER_BUFFER_SIZE,
);
// Distributor task -> ...
let mut sender = send[0].take().unwrap().serial();
// Distributor task -> output.
let mut sender = send_ports[0].take().unwrap().serial();

// Distributor task.
//
Expand All @@ -265,7 +264,7 @@ impl ComputeNode for IpcSourceNode {
// morsels at the same time. At the same time, other decoders might not produce anything.
// Therefore, we would like to distribute the output of a single decoder task over the
// available output pipelines.
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
while let Some(morsel) = decoded_rx.get().await {
if sender.send(morsel.1).await.is_err() {
break;
Expand All @@ -284,7 +283,7 @@ impl ComputeNode for IpcSourceNode {
.map(|(mut send, mut rx)| {
let source_token = source_token.clone();
scope.spawn_task(TaskPriority::Low, async move {
// Amortize allocations
// Amortize allocations.
let mut data_scratch = Vec::new();
let mut message_scratch = Vec::new();
let mut projection_info = config.projection_info.clone();
Expand All @@ -302,11 +301,10 @@ impl ComputeNode for IpcSourceNode {
file_path,
row_idx_offset,
slice,
seq,
morsel_seq_base,
block_range,
} = m;


let mut reader = FileReader::new_with_projection_info(
Cursor::new(source.as_ref()),
metadata.as_ref().clone(),
Expand Down Expand Up @@ -352,7 +350,7 @@ impl ComputeNode for IpcSourceNode {
}
for i in 0..df.height().div_ceil(max_morsel_size) {
let morsel = df.slice((i * max_morsel_size) as i64, max_morsel_size);
let seq = MorselSeq::new(seq + i as u64);
let seq = MorselSeq::new(morsel_seq_base + i as u64);
let morsel = Morsel::new(
morsel,
seq,
Expand Down Expand Up @@ -490,15 +488,15 @@ impl ComputeNode for IpcSourceNode {

let current_block = reader.get_current_block();

// Subdivide into batches for large files
// Subdivide into batches for large files.
is_batch_complete |= batch.num_rows >= batch_size_limit;
// Subdivide into batches if the file is sliced
// Subdivide into batches if the file is sliced.
is_batch_complete |= batch.num_rows >= sliced_batch_size_limit;
// Subdivide into batches for small files
// Subdivide into batches for small files.
is_batch_complete |= current_block - batch.block_start >= batch_block_limit;

// Batch blocks such that we send appropriately sized morsels. We guarantee a
// lower bound here, but not an upperbound.
// lower bound here, but not an upper bound.
if is_batch_complete {
let batch_slice = slice_take(&mut uncommitted_slice, batch.num_rows);
let batch_slice_len = batch_slice.len();
Expand All @@ -510,22 +508,25 @@ impl ComputeNode for IpcSourceNode {
file_path: source.file_path.clone(),
row_idx_offset: batch.row_idx_offset,
slice: batch_slice,
seq: state.seq,
morsel_seq_base: state.morsel_seq,
block_range,
};

if source_token.stop_requested() {
break 'source_loop;
}

if batch_tx.send(message).await.is_err() {
// This should only happen if the receiver of the decoder
// has broken off, meaning no further input will be needed.
break 'source_loop;
}

// Commit the changes to the state.
// Now, we know that the a decoder will process it.
//
// This might generate several morsels if the record batch is very large.
state.seq += batch_slice_len.div_ceil(max_morsel_size) as u64;
state.morsel_seq += batch_slice_len.div_ceil(max_morsel_size) as u64;
state.slice = uncommitted_slice.clone();
state.row_idx_offset = uncommitted_row_idx_offset;
source.block_offset = current_block;
Expand All @@ -545,7 +546,7 @@ impl ComputeNode for IpcSourceNode {
state.source_idx += 1;
}

drop(batch_tx); // Inform decoder tasks to stop
drop(batch_tx); // Inform decoder tasks to stop.
for decoder_task in decoder_tasks {
decoder_task.await?;
}
Expand Down

0 comments on commit 8ff9766

Please sign in to comment.