Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to upstream fix for immediate event sending #6

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/changelog/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,19 @@ The migration guide could reference other migration examples in the current
changelog entry.

## Unreleased

### Changed

- On Web, let events wake up event loop immediately when using
`ControlFlow::Poll`.

### Fixed

- On Web, fix `EventLoopProxy::send_event()` triggering event loop immediately
when not called from inside the event loop. Now queues a microtask instead.

### Removed

- Remove `EventLoop::run`.
- Remove `EventLoopExtRunOnDemand::run_on_demand`.
- Remove `EventLoopExtPumpEvents::pump_events`.
47 changes: 27 additions & 20 deletions src/platform_impl/web/async/waker.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use std::future;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;

use super::super::main_thread::MainThreadMarker;
use super::{AtomicWaker, Wrapper};

pub struct WakerSpawner<T: 'static>(Wrapper<Handler<T>, Sender, usize>);
pub struct WakerSpawner<T: 'static>(Wrapper<Handler<T>, Sender, NonZeroUsize>);

pub struct Waker<T: 'static>(Wrapper<Handler<T>, Sender, usize>);
pub struct Waker<T: 'static>(Wrapper<Handler<T>, Sender, NonZeroUsize>);

struct Handler<T> {
value: T,
handler: fn(&T, usize),
handler: fn(&T, NonZeroUsize, bool),
}

#[derive(Clone)]
struct Sender(Arc<Inner>);

impl<T> WakerSpawner<T> {
#[track_caller]
pub fn new(main_thread: MainThreadMarker, value: T, handler: fn(&T, usize)) -> Option<Self> {
pub fn new(
main_thread: MainThreadMarker,
value: T,
handler: fn(&T, NonZeroUsize, bool),
) -> Option<Self> {
let inner = Arc::new(Inner {
counter: AtomicUsize::new(0),
waker: AtomicWaker::new(),
Expand All @@ -37,7 +42,7 @@ impl<T> WakerSpawner<T> {
|handler, count| {
let handler = handler.borrow();
let handler = handler.as_ref().unwrap();
(handler.handler)(&handler.value, count);
(handler.handler)(&handler.value, count, true);
},
{
let inner = Arc::clone(&inner);
Expand All @@ -46,29 +51,31 @@ impl<T> WakerSpawner<T> {
while let Some(count) = future::poll_fn(|cx| {
let count = inner.counter.swap(0, Ordering::Relaxed);

if count > 0 {
Poll::Ready(Some(count))
} else {
inner.waker.register(cx.waker());
match NonZeroUsize::new(count) {
Some(count) => Poll::Ready(Some(count)),
None => {
inner.waker.register(cx.waker());

let count = inner.counter.swap(0, Ordering::Relaxed);
let count = inner.counter.swap(0, Ordering::Relaxed);

if count > 0 {
Poll::Ready(Some(count))
} else {
if inner.closed.load(Ordering::Relaxed) {
return Poll::Ready(None);
}
match NonZeroUsize::new(count) {
Some(count) => Poll::Ready(Some(count)),
None => {
if inner.closed.load(Ordering::Relaxed) {
return Poll::Ready(None);
}

Poll::Pending
}
Poll::Pending
},
}
},
}
})
.await
{
let handler = handler.borrow();
let handler = handler.as_ref().unwrap();
(handler.handler)(&handler.value, count);
(handler.handler)(&handler.value, count, false);
}
}
},
Expand Down Expand Up @@ -107,7 +114,7 @@ impl<T> Drop for WakerSpawner<T> {

impl<T> Waker<T> {
pub fn wake(&self) {
self.0.send(1)
self.0.send(NonZeroUsize::MIN)
}
}

Expand Down
8 changes: 1 addition & 7 deletions src/platform_impl/web/event_loop/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@ impl<T: 'static> EventLoopProxy<T> {

pub fn send_event(&self, event: T) -> Result<(), EventLoopClosed<T>> {
self.sender.send(event).map_err(|SendError(event)| EventLoopClosed(event))?;

// Workaround: the we should never be immediately executing new things on the event loop!
let runner = self.runner.clone();
wasm_bindgen_futures::spawn_local(async move {
runner.wake();
});

self.runner.wake();
Ok(())
}
}
Expand Down
69 changes: 55 additions & 14 deletions src/platform_impl/web/event_loop/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ use crate::platform_impl::platform::r#async::{DispatchRunner, Waker, WakerSpawne
use crate::platform_impl::platform::window::Inner;
use crate::window::WindowId;

use js_sys::Function;
use std::cell::{Cell, RefCell};
use std::collections::{HashSet, VecDeque};
use std::iter;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::rc::{Rc, Weak};
use wasm_bindgen::prelude::Closure;
use wasm_bindgen::prelude::{wasm_bindgen, Closure};
use wasm_bindgen::JsCast;
use web_sys::{Document, KeyboardEvent, PageTransitionEvent, PointerEvent, WheelEvent};
use web_time::{Duration, Instant};

Expand Down Expand Up @@ -133,12 +136,13 @@ impl Shared {
let document = window.document().expect("Failed to obtain document");

Shared(Rc::<Execution>::new_cyclic(|weak| {
let proxy_spawner = WakerSpawner::new(main_thread, weak.clone(), |runner, count| {
if let Some(runner) = runner.upgrade() {
Shared(runner).send_events(iter::repeat(Event::UserEvent(())).take(count))
}
})
.expect("`EventLoop` has to be created in the main thread");
let proxy_spawner =
WakerSpawner::new(main_thread, weak.clone(), |runner, count, local| {
if let Some(runner) = runner.upgrade() {
Shared(runner).send_user_events(count, local)
}
})
.expect("`EventLoop` has to be created in the main thread");

Execution {
main_thread,
Expand Down Expand Up @@ -460,6 +464,48 @@ impl Shared {
self.send_events(iter::once(event));
}

// Add a series of user events to the event loop runner
//
// This will schedule the event loop to wake up instead of waking it up immediately if its not
// running.
pub(crate) fn send_user_events(&self, count: NonZeroUsize, local: bool) {
// If the event loop is closed, it should discard any new events
if self.is_closed() {
return;
}

if local {
// If the loop is not running and triggered locally, queue on next microtick.
if let Ok(RunnerEnum::Running(_)) =
self.0.runner.try_borrow().as_ref().map(Deref::deref)
{
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_name = queueMicrotask)]
fn queue_microtask(task: Function);
}

queue_microtask(
Closure::once_into_js({
let this = Rc::downgrade(&self.0);
move || {
if let Some(shared) = this.upgrade() {
Shared(shared).send_events(
iter::repeat(Event::UserEvent(())).take(count.get()),
)
}
}
})
.unchecked_into(),
);

return;
}
}

self.send_events(iter::repeat(Event::UserEvent(())).take(count.get()))
}

// Add a series of events to the event loop runner
//
// It will determine if the event should be immediately sent to the user or buffered for later
Expand All @@ -471,13 +517,8 @@ impl Shared {
// If we can run the event processing right now, or need to queue this and wait for later
let mut process_immediately = true;
match self.0.runner.try_borrow().as_ref().map(Deref::deref) {
Ok(RunnerEnum::Running(ref runner)) => {
// If we're currently polling, queue this and wait for the poll() method to be
// called
if let State::Poll { .. } = runner.state {
process_immediately = false;
}
},
// If the runner is attached but not running, we always wake it up.
Ok(RunnerEnum::Running(_)) => (),
Ok(RunnerEnum::Pending) => {
// The runner still hasn't been attached: queue this event and wait for it to be
process_immediately = false;
Expand Down
Loading