Skip to content

Commit

Permalink
refactor(rust): Share source token between all sender tasks of source…
Browse files Browse the repository at this point in the history
… nodes in new-streaming engine (#19593)
  • Loading branch information
orlp authored Nov 1, 2024
1 parent 26c422f commit 1d5c640
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
3 changes: 2 additions & 1 deletion crates/polars-stream/src/nodes/in_memory_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ impl ComputeNode for InMemorySourceNode {
let source = self.source.as_ref().unwrap();

// TODO: can this just be serial, using the work distributor?
let source_token = SourceToken::new();
for mut send in senders {
let slf = &*self;
let source_token = source_token.clone();
join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
let wait_group = WaitGroup::default();
let source_token = SourceToken::new();
loop {
let seq = slf.seq.fetch_add(1, Ordering::Relaxed);
let offset = (seq as usize * slf.morsel_size) as i64;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,14 @@ impl ComputeNode for ParquetSourceNode {
}
let is_finished = self.is_finished.clone();

let source_token = SourceToken::new();
let task_handles = raw_morsel_receivers
.drain(..)
.zip(morsel_senders)
.map(|(mut raw_morsel_rx, mut morsel_tx)| {
let is_finished = is_finished.clone();

let source_token = source_token.clone();
scope.spawn_task(TaskPriority::Low, async move {
let source_token = SourceToken::new();
loop {
let Ok((df, morsel_seq, wait_token)) = raw_morsel_rx.recv().await else {
is_finished.store(true, Ordering::Relaxed);
Expand Down

0 comments on commit 1d5c640

Please sign in to comment.