diff --git a/crates/polars-stream/src/nodes/io_sources/ipc.rs b/crates/polars-stream/src/nodes/io_sources/ipc.rs
index f90faae40612..3a83c8e3132c 100644
--- a/crates/polars-stream/src/nodes/io_sources/ipc.rs
+++ b/crates/polars-stream/src/nodes/io_sources/ipc.rs
@@ -37,9 +37,8 @@ use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};
 
 const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
     "\
-IPC file produces more than pow(2, 32) rows; \
-consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
-or set 'streaming'",
+IPC file produces more than 2^32 rows; \
+consider compiling with polars-bigidx feature (polars-u64-idx package on python)",
 ));
 
 pub struct IpcSourceNode {
@@ -64,7 +63,7 @@ pub struct IpcSourceNodeConfig {
 }
 
 pub struct IpcSourceNodeState {
-    seq: u64,
+    morsel_seq: u64,
     row_idx_offset: IdxSize,
 
     slice: Range<usize>,
@@ -131,7 +130,7 @@ impl IpcSourceNode {
             .map(|p| prepare_projection(&first_metadata.schema, p.clone()));
 
         let state = IpcSourceNodeState {
-            seq: 0,
+            morsel_seq: 0,
             row_idx_offset: row_index.as_ref().map_or(0, |ri| ri.offset),
 
             // Always create a slice. If no slice was given, just make the biggest slice possible.
@@ -218,15 +217,15 @@ impl ComputeNode for IpcSourceNode {
     fn spawn<'env, 's>(
         &'env mut self,
         scope: &'s TaskScope<'s, 'env>,
-        recv: &mut [Option<RecvPort<'_>>],
-        send: &mut [Option<SendPort<'_>>],
+        recv_ports: &mut [Option<RecvPort<'_>>],
+        send_ports: &mut [Option<SendPort<'_>>],
         _state: &'s ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
-        assert!(recv.is_empty());
-        assert_eq!(send.len(), 1);
+        assert!(recv_ports.is_empty());
+        assert_eq!(send_ports.len(), 1);
 
-        // Split size for morsels
+        // Split size for morsels.
        let max_morsel_size = get_max_morsel_size();
 
         let source_token = SourceToken::new();
@@ -235,7 +234,7 @@ impl ComputeNode for IpcSourceNode {
         let sources = &self.sources;
         let state = &mut self.state;
 
-        /// Messages sent from Walker task to Decoder tasks
+        /// Messages sent from Walker task to Decoder tasks.
         struct BatchMessage {
             memslice: Arc<MemSlice>,
             metadata: Arc<FileMetadata>,
@@ -243,19 +242,19 @@ impl ComputeNode for IpcSourceNode {
             row_idx_offset: IdxSize,
             slice: Range<usize>,
             block_range: Range<usize>,
-            seq: u64,
+            morsel_seq_base: u64,
         }
 
-        // Walker task -> Decoder tasks
+        // Walker task -> Decoder tasks.
         let (mut batch_tx, batch_rxs) =
             distributor_channel::<BatchMessage>(num_pipelines, DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
-        // Decoder tasks -> Distributor task
+        // Decoder tasks -> Distributor task.
         let (mut decoded_rx, decoded_tx) = Linearizer::<Priority<Reverse<MorselSeq>, Morsel>>::new(
             num_pipelines,
             DEFAULT_LINEARIZER_BUFFER_SIZE,
         );
-        // Distributor task -> ...
-        let mut sender = send[0].take().unwrap().serial();
+        // Distributor task -> output.
+        let mut sender = send_ports[0].take().unwrap().serial();
 
         // Distributor task.
         //
@@ -265,7 +264,7 @@ impl ComputeNode for IpcSourceNode {
         // morsels at the same time. At the same time, other decoders might not produce anything.
         // Therefore, we would like to distribute the output of a single decoder task over the
         // available output pipelines.
-        join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
+        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
             while let Some(morsel) = decoded_rx.get().await {
                 if sender.send(morsel.1).await.is_err() {
                     break;
@@ -284,7 +283,7 @@ impl ComputeNode for IpcSourceNode {
             .map(|(mut send, mut rx)| {
                 let source_token = source_token.clone();
                 scope.spawn_task(TaskPriority::Low, async move {
-                    // Amortize allocations
+                    // Amortize allocations.
                     let mut data_scratch = Vec::new();
                     let mut message_scratch = Vec::new();
                     let mut projection_info = config.projection_info.clone();
@@ -302,11 +301,10 @@ impl ComputeNode for IpcSourceNode {
                             file_path,
                             row_idx_offset,
                             slice,
-                            seq,
+                            morsel_seq_base,
                             block_range,
                         } = m;
 
-
                         let mut reader = FileReader::new_with_projection_info(
                             Cursor::new(source.as_ref()),
                             metadata.as_ref().clone(),
@@ -352,7 +350,7 @@ impl ComputeNode for IpcSourceNode {
                         }
                         for i in 0..df.height().div_ceil(max_morsel_size) {
                             let morsel = df.slice((i * max_morsel_size) as i64, max_morsel_size);
-                            let seq = MorselSeq::new(seq + i as u64);
+                            let seq = MorselSeq::new(morsel_seq_base + i as u64);
                             let morsel = Morsel::new(
                                 morsel,
                                 seq,
@@ -490,15 +488,15 @@ impl ComputeNode for IpcSourceNode {
 
                         let current_block = reader.get_current_block();
 
-                        // Subdivide into batches for large files
+                        // Subdivide into batches for large files.
                         is_batch_complete |= batch.num_rows >= batch_size_limit;
-                        // Subdivide into batches if the file is sliced
+                        // Subdivide into batches if the file is sliced.
                         is_batch_complete |= batch.num_rows >= sliced_batch_size_limit;
-                        // Subdivide into batches for small files
+                        // Subdivide into batches for small files.
                         is_batch_complete |= current_block - batch.block_start >= batch_block_limit;
 
                         // Batch blocks such that we send appropriately sized morsels. We guarantee a
-                        // lower bound here, but not an upperbound.
+                        // lower bound here, but not an upper bound.
                         if is_batch_complete {
                             let batch_slice = slice_take(&mut uncommitted_slice, batch.num_rows);
                             let batch_slice_len = batch_slice.len();
@@ -510,14 +508,17 @@ impl ComputeNode for IpcSourceNode {
                                 file_path: source.file_path.clone(),
                                 row_idx_offset: batch.row_idx_offset,
                                 slice: batch_slice,
-                                seq: state.seq,
+                                morsel_seq_base: state.morsel_seq,
                                 block_range,
                             };
 
                             if source_token.stop_requested() {
                                 break 'source_loop;
                             }
+
                             if batch_tx.send(message).await.is_err() {
+                                // This should only happen if the receiver of the decoder
+                                // has broken off, meaning no further input will be needed.
                                 break 'source_loop;
                             }
 
@@ -525,7 +526,7 @@ impl ComputeNode for IpcSourceNode {
                             // Now, we know that the a decoder will process it.
                            //
                             // This might generate several morsels if the record batch is very large.
-                            state.seq += batch_slice_len.div_ceil(max_morsel_size) as u64;
+                            state.morsel_seq += batch_slice_len.div_ceil(max_morsel_size) as u64;
                             state.slice = uncommitted_slice.clone();
                             state.row_idx_offset = uncommitted_row_idx_offset;
                             source.block_offset = current_block;
@@ -545,7 +546,7 @@ impl ComputeNode for IpcSourceNode {
                 state.source_idx += 1;
             }
 
-            drop(batch_tx); // Inform decoder tasks to stop
+            drop(batch_tx); // Inform decoder tasks to stop.
            for decoder_task in decoder_tasks {
                decoder_task.await?;
            }
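The core of the rename is the morsel sequence bookkeeping: the walker hands each record batch a `morsel_seq_base` and advances its counter by the number of morsels that batch can split into (`batch_slice_len.div_ceil(max_morsel_size)`), while each decoder numbers its morsels consecutively from that base. The sketch below is only an illustration of that arithmetic under those assumptions; the `MorselSeq` stand-in and the `assign_seqs`/`decode_seqs` helpers are hypothetical and not polars-stream APIs.

```rust
// Illustrative sketch only: simplified stand-ins, not the actual polars-stream types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct MorselSeq(u64);

impl MorselSeq {
    fn new(v: u64) -> Self {
        Self(v)
    }
}

/// Walker side: assign each batch a base sequence number and advance the
/// counter by the number of morsels the decoder may split that batch into.
fn assign_seqs(batch_rows: &[usize], max_morsel_size: usize) -> Vec<(u64, usize)> {
    let mut morsel_seq = 0u64; // analogous to `state.morsel_seq`
    let mut out = Vec::new();
    for &rows in batch_rows {
        out.push((morsel_seq, rows)); // `morsel_seq_base` sent along with the batch
        morsel_seq += rows.div_ceil(max_morsel_size) as u64;
    }
    out
}

/// Decoder side: a batch with `rows` rows yields one morsel per
/// `max_morsel_size` rows, numbered consecutively from `morsel_seq_base`.
fn decode_seqs(morsel_seq_base: u64, rows: usize, max_morsel_size: usize) -> Vec<MorselSeq> {
    (0..rows.div_ceil(max_morsel_size))
        .map(|i| MorselSeq::new(morsel_seq_base + i as u64))
        .collect()
}

fn main() {
    let batches = [100_000, 250_000, 10];
    let max_morsel_size = 100_000;

    let mut all = Vec::new();
    for (base, rows) in assign_seqs(&batches, max_morsel_size) {
        all.extend(decode_seqs(base, rows, max_morsel_size));
    }

    // Sequence numbers come out dense and strictly increasing: 0, 1, 2, 3, 4.
    assert!(all.windows(2).all(|w| w[0] < w[1]));
    println!("{all:?}");
}
```

Reserving a contiguous range of sequence numbers per batch is what lets the decoders run in parallel while the linearizer still restores a total, monotonically increasing morsel order downstream.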