Commit

fmt/clippy

orlp committed Jun 11, 2024
1 parent 9b17b8d commit d88b0f8
Showing 10 changed files with 103 additions and 88 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -61,8 +61,8 @@ num-traits = "0.2"
object_store = { version = "0.9", default-features = false }
once_cell = "1"
parking_lot = "0.12"
pin-project-lite = "0.2"
percent-encoding = "2.3"
pin-project-lite = "0.2"
pyo3 = "0.21"
rand = "0.8"
rand_distr = "0.4"
@@ -74,9 +74,9 @@ reqwest = { version = "0.11", default-features = false }
ryu = "1.0.13"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1"
slotmap = "1"
simd-json = { version = "0.13", features = ["known-key"] }
simdutf8 = "0.1.4"
slotmap = "1"
smartstring = "1"
sqlparser = "0.45"
stacker = "0.1"
4 changes: 2 additions & 2 deletions crates/polars-stream/Cargo.toml
@@ -10,11 +10,11 @@ description = "Private crate for the streaming execution engine for the Polars D

[dependencies]
atomic-waker = { workspace = true }
crossbeam-utils = { workspace = true }
crossbeam-deque = { workspace = true }
polars-utils = { workspace = true }
crossbeam-utils = { workspace = true }
parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
polars-utils = { workspace = true }
rand = { workspace = true }
slotmap = { workspace = true }

10 changes: 5 additions & 5 deletions crates/polars-stream/src/async_primitives/distributor_channel.rs
@@ -13,7 +13,7 @@ use super::task_parker::TaskParker;
/// Each [`Receiver`] has an internal buffer of `bufsize`. Thus it is possible
/// that when one [`Sender`] is exhausted some other receivers still have data
/// available.
///
///
/// The FIFO order is only guaranteed per receiver. That is, each receiver is
/// guaranteed to see a subset of the data sent by the sender in the order the
/// sender sent it in, but not necessarily contiguously.
@@ -104,8 +104,8 @@ pub struct Receiver<T> {
index: usize,
}

unsafe impl<T: Send> Send for Sender<T> { }
unsafe impl<T: Send> Send for Receiver<T> { }
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}

impl<T: Send> Sender<T> {
pub async fn send(&mut self, mut value: T) -> Result<(), T> {
@@ -149,7 +149,7 @@ impl<T: Send> Sender<T> {
},
Err(SendError::Closed(v)) => value = v,
}

idx += 1;
if idx >= num_receivers {
idx -= num_receivers;
@@ -273,4 +273,4 @@ impl<T> Drop for DistributorInner<T> {
}
}
}
}
}
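The doc comment above fixes the channel's ordering contract: values are distributed round-robin (note the wrap-around of `idx` in `send`), and FIFO order holds only per receiver. Below is a minimal, self-contained sketch of that contract using `std::sync::mpsc::sync_channel` rather than the crate's private types; the `distributor` helper is purely illustrative and is not the polars-stream API.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Round-robin distributor over `n` bounded receivers, mirroring the
/// ordering contract described above: each receiver sees an in-order,
/// but not necessarily contiguous, subset of what was sent.
fn distributor(n: usize, bufsize: usize) -> (Vec<SyncSender<u32>>, Vec<Receiver<u32>>) {
    (0..n).map(|_| sync_channel(bufsize)).unzip()
}

fn main() {
    let (senders, receivers) = distributor(3, 4);

    // Single producer distributing values round-robin.
    let producer = thread::spawn(move || {
        for v in 0..12u32 {
            let idx = (v as usize) % senders.len();
            senders[idx].send(v).unwrap();
        }
        // Dropping the senders here closes all receivers.
    });

    // Each receiver drains its own FIFO: e.g. receiver 0 sees 0, 3, 6, 9.
    let consumers: Vec<_> = receivers
        .into_iter()
        .enumerate()
        .map(|(i, rx)| {
            thread::spawn(move || {
                for v in rx {
                    println!("receiver {i} got {v}");
                }
            })
        })
        .collect();

    producer.join().unwrap();
    for c in consumers {
        c.join().unwrap();
    }
}
```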
4 changes: 2 additions & 2 deletions crates/polars-stream/src/async_primitives/mod.rs
@@ -1,4 +1,4 @@
pub mod distributor_channel;
pub mod pipe;
pub mod task_parker;
pub mod wait_group;
pub mod distributor_channel;
pub mod task_parker;
12 changes: 6 additions & 6 deletions crates/polars-stream/src/async_primitives/pipe.rs
@@ -185,7 +185,7 @@ pub struct Sender<T> {
pipe: Arc<Pipe<T>>,
}

unsafe impl<T: Send> Send for Sender<T> { }
unsafe impl<T: Send> Send for Sender<T> {}

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
@@ -197,7 +197,7 @@ pub struct Receiver<T> {
pipe: Arc<Pipe<T>>,
}

unsafe impl<T: Send> Send for Receiver<T> { }
unsafe impl<T: Send> Send for Receiver<T> {}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
@@ -212,7 +212,7 @@ pin_project! {
}
}

unsafe impl<'a, T: Send> Send for SendFuture<'a, T> { }
unsafe impl<'a, T: Send> Send for SendFuture<'a, T> {}

impl<T: Send> Sender<T> {
/// Returns a future that when awaited will send the value to the [`Receiver`].
@@ -224,7 +224,7 @@ impl<T: Send> Sender<T> {
value: Some(value),
}
}

pub fn try_send(&mut self, value: T) -> Result<(), SendError<T>> {
unsafe { self.pipe.try_send(value) }
}
@@ -249,7 +249,7 @@ pin_project! {
}
}

unsafe impl<'a, T: Send> Send for RecvFuture<'a, T> { }
unsafe impl<'a, T: Send> Send for RecvFuture<'a, T> {}

impl<T: Send> Receiver<T> {
/// Returns a future that when awaited will return `Ok(value)` once the
@@ -262,7 +262,7 @@ impl<T: Send> Receiver<T> {
done: false,
}
}

pub fn try_recv(&mut self) -> Result<T, RecvError> {
unsafe { self.pipe.try_recv() }
}
22 changes: 12 additions & 10 deletions crates/polars-stream/src/async_primitives/wait_group.rs
@@ -1,12 +1,11 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::future::Future;

use parking_lot::Mutex;


#[derive(Default)]
struct WaitGroupInner {
waker: Mutex<Option<Waker>>,
@@ -16,18 +15,20 @@ struct WaitGroupInner {

#[derive(Default)]
pub struct WaitGroup {
inner: Arc<WaitGroupInner>
inner: Arc<WaitGroupInner>,
}

impl WaitGroup {
/// Creates a token.
pub fn token(&self) -> WaitToken {
self.inner.token_count.fetch_add(1, Ordering::Relaxed);
WaitToken { inner: Arc::clone(&self.inner) }
WaitToken {
inner: Arc::clone(&self.inner),
}
}

/// Waits until all created tokens are dropped.
///
///
/// # Panics
/// Panics if there is more than one simultaneous waiter.
pub async fn wait(&self) {
@@ -67,15 +68,16 @@ impl<'a> Drop for WaitGroupFuture<'a> {
}
}


pub struct WaitToken {
inner: Arc<WaitGroupInner>
inner: Arc<WaitGroupInner>,
}

impl Clone for WaitToken {
fn clone(&self) -> Self {
self.inner.token_count.fetch_add(1, Ordering::Relaxed);
Self { inner: self.inner.clone() }
Self {
inner: self.inner.clone(),
}
}
}

@@ -88,4 +90,4 @@ impl Drop for WaitToken {
}
}
}
}
}
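The `WaitGroup`/`WaitToken` pair being reformatted here is a completion counter: `token()` bumps an atomic count, cloning or dropping a `WaitToken` adjusts it, and `wait()` resolves once the count reaches zero, panicking if two callers wait at once. A hedged usage sketch, written as if it sat in a test module inside this private crate and using `futures::executor::block_on` (an assumed dev-dependency) in place of the crate's own executor:

```rust
use std::thread;

use futures::executor::block_on;

// Assumed path; these types live in this private crate and are not re-exported.
use crate::async_primitives::wait_group::WaitGroup;

#[test]
fn wait_group_waits_for_all_tokens() {
    let wg = WaitGroup::default();

    // Hand one token to each worker: `token()` bumps the count, dropping
    // the token decrements it and wakes the waiter once it reaches zero.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let token = wg.token();
            thread::spawn(move || {
                println!("worker {i} done");
                drop(token); // explicit for clarity; would happen at scope end anyway
            })
        })
        .collect();

    // Resolves once every token has been dropped; per the doc comment on
    // `wait`, only one simultaneous waiter is allowed.
    block_on(wg.wait());

    for h in handles {
        h.join().unwrap();
    }
}
```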
66 changes: 32 additions & 34 deletions crates/polars-stream/src/executor/mod.rs
@@ -1,5 +1,5 @@
mod task;
mod park_group;
mod task;

use std::cell::{Cell, UnsafeCell};
use std::future::Future;
@@ -10,13 +10,12 @@ use std::sync::{Arc, OnceLock, Weak};

use crossbeam_deque::{Injector, Steal, Stealer, Worker as WorkQueue};
use crossbeam_utils::CachePadded;
use park_group::ParkGroup;
use parking_lot::Mutex;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use slotmap::SlotMap;

use task::{CancelHandle, JoinHandle, Runnable};
use park_group::ParkGroup;

static NUM_EXECUTOR_THREADS: AtomicUsize = AtomicUsize::new(0);
pub fn set_num_threads(t: usize) {
@@ -76,7 +75,7 @@ impl Executor {
fn schedule_task(&self, task: ReadyTask) {
let thread = TLS_THREAD_ID.get();
let priority = task.metadata().priority;
if let Some(ttl) = self.thread_task_lists.get(thread as usize) {
if let Some(ttl) = self.thread_task_lists.get(thread) {
// SAFETY: this slot may only be accessed from the local thread, which we are.
let slot = unsafe { &mut *ttl.local_slot.get() };

@@ -93,7 +92,7 @@ } else {
} else {
// Optimization: while this is a low priority task we have no
// high priority tasks on this thread so we'll execute this one.
if ttl.high_prio_tasks.len() == 0 && slot.is_none() {
if ttl.high_prio_tasks.is_empty() && slot.is_none() {
*slot = Some(task);
} else {
self.global_low_prio_task_queue.push(task);
@@ -130,7 +129,7 @@ impl Executor {
}

// Try to steal tasks.
let ttl = &self.thread_task_lists[thread as usize];
let ttl = &self.thread_task_lists[thread];
for _ in 0..4 {
let mut retry = true;
while retry {
@@ -162,7 +161,7 @@ impl Executor {
let mut worker = self.park_group.new_worker();

loop {
let ttl = &self.thread_task_lists[thread as usize];
let ttl = &self.thread_task_lists[thread];
let task = (|| {
// Try to get a task from LIFO slot.
if let Some(task) = unsafe { (*ttl.local_slot.get()).take() } {
@@ -187,22 +186,24 @@ park.park();
park.park();
None
})();

if let Some(task) = task {
worker.recruit_next();
task.run();
}
}
}

fn global() -> &'static Executor {
GLOBAL_SCHEDULER.get_or_init(|| {
let mut n_threads = NUM_EXECUTOR_THREADS.load(Ordering::Relaxed);
if n_threads == 0 {
n_threads = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
n_threads = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
}

let thread_task_lists = (0..n_threads.try_into().unwrap())
let thread_task_lists = (0..n_threads)
.map(|t| {
std::thread::spawn(move || Self::global().runner(t));

@@ -241,7 +242,7 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {
t.cancel();
}
}

fn clear_completed_tasks(&self) {
let mut cancel_handles = self.cancel_handles.lock();
for t in self.completed_tasks.lock().drain(..) {
@@ -261,31 +262,28 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {

let mut runnable = None;
let mut join_handle = None;
self.cancel_handles
.lock()
.insert_with_key(|task_key| {
let (run, jh) = unsafe {
// SAFETY: we make sure to cancel this task before 'scope ends.
let executor = Executor::global();
task::spawn_with_lifetime(
fut,
move |task| executor.schedule_task(task),
TaskMetadata {
task_key,
priority,
completed_tasks: Arc::downgrade(&self.completed_tasks),
},
)
};
let cancel_handle = jh.cancel_handle();
runnable = Some(run);
join_handle = Some(jh);
cancel_handle
});
self.cancel_handles.lock().insert_with_key(|task_key| {
let (run, jh) = unsafe {
// SAFETY: we make sure to cancel this task before 'scope ends.
let executor = Executor::global();
task::spawn_with_lifetime(
fut,
move |task| executor.schedule_task(task),
TaskMetadata {
task_key,
priority,
completed_tasks: Arc::downgrade(&self.completed_tasks),
},
)
};
let cancel_handle = jh.cancel_handle();
runnable = Some(run);
join_handle = Some(jh);
cancel_handle
});
runnable.unwrap().schedule();
join_handle.unwrap()
}

}

pub fn task_scope<'env, F, T>(f: F) -> T
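The reformatted spawn path above relies on `slotmap`'s `insert_with_key`, which hands the closure the key the new value will occupy, so the value (here, the `TaskMetadata` carrying `task_key`) can embed its own key before it exists in the map. A small, self-contained illustration of that slotmap pattern; the `Task` struct below is hypothetical, not the executor's type:

```rust
use slotmap::{DefaultKey, SlotMap};

// A stand-in for the executor's bookkeeping: each entry wants to know
// the key it is stored under, which is exactly what `insert_with_key`
// provides before the value is constructed.
struct Task {
    key: DefaultKey,
    name: &'static str,
}

fn main() {
    let mut tasks: SlotMap<DefaultKey, Task> = SlotMap::new();

    // The closure receives the key that the new value will occupy,
    // so the value can embed it (as `TaskMetadata { task_key, .. }` does above).
    let key = tasks.insert_with_key(|key| Task { key, name: "demo" });

    assert_eq!(tasks[key].key, key);
    println!("inserted task {:?} named {}", key, tasks[key].name);
}
```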
2 changes: 1 addition & 1 deletion crates/polars-stream/src/executor/park_group.rs
@@ -87,7 +87,7 @@ impl ParkGroupWorker {
pub fn prepare_park(&mut self) -> ParkAttempt<'_> {
let mut state = self.inner.state.load(Ordering::SeqCst);
self.version = state_version(state);

// If the version changes or someone else has set the
// PREPARING_TO_PARK_BIT, stop trying to update the state.
while state & PREPARING_TO_PARK_BIT == 0 && state_version(state) == self.version {
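`prepare_park` reads a single atomic that packs a version counter together with a `PREPARING_TO_PARK_BIT`, and bails out of its update loop if either changes under it. Below is a sketch of that state-word pattern under an assumed encoding (low bit = flag, remaining bits = version); the real `ParkGroup` layout and the logic that follows a successful mark may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumed packing: bit 0 flags "a worker is preparing to park",
// the remaining bits hold a wrap-around version counter.
const PREPARING_TO_PARK_BIT: usize = 0b1;

fn state_version(state: usize) -> usize {
    state >> 1
}

fn try_mark_preparing(state: &AtomicUsize) -> bool {
    let mut cur = state.load(Ordering::SeqCst);
    let version = state_version(cur);

    // Give up if someone else is already preparing to park, or if the
    // version moved on (meaning new work arrived in the meantime).
    while cur & PREPARING_TO_PARK_BIT == 0 && state_version(cur) == version {
        match state.compare_exchange_weak(
            cur,
            cur | PREPARING_TO_PARK_BIT,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => return true,
            Err(actual) => cur = actual,
        }
    }
    false
}

fn main() {
    let state = AtomicUsize::new(0);
    assert!(try_mark_preparing(&state));
    // A second attempt fails until the bit is cleared by a wake-up.
    assert!(!try_mark_preparing(&state));
}
```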