A lot of stuff & optimize storage #7

Merged
merged 26 commits on Jan 25, 2024
339 changes: 210 additions & 129 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -59,9 +59,14 @@ between our two instances, as for Redis & Dragonfly.

To get the most performance out of an application, we must be able to achieve
linear scalability.[^1] The usual scalability issues arise when you share
data between threads (like false-sharing[^3]). To solve this issue, we will use
a shared-nothing architecture (right now we don't, because some APIs are not
implemented yet) where each thread owns its own slice of the storage.
data between threads (like false-sharing[^3]).

To solve this issue we use an
[scc::HashMap](https://github.com/wvwwvwwv/scalable-concurrent-containers#HashMap),
which is a very efficient data structure. It can be shared across multiple
threads without a loss in performance, except when there are too many threads.
When this happens, we'll partition the storage by load-balancing TCP
connections across those threads as needed.
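One way the partitioning described above could look is range-based routing of hash slots to threads. This is a sketch, not the crate's actual API: `SLOT_COUNT` (mirroring Redis Cluster's 16384 slots) and `thread_for_slot` are hypothetical names introduced for illustration.

```rust
/// Total number of hash slots; 16384 mirrors the Redis cluster spec
/// (an assumption for this sketch).
const SLOT_COUNT: usize = 16384;

/// Route a key's hash slot to one of `n_threads` worker threads by
/// splitting the slot space into contiguous, equal-sized ranges.
fn thread_for_slot(slot: u16, n_threads: usize) -> usize {
    (slot as usize * n_threads) / SLOT_COUNT
}

fn main() {
    // With 4 threads, slots 0..=4095 go to thread 0, 4096..=8191 to
    // thread 1, and so on.
    assert_eq!(thread_for_slot(0, 4), 0);
    assert_eq!(thread_for_slot(4096, 4), 1);
    assert_eq!(thread_for_slot(16383, 4), 3);
    println!("routing ok");
}
```

Range partitioning keeps each thread's slots contiguous, so a TCP connection can be handed to the owning thread once and stay there for any key in that range.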

We also use a runtime based on `io-uring` to handle all I/O in the
application: [monoio](https://github.com/bytedance/monoio/).
@@ -83,8 +88,16 @@ Memcached running on commodity hardware and Linux.*"[^2]

### Storage

We use
[scc::HashMap](https://github.com/wvwwvwwv/scalable-concurrent-containers#HashMap) behind an `Arc` for now, while sharding APIs are not yet implemented on [monoio](https://github.com/bytedance/monoio/issues/213), but as soon as we have a way to load-balance our TCP connections to the proper thread, we should switch to one `scc::HashMap` per thread.
For the storage, instead of having each thread handle its own part of the
storage with load balancing based on TCP connections, it turns out to be more
efficient to have the storage shared between a number of threads.

We split the whole storage into a number of Storage Segments, each shared
between a fixed number of threads.

<p align="center">
<img src="./docs/storage.svg" width="60%" />
</p>
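The key-to-segment mapping can be sketched as below. The PR's actual `crc_hash` lives in `infrastructure::hash` and uses the `crc` crate; this dependency-free version hand-rolls CRC-16/XMODEM (the checksum the Redis cluster spec uses for key distribution) for illustration. `StorageSegment::is_in_slot` appears in the diff, but its fields here are assumptions.

```rust
/// CRC-16/XMODEM (polynomial 0x1021, initial value 0), the checksum
/// variant used by the Redis cluster spec for key distribution.
fn crc16_xmodem(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

/// Map a key to one of 16384 hash slots, as in the Redis cluster spec.
fn crc_hash(key: &[u8]) -> u16 {
    crc16_xmodem(key) % 16384
}

/// A storage segment owning a contiguous slot range. The diff only
/// shows `is_in_slot`; the `start`/`end` fields are an assumption.
struct StorageSegment {
    start: u16,
    end: u16,
}

impl StorageSegment {
    fn is_in_slot(&self, hash: u16) -> bool {
        self.start <= hash && hash <= self.end
    }
}

fn main() {
    // 0x31C3 is the standard CRC-16/XMODEM check value for "123456789".
    assert_eq!(crc16_xmodem(b"123456789"), 0x31C3);
    let seg = StorageSegment { start: 0, end: 8191 };
    println!("key \"foo\" -> slot {}", crc_hash(b"foo"));
    assert!(seg.is_in_slot(42));
}
```

Each incoming command computes its slot once via `hash_key()`, and the server checks `is_in_slot` to decide whether the local segment owns the key or the connection must be handed to another thread.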

## References

10 changes: 9 additions & 1 deletion app/roster/Cargo.toml
@@ -25,18 +25,23 @@ default = []
anyhow.workspace = true
atoi_simd = "0.15"
coarsetime = "0.1"
crc = "3"
bytestring = "1.3"
bytes.workspace = true
cfg-if = "1"
config.workspace = true
derive_builder.workspace = true
dotenv.workspace = true
futures = "0.3"
local-sync = "0.1"
monoio = { workspace = true, features = ["bytes", "zero-copy", "iouring"] }
monoio = { workspace = true, features = ["bytes", "sync", "iouring"] }
rustc-hash = "1.1.0"
scc = "2"
sharded-thread = "1"
serde.workspace = true
thiserror = "1"
rand = "0.8"
zstd = "0.13"

# Logging
tracing = { workspace = true, features = ["attributes"] }
@@ -60,6 +65,9 @@ tokio = { version = "1.35", features = [
"net",
] }

[profile.release]
debug = true

[[bench]]
name = "parsing"
harness = false
5 changes: 5 additions & 0 deletions app/roster/src/application/server/cmd/get.rs
@@ -5,6 +5,7 @@ use super::CommandExecution;
use crate::application::server::connection::WriteConnection;
use crate::application::server::context::Context;
use crate::application::server::frame::Frame;
use crate::infrastructure::hash::crc_hash;

/// Get the value of key. If the key does not exist the special value nil is
/// returned. An error is returned if the value stored at key is not a string,
@@ -40,4 +41,8 @@ impl CommandExecution for Get {

Ok(())
}

fn hash_key(&self) -> Option<u16> {
Some(crc_hash(self.key.as_bytes()))
}
}
33 changes: 27 additions & 6 deletions app/roster/src/application/server/cmd/mod.rs
@@ -29,12 +29,23 @@ pub enum Command {
}

pub trait CommandExecution: Sized {
/// Apply a command or a subcommand.
/// Apply the command to the specified `Db` instance.
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
async fn apply(
self,
dst: &mut WriteConnection,
ctx: Context,
) -> anyhow::Result<()>;

/// Return the hash key for the specified command;
/// it tells us where this command should be sent, based on the hash.
///
/// <https://redis.io/docs/reference/cluster-spec/#key-distribution-model>
fn hash_key(&self) -> Option<u16> {
None
}
}

pub trait SubcommandRegistry {
@@ -96,12 +107,10 @@ impl Command {
// The command has been successfully parsed
Ok(command)
}
}

/// Apply the command to the specified `Db` instance.
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
pub(crate) async fn apply(
impl CommandExecution for Command {
async fn apply(
self,
dst: &mut WriteConnection,
ctx: Context,
@@ -116,4 +125,16 @@
Get(cmd) => cmd.apply(dst, ctx).await,
}
}

fn hash_key(&self) -> Option<u16> {
use Command::*;

match self {
Ping(cmd) => cmd.hash_key(),
Unknown(cmd) => cmd.hash_key(),
Client(cmd) => cmd.hash_key(),
Set(cmd) => cmd.hash_key(),
Get(cmd) => cmd.hash_key(),
}
}
}
15 changes: 10 additions & 5 deletions app/roster/src/application/server/cmd/set.rs
Expand Up @@ -9,6 +9,7 @@ use crate::application::server::connection::WriteConnection;
use crate::application::server::context::Context;
use crate::application::server::frame::Frame;
use crate::domain::storage::SetOptions;
use crate::infrastructure::hash::crc_hash;

/// Set key to hold the string value. If key already holds a value, it is
/// overwritten, regardless of its type.
@@ -126,11 +127,11 @@ impl CommandExecution for Set {
let expired = self.expire.map(|dur| ctx.now() + dur.into());

// let now = Instant::now();
let response = match ctx
.storage
.set_async(self.key, self.value, SetOptions { expired })
.await
{
let response = match ctx.storage.set_async(
self.key,
self.value,
SetOptions { expired },
) {
Ok(_) => Frame::Simple(OK_STR.clone()),
Err(_) => Frame::Null,
};
@@ -144,4 +145,8 @@

Ok(())
}

fn hash_key(&self) -> Option<u16> {
Some(crc_hash(self.key.as_bytes()))
}
}
23 changes: 22 additions & 1 deletion app/roster/src/application/server/connection.rs
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::io::{self, Cursor};

use bytes::BytesMut;
@@ -33,6 +34,12 @@ pub struct ReadConnection {
buffer: BytesMut,
}

impl Debug for ReadConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ReadConnection")
}
}

impl ReadConnection {
/// Read a single `Frame` value from the underlying stream.
///
@@ -116,7 +123,9 @@ impl ReadConnection {
// left to `BytesMut`. This is often done by moving an internal
// cursor, but it may be done by reallocating and copying data.
// self.buffer.advance(len);
self.buffer.reserve(4 * 1024);
if self.buffer.len() < 1024 {
self.buffer.reserve(4 * 1024);
}

// Return the parsed frame to the caller.
Ok(Some(frame))
@@ -135,6 +144,10 @@ Err(e) => Err(e.into()),
Err(e) => Err(e.into()),
}
}

pub fn into_inner(self) -> OwnedReadHalf<TcpStream> {
self.stream_r.into_inner()
}
}

impl WriteConnection {
@@ -244,4 +257,12 @@

Ok(())
}

pub fn into_inner(self) -> OwnedWriteHalf<TcpStream> {
self.stream_w.into_inner()
}

pub fn reunite(self, read: ReadConnection) -> TcpStream {
self.into_inner().reunite(read.into_inner()).unwrap()
}
}
16 changes: 13 additions & 3 deletions app/roster/src/application/server/context.rs
@@ -2,22 +2,32 @@ use std::cell::Cell;

use coarsetime::Instant;

use crate::domain::storage::Storage;
use crate::domain::storage::StorageSegment;

#[derive(Clone)]
pub struct Context {
pub storage: Storage,
pub storage: StorageSegment,
now: Cell<bool>,
}

impl Context {
pub fn new(storage: Storage) -> Self {
pub fn new(storage: StorageSegment) -> Self {
Self {
storage,
now: Cell::new(false),
}
}

#[allow(dead_code)]
pub fn is_in_slot(&self, hash: u16) -> bool {
self.storage.is_in_slot(hash)
}

pub fn slot_nb(&self, _hash: u16) -> Option<usize> {
todo!()
// self.storage.slot_nb(hash)
}

#[inline]
pub fn now(&self) -> Instant {
let now = self.now.get();