Skip to content

Commit d518f68

Browse files
seanayeAlorel
authored andcommitted
fix: Fixed a race condition in cursors
Closes #52
1 parent 3b383a2 commit d518f68

File tree

7 files changed

+127
-27
lines changed

7 files changed

+127
-27
lines changed

src/cursor/base_cursor.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::{CursorDirection, CursorSys};
2-
use crate::future::{PollUnpinned, VoidRequest};
2+
use crate::future::request::listeners::EventTargetResult;
3+
use crate::future::{PollUnpinned, Request};
34
use crate::internal_utils::SystemRepr;
45
use crate::primitive::{TryFromJs, TryToJs};
56
use fancy_constructor::new;
@@ -27,7 +28,7 @@ pub(crate) enum CursorState {
2728
TryNext,
2829

2930
/// We are currently reading the next record.
30-
ReadingNext(VoidRequest),
31+
ReadingNext(Request<EventTargetResult>),
3132
}
3233

3334
impl BaseCursor {
@@ -68,7 +69,7 @@ impl BaseCursor {
6869
Ok(())
6970
} else {
7071
self.as_sys().advance(step)?;
71-
self.req().await
72+
self.req().await.map(|_| ())
7273
}
7374
}
7475

@@ -108,8 +109,8 @@ impl BaseCursor {
108109
self.as_sys().direction()
109110
}
110111

111-
pub(crate) fn req(&self) -> VoidRequest {
112-
VoidRequest::new(self.as_sys().req())
112+
pub(crate) fn req(&self) -> Request<EventTargetResult> {
113+
Request::new(self.as_sys().req())
113114
}
114115

115116
pub(crate) fn poll_state<R, F>(
@@ -186,7 +187,7 @@ impl BaseCursor {
186187

187188
fn on_req_polled<R, F>(
188189
&mut self,
189-
poll: Poll<crate::Result<()>>,
190+
poll: Poll<crate::Result<EventTargetResult>>,
190191
read_current: F,
191192
) -> Poll<crate::Result<Option<R>>>
192193
where
@@ -196,15 +197,27 @@ impl BaseCursor {
196197
Poll::Ready(res) => {
197198
self.state = CursorState::ReadCurrent;
198199

199-
Poll::Ready(match res {
200-
Ok(()) => {
200+
let should_continue = res.map(|event_result| match event_result {
201+
EventTargetResult::Null => false,
202+
EventTargetResult::Cursor(cursor_sys) => {
203+
self.sys = cursor_sys;
204+
true
205+
}
206+
EventTargetResult::NotNull => true,
207+
});
208+
209+
Poll::Ready(match should_continue {
210+
Ok(true) => {
201211
// Firefox implementation: key gets set to undefined on cursor end
202212
if self.has_key() {
203213
self.read_current(read_current)
204214
} else {
205215
Ok(None)
206216
}
207217
}
218+
// Chrome implementation: the only way to know if a cursor has finished
219+
// is by reading the value from onsuccess event.target.result
220+
Ok(false) => Ok(None),
208221
Err(e) => Err(e),
209222
})
210223
}

src/future/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ mod basic;
1313
mod get_all;
1414
mod maybe_errored;
1515
mod open_db;
16-
mod request;
16+
pub(crate) mod request;
1717
mod traits;
1818

1919
iffeat! {

src/future/request.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod listeners;
1+
pub(crate) mod listeners;
22
mod untyped;
33

44
use crate::internal_utils::SystemRepr;
@@ -54,19 +54,30 @@ impl PollUnpinned for Request {
5454

5555
fn poll_unpinned(&mut self, cx: &mut Context) -> Poll<Self::Output> {
5656
match self.inner.poll_unpinned(cx) {
57-
Poll::Ready(Ok(())) => Poll::Ready(self.as_sys().result().map_err(Into::into)),
57+
Poll::Ready(Ok(_)) => Poll::Ready(self.as_sys().result().map_err(Into::into)),
5858
Poll::Pending => Poll::Pending,
5959
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
6060
}
6161
}
6262
}
6363

64+
#[sealed]
65+
#[cfg(feature = "cursors")]
66+
impl PollUnpinned for Request<listeners::EventTargetResult> {
67+
type Output = crate::Result<listeners::EventTargetResult>;
68+
69+
#[inline]
70+
fn poll_unpinned(&mut self, cx: &mut Context) -> Poll<Self::Output> {
71+
self.inner.poll_unpinned(cx)
72+
}
73+
}
74+
6475
#[sealed]
6576
impl PollUnpinned for Request<()> {
6677
type Output = crate::Result<()>;
6778

6879
#[inline]
6980
fn poll_unpinned(&mut self, cx: &mut Context) -> Poll<Self::Output> {
70-
self.inner.poll_unpinned(cx)
81+
self.inner.poll_unpinned(cx).map(|res| res.map(|_| ()))
7182
}
7283
}

src/future/request/listeners.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,58 @@
11
use super::super::traits::*;
22
use crate::error::UnexpectedDataError;
3+
use cfg_if::cfg_if;
34
use std::task::{Context, Poll};
45
use tokio::sync::mpsc;
56
use wasm_bindgen::prelude::*;
67

7-
type Callback = Closure<dyn FnMut() + 'static>;
8+
type Callback = Closure<dyn FnMut(web_sys::Event) + 'static>;
9+
10+
/// represents the value on an event.target.result
11+
pub(crate) enum EventTargetResult {
12+
/// the event.target.result was null
13+
Null,
14+
/// the event.target.result was a [`web_sys::IdbCursor`] instance
15+
#[cfg(feature = "cursors")]
16+
Cursor(crate::cursor::CursorSys),
17+
/// the event.target.result was not null
18+
NotNull,
19+
}
820

921
pub(super) struct Listeners {
10-
rx: mpsc::Receiver<()>,
22+
rx: mpsc::Receiver<EventTargetResult>,
1123
req: web_sys::IdbRequest,
12-
_callback: Closure<dyn FnMut() + 'static>,
24+
_callback: Callback,
1325
}
1426

1527
impl Listeners {
1628
pub(super) fn new(req: web_sys::IdbRequest) -> Self {
1729
let (tx, rx) = mpsc::channel(1);
1830

19-
let callback = Callback::wrap(Box::new(move || {
20-
let _ = tx.try_send(());
31+
let callback = Callback::wrap(Box::new(move |e: web_sys::Event| {
32+
let non_null_result = e
33+
.target()
34+
.map(JsValue::from)
35+
// get the event.target.result
36+
.and_then(|val| js_sys::Reflect::get(&val, &JsValue::from("result")).ok())
37+
// make sure its not null or undefined
38+
.filter(|val| !val.is_undefined() && !val.is_null());
39+
40+
let _ = tx.try_send(match non_null_result {
41+
None => EventTargetResult::Null,
42+
#[cfg_attr(not(feature = "cursors"), expect(unused_variables))]
43+
Some(val) => {
44+
cfg_if! {
45+
if #[cfg(feature = "cursors")] {
46+
match val.dyn_into::<crate::cursor::CursorSys>() {
47+
Ok(cursor) => EventTargetResult::Cursor(cursor),
48+
Err(_) => EventTargetResult::NotNull,
49+
}
50+
} else {
51+
EventTargetResult::NotNull
52+
}
53+
}
54+
}
55+
});
2156
}));
2257

2358
let as_fn = callback.as_ref().unchecked_ref();
@@ -41,12 +76,15 @@ impl Drop for Listeners {
4176

4277
#[::sealed::sealed]
4378
impl PollUnpinned for Listeners {
44-
type Output = crate::Result<()>;
79+
type Output = crate::Result<EventTargetResult>;
4580

4681
fn poll_unpinned(&mut self, cx: &mut Context) -> Poll<Self::Output> {
4782
match self.rx.poll_recv(cx) {
4883
Poll::Pending => Poll::Pending,
49-
Poll::Ready(Some(())) => Poll::Ready(super::UntypedRequest::req_to_result(&self.req)),
84+
Poll::Ready(Some(event_target)) => Poll::Ready(super::UntypedRequest::req_to_result(
85+
&self.req,
86+
event_target,
87+
)),
5088
Poll::Ready(None) => Poll::Ready(Err(UnexpectedDataError::ChannelDropped.into())),
5189
}
5290
}

src/future/request/untyped.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use super::super::traits::*;
21
use super::Listeners;
2+
use super::{super::traits::*, listeners::EventTargetResult};
33
use sealed::sealed;
44
use std::task::{Context, Poll};
55
use wasm_bindgen::prelude::*;
@@ -31,17 +31,17 @@ impl crate::internal_utils::SystemRepr for UntypedRequest {
3131
}
3232

3333
impl UntypedRequest {
34-
pub(super) fn req_to_result(req: &web_sys::IdbRequest) -> crate::Result<()> {
34+
pub(super) fn req_to_result<T>(req: &web_sys::IdbRequest, v: T) -> crate::Result<T> {
3535
match req.error() {
36-
Ok(None) => Ok(()),
36+
Ok(None) => Ok(v),
3737
Ok(Some(e)) => Err(e.into()),
3838
Err(e) => Err(e.into()),
3939
}
4040
}
4141

42-
fn poll_request(req: &web_sys::IdbRequest) -> Poll<crate::Result<()>> {
42+
fn poll_request<T>(req: &web_sys::IdbRequest, v: T) -> Poll<crate::Result<T>> {
4343
if matches!(req.ready_state(), IdbRequestReadyState::Done) {
44-
Poll::Ready(Self::req_to_result(req))
44+
Poll::Ready(Self::req_to_result(req, v))
4545
} else {
4646
Poll::Pending
4747
}
@@ -56,13 +56,13 @@ impl UntypedRequest {
5656

5757
#[sealed]
5858
impl PollUnpinned for UntypedRequest {
59-
type Output = crate::Result<()>;
59+
type Output = crate::Result<EventTargetResult>;
6060

6161
fn poll_unpinned(&mut self, cx: &mut Context) -> Poll<Self::Output> {
6262
match self {
6363
Self::WithListeners(listeners) => listeners.poll_unpinned(cx),
6464
Self::Bare(req) => {
65-
if let Poll::Ready(res) = Self::poll_request(req) {
65+
if let Poll::Ready(res) = Self::poll_request(req, EventTargetResult::NotNull) {
6666
Poll::Ready(res)
6767
} else {
6868
let mut listeners = Self::create_listeners(req);

tests/tests/object_store/add_put.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ macro_rules! common_tests {
103103
.unwrap()
104104
.await;
105105

106-
assert_eq!(res, Ok(()));
106+
assert!(res.is_ok());
107107
}
108108
};
109109
}

tests/tests/object_store/query_source/cursor.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,44 @@ pub mod cursor {
378378
assert_eq!(data, expected);
379379
}
380380

381+
#[wasm_bindgen_test]
382+
#[cfg(all(feature = "serde", feature = "streams", feature = "cursors"))]
383+
pub async fn cursor_stream_one() {
384+
use serde::{Deserialize, Serialize};
385+
let db = random_db_keyval().await;
386+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
387+
struct Foo {
388+
a: String,
389+
b: u32,
390+
key: u32,
391+
}
392+
let data_1 = Foo {
393+
a: "hello".to_string(),
394+
b: 42,
395+
key: 1,
396+
};
397+
let data_2 = Foo {
398+
a: "world".to_string(),
399+
b: 42,
400+
key: 2,
401+
};
402+
// seed db
403+
{
404+
open_tx!(db, Readwrite > (tx, store));
405+
406+
store.add(data_1.clone()).serde().unwrap().await.unwrap();
407+
store.add(data_2.clone()).serde().unwrap().await.unwrap();
408+
drop(store);
409+
tx.commit().await.unwrap();
410+
}
411+
open_tx!(db, Readonly > (tx, store));
412+
let cursor = store.open_cursor().serde().unwrap().await.unwrap().unwrap();
413+
let res: Vec<Foo> = cursor.stream_ser::<Foo>().try_collect().await.unwrap();
414+
assert_eq!(res.len(), 2);
415+
assert_eq!(res.first().unwrap(), &data_1);
416+
assert_eq!(res.get(1).unwrap(), &data_2);
417+
}
418+
381419
#[wasm_bindgen_test]
382420
pub async fn with_query() {
383421
let db = random_db_keyval().await;

0 commit comments

Comments
 (0)