diff --git a/Cargo.toml b/Cargo.toml index 431cd18..d205dbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ valkey-module-macros = "0" linkme = "0" bloomfilter = { version = "3.0.1", features = ["serde"] } lazy_static = "1.4.0" -bit-vec = "0.8.0" libc = "0.2" serde = { version = "1.0", features = ["derive"] } bincode = "1.3" @@ -34,10 +33,5 @@ opt-level = 0 debug = 2 debug-assertions = true -[profile.release] -opt-level = 0 -debug = 2 -debug-assertions = true - [features] enable-system-alloc = ["valkey-module/enable-system-alloc"] diff --git a/build.sh b/build.sh index 1b027e0..d8d6781 100755 --- a/build.sh +++ b/build.sh @@ -16,7 +16,7 @@ echo "Running cargo build release..." cargo build --all --all-targets --release echo "Running unit tests..." -# cargo test --features enable-system-alloc +cargo test --features enable-system-alloc # Ensure SERVER_VERSION environment variable is set if [ -z "$SERVER_VERSION" ]; then diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 13b367a..1c5f273 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -76,7 +76,6 @@ impl ValkeyDataType for BloomFilterType { } else { Vec::new() }; - // let mut filters = Vec::new(); for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; @@ -108,13 +107,14 @@ impl ValkeyDataType for BloomFilterType { }; let filter = BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32); - filters.push(filter); + filters.push(Box::new(filter)); } BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::() + (filters.capacity() * std::mem::size_of::>()), std::sync::atomic::Ordering::Relaxed, ); + BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let item = BloomFilterType { expansion: expansion as u32, diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 21027a9..09608a8 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -5,11 +5,10 @@ use crate::{ }, metrics, }; -use bloomfilter; 
+use bloomfilter::Bloom; use bloomfilter::{deserialize, serialize}; -use serde::{Deserialize, Serialize}; -use std::{mem, os::raw::c_void, sync::atomic::Ordering}; -use valkey_module::{logging, raw}; +use serde::{Deserialize, Deserializer, Serialize}; +use std::{mem, sync::atomic::Ordering}; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -106,8 +105,6 @@ impl BloomFilterType { /// Create a new BloomFilterType object from an existing one. pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { - let mut filters: Vec = Vec::with_capacity(from_bf.filters.len()); - let mut filters: Vec> = Vec::with_capacity(from_bf.filters.len()); let mut filters: Vec> = Vec::with_capacity(from_bf.filters.capacity()); metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed); for filter in &from_bf.filters { @@ -128,7 +125,8 @@ impl BloomFilterType { /// Return the total memory usage of the BloomFilterType object. pub fn memory_usage(&self) -> usize { - let mut mem: usize = std::mem::size_of::(); + let mut mem: usize = std::mem::size_of::() + + (self.filters.capacity() * std::mem::size_of::>()); for filter in &self.filters { mem += filter.number_of_bytes(); } @@ -208,7 +206,6 @@ impl BloomFilterType { if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) { return Err(BloomError::ExceedsMaxBloomSize); } - let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity); let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity)); let capacity_before = self.filters.capacity(); // Add item. @@ -244,6 +241,7 @@ impl BloomFilterType { } } + /// Calculate the false positive rate for the Nth filter using tightening ratio. pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result { match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { x if x > f64::MIN_POSITIVE => Ok(x), @@ -265,9 +263,10 @@ impl BloomFilterType { 1 => { // always use new version to init bloomFilterType. 
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future. - let (expansion, fp_rate, filters): (u32, f64, Vec) = - match bincode::deserialize::<(u32, f64, Vec)>(&decoded_bytes[1..]) - { + let (expansion, fp_rate, filters): (u32, f64, Vec>) = + match bincode::deserialize::<(u32, f64, Vec>)>( + &decoded_bytes[1..], + ) { Ok(values) => { if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) { return Err(BloomError::BadExpansion); @@ -339,8 +338,6 @@ impl BloomFilterType { /// well within the u32::MAX limit. #[derive(Serialize, Deserialize)] pub struct BloomFilter { - #[serde(serialize_with = "serialize", deserialize_with = "deserialize")] - pub bloom: bloomfilter::Bloom<[u8]>, #[serde( serialize_with = "serialize", deserialize_with = "deserialize_boxed_bloom" @@ -350,9 +347,6 @@ pub struct BloomFilter { pub capacity: u32, } -use bloomfilter::Bloom; -use serde::Deserializer; - pub fn deserialize_boxed_bloom<'de, D>(deserializer: D) -> Result>, D::Error> where D: Deserializer<'de>, @@ -368,7 +362,6 @@ impl BloomFilter { fp_rate, &configs::FIXED_SEED, ) - .unwrap() .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); let fltr = BloomFilter { bloom: Box::new(bloom), @@ -406,7 +399,6 @@ impl BloomFilter { } pub fn number_of_bytes(&self) -> usize { - std::mem::size_of::() + (self.bloom.len() / 8) as usize std::mem::size_of::() + std::mem::size_of::>() + (self.bloom.len() / 8) as usize @@ -434,7 +426,6 @@ impl BloomFilter { /// Create a new BloomFilter from an existing BloomFilter object (COPY command). 
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter { - BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity) BloomFilter::from_existing(bf.bloom.as_slice(), bf.num_items, bf.capacity) } } @@ -466,10 +457,6 @@ impl Drop for BloomFilter { mod tests { use super::*; use configs::FIXED_SEED; - use crate::configs::{ - FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, - }; - use configs::FIXED_SEED; use rand::{distributions::Alphanumeric, Rng}; /// Returns random string with specified number of characters. @@ -584,8 +571,6 @@ mod tests { .any( |filter| (filter.bloom.seed() == restore_filter.bloom.seed()) && (restore_filter.bloom.seed() == FIXED_SEED) - |filter| (filter.bloom.seed() == restore_filter.bloom.seed()) - && (restore_filter.bloom.seed() == FIXED_SEED) ))); assert!(restored_bloom_filter_type .filters @@ -602,7 +587,6 @@ mod tests { .filters .iter() .any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice()))); - .any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice()))); let (error_count, _) = check_items_exist( restored_bloom_filter_type, 1, @@ -753,8 +737,6 @@ mod tests { let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32); let seed = test_bloom_filter.bloom.seed(); assert_eq!(seed, FIXED_SEED); - let test_sip_keys = test_bloom_filter.bloom.seed(); - assert!(test_sip_keys == FIXED_SEED); } #[test] diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index f1d0795..9cf015b 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -4,8 +4,6 @@ use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; use crate::configs; use crate::wrapper::digest::Digest; -use crate::configs; -use bit_vec::BitVec; use bloomfilter::Bloom; use lazy_static::lazy_static; use std::ffi::CString; @@ -14,7 +12,6 @@ use std::os::raw::{c_char, c_int, c_void}; use std::ptr::null_mut; use std::sync::atomic::Ordering; use 
std::sync::Mutex; -use valkey_module::logging; use valkey_module::logging::{log_io_error, ValkeyLogLevel}; use valkey_module::raw; use valkey_module::{RedisModuleDefragCtx, RedisModuleString}; diff --git a/src/wrapper/defrag.rs b/src/wrapper/defrag.rs index 6f65ea4..38027ef 100644 --- a/src/wrapper/defrag.rs +++ b/src/wrapper/defrag.rs @@ -13,28 +13,28 @@ impl Defrag { /// # Safety /// - /// This function should not be called before the horsemen are ready. + /// This function is temporary and will be removed once implemented in valkeymodule-rs. pub unsafe fn alloc(&self, ptr: *mut c_void) -> *mut c_void { unsafe { raw::RedisModule_DefragAlloc.unwrap()(self.defrag_ctx, ptr) } } /// # Safety /// - /// This function should not be called before the horsemen are ready. + /// This function is temporary and will be removed once implemented in valkeymodule-rs. pub unsafe fn curserset(&self, cursor: u64) -> i32 { unsafe { raw::RedisModule_DefragCursorSet.unwrap()(self.defrag_ctx, cursor) } } /// # Safety /// - /// This function should not be called before the horsemen are ready. + /// This function is temporary and will be removed once implemented in valkeymodule-rs. pub unsafe fn curserget(&self, cursor: *mut u64) -> i32 { unsafe { raw::RedisModule_DefragCursorGet.unwrap()(self.defrag_ctx, cursor) } } /// # Safety /// - /// This function should not be called before the horsemen are ready. + /// This function is temporary and will be removed once implemented in valkeymodule-rs. 
pub unsafe fn should_stop_defrag(&self) -> i32 { unsafe { raw::RedisModule_DefragShouldStop.unwrap()(self.defrag_ctx) } } diff --git a/src/wrapper/mod.rs b/src/wrapper/mod.rs index 1027fa1..5f2d675 100644 --- a/src/wrapper/mod.rs +++ b/src/wrapper/mod.rs @@ -1,3 +1,3 @@ pub mod bloom_callback; -pub mod digest; pub mod defrag; +pub mod digest; diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py index 7aec3a7..09659a7 100644 --- a/tests/test_bloom_metrics.py +++ b/tests/test_bloom_metrics.py @@ -135,6 +135,4 @@ def test_save_and_restore_metrics(self): # Compare original and loaded scaled bloomfilter infos new_client = self.server.get_new_client() - # When we scale out in the original it scales the vec by a factor of four. However when we load from an rdb we create an exact sized vec, this means the last - # two 8 byte sized allocations for a vec are no longer in memory so we have 16 less bytes in memory now. Figure out if we want to do this self.verify_bloom_metrics(new_client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3, 7501, 21000 + DEFAULT_BLOOM_FILTER_CAPACITY) diff --git a/tests/test_save_and_restore.py b/tests/test_save_and_restore.py index 23850a9..d4fc828 100644 --- a/tests/test_save_and_restore.py +++ b/tests/test_save_and_restore.py @@ -26,14 +26,12 @@ def test_basic_save_and_restore(self): self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) assert self.server.is_alive() - assert uptime_in_sec_1 > uptime_in_sec_2 - assert self.server.is_rdb_done_loading() + wait_for_equal(lambda: self.server.is_rdb_done_loading(), True) restored_server_digest = client.debug_digest() restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave') assert restored_server_digest == server_digest assert restored_object_digest == object_digest self.server.verify_string_in_logfile("Loading RDB produced by Valkey") - wait_for_equal(lambda: self.server.is_rdb_done_loading(), 
True) self.server.verify_string_in_logfile("Done loading RDB, keys loaded: 1, keys expired: 0") # verify restore results