Skip to content

Commit 4c31bbb

Browse files
committed
add code for 46
1 parent 373b1d1 commit 4c31bbb

38 files changed

+4771
-0
lines changed

46_kv/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
fixtures

46_kv/Cargo.toml

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
[package]
2+
name = "kv6"
3+
version = "0.1.0"
4+
edition = "2018"
5+
6+
[[bin]]
7+
name = "kvs"
8+
path = "src/server.rs"
9+
doc = false
10+
11+
[[bin]]
12+
name = "kvc"
13+
path = "src/client.rs"
14+
doc = false
15+
16+
[dependencies]
17+
anyhow = "1" # 错误处理
18+
bytes = "1" # 高效处理网络 buffer 的库
19+
dashmap = "4" # 并发 HashMap
20+
flate2 = "1" # gzip 压缩
21+
futures = "0.3" # 提供 Stream trait
22+
http = "0.2" # 我们使用 HTTP status code 所以引入这个类型库
23+
opentelemetry-jaeger = "0.15" # opentelemetry jaeger 支持
24+
prost = "0.8" # 处理 protobuf 的代码
25+
rustls-native-certs = "0.5"
26+
serde = { version = "1", features = ["derive"] } # 序列化/反序列化
27+
sled = "0.34" # sled db
28+
thiserror = "1" # 错误定义和处理
29+
tokio = { version = "1", features = ["full" ] } # 异步网络库
30+
tokio-rustls = "0.22" # 处理 TLS
31+
tokio-stream = { version = "0.1", features = ["sync"] } # 处理 stream
32+
tokio-util = { version = "0.6", features = ["compat"]} # tokio 和 futures 的兼容性库
33+
toml = "0.5" # toml 支持
34+
tracing = "0.1" # 日志处理
35+
tracing-appender = "0.1" # 文件日志
36+
tracing-opentelemetry = "0.15" # opentelemetry 支持
37+
tracing-subscriber = { version = "0.2", features = ["json", "chrono"] } # 日志处理
38+
yamux = "0.9" # yamux 多路复用支持
39+
40+
[dev-dependencies]
41+
async-prost = "0.2.1" # 支持把 protobuf 封装成 TCP frame
42+
certify = "0.3"
43+
criterion = { version = "0.3", features = ["async_futures", "async_tokio", "html_reports"] } # benchmark
44+
rand = "0.8" # 随机数处理
45+
tempfile = "3" # 处理临时目录和临时文件
46+
tokio-util = { version = "0.6", features = ["codec"]}
47+
48+
[build-dependencies]
49+
prost-build = "0.8" # 编译 protobuf
50+
51+
[[bench]]
52+
name = "pubsub"
53+
harness = false

46_kv/abi.proto

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
syntax = "proto3";
2+
3+
package abi;
4+
5+
// 来自客户端的命令请求
6+
message CommandRequest {
7+
oneof request_data {
8+
Hget hget = 1;
9+
Hgetall hgetall = 2;
10+
Hmget hmget = 3;
11+
Hset hset = 4;
12+
Hmset hmset = 5;
13+
Hdel hdel = 6;
14+
Hmdel hmdel = 7;
15+
Hexist hexist = 8;
16+
Hmexist hmexist = 9;
17+
Subscribe subscribe = 10;
18+
Unsubscribe unsubscribe = 11;
19+
Publish publish = 12;
20+
}
21+
}
22+
23+
// 服务器的响应
24+
message CommandResponse {
25+
// 状态码;复用 HTTP 2xx/4xx/5xx 状态码
26+
uint32 status = 1;
27+
// 如果不是 2xx,message 里包含详细的信息
28+
string message = 2;
29+
// 成功返回的 values
30+
repeated Value values = 3;
31+
// 成功返回的 kv pairs
32+
repeated Kvpair pairs = 4;
33+
}
34+
35+
// 从 table 中获取一个 key,返回 value
36+
message Hget {
37+
string table = 1;
38+
string key = 2;
39+
}
40+
41+
// 从 table 中获取所有的 Kvpair
42+
message Hgetall { string table = 1; }
43+
44+
// 从 table 中获取一组 key,返回它们的 value
45+
message Hmget {
46+
string table = 1;
47+
repeated string keys = 2;
48+
}
49+
50+
// 返回的值
51+
message Value {
52+
oneof value {
53+
string string = 1;
54+
bytes binary = 2;
55+
int64 integer = 3;
56+
double float = 4;
57+
bool bool = 5;
58+
}
59+
}
60+
61+
// 返回的 kvpair
62+
message Kvpair {
63+
string key = 1;
64+
Value value = 2;
65+
}
66+
67+
// 往 table 里存一个 kvpair,
68+
// 如果 table 不存在就创建这个 table
69+
message Hset {
70+
string table = 1;
71+
Kvpair pair = 2;
72+
}
73+
74+
// 往 table 中存一组 kvpair,
75+
// 如果 table 不存在就创建这个 table
76+
message Hmset {
77+
string table = 1;
78+
repeated Kvpair pairs = 2;
79+
}
80+
81+
// 从 table 中删除一个 key,返回它之前的值
82+
message Hdel {
83+
string table = 1;
84+
string key = 2;
85+
}
86+
87+
// 从 table 中删除一组 key,返回它们之前的值
88+
message Hmdel {
89+
string table = 1;
90+
repeated string keys = 2;
91+
}
92+
93+
// 查看 key 是否存在
94+
message Hexist {
95+
string table = 1;
96+
string key = 2;
97+
}
98+
99+
// 查看一组 key 是否存在
100+
message Hmexist {
101+
string table = 1;
102+
repeated string keys = 2;
103+
}
104+
105+
// subscribe 到某个主题,任何发布到这个主题的数据都会被收到
106+
// 成功后,第一个返回的 CommandResponse,我们返回一个唯一的 subscription id
107+
message Subscribe { string topic = 1; }
108+
109+
// 取消对某个主题的订阅
110+
message Unsubscribe {
111+
string topic = 1;
112+
uint32 id = 2;
113+
}
114+
115+
// 发布数据到某个主题
116+
message Publish {
117+
string topic = 1;
118+
repeated Value data = 2;
119+
}

46_kv/benches/pubsub.rs

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use anyhow::Result;
2+
use criterion::{criterion_group, criterion_main, Criterion};
3+
use futures::StreamExt;
4+
use kv6::{
5+
start_client_with_config, start_server_with_config, ClientConfig, CommandRequest, ServerConfig,
6+
StorageConfig, YamuxCtrl,
7+
};
8+
use rand::prelude::SliceRandom;
9+
use std::time::Duration;
10+
use tokio::net::TcpStream;
11+
use tokio::runtime::Builder;
12+
use tokio::time;
13+
use tokio_rustls::client::TlsStream;
14+
use tracing::{info, span};
15+
use tracing_subscriber::{layer::SubscriberExt, prelude::*, EnvFilter};
16+
17+
async fn start_server() -> Result<()> {
18+
let addr = "127.0.0.1:9999";
19+
let mut config: ServerConfig = toml::from_str(include_str!("../fixtures/server.conf"))?;
20+
config.general.addr = addr.into();
21+
config.storage = StorageConfig::MemTable;
22+
23+
tokio::spawn(async move {
24+
start_server_with_config(&config).await.unwrap();
25+
});
26+
27+
Ok(())
28+
}
29+
30+
async fn connect() -> Result<YamuxCtrl<TlsStream<TcpStream>>> {
31+
let addr = "127.0.0.1:9999";
32+
let mut config: ClientConfig = toml::from_str(include_str!("../fixtures/client.conf"))?;
33+
config.general.addr = addr.into();
34+
35+
Ok(start_client_with_config(&config).await?)
36+
}
37+
38+
async fn start_subscribers(topic: &'static str) -> Result<()> {
39+
let mut ctrl = connect().await?;
40+
let stream = ctrl.open_stream().await?;
41+
info!("C(subscriber): stream opened");
42+
let cmd = CommandRequest::new_subscribe(topic.to_string());
43+
tokio::spawn(async move {
44+
let mut stream = stream.execute_streaming(&cmd).await.unwrap();
45+
while let Some(Ok(data)) = stream.next().await {
46+
drop(data);
47+
}
48+
});
49+
50+
Ok(())
51+
}
52+
53+
async fn start_publishers(topic: &'static str, values: &'static [&'static str]) -> Result<()> {
54+
let mut rng = rand::thread_rng();
55+
let v = values.choose(&mut rng).unwrap();
56+
57+
let mut ctrl = connect().await.unwrap();
58+
let mut stream = ctrl.open_stream().await.unwrap();
59+
info!("C(publisher): stream opened");
60+
61+
let cmd = CommandRequest::new_publish(topic.to_string(), vec![(*v).into()]);
62+
stream.execute_unary(&cmd).await.unwrap();
63+
64+
Ok(())
65+
}
66+
67+
fn pubsub(c: &mut Criterion) {
68+
let tracer = opentelemetry_jaeger::new_pipeline()
69+
.with_service_name("kv-bench")
70+
.install_simple()
71+
.unwrap();
72+
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
73+
74+
tracing_subscriber::registry()
75+
.with(EnvFilter::from_default_env())
76+
.with(opentelemetry)
77+
.init();
78+
79+
let root = span!(tracing::Level::INFO, "app_start", work_units = 2);
80+
let _enter = root.enter();
81+
// 创建 Tokio runtime
82+
let runtime = Builder::new_multi_thread()
83+
.worker_threads(4)
84+
.thread_name("pubsub")
85+
.enable_all()
86+
.build()
87+
.unwrap();
88+
89+
let base_str = include_str!("../fixtures/server.conf"); // 891 bytes
90+
91+
let values: &'static [&'static str] = Box::leak(
92+
vec![
93+
&base_str[..64],
94+
&base_str[..128],
95+
&base_str[..256],
96+
&base_str[..512],
97+
]
98+
.into_boxed_slice(),
99+
);
100+
let topic = "lobby";
101+
102+
// 运行服务器和 100 个 subscriber,为测试准备
103+
runtime.block_on(async {
104+
eprint!("preparing server and subscribers");
105+
start_server().await.unwrap();
106+
time::sleep(Duration::from_millis(50)).await;
107+
for _ in 0..1000 {
108+
start_subscribers(topic).await.unwrap();
109+
eprint!(".");
110+
}
111+
eprintln!("Done!");
112+
});
113+
114+
// 进行 benchmark
115+
c.bench_function("publishing", move |b| {
116+
b.to_async(&runtime)
117+
.iter(|| async { start_publishers(topic, values).await })
118+
});
119+
}
120+
121+
criterion_group! {
122+
name = benches;
123+
config = Criterion::default().sample_size(10);
124+
targets = pubsub
125+
}
126+
criterion_main!(benches);

46_kv/build.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
fn main() {
2+
let mut config = prost_build::Config::new();
3+
config.bytes(&["."]);
4+
config.type_attribute(".", "#[derive(PartialOrd)]");
5+
config
6+
.out_dir("src/pb")
7+
.compile_protos(&["abi.proto"], &["."])
8+
.unwrap();
9+
println!("cargo:rerun-if-changed=build.rs");
10+
println!("cargo:rerun-if-changed=abi.proto");
11+
}

0 commit comments

Comments
 (0)