
Commit 1ae9434

time: revert "use sharding for timer implementation" related changes (#7226)
The work on sharding the timer implementation has caused a measurable performance regression due to increased contention. This patch reverts the current work on sharding. The next step will be to work on a per-worker timer wheel.
1 parent 8895bba commit 1ae9434
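
For context on the design being reverted: the sharded timer split the wheel across several mutex-protected shards, and each timer entry picked a shard from its worker index (see the `generate_shard_id` code removed from `tokio/src/runtime/time/entry.rs` below). The following is a minimal, self-contained sketch of that lock-striping shape, not Tokio's actual implementation; `ShardedDriver` and `Wheel` are names invented for illustration. Even with striping, the driver must visit every shard lock when it polls for expired timers, which is one place the cross-thread contention mentioned above can show up.

```rust
use std::sync::Mutex;

/// Illustrative stand-in for a timer wheel: just a bag of deadlines.
struct Wheel {
    timers: Vec<u64>,
}

/// Hypothetical sharded driver: one mutex-protected wheel per shard.
struct ShardedDriver {
    shards: Box<[Mutex<Wheel>]>,
}

impl ShardedDriver {
    fn new(shard_count: usize) -> Self {
        let shards = (0..shard_count)
            .map(|_| Mutex::new(Wheel { timers: Vec::new() }))
            .collect();
        Self { shards }
    }

    /// Register a timer on the shard derived from the caller's id,
    /// mirroring the worker-index-based shard selection being reverted.
    fn insert(&self, caller_id: usize, deadline: u64) {
        let shard = caller_id % self.shards.len();
        self.shards[shard].lock().unwrap().timers.push(deadline);
    }

    /// Expire timers up to `now`. The driver sweeps every shard,
    /// so shard locks are shared between workers and the driver.
    fn expire(&self, now: u64) -> usize {
        self.shards
            .iter()
            .map(|shard| {
                let mut wheel = shard.lock().unwrap();
                let before = wheel.timers.len();
                wheel.timers.retain(|&deadline| deadline > now);
                before - wheel.timers.len()
            })
            .sum()
    }
}

fn main() {
    let driver = ShardedDriver::new(4);
    driver.insert(0, 10);
    driver.insert(1, 20);
    assert_eq!(driver.expire(15), 1);
}
```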

File tree

11 files changed: +70 -212 lines changed

tokio/src/loom/mocked.rs

Lines changed: 0 additions & 5 deletions

@@ -24,11 +24,6 @@ pub(crate) mod sync {
         pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
             self.0.try_lock().ok()
         }
-
-        #[inline]
-        pub(crate) fn get_mut(&mut self) -> &mut T {
-            self.0.get_mut().unwrap()
-        }
     }
 
     #[derive(Debug)]

tokio/src/loom/std/mutex.rs

Lines changed: 0 additions & 8 deletions

@@ -33,12 +33,4 @@ impl<T> Mutex<T> {
             Err(TryLockError::WouldBlock) => None,
         }
     }
-
-    #[inline]
-    pub(crate) fn get_mut(&mut self) -> &mut T {
-        match self.0.get_mut() {
-            Ok(val) => val,
-            Err(p_err) => p_err.into_inner(),
-        }
-    }
 }
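
An aside on the `get_mut` helper deleted above: taking `&mut self` proves no other thread can hold the lock, so a poisoned mutex can be safely unpoisoned with `PoisonError::into_inner`. A standalone sketch of the same idiom against `std::sync::Mutex` (the free-function name is made up here):

```rust
use std::sync::{Mutex, PoisonError};

// `Mutex::get_mut` takes `&mut self`, so the lock cannot be contended;
// a poison flag left by a panicking thread can therefore be discarded.
fn get_mut_ignoring_poison<T>(mutex: &mut Mutex<T>) -> &mut T {
    mutex.get_mut().unwrap_or_else(PoisonError::into_inner)
}

fn main() {
    let mut m = Mutex::new(41);
    *get_mut_ignoring_poison(&mut m) += 1;
    assert_eq!(*m.get_mut().unwrap(), 42);
}
```

The mocked loom variant above simply calls `.unwrap()` instead, panicking on poison.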

tokio/src/runtime/builder.rs

Lines changed: 3 additions & 4 deletions

@@ -924,7 +924,7 @@ impl Builder {
         }
     }
 
-    fn get_cfg(&self, workers: usize) -> driver::Cfg {
+    fn get_cfg(&self) -> driver::Cfg {
         driver::Cfg {
             enable_pause_time: match self.kind {
                 Kind::CurrentThread => true,
@@ -935,7 +935,6 @@ impl Builder {
             enable_time: self.enable_time,
             start_paused: self.start_paused,
             nevents: self.nevents,
-            workers,
         }
     }
 
@@ -1453,7 +1452,7 @@ impl Builder {
         use crate::runtime::scheduler;
        use crate::runtime::Config;
 
-        let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
+        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
 
         // Blocking pool
         let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
@@ -1608,7 +1607,7 @@ cfg_rt_multi_thread! {
 
         let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
 
-        let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?;
+        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
 
         // Create the blocking pool
         let blocking_pool =

tokio/src/runtime/context.rs

Lines changed: 4 additions & 8 deletions

@@ -3,7 +3,7 @@ use crate::task::coop;
 
 use std::cell::Cell;
 
-#[cfg(any(feature = "rt", feature = "macros", feature = "time"))]
+#[cfg(any(feature = "rt", feature = "macros"))]
 use crate::util::rand::FastRand;
 
 cfg_rt! {
@@ -57,7 +57,7 @@ struct Context {
     #[cfg(feature = "rt")]
     runtime: Cell<EnterRuntime>,
 
-    #[cfg(any(feature = "rt", feature = "macros", feature = "time"))]
+    #[cfg(any(feature = "rt", feature = "macros"))]
     rng: Cell<Option<FastRand>>,
 
     /// Tracks the amount of "work" a task may still do before yielding back to
@@ -100,7 +100,7 @@ tokio_thread_local! {
         #[cfg(feature = "rt")]
         runtime: Cell::new(EnterRuntime::NotEntered),
 
-        #[cfg(any(feature = "rt", feature = "macros", feature = "time"))]
+        #[cfg(any(feature = "rt", feature = "macros"))]
         rng: Cell::new(None),
 
         budget: Cell::new(coop::Budget::unconstrained()),
@@ -121,11 +121,7 @@ tokio_thread_local! {
     }
 }
 
-#[cfg(any(
-    feature = "time",
-    feature = "macros",
-    all(feature = "sync", feature = "rt")
-))]
+#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))]
 pub(crate) fn thread_rng_n(n: u32) -> u32 {
     CONTEXT.with(|ctx| {
         let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new);

tokio/src/runtime/driver.rs

Lines changed: 2 additions & 6 deletions

@@ -40,7 +40,6 @@ pub(crate) struct Cfg {
     pub(crate) enable_pause_time: bool,
     pub(crate) start_paused: bool,
     pub(crate) nevents: usize,
-    pub(crate) workers: usize,
 }
 
 impl Driver {
@@ -49,8 +48,7 @@ impl Driver {
 
         let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
 
-        let (time_driver, time_handle) =
-            create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);
+        let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock);
 
         Ok((
             Self { inner: time_driver },
@@ -297,10 +295,9 @@ cfg_time! {
         enable: bool,
         io_stack: IoStack,
         clock: &Clock,
-        workers: usize,
     ) -> (TimeDriver, TimeHandle) {
         if enable {
-            let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32);
+            let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
 
             (TimeDriver::Enabled { driver }, Some(handle))
         } else {
@@ -346,7 +343,6 @@ cfg_not_time! {
         _enable: bool,
         io_stack: IoStack,
         _clock: &Clock,
-        _workers: usize,
     ) -> (TimeDriver, TimeHandle) {
         (io_stack, ())
     }

tokio/src/runtime/scheduler/multi_thread/worker.rs

Lines changed: 0 additions & 5 deletions

@@ -790,11 +790,6 @@ impl Context {
             self.defer.defer(waker);
         }
     }
-
-    #[allow(dead_code)]
-    pub(crate) fn get_worker_index(&self) -> usize {
-        self.worker.index
-    }
 }
 
 impl Core {

tokio/src/runtime/time/entry.rs

Lines changed: 2 additions & 33 deletions

@@ -58,7 +58,6 @@ use crate::loom::cell::UnsafeCell;
 use crate::loom::sync::atomic::AtomicU64;
 use crate::loom::sync::atomic::Ordering;
 
-use crate::runtime::context;
 use crate::runtime::scheduler;
 use crate::sync::AtomicWaker;
 use crate::time::Instant;
@@ -329,8 +328,6 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, Ti
 ///
 /// Note that this structure is located inside the `TimerEntry` structure.
 pub(crate) struct TimerShared {
-    /// The shard id. We should never change it.
-    shard_id: u32,
     /// A link within the doubly-linked list of timers on a particular level and
     /// slot. Valid only if state is equal to Registered.
     ///
@@ -371,9 +368,8 @@ generate_addr_of_methods! {
 }
 
 impl TimerShared {
-    pub(super) fn new(shard_id: u32) -> Self {
+    pub(super) fn new() -> Self {
         Self {
-            shard_id,
             cached_when: AtomicU64::new(0),
             pointers: linked_list::Pointers::new(),
             state: StateCell::default(),
@@ -442,11 +438,6 @@ impl TimerShared {
     pub(super) fn might_be_registered(&self) -> bool {
         self.state.might_be_registered()
     }
-
-    /// Gets the shard id.
-    pub(super) fn shard_id(&self) -> u32 {
-        self.shard_id
-    }
 }
 
 unsafe impl linked_list::Link for TimerShared {
@@ -494,10 +485,8 @@ impl TimerEntry {
     fn inner(&self) -> &TimerShared {
         let inner = unsafe { &*self.inner.get() };
         if inner.is_none() {
-            let shard_size = self.driver.driver().time().inner.get_shard_size();
-            let shard_id = generate_shard_id(shard_size);
             unsafe {
-                *self.inner.get() = Some(TimerShared::new(shard_id));
+                *self.inner.get() = Some(TimerShared::new());
             }
         }
         return inner.as_ref().unwrap();
@@ -654,23 +643,3 @@ impl Drop for TimerEntry {
         unsafe { Pin::new_unchecked(self) }.as_mut().cancel();
     }
 }
-
-// Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id.
-// Otherwise, we use a random number generator to obtain the shard id.
-cfg_rt! {
-    fn generate_shard_id(shard_size: u32) -> u32 {
-        let id = context::with_scheduler(|ctx| match ctx {
-            Some(scheduler::Context::CurrentThread(_ctx)) => 0,
-            #[cfg(feature = "rt-multi-thread")]
-            Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
-            None => context::thread_rng_n(shard_size),
-        });
-        id % shard_size
-    }
-}
-
-cfg_not_rt! {
-    fn generate_shard_id(shard_size: u32) -> u32 {
-        context::thread_rng_n(shard_size)
-    }
-}
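
The commit message points to a per-worker timer wheel as the next step. Purely as a speculative sketch of that direction (nothing below exists in Tokio): if each worker owns its wheel in thread-local state, registering a timer from the owning thread takes no lock at all, which is the contention the shard locks could not fully avoid.

```rust
use std::cell::RefCell;

// Hypothetical per-worker state: the "wheel" is reduced to a list of
// deadlines and lives in thread-local storage, so the owning thread
// mutates it without any synchronization.
thread_local! {
    static LOCAL_WHEEL: RefCell<Vec<u64>> = RefCell::new(Vec::new());
}

/// Register a timer on the current thread's wheel (no lock taken).
fn insert_local(deadline: u64) {
    LOCAL_WHEEL.with(|wheel| wheel.borrow_mut().push(deadline));
}

/// Expire timers on the current thread's wheel, returning how many fired.
fn expire_local(now: u64) -> usize {
    LOCAL_WHEEL.with(|wheel| {
        let mut wheel = wheel.borrow_mut();
        let before = wheel.len();
        wheel.retain(|&deadline| deadline > now);
        before - wheel.len()
    })
}

fn main() {
    insert_local(10);
    insert_local(20);
    assert_eq!(expire_local(15), 1);
}
```

A real design would still need remote registration from non-worker threads and a way to drive the wheels of parked workers, which is why this is only a sketch of the stated direction.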
