Skip to content

Commit 81f2af7

Browse files
committed
feat: solve issues with memory explosion
Signed-off-by: Anthony Griffon <[email protected]>
1 parent 9e75f78 commit 81f2af7

File tree

8 files changed

+91
-32
lines changed

8 files changed

+91
-32
lines changed

Cargo.lock

+45
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/roster/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ scc = "2"
4040
sharded-thread = "1"
4141
serde.workspace = true
4242
thiserror = "1"
43+
rand = "0.8"
44+
zstd = "0.13"
4345

4446
# Logging
4547
tracing = { workspace = true, features = ["attributes"] }
@@ -63,6 +65,9 @@ tokio = { version = "1.35", features = [
6365
"net",
6466
] }
6567

68+
[profile.release]
69+
debug = true
70+
6671
[[bench]]
6772
name = "parsing"
6873
harness = false

app/roster/src/application/server/cmd/set.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,11 @@ impl CommandExecution for Set {
127127
let expired = self.expire.map(|dur| ctx.now() + dur.into());
128128

129129
// let now = Instant::now();
130-
let response = match ctx
131-
.storage
132-
.set_async(self.key, self.value, SetOptions { expired })
133-
.await
134-
{
130+
let response = match ctx.storage.set_async(
131+
self.key,
132+
self.value,
133+
SetOptions { expired },
134+
) {
135135
Ok(_) => Frame::Simple(OK_STR.clone()),
136136
Err(_) => Frame::Null,
137137
};

app/roster/src/application/server/connection.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ impl ReadConnection {
123123
// left to `BytesMut`. This is often done by moving an internal
124124
// cursor, but it may be done by reallocating and copying data.
125125
// self.buffer.advance(len);
126-
self.buffer.reserve(4 * 1024);
126+
if self.buffer.len() < 1024 {
127+
self.buffer.reserve(4 * 1024);
128+
}
127129

128130
// Return the parsed frame to the caller.
129131
Ok(Some(frame))

app/roster/src/application/server/handle.rs

+1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ impl Handler {
123123

124124
let is_in_slot =
125125
hash.map(|hash| ctx.is_in_slot(hash)).unwrap_or(true);
126+
let is_in_slot = true;
126127

127128
if !is_in_slot {
128129
let hash = hash.unwrap();

app/roster/src/application/server/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ impl ServerConfig {
3434
pub fn initialize(self) -> ServerHandle {
3535
let mut threads = Vec::new();
3636
let cpus: usize = std::thread::available_parallelism().unwrap().into();
37+
// let cpus = 1;
3738

3839
// The mesh used to pass a whole connection if needed.
3940
let mesh =
@@ -45,6 +46,7 @@ impl ServerConfig {
4546
let main_dialer = RootDialer::new(mesh, &storage);
4647

4748
for cpu in 0..cpus {
49+
dbg!(cpu);
4850
// TODO(@miaxos): There are some links between those two, mb we
4951
// should modelise it again.
5052
let config = self.clone();

app/roster/src/domain/storage/mod.rs

+28-26
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
//! Storage primitive which is used to interact with Keys
22
33
use std::hash::BuildHasherDefault;
4+
use std::mem::size_of_val;
45
use std::sync::atomic::AtomicU32;
56
use std::sync::Arc;
67

78
use bytes::Bytes;
89
use bytestring::ByteString;
910
use coarsetime::Instant;
11+
use rand::random;
1012
use rustc_hash::FxHasher;
1113
use scc::HashMap;
1214

@@ -16,14 +18,14 @@ use crate::infrastructure::hash::HASH_SLOT_MAX;
1618
#[derive(Debug)]
1719
pub struct StorageValue {
1820
pub expired: Option<Instant>,
19-
pub val: Bytes,
21+
pub val: Vec<u8>,
2022
}
2123

2224
/// A [StorageSegment] is shared across multiple threads and owns a part of the
2325
/// hashing keys.
2426
#[derive(Debug, Clone)]
2527
pub struct StorageSegment {
26-
db: Arc<HashMap<ByteString, StorageValue, BuildHasherDefault<FxHasher>>>,
28+
db: Arc<HashMap<Vec<u8>, StorageValue, BuildHasherDefault<FxHasher>>>,
2729
slot: Slot,
2830
count: Arc<AtomicU32>,
2931
}
@@ -36,15 +38,17 @@ pub struct SetOptions {
3638
impl StorageSegment {
3739
/// Create a new [StorageSegment] by specifying the hash slot it handles.
3840
pub fn new(slot: Slot) -> Self {
39-
for _ in 0..4096 {
41+
let h = HashMap::with_capacity_and_hasher(
42+
2usize.pow(20),
43+
Default::default(),
44+
);
45+
46+
for _ in 0..(2usize.pow(20)) {
4047
drop(scc::ebr::Guard::new());
4148
}
4249

4350
Self {
44-
db: Arc::new(HashMap::with_capacity_and_hasher(
45-
4096,
46-
Default::default(),
47-
)),
51+
db: Arc::new(h),
4852
slot,
4953
count: Arc::new(AtomicU32::new(0)),
5054
}
@@ -54,36 +58,31 @@ impl StorageSegment {
5458
self.slot.contains(&i)
5559
}
5660

57-
/// Set a key
58-
pub async fn set_async(
61+
/// Set a key into the storage
62+
pub fn set_async(
5963
&self,
6064
key: ByteString,
6165
val: Bytes,
6266
opt: SetOptions,
6367
) -> Result<Option<StorageValue>, (String, StorageValue)> {
68+
let mut val = val.to_vec();
69+
val.shrink_to_fit();
70+
6471
let val = StorageValue {
6572
expired: opt.expired,
6673
val,
6774
};
6875

69-
let _old = self
76+
let mut key = key.into_bytes().to_vec();
77+
key.shrink_to_fit();
78+
79+
let old = self
7080
.count
7181
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
72-
/*
73-
// Simulate some eviction mechanisme when we have too many keys
74-
if old > 200_000 {
75-
// dbg!("remove");
76-
// TODO: If the RC is for the DB instead, we could have a spawn from
77-
// monoio for this task instead, it would save us some
78-
// time for the p99.9
79-
let count = self.count.clone();
80-
let db = self.db.clone();
81-
monoio::spawn(async move {
82-
db.retain_async(|_, _| false).await;
83-
count.swap(0, std::sync::atomic::Ordering::Relaxed);
84-
});
82+
83+
if old % 1_000_000 == 0 {
84+
dbg!(old);
8585
}
86-
*/
8786

8887
if let Err((key, val)) = self.db.insert(key, val) {
8988
let old = self.db.update(&key, |_, _| val);
@@ -101,6 +100,9 @@ impl StorageSegment {
101100
key: ByteString,
102101
now: Instant,
103102
) -> Option<Bytes> {
103+
let mut key = key.into_bytes().to_vec();
104+
key.shrink_to_fit();
105+
104106
match self.db.entry_async(key).await {
105107
scc::hash_map::Entry::Occupied(oqp) => {
106108
let val = oqp.get();
@@ -109,10 +111,10 @@ impl StorageSegment {
109111

110112
// TODO: Better handle expiration
111113
if is_expired {
112-
// oqp.remove()?;
114+
let _ = oqp.remove();
113115
None
114116
} else {
115-
Some(val.val.clone())
117+
Some(Bytes::from(val.val.clone()))
116118
}
117119
}
118120
scc::hash_map::Entry::Vacant(_) => None,

app/roster/src/infrastructure/config/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ impl Cfg {
2424

2525
let settings = Config::builder()
2626
.add_source(config::File::with_name(&file_location))
27+
/*
2728
.add_source(
2829
config::Environment::with_prefix("ROSTER")
2930
.try_parsing(true)
3031
.separator("_"),
3132
)
33+
*/
3234
.build()?;
3335

3436
let config = settings.try_deserialize::<Cfg>()?;

0 commit comments

Comments
 (0)