diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 5c0702d85..cec907b12 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -35,14 +35,14 @@ //! .expire(key, 60).ignore() //! .query(&mut connection).unwrap(); //! ``` +use rand::{seq::IteratorRandom, thread_rng, Rng}; use std::cell::RefCell; use std::collections::HashSet; use std::str::FromStr; +use std::sync::Arc; use std::thread; use std::time::Duration; -use rand::{seq::IteratorRandom, thread_rng, Rng}; - use crate::cluster_pipeline::UNROUTABLE_ERROR; use crate::cluster_routing::{ MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr, @@ -343,22 +343,20 @@ where let mut slots = self.slots.borrow_mut(); *slots = self.create_new_slots()?; - let mut nodes = slots.values().flatten().collect::>(); - nodes.sort_unstable(); - nodes.dedup(); - + let nodes = slots.all_node_addresses(); let mut connections = self.connections.borrow_mut(); *connections = nodes .into_iter() .filter_map(|addr| { - if connections.contains_key(addr) { - let mut conn = connections.remove(addr).unwrap(); + let addr = addr.to_string(); + if connections.contains_key(&addr) { + let mut conn = connections.remove(&addr).unwrap(); if conn.check_connection() { return Some((addr.to_string(), conn)); } } - if let Ok(mut conn) = self.connect(addr) { + if let Ok(mut conn) = self.connect(&addr) { if conn.check_connection() { return Some((addr.to_string(), conn)); } @@ -424,7 +422,7 @@ where if let Some(addr) = slots.slot_addr_for_route(route) { Ok(( addr.to_string(), - self.get_connection_by_addr(connections, addr)?, + self.get_connection_by_addr(connections, &addr)?, )) } else { // try a random node next. This is safe if slots are involved @@ -495,13 +493,13 @@ where fn execute_on_all<'a>( &'a self, input: Input, - addresses: HashSet<&'a str>, + addresses: HashSet>, connections: &'a mut HashMap, - ) -> Vec> { + ) -> Vec, Value)>> { addresses .into_iter() .map(|addr| { - let connection = self.get_connection_by_addr(connections, addr)?; + let connection = self.get_connection_by_addr(connections, &addr)?; match input { Input::Slice { cmd, routable: _ } => connection.req_packed_command(cmd), Input::Cmd(cmd) => connection.req_command(cmd), @@ -526,8 +524,8 @@ where input: Input, slots: &'a mut SlotMap, connections: &'a mut HashMap, - ) -> Vec> { - self.execute_on_all(input, slots.addresses_for_all_nodes(), connections) + ) -> Vec, Value)>> { + self.execute_on_all(input, slots.all_node_addresses(), connections) } fn execute_on_all_primaries<'a>( @@ -535,7 +533,7 @@ where input: Input, slots: &'a mut SlotMap, connections: &'a mut HashMap, - ) -> Vec> { + ) -> Vec, Value)>> { self.execute_on_all(input, slots.addresses_for_all_primaries(), connections) } @@ -545,7 +543,7 @@ where slots: &'a mut SlotMap, connections: &'a mut HashMap, routes: &'b [(Route, Vec)], - ) -> Vec> + ) -> Vec, Value)>> where 'b: 'a, { @@ -557,7 +555,7 @@ where ErrorKind::IoError, "Couldn't find connection", )))?; - let connection = self.get_connection_by_addr(connections, addr)?; + let connection = self.get_connection_by_addr(connections, &addr)?; let (_, indices) = routes.get(index).unwrap(); let cmd = crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter()); diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index 2bfbb8b93..3c1bc10f1 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -1,11 +1,12 @@ use 
crate::cluster_async::ConnectionFuture; -use crate::cluster_routing::{Route, SlotAddr}; +use crate::cluster_routing::{Route, ShardAddrs, SlotAddr}; use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; use crate::cluster_topology::TopologyHash; use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; use std::net::IpAddr; +use std::sync::Arc; /// A struct that encapsulates a network connection along with its associated IP address. #[derive(Clone, Eq, PartialEq, Debug)] @@ -137,6 +138,16 @@ where } } + /// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses. + pub(crate) fn slot_map_nodes( + &self, + ) -> impl Iterator, Arc)> + '_ { + self.slot_map + .nodes_map() + .iter() + .map(|item| (item.key().clone(), item.value().clone())) + } + // Extends the current connection map with the provided one pub(crate) fn extend_connection_map( &mut self, @@ -147,11 +158,7 @@ where /// Returns true if the address represents a known primary node. pub(crate) fn is_primary(&self, address: &String) -> bool { - self.connection_for_address(address).is_some() - && self - .slot_map - .values() - .any(|slot_addrs| slot_addrs.primary.as_str() == address) + self.connection_for_address(address).is_some() && self.slot_map.is_primary(address) } fn round_robin_read_from_replica( @@ -160,19 +167,20 @@ where ) -> Option> { let addrs = &slot_map_value.addrs; let initial_index = slot_map_value - .latest_used_replica + .last_used_replica .load(std::sync::atomic::Ordering::Relaxed); let mut check_count = 0; loop { check_count += 1; // Looped through all replicas, no connected replica was found. - if check_count > addrs.replicas.len() { - return self.connection_for_address(addrs.primary.as_str()); + if check_count > addrs.replicas().len() { + return self.connection_for_address(addrs.primary().as_str()); } - let index = (initial_index + check_count) % addrs.replicas.len(); - if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) { - let _ = slot_map_value.latest_used_replica.compare_exchange_weak( + let index = (initial_index + check_count) % addrs.replicas().len(); + if let Some(connection) = self.connection_for_address(addrs.replicas()[index].as_str()) + { + let _ = slot_map_value.last_used_replica.compare_exchange_weak( initial_index, index, std::sync::atomic::Ordering::Relaxed, @@ -186,15 +194,15 @@ where fn lookup_route(&self, route: &Route) -> Option> { let slot_map_value = self.slot_map.slot_value_for_route(route)?; let addrs = &slot_map_value.addrs; - if addrs.replicas.is_empty() { - return self.connection_for_address(addrs.primary.as_str()); + if addrs.replicas().is_empty() { + return self.connection_for_address(addrs.primary().as_str()); } match route.slot_addr() { - SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()), + SlotAddr::Master => self.connection_for_address(addrs.primary().as_str()), SlotAddr::ReplicaOptional => match self.read_from_replica_strategy { ReadFromReplicaStrategy::AlwaysFromPrimary => { - self.connection_for_address(addrs.primary.as_str()) + self.connection_for_address(addrs.primary().as_str()) } ReadFromReplicaStrategy::RoundRobin => { self.round_robin_read_from_replica(slot_map_value) @@ -232,7 +240,7 @@ where self.slot_map .addresses_for_all_primaries() .into_iter() - .flat_map(|addr| self.connection_for_address(addr)) + .flat_map(|addr| self.connection_for_address(&addr)) } pub(crate) fn node_for_address(&self, address: &str) 
-> Option> { diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 965a05cf8..ca5d11fc2 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -31,7 +31,7 @@ pub mod testing { } use crate::{ client::GlideConnectionOptions, - cluster_routing::{Routable, RoutingInfo}, + cluster_routing::{Routable, RoutingInfo, ShardUpdateResult}, cluster_slotmap::SlotMap, cluster_topology::SLOT_SIZE, cmd, @@ -438,7 +438,7 @@ where &self, slot: u16, slot_addr: SlotAddr, - ) -> Option { + ) -> Option> { self.conn_lock .read() .await @@ -486,7 +486,7 @@ where } // return slots of node - pub(crate) async fn get_slots_of_address(&self, node_address: &str) -> Vec { + pub(crate) async fn get_slots_of_address(&self, node_address: Arc) -> Vec { self.conn_lock .read() .await @@ -675,6 +675,25 @@ impl From for OperationTarget { } } +/// Represents a node to which a `MOVED` or `ASK` error redirects. +#[derive(Clone, Debug)] +pub(crate) struct RedirectNode { + /// The address of the redirect node. + pub address: String, + /// The slot of the redirect node. + pub slot: u16, +} + +impl RedirectNode { + /// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number. + pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option { + option.map(|(address, slot)| RedirectNode { + address: address.to_string(), + slot, + }) + } +} + struct Message { cmd: CmdArg, sender: oneshot::Sender>, @@ -786,6 +805,10 @@ pin_project! { #[pin] sleep: BoxFuture<'static, ()>, }, + UpdateMoved { + #[pin] + future: BoxFuture<'static, RedisResult<()>>, + }, } } @@ -822,6 +845,7 @@ enum Next { // if not set, then a slot refresh should happen without sending a request afterwards request: Option>, sleep_duration: Option, + moved_redirect: Option, }, ReconnectToInitialNodes { // if not set, then a reconnect should happen without sending a request afterwards @@ -849,8 +873,25 @@ impl Future for Request { } .into(); } + RequestStateProj::UpdateMoved { future } => { + if let Err(err) = ready!(future.poll(cx)) { + // Updating the slot map based on the MOVED error is an optimization. + // If it fails, proceed by retrying the request with the redirected node, + // and allow the slot refresh task to correct the slot map. + info!( + "Failed to update the slot map based on the received MOVED error. 
+ Error: {err:?}" + ); + } + if let Some(request) = self.project().request.take() { + return Next::Retry { request }.into(); + } else { + return Next::Done.into(); + } + } _ => panic!("Request future must be Some"), }; + match ready!(future.poll(cx)) { Ok(item) => { self.respond(Ok(item)); @@ -868,6 +909,7 @@ impl Future for Request { Next::RefreshSlots { request: None, sleep_duration: None, + moved_redirect: RedirectNode::from_option_tuple(err.redirect_node()), } .into() } else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) { @@ -913,6 +955,7 @@ impl Future for Request { return Next::RefreshSlots { request: Some(request), sleep_duration: Some(sleep_duration), + moved_redirect: None, } .into(); } @@ -931,6 +974,7 @@ impl Future for Request { } crate::types::RetryMethod::MovedRedirect => { let mut request = this.request.take().unwrap(); + let redirect_node = err.redirect_node(); request.info.set_redirect( err.redirect_node() .map(|(node, _slot)| Redirect::Moved(node.to_string())), @@ -938,6 +982,7 @@ impl Future for Request { Next::RefreshSlots { request: Some(request), sleep_duration: None, + moved_redirect: RedirectNode::from_option_tuple(redirect_node), } .into() } @@ -1242,12 +1287,7 @@ where let mut nodes_to_delete = Vec::new(); let connections_container = inner.conn_lock.read().await; - let all_nodes_with_slots: HashSet = connections_container - .slot_map - .addresses_for_all_nodes() - .iter() - .map(|addr| String::from(*addr)) - .collect(); + let all_nodes_with_slots = connections_container.slot_map.all_node_addresses(); connections_container .all_node_connections() @@ -1280,8 +1320,8 @@ where addrs_to_refresh.extend( all_nodes_with_slots .iter() - .filter(|addr| !all_valid_conns.contains_key(*addr)) - .cloned(), + .filter(|addr| !all_valid_conns.contains_key(addr.as_str())) + .map(|addr| addr.to_string()), ); if !addrs_to_refresh.is_empty() { @@ -1477,7 +1517,7 @@ where { return Ok(()); } - let mut skip_slots_refresh = false; + let mut should_refresh_slots = true; if *policy == RefreshPolicy::Throttable { // Check if the current slot refresh is triggered before the wait duration has passed let last_run_rlock = last_run.read().await; @@ -1496,13 +1536,13 @@ where if passed_time <= wait_duration { debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?}, Wait duration = {:?}", passed_time, wait_duration); - skip_slots_refresh = true; + should_refresh_slots = false; } } } let mut res = Ok(()); - if !skip_slots_refresh { + if should_refresh_slots { let retry_strategy = ExponentialBackoff { initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL, max_interval: DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL, @@ -1523,28 +1563,47 @@ where res } + /// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed. + /// Returns `RedisResult` with `true` if changes were detected and slots were refreshed, + /// or `false` if no changes were found. Raises an error if refreshing the topology fails. 
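For reference, the `RedirectNode` conversion used by `Next::RefreshSlots { moved_redirect, .. }` above can be exercised on its own. This is a minimal, self-contained sketch with standalone types (not the crate's internals); the sample address and slot number are made up for illustration.

```rust
// Standalone copy of the RedirectNode conversion introduced in this PR, for illustration only.
#[derive(Clone, Debug, PartialEq)]
struct RedirectNode {
    address: String,
    slot: u16,
}

impl RedirectNode {
    fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
        option.map(|(address, slot)| RedirectNode {
            address: address.to_string(),
            slot,
        })
    }
}

fn main() {
    // In the PR, `err.redirect_node()` yields an Option<(&str, u16)> parsed from a MOVED error;
    // the values below are illustrative.
    let parsed: Option<(&str, u16)> = Some(("10.0.0.5:6379", 12182));
    assert_eq!(
        RedirectNode::from_option_tuple(parsed),
        Some(RedirectNode {
            address: "10.0.0.5:6379".to_string(),
            slot: 12182,
        })
    );
    assert!(RedirectNode::from_option_tuple(None).is_none());
}
```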
pub(crate) async fn check_topology_and_refresh_if_diff( inner: Arc>, policy: &RefreshPolicy, - ) -> bool { + ) -> RedisResult { let topology_changed = Self::check_for_topology_diff(inner.clone()).await; if topology_changed { - let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await; + Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await?; } - topology_changed + Ok(topology_changed) } async fn periodic_topology_check(inner: Arc>, interval_duration: Duration) { loop { let _ = boxed_sleep(interval_duration).await; - let topology_changed = - Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable) - .await; - if !topology_changed { - // This serves as a safety measure for validating pubsub subsctiptions state in case it has drifted - // while topology stayed the same. - // For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(), - // might leave a node unconnected indefinitely in case topology is stable and no request are attempted to this node. + + // Check and refresh topology if needed + let should_refresh_pubsub = match Self::check_topology_and_refresh_if_diff( + inner.clone(), + &RefreshPolicy::Throttable, + ) + .await + { + Ok(topology_changed) => !topology_changed, + Err(err) => { + warn!( + "Failed to refresh slots during periodic topology checks:\n{:?}", + err + ); + true + } + }; + + // Refresh pubsub subscriptions if topology wasn't changed or an error occurred. + // This serves as a safety measure for validating pubsub subsctiptions state in case it has drifted + // while topology stayed the same. + // For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(), + // might leave a node unconnected indefinitely in case topology is stable and no request are attempted to this node. + if should_refresh_pubsub { Self::refresh_pubsub_subscriptions(inner.clone()).await; } } @@ -1736,21 +1795,20 @@ where .0?; let connections = &*read_guard; // Create a new connection vector of the found nodes - let mut nodes = new_slots.values().flatten().collect::>(); - nodes.sort_unstable(); - nodes.dedup(); + let nodes = new_slots.all_node_addresses(); let nodes_len = nodes.len(); let addresses_and_connections_iter = stream::iter(nodes) .fold( Vec::with_capacity(nodes_len), |mut addrs_and_conns, addr| async move { + let addr = addr.to_string(); if let Some(node) = connections.node_for_address(addr.as_str()) { addrs_and_conns.push((addr, Some(node))); return addrs_and_conns; } // If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name. // We shall check if a connection is already exists under the resolved IP name. 
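The reworked `periodic_topology_check` above boils down to a small decision rule for when to fall back to refreshing pubsub subscriptions. A minimal sketch of that rule as a pure function, assuming a simplified error type in place of `RedisError`:

```rust
// Sketch of the periodic-check decision: refresh pubsub subscriptions when the topology was
// stable, or when the slot refresh attempt failed. `Result<bool, String>` stands in for the
// crate's RedisResult<bool>.
fn should_refresh_pubsub(topology_check: Result<bool, String>) -> bool {
    match topology_check {
        // Topology changed and slots were refreshed: nothing more to do this tick.
        Ok(true) => false,
        // Stable topology: validate pubsub subscription state as a safety measure.
        Ok(false) => true,
        // Slot refresh failed: still fall back to refreshing pubsub subscriptions.
        Err(_) => true,
    }
}

fn main() {
    assert!(!should_refresh_pubsub(Ok(true)));
    assert!(should_refresh_pubsub(Ok(false)));
    assert!(should_refresh_pubsub(Err("refresh failed".to_string())));
}
```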
- let (host, port) = match get_host_and_port_from_addr(addr) { + let (host, port) = match get_host_and_port_from_addr(&addr) { Some((host, port)) => (host, port), None => { addrs_and_conns.push((addr, None)); @@ -1776,10 +1834,10 @@ where |connections, (addr, node)| async { let mut cluster_params = inner.cluster_params.clone(); let subs_guard = inner.subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = subs_guard.get(addr).cloned(); + cluster_params.pubsub_subscriptions = subs_guard.get(&addr).cloned(); drop(subs_guard); let node = get_or_create_conn( - addr, + &addr, node, &cluster_params, RefreshConnectionType::AllConnections, @@ -1787,7 +1845,7 @@ where ) .await; if let Ok(node) = node { - connections.0.insert(addr.into(), node); + connections.0.insert(addr, node); } connections }, @@ -1807,6 +1865,77 @@ where Ok(()) } + /// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role: + /// /// Updates the slot and node mappings in response to a MOVED error. + /// This function handles various scenarios based on the new primary's role: + /// + /// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed. + /// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover), + /// the slot ownership is updated by promoting the replica to the primary in the existing shard addresses. + /// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration, + /// and the slot mapping is updated to point to the new shard addresses. + /// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to: + /// - The replica became the primary of its shard after a failover, with new slots migrated to it. + /// - The replica has moved to a different shard as the primary. + /// Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard. + /// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out. + /// + /// # Arguments + /// * `inner` - Shared reference to InnerCore containing connection and slot state. + /// * `slot` - The slot number reported as moved. + /// * `new_primary` - The address of the node now responsible for the slot. + /// + /// # Returns + /// * `RedisResult<()>` indicating success or failure in updating slot mappings. + async fn update_upon_moved_error( + inner: Arc>, + slot: u16, + new_primary: Arc, + ) -> RedisResult<()> { + let mut connections_container = inner.conn_lock.write().await; + let curr_shard_addrs = connections_container.slot_map.shard_addrs_for_slot(slot); + // Check if the new primary is part of the current shard and update if required + if let Some(curr_shard_addrs) = curr_shard_addrs { + match curr_shard_addrs.attempt_shard_role_update(new_primary.clone()) { + // Scenario 1: No changes needed as the new primary is already the current slot owner. + // Scenario 2: Failover occurred and the new primary was promoted from a replica. + ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()), + // The node was not found in this shard, proceed with further scenarios. 
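The five scenarios documented above for `update_upon_moved_error` can be summarized with a small standalone classifier. This is only an illustrative sketch over plain `String` shards; the enum and helper names are hypothetical and not part of the PR, which resolves scenarios 1–2 via `attempt_shard_role_update` and scenarios 3–5 via the nodes map.

```rust
// Hypothetical classifier mirroring the scenario list in update_upon_moved_error's doc comment.
#[derive(Debug, PartialEq)]
enum MovedScenario {
    NoChange,      // 1: the redirect target already owns the slot
    Failover,      // 2: a replica of the owning shard was promoted
    SlotMigration, // 3: a primary of another shard took the slot over
    ReplicaMoved,  // 4: a replica of another shard is reported as the new owner
    NewNode,       // 5: the address is not in the slot map at all
}

struct Shard {
    primary: String,
    replicas: Vec<String>,
}

fn classify(owning_shard: &Shard, all_shards: &[Shard], new_primary: &str) -> MovedScenario {
    // Scenarios 1 & 2: the redirect target is already part of the shard that owns the slot.
    if owning_shard.primary == new_primary {
        return MovedScenario::NoChange;
    }
    if owning_shard.replicas.iter().any(|r| r == new_primary) {
        return MovedScenario::Failover;
    }
    // Scenarios 3 & 4: the target is known, but belongs to a different shard.
    for shard in all_shards {
        if shard.primary == new_primary {
            return MovedScenario::SlotMigration;
        }
        if shard.replicas.iter().any(|r| r == new_primary) {
            return MovedScenario::ReplicaMoved;
        }
    }
    // Scenario 5: the target is unknown (e.g. scale-out added a new node).
    MovedScenario::NewNode
}

fn main() {
    let shard = |p: &str, r: &str| Shard {
        primary: p.to_string(),
        replicas: vec![r.to_string()],
    };
    let shards = vec![shard("a:6379", "a-replica:6379"), shard("b:6379", "b-replica:6379")];
    let owner = &shards[0]; // shard currently owning the moved slot
    assert_eq!(classify(owner, &shards, "a:6379"), MovedScenario::NoChange);
    assert_eq!(classify(owner, &shards, "a-replica:6379"), MovedScenario::Failover);
    assert_eq!(classify(owner, &shards, "b:6379"), MovedScenario::SlotMigration);
    assert_eq!(classify(owner, &shards, "b-replica:6379"), MovedScenario::ReplicaMoved);
    assert_eq!(classify(owner, &shards, "c:6379"), MovedScenario::NewNode);
}
```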
+ ShardUpdateResult::NodeNotFound => {} + } + } + + // Scenario 3 & 4: Check if the new primary exists in other shards + let mut nodes_iter = connections_container.slot_map_nodes(); + for (node_addr, shard_addrs_arc) in &mut nodes_iter { + if node_addr == new_primary { + let is_existing_primary = shard_addrs_arc.primary().eq(&new_primary); + if is_existing_primary { + // Scenario 3: Slot Migration - The new primary is an existing primary in another shard + // Update the associated addresses for `slot` to `shard_addrs`. + drop(nodes_iter); + return connections_container + .slot_map + .update_slot_range(slot, shard_addrs_arc.clone()); + } else { + // Scenario 4: The MOVED error redirects to `new_primary` which is known as a replica in a shard that doesn’t own `slot`. + // Remove the replica from its existing shard and treat it as a new node in a new shard. + shard_addrs_arc.remove_replica(new_primary.clone())?; + drop(nodes_iter); + return connections_container + .slot_map + .add_new_primary(slot, new_primary); + } + } + } + + // Scenario 5: New Node - The new primary is not present in the current slots map, add it as a primary of a new shard. + drop(nodes_iter); + connections_container + .slot_map + .add_new_primary(slot, new_primary) + } + async fn execute_on_multiple_nodes<'a>( cmd: &'a Arc, routing: &'a MultipleNodeRoutingInfo, @@ -2226,26 +2355,39 @@ where Next::RefreshSlots { request, sleep_duration, + moved_redirect, } => { poll_flush_action = poll_flush_action.change_state(PollFlushAction::RebuildSlots); - if let Some(request) = request { - let future: RequestState< - Pin + Send>>, - > = match sleep_duration { - Some(sleep_duration) => RequestState::Sleep { + let future: Option< + RequestState + Send>>>, + > = if let Some(moved_redirect) = moved_redirect { + Some(RequestState::UpdateMoved { + future: Box::pin(ClusterConnInner::update_upon_moved_error( + self.inner.clone(), + moved_redirect.slot, + moved_redirect.address.into(), + )), + }) + } else if let Some(ref request) = request { + match sleep_duration { + Some(sleep_duration) => Some(RequestState::Sleep { sleep: boxed_sleep(sleep_duration), - }, - None => RequestState::Future { + }), + None => Some(RequestState::Future { future: Box::pin(Self::try_request( request.info.clone(), self.inner.clone(), )), - }, - }; + }), + } + } else { + None + }; + if let Some(future) = future { self.in_flight_requests.push(Box::pin(Request { retry_params: self.inner.cluster_params.retry_params.clone(), - request: Some(request), + request, future, })); } diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 27abd54fe..da10a4b44 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1,11 +1,13 @@ -use std::cmp::min; -use std::collections::HashMap; - use crate::cluster_topology::get_slot; use crate::cmd::{Arg, Cmd}; use crate::types::Value; -use crate::{ErrorKind, RedisResult}; +use crate::{ErrorKind, RedisError, RedisResult}; +use core::cmp::Ordering; +use std::cmp::min; +use std::collections::HashMap; use std::iter::Once; +use std::sync::Arc; +use std::sync::{RwLock, RwLockWriteGuard}; #[derive(Clone)] pub(crate) enum Redirect { @@ -848,7 +850,7 @@ impl Routable for Value { } } -#[derive(Debug, Hash)] +#[derive(Debug, Hash, Clone)] pub(crate) struct Slot { pub(crate) start: u16, pub(crate) end: u16, @@ -866,14 +868,6 @@ impl Slot { } } - pub fn start(&self) -> u16 { - self.start - } - - pub fn end(&self) -> u16 { - self.end - } - #[allow(dead_code)] // used in tests pub(crate) fn master(&self) -> 
&str { self.master.as_str() @@ -898,32 +892,173 @@ pub enum SlotAddr { ReplicaRequired, } +/// Represents the result of checking a shard for the status of a node. +/// +/// This enum indicates whether a given node is already the primary, has been promoted to a primary from a replica, +/// or is not found in the shard at all. +/// +/// Variants: +/// - `AlreadyPrimary`: The specified node is already the primary for the shard, so no changes are needed. +/// - `Promoted`: The specified node was found as a replica and successfully promoted to primary. +/// - `NodeNotFound`: The specified node is neither the current primary nor a replica within the shard. +#[derive(PartialEq, Debug)] +pub(crate) enum ShardUpdateResult { + AlreadyPrimary, + Promoted, + NodeNotFound, +} + +const READ_LK_ERR_SHARDADDRS: &str = "Failed to acquire read lock for ShardAddrs"; +const WRITE_LK_ERR_SHARDADDRS: &str = "Failed to acquire write lock for ShardAddrs"; /// This is just a simplified version of [`Slot`], /// which stores only the master and [optional] replica /// to avoid the need to choose a replica each time /// a command is executed -#[derive(Debug, Eq, PartialEq)] -pub(crate) struct SlotAddrs { - pub(crate) primary: String, - pub(crate) replicas: Vec, +#[derive(Debug)] +pub(crate) struct ShardAddrs { + primary: RwLock>, + replicas: RwLock>>, } -impl SlotAddrs { - pub(crate) fn new(primary: String, replicas: Vec) -> Self { +impl PartialEq for ShardAddrs { + fn eq(&self, other: &Self) -> bool { + let self_primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS); + let other_primary = other.primary.read().expect(READ_LK_ERR_SHARDADDRS); + + let self_replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + let other_replicas = other.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + + *self_primary == *other_primary && *self_replicas == *other_replicas + } +} + +impl Eq for ShardAddrs {} + +impl PartialOrd for ShardAddrs { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ShardAddrs { + fn cmp(&self, other: &Self) -> Ordering { + let self_primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS); + let other_primary = other.primary.read().expect(READ_LK_ERR_SHARDADDRS); + + let primary_cmp = self_primary.cmp(&other_primary); + if primary_cmp == Ordering::Equal { + let self_replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + let other_replicas = other.replicas.read().expect(READ_LK_ERR_SHARDADDRS); + return self_replicas.cmp(&other_replicas); + } + + primary_cmp + } +} + +impl ShardAddrs { + pub(crate) fn new(primary: Arc, replicas: Vec>) -> Self { + let primary = RwLock::new(primary); + let replicas = RwLock::new(replicas); Self { primary, replicas } } - pub(crate) fn from_slot(slot: Slot) -> Self { - SlotAddrs::new(slot.master, slot.replicas) + pub(crate) fn new_with_primary(primary: Arc) -> Self { + Self::new(primary, Vec::default()) + } + + pub(crate) fn primary(&self) -> Arc { + self.primary.read().expect(READ_LK_ERR_SHARDADDRS).clone() + } + + pub(crate) fn replicas(&self) -> std::sync::RwLockReadGuard>> { + self.replicas.read().expect(READ_LK_ERR_SHARDADDRS) + } + + /// Attempts to update the shard roles based on the provided `new_primary`. + /// + /// This function evaluates whether the specified `new_primary` node is already + /// the primary, a replica that can be promoted to primary, or a node not present + /// in the shard. It handles three scenarios: + /// + /// 1. 
**Already Primary**: If the `new_primary` is already the current primary,
+    ///    the function returns `ShardUpdateResult::AlreadyPrimary` and no changes are made.
+    ///
+    /// 2. **Promoted**: If the `new_primary` is found in the list of replicas, it is promoted
+    ///    to primary by swapping it with the current primary, and the function returns
+    ///    `ShardUpdateResult::Promoted`.
+    ///
+    /// 3. **Node Not Found**: If the `new_primary` is neither the current primary nor a replica,
+    ///    the function returns `ShardUpdateResult::NodeNotFound` to indicate that the node is
+    ///    not part of the current shard.
+    ///
+    /// # Arguments:
+    /// * `new_primary` - The node to be promoted or checked.
+    ///
+    /// # Returns:
+    /// * `ShardUpdateResult` - The result of the role update operation.
+    pub(crate) fn attempt_shard_role_update(&self, new_primary: Arc<String>) -> ShardUpdateResult {
+        let mut primary_lock = self.primary.write().expect(WRITE_LK_ERR_SHARDADDRS);
+        let mut replicas_lock = self.replicas.write().expect(WRITE_LK_ERR_SHARDADDRS);
+
+        // If the new primary is already the current primary, return early.
+        if *primary_lock == new_primary {
+            return ShardUpdateResult::AlreadyPrimary;
+        }
+
+        // If the new primary is found among the replicas, swap it with the current primary.
+        if let Some(replica_idx) = Self::replica_index(&replicas_lock, new_primary.clone()) {
+            std::mem::swap(&mut *primary_lock, &mut replicas_lock[replica_idx]);
+            return ShardUpdateResult::Promoted;
+        }
+
+        // The new primary is not part of this shard.
+        ShardUpdateResult::NodeNotFound
+    }
+
+    fn replica_index(
+        replicas: &RwLockWriteGuard<'_, Vec<Arc<String>>>,
+        target_replica: Arc<String>,
+    ) -> Option<usize> {
+        replicas
+            .iter()
+            .position(|curr_replica| **curr_replica == *target_replica)
+    }
+
+    /// Removes the specified `replica_to_remove` from the shard's replica list if it exists.
+    /// This method searches for the replica's index and removes it from the list. If the replica
+    /// is not found, it returns an error.
+    ///
+    /// # Arguments
+    /// * `replica_to_remove` - The address of the replica to be removed.
+    ///
+    /// # Returns
+    /// * `RedisResult<()>` - `Ok(())` if the replica was successfully removed, or an error if the
+    ///   replica was not found.
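The promotion in `attempt_shard_role_update` relies on swapping the primary with the matched replica while both write guards are held, so readers never observe a shard with the same address listed twice. A minimal standalone sketch of that pattern on a toy type (not the crate's `ShardAddrs`):

```rust
use std::mem;
use std::sync::{Arc, RwLock};

// Toy stand-in for ShardAddrs, showing the promotion-by-swap pattern under write locks.
struct ToyShardAddrs {
    primary: RwLock<Arc<String>>,
    replicas: RwLock<Vec<Arc<String>>>,
}

impl ToyShardAddrs {
    // Returns true if `new_primary` was found among the replicas and promoted.
    fn promote(&self, new_primary: &str) -> bool {
        let mut primary = self.primary.write().unwrap();
        let mut replicas = self.replicas.write().unwrap();
        if let Some(idx) = replicas.iter().position(|r| r.as_str() == new_primary) {
            // Swap the primary with the promoted replica; the demoted primary stays as a replica.
            mem::swap(&mut *primary, &mut replicas[idx]);
            return true;
        }
        false
    }
}

fn main() {
    let shard = ToyShardAddrs {
        primary: RwLock::new(Arc::new("node1:6379".to_string())),
        replicas: RwLock::new(vec![Arc::new("node2:6379".to_string())]),
    };
    assert!(!shard.promote("unknown:6379"));
    assert!(shard.promote("node2:6379"));
    assert_eq!(shard.primary.read().unwrap().as_str(), "node2:6379");
    assert_eq!(shard.replicas.read().unwrap()[0].as_str(), "node1:6379");
}
```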
+ pub(crate) fn remove_replica(&self, replica_to_remove: Arc) -> RedisResult<()> { + let mut replicas_lock = self.replicas.write().expect(WRITE_LK_ERR_SHARDADDRS); + if let Some(index) = Self::replica_index(&replicas_lock, replica_to_remove.clone()) { + replicas_lock.remove(index); + Ok(()) + } else { + Err(RedisError::from(( + ErrorKind::ClientError, + "Couldn't remove replica", + format!("Replica {replica_to_remove:?} not found"), + ))) + } } } -impl<'a> IntoIterator for &'a SlotAddrs { - type Item = &'a String; - type IntoIter = std::iter::Chain, std::slice::Iter<'a, String>>; +impl<'a> IntoIterator for &'a ShardAddrs { + type Item = Arc; + type IntoIter = std::iter::Chain>, std::vec::IntoIter>>; fn into_iter(self) -> Self::IntoIter { - std::iter::once(&self.primary).chain(self.replicas.iter()) + let primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS).clone(); + let replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS).clone(); + + std::iter::once(primary).chain(replicas) } } @@ -953,10 +1088,12 @@ impl Route { mod tests { use super::{ command_for_multi_slot_indices, AggregateOp, MultipleNodeRoutingInfo, ResponsePolicy, - Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, + Route, RoutingInfo, ShardAddrs, SingleNodeRoutingInfo, SlotAddr, }; + use crate::cluster_routing::ShardUpdateResult; use crate::{cluster_topology::slot, cmd, parser::parse_redis_value, Value}; use core::panic; + use std::sync::{Arc, RwLock}; #[test] fn test_routing_info_mixed_capatalization() { @@ -1347,4 +1484,44 @@ mod tests { let result = super::combine_map_results(input); assert!(result.is_err()); } + + fn create_shard_addrs(primary: &str, replicas: Vec<&str>) -> ShardAddrs { + ShardAddrs { + primary: RwLock::new(Arc::new(primary.to_string())), + replicas: RwLock::new( + replicas + .into_iter() + .map(|r| Arc::new(r.to_string())) + .collect(), + ), + } + } + + #[test] + fn test_attempt_shard_role_update_already_primary() { + let shard_addrs = create_shard_addrs("node1:6379", vec!["node2:6379", "node3:6379"]); + let result = shard_addrs.attempt_shard_role_update(Arc::new("node1:6379".to_string())); + assert_eq!(result, ShardUpdateResult::AlreadyPrimary); + } + + #[test] + fn test_attempt_shard_role_update_promoted() { + let shard_addrs = create_shard_addrs("node1:6379", vec!["node2:6379", "node3:6379"]); + let result = shard_addrs.attempt_shard_role_update(Arc::new("node2:6379".to_string())); + assert_eq!(result, ShardUpdateResult::Promoted); + + let primary = shard_addrs.primary.read().unwrap().clone(); + assert_eq!(primary.as_str(), "node2:6379"); + + let replicas = shard_addrs.replicas.read().unwrap(); + assert_eq!(replicas.len(), 2); + assert!(replicas.iter().any(|r| r.as_str() == "node1:6379")); + } + + #[test] + fn test_attempt_shard_role_update_node_not_found() { + let shard_addrs = create_shard_addrs("node1:6379", vec!["node2:6379", "node3:6379"]); + let result = shard_addrs.attempt_shard_role_update(Arc::new("node4:6379".to_string())); + assert_eq!(result, ShardUpdateResult::NodeNotFound); + } } diff --git a/redis/src/cluster_slotmap.rs b/redis/src/cluster_slotmap.rs index 7f1f70af9..68d6a5c2b 100644 --- a/redis/src/cluster_slotmap.rs +++ b/redis/src/cluster_slotmap.rs @@ -1,26 +1,23 @@ +use std::sync::Arc; use std::{ collections::{BTreeMap, HashSet}, fmt::Display, sync::atomic::AtomicUsize, }; -use crate::cluster_routing::{Route, Slot, SlotAddr, SlotAddrs}; +use dashmap::DashMap; + +use crate::cluster_routing::{Route, ShardAddrs, Slot, SlotAddr}; +use crate::ErrorKind; +use 
crate::RedisError; +use crate::RedisResult; +pub(crate) type NodesMap = DashMap, Arc>; #[derive(Debug)] pub(crate) struct SlotMapValue { pub(crate) start: u16, - pub(crate) addrs: SlotAddrs, - pub(crate) latest_used_replica: AtomicUsize, -} - -impl SlotMapValue { - fn from_slot(slot: Slot) -> Self { - Self { - start: slot.start(), - addrs: SlotAddrs::from_slot(slot), - latest_used_replica: AtomicUsize::new(0), - } - } + pub(crate) addrs: Arc, + pub(crate) last_used_replica: Arc, } #[derive(Debug, Default, Clone, PartialEq, Copy)] @@ -33,6 +30,7 @@ pub(crate) enum ReadFromReplicaStrategy { #[derive(Debug, Default)] pub(crate) struct SlotMap { pub(crate) slots: BTreeMap, + nodes_map: NodesMap, read_from_replica: ReadFromReplicaStrategy, } @@ -40,34 +38,79 @@ fn get_address_from_slot( slot: &SlotMapValue, read_from_replica: ReadFromReplicaStrategy, slot_addr: SlotAddr, -) -> &str { - if slot_addr == SlotAddr::Master || slot.addrs.replicas.is_empty() { - return slot.addrs.primary.as_str(); +) -> Arc { + let addrs = &slot.addrs; + if slot_addr == SlotAddr::Master || addrs.replicas().is_empty() { + return addrs.primary(); } match read_from_replica { - ReadFromReplicaStrategy::AlwaysFromPrimary => slot.addrs.primary.as_str(), + ReadFromReplicaStrategy::AlwaysFromPrimary => addrs.primary(), ReadFromReplicaStrategy::RoundRobin => { let index = slot - .latest_used_replica + .last_used_replica .fetch_add(1, std::sync::atomic::Ordering::Relaxed) - % slot.addrs.replicas.len(); - slot.addrs.replicas[index].as_str() + % addrs.replicas().len(); + addrs.replicas()[index].clone() } } } impl SlotMap { - pub(crate) fn new(slots: Vec, read_from_replica: ReadFromReplicaStrategy) -> Self { - let mut this = Self { + pub(crate) fn new_with_read_strategy(read_from_replica: ReadFromReplicaStrategy) -> Self { + SlotMap { slots: BTreeMap::new(), + nodes_map: DashMap::new(), read_from_replica, - }; - this.slots.extend( - slots - .into_iter() - .map(|slot| (slot.end(), SlotMapValue::from_slot(slot))), - ); - this + } + } + + pub(crate) fn new(slots: Vec, read_from_replica: ReadFromReplicaStrategy) -> Self { + let mut slot_map = SlotMap::new_with_read_strategy(read_from_replica); + let mut shard_id = 0; + for slot in slots { + let primary = Arc::new(slot.master); + // Get the shard addresses if the primary is already in nodes_map; + // otherwise, create a new ShardAddrs and add it + let shard_addrs_arc = slot_map + .nodes_map + .entry(primary.clone()) + .or_insert_with(|| { + shard_id += 1; + let replicas: Vec> = + slot.replicas.into_iter().map(Arc::new).collect(); + Arc::new(ShardAddrs::new(primary, replicas)) + }) + .clone(); + + // Add all replicas to nodes_map with a reference to the same ShardAddrs if not already present + shard_addrs_arc.replicas().iter().for_each(|replica| { + slot_map + .nodes_map + .entry(replica.clone()) + .or_insert(shard_addrs_arc.clone()); + }); + + // Insert the slot value into the slots map + slot_map.slots.insert( + slot.end, + SlotMapValue { + addrs: shard_addrs_arc.clone(), + start: slot.start, + last_used_replica: Arc::new(AtomicUsize::new(0)), + }, + ); + } + slot_map + } + + pub(crate) fn nodes_map(&self) -> &NodesMap { + &self.nodes_map + } + + pub fn is_primary(&self, address: &String) -> bool { + self.nodes_map + .get(address) + .map_or(false, |shard_addrs| *shard_addrs.primary() == *address) } pub fn slot_value_for_route(&self, route: &Route) -> Option<&SlotMapValue> { @@ -84,40 +127,45 @@ impl SlotMap { }) } - pub fn slot_addr_for_route(&self, route: &Route) -> Option<&str> { + 
pub fn slot_addr_for_route(&self, route: &Route) -> Option> { self.slot_value_for_route(route).map(|slot_value| { get_address_from_slot(slot_value, self.read_from_replica, route.slot_addr()) }) } - pub fn values(&self) -> impl Iterator { - self.slots.values().map(|slot_value| &slot_value.addrs) - } - - fn all_unique_addresses(&self, only_primaries: bool) -> HashSet<&str> { - let mut addresses = HashSet::new(); - for slot in self.values() { - addresses.insert(slot.primary.as_str()); - if !only_primaries { - addresses.extend(slot.replicas.iter().map(|str| str.as_str())); - } - } - - addresses + /// Retrieves the shard addresses (`ShardAddrs`) for the specified `slot` by looking it up in the `slots` tree, + /// returning a reference to the stored shard addresses if found. + pub(crate) fn shard_addrs_for_slot(&self, slot: u16) -> Option> { + self.slots + .range(slot..) + .next() + .map(|(_, slot_value)| slot_value.addrs.clone()) } - pub fn addresses_for_all_primaries(&self) -> HashSet<&str> { - self.all_unique_addresses(true) + pub fn addresses_for_all_primaries(&self) -> HashSet> { + self.nodes_map + .iter() + .map(|map_item| { + let shard_addrs = map_item.value(); + shard_addrs.primary().clone() + }) + .collect() } - pub fn addresses_for_all_nodes(&self) -> HashSet<&str> { - self.all_unique_addresses(false) + pub fn all_node_addresses(&self) -> HashSet> { + self.nodes_map + .iter() + .map(|map_item| { + let node_addr = map_item.key(); + node_addr.clone() + }) + .collect() } pub fn addresses_for_multi_slot<'a, 'b>( &'a self, routes: &'b [(Route, Vec)], - ) -> impl Iterator> + 'a + ) -> impl Iterator>> + 'a where 'b: 'a, { @@ -127,14 +175,12 @@ impl SlotMap { } // Returns the slots that are assigned to the given address. - pub(crate) fn get_slots_of_node(&self, node_address: &str) -> Vec { - let node_address = node_address.to_string(); + pub(crate) fn get_slots_of_node(&self, node_address: Arc) -> Vec { self.slots .iter() .filter_map(|(end, slot_value)| { - if slot_value.addrs.primary == node_address - || slot_value.addrs.replicas.contains(&node_address) - { + let addrs = &slot_value.addrs; + if addrs.primary() == node_address || addrs.replicas().contains(&node_address) { Some(slot_value.start..(*end + 1)) } else { None @@ -148,31 +194,266 @@ impl SlotMap { &self, slot: u16, slot_addr: SlotAddr, - ) -> Option { + ) -> Option> { self.slots.range(slot..).next().and_then(|(_, slot_value)| { if slot_value.start <= slot { - Some( - get_address_from_slot(slot_value, self.read_from_replica, slot_addr) - .to_string(), - ) + Some(get_address_from_slot( + slot_value, + self.read_from_replica, + slot_addr, + )) } else { None } }) } + + /// Inserts a single slot into the `slots` map, associating it with a new `SlotMapValue` + /// that contains the shard addresses (`shard_addrs`) and represents a range of just the given slot. + /// + /// # Returns + /// * `Option` - Returns the previous `SlotMapValue` if a slot already existed for the given key, + /// or `None` if the slot was newly inserted. + fn insert_single_slot( + &mut self, + slot: u16, + shard_addrs: Arc, + ) -> Option { + self.slots.insert( + slot, + SlotMapValue { + start: slot, + addrs: shard_addrs, + last_used_replica: Arc::new(AtomicUsize::new(0)), + }, + ) + } + + /// Creats a new shard addresses that contain only the primary node, adds it to the nodes map + /// and updates the slots tree for the given `slot` to point to the new primary. 
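Both `slot_value_for_route` and `update_slot_range` lean on the same `BTreeMap` convention: ranges are keyed by their end slot, and the range covering a slot is the first entry at or after it whose recorded start does not exceed the slot. A minimal sketch of that lookup, with the value simplified to a `(start, owner)` pair instead of a full `SlotMapValue`:

```rust
use std::collections::BTreeMap;

// Ranges are keyed by their end slot; the value records the range start and its owner.
fn owner_of(slots: &BTreeMap<u16, (u16, &'static str)>, slot: u16) -> Option<&'static str> {
    slots
        .range(slot..)
        .next()
        .and_then(|(_end, (start, owner))| (*start <= slot).then_some(*owner))
}

fn main() {
    let mut slots = BTreeMap::new();
    slots.insert(7000, (0u16, "node1:6379")); // covers 0..=7000
    slots.insert(16383, (8000u16, "node2:6379")); // covers 8000..=16383

    assert_eq!(owner_of(&slots, 500), Some("node1:6379"));
    assert_eq!(owner_of(&slots, 12182), Some("node2:6379"));

    // 7500 falls in the gap between the two ranges, so it has no owner yet; this is the case
    // where add_new_primary / update_slot_range insert a new single-slot range for it.
    assert_eq!(owner_of(&slots, 7500), None);
}
```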
+ pub(crate) fn add_new_primary(&mut self, slot: u16, node_addr: Arc) -> RedisResult<()> { + let shard_addrs = Arc::new(ShardAddrs::new_with_primary(node_addr.clone())); + self.nodes_map.insert(node_addr, shard_addrs.clone()); + self.update_slot_range(slot, shard_addrs) + } + + fn shard_addrs_equal(shard1: &Arc, shard2: &Arc) -> bool { + Arc::ptr_eq(shard1, shard2) + } + + /// Updates the end of an existing slot range in the `slots` tree. This function removes the slot entry + /// associated with the current end (`curr_end`) and reinserts it with a new end value (`new_end`). + /// + /// The operation effectively shifts the range's end boundary from `curr_end` to `new_end`, while keeping the + /// rest of the slot's data (e.g., shard addresses) unchanged. + /// + /// # Parameters: + /// - `curr_end`: The current end of the slot range that will be removed. + /// - `new_end`: The new end of the slot range where the slot data will be reinserted. + fn update_end_range(&mut self, curr_end: u16, new_end: u16) -> RedisResult<()> { + if let Some(curr_slot_val) = self.slots.remove(&curr_end) { + self.slots.insert(new_end, curr_slot_val); + return Ok(()); + } + Err(RedisError::from(( + ErrorKind::ClientError, + "Couldn't find slot range with end: {curr_end:?} in the slot map", + ))) + } + + /// Attempts to merge the current `slot` with the next slot range in the `slots` map, if they are consecutive + /// and share the same shard addresses. If the next slot's starting position is exactly `slot + 1` + /// and the shard addresses match, the next slot's starting point is moved to `slot`, effectively merging + /// the slot to the existing range. + /// + /// # Parameters: + /// - `slot`: The slot to attempt to merge with the next slot. + /// - `new_addrs`: The shard addresses to compare with the next slot's shard addresses. + /// + /// # Returns: + /// - `bool`: Returns `true` if the merge was successful, otherwise `false`. + fn try_merge_to_next_range(&mut self, slot: u16, new_addrs: Arc) -> bool { + if let Some((_next_end, next_slot_value)) = self.slots.range_mut((slot + 1)..).next() { + if next_slot_value.start == slot + 1 + && Self::shard_addrs_equal(&next_slot_value.addrs, &new_addrs) + { + next_slot_value.start = slot; + return true; + } + } + false + } + + /// Attempts to merge the current slot with the previous slot range in the `slots` map, if they are consecutive + /// and share the same shard addresses. If the previous slot ends at `slot - 1` and the shard addresses match, + /// the end of the previous slot is extended to `slot`, effectively merging the slot to the existing range. + /// + /// # Parameters: + /// - `slot`: The slot to attempt to merge with the previous slot. + /// - `new_addrs`: The shard addresses to compare with the previous slot's shard addresses. + /// + /// # Returns: + /// - `RedisResult`: Returns `Ok(true)` if the merge was successful, otherwise `Ok(false)`. + fn try_merge_to_prev_range( + &mut self, + slot: u16, + new_addrs: Arc, + ) -> RedisResult { + if let Some((prev_end, prev_slot_value)) = self.slots.range_mut(..slot).next_back() { + if *prev_end == slot - 1 && Self::shard_addrs_equal(&prev_slot_value.addrs, &new_addrs) + { + let prev_end = *prev_end; + self.update_end_range(prev_end, slot)?; + return Ok(true); + } + } + Ok(false) + } + + /// Updates the slot range in the `slots` to point to new shard addresses. 
+ /// + /// This function handles the following scenarios when updating the slot mapping: + /// + /// **Scenario 1 - Same Shard Owner**: + /// - If the slot is already associated with the same shard addresses, no changes are needed. + /// + /// **Scenario 2 - Single Slot Range**: + /// - If the slot is the only slot in the current range (i.e., `start == end == slot`), + /// the function simply replaces the shard addresses for this slot with the new shard addresses. + /// + /// **Scenario 3 - Slot Matches the End of a Range**: + /// - If the slot is the last slot in the current range (`slot == end`), the function + /// adjusts the range by decrementing the end of the current range by 1 (making the + /// new end equal to `end - 1`). The current slot is then removed and a new entry is + /// inserted for the slot with the new shard addresses. + /// + /// **Scenario 4 - Slot Matches the Start of a Range**: + /// - If the slot is the first slot in the current range (`slot == start`), the function + /// increments the start of the current range by 1 (making the new start equal to + /// `start + 1`). A new entry is then inserted for the slot with the new shard addresses. + /// + /// **Scenario 5 - Slot is Within a Range**: + /// - If the slot falls between the start and end of a current range (`start < slot < end`), + /// the function splits the current range into two. The range before the slot (`start` to + /// `slot - 1`) remains with the old shard addresses, a new entry for the slot is added + /// with the new shard addresses, and the range after the slot (`slot + 1` to `end`) is + /// reinserted with the old shard addresses. + /// + /// **Scenario 6 - Slot is Not Covered**: + /// - If the slot is not part of any existing range, a new entry is simply inserted into + /// the `slots` tree with the new shard addresses. + /// + /// # Parameters: + /// - `slot`: The specific slot that needs to be updated. + /// - `new_addrs`: The new shard addresses to associate with the slot. + /// + /// # Returns: + /// - `RedisResult<()>`: Indicates the success or failure of the operation. + pub(crate) fn update_slot_range( + &mut self, + slot: u16, + new_addrs: Arc, + ) -> RedisResult<()> { + let curr_tree_node = + self.slots + .range_mut(slot..) 
+ .next() + .and_then(|(&end, slot_map_value)| { + if slot >= slot_map_value.start && slot <= end { + Some((end, slot_map_value)) + } else { + None + } + }); + + if let Some((curr_end, curr_slot_val)) = curr_tree_node { + // Scenario 1: Same shard owner + if Self::shard_addrs_equal(&curr_slot_val.addrs, &new_addrs) { + return Ok(()); + } + // Scenario 2: The slot is the only slot in the current range + else if curr_slot_val.start == curr_end && curr_slot_val.start == slot { + // Replace the shard addresses of the current slot value + curr_slot_val.addrs = new_addrs; + // Scenario 3: Slot matches the end of the current range + } else if slot == curr_end { + // Merge with the next range if shard addresses match + if self.try_merge_to_next_range(slot, new_addrs.clone()) { + // Adjust current range end + self.update_end_range(curr_end, curr_end - 1)?; + } else { + // Insert as a standalone slot + let curr_slot_val = self.insert_single_slot(curr_end, new_addrs); + if let Some(curr_slot_val) = curr_slot_val { + // Adjust current range end + self.slots.insert(curr_end - 1, curr_slot_val); + } + } + + // Scenario 4: Slot matches the start of the current range + } else if slot == curr_slot_val.start { + // Adjust current range start + curr_slot_val.start += 1; + // Attempt to merge with the previous range + if !self.try_merge_to_prev_range(slot, new_addrs.clone())? { + // Insert as a standalone slot + self.insert_single_slot(slot, new_addrs); + } + + // Scenario 5: Slot is within the current range + } else if slot > curr_slot_val.start && slot < curr_end { + // We will split the current range into three parts: + // A: [start, slot - 1], which will remain owned by the current shard, + // B: [slot, slot], which will be owned by the new shard addresses, + // C: [slot + 1, end], which will remain owned by the current shard. + + let start: u16 = curr_slot_val.start; + let addrs = curr_slot_val.addrs.clone(); + let last_used_replica = curr_slot_val.last_used_replica.clone(); + + // Modify the current slot range to become part C: [slot + 1, end], still owned by the current shard. + curr_slot_val.start = slot + 1; + + // Create and insert a new SlotMapValue representing part A: [start, slot - 1], + // still owned by the current shard, into the slot map. + self.slots.insert( + slot - 1, + SlotMapValue { + start, + addrs, + last_used_replica, + }, + ); + + // Insert the new shard addresses into the slot map as part B: [slot, slot], + // which will be owned by the new shard addresses. + self.insert_single_slot(slot, new_addrs); + } + // Scenario 6: Slot isn't covered by any existing range + } else { + // Try merging with the previous or next range; if no merge is possible, insert as a standalone slot + if !self.try_merge_to_prev_range(slot, new_addrs.clone())? + && !self.try_merge_to_next_range(slot, new_addrs.clone()) + { + self.insert_single_slot(slot, new_addrs); + } + } + Ok(()) + } } impl Display for SlotMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "Strategy: {:?}. 
Slot mapping:", self.read_from_replica)?; for (end, slot_map_value) in self.slots.iter() { + let addrs = &slot_map_value.addrs; writeln!( f, "({}-{}): primary: {}, replicas: {:?}", slot_map_value.start, end, - slot_map_value.addrs.primary, - slot_map_value.addrs.replicas + addrs.primary(), + addrs.replicas() )?; } Ok(()) @@ -180,9 +461,22 @@ impl Display for SlotMap { } #[cfg(test)] -mod tests { +mod tests_cluster_slotmap { use super::*; + fn process_expected(expected: Vec<&str>) -> HashSet> { + as IntoIterator>::into_iter(HashSet::from_iter(expected)) + .map(|s| Arc::new(s.to_string())) + .collect() + } + + fn process_expected_with_option(expected: Vec>) -> Vec> { + expected + .into_iter() + .filter_map(|opt| opt.map(|s| Arc::new(s.to_string()))) + .collect() + } + #[test] fn test_slot_map_retrieve_routes() { let slot_map = SlotMap::new( @@ -208,19 +502,19 @@ mod tests { .is_none()); assert_eq!( "node1:6379", - slot_map + *slot_map .slot_addr_for_route(&Route::new(1, SlotAddr::Master)) .unwrap() ); assert_eq!( "node1:6379", - slot_map + *slot_map .slot_addr_for_route(&Route::new(500, SlotAddr::Master)) .unwrap() ); assert_eq!( "node1:6379", - slot_map + *slot_map .slot_addr_for_route(&Route::new(1000, SlotAddr::Master)) .unwrap() ); @@ -230,19 +524,19 @@ mod tests { assert_eq!( "node2:6379", - slot_map + *slot_map .slot_addr_for_route(&Route::new(1002, SlotAddr::Master)) .unwrap() ); assert_eq!( "node2:6379", - slot_map + *slot_map .slot_addr_for_route(&Route::new(1500, SlotAddr::Master)) .unwrap() ); assert_eq!( "node2:6379", - slot_map + *slot_map .slot_addr_for_route(&Route::new(2000, SlotAddr::Master)) .unwrap() ); @@ -293,17 +587,17 @@ mod tests { let addresses = slot_map.addresses_for_all_primaries(); assert_eq!( addresses, - HashSet::from_iter(["node1:6379", "node2:6379", "node3:6379"]) + process_expected(vec!["node1:6379", "node2:6379", "node3:6379"]) ); } #[test] fn test_slot_map_get_all_nodes() { let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary); - let addresses = slot_map.addresses_for_all_nodes(); + let addresses = slot_map.all_node_addresses(); assert_eq!( addresses, - HashSet::from_iter([ + process_expected(vec![ "node1:6379", "node2:6379", "node3:6379", @@ -327,11 +621,11 @@ mod tests { let addresses = slot_map .addresses_for_multi_slot(&routes) .collect::>(); - assert!(addresses.contains(&Some("node1:6379"))); + assert!(addresses.contains(&Some(Arc::new("node1:6379".to_string())))); assert!( - addresses.contains(&Some("replica4:6379")) - || addresses.contains(&Some("replica5:6379")) - || addresses.contains(&Some("replica6:6379")) + addresses.contains(&Some(Arc::new("replica4:6379".to_string()))) + || addresses.contains(&Some(Arc::new("replica5:6379".to_string()))) + || addresses.contains(&Some(Arc::new("replica6:6379".to_string()))) ); } @@ -348,19 +642,21 @@ mod tests { (Route::new(3, SlotAddr::ReplicaOptional), vec![]), (Route::new(2003, SlotAddr::Master), vec![]), ]; - let addresses = slot_map + let addresses: Vec> = slot_map .addresses_for_multi_slot(&routes) - .collect::>(); + .flatten() + .collect(); + assert_eq!( addresses, - vec![ + process_expected_with_option(vec![ Some("replica1:6379"), Some("node3:6379"), Some("replica1:6379"), Some("node3:6379"), Some("replica1:6379"), Some("node3:6379") - ] + ]) ); } @@ -373,12 +669,19 @@ mod tests { (Route::new(6000, SlotAddr::ReplicaOptional), vec![]), (Route::new(2002, SlotAddr::Master), vec![]), ]; - let addresses = slot_map + let addresses: Vec> = slot_map .addresses_for_multi_slot(&routes) - 
.collect::>(); + .flatten() + .collect(); + assert_eq!( addresses, - vec![Some("replica1:6379"), None, None, Some("node3:6379")] + process_expected_with_option(vec![ + Some("replica1:6379"), + None, + None, + Some("node3:6379") + ]) ); } @@ -395,6 +698,9 @@ mod tests { assert_eq!( addresses, vec!["replica4:6379", "replica5:6379", "replica6:6379"] + .into_iter() + .map(|s| Arc::new(s.to_string())) + .collect::>() ); } @@ -402,34 +708,422 @@ mod tests { fn test_get_slots_of_node() { let slot_map = get_slot_map(ReadFromReplicaStrategy::AlwaysFromPrimary); assert_eq!( - slot_map.get_slots_of_node("node1:6379"), + slot_map.get_slots_of_node(Arc::new("node1:6379".to_string())), (1..1001).collect::>() ); assert_eq!( - slot_map.get_slots_of_node("node2:6379"), + slot_map.get_slots_of_node(Arc::new("node2:6379".to_string())), vec![1002..2001, 3001..4001] .into_iter() .flatten() .collect::>() ); assert_eq!( - slot_map.get_slots_of_node("replica3:6379"), + slot_map.get_slots_of_node(Arc::new("replica3:6379".to_string())), vec![1002..2001, 3001..4001] .into_iter() .flatten() .collect::>() ); assert_eq!( - slot_map.get_slots_of_node("replica4:6379"), + slot_map.get_slots_of_node(Arc::new("replica4:6379".to_string())), (2001..3001).collect::>() ); assert_eq!( - slot_map.get_slots_of_node("replica5:6379"), + slot_map.get_slots_of_node(Arc::new("replica5:6379".to_string())), (2001..3001).collect::>() ); assert_eq!( - slot_map.get_slots_of_node("replica6:6379"), + slot_map.get_slots_of_node(Arc::new("replica6:6379".to_string())), (2001..3001).collect::>() ); } + + fn create_slot(start: u16, end: u16, master: &str, replicas: Vec<&str>) -> Slot { + Slot::new( + start, + end, + master.to_owned(), + replicas.into_iter().map(|r| r.to_owned()).collect(), + ) + } + + fn assert_equal_slot_maps(this: SlotMap, expected: Vec) { + for ((end, slot_value), expected_slot) in this.slots.iter().zip(expected.iter()) { + assert_eq!(*end, expected_slot.end); + assert_eq!(slot_value.start, expected_slot.start); + let shard_addrs = &slot_value.addrs; + assert_eq!(*shard_addrs.primary(), expected_slot.master); + let _ = shard_addrs + .replicas() + .iter() + .zip(expected_slot.replicas.iter()) + .map(|(curr, expected)| { + assert_eq!(**curr, *expected); + }); + } + } + + fn assert_slot_map_and_shard_addrs( + slot_map: SlotMap, + slot: u16, + new_shard_addrs: Arc, + expected_slots: Vec, + ) { + assert!(SlotMap::shard_addrs_equal( + &slot_map.shard_addrs_for_slot(slot).unwrap(), + &new_shard_addrs + )); + assert_equal_slot_maps(slot_map, expected_slots); + } + + #[test] + fn test_update_slot_range_single_slot_range() { + let test_slot = 8000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 8000, "node1:6379", vec!["replica1:6379"]), + create_slot(8001, 16383, "node3:6379", vec!["replica3:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8001) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]), + create_slot(test_slot + 1, 16383, "node3:6379", vec!["replica3:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn 
test_update_slot_range_slot_matches_end_range_merge_ranges() { + let test_slot = 7999; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_end_range_cant_merge_ranges() { + let test_slot = 7999; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = Arc::new(ShardAddrs::new( + Arc::new("node3:6379".to_owned()), + vec![Arc::new("replica3:6379".to_owned())], + )); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]), + create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_start_range_merge_ranges() { + let test_slot = 8000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7999) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_matches_start_range_cant_merge_ranges() { + let test_slot = 8000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = Arc::new(ShardAddrs::new( + Arc::new("node3:6379".to_owned()), + vec![Arc::new("replica3:6379".to_owned())], + )); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node3:6379", vec!["replica3:6379"]), + create_slot(test_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + assert_slot_map_and_shard_addrs(slot_map, 
test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_within_a_range() { + let test_slot = 4000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(test_slot + 1, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_not_covered_cant_merge_ranges() { + let test_slot = 7998; + let before_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, test_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_not_covered_merge_with_next() { + let test_slot = 7999; + let before_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(test_slot, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_slot_is_not_covered_merge_with_prev() { + let test_slot = 7001; + let before_slots = vec![ + create_slot(0, 7000, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new(before_slots, ReadFromReplicaStrategy::AlwaysFromPrimary); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, test_slot, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + 
assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_same_shard_owner_no_change_needed() { + let test_slot = 7000; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(test_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = before_slots; + assert_slot_map_and_shard_addrs(slot_map, test_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_max_slot_matches_end_range() { + let max_slot = 16383; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(7000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(max_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, max_slot - 1, "node2:6379", vec!["replica2:6379"]), + create_slot(max_slot, max_slot, "node1:6379", vec!["replica1:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, max_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_max_slot_single_slot_range() { + let max_slot = 16383; + let before_slots = vec![ + create_slot(0, 16382, "node1:6379", vec!["replica1:6379"]), + create_slot(16383, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(0) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(max_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(0, max_slot - 1, "node1:6379", vec!["replica1:6379"]), + create_slot(max_slot, max_slot, "node1:6379", vec!["replica1:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, max_slot, new_shard_addrs, after_slots); + } + + #[test] + fn test_update_slot_range_min_slot_matches_start_range() { + let min_slot = 0; + let before_slots = vec![ + create_slot(0, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(8000) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(min_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(min_slot, min_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(min_slot + 1, 7999, "node1:6379", vec!["replica1:6379"]), + create_slot(8000, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, min_slot, new_shard_addrs, after_slots); + } + + #[test] + fn 
test_update_slot_range_min_slot_single_slot_range() { + let min_slot = 0; + let before_slots = vec![ + create_slot(0, 0, "node1:6379", vec!["replica1:6379"]), + create_slot(1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + + let mut slot_map = SlotMap::new( + before_slots.clone(), + ReadFromReplicaStrategy::AlwaysFromPrimary, + ); + let new_shard_addrs = slot_map + .shard_addrs_for_slot(1) + .expect("Couldn't find shard address for slot"); + + let res = slot_map.update_slot_range(min_slot, new_shard_addrs.clone()); + assert!(res.is_ok(), "{res:?}"); + + let after_slots = vec![ + create_slot(min_slot, min_slot, "node2:6379", vec!["replica2:6379"]), + create_slot(min_slot + 1, 16383, "node2:6379", vec!["replica2:6379"]), + ]; + assert_slot_map_and_shard_addrs(slot_map, min_slot, new_shard_addrs, after_slots); + } } diff --git a/redis/src/cluster_topology.rs b/redis/src/cluster_topology.rs index a2ce9ea07..9a5b5e99a 100644 --- a/redis/src/cluster_topology.rs +++ b/redis/src/cluster_topology.rs @@ -300,7 +300,7 @@ pub(crate) fn calculate_topology<'a>( #[cfg(test)] mod tests { use super::*; - use crate::cluster_routing::SlotAddrs; + use crate::cluster_routing::ShardAddrs; #[test] fn test_get_hashtag() { @@ -456,10 +456,11 @@ mod tests { assert_eq!(calculate_hash(&res1), calculate_hash(&res2)); assert_eq!(res1.0, res2.0); assert_eq!(res1.1.len(), res2.1.len()); - let equality_check = - res1.1.iter().zip(&res2.1).all(|(first, second)| { - first.start() == second.start() && first.end() == second.end() - }); + let equality_check = res1 + .1 + .iter() + .zip(&res2.1) + .all(|(first, second)| first.start == second.start && first.end == second.end); assert!(equality_check); let replicas_check = res1 .1 @@ -502,8 +503,21 @@ mod tests { } } - fn get_node_addr(name: &str, port: u16) -> SlotAddrs { - SlotAddrs::new(format!("{name}:{port}"), Vec::new()) + fn get_node_addr(name: &str, port: u16) -> Arc { + Arc::new(ShardAddrs::new(format!("{name}:{port}").into(), Vec::new())) + } + + fn collect_shard_addrs(slot_map: &SlotMap) -> Vec> { + let mut shard_addrs: Vec> = slot_map + .nodes_map() + .iter() + .map(|map_item| { + let shard_addrs = map_item.value(); + shard_addrs.clone() + }) + .collect(); + shard_addrs.sort_unstable(); + shard_addrs } #[test] @@ -524,9 +538,9 @@ mod tests { ReadFromReplicaStrategy::AlwaysFromPrimary, ) .unwrap(); - let res: Vec<_> = topology_view.values().collect(); + let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); - let expected: Vec<&SlotAddrs> = vec![&node_1]; + let expected = vec![node_1]; assert_eq!(res, expected); } @@ -566,10 +580,10 @@ mod tests { ReadFromReplicaStrategy::AlwaysFromPrimary, ) .unwrap(); - let res: Vec<_> = topology_view.values().collect(); + let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); let node_2 = get_node_addr("node2", 6380); - let expected: Vec<&SlotAddrs> = vec![&node_1, &node_2]; + let expected = vec![node_1, node_2]; assert_eq!(res, expected); } @@ -589,10 +603,10 @@ mod tests { ReadFromReplicaStrategy::AlwaysFromPrimary, ) .unwrap(); - let res: Vec<_> = topology_view.values().collect(); + let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); let node_2 = get_node_addr("node2", 6380); - let expected: Vec<&SlotAddrs> = vec![&node_1, &node_2]; + let expected = vec![node_1, node_2]; assert_eq!(res, expected); } @@ -613,10 +627,10 @@ mod tests { ReadFromReplicaStrategy::AlwaysFromPrimary, ) .unwrap(); - let res: Vec<_> = 
topology_view.values().collect(); + let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node3", 6381); let node_2 = get_node_addr("node4", 6382); - let expected: Vec<&SlotAddrs> = vec![&node_1, &node_2]; + let expected = vec![node_1, node_2]; assert_eq!(res, expected); } @@ -637,9 +651,9 @@ mod tests { ReadFromReplicaStrategy::AlwaysFromPrimary, ) .unwrap(); - let res: Vec<_> = topology_view.values().collect(); + let res = collect_shard_addrs(&topology_view); let node_1 = get_node_addr("node1", 6379); - let expected: Vec<&SlotAddrs> = vec![&node_1]; + let expected = vec![node_1]; assert_eq!(res, expected); } } diff --git a/redis/src/commands/cluster_scan.rs b/redis/src/commands/cluster_scan.rs index 97f10577a..58a568cfb 100644 --- a/redis/src/commands/cluster_scan.rs +++ b/redis/src/commands/cluster_scan.rs @@ -134,14 +134,14 @@ impl ScanStateRC { #[async_trait] pub(crate) trait ClusterInScan { /// Retrieves the address associated with a given slot in the cluster. - async fn get_address_by_slot(&self, slot: u16) -> RedisResult; + async fn get_address_by_slot(&self, slot: u16) -> RedisResult>; /// Retrieves the epoch of a given address in the cluster. /// The epoch represents the version of the address, which is updated when a failover occurs or slots migrate in. async fn get_address_epoch(&self, address: &str) -> Result; /// Retrieves the slots assigned to a given address in the cluster. - async fn get_slots_of_address(&self, address: &str) -> Vec; + async fn get_slots_of_address(&self, address: Arc) -> Vec; /// Routes a Redis command to a specific address in the cluster. async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult; @@ -150,7 +150,7 @@ pub(crate) trait ClusterInScan { async fn are_all_slots_covered(&self) -> bool; /// Check if the topology of the cluster has changed and refresh the slots if needed - async fn refresh_if_topology_changed(&self); + async fn refresh_if_topology_changed(&self) -> RedisResult; } /// Represents the state of a scan operation in a Redis cluster. 
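Note on the ScanState hunks that follow: the cluster SCAN state tracks progress with one bit per cluster slot, stored in a fixed array of u64 words (SlotsBitsArray), and the update code below indexes it with BITS_PER_U64 division and modulo. The standalone Rust sketch below is illustrative only and is not part of this diff; it assumes the usual 16384 cluster slots and 64-bit words, and the names BITS_PER_U64, BITS_ARRAY_SIZE and SlotsBitsArray simply mirror identifiers visible in the hunks, not the crate's actual implementation.

    // Illustrative sketch, not part of the diff: per-slot bitmap bookkeeping of the kind ScanState relies on.
    const BITS_PER_U64: usize = 64;
    const BITS_ARRAY_SIZE: usize = 16384 / BITS_PER_U64; // one bit per cluster slot -> 256 words
    type SlotsBitsArray = [u64; BITS_ARRAY_SIZE];

    fn mark_slot_scanned(map: &mut SlotsBitsArray, slot: u16) {
        let slot_index = slot as usize / BITS_PER_U64; // which u64 word holds this slot
        let slot_bit = slot as usize % BITS_PER_U64; // which bit inside that word
        map[slot_index] |= 1u64 << slot_bit;
    }

    fn is_slot_scanned(map: &SlotsBitsArray, slot: u16) -> bool {
        let slot_index = slot as usize / BITS_PER_U64;
        let slot_bit = slot as usize % BITS_PER_U64;
        (map[slot_index] >> slot_bit) & 1 == 1
    }

    fn main() {
        let mut scanned: SlotsBitsArray = [0; BITS_ARRAY_SIZE];
        mark_slot_scanned(&mut scanned, 6918);
        assert!(is_slot_scanned(&scanned, 6918));
        assert!(!is_slot_scanned(&scanned, 6919));
    }
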
@@ -165,7 +165,7 @@ pub(crate) struct ScanState {
     scanned_slots_map: SlotsBitsArray,
     // the address that is being scanned currently, based on the next slot set to 0 in the scanned_slots_map, and the address that "owns" the slot
     // in the SlotMap
-    pub(crate) address_in_scan: String,
+    pub(crate) address_in_scan: Arc<String>,
     // epoch represent the version of the address, when a failover happens or slots migrate in the epoch will be updated to +1
     address_epoch: u64,
     // the status of the scan operation
@@ -189,7 +189,7 @@ impl ScanState {
     pub fn new(
         cursor: u64,
         scanned_slots_map: SlotsBitsArray,
-        address_in_scan: String,
+        address_in_scan: Arc<String>,
         address_epoch: u64,
         scan_status: ScanStateStage,
     ) -> Self {
@@ -206,7 +206,7 @@ impl ScanState {
         Self {
             cursor: 0,
             scanned_slots_map: [0; BITS_ARRAY_SIZE],
-            address_in_scan: String::new(),
+            address_in_scan: Default::default(),
             address_epoch: 0,
             scan_status: ScanStateStage::Finished,
         }
@@ -288,7 +288,16 @@ impl ScanState {
         &mut self,
         connection: &C,
    ) -> RedisResult {
-        let _ = connection.refresh_if_topology_changed().await;
+        connection
+            .refresh_if_topology_changed()
+            .await
+            .map_err(|err| {
+                RedisError::from((
+                    ErrorKind::ResponseError,
+                    "Error during cluster scan: failed to refresh slots",
+                    format!("{:?}", err),
+                ))
+            })?;
         let mut scanned_slots_map = self.scanned_slots_map;
         // If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later.
         // In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
@@ -301,7 +310,9 @@ impl ScanState {
         }
         // If epoch wasn't changed, the slots owned by the address after the refresh are all valid as slots that been scanned
        // So we will update the scanned_slots_map with the slots owned by the address
-        let slots_scanned = connection.get_slots_of_address(&self.address_in_scan).await;
+        let slots_scanned = connection
+            .get_slots_of_address(self.address_in_scan.clone())
+            .await;
         for slot in slots_scanned {
             let slot_index = slot as usize / BITS_PER_U64;
             let slot_bit = slot as usize % BITS_PER_U64;
@@ -340,7 +351,7 @@ impl<C> ClusterInScan for Core<C>
 where
     C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
 {
-    async fn get_address_by_slot(&self, slot: u16) -> RedisResult<String> {
+    async fn get_address_by_slot(&self, slot: u16) -> RedisResult<Arc<String>> {
         let address = self
             .get_address_from_slot(slot, SlotAddr::ReplicaRequired)
             .await;
@@ -365,7 +376,7 @@ where
     async fn get_address_epoch(&self, address: &str) -> Result<u64, RedisError> {
         self.as_ref().get_address_epoch(address).await
     }
-    async fn get_slots_of_address(&self, address: &str) -> Vec<u16> {
+    async fn get_slots_of_address(&self, address: Arc<String>) -> Vec<u16> {
         self.as_ref().get_slots_of_address(address).await
     }
     async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult<Value> {
@@ -387,14 +398,14 @@ where
     async fn are_all_slots_covered(&self) -> bool {
         ClusterConnInner::<C>::check_if_all_slots_covered(&self.conn_lock.read().await.slot_map)
     }
-    async fn refresh_if_topology_changed(&self) {
+    async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
         ClusterConnInner::check_topology_and_refresh_if_diff(
             self.to_owned(),
             // The cluster SCAN implementation must refresh the slots when a topology change is found
             // to ensure the scan logic is correct.
&RefreshPolicy::NotThrottable, ) - .await; + .await } } @@ -529,7 +540,13 @@ where { // TODO: This mechanism of refreshing on failure to route to address should be part of the routing mechanism // After the routing mechanism is updated to handle this case, this refresh in the case bellow should be removed - core.refresh_if_topology_changed().await; + core.refresh_if_topology_changed().await.map_err(|err| { + RedisError::from(( + ErrorKind::ResponseError, + "Error during cluster scan: failed to refresh slots", + format!("{:?}", err), + )) + })?; if !core.are_all_slots_covered().await { return Err(RedisError::from(( ErrorKind::NotAllSlotsCovered, @@ -576,7 +593,7 @@ mod tests { let scan_state = ScanState { cursor: 0, scanned_slots_map: [0; BITS_ARRAY_SIZE], - address_in_scan: String::from("address1"), + address_in_scan: String::from("address1").into(), address_epoch: 1, scan_status: ScanStateStage::InProgress, }; @@ -592,7 +609,7 @@ mod tests { let scan_state = ScanState { cursor: 0, scanned_slots_map, - address_in_scan: String::from("address1"), + address_in_scan: String::from("address1").into(), address_epoch: 1, scan_status: ScanStateStage::InProgress, }; @@ -604,7 +621,7 @@ mod tests { let scan_state = ScanState { cursor: 0, scanned_slots_map, - address_in_scan: String::from("address1"), + address_in_scan: String::from("address1").into(), address_epoch: 1, scan_status: ScanStateStage::InProgress, }; @@ -615,15 +632,17 @@ mod tests { struct MockConnection; #[async_trait] impl ClusterInScan for MockConnection { - async fn refresh_if_topology_changed(&self) {} - async fn get_address_by_slot(&self, _slot: u16) -> RedisResult { - Ok("mock_address".to_string()) + async fn refresh_if_topology_changed(&self) -> RedisResult { + Ok(true) + } + async fn get_address_by_slot(&self, _slot: u16) -> RedisResult> { + Ok("mock_address".to_string().into()) } async fn get_address_epoch(&self, _address: &str) -> Result { Ok(0) } - async fn get_slots_of_address(&self, address: &str) -> Vec { - if address == "mock_address" { + async fn get_slots_of_address(&self, address: Arc) -> Vec { + if address.as_str() == "mock_address" { vec![3, 4, 5] } else { vec![0, 1, 2] @@ -645,7 +664,10 @@ mod tests { // Assert that the scan state is initialized correctly assert_eq!(scan_state.cursor, 0); assert_eq!(scan_state.scanned_slots_map, [0; BITS_ARRAY_SIZE]); - assert_eq!(scan_state.address_in_scan, "mock_address"); + assert_eq!( + scan_state.address_in_scan, + "mock_address".to_string().into() + ); assert_eq!(scan_state.address_epoch, 0); } @@ -655,7 +677,7 @@ mod tests { let scan_state = ScanState { cursor: 0, scanned_slots_map: [0; BITS_ARRAY_SIZE], - address_in_scan: "".to_string(), + address_in_scan: "".to_string().into(), address_epoch: 0, scan_status: ScanStateStage::InProgress, }; @@ -691,7 +713,10 @@ mod tests { assert_eq!(updated_scan_state.cursor, 0); // address_in_scan should be updated to the new address - assert_eq!(updated_scan_state.address_in_scan, "mock_address"); + assert_eq!( + updated_scan_state.address_in_scan, + "mock_address".to_string().into() + ); // address_epoch should be updated to the new address epoch assert_eq!(updated_scan_state.address_epoch, 0); @@ -703,7 +728,7 @@ mod tests { let scan_state = ScanState::new( 0, [0; BITS_ARRAY_SIZE], - "address".to_string(), + "address".to_string().into(), 0, ScanStateStage::InProgress, ); @@ -714,7 +739,10 @@ mod tests { .unwrap(); assert_eq!(updated_scan_state.scanned_slots_map, scanned_slots_map); assert_eq!(updated_scan_state.cursor, 0); - 
assert_eq!(updated_scan_state.address_in_scan, "mock_address");
+        assert_eq!(
+            updated_scan_state.address_in_scan,
+            "mock_address".to_string().into()
+        );
         assert_eq!(updated_scan_state.address_epoch, 0);
     }
 }
diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs
index b690ed87b..68ed557e7 100644
--- a/redis/tests/test_cluster_async.rs
+++ b/redis/tests/test_cluster_async.rs
@@ -1375,6 +1375,555 @@ mod cluster_async {
         assert_eq!(value, Ok(Some(123)));
     }

+    #[test]
+    fn test_async_cluster_update_slots_based_on_moved_error_indicates_slot_migration() {
+        // This test simulates the scenario where the client receives a MOVED error indicating that a key is now
+        // stored on the primary node of another shard.
+        // It ensures that the slot is now owned by the new shard's primary and its associated replicas.
+        let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_slot_migration";
+        let slots_config = vec![
+            MockSlotRange {
+                primary_port: 6379,
+                replica_ports: vec![7000],
+                slot_range: (0..8000),
+            },
+            MockSlotRange {
+                primary_port: 6380,
+                replica_ports: vec![7001],
+                slot_range: (8001..16380),
+            },
+        ];
+
+        let moved_from_port = 6379;
+        let moved_to_port = 6380;
+        let new_shard_replica_port = 7001;
+
+        // Tracking moved and replica requests for validation
+        let moved_requests = Arc::new(atomic::AtomicUsize::new(0));
+        let cloned_moved_requests = moved_requests.clone();
+        let replica_requests = Arc::new(atomic::AtomicUsize::new(0));
+        let cloned_replica_requests = replica_requests.clone();
+
+        // Test key and slot
+        let key = "test";
+        let key_slot = 6918;
+
+        // Mock environment setup
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            handler: _handler,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")])
+                .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh
+                .read_from_replicas(), // Allow reads from replicas
+            name,
+            move |cmd: &[u8], port| {
+                if contains_slice(cmd, b"PING")
+                    || contains_slice(cmd, b"SETNAME")
+                    || contains_slice(cmd, b"READONLY")
+                {
+                    return Err(Ok(Value::SimpleString("OK".into())));
+                }
+
+                if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
+                    let slots = create_topology_from_config(name, slots_config.clone());
+                    return Err(Ok(slots));
+                }
+
+                if contains_slice(cmd, b"SET") {
+                    if port == moved_to_port {
+                        // Simulate primary OK response
+                        Err(Ok(Value::SimpleString("OK".into())))
+                    } else if port == moved_from_port {
+                        // Simulate MOVED error for other port
+                        moved_requests.fetch_add(1, Ordering::Relaxed);
+                        Err(parse_redis_value(
+                            format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(),
+                        ))
+                    } else {
+                        panic!("unexpected port for SET command: {port:?}.\n
+                        Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}");
+                    }
+                } else if contains_slice(cmd, b"GET") {
+                    if new_shard_replica_port == port {
+                        // Simulate replica response for GET after slot migration
+                        replica_requests.fetch_add(1, Ordering::Relaxed);
+                        Err(Ok(Value::BulkString(b"123".to_vec())))
+                    } else {
+                        panic!("unexpected port for GET command: {port:?}, Expected: {new_shard_replica_port:?}");
+                    }
+                } else {
+                    panic!("unexpected command {cmd:?}")
+                }
+            },
+        );
+
+        // First request: Trigger MOVED error and reroute
+        let value = runtime.block_on(
+            cmd("SET")
+                .arg(key)
+                .arg("bar")
+                .query_async::<_, Option<Value>>(&mut connection),
+        );
+        assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned()))));
+
// Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Handle slot migration scenario: Ensure the new shard's replicas are accessible + let value = runtime.block_on( + cmd("GET") + .arg(key) + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(123))); + assert_eq!(cloned_replica_requests.load(Ordering::Relaxed), 1); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_failover() { + // This test simulates a failover scenario, where the client receives a MOVED error and the replica becomes the new primary. + // The test verifies that the client updates the slot mapping to promote the replica to the primary and routes future requests + // to the new primary, ensuring other slots in the shard are also handled by the new primary. + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_failover"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7001], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7002], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 7001; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. 
+ } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0), // Rate limiter to disable slot refresh + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Handle failover scenario: Ensure other slots in the same shard are updated to the new primary + let key_slot_1044 = "foo2"; + let value = runtime.block_on( + cmd("SET") + .arg(key_slot_1044) + .arg("bar2") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary() { + // This test simulates the scenario where the client receives a MOVED error indicating that the key now belongs to + // an entirely new primary node that wasn't previously known. The test verifies that the client correctly adds the new + // primary node to its slot map and routes future requests to the new node. + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 6381; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. 
+ } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh + .read_from_replicas(), // Allow reads from replicas + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else if contains_slice(cmd, b"GET") { + if moved_to_port == port { + // Simulate primary response for GET + Err(Ok(Value::BulkString(b"123".to_vec()))) + } else { + panic!( + "unexpected port for GET command: {port:?}, Expected: {moved_to_port}" + ); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Third request: The new primary should have no replicas so it should be directed to it + let value = runtime.block_on( + cmd("GET") + .arg(key) + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(123))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_replica_of_different_shard() { + // This test simulates a scenario where the client receives a MOVED error indicating that a key + // has been moved to a replica in a different shard. The replica is then promoted to primary and + // no longer exists in the shard’s replica set. + // The test validates that the key gets correctly routed to the new primary and ensures that the + // shard updates its mapping accordingly, with only one MOVED error encountered during the process. 
+ + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_replica_of_different_shard"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7000], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7001], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 7001; + let primary_shard2 = 6380; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot of the first shard + let key = "test"; + let key_slot = 6918; + + // Test key of the second shard + let key_shard2 = "foo"; // slot 12182 + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh + .read_from_replicas(), // Allow reads from replicas + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else if contains_slice(cmd, b"GET") { + if port == primary_shard2 { + // Simulate second shard primary response for GET + Err(Ok(Value::BulkString(b"123".to_vec()))) + } else { + panic!("unexpected port for GET command: {port:?}, Expected: {primary_shard2:?}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Third request: Verify that the promoted replica is no longer part of the second shard replicas by + // ensuring the response is received from the shard's primary + let value = runtime.block_on( + cmd("GET") + .arg(key_shard2) + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(123))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_no_change() { + // This test simulates a scenario where the client receives a MOVED error, but the new primary is the + // same as the old primary 
(no actual change). It ensures that no additional slot map + // updates are required and that the subsequent requests are still routed to the same primary node, with + // only one MOVED error encountered. + let name = "test_async_cluster_update_slots_based_on_moved_error_no_change"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7000], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7001], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 6379; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot of the first shard + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0), // Rate limiter to disable slot refresh + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + if moved_requests.load(Ordering::Relaxed) == 0 { + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + Err(Ok(Value::SimpleString("OK".into()))) + } + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be still routed to the same primary node + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + #[test] fn test_async_cluster_reconnect_even_with_zero_retries() { let name = "test_async_cluster_reconnect_even_with_zero_retries";