diff --git a/src/client.rs b/src/client.rs index 9b0cb19..fede63d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -50,6 +50,18 @@ impl Connectable for Vec<&str> { pub struct Client { connections: Vec>, pub hash_function: fn(&str) -> u64, + continuum: Continuum, +} + +#[derive(Clone)] +struct VNode { + position: u64, + connection_index: usize, +} + +#[derive(Clone)] +pub struct Continuum { + vnodes: Vec, } unsafe impl Send for Client {} @@ -60,6 +72,10 @@ fn default_hash_function(key: &str) -> u64 { return hasher.finish(); } +fn empty_hash_function(_key: &str) -> u64 { + return 0; +} + pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> { if key.len() > 250 { Err(ClientError::KeyTooLong)? @@ -67,32 +83,94 @@ pub(crate) fn check_key_len(key: &str) -> Result<(), MemcacheError> { Ok(()) } +impl Continuum { + fn bsearch_continuum_index(&self, hash: u64) -> usize { + let mut left = 0; + let mut right = self.vnodes.len(); + let mut middle; + while left < right { + middle = (left + right) / 2; + if self.vnodes[middle].position < hash { + left = middle + 1; + } else { + right = middle; + } + } + left // This can be continuum.vnodes.len() + } + + fn add_vnode(&mut self, vnodename: String, idx: usize) { + let hash = default_hash_function(&vnodename); + if self.vnodes.len() == 0 { + self.vnodes.push(VNode { + position: hash, + connection_index: idx, + }); + } else { + let cont_idx = self.bsearch_continuum_index(hash); + if cont_idx == self.vnodes.len() { + self.vnodes.push(VNode { + position: hash, + connection_index: idx, + }); + } else { + self.vnodes.insert( + cont_idx, + VNode { + position: hash, + connection_index: idx, + }, + ); + } + } + } + + fn add_vnodes(&mut self, hostname: String, idx: usize) { + for n in 0..160 { + let vnodename = format!("{}-{}#{}", hostname, idx, n); + self.add_vnode(vnodename, idx); + } + } +} + impl Client { #[deprecated(since = "0.10.0", note = "please use `connect` instead")] pub fn new(target: C) -> Result { return Self::connect(target); } + fn get_connections_index(&self, hash: u64) -> usize { + let mut idx = self.continuum.bsearch_continuum_index(hash); + if idx == self.continuum.vnodes.len() { + idx = 0; + } + self.continuum.vnodes[idx].connection_index + } + pub fn with_pool_size(target: C, size: u32) -> Result { let urls = target.get_urls(); let mut connections = vec![]; + let mut continuum = Continuum { vnodes: Vec::new() }; for url in urls { let parsed = Url::parse(url.as_str())?; let pool = r2d2::Pool::builder() .max_size(size) .build(ConnectionManager::new(parsed))?; connections.push(pool); + continuum.add_vnodes(url.to_string(), connections.len() - 1); } Ok(Client { connections, - hash_function: default_hash_function, + hash_function: empty_hash_function, + continuum: continuum, }) } pub fn with_pool(pool: Pool) -> Result { Ok(Client { connections: vec![pool], - hash_function: default_hash_function, + hash_function: empty_hash_function, + continuum: Continuum { vnodes: Vec::new() }, }) } @@ -102,7 +180,13 @@ impl Client { fn get_connection(&self, key: &str) -> Pool { let connections_count = self.connections.len(); - return self.connections[(self.hash_function)(key) as usize % connections_count].clone(); + // XXX: This may not be deterministic so ideally need to hash the whole function itself + if self.hash_function as isize == empty_hash_function as isize { + let connection_index = self.get_connections_index(default_hash_function(key)); + self.connections[connection_index as usize].clone() + } else { + self.connections[(self.hash_function)(key) as usize % connections_count].clone() + } } /// Set the socket read timeout for TCP connections. @@ -224,10 +308,17 @@ impl Client { let connections_count = self.connections.len(); for key in keys { - let connection_index = (self.hash_function)(key) as usize % connections_count; + let connection_index; + // XXX: This may not be deterministic so ideally need to hash the whole function itself + if self.hash_function as isize == empty_hash_function as isize { + connection_index = self.get_connections_index(default_hash_function(key)); + } else { + connection_index = (self.hash_function)(key) as usize % connections_count; + } let array = con_keys.entry(connection_index).or_insert_with(Vec::new); array.push(key); } + for (&connection_index, keys) in con_keys.iter() { let connection = self.connections[connection_index].clone(); result.extend(connection.get()?.gets(keys)?); @@ -472,4 +563,114 @@ mod tests { client.set("counter", 321, 0).unwrap(); assert_eq!(client.increment("counter", 123).unwrap(), 444); } + + #[test] + fn test_bsearch() { + let mut continuum = super::Continuum { vnodes: Vec::new() }; + + continuum.vnodes.push(super::VNode { + position: 10, + connection_index: 1, + }); + continuum.vnodes.push(super::VNode { + position: 20, + connection_index: 2, + }); + continuum.vnodes.push(super::VNode { + position: 30, + connection_index: 3, + }); + continuum.vnodes.push(super::VNode { + position: 40, + connection_index: 2, + }); + + assert_eq!(0, continuum.bsearch_continuum_index(5)); + assert_eq!( + 1, + continuum.vnodes[continuum.bsearch_continuum_index(5)].connection_index + ); + assert_eq!(1, continuum.bsearch_continuum_index(15)); + assert_eq!( + 2, + continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index + ); + assert_eq!(2, continuum.bsearch_continuum_index(30)); + assert_eq!( + 3, + continuum.vnodes[continuum.bsearch_continuum_index(30)].connection_index + ); + assert_eq!(3, continuum.bsearch_continuum_index(31)); + assert_eq!( + 2, + continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index + ); + + assert_eq!(4, continuum.bsearch_continuum_index(41)); + assert_eq!(4, continuum.bsearch_continuum_index(65535)); + + continuum.vnodes.insert( + 0, + super::VNode { + position: 1, + connection_index: 3, + }, + ); + continuum.vnodes.insert( + 2, + super::VNode { + position: 19, + connection_index: 1, + }, + ); + continuum.vnodes.insert( + 6, + super::VNode { + position: 50, + connection_index: 4, + }, + ); + + assert_eq!(0, continuum.bsearch_continuum_index(0)); + assert_eq!( + 3, + continuum.vnodes[continuum.bsearch_continuum_index(0)].connection_index + ); + assert_eq!(2, continuum.bsearch_continuum_index(15)); + assert_eq!( + 1, + continuum.vnodes[continuum.bsearch_continuum_index(15)].connection_index + ); + assert_eq!(5, continuum.bsearch_continuum_index(31)); + assert_eq!( + 2, + continuum.vnodes[continuum.bsearch_continuum_index(31)].connection_index + ); + assert_eq!(6, continuum.bsearch_continuum_index(41)); + assert_eq!( + 4, + continuum.vnodes[continuum.bsearch_continuum_index(41)].connection_index + ); + + assert_eq!(7, continuum.bsearch_continuum_index(51)); + } + + #[test] + fn test_key_distribution() { + let mut servers = Vec::new(); + for i in 0..5 { + servers.push(format!("memcache://localhost:{}", 12345 + i)); + } + let client = super::Client::connect(servers).unwrap(); + let mut map = super::HashMap::::new(); + for i in 1..10000 { + let key = super::default_hash_function(&format!("key{}", i)); + let idx = client.get_connections_index(key); + *map.entry(idx).or_insert(1) += 1; + } + for (_k, v) in map { + // Each server should contain at least 75% of expected # of keys + assert!(((10000 / 5) as f64) * 0.75 < v as f64); + } + } }