Skip to content

Commit 7e5aeec

Browse files
authored
Merge pull request #7 from Miaxos/feat-add-tcp-load-balancing
A lot of stuff & optimize storage
2 parents 28427fb + 4378858 commit 7e5aeec

File tree

21 files changed

+834
-287
lines changed

21 files changed

+834
-287
lines changed

Cargo.lock

+210-129
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

+18-5
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,14 @@ between our two instances, as for Redis & Dragonfly.
5959

6060
To be able to max out performances out of an application we must be able to have
6161
a linear-scalability.[^1] Usual issues around scalability are when you share
62-
data between threads, (like false-sharing[^3]). To solve this issue, we will use
63-
a shared-nothing architecture (right now we aren't due to some APIs not
64-
implemented yet) where each thread will own his own slice of the storage.
62+
data between threads, (like false-sharing[^3]).
63+
64+
To solve this issue we use an
65+
[scc::Hashmap](https://github.com/wvwwvwwv/scalable-concurrent-containers#HashMap)
66+
which is a really efficient datastructure. This datastructure can be shared
67+
across multiple thread without having a loss in performance expect when have too
68+
many thread. When this happens, we'll partition the storage by load-balancing
69+
TCP connection on those threads when there is a need to.
6570

6671
We also use a runtime which is based on `io-uring` to handle every I/O on the
6772
application: [monoio](https://github.com/bytedance/monoio/).
@@ -83,8 +88,16 @@ Memcached running on commodity hardware and Linux.*"[^2]
8388

8489
### Storage
8590

86-
We use
87-
[scc::Hashmap](https://github.com/wvwwvwwv/scalable-concurrent-containers#HashMap) behind an `Arc` for now while Sharding APIs are not implemented on [monoio](https://github.com/bytedance/monoio/issues/213) but as soon as we have a way to load-balance our TCP Connection to the proper thread, we should switch to a `scc::Hashmap` per thread.
91+
For the storage, instead of having each thread handling his part of the storage
92+
with a load balancing based on TCP connection, it seems it's more efficient to
93+
have a storage shared between a number of threads.
94+
95+
We split the whole application into a number of Storage Segment which are shared
96+
between a fixed number of thread.
97+
98+
<p align="center">
99+
<img src="./docs/storage.svg" width="60%" />
100+
</p>
88101

89102
## References
90103

app/roster/Cargo.toml

+9-1
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,23 @@ default = []
2525
anyhow.workspace = true
2626
atoi_simd = "0.15"
2727
coarsetime = "0.1"
28+
crc = "3"
2829
bytestring = "1.3"
2930
bytes.workspace = true
3031
cfg-if = "1"
3132
config.workspace = true
3233
derive_builder.workspace = true
3334
dotenv.workspace = true
35+
futures = "0.3"
3436
local-sync = "0.1"
35-
monoio = { workspace = true, features = ["bytes", "zero-copy", "iouring"] }
37+
monoio = { workspace = true, features = ["bytes", "sync", "iouring"] }
3638
rustc-hash = "1.1.0"
3739
scc = "2"
40+
sharded-thread = "1"
3841
serde.workspace = true
3942
thiserror = "1"
43+
rand = "0.8"
44+
zstd = "0.13"
4045

4146
# Logging
4247
tracing = { workspace = true, features = ["attributes"] }
@@ -60,6 +65,9 @@ tokio = { version = "1.35", features = [
6065
"net",
6166
] }
6267

68+
[profile.release]
69+
debug = true
70+
6371
[[bench]]
6472
name = "parsing"
6573
harness = false

app/roster/src/application/server/cmd/get.rs

+5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use super::CommandExecution;
55
use crate::application::server::connection::WriteConnection;
66
use crate::application::server::context::Context;
77
use crate::application::server::frame::Frame;
8+
use crate::infrastructure::hash::crc_hash;
89

910
/// Get the value of key. If the key does not exist the special value nil is
1011
/// returned. An error is returned if the value stored at key is not a string,
@@ -40,4 +41,8 @@ impl CommandExecution for Get {
4041

4142
Ok(())
4243
}
44+
45+
fn hash_key(&self) -> Option<u16> {
46+
Some(crc_hash(self.key.as_bytes()))
47+
}
4348
}

app/roster/src/application/server/cmd/mod.rs

+27-6
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,23 @@ pub enum Command {
2929
}
3030

3131
pub trait CommandExecution: Sized {
32-
/// Apply a command or a subcommand.
32+
/// Apply the command to the specified `Db` instance.
33+
///
34+
/// The response is written to `dst`. This is called by the server in order
35+
/// to execute a received command.
3336
async fn apply(
3437
self,
3538
dst: &mut WriteConnection,
3639
ctx: Context,
3740
) -> anyhow::Result<()>;
41+
42+
/// Give the hash_key for the specified command
43+
/// it'll tell us where we should send this command based on the hash
44+
///
45+
/// https://redis.io/docs/reference/cluster-spec/#key-distribution-model
46+
fn hash_key(&self) -> Option<u16> {
47+
None
48+
}
3849
}
3950

4051
pub trait SubcommandRegistry {
@@ -96,12 +107,10 @@ impl Command {
96107
// The command has been successfully parsed
97108
Ok(command)
98109
}
110+
}
99111

100-
/// Apply the command to the specified `Db` instance.
101-
///
102-
/// The response is written to `dst`. This is called by the server in order
103-
/// to execute a received command.
104-
pub(crate) async fn apply(
112+
impl CommandExecution for Command {
113+
async fn apply(
105114
self,
106115
dst: &mut WriteConnection,
107116
ctx: Context,
@@ -116,4 +125,16 @@ impl Command {
116125
Get(cmd) => cmd.apply(dst, ctx).await,
117126
}
118127
}
128+
129+
fn hash_key(&self) -> Option<u16> {
130+
use Command::*;
131+
132+
match self {
133+
Ping(cmd) => cmd.hash_key(),
134+
Unknown(cmd) => cmd.hash_key(),
135+
Client(cmd) => cmd.hash_key(),
136+
Set(cmd) => cmd.hash_key(),
137+
Get(cmd) => cmd.hash_key(),
138+
}
139+
}
119140
}

app/roster/src/application/server/cmd/set.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::application::server::connection::WriteConnection;
99
use crate::application::server::context::Context;
1010
use crate::application::server::frame::Frame;
1111
use crate::domain::storage::SetOptions;
12+
use crate::infrastructure::hash::crc_hash;
1213

1314
/// Set key to hold the string value. If key already holds a value, it is
1415
/// overwritten, regardless of its type.
@@ -126,11 +127,11 @@ impl CommandExecution for Set {
126127
let expired = self.expire.map(|dur| ctx.now() + dur.into());
127128

128129
// let now = Instant::now();
129-
let response = match ctx
130-
.storage
131-
.set_async(self.key, self.value, SetOptions { expired })
132-
.await
133-
{
130+
let response = match ctx.storage.set_async(
131+
self.key,
132+
self.value,
133+
SetOptions { expired },
134+
) {
134135
Ok(_) => Frame::Simple(OK_STR.clone()),
135136
Err(_) => Frame::Null,
136137
};
@@ -144,4 +145,8 @@ impl CommandExecution for Set {
144145

145146
Ok(())
146147
}
148+
149+
fn hash_key(&self) -> Option<u16> {
150+
Some(crc_hash(self.key.as_bytes()))
151+
}
147152
}

app/roster/src/application/server/connection.rs

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::fmt::Debug;
12
use std::io::{self, Cursor};
23

34
use bytes::BytesMut;
@@ -33,6 +34,12 @@ pub struct ReadConnection {
3334
buffer: BytesMut,
3435
}
3536

37+
impl Debug for ReadConnection {
38+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39+
write!(f, "ReadConnection")
40+
}
41+
}
42+
3643
impl ReadConnection {
3744
/// Read a single `Frame` value from the underlying stream.
3845
///
@@ -116,7 +123,9 @@ impl ReadConnection {
116123
// left to `BytesMut`. This is often done by moving an internal
117124
// cursor, but it may be done by reallocating and copying data.
118125
// self.buffer.advance(len);
119-
self.buffer.reserve(4 * 1024);
126+
if self.buffer.len() < 1024 {
127+
self.buffer.reserve(4 * 1024);
128+
}
120129

121130
// Return the parsed frame to the caller.
122131
Ok(Some(frame))
@@ -135,6 +144,10 @@ impl ReadConnection {
135144
Err(e) => Err(e.into()),
136145
}
137146
}
147+
148+
pub fn into_inner(self) -> OwnedReadHalf<TcpStream> {
149+
self.stream_r.into_inner()
150+
}
138151
}
139152

140153
impl WriteConnection {
@@ -244,4 +257,12 @@ impl WriteConnection {
244257

245258
Ok(())
246259
}
260+
261+
pub fn into_inner(self) -> OwnedWriteHalf<TcpStream> {
262+
self.stream_w.into_inner()
263+
}
264+
265+
pub fn reunite(self, read: ReadConnection) -> TcpStream {
266+
self.into_inner().reunite(read.into_inner()).unwrap()
267+
}
247268
}

app/roster/src/application/server/context.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,32 @@ use std::cell::Cell;
22

33
use coarsetime::Instant;
44

5-
use crate::domain::storage::Storage;
5+
use crate::domain::storage::StorageSegment;
66

77
#[derive(Clone)]
88
pub struct Context {
9-
pub storage: Storage,
9+
pub storage: StorageSegment,
1010
now: Cell<bool>,
1111
}
1212

1313
impl Context {
14-
pub fn new(storage: Storage) -> Self {
14+
pub fn new(storage: StorageSegment) -> Self {
1515
Self {
1616
storage,
1717
now: Cell::new(false),
1818
}
1919
}
2020

21+
#[allow(dead_code)]
22+
pub fn is_in_slot(&self, hash: u16) -> bool {
23+
self.storage.is_in_slot(hash)
24+
}
25+
26+
pub fn slot_nb(&self, _hash: u16) -> Option<usize> {
27+
todo!()
28+
// self.storage.slot_nb(hash)
29+
}
30+
2131
#[inline]
2232
pub fn now(&self) -> Instant {
2333
let now = self.now.get();

0 commit comments

Comments
 (0)