Skip to content

Commit

Permalink
Fixing merge conflicts with random seed. Updating defrag test to use …
Browse files Browse the repository at this point in the history
…mexists and add_items_till_capacity. Refactored metric tracking as well to reduce code repetition

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Dec 8, 2024
1 parent 5285813 commit 29ae5a3
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 194 deletions.
14 changes: 4 additions & 10 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ impl ValkeyDataType for BloomFilterType {
return None;
};
let is_seed_random = is_seed_random_u64 == 1;
let mut filters = if num_filters == 1 {
Vec::with_capacity(1)
} else {
Vec::new()
};

let mut filters = Vec::with_capacity(1);

for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand Down Expand Up @@ -117,11 +115,6 @@ impl ValkeyDataType for BloomFilterType {
}
filters.push(Box::new(filter));
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);

BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let item = BloomFilterType {
Expand All @@ -130,6 +123,7 @@ impl ValkeyDataType for BloomFilterType {
is_seed_random,
filters,
};
item.bloom_filter_type_incr_metrics_on_new_create();
Some(item)
}

Expand Down
106 changes: 49 additions & 57 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use bloomfilter::Bloom;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Deserializer, Serialize};
use std::{mem, sync::atomic::Ordering};
use std::sync::atomic::Ordering;

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand Down Expand Up @@ -86,27 +86,23 @@ impl BloomFilterType {
return Err(BloomError::ExceedsMaxBloomSize);
}
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
// Create the bloom filter and add to the main BloomFilter object.
let bloom = match use_random_seed {
true => Box::new(BloomFilter::with_random_seed(fp_rate, capacity)),
false => Box::new(BloomFilter::with_fixed_seed(fp_rate, capacity, &configs::FIXED_SEED)),
false => Box::new(BloomFilter::with_fixed_seed(
fp_rate,
capacity,
&configs::FIXED_SEED,
)),
};
let filters = vec![bloom];
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
let bloom = BloomFilterType {
expansion,
fp_rate,
filters,
is_seed_random: use_random_seed,
};
bloom.bloom_filter_type_incr_metrics_on_new_create();
Ok(bloom)
}

Expand All @@ -118,11 +114,7 @@ impl BloomFilterType {
let new_filter = Box::new(BloomFilter::create_copy_from(filter));
filters.push(new_filter);
}
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
from_bf.bloom_filter_type_incr_metrics_on_new_create();
BloomFilterType {
expansion: from_bf.expansion,
fp_rate: from_bf.fp_rate,
Expand All @@ -131,10 +123,9 @@ impl BloomFilterType {
}
}

/// Return the total memory usage of the BloomFilterType object.
/// Return the total memory usage of the BloomFilterType object and every allocation it contains.
pub fn memory_usage(&self) -> usize {
let mut mem: usize = std::mem::size_of::<BloomFilterType>()
+ (self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>());
let mut mem: usize = self.bloom_filter_type_memory_usage();
for filter in &self.filters {
mem += filter.number_of_bytes();
}
Expand Down Expand Up @@ -223,30 +214,37 @@ impl BloomFilterType {
if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) {
return Err(BloomError::ExceedsMaxBloomSize);
}
let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity));
let seed = self.seed();
let mut new_filter = Box::new(BloomFilter::with_fixed_seed(new_fp_rate, new_capacity, &seed));
let capacity_before: usize = self.filters.capacity();
let mut new_filter = Box::new(BloomFilter::with_fixed_seed(
new_fp_rate,
new_capacity,
&seed,
));
let memory_usage_before: usize = self.bloom_filter_type_memory_usage();
// Add item.
new_filter.set(item);
new_filter.num_items += 1;
self.filters.push(new_filter);
// If we went over capacity and scaled the vec out we need to update the memory usage by the new capacity
if capacity_before != self.filters.capacity() {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
(self.filters.capacity() - capacity_before)
* std::mem::size_of::<Box<BloomFilter>>(),
std::sync::atomic::Ordering::Relaxed,
);
}
let memory_usage_after = self.bloom_filter_type_memory_usage();

metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
memory_usage_after - memory_usage_before,
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(1);
}
Ok(0)
}

/// Memory attributed to the `BloomFilterType` struct itself plus the backing
/// allocation of its `filters` vec (capacity * boxed-pointer size). Does not
/// include the per-filter bitmap bytes.
fn bloom_filter_type_memory_usage(&self) -> usize {
    let filters_vec_bytes = self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>();
    std::mem::size_of::<BloomFilterType>() + filters_vec_bytes
}

/// Serializes bloomFilter to a byte array.
pub fn encode_bloom_filter(&self) -> Result<Vec<u8>, BloomError> {
match bincode::serialize(self) {
Expand All @@ -268,6 +266,14 @@ impl BloomFilterType {
}
}

/// Increments metrics related to Bloom filter memory usage upon creation of a new filter.
/// Adds this object's struct + filters-vec memory to the global memory counter.
pub fn bloom_filter_type_incr_metrics_on_new_create(&self) {
    let bytes = self.bloom_filter_type_memory_usage();
    metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
        .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed);
}

/// Deserialize a byte array to bloom filter.
/// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`.
pub fn decode_bloom_filter(
Expand All @@ -286,8 +292,8 @@ impl BloomFilterType {
u32,
f64,
bool,
Vec<BloomFilter>,
) = match bincode::deserialize::<(u32, f64, bool, Vec<BloomFilter>)>(
Vec<Box<BloomFilter>>,
) = match bincode::deserialize::<(u32, f64, bool, Vec<Box<BloomFilter>>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
Expand Down Expand Up @@ -326,25 +332,15 @@ impl BloomFilterType {
};
// add bloom filter type metrics.
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (item.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
item.bloom_filter_type_incr_metrics_on_new_create();
// add bloom filter metrics.

for filter in &item.filters {
metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
filter.number_of_bytes(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add(
filter.num_items.into(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed);
filter.bloom_filter_incr_metrics_on_new_create();
}
Ok(item)
}
Expand Down Expand Up @@ -388,20 +384,22 @@ impl BloomFilter {
num_items: 0,
capacity,
};
fltr.incr_metrics_on_new_create();
fltr.bloom_filter_incr_metrics_on_new_create();
fltr
}

/// Instantiate empty BloomFilter object with a randomly generated seed used to create sip keys.
pub fn with_random_seed(fp_rate: f64, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let bloom = Box::new(
bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"),
);
let fltr = BloomFilter {
bloom,
num_items: 0,
capacity,
};
fltr.incr_metrics_on_new_create();
fltr.bloom_filter_incr_metrics_on_new_create();
fltr
}

Expand All @@ -415,7 +413,7 @@ impl BloomFilter {
num_items,
capacity,
};
fltr.incr_metrics_on_new_create();
fltr.bloom_filter_incr_metrics_on_new_create();
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed);
fltr
Expand All @@ -426,7 +424,7 @@ impl BloomFilter {
BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
}

fn incr_metrics_on_new_create(&self) {
fn bloom_filter_incr_metrics_on_new_create(&self) {
metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
Expand Down Expand Up @@ -465,18 +463,12 @@ impl BloomFilter {
/// Insert `item` into the underlying bloom filter bitmap.
/// Thin delegate to the inner `bloom` object's `set`; note it does NOT update
/// `num_items` — callers increment that separately (see the scaling path).
pub fn set(&mut self, item: &[u8]) {
self.bloom.set(item)
}

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
/// Rebuilds the filter from the source's raw bitmap slice, preserving its
/// item count and capacity; `from_existing` handles metric increments for the
/// newly created filter.
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(bf.bloom.as_slice(), bf.num_items, bf.capacity)
}
}

impl Drop for BloomFilterType {
fn drop(&mut self) {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_sub(
std::mem::size_of::<BloomFilterType>()
+ (self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
self.bloom_filter_type_memory_usage(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_OBJECTS.fetch_sub(1, Ordering::Relaxed);
Expand Down
2 changes: 0 additions & 2 deletions src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true;

pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true;

pub const BLOOM_DEFRAG_DEAFULT: bool = true;
// Max Memory usage allowed per bloom filter within a bloom object (64MB).
// Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations.
Expand Down
17 changes: 10 additions & 7 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use valkey_module::logging;
use valkey_module::logging::{log_io_error, ValkeyLogLevel};
use valkey_module::raw;
use valkey_module::{RedisModuleDefragCtx, RedisModuleString};
Expand Down Expand Up @@ -225,7 +226,7 @@ pub unsafe extern "C" fn bloom_defrag(
// Get the cursor for the BloomFilterType otherwise start the cursor at 0
let mut cursor: u64 = 0;
let defrag = Defrag::new(defrag_ctx);
defrag.curserget(&mut cursor);
defrag.cursorget(&mut cursor);

// Convert pointer to BloomFilterType so we can operate on it.
let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();
Expand All @@ -234,9 +235,9 @@ pub unsafe extern "C" fn bloom_defrag(
let filters_capacity = bloom_filter_type.filters.capacity();

// While we are within a timeframe decided from should_stop_defrag and not over the number of filters defrag the next filter
while defrag.should_stop_defrag() == 0 && cursor < num_filters.try_into().unwrap() {
while !defrag.should_stop_defrag() && cursor < num_filters as u64 {
// Remove the current filter, unbox it, and attempt to defragment.
let bloom_filter_box = bloom_filter_type.filters.remove(cursor.try_into().unwrap());
let bloom_filter_box = bloom_filter_type.filters.remove(cursor as usize);
let bloom_filter = Box::into_raw(bloom_filter_box);
let defrag_result = defrag.alloc(bloom_filter as *mut c_void);
let mut defragged_filter = {
Expand All @@ -247,7 +248,9 @@ pub unsafe extern "C" fn bloom_defrag(
}
};
// Swap the Bloom object with a temporary one for defragmentation
let mut temporary_bloom = DEFRAG_BLOOM_FILTER.lock().unwrap();
let mut temporary_bloom = DEFRAG_BLOOM_FILTER
.lock()
.expect("We expect default to exist");
let inner_bloom = mem::replace(
&mut defragged_filter.bloom,
temporary_bloom.take().expect("We expect default to exist"),
Expand Down Expand Up @@ -276,13 +279,13 @@ pub unsafe extern "C" fn bloom_defrag(
// Reinsert the defragmented filter and increment the cursor
bloom_filter_type
.filters
.insert(cursor.try_into().unwrap(), defragged_filter);
.insert(cursor as usize, defragged_filter);
cursor += 1;
}
// Save the cursor for where we will start defragmenting from next time
defrag.curserset(cursor);
defrag.cursorset(cursor);
// If not all filters were looked at, return 1 to indicate incomplete defragmentation
if cursor < (num_filters).try_into().unwrap() {
if cursor < num_filters as u64 {
return 1;
}
// Defragment the Vec of filters itself
Expand Down
14 changes: 7 additions & 7 deletions src/wrapper/defrag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ impl Defrag {

/// # Safety
///
/// This function sis temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn curserset(&self, cursor: u64) -> i32 {
/// This function is temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn cursorset(&self, cursor: u64) -> i32 {
unsafe { raw::RedisModule_DefragCursorSet.unwrap()(self.defrag_ctx, cursor) }
}

/// # Safety
///
/// This function sis temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn curserget(&self, cursor: *mut u64) -> i32 {
/// This function is temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn cursorget(&self, cursor: *mut u64) -> i32 {
unsafe { raw::RedisModule_DefragCursorGet.unwrap()(self.defrag_ctx, cursor) }
}

/// # Safety
///
/// This function sis temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn should_stop_defrag(&self) -> i32 {
unsafe { raw::RedisModule_DefragShouldStop.unwrap()(self.defrag_ctx) }
/// This function is temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn should_stop_defrag(&self) -> bool {
unsafe { raw::RedisModule_DefragShouldStop.unwrap()(self.defrag_ctx) != 0 }
}
}
Loading

0 comments on commit 29ae5a3

Please sign in to comment.