From a3deeb90a9bb8b42be80642b2bdf8df0e77d6090 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 6 Nov 2024 14:31:28 +0100 Subject: [PATCH] properly fix deadlock this time --- .../polars-stream/src/nodes/io_sources/ipc.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/crates/polars-stream/src/nodes/io_sources/ipc.rs b/crates/polars-stream/src/nodes/io_sources/ipc.rs index fdf755932806..f90faae40612 100644 --- a/crates/polars-stream/src/nodes/io_sources/ipc.rs +++ b/crates/polars-stream/src/nodes/io_sources/ipc.rs @@ -1,3 +1,4 @@ +use std::cmp::Reverse; use std::io::Cursor; use std::ops::Range; use std::sync::Arc; @@ -22,15 +23,16 @@ use polars_plan::plans::{FileInfo, ScanSources}; use polars_plan::prelude::FileScanOptions; use polars_utils::mmap::MemSlice; use polars_utils::pl_str::PlSmallStr; +use polars_utils::priority::Priority; use polars_utils::IdxSize; use crate::async_primitives::distributor_channel::distributor_channel; +use crate::async_primitives::linearizer::Linearizer; use crate::morsel::{get_ideal_morsel_size, SourceToken}; use crate::nodes::{ ComputeNode, JoinHandle, Morsel, MorselSeq, PortState, TaskPriority, TaskScope, }; use crate::pipe::{RecvPort, SendPort}; -use crate::utils::linearizer::Linearizer; use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE}; const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static( @@ -202,7 +204,7 @@ impl ComputeNode for IpcSourceNode { assert!(recv.is_empty()); assert_eq!(send.len(), 1); - if self.state.source_idx >= self.sources.len() { + if self.state.slice.is_empty() || self.state.source_idx >= self.sources.len() { send[0] = PortState::Done; } @@ -248,8 +250,10 @@ impl ComputeNode for IpcSourceNode { let (mut batch_tx, batch_rxs) = distributor_channel::(num_pipelines, DEFAULT_DISTRIBUTOR_BUFFER_SIZE); // Decoder tasks -> Distributor task - let (mut decoded_rx, decoded_tx) = - Linearizer::new(num_pipelines, 
DEFAULT_LINEARIZER_BUFFER_SIZE); + let (mut decoded_rx, decoded_tx) = Linearizer::<Priority<Reverse<MorselSeq>, Morsel>>::new( + num_pipelines, + DEFAULT_LINEARIZER_BUFFER_SIZE, + ); // Distributor task -> ... let mut sender = send[0].take().unwrap().serial(); @@ -263,7 +267,7 @@ impl ComputeNode for IpcSourceNode { // available output pipelines. join_handles.push(scope.spawn_task(TaskPriority::Low, async move { while let Some(morsel) = decoded_rx.get().await { - if sender.send(morsel).await.is_err() { + if sender.send(morsel.1).await.is_err() { break; } } @@ -348,12 +352,13 @@ impl ComputeNode for IpcSourceNode { } for i in 0..df.height().div_ceil(max_morsel_size) { let morsel = df.slice((i * max_morsel_size) as i64, max_morsel_size); + let seq = MorselSeq::new(seq + i as u64); let morsel = Morsel::new( morsel, - MorselSeq::new(seq + i as u64), + seq, source_token.clone(), ); - if send.insert(morsel).await.is_err() { + if send.insert(Priority(Reverse(seq), morsel)).await.is_err() { break; } }