Skip to content

Commit

Permalink
properly fix deadlock this time
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Nov 13, 2024
1 parent 49b1ee8 commit a3deeb9
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions crates/polars-stream/src/nodes/io_sources/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::Reverse;
use std::io::Cursor;
use std::ops::Range;
use std::sync::Arc;
Expand All @@ -22,15 +23,16 @@ use polars_plan::plans::{FileInfo, ScanSources};
use polars_plan::prelude::FileScanOptions;
use polars_utils::mmap::MemSlice;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::priority::Priority;
use polars_utils::IdxSize;

use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::morsel::{get_ideal_morsel_size, SourceToken};
use crate::nodes::{
ComputeNode, JoinHandle, Morsel, MorselSeq, PortState, TaskPriority, TaskScope,
};
use crate::pipe::{RecvPort, SendPort};
use crate::utils::linearizer::Linearizer;
use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};

const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
Expand Down Expand Up @@ -202,7 +204,7 @@ impl ComputeNode for IpcSourceNode {
assert!(recv.is_empty());
assert_eq!(send.len(), 1);

if self.state.source_idx >= self.sources.len() {
if self.state.slice.is_empty() || self.state.source_idx >= self.sources.len() {
send[0] = PortState::Done;
}

Expand Down Expand Up @@ -248,8 +250,10 @@ impl ComputeNode for IpcSourceNode {
let (mut batch_tx, batch_rxs) =
distributor_channel::<BatchMessage>(num_pipelines, DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
// Decoder tasks -> Distributor task
let (mut decoded_rx, decoded_tx) =
Linearizer::new(num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE);
let (mut decoded_rx, decoded_tx) = Linearizer::<Priority<Reverse<MorselSeq>, Morsel>>::new(
num_pipelines,
DEFAULT_LINEARIZER_BUFFER_SIZE,
);
// Distributor task -> ...
let mut sender = send[0].take().unwrap().serial();

Expand All @@ -263,7 +267,7 @@ impl ComputeNode for IpcSourceNode {
// available output pipelines.
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
while let Some(morsel) = decoded_rx.get().await {
if sender.send(morsel).await.is_err() {
if sender.send(morsel.1).await.is_err() {
break;
}
}
Expand Down Expand Up @@ -348,12 +352,13 @@ impl ComputeNode for IpcSourceNode {
}
for i in 0..df.height().div_ceil(max_morsel_size) {
let morsel = df.slice((i * max_morsel_size) as i64, max_morsel_size);
let seq = MorselSeq::new(seq + i as u64);
let morsel = Morsel::new(
morsel,
MorselSeq::new(seq + i as u64),
seq,
source_token.clone(),
);
if send.insert(morsel).await.is_err() {
if send.insert(Priority(Reverse(seq), morsel)).await.is_err() {
break;
}
}
Expand Down

0 comments on commit a3deeb9

Please sign in to comment.