Skip to content

Commit

Permalink
refactor(rust): Remove SyncCounter (#19556)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Nov 1, 2024
1 parent 992128d commit 7c2f31e
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 79 deletions.
33 changes: 8 additions & 25 deletions crates/polars-pipe/src/executors/sinks/slice.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,31 @@
use std::any::Any;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_utils::atomic::SyncCounter;

use crate::operators::{
chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
};

#[derive(Clone)]
// Ensure the data is return in the order it was streamed
pub struct SliceSink {
offset: SyncCounter,
current_len: SyncCounter,
offset: Arc<AtomicUsize>,
current_len: Arc<AtomicUsize>,
len: usize,
chunks: Arc<Mutex<Vec<DataChunk>>>,
schema: SchemaRef,
}

impl Clone for SliceSink {
fn clone(&self) -> Self {
Self {
offset: self.offset.clone(),
current_len: self.current_len.clone(),
len: self.len,
chunks: self.chunks.clone(),
schema: self.schema.clone(),
}
}
}

impl SliceSink {
pub fn new(offset: u64, len: usize, schema: SchemaRef) -> SliceSink {
let offset = SyncCounter::new(offset as usize);
let offset = Arc::new(AtomicUsize::new(offset as usize));
SliceSink {
offset,
current_len: SyncCounter::new(0),
current_len: Arc::default(),
len,
chunks: Default::default(),
schema,
Expand Down Expand Up @@ -101,18 +89,13 @@ impl Sink for SliceSink {

let df = chunks_to_df_unchecked(chunks);
let offset = self.offset.load(Ordering::Acquire) as i64;

// drop the counters
unsafe {
self.offset.manual_drop();
self.current_len.manual_drop();
}

Ok(FinalizedSink::Finished(df.slice(offset, self.len)))
}

fn as_any(&mut self) -> &mut dyn Any {
self
}

fn fmt(&self) -> &str {
"slice_sink"
}
Expand Down
53 changes: 0 additions & 53 deletions crates/polars-utils/src/atomic.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
pub mod abs_diff;
pub mod algebraic_ops;
pub mod arena;
pub mod atomic;
pub mod binary_search;
pub mod cache;
pub mod cardinality_sketch;
Expand Down

0 comments on commit 7c2f31e

Please sign in to comment.