forked from tyrchen/geektime-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiff_benchmark
296 lines (273 loc) · 10.3 KB
/
diff_benchmark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
diff --git a/43/kv/Cargo.toml b/43/kv/Cargo.toml
index 1c15362..cbbe18d 100644
--- a/43/kv/Cargo.toml
+++ b/43/kv/Cargo.toml
@@ -36,7 +36,7 @@ yamux = "0.9" # yamux 多路复用支持
async-prost = "0.2.1" # 支持把 protobuf 封装成 TCP frame
certify = "0.3"
criterion = { version = "0.3", features = ["async_futures", "async_tokio", "html_reports"] } # benchmark
-rand = "0.8"
+rand = "0.8" # 随机数处理
tempfile = "3" # 处理临时目录和临时文件
tokio-util = { version = "0.6", features = ["codec"]}
diff --git a/43/kv/benches/pubsub.rs b/43/kv/benches/pubsub.rs
new file mode 100644
index 0000000..c80d153
--- /dev/null
+++ b/43/kv/benches/pubsub.rs
@@ -0,0 +1,102 @@
+use anyhow::Result;
+use criterion::{criterion_group, criterion_main, Criterion};
+use futures::StreamExt;
+use kv6::{
+ start_client_with_config, start_server_with_config, ClientConfig, CommandRequest, ServerConfig,
+ StorageConfig, YamuxCtrl,
+};
+use rand::prelude::SliceRandom;
+use std::time::Duration;
+use tokio::net::TcpStream;
+use tokio::runtime::Builder;
+use tokio::time;
+use tokio_rustls::client::TlsStream;
+use tracing::info;
+
+async fn start_server() -> Result<()> { // Launch the KV server as a background task for the benchmark.
+ let addr = "127.0.0.1:9999";
+ let mut config: ServerConfig = toml::from_str(include_str!("../fixtures/server.conf"))?; // fixture text is embedded at compile time
+ config.general.addr = addr.into(); // override the address from the fixture
+ config.storage = StorageConfig::MemTable; // override the storage setting from the fixture with MemTable
+
+ tokio::spawn(async move { // detached task: this function returns without waiting for the server
+ start_server_with_config(&config).await.unwrap(); // a server error panics inside the spawned task
+ });
+
+ Ok(())
+}
+
+async fn connect() -> Result<YamuxCtrl<TlsStream<TcpStream>>> { // Build a client from the fixture config, pointed at the benchmark address.
+ let addr = "127.0.0.1:9999"; // same address that start_server() configures
+ let mut config: ClientConfig = toml::from_str(include_str!("../fixtures/client.conf"))?;
+ config.general.addr = addr.into();
+
+ Ok(start_client_with_config(&config).await?) // the returned YamuxCtrl is what callers use to open streams
+}
+
+async fn start_subscribers(topic: &'static str) -> Result<()> { // Open one connection and subscribe it to `topic` in a background task.
+ let mut ctrl = connect().await?;
+ let stream = ctrl.open_stream().await?;
+ info!("C(subscriber): stream opened");
+ let cmd = CommandRequest::new_subscribe(topic.to_string());
+ tokio::spawn(async move { // detached task that keeps reading the subscription
+ let mut stream = stream.execute_streaming(&cmd).await.unwrap();
+ while let Some(Ok(data)) = stream.next().await { // loop ends on end-of-stream or on the first Err item
+ drop(data); // each received item is read and discarded
+ }
+ });
+
+ Ok(())
+}
+
+async fn start_publishers(topic: &'static str, values: &'static [&'static str]) -> Result<()> { // Publish one randomly chosen value to `topic` over a fresh connection.
+ let mut rng = rand::thread_rng();
+ let v = values.choose(&mut rng).unwrap(); // choose() yields None for an empty slice, so `values` must be non-empty
+
+ let mut ctrl = connect().await.unwrap(); // failures from here on panic instead of propagating through the Result
+ let mut stream = ctrl.open_stream().await.unwrap();
+ info!("C(publisher): stream opened");
+
+ let cmd = CommandRequest::new_publish(topic.to_string(), vec![(*v).into()]);
+ stream.execute_unary(&cmd).await.unwrap();
+
+ Ok(())
+}
+
+fn pubsub(c: &mut Criterion) { // Criterion benchmark: publish to a topic after 100 subscribers have been started.
+ // tracing_subscriber::fmt::init();
+ // Create the Tokio runtime
+ let runtime = Builder::new_multi_thread()
+ .worker_threads(4)
+ .thread_name("pubsub")
+ .enable_all()
+ .build()
+ .unwrap();
+ let values = &["Hello", "Tyr", "Goodbye", "World"];
+ let topic = "lobby";
+
+ // Start the server and 100 subscribers to prepare for the benchmark
+ runtime.block_on(async {
+ eprint!("preparing server and subscribers");
+ start_server().await.unwrap();
+ time::sleep(Duration::from_millis(50)).await; // NOTE(review): presumably gives the spawned server time to start listening — confirm
+ for _ in 0..100 {
+ start_subscribers(topic).await.unwrap();
+ eprint!(".");
+ }
+ eprintln!("Done!");
+ });
+
+ // Run the benchmark
+ c.bench_function("publishing", move |b| {
+ b.to_async(&runtime)
+ .iter(|| async { start_publishers(topic, values).await }) // each iteration opens a new connection and publishes once
+ });
+}
+
+criterion_group! { // Register `pubsub` as the only target of the `benches` group.
+ name = benches;
+ config = Criterion::default().sample_size(10); // 10 samples per benchmark
+ targets = pubsub
+}
+criterion_main!(benches); // expands to the benchmark binary's entry point for the `benches` group
diff --git a/43/kv/src/client.rs b/43/kv/src/client.rs
index 70a962d..e1f1865 100644
--- a/43/kv/src/client.rs
+++ b/43/kv/src/client.rs
@@ -18,20 +18,18 @@ async fn main() -> Result<()> {
let channel = "lobby";
start_publishing(ctrl.open_stream().await?, channel)?;
- let stream = ctrl.open_stream().await?;
-
- let mut client = ProstClientStream::new(stream);
+ let mut stream = ctrl.open_stream().await?;
// 生成一个 HSET 命令
let cmd = CommandRequest::new_hset("table1", "hello", "world".to_string().into());
// 发送 HSET 命令
- let data = client.execute_unary(&cmd).await?;
+ let data = stream.execute_unary(&cmd).await?;
info!("Got response {:?}", data);
// 生成一个 Subscribe 命令
let cmd = CommandRequest::new_subscribe(channel);
- let mut stream = client.execute_streaming(&cmd).await?;
+ let mut stream = stream.execute_streaming(&cmd).await?;
let id = stream.id;
start_unsubscribe(ctrl.open_stream().await?, channel, id)?;
@@ -44,24 +42,29 @@ async fn main() -> Result<()> {
Ok(())
}
-fn start_publishing(stream: Compat<yamux::Stream>, name: &str) -> Result<(), KvError> {
+fn start_publishing(
+ mut stream: ProstClientStream<Compat<yamux::Stream>>,
+ name: &str,
+) -> Result<(), KvError> {
let cmd = CommandRequest::new_publish(name, vec![1.into(), 2.into(), "hello".into()]);
tokio::spawn(async move {
time::sleep(Duration::from_millis(1000)).await;
- let mut client = ProstClientStream::new(stream);
- let res = client.execute_unary(&cmd).await.unwrap();
+ let res = stream.execute_unary(&cmd).await.unwrap();
println!("Finished publishing: {:?}", res);
});
Ok(())
}
-fn start_unsubscribe(stream: Compat<yamux::Stream>, name: &str, id: u32) -> Result<(), KvError> {
+fn start_unsubscribe(
+ mut stream: ProstClientStream<Compat<yamux::Stream>>,
+ name: &str,
+ id: u32,
+) -> Result<(), KvError> {
let cmd = CommandRequest::new_unsubscribe(name, id as _);
tokio::spawn(async move {
time::sleep(Duration::from_millis(2000)).await;
- let mut client = ProstClientStream::new(stream);
- let res = client.execute_unary(&cmd).await.unwrap();
+ let res = stream.execute_unary(&cmd).await.unwrap();
println!("Finished unsubscribing: {:?}", res);
});
diff --git a/43/kv/src/lib.rs b/43/kv/src/lib.rs
index 4005f2a..8df5ec1 100644
--- a/43/kv/src/lib.rs
+++ b/43/kv/src/lib.rs
@@ -18,6 +18,7 @@ use tokio_rustls::client;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::info;
+/// 通过配置创建 KV 服务器
pub async fn start_server_with_config(config: &ServerConfig) -> Result<()> {
let acceptor =
TlsServerAcceptor::new(&config.tls.cert, &config.tls.key, config.tls.ca.as_deref())?;
@@ -31,6 +32,7 @@ pub async fn start_server_with_config(config: &ServerConfig) -> Result<()> {
Ok(())
}
+/// 通过配置创建 KV 客户端
pub async fn start_client_with_config(
config: &ClientConfig,
) -> Result<YamuxCtrl<client::TlsStream<TcpStream>>> {
diff --git a/43/kv/src/network/multiplex.rs b/43/kv/src/network/multiplex.rs
index 0dc72e2..07b347c 100644
--- a/43/kv/src/network/multiplex.rs
+++ b/43/kv/src/network/multiplex.rs
@@ -4,6 +4,8 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use yamux::{Config, Connection, ConnectionError, Control, Mode, WindowUpdateMode};
+use crate::ProstClientStream;
+
/// Yamux 控制结构
pub struct YamuxCtrl<S> {
/// yamux control,用于创建新的 stream
@@ -63,9 +65,11 @@ where
}
/// 打开一个新的 stream
- pub async fn open_stream(&mut self) -> Result<Compat<yamux::Stream>, ConnectionError> {
+ pub async fn open_stream(
+ &mut self,
+ ) -> Result<ProstClientStream<Compat<yamux::Stream>>, ConnectionError> {
let stream = self.ctrl.open_stream().await?;
- Ok(stream.compat())
+ Ok(ProstClientStream::new(stream.compat()))
}
}
@@ -78,8 +82,8 @@ mod tests {
assert_res_ok,
network::tls::tls_utils::{tls_acceptor, tls_connector},
utils::DummyStream,
- CommandRequest, KvError, MemTable, ProstClientStream, ProstServerStream, Service,
- ServiceInner, Storage, TlsServerAcceptor,
+ CommandRequest, KvError, MemTable, ProstServerStream, Service, ServiceInner, Storage,
+ TlsServerAcceptor,
};
use anyhow::Result;
use tokio::net::{TcpListener, TcpStream};
@@ -162,15 +166,13 @@ mod tests {
let mut ctrl = YamuxCtrl::new_client(stream, None);
// 从 client ctrl 中打开一个新的 yamux stream
- let stream = ctrl.open_stream().await?;
- // 封装成 ProstClientStream
- let mut client = ProstClientStream::new(stream);
+ let mut stream = ctrl.open_stream().await?;
let cmd = CommandRequest::new_hset("t1", "k1", "v1".into());
- client.execute_unary(&cmd).await.unwrap();
+ stream.execute_unary(&cmd).await.unwrap();
let cmd = CommandRequest::new_hget("t1", "k1");
- let res = client.execute_unary(&cmd).await.unwrap();
+ let res = stream.execute_unary(&cmd).await.unwrap();
assert_res_ok(&res, &["v1".into()], &[]);
Ok(())
diff --git a/43/kv/tests/server.rs b/43/kv/tests/server.rs
index 78a0675..062f1a5 100644
--- a/43/kv/tests/server.rs
+++ b/43/kv/tests/server.rs
@@ -1,7 +1,7 @@
use anyhow::Result;
use kv6::{
- start_client_with_config, start_server_with_config, ClientConfig, CommandRequest,
- ProstClientStream, ServerConfig, StorageConfig,
+ start_client_with_config, start_server_with_config, ClientConfig, CommandRequest, ServerConfig,
+ StorageConfig,
};
use std::time::Duration;
use tokio::time;
@@ -24,16 +24,15 @@ async fn yamux_server_client_full_tests() -> Result<()> {
config.general.addr = addr.into();
let mut ctrl = start_client_with_config(&config).await.unwrap();
- let stream = ctrl.open_stream().await?;
- let mut client = ProstClientStream::new(stream);
+ let mut stream = ctrl.open_stream().await?;
// 生成一个 HSET 命令
let cmd = CommandRequest::new_hset("table1", "hello", "world".to_string().into());
- client.execute_unary(&cmd).await?;
+ stream.execute_unary(&cmd).await?;
// 生成一个 HGET 命令
let cmd = CommandRequest::new_hget("table1", "hello");
- let data = client.execute_unary(&cmd).await?;
+ let data = stream.execute_unary(&cmd).await?;
assert_eq!(data.status, 200);
assert_eq!(data.values, &["world".into()]);