Added logic to update the slot map based on MOVED errors (#186)
barshaul committed Sep 12, 2024
1 parent 65c9a5d commit bb5eb32
Showing 7 changed files with 1,542 additions and 103 deletions.
2 changes: 1 addition & 1 deletion redis/benches/bench_basic.rs
@@ -31,7 +31,7 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) {
() = redis::cmd("SET")
.arg(key)
.arg(42)
.query_async(&mut con)
.query_async::<_, Option<Value>>(&mut con)
.await?;
let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?;
() = redis::cmd("DEL").arg(key).query_async(&mut con).await?;
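
Aside from the cluster changes, the benchmark now names the reply type explicitly. A minimal hedged sketch of the same pattern, assuming the two-parameter `query_async<C, T>` signature this fork uses (connection setup omitted; names here are illustrative):

use redis::{RedisResult, Value};

async fn set_then_get(con: &mut redis::aio::MultiplexedConnection) -> RedisResult<()> {
    // The turbofish pins the decoded reply type; `Option<Value>` accepts any
    // reply, including nil, without forcing a concrete conversion.
    let _ = redis::cmd("SET")
        .arg("key")
        .arg(42)
        .query_async::<_, Option<Value>>(con)
        .await?;
    let n: isize = redis::cmd("GET").arg("key").query_async(con).await?;
    assert_eq!(n, 42);
    Ok(())
}
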
23 changes: 14 additions & 9 deletions redis/src/cluster_async/connections_container.rs
@@ -1,11 +1,12 @@
use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, SlotAddr};
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::Arc;

/// A struct that encapsulates a network connection along with its associated IP address.
#[derive(Clone, Eq, PartialEq, Debug)]
@@ -137,6 +138,16 @@ where
}
}

/// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
pub(crate) fn slot_map_nodes(
&self,
) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
self.slot_map
.nodes_map()
.iter()
.map(|item| (item.key().clone(), item.value().clone()))
}

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
@@ -154,10 +165,7 @@
&self,
slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
.last_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
@@ -185,10 +193,7 @@

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value
.addrs
.read()
.expect("Failed to obtain ShardAddrs's read lock");
let addrs = &slot_map_value.addrs;
if addrs.replicas().is_empty() {
return self.connection_for_address(addrs.primary().as_str());
}
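
The new `slot_map_nodes` method clones the `Arc`s out of the underlying `DashMap`, so callers get a cheap snapshot of the node-to-shard mapping; together with the dropped `read()` calls above, `ShardAddrs` appears to be shared via `Arc` rather than guarded by an `RwLock`. A hedged usage sketch (the `container` binding is assumed):

// Assumed: `container` is the connections container shown above.
for (node_addr, shard_addrs) in container.slot_map_nodes() {
    // Cloned Arcs: no map lock is held while the topology is inspected.
    println!("{node_addr}: primary={}", shard_addrs.primary());
}
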
152 changes: 126 additions & 26 deletions redis/src/cluster_async/mod.rs
@@ -31,7 +31,7 @@ pub mod testing {
}
use crate::{
client::GlideConnectionOptions,
cluster_routing::{Routable, RoutingInfo},
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
cluster_slotmap::SlotMap,
cluster_topology::SLOT_SIZE,
cmd,
@@ -679,19 +679,17 @@ impl From<String> for OperationTarget {
#[derive(Clone, Debug)]
pub(crate) struct RedirectNode {
/// The address of the redirect node.
pub _address: String,
pub address: String,
/// The slot of the redirect node.
pub _slot: u16,
pub slot: u16,
}

impl RedirectNode {
/// This function expects an `Option` containing a tuple with a string slice and a u16.
/// The tuple represents an address and a slot, respectively. If the input is `Some`,
/// the function converts the address to a `String` and constructs a `RedirectNode`.
/// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number.
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
option.map(|(address, slot)| RedirectNode {
_address: address.to_string(),
_slot: slot,
address: address.to_string(),
slot,
})
}
}
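
A MOVED error names the slot and the node that now owns it, e.g. `MOVED 3999 127.0.0.1:6381`. A hedged sketch of producing the `(&str, u16)` tuple that `from_option_tuple` consumes, assuming the leading `MOVED` token has already been stripped from the error detail (the helper below is hypothetical, not this crate's API):

// Hypothetical helper: parse "<slot> <host:port>" into the tuple shape
// that `RedirectNode::from_option_tuple` expects.
fn parse_moved_detail(detail: &str) -> Option<(&str, u16)> {
    let mut parts = detail.split_whitespace();
    let slot: u16 = parts.next()?.parse().ok()?;
    let address = parts.next()?;
    Some((address, slot))
}
// parse_moved_detail("3999 127.0.0.1:6381") == Some(("127.0.0.1:6381", 3999))
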
@@ -807,6 +805,10 @@ pin_project! {
#[pin]
sleep: BoxFuture<'static, ()>,
},
UpdateMoved {
#[pin]
future: BoxFuture<'static, RedisResult<()>>,
},
}
}
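
The new variant follows the file's existing state-machine style: each `RequestState` arm pins one boxed future, and the request's `poll` drives whichever arm is active. A reduced, hedged sketch of that shape, assuming `pin_project_lite` and `futures` as the crate already uses (names are illustrative):

use futures::future::BoxFuture;
use pin_project_lite::pin_project;

pin_project! {
    // Illustrative reduction: one pinned, boxed future per state. The real
    // enum also carries Sleep and None states.
    #[project = MiniStateProj]
    enum MiniState {
        Future { #[pin] future: BoxFuture<'static, Result<(), ()>> },
        UpdateMoved { #[pin] future: BoxFuture<'static, Result<(), ()>> },
    }
}
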

@@ -871,8 +873,25 @@ impl<C> Future for Request<C> {
}
.into();
}
RequestStateProj::UpdateMoved { future } => {
if let Err(err) = ready!(future.poll(cx)) {
// Updating the slot map based on the MOVED error is an optimization.
// If it fails, proceed by retrying the request with the redirected node,
// and allow the slot refresh task to correct the slot map.
info!(
"Failed to update the slot map based on the received MOVED error.
Error: {err:?}"
);
}
if let Some(request) = self.project().request.take() {
return Next::Retry { request }.into();
} else {
return Next::Done.into();
}
}
_ => panic!("Request future must be Some"),
};

match ready!(future.poll(cx)) {
Ok(item) => {
self.respond(Ok(item));
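
The control flow of the new arm, reduced to a hedged stand-in sketch: the slot-map update is best-effort, so a failure is only logged and the pending request is still retried against the redirection target (the function and types below are ours, not the crate's):

// Stand-in reduction of the UpdateMoved arm: a failed update is logged and
// the pending request (if any) is retried; the update never fails the request.
enum Next {
    Retry,
    Done,
}

fn after_update(update_result: Result<(), String>, has_request: bool) -> Next {
    if let Err(err) = update_result {
        // The periodic slot-refresh task will correct the map later.
        eprintln!("failed to update slot map from MOVED: {err}");
    }
    if has_request {
        Next::Retry
    } else {
        Next::Done
    }
}
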
@@ -1851,6 +1870,77 @@ where
Ok(())
}

/// Handles MOVED errors by updating the client's slot and node mappings.
/// This function handles various scenarios based on the new primary's role:
///
/// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed.
/// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover),
/// the slot ownership is updated by promoting the replica to the primary in the existing shard addresses.
/// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration,
/// and the slot mapping is updated to point to the new shard addresses.
/// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to:
/// - The replica became the primary of its shard after a failover, with new slots migrated to it.
/// - The replica has moved to a different shard as the primary.
/// Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard.
/// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out.
///
/// # Arguments
/// * `inner` - Shared reference to InnerCore containing connection and slot state.
/// * `slot` - The slot number reported as moved.
/// * `new_primary` - The address of the node now responsible for the slot.
///
/// # Returns
/// * `RedisResult<()>` indicating success or failure in updating slot mappings.
async fn update_upon_moved_error(
inner: Arc<InnerCore<C>>,
slot: u16,
new_primary: Arc<String>,
) -> RedisResult<()> {
let mut connections_container = inner.conn_lock.write().await;
let curr_shard_addrs = connections_container.slot_map.shard_addrs_for_slot(slot);
// Check if the new primary is part of the current shard and update if required
if let Some(curr_shard_addrs) = curr_shard_addrs {
match curr_shard_addrs.attempt_shard_role_update(new_primary.clone()) {
// Scenario 1: No changes needed as the new primary is already the current slot owner.
// Scenario 2: Failover occurred and the new primary was promoted from a replica.
ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()),
// The node was not found in this shard, proceed with further scenarios.
ShardUpdateResult::NodeNotFound => {}
}
}

// Scenario 3 & 4: Check if the new primary exists in other shards
let mut nodes_iter = connections_container.slot_map_nodes();
for (node_addr, shard_addrs_arc) in &mut nodes_iter {
if node_addr == new_primary {
let is_existing_primary = shard_addrs_arc.primary().eq(&new_primary);
if is_existing_primary {
// Scenario 3: Slot Migration - The new primary is an existing primary in another shard
// Update the associated addresses for `slot` to `shard_addrs`.
drop(nodes_iter);
return connections_container
.slot_map
.update_slot_range(slot, shard_addrs_arc.clone());
} else {
// Scenario 4: The MOVED error redirects to `new_primary` which is known as a replica in a shard that doesn’t own `slot`.
// Remove the replica from its existing shard and treat it as a new node in a new shard.
shard_addrs_arc.remove_replica(new_primary.clone())?;
drop(nodes_iter);
return connections_container
.slot_map
.add_new_primary(slot, new_primary);
}
}
}

// Scenario 5: New Node - The new primary is not present in the current slots map, add it as a primary of a new shard.
drop(nodes_iter);
connections_container
.slot_map
.add_new_primary(slot, new_primary)
}
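
The five scenarios reduce to a small decision tree. A self-contained, hedged sketch of the same classification with stand-in types (the real code works on `SlotMap`/`ShardAddrs` under the connections write lock):

#[derive(Debug, PartialEq)]
enum MovedAction {
    NoChange,          // 1: new primary already owns the slot
    PromoteReplica,    // 2: failover inside the owning shard
    MoveSlotToShard,   // 3: slot migrated to another shard's primary
    ReplicaToNewShard, // 4: foreign replica is split out as a new primary
    AddNewShard,       // 5: unknown node, likely scale-out
}

fn classify(
    owning_shard: Option<(&str, &[&str])>, // (primary, replicas) that own the slot
    other_shards: &[(&str, &[&str])],      // the rest of the topology
    new_primary: &str,
) -> MovedAction {
    if let Some((primary, replicas)) = owning_shard {
        if primary == new_primary {
            return MovedAction::NoChange;
        }
        if replicas.contains(&new_primary) {
            return MovedAction::PromoteReplica;
        }
    }
    for (primary, replicas) in other_shards {
        if *primary == new_primary {
            return MovedAction::MoveSlotToShard;
        }
        if replicas.contains(&new_primary) {
            return MovedAction::ReplicaToNewShard;
        }
    }
    MovedAction::AddNewShard
}
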

async fn execute_on_multiple_nodes<'a>(
cmd: &'a Arc<Cmd>,
routing: &'a MultipleNodeRoutingInfo,
@@ -2272,25 +2362,37 @@ where
sleep_duration,
moved_redirect,
} => {
poll_flush_action = poll_flush_action
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
if let Some(request) = request {
let future: RequestState<
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
> = match sleep_duration {
Some(sleep_duration) => RequestState::Sleep {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
let future: Option<
RequestState<Pin<Box<dyn Future<Output = OperationResult> + Send>>>,
> = if let Some(moved_redirect) = moved_redirect {
Some(RequestState::UpdateMoved {
future: Box::pin(ClusterConnInner::update_upon_moved_error(
self.inner.clone(),
moved_redirect.slot,
moved_redirect.address.into(),
)),
})
} else if let Some(ref request) = request {
match sleep_duration {
Some(sleep_duration) => Some(RequestState::Sleep {
sleep: boxed_sleep(sleep_duration),
},
None => RequestState::Future {
}),
None => Some(RequestState::Future {
future: Box::pin(Self::try_request(
request.info.clone(),
self.inner.clone(),
)),
},
};
}),
}
} else {
None
};
if let Some(future) = future {
self.in_flight_requests.push(Box::pin(Request {
retry_params: self.inner.cluster_params.retry_params.clone(),
request: Some(request),
request,
future,
}));
}
@@ -2343,7 +2445,7 @@ where

enum PollFlushAction {
None,
RebuildSlots(Option<RedirectNode>),
RebuildSlots,
Reconnect(Vec<String>),
ReconnectFromInitialConnections,
}
@@ -2358,9 +2460,8 @@ impl PollFlushAction {
PollFlushAction::ReconnectFromInitialConnections
}

(PollFlushAction::RebuildSlots(moved_redirect), _)
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
PollFlushAction::RebuildSlots(moved_redirect)
(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
PollFlushAction::RebuildSlots
}

(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
@@ -2421,8 +2522,7 @@ where

match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots(_moved_redirect) => {
// TODO: Add logic to update the slots map based on the MOVED error
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
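
With the redirect payload moved out of `RebuildSlots`, `change_state` becomes a plain precedence merge over the four actions. A hedged sketch of that ordering with a stand-in enum (`Action` instead of `PollFlushAction`), matching the arms shown above:

#[derive(Debug, PartialEq)]
enum Action {
    None,
    RebuildSlots,
    Reconnect(Vec<String>),
    ReconnectFromInitialConnections,
}

impl Action {
    // Precedence: ReconnectFromInitialConnections > RebuildSlots > Reconnect > None.
    fn change_state(self, next: Self) -> Self {
        match (self, next) {
            (Action::ReconnectFromInitialConnections, _)
            | (_, Action::ReconnectFromInitialConnections) => {
                Action::ReconnectFromInitialConnections
            }
            (Action::RebuildSlots, _) | (_, Action::RebuildSlots) => Action::RebuildSlots,
            (Action::Reconnect(mut addrs), Action::Reconnect(new_addrs)) => {
                // Address lists accumulate rather than replace one another.
                addrs.extend(new_addrs);
                Action::Reconnect(addrs)
            }
            (Action::Reconnect(addrs), Action::None)
            | (Action::None, Action::Reconnect(addrs)) => Action::Reconnect(addrs),
            (Action::None, Action::None) => Action::None,
        }
    }
}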