Skip to content

Commit 5d6edc8

Browse files
authored
Merge pull request #494 from stepancheg/sync-queue-bench
batch sync::mpsc
2 parents 0fb87c2 + fea8994 commit 5d6edc8

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed

benches/sync_mpsc.rs

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#![feature(test)]
2+
3+
extern crate futures;
4+
extern crate test;
5+
6+
use futures::{Async, Poll, AsyncSink};
7+
use futures::executor;
8+
use futures::executor::{Notify, NotifyHandle};
9+
10+
use futures::sink::Sink;
11+
use futures::stream::Stream;
12+
13+
use futures::sync::mpsc::unbounded;
14+
use futures::sync::mpsc::channel;
15+
use futures::sync::mpsc::Sender;
16+
use futures::sync::mpsc::UnboundedSender;
17+
18+
19+
use test::Bencher;
20+
21+
fn notify_noop() -> NotifyHandle {
22+
struct Noop;
23+
24+
impl Notify for Noop {
25+
fn notify(&self, _id: usize) {}
26+
}
27+
28+
const NOOP : &'static Noop = &Noop;
29+
30+
NotifyHandle::from(NOOP)
31+
}
32+
33+
/// Single producer, single consumer
34+
#[bench]
35+
fn unbounded_1_tx(b: &mut Bencher) {
36+
b.iter(|| {
37+
let (tx, rx) = unbounded();
38+
39+
let mut rx = executor::spawn(rx);
40+
41+
// 1000 iterations to avoid measuring overhead of initialization
42+
// Result should be divided by 1000
43+
for i in 0..1000 {
44+
45+
// Poll, not ready, park
46+
assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(&notify_noop(), 1));
47+
48+
UnboundedSender::send(&tx, i).unwrap();
49+
50+
// Now poll ready
51+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
52+
}
53+
})
54+
}
55+
56+
/// 100 producers, single consumer
57+
#[bench]
58+
fn unbounded_100_tx(b: &mut Bencher) {
59+
b.iter(|| {
60+
let (tx, rx) = unbounded();
61+
62+
let mut rx = executor::spawn(rx);
63+
64+
let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
65+
66+
// 1000 send/recv operations total, result should be divided by 1000
67+
for _ in 0..10 {
68+
for i in 0..tx.len() {
69+
assert_eq!(Ok(Async::NotReady), rx.poll_stream_notify(&notify_noop(), 1));
70+
71+
UnboundedSender::send(&tx[i], i).unwrap();
72+
73+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
74+
}
75+
}
76+
})
77+
}
78+
79+
80+
/// A Stream that continuously sends incrementing number of the queue
81+
struct TestSender {
82+
tx: Sender<u32>,
83+
last: u32, // Last number sent
84+
}
85+
86+
// Could be a Future, it doesn't matter
87+
impl Stream for TestSender {
88+
type Item = u32;
89+
type Error = ();
90+
91+
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
92+
match self.tx.start_send(self.last + 1) {
93+
Err(_) => panic!(),
94+
Ok(AsyncSink::Ready) => {
95+
self.last += 1;
96+
assert_eq!(Ok(Async::Ready(())), self.tx.poll_complete());
97+
Ok(Async::Ready(Some(self.last)))
98+
}
99+
Ok(AsyncSink::NotReady(_)) => {
100+
Ok(Async::NotReady)
101+
}
102+
}
103+
}
104+
}
105+
106+
107+
/// Single producers, single consumer
108+
#[bench]
109+
fn bounded_1_tx(b: &mut Bencher) {
110+
b.iter(|| {
111+
let (tx, rx) = channel(0);
112+
113+
let mut tx = executor::spawn(TestSender {
114+
tx: tx,
115+
last: 0,
116+
});
117+
118+
let mut rx = executor::spawn(rx);
119+
120+
for i in 0..1000 {
121+
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(&notify_noop(), 1));
122+
assert_eq!(Ok(Async::NotReady), tx.poll_stream_notify(&notify_noop(), 1));
123+
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
124+
}
125+
})
126+
}
127+
128+
/// 100 producers, single consumer
129+
#[bench]
130+
fn bounded_100_tx(b: &mut Bencher) {
131+
b.iter(|| {
132+
// Each sender can send one item after specified capacity
133+
let (tx, rx) = channel(0);
134+
135+
let mut tx: Vec<_> = (0..100).map(|_| {
136+
executor::spawn(TestSender {
137+
tx: tx.clone(),
138+
last: 0
139+
})
140+
}).collect();
141+
142+
let mut rx = executor::spawn(rx);
143+
144+
for i in 0..10 {
145+
for j in 0..tx.len() {
146+
// Send an item
147+
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(&notify_noop(), 1));
148+
// Then block
149+
assert_eq!(Ok(Async::NotReady), tx[j].poll_stream_notify(&notify_noop(), 1));
150+
// Recv the item
151+
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
152+
}
153+
}
154+
})
155+
}

0 commit comments

Comments
 (0)