Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Sep 9, 2024
1 parent b705d8c commit a374c59
Show file tree
Hide file tree
Showing 5 changed files with 461 additions and 110 deletions.
2 changes: 2 additions & 0 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ where

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
println!("found slot_map_value = {slot_map_value:?}");
let addrs = &slot_map_value
.addrs
.read()
Expand All @@ -223,6 +224,7 @@ where
&self,
route: &Route,
) -> Option<ConnectionAndAddress<Connection>> {
println!("connection_for_route is called");
self.lookup_route(route).or_else(|| {
if route.slot_addr() != SlotAddr::Master {
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master))
Expand Down
29 changes: 17 additions & 12 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1708,15 +1708,19 @@ where
}

/// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role:
/// 1. **No Change**: If the new primary is already the slot owner.
/// 2. **Failover**: If moved to a replica within the same shard, suggesting a failover has occurred.
/// 3. **Slot Migration**: Indicates a potential ongoing migration if moved to a primary in another shard.
/// 4. **Moved to an Existing Replica in a Different Shard**:
/// Updates the slot and node mappings in response to a MOVED error.
/// This function handles various scenarios based on the new primary's role:
///
/// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed.
/// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover),
/// the slot ownership is updated by promoting the replica to the primary in the existing shard addresses.
/// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration,
/// and the slot mapping is updated to point to the new shard addresses.
/// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to:
/// - The replica became the primary of its shard after a failover, with new slots migrated to it.
/// - The replica has moved to a different shard as the primary.
/// Information about the new shard—whether it’s the original shard or a new one
/// with different slots and nodes—is unknown.
/// 5. **New Node**: If moved to an unknown node, this suggests the addition of a new node and possible scale-out.
/// Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard.
/// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out.
///
/// # Arguments
/// * `inner` - Shared reference to InnerCore containing connection and slot state.
Expand All @@ -1741,12 +1745,12 @@ where
.expect("Failed to acquire read lock for ShardAddrs");
if *curr_shard_addrs_read.primary() == *new_primary {
println!("Scenario 1");
// Scenario 1: No change needed if the new primary is already recognized as the slot owner.
// Scenario 1: No Change - The new primary is already the current slot owner.
return Ok(());
}

if curr_shard_addrs_read.replicas().contains(&new_primary) {
// Scenario 2: Handle failover within the same shard by updating all slots managed by the old primary to the new primary.
// Scenario 2: Failover - The new primary is a replica within the same shard
println!("Scenario 2");
drop(curr_shard_addrs_read);
let mut curr_shard_addrs_write = curr_shard_addrs
Expand All @@ -1755,7 +1759,8 @@ where
return curr_shard_addrs_write.promote_replica_to_primary(new_primary);
}
}
// Scenario 3 and 4: Check if the node exists in other shards

// Scenario 3 & 4: Check if the new primary exists in other shards
let mut nodes_iter = connections_container.slot_map_nodes();
for (node_addr, shard_addrs_arc) in &mut nodes_iter {
if node_addr == new_primary {
Expand All @@ -1767,7 +1772,7 @@ where
is_existing_primary = *shard_addrs.primary() == *new_primary;
}
if is_existing_primary {
// Scenario 3: Slot migration to an existing primary in another shard.
// Scenario 3: Slot Migration - The new primary is an existing primary in another shard
// Update the associated addresses for `slot` to `shard_addrs`.
println!("Scenario 3");
drop(nodes_iter);
Expand Down Expand Up @@ -1798,7 +1803,7 @@ where
}
}

// Scenario 5: `new_primary` isn’t present in the current slots map. Add it as a new node in a new shard.
// Scenario 5: New Node - The new primary is not present in the current slots map, add it as a primary of a new shard.
println!("Scenario 5");
drop(nodes_iter);
drop(connections_container);
Expand Down
45 changes: 43 additions & 2 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ impl Routable for Value {
}
}

#[derive(Debug, Hash)]
#[derive(Debug, Hash, Clone)]
pub(crate) struct Slot {
pub(crate) start: u16,
pub(crate) end: u16,
Expand Down Expand Up @@ -1004,10 +1004,11 @@ impl Route {
mod tests {
use super::{
command_for_multi_slot_indices, AggregateOp, MultipleNodeRoutingInfo, ResponsePolicy,
Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
Route, RoutingInfo, ShardAddrs, SingleNodeRoutingInfo, SlotAddr,
};
use crate::{cluster_topology::slot, cmd, parser::parse_redis_value, Value};
use core::panic;
use std::sync::Arc;

#[test]
fn test_routing_info_mixed_capatalization() {
Expand Down Expand Up @@ -1398,4 +1399,44 @@ mod tests {
let result = super::combine_map_results(input);
assert!(result.is_err());
}

/// Test helper: builds a `ShardAddrs` from plain string slices —
/// one primary address plus any number of replica addresses.
fn create_test_shard(primary: &str, replicas: Vec<&str>) -> ShardAddrs {
    let primary_addr = Arc::new(primary.to_owned());
    let replica_addrs = replicas
        .iter()
        .map(|addr| Arc::new((*addr).to_owned()))
        .collect();
    ShardAddrs::new(primary_addr, replica_addrs)
}

#[test]
fn test_promote_replica_to_primary_success() {
    // Shard starts with "primary1" as primary and two replicas.
    let mut shard = create_test_shard("primary1", vec!["replica1", "replica2"]);

    // Promote "replica1" to primary. No `.clone()` needed: the Arc is
    // consumed here and never used again (was a redundant clone).
    let replica = Arc::new("replica1".to_string());
    let result = shard.promote_replica_to_primary(replica);

    // The promotion succeeds and the old primary is demoted into the
    // replica set in the promoted replica's former slot.
    assert!(result.is_ok());
    assert_eq!(*shard.primary(), "replica1");
    assert_eq!(*shard.replicas()[0], "primary1");
    assert_eq!(*shard.replicas()[1], "replica2");
}

#[test]
fn test_promote_replica_to_primary_not_found() {
    let mut shard = create_test_shard("primary1", vec!["replica1", "replica2"]);

    // Attempt to promote an address that is not a replica of this shard.
    // No `.clone()` needed: the Arc is consumed here and never used again
    // (was a redundant clone).
    let non_existent_replica = Arc::new("replica3".to_string());
    let result = shard.promote_replica_to_primary(non_existent_replica);

    // The promotion must fail and report a descriptive error.
    assert!(result.is_err(), "{result:?}");
    let error = result.err().unwrap();
    assert!(error
        .to_string()
        .contains("Failed to promote replica to primary"));
}
}
Loading

0 comments on commit a374c59

Please sign in to comment.