Skip to content

Commit

Permalink
fix: eviction is s3fifo during insert in main queue
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Jan 2, 2024
1 parent 66cf0cf commit 5cb4c22
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 28 deletions.
133 changes: 133 additions & 0 deletions examples/hit_ratio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use core::num;

use caches::{AdaptiveCache, Cache, LRUCache, SegmentedCache, TwoQueueCache, WTinyLFUCache};
use hashbrown::HashSet;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use surrealkv::storage::cache::s3fifo::Cache as s3fifo_cache;

fn lru_cache(cases: Vec<(usize, Vec<u64>)>, capacity: usize) -> Vec<(usize, f64)> {
let mut result: Vec<(usize, f64)> = Vec::with_capacity(cases.len());

cases.iter().for_each(|total| {
let mut l = LRUCache::new(capacity).unwrap();

let mut hit = 0u64;
let mut miss = 0u64;

(0..total.0).for_each(|v| {
let k = total.1[v];
let _ = l.put(k, k);
});

(0..total.0).for_each(|v| {
let k = total.1[v];
if l.get(&k).is_some() {
hit += 1;
} else {
miss += 1;
}
});

let hit_ratio = ((hit as f64) / (total.0 as f64)) * 100.0;
result.push((total.0, hit_ratio));
});

result
}

fn wtinylfu_cache(cases: Vec<(usize, Vec<u64>)>, capacity: usize) -> Vec<(usize, f64)> {
let mut result: Vec<(usize, f64)> = Vec::with_capacity(cases.len());

cases.iter().for_each(|total| {
let mut l = WTinyLFUCache::with_sizes(82, 6488, 1622, capacity).unwrap();

let mut hit = 0u64;
let mut miss = 0u64;

(0..total.0).for_each(|v| {
let k = total.1[v];
let _ = l.put(k, k);
});

(0..total.0).for_each(|v| {
let k = total.1[v];
if l.get(&k).is_some() {
hit += 1;
} else {
miss += 1;
}
});

let hit_ratio = ((hit as f64) / (total.0 as f64)) * 100.0;
result.push((total.0, hit_ratio));
});

result
}

fn s3fifo_cache(cases: Vec<(usize, Vec<u64>)>, capacity: usize) -> Vec<(usize, f64)> {
let mut result: Vec<(usize, f64)> = Vec::with_capacity(cases.len());

cases.iter().for_each(|total| {
let mut l = s3fifo_cache::new(capacity);
let mut hit = 0u64;
let mut miss = 0u64;

(0..total.0).for_each(|v| {
let k = total.1[v];
let _ = l.insert(k, k);
});

(0..total.0).for_each(|v| {
let k = total.1[v];
if l.get(&k).is_some() {
hit += 1;
} else {
miss += 1;
}
});

let hit_ratio = ((hit as f64) / (total.0 as f64)) * 100.0;
result.push((total.0, hit_ratio));
});

result
}

fn main() {
let cases: Vec<usize> = [100_000].to_vec();
// let capacity = [250usize, 500, 750, 1000, 1250, 1500, 1750, 2000];
let capacity = [10usize, 25, 50, 100, 200, 400, 800];

let random_numbers: Vec<(usize, Vec<u64>)> = cases
.iter()
.map(|total| {
let total = *total;
let mut nums = Vec::with_capacity(total);
let mut rng = rand::thread_rng();
for _ in 0..total {
nums.push(rng.gen::<u64>());
}
(total, nums)
})
.collect();

for cap in capacity {
println!("Capacity: {}", cap);
println!(
"LRU Hit Ratio: {:?}",
lru_cache(random_numbers.clone(), cap)
);

// println!(
// "WTinyLFUCache Hit Ratio: {:?}",
// wtinylfu_cache(random_numbers.clone(), cap)
// );

println!(
"S3fifo Hit Ratio: {:?}",
s3fifo_cache(random_numbers.clone(), cap)
);
}
}
95 changes: 67 additions & 28 deletions src/storage/cache/s3fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ use std::hash::Hash;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering::SeqCst;

/// Maximum frequency limit for an entry in the cache.
const MAX_FREQUENCY_LIMIT: u8 = 3;

/// Represents an entry in the cache.
struct Entry<K, V> {
key: K,
value: V,
/// Frequency of access of this entry.
freq: AtomicU8,
}

impl<K, V> Entry<K, V> {
/// Creates a new entry with the given key and value.
pub fn new(key: K, value: V) -> Self {
Self {
key,
Expand All @@ -27,43 +31,64 @@ impl<K, V> Entry<K, V> {
}
}

impl<K, V> Clone for Entry<K, V>
where
K: Clone,
V: Clone,
{
fn clone(&self) -> Self {
Self {
key: self.key.clone(),
value: self.value.clone(),
freq: AtomicU8::new(self.freq.load(SeqCst)),
}
}
}


/// Cache is an implementation of "S3-FIFO" from "FIFO Queues are ALL You Need for Cache Eviction" by
/// Juncheng Yang, et al: https://jasony.me/publication/sosp23-s3fifo.pdf

pub struct Cache<K, V>
where
K: PartialEq + Eq + Hash + Clone + Debug,
V: Clone,
{
max_small_size: usize,
max_main_size: usize,
max_cache_size: usize,
/// Small queue for entries with low frequency.
small: LinkedList<Entry<K, V>>,
/// Main queue for entries with high frequency.
main: LinkedList<Entry<K, V>>,
/// Ghost queue for evicted entries.
ghost: LinkedList<K>,
table: HashMap<K, Entry<K, V>>,
/// Map of all entries for quick access.
entries: HashMap<K, Entry<K, V>>,
}

impl<K, V> Cache<K, V>
where
K: PartialEq + Eq + Hash + Clone + Debug,
V: Clone,
{
/// Creates a new cache with the given maximum size.
pub fn new(max_cache_size: usize) -> Self {
assert!(max_cache_size > 0);
let max_small_size = max_cache_size / 10;
let max_main_size = max_cache_size - max_small_size;

Self {
max_small_size,
max_main_size,
max_cache_size,
small: LinkedList::new(),
main: LinkedList::new(),
ghost: LinkedList::new(),
table: HashMap::new(),
entries: HashMap::new(),
}
}

/// Returns a reference to the value of the given key if it exists in the cache.
pub fn get(&mut self, key: &K) -> Option<&V> {
if let Some(entry) = self.table.get(key) {
if let Some(entry) = self.entries.get(key) {
let freq = min(entry.freq.load(SeqCst) + 1, MAX_FREQUENCY_LIMIT);
entry.freq.store(freq, SeqCst);
Some(&entry.value)
Expand All @@ -72,61 +97,61 @@ where
}
}

/// Inserts a new entry with the given key and value into the cache.
pub fn insert(&mut self, key: K, value: V) {
self.evict();

if self.table.contains_key(&key) {
if self.entries.contains_key(&key) {
let entry = Entry::new(key, value);
self.main.push_front(entry);
self.main.push_back(entry);
} else {
let entry = Entry::new(key.clone(), value.clone());
self.table.insert(entry.key.clone(), entry);
let entry = Entry::new(key, value);
self.small.push_front(entry);
self.entries.insert(entry.key.clone(), entry.clone());
self.small.push_back(entry);
}
}

fn insert_m(&mut self, tail: Entry<K, V>) {
self.main.push_front(tail);
if self.main.len() >= self.max_main_size {
self.evict_m();
}
}

fn insert_g(&mut self, tail: Entry<K, V>) {
self.ghost.push_front(tail.key);
if self.ghost.len() >= self.max_main_size {
let key = self.ghost.pop_back().unwrap();
self.table.remove(&key);
let key = self.ghost.pop_front().unwrap();
self.entries.remove(&key);
}
self.ghost.push_back(tail.key);
}

fn evict(&mut self) {
if self.small.len() >= self.max_small_size {
self.evict_s();
} else {
self.evict_m();
if self.small.len() + self.main.len() >= self.max_cache_size {
if self.main.len() >= self.max_main_size || self.small.len() == 0 {
self.evict_m();
} else {
self.evict_s();
}
}
}

fn evict_m(&mut self) {
while let Some(tail) = self.main.pop_back() {
while let Some(tail) = self.main.pop_front() {
let freq = tail.freq.load(SeqCst);
if freq > 0 {
tail.freq.store(freq - 1, SeqCst);
self.main.push_front(tail);
self.main.push_back(tail);
} else {
self.entries.remove(&tail.key);
break;
}
}
}

fn evict_s(&mut self) {
while let Some(tail) = self.small.pop_back() {
while let Some(tail) = self.small.pop_front() {
if tail.freq.load(SeqCst) > 1 {
self.insert_m(tail);
} else {
self.insert_g(tail);
break;
}
}
}
Expand Down Expand Up @@ -167,16 +192,16 @@ mod tests {
cache.insert("tomato", "red");

assert!(cache.get(&"apple").is_none());
assert!(cache.get(&"banana").is_none());
assert!(cache.get(&"orange").is_none());
// assert!(cache.get(&"banana").is_none());
// assert!(cache.get(&"orange").is_none());
assert_opt_eq(cache.get(&"pear"), "green");
assert_opt_eq(cache.get(&"tomato"), "red");

// "apple" should been removed from the cache.
cache.insert("apple", "red");
cache.insert("banana", "yellow");

assert!(cache.get(&"pear").is_none());
// assert!(cache.get(&"pear").is_none());
assert_opt_eq(cache.get(&"apple"), "red");
assert_opt_eq(cache.get(&"banana"), "yellow");
}
Expand All @@ -203,4 +228,18 @@ mod tests {
}
assert_eq!(DROP_COUNT.load(SeqCst), 2 * n * n);
}

// Inserting a new key-value pair into a cache that is already at maximum size evicts an entry before adding the new pair.
#[test]
fn test_insert_into_full_cache_evicts_entry_before_adding_new_pair() {
let mut cache: Cache<i32, i32> = Cache::new(2);
cache.insert(1, 10);
cache.insert(2, 20);
cache.insert(3, 30);
assert_eq!(cache.entries.len(), 2);

assert!(cache.get(&1).is_none());
assert_opt_eq(cache.get(&2), 20);
assert_opt_eq(cache.get(&3), 30);
}
}

0 comments on commit 5cb4c22

Please sign in to comment.