Skip to content

Commit

Permalink
roll is done externally
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Mar 16, 2024
1 parent bdd759f commit 42ec47a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
55 changes: 20 additions & 35 deletions src/mem/arena.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
#[cfg(not(test))]
use std::time::Instant;

#[cfg(test)]
use mock_instant::Instant;

/// 256 KiB.
#[cfg(not(test))]
Expand Down Expand Up @@ -54,49 +51,34 @@ pub struct Arena {
// Pitfall: If pages are requests way less often than 256 times per minutes,
// this arena may take way too much time to release its memory.
struct ArenaStats {
max_num_used_pages_former: usize,
max_num_used_pages_current: usize,
call_counter: u8,
next_window_start: Instant,
max_num_used_pages_former: AtomicUsize,
max_num_used_pages_current: AtomicUsize,
}

const WINDOW: Duration = Duration::from_secs(60);

impl Default for ArenaStats {
fn default() -> ArenaStats {
ArenaStats {
// We arbitrarily initialize num used pages former to 100.
max_num_used_pages_former: 0,
max_num_used_pages_current: 0,
call_counter: 0u8,
next_window_start: Instant::now(),
max_num_used_pages_former: AtomicUsize::new(0),
max_num_used_pages_current: AtomicUsize::new(0),
}
}
}

impl ArenaStats {
/// This method happens when we are changing time window.
fn roll(&mut self, now: Instant) {
self.max_num_used_pages_former = self.max_num_used_pages_current;
self.max_num_used_pages_current = 0;
self.next_window_start = now + WINDOW;
fn roll(&mut self,) {
let max_num_used_page_former = self.max_num_used_pages_current.load(Ordering::Relaxed);
self.max_num_used_pages_former.store(max_num_used_page_former, Ordering::Relaxed);
self.max_num_used_pages_current.store(0, Ordering::Relaxed);
}

/// Records the number of used pages, and returns an estimation of the maximum number of pages
/// in the last 5 minutes.
pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize {
// The only function of the call counter is to avoid calling `Instant::now()`
// at every single call.
self.call_counter = (self.call_counter + 1) % 64;
if self.call_counter == 0u8 {
let now = Instant::now();
if now > self.next_window_start {
self.roll(now);
}
}
self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages);
self.max_num_used_pages_former
.max(self.max_num_used_pages_current)
let max_num_used_pages = self.max_num_used_pages_current.load(Ordering::Relaxed).max(num_used_pages);
self.max_num_used_pages_current.store(max_num_used_pages, Ordering::Relaxed);
self.max_num_used_pages_former.load(Ordering::Relaxed).max(max_num_used_pages)
}
}

Expand All @@ -105,20 +87,17 @@ impl Arena {
pub fn acquire_page(&mut self) -> PageId {
if let Some(page_id) = self.free_page_ids.pop() {
assert!(self.pages[page_id.0].is_some());
self.gc();
return page_id;
}
let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice();
if let Some(free_slot) = self.free_slots.pop() {
let slot = &mut self.pages[free_slot.0];
assert!(slot.is_none());
*slot = Some(page);
self.gc();
free_slot
} else {
let new_page_id = self.pages.len();
self.pages.push(Some(page));
self.gc();
PageId(new_page_id)
}
}
Expand All @@ -139,7 +118,13 @@ impl Arena {
self.gc();
}

/// `gc` releases memory by deallocating ALL of the free pages.
/// Clients are expected roll the stats regularly.
pub fn roll(&mut self,) {
self.stats.roll();
self.gc();
}

/// `gc` releases memory by some of the free pages.
pub fn gc(&mut self) {
let num_used_pages = self.num_used_pages();
let max_used_num_pages_in_last_5_min = self.stats.record_num_used_page(num_used_pages);
Expand Down
5 changes: 5 additions & 0 deletions src/mem/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ impl MemQueues {
Some(queue.truncate_up_to_included(position, &mut self.arena))
}

pub fn roll_and_gc(&mut self,) {
self.arena.roll();
self.arena.gc();
}

/// Return a tuple of (size, capacity) of memory used by the memqueues
pub fn size(&self) -> (usize, usize) {
let size = self
Expand Down
4 changes: 4 additions & 0 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ impl MultiRecordLog {
Ok(removed_count)
}

pub fn memory_gc(&mut self) {
self.in_mem_queues.roll_and_gc();
}

fn run_gc_if_necessary(&mut self) -> io::Result<()> {
debug!("run_gc_if_necessary");
if self
Expand Down

0 comments on commit 42ec47a

Please sign in to comment.