From a8994ca5796583b9286d9ec1912d78712dcbfb72 Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Sat, 7 Dec 2024 19:43:32 +0000 Subject: [PATCH] Deterministic replication - bloom object creation should have the same properties on replica nodes to match the object on the primary Signed-off-by: Karthik Subbarao --- src/bloom/command_handler.rs | 231 ++++++++++++++++++++++++++++++---- src/bloom/data_type.rs | 24 ++-- src/bloom/utils.rs | 167 +++++++++++++++++++----- src/lib.rs | 1 - src/wrapper/bloom_callback.rs | 45 ++++--- tests/test_replication.py | 84 +++++++++---- 6 files changed, 437 insertions(+), 115 deletions(-) diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 864da45..9874656 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -11,6 +11,9 @@ use valkey_module::ContextFlags; use valkey_module::NotifyEvent; use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK}; +/// Helper function used to add items to a bloom object. It handles both multi item and single item add operations. +/// It is used by any command that allows adding of items: BF.ADD, BF.MADD, and BF.INSERT. +/// Returns the result of the item add operation on success as a ValkeyValue and a ValkeyError on failure. fn handle_bloom_add( args: &[ValkeyString], argc: usize, @@ -52,13 +55,95 @@ fn handle_bloom_add( } } +/// Structure to help provide the command arguments required for replication. This is used by mutative commands. +struct ReplicateArgs<'a> { + capacity: i64, + expansion: u32, + fp_rate: f64, + tightening_ratio: f64, + seed: [u8; 32], + items: &'a [ValkeyString], +} + +/// Helper function to replicate mutative commands to the replica nodes and publish keyspace events. +/// There are two main cases for replication: +/// - RESERVE operation: This is any bloom object creation which will be replicated with the exact properties of the +/// primary node using BF.INSERT. 
+/// - ADD operation: This is the case where only items were added to a bloom object. Here, the command is replicated verbatim. +/// +/// With this, replication becomes deterministic. +/// For keyspace events, we publish an event for both the RESERVE and ADD scenarios depending on if either or both of the +/// cases occurred. fn replicate_and_notify_events( ctx: &Context, key_name: &ValkeyString, add_operation: bool, reserve_operation: bool, + args: ReplicateArgs, ) { - if add_operation || reserve_operation { + if reserve_operation { + // Any bloom filter creation should have a deterministic replication with the exact same properties as what was + // created on the primary. This is done using BF.INSERT. + let capacity_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "CAPACITY".as_bytes()); + let capacity_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.capacity.to_string().as_bytes(), + ); + let fp_rate_str = ValkeyString::create_from_slice(std::ptr::null_mut(), "ERROR".as_bytes()); + let fp_rate_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.fp_rate.to_string().as_bytes(), + ); + let tightening_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "TIGHTENING".as_bytes()); + let tightening_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.tightening_ratio.to_string().as_bytes(), + ); + let seed_str = ValkeyString::create_from_slice(std::ptr::null_mut(), "SEED".as_bytes()); + let seed_val = ValkeyString::create_from_slice(std::ptr::null_mut(), &args.seed); + let mut cmd = vec![ + key_name, + &capacity_str, + &capacity_val, + &fp_rate_str, + &fp_rate_val, + &tightening_str, + &tightening_val, + &seed_str, + &seed_val, + ]; + // Add nonscaling / expansion related arguments. 
+ let expansion_args = match args.expansion == 0 { + true => { + let nonscaling_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "NONSCALING".as_bytes()); + vec![nonscaling_str] + } + false => { + let expansion_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "EXPANSION".as_bytes()); + let expansion_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.expansion.to_string().as_bytes(), + ); + vec![expansion_str, expansion_val] + } + }; + for arg in &expansion_args { + cmd.push(arg); + } + // Add items if any exist. + let items_str = ValkeyString::create_from_slice(std::ptr::null_mut(), "ITEMS".as_bytes()); + if !args.items.is_empty() { + cmd.push(&items_str); + } + for item in args.items { + cmd.push(item); + } + ctx.replicate("BF.INSERT", cmd.as_slice()); + } else if add_operation { ctx.replicate_verbatim(); } if add_operation { @@ -69,6 +154,7 @@ fn replicate_and_notify_events( } } +/// Function that implements logic to handle the BF.ADD and BF.MADD commands. 
pub fn bloom_filter_add_value( ctx: &Context, input_args: &[ValkeyString], @@ -104,7 +190,15 @@ pub fn bloom_filter_add_value( &mut add_succeeded, validate_size_limit, ); - replicate_and_notify_events(ctx, filter_name, add_succeeded, false); + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[curr_cmd_idx..], + }; + replicate_and_notify_events(ctx, filter_name, add_succeeded, false, replicate_args); response } None => { @@ -114,17 +208,29 @@ pub fn bloom_filter_add_value( let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed); let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); + let seed = match use_random_seed { + true => (None, true), + false => (Some(configs::FIXED_SEED), false), + }; let mut bloom = match BloomFilterType::new_reserved( fp_rate, tightening_ratio, capacity, expansion, - use_random_seed, + seed, validate_size_limit, ) { Ok(bf) => bf, Err(err) => return Err(ValkeyError::Str(err.as_str())), }; + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[curr_cmd_idx..], + }; let response = handle_bloom_add( input_args, argc, @@ -136,7 +242,13 @@ pub fn bloom_filter_add_value( ); match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { - replicate_and_notify_events(ctx, filter_name, add_succeeded, true); + replicate_and_notify_events( + ctx, + filter_name, + add_succeeded, + true, + replicate_args, + ); response } Err(_) => Err(ValkeyError::Str(utils::ERROR)), @@ -145,6 +257,7 @@ pub fn bloom_filter_add_value( } } +/// Helper function used to check whether an item (or multiple items) exists on a bloom object. 
fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue { if let Some(val) = value { if val.item_exists(item) { @@ -157,6 +270,7 @@ fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyVal ValkeyValue::Integer(0) } +/// Function that implements logic to handle the BF.EXISTS and BF.MEXISTS commands. pub fn bloom_filter_exists( ctx: &Context, input_args: &[ValkeyString], @@ -191,6 +305,7 @@ pub fn bloom_filter_exists( Ok(ValkeyValue::Array(result)) } +/// Function that implements logic to handle the BF.CARD command. pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if argc != 2 { @@ -212,6 +327,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe } } +/// Function that implements logic to handle the BF.RESERVE command. pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if !(4..=6).contains(&argc) { @@ -279,6 +395,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)), None => { let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); + let seed = match use_random_seed { + true => (None, true), + false => (Some(configs::FIXED_SEED), false), + }; // Skip bloom filter size validation on replicated cmds. 
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); let tightening_ratio = configs::TIGHTENING_RATIO; @@ -287,15 +407,23 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke tightening_ratio, capacity, expansion, - use_random_seed, + seed, validate_size_limit, ) { Ok(bf) => bf, Err(err) => return Err(ValkeyError::Str(err.as_str())), }; + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &[], + }; match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { - replicate_and_notify_events(ctx, filter_name, false, true); + replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); VALKEY_OK } Err(_) => Err(ValkeyError::Str(utils::ERROR)), @@ -304,9 +432,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke } } +/// Function that implements logic to handle the BF.INSERT command. 
pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); - let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED); // At the very least, we need: BF.INSERT ITEMS if argc < 4 { return Err(ValkeyError::WrongArity); @@ -315,10 +443,16 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey // Parse the filter name let filter_name = &input_args[idx]; idx += 1; + let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED); let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT; let mut tightening_ratio = configs::TIGHTENING_RATIO; let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed); let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; + let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); + let mut seed = match use_random_seed { + true => (None, true), + false => (Some(configs::FIXED_SEED), false), + }; let mut nocreate = false; while idx < argc { match input_args[idx].to_string_lossy().to_uppercase().as_str() { @@ -338,6 +472,8 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey }; } "TIGHTENING" if replicated_cmd => { + // Note: This argument is only supported on replicated commands since primary nodes replicate bloom objects + // deterministically using every global bloom config/property. if idx >= (argc - 1) { return Err(ValkeyError::WrongArity); } @@ -367,6 +503,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } + "SEED" if replicated_cmd => { + // Note: This argument is only supported on replicated commands since primary nodes replicate bloom objects + // deterministically using every global bloom config/property. + if idx >= (argc - 1) { + return Err(ValkeyError::WrongArity); + } + idx += 1; + // The BloomObject implementation uses a 32-byte (u8) array as the seed. 
+ let seed_result: Result<[u8; 32], _> = input_args[idx].as_slice().try_into(); + let Ok(seed_raw) = seed_result else { + return Err(ValkeyError::Str(utils::INVALID_SEED)); + }; + let is_seed_random = seed_raw != configs::FIXED_SEED; + seed = (Some(seed_raw), is_seed_random); + } "NOCREATE" => { nocreate = true; } @@ -395,8 +546,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } idx += 1; } - if idx == argc { - // No ITEMS argument from the insert command + if idx == argc && !replicated_cmd { + // We expect the ITEMS [ ...] argument to be provided on the BF.INSERT command used on primary nodes. + // For replicated commands, this is optional to allow BF.INSERT to be used to replicate bloom object creation + // commands without any items (BF.RESERVE). return Err(ValkeyError::WrongArity); } // If the filter does not exist, create one @@ -408,6 +561,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; // Skip bloom filter size validation on replicated cmds. 
+ let validate_size_limit = !replicated_cmd; let mut add_succeeded = false; match value { Some(bloom) => { @@ -420,25 +574,40 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey &mut add_succeeded, !replicated_cmd, ); - replicate_and_notify_events(ctx, filter_name, add_succeeded, false); + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }; + replicate_and_notify_events(ctx, filter_name, add_succeeded, false, replicate_args); response } None => { if nocreate { return Err(ValkeyError::Str(utils::NOT_FOUND)); } - let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); let mut bloom = match BloomFilterType::new_reserved( fp_rate, tightening_ratio, capacity, expansion, - use_random_seed, - !replicated_cmd, + seed, + validate_size_limit, ) { Ok(bf) => bf, Err(err) => return Err(ValkeyError::Str(err.as_str())), }; + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }; let response = handle_bloom_add( input_args, argc, @@ -450,7 +619,13 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey ); match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { - replicate_and_notify_events(ctx, filter_name, add_succeeded, true); + replicate_and_notify_events( + ctx, + filter_name, + add_succeeded, + true, + replicate_args, + ); response } Err(_) => Err(ValkeyError::Str(utils::ERROR)), @@ -459,6 +634,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } } +/// Function that implements logic to handle the BF.INFO command. 
pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if !(2..=3).contains(&argc) { @@ -484,13 +660,13 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe { "CAPACITY" => Ok(ValkeyValue::Integer(val.capacity())), "SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)), - "FILTERS" => Ok(ValkeyValue::Integer(val.filters.len() as i64)), + "FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)), "ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())), "EXPANSION" => { - if val.expansion == 0 { + if val.expansion() == 0 { return Ok(ValkeyValue::Null); } - Ok(ValkeyValue::Integer(val.expansion as i64)) + Ok(ValkeyValue::Integer(val.expansion() as i64)) } _ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)), } @@ -502,15 +678,15 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe ValkeyValue::SimpleStringStatic("Size"), ValkeyValue::Integer(val.memory_usage() as i64), ValkeyValue::SimpleStringStatic("Number of filters"), - ValkeyValue::Integer(val.filters.len() as i64), + ValkeyValue::Integer(val.num_filters() as i64), ValkeyValue::SimpleStringStatic("Number of items inserted"), ValkeyValue::Integer(val.cardinality()), ValkeyValue::SimpleStringStatic("Expansion rate"), ]; - if val.expansion == 0 { + if val.expansion() == 0 { result.push(ValkeyValue::Null); } else { - result.push(ValkeyValue::Integer(val.expansion as i64)); + result.push(ValkeyValue::Integer(val.expansion() as i64)); } Ok(ValkeyValue::Array(result)) } @@ -518,6 +694,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe } } +/// Function that implements logic to handle the BF.LOAD command. 
pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if argc != 3 { @@ -546,15 +723,23 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe // if filter not exists, create it. let hex = value.to_vec(); let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); - let bf = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) { + let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) { Ok(v) => v, Err(err) => { return Err(ValkeyError::Str(err.as_str())); } }; - match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) { + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }; + match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(_) => { - replicate_and_notify_events(ctx, filter_name, false, true); + replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); VALKEY_OK } Err(_) => Err(ValkeyError::Str(utils::ERROR)), diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index e7e1c2e..d2205ff 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -117,27 +117,27 @@ impl ValkeyDataType for BloomFilterType { } filters.push(Box::new(filter)); } - - let item = BloomFilterType { - expansion: expansion as u32, + let item = BloomFilterType::from_existing( + expansion as u32, fp_rate, tightening_ratio, is_seed_random, filters, - }; - item.bloom_filter_type_incr_metrics_on_new_create(); + ); Some(item) } /// Function that is used to generate a digest on the Bloom Object. 
fn debug_digest(&self, mut dig: Digest) { - dig.add_long_long(self.expansion.into()); - dig.add_string_buffer(&self.fp_rate.to_le_bytes()); - dig.add_string_buffer(&self.tightening_ratio.to_le_bytes()); - for filter in &self.filters { - dig.add_string_buffer(filter.bloom.as_slice()); - dig.add_long_long(filter.num_items); - dig.add_long_long(filter.capacity); + dig.add_long_long(self.expansion() as i64); + dig.add_string_buffer(&self.fp_rate().to_le_bytes()); + dig.add_string_buffer(&self.tightening_ratio().to_le_bytes()); + let is_seed_random = if self.is_seed_random() { 1 } else { 0 }; + dig.add_long_long(is_seed_random); + for filter in self.filters() { + dig.add_string_buffer(filter.raw_bloom().as_slice()); + dig.add_long_long(filter.num_items()); + dig.add_long_long(filter.capacity()); } dig.end_sequence(); } diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index e171187..eb2989a 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -21,6 +21,7 @@ pub const NON_SCALING_FILTER_FULL: &str = "ERR non scaling filter is full"; pub const NOT_FOUND: &str = "ERR not found"; pub const ITEM_EXISTS: &str = "ERR item exists"; pub const INVALID_INFO_VALUE: &str = "ERR invalid information value"; +pub const INVALID_SEED: &str = "ERR invalid seed"; pub const BAD_EXPANSION: &str = "ERR bad expansion"; pub const BAD_CAPACITY: &str = "ERR bad capacity"; pub const BAD_ERROR_RATE: &str = "ERR bad error rate"; @@ -69,12 +70,13 @@ impl BloomError { /// Can contain one or more filters. /// This is a generic top level structure which is not coupled to any bloom crate. 
#[derive(Serialize, Deserialize)] +#[allow(clippy::vec_box)] pub struct BloomFilterType { - pub expansion: u32, - pub fp_rate: f64, - pub tightening_ratio: f64, - pub is_seed_random: bool, - pub filters: Vec>, + expansion: u32, + fp_rate: f64, + tightening_ratio: f64, + is_seed_random: bool, + filters: Vec>, } impl BloomFilterType { @@ -84,7 +86,7 @@ impl BloomFilterType { tightening_ratio: f64, capacity: i64, expansion: u32, - use_random_seed: bool, + seed: (Option<[u8; 32]>, bool), validate_size_limit: bool, ) -> Result { // Reject the request, if the operation will result in creation of a bloom object containing a filter @@ -93,13 +95,16 @@ impl BloomFilterType { return Err(BloomError::ExceedsMaxBloomSize); } // Create the bloom filter and add to the main BloomFilter object. - let bloom = match use_random_seed { - true => Box::new(BloomFilter::with_random_seed(fp_rate, capacity)), - false => Box::new(BloomFilter::with_fixed_seed( - fp_rate, - capacity, - &configs::FIXED_SEED, - )), + let is_seed_random; + let bloom = match seed { + (None, _) => { + is_seed_random = true; + Box::new(BloomFilter::with_random_seed(fp_rate, capacity)) + } + (Some(seed), is_random) => { + is_seed_random = is_random; + Box::new(BloomFilter::with_fixed_seed(fp_rate, capacity, &seed)) + } }; let filters = vec![bloom]; let bloom = BloomFilterType { @@ -107,13 +112,32 @@ impl BloomFilterType { fp_rate, tightening_ratio, filters, - is_seed_random: use_random_seed, + is_seed_random, }; bloom.bloom_filter_type_incr_metrics_on_new_create(); Ok(bloom) } - /// Create a new BloomFilterType object from an existing one. + /// Create a BloomFilterType object from existing data (RDB Load / Restore). 
+ pub fn from_existing( + expansion: u32, + fp_rate: f64, + tightening_ratio: f64, + is_seed_random: bool, + filters: Vec>, + ) -> BloomFilterType { + let bloom = BloomFilterType { + expansion, + fp_rate, + tightening_ratio, + is_seed_random, + filters, + }; + bloom.bloom_filter_type_incr_metrics_on_new_create(); + bloom + } + + /// Create a new BloomFilterType object from an existing one (COPY). pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { let mut filters: Vec> = Vec::with_capacity(from_bf.filters.capacity()); for filter in &from_bf.filters { @@ -180,6 +204,41 @@ impl BloomFilterType { .seed() } + /// Return the expansion of the bloom object. + pub fn expansion(&self) -> u32 { + self.expansion + } + + /// Return the false postive rate of the bloom object. + pub fn fp_rate(&self) -> f64 { + self.fp_rate + } + + /// Return the tightening ratio of the bloom object. + pub fn tightening_ratio(&self) -> f64 { + self.tightening_ratio + } + + /// Return whether the bloom object uses a random seed. + pub fn is_seed_random(&self) -> bool { + self.is_seed_random + } + + /// Return the number of filters in the bloom object. + pub fn num_filters(&self) -> usize { + self.filters.len() + } + + /// Return a borrowed ref to the vector of filters in the bloom object. + pub fn filters(&self) -> &Vec> { + &self.filters + } + + /// Return a mutatively borrowed ref to the vector of filters in the bloom object. + pub fn filters_mut(&mut self) -> &mut Vec> { + &mut self.filters + } + /// Add an item to the BloomFilterType object. /// If scaling is enabled, this can result in a new sub filter creation. pub fn add_item(&mut self, item: &[u8], validate_size_limit: bool) -> Result { @@ -274,7 +333,7 @@ impl BloomFilterType { } /// Increments metrics related to Bloom filter memory usage upon creation of a new filter. 
- pub fn bloom_filter_type_incr_metrics_on_new_create(&self) { + fn bloom_filter_type_incr_metrics_on_new_create(&self) { metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( @@ -374,9 +433,9 @@ pub struct BloomFilter { serialize_with = "serialize", deserialize_with = "deserialize_boxed_bloom" )] - pub bloom: Box>, - pub num_items: i64, - pub capacity: i64, + bloom: Box>, + num_items: i64, + capacity: i64, } pub fn deserialize_boxed_bloom<'de, D>(deserializer: D) -> Result>, D::Error> @@ -451,6 +510,26 @@ impl BloomFilter { self.bloom.seed() } + /// Return the numer of items in the BloomFilter. + pub fn num_items(&self) -> i64 { + self.num_items + } + + /// Return the capcity of the BloomFilter - number of items that can be added to it. + pub fn capacity(&self) -> i64 { + self.capacity + } + + /// Return a borrowed ref to the raw bloom of the BloomFilter. + pub fn raw_bloom(&self) -> &bloomfilter::Bloom<[u8]> { + &self.bloom + } + + /// Return a mutatively borrowed ref to the raw bloom of the BloomFilter. + pub fn raw_bloom_mut(&mut self) -> &mut Box> { + &mut self.bloom + } + pub fn number_of_bytes(&self) -> usize { std::mem::size_of::() + std::mem::size_of::>() @@ -678,8 +757,12 @@ mod tests { fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); } - #[rstest(is_seed_random, case::random_seed(true), case::fixed_seed(false))] - fn test_scaling_filter(is_seed_random: bool) { + #[rstest( + seed, + case::random_seed((None, true)), + case::fixed_seed((Some(configs::FIXED_SEED), false)) + )] + fn test_non_scaling_filter(seed: (Option<[u8; 32]>, bool)) { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. 
let expected_fp_rate: f64 = 0.001; @@ -693,7 +776,7 @@ mod tests { tightening_ratio, initial_capacity, expansion, - is_seed_random, + seed, true, ) .expect("Expect bloom creation to succeed"); @@ -742,8 +825,12 @@ mod tests { ); } - #[rstest(is_seed_random, case::random_seed(true), case::fixed_seed(false))] - fn test_non_scaling_filter(is_seed_random: bool) { + #[rstest( + seed, + case::random_seed((None, true)), + case::fixed_seed((Some(configs::FIXED_SEED), false)) + )] + fn test_scaling_filter(seed: (Option<[u8; 32]>, bool)) { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; @@ -756,7 +843,7 @@ mod tests { tightening_ratio, initial_capacity, expansion, - is_seed_random, + seed, true, ) .expect("Expect bloom creation to succeed"); @@ -834,19 +921,27 @@ mod tests { fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. 
- let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, i64::MAX, 1, true, true); + let result = + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, i64::MAX, 1, (None, true), true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true, true); + let result2 = + BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, (None, true), true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[rstest(expansion, case::nonscaling(0), case::scaling(2))] fn test_bf_encode_and_decode(expansion: u32) { - let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, expansion, true, true) - .unwrap(); + let mut bf = BloomFilterType::new_reserved( + 0.5_f64, + 0.5_f64, + 1000_i64, + expansion, + (None, true), + true, + ) + .unwrap(); let item = "item1"; let _ = bf.add_item(item.as_bytes(), true); // action @@ -870,7 +965,8 @@ mod tests { fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, true, true).unwrap(); + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true) + .unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -893,7 +989,8 @@ mod tests { fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, true, true).unwrap(); + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true) + .unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -914,7 +1011,8 @@ mod tests { fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, true, 
true).unwrap(); + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true) + .unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_fp_rate = bf.fp_rate; @@ -931,7 +1029,8 @@ mod tests { // build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, true, false).unwrap(); + BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, (None, true), false) + .unwrap(); let vec = extra_large_filter.encode_bloom_filter().unwrap(); // should return error assert_eq!( diff --git a/src/lib.rs b/src/lib.rs index 98fe924..5a1096d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,6 @@ pub mod metrics; pub mod wrapper; use crate::bloom::command_handler; use crate::bloom::data_type::BLOOM_FILTER_TYPE; -use valkey_module::logging; use valkey_module_macros::info_command_handler; pub const MODULE_NAME: &str = "bf"; diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index 6c2f502..e942f19 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -26,28 +26,25 @@ use super::defrag::Defrag; /// # Safety pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mut c_void) { let v = &*value.cast::(); - raw::save_unsigned(rdb, v.filters.len() as u64); - raw::save_unsigned(rdb, v.expansion as u64); - raw::save_double(rdb, v.fp_rate); - raw::save_double(rdb, v.tightening_ratio); - let mut is_seed_random = 0; - if v.is_seed_random { - is_seed_random = 1; - } + raw::save_unsigned(rdb, v.num_filters() as u64); + raw::save_unsigned(rdb, v.expansion() as u64); + raw::save_double(rdb, v.fp_rate()); + raw::save_double(rdb, v.tightening_ratio()); + let is_seed_random = if v.is_seed_random() { 1 } else { 0 }; raw::save_unsigned(rdb, is_seed_random); - let filter_list = &v.filters; + let filter_list = v.filters(); let mut filter_list_iter = filter_list.iter().peekable(); while let Some(filter) = 
filter_list_iter.next() { - let bloom = &filter.bloom; + let bloom = filter.raw_bloom(); let bitmap = bloom.as_slice(); raw::RedisModule_SaveStringBuffer.unwrap()( rdb, bitmap.as_ptr().cast::(), bitmap.len(), ); - raw::save_unsigned(rdb, filter.capacity as u64); + raw::save_unsigned(rdb, filter.capacity() as u64); if filter_list_iter.peek().is_none() { - raw::save_unsigned(rdb, filter.num_items as u64); + raw::save_unsigned(rdb, filter.num_items() as u64); } } } @@ -150,7 +147,7 @@ pub unsafe extern "C" fn bloom_free_effort( curr_item.free_effort() } -/// Lazy static for a default temporary bloom that gets swapped during defrag. +// Lazy static for a default temporary bloom that gets swapped during defrag. lazy_static! { static ref DEFRAG_BLOOM_FILTER: Mutex>>> = Mutex::new(Some(Box::new(Bloom::<[u8]>::new(1, 1).unwrap()))); @@ -170,7 +167,7 @@ lazy_static! { /// Returns a new `Vec` that may have been defragmented. If defragmentation was successful, /// the returned vector will use the newly allocated memory. If defragmentation failed or was /// not necessary, the original vector's memory will be used. -fn external_vec_defrag(mut vec: Vec) -> Vec { +fn external_vec_defrag(vec: Vec) -> Vec { let defrag = Defrag::new(core::ptr::null_mut()); let len = vec.len(); let capacity = vec.capacity(); @@ -231,13 +228,13 @@ pub unsafe extern "C" fn bloom_defrag( // Convert pointer to BloomFilterType so we can operate on it. let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::(); - let num_filters = bloom_filter_type.filters.len(); - let filters_capacity = bloom_filter_type.filters.capacity(); + let num_filters = bloom_filter_type.num_filters(); + let filters_capacity = bloom_filter_type.filters().capacity(); // While we are within a timeframe decided from should_stop_defrag and not over the number of filters defrag the next filter while !defrag.should_stop_defrag() && cursor < num_filters as u64 { // Remove the current filter, unbox it, and attempt to defragment. 
- let bloom_filter_box = bloom_filter_type.filters.remove(cursor as usize); + let bloom_filter_box = bloom_filter_type.filters_mut().remove(cursor as usize); let bloom_filter = Box::into_raw(bloom_filter_box); let defrag_result = defrag.alloc(bloom_filter as *mut c_void); let mut defragged_filter = { @@ -252,7 +249,7 @@ pub unsafe extern "C" fn bloom_defrag( .lock() .expect("We expect default to exist"); let inner_bloom = mem::replace( - &mut defragged_filter.bloom, + defragged_filter.raw_bloom_mut(), temporary_bloom.take().expect("We expect default to exist"), ); // Convert the inner_bloom into the correct type and then try to defragment it @@ -265,20 +262,20 @@ pub unsafe extern "C" fn bloom_defrag( let external_bloom = inner_bloom.realloc_large_heap_allocated_objects(external_vec_defrag); let placeholder_bloom = - mem::replace(&mut defragged_filter.bloom, Box::new(external_bloom)); + mem::replace(defragged_filter.raw_bloom_mut(), Box::new(external_bloom)); *temporary_bloom = Some(placeholder_bloom); // Reset the original static } else { let inner_bloom = unsafe { Box::from_raw(inner_bloom_ptr) }; let external_bloom = inner_bloom.realloc_large_heap_allocated_objects(external_vec_defrag); let placeholder_bloom = - mem::replace(&mut defragged_filter.bloom, Box::new(external_bloom)); + mem::replace(defragged_filter.raw_bloom_mut(), Box::new(external_bloom)); *temporary_bloom = Some(placeholder_bloom); // Reset the original static } // Reinsert the defragmented filter and increment the cursor bloom_filter_type - .filters + .filters_mut() .insert(cursor as usize, defragged_filter); cursor += 1; } @@ -289,11 +286,11 @@ pub unsafe extern "C" fn bloom_defrag( return 1; } // Defragment the Vec of filters itself - let filters_vec = mem::take(&mut bloom_filter_type.filters); + let filters_vec = mem::take(bloom_filter_type.filters_mut()); let filters_ptr = Box::into_raw(filters_vec.into_boxed_slice()) as *mut c_void; let defragged_filters_ptr = defrag.alloc(filters_ptr); if 
!defragged_filters_ptr.is_null() { - bloom_filter_type.filters = unsafe { + *bloom_filter_type.filters_mut() = unsafe { Vec::from_raw_parts( defragged_filters_ptr as *mut Box, num_filters, @@ -301,7 +298,7 @@ pub unsafe extern "C" fn bloom_defrag( ) }; } else { - bloom_filter_type.filters = unsafe { + *bloom_filter_type.filters_mut() = unsafe { Vec::from_raw_parts( filters_ptr as *mut Box, num_filters, diff --git a/tests/test_replication.py b/tests/test_replication.py index c34f996..cac4605 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -23,13 +23,30 @@ def use_random_seed_fixture(self, bloom_config_parameterization): elif bloom_config_parameterization == "fixed-seed": self.use_random_seed = "no" + def validate_cmd_stats(self, primary_cmd, replica_cmd, expected_primary_calls, expected_replica_calls): + """ + Helper fn to validate cmd count on primary & replica for non BF.RESERVE cases for object creation & item add. + """ + primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + primary_cmd] + assert primary_cmd_stats["calls"] == expected_primary_calls + replica_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_' + replica_cmd] + assert replica_cmd_stats["calls"] == expected_replica_calls + + def validate_reserve_cmd_stats(self, primary_reserve_count, primary_add_count, replica_insert_count, replica_add_count): + """ + Helper fn to validate cmd count on primary & replica for the BF.RESERVE case for object creation & item add. 
+ """ + primary_cmd_stats = self.client.info("Commandstats") + replica_cmd_stats = self.replicas[0].client.info("Commandstats") + assert primary_cmd_stats['cmdstat_BF.RESERVE']["calls"] == primary_reserve_count and primary_cmd_stats['cmdstat_BF.ADD']["calls"] == primary_add_count + assert replica_cmd_stats['cmdstat_BF.INSERT']["calls"] == replica_insert_count and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == replica_add_count + def test_replication_behavior(self): self.setup_replication(num_replicas=1) - is_random_seed = self.client.execute_command('CONFIG GET bf.bloom-use-random-seed') # Test replication for write commands. bloom_write_cmds = [ - ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2), - ('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 2), + ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 1), + ('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 1), ('BF.RESERVE', 'BF.RESERVE key 0.001 100000', 'BF.ADD key item1', 1), ('BF.INSERT', 'BF.INSERT key items item', 'BF.INSERT key items item1', 2), ] @@ -37,35 +54,38 @@ def test_replication_behavior(self): prefix = test_case[0] create_cmd = test_case[1] # New bloom object being created is replicated. + # Validate that the bloom object creation command replicated as BF.INSERT. self.client.execute_command(create_cmd) assert self.client.execute_command('EXISTS key') == 1 self.waitForReplicaToSyncUp(self.replicas[0]) assert self.replicas[0].client.execute_command('EXISTS key') == 1 + self.validate_cmd_stats(prefix, 'BF.INSERT', 1, 1) # New item added to an existing bloom is replicated. item_add_cmd = test_case[2] + expected_calls = test_case[3] self.client.execute_command(item_add_cmd) assert self.client.execute_command('BF.EXISTS key item1') == 1 self.waitForReplicaToSyncUp(self.replicas[0]) assert self.replicas[0].client.execute_command('BF.EXISTS key item1') == 1 - - # Validate that the bloom object creation command and item add command was replicated. 
- expected_calls = test_case[3] - primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix] - replica_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_' + prefix] - assert primary_cmd_stats["calls"] == expected_calls and replica_cmd_stats["calls"] == expected_calls - + # Validate that item addition (not bloom creation) is using the original command + if prefix != 'BF.RESERVE': + self.validate_cmd_stats(prefix, prefix, 2, expected_calls) + else: + # In case of the BF.RESERVE test case, we use BF.ADD to add items. Validate this is replicated. + self.validate_reserve_cmd_stats(1, 1, 1, 1) # Attempting to add an existing item to an existing bloom will NOT replicated. self.client.execute_command(item_add_cmd) self.waitForReplicaToSyncUp(self.replicas[0]) primary_cmd_stats = self.client.info("Commandstats") replica_cmd_stats = self.replicas[0].client.info("Commandstats") - if prefix == 'BF.RESERVE': - assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == 1 and replica_cmd_stats['cmdstat_' + prefix]["calls"] == 1 - assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1 + if prefix != 'BF.RESERVE': + self.validate_cmd_stats(prefix, prefix, 3, expected_calls) else: - assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls - + # In case of the BF.RESERVE test case, we use BF.ADD to add items. Validate this is not replicated since + # the item already exists. 
+ self.validate_reserve_cmd_stats(1, 2, 1, 1) + # cmd debug digest server_digest_primary = self.client.debug_digest() assert server_digest_primary != None or 0000000000000000000000000000000000000000 @@ -73,12 +93,7 @@ def test_replication_behavior(self): assert server_digest_primary == server_digest_replica object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key') debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key') - # TODO: Update the test here to validate that digest always matches during replication. Once we implement - # deterministic replication (including replicating seeds), this assert will be updated. - if is_random_seed[1] == b'yes': - assert object_digest_primary != debug_digest_replica - else: - assert object_digest_primary == debug_digest_replica + assert object_digest_primary == debug_digest_replica self.client.execute_command('FLUSHALL') self.waitForReplicaToSyncUp(self.replicas[0]) @@ -139,3 +154,30 @@ def test_replication_behavior(self): assert primary_cmd_stats["calls"] == 1 assert primary_cmd_stats["failed_calls"] == 1 assert ('cmdstat_' + prefix) not in self.replicas[0].client.info("Commandstats") + + def test_deterministic_replication(self): + self.setup_replication(num_replicas=1) + # Set non default global properties (config) on the primary node. Any bloom creation on the primary should be + # replicated with the properties below. + assert self.client.execute_command('CONFIG SET bf.bloom-capacity 1000') == b'OK' + assert self.client.execute_command('CONFIG SET bf.bloom-expansion 3') == b'OK' + # Test bloom object creation with every command type. 
+        bloom_write_cmds = [
+            ('BF.ADD', 'BF.ADD key item'),
+            ('BF.MADD', 'BF.MADD key item'),
+            ('BF.RESERVE', 'BF.RESERVE key 0.001 100000'),
+            ('BF.INSERT', 'BF.INSERT key items item'),
+        ]
+        for test_case in bloom_write_cmds:
+            prefix = test_case[0]
+            create_cmd = test_case[1]
+            self.client.execute_command(create_cmd)
+            server_digest_primary = self.client.debug_digest()
+            assert server_digest_primary != None
+            server_digest_replica = self.replicas[0].client.debug_digest()
+            object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
+            debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key')
+            assert server_digest_primary == server_digest_replica
+            assert object_digest_primary == debug_digest_replica
+            self.client.execute_command('FLUSHALL')
+            self.waitForReplicaToSyncUp(self.replicas[0])