Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Sep 12, 2024
1 parent bda970f commit 58564f6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 69 deletions.
10 changes: 4 additions & 6 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,7 @@ pub(crate) struct RedirectNode {
}

impl RedirectNode {
/// This function expects an `Option` containing a tuple with a string slice and a u16.
/// The tuple represents an address and a slot, respectively. If the input is `Some`,
/// the function converts the address to a `String` and constructs a `RedirectNode`.
/// Constructs a `RedirectNode` from an optional tuple containing an address and a slot number.
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
option.map(|(address, slot)| RedirectNode {
address: address.to_string(),
Expand Down Expand Up @@ -828,8 +826,8 @@ impl<C> Future for Request<C> {
// Updating the slot map based on the MOVED error is an optimization.
// If it fails, proceed by retrying the request with the redirected node,
// and allow the slot refresh task to correct the slot map.
warn!(
"Failed to update the slot map based on the received MOVED error.\n
info!(
"Failed to update the slot map based on the received MOVED error.
Error: {err:?}"
);
}
Expand Down Expand Up @@ -1739,7 +1737,7 @@ where
// Scenario 1: No changes needed as the new primary is already the current slot owner.
// Scenario 2: Failover occurred and the new primary was promoted from a replica.
ShardUpdateResult::AlreadyPrimary | ShardUpdateResult::Promoted => return Ok(()),
// If the node was not found, proceed with further scenarios.
// The node was not found in this shard, proceed with further scenarios.
ShardUpdateResult::NodeNotFound => {}
}
}
Expand Down
82 changes: 19 additions & 63 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,8 @@ pub(crate) enum ShardUpdateResult {
NodeNotFound,
}

const READ_LK_ERR_SHARDADDRS: &str = "Failed to acquire read lock for ShardAddrs";
const WRITE_LK_ERR_SHARDADDRS: &str = "Failed to acquire write lock for ShardAddrs";
/// This is just a simplified version of [`Slot`],
/// which stores only the master and [optional] replica
/// to avoid the need to choose a replica each time
Expand All @@ -920,23 +922,11 @@ pub(crate) struct ShardAddrs {

impl PartialEq for ShardAddrs {
fn eq(&self, other: &Self) -> bool {
let self_primary = self
.primary
.read()
.expect("Failed to acquire lock on ShardAddrs");
let other_primary = other
.primary
.read()
.expect("Failed to acquire lock on ShardAddrs");

let self_replicas = self
.replicas
.read()
.expect("Failed to acquire lock on ShardAddrs");
let other_replicas = other
.replicas
.read()
.expect("Failed to acquire lock on ShardAddrs");
let self_primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS);
let other_primary = other.primary.read().expect(READ_LK_ERR_SHARDADDRS);

let self_replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS);
let other_replicas = other.replicas.read().expect(READ_LK_ERR_SHARDADDRS);

*self_primary == *other_primary && *self_replicas == *other_replicas
}
Expand All @@ -952,25 +942,13 @@ impl PartialOrd for ShardAddrs {

impl Ord for ShardAddrs {
fn cmp(&self, other: &Self) -> Ordering {
let self_primary = self
.primary
.read()
.expect("Failed to acquire lock on ShardAddrs");
let other_primary = other
.primary
.read()
.expect("Failed to acquire lock on ShardAddrs");
let self_primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS);
let other_primary = other.primary.read().expect(READ_LK_ERR_SHARDADDRS);

let primary_cmp = self_primary.cmp(&other_primary);
if primary_cmp == Ordering::Equal {
let self_replicas = self
.replicas
.read()
.expect("Failed to acquire lock on ShardAddrs");
let other_replicas = other
.replicas
.read()
.expect("Failed to acquire lock on ShardAddrs");
let self_replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS);
let other_replicas = other.replicas.read().expect(READ_LK_ERR_SHARDADDRS);
return self_replicas.cmp(&other_replicas);
}

Expand All @@ -986,20 +964,15 @@ impl ShardAddrs {
}

pub(crate) fn new_with_primary(primary: Arc<String>) -> Self {
Self::new(primary, Vec::new())
Self::new(primary, Vec::default())
}

pub(crate) fn primary(&self) -> Arc<String> {
self.primary
.read()
.expect("Failed to acquire lock on ShardAddrs")
.clone()
self.primary.read().expect(READ_LK_ERR_SHARDADDRS).clone()
}

pub(crate) fn replicas(&self) -> std::sync::RwLockReadGuard<Vec<Arc<String>>> {
self.replicas
.read()
.expect("Failed to acquire lock on ShardAddrs")
self.replicas.read().expect(READ_LK_ERR_SHARDADDRS)
}

/// Attempts to update the shard roles based on the provided `new_primary`.
Expand All @@ -1025,14 +998,8 @@ impl ShardAddrs {
/// # Returns:
/// * `ShardUpdateResult` - The result of the role update operation.
pub(crate) fn attempt_shard_role_update(&self, new_primary: Arc<String>) -> ShardUpdateResult {
let mut primary_lock = self
.primary
.write()
.expect("Failed to acquire lock on ShardAddrs");
let mut replicas_lock = self
.replicas
.write()
.expect("Failed to acquire lock on ShardAddrs");
let mut primary_lock = self.primary.write().expect(WRITE_LK_ERR_SHARDADDRS);
let mut replicas_lock = self.replicas.write().expect(WRITE_LK_ERR_SHARDADDRS);

// If the new primary is already the current primary, return early.
if *primary_lock == new_primary {
Expand Down Expand Up @@ -1069,10 +1036,7 @@ impl ShardAddrs {
/// * `RedisResult<()>` - `Ok(())` if the replica was successfully removed, or an error if the
/// replica was not found.
pub(crate) fn remove_replica(&self, replica_to_remove: Arc<String>) -> RedisResult<()> {
let mut replicas_lock = self
.replicas
.write()
.expect("Failed to acquire lock on ShardAddrs");
let mut replicas_lock = self.replicas.write().expect(WRITE_LK_ERR_SHARDADDRS);
if let Some(index) = Self::replica_index(&replicas_lock, replica_to_remove.clone()) {
replicas_lock.remove(index);
Ok(())
Expand All @@ -1091,16 +1055,8 @@ impl<'a> IntoIterator for &'a ShardAddrs {
type IntoIter = std::iter::Chain<Once<Arc<String>>, std::vec::IntoIter<Arc<String>>>;

fn into_iter(self) -> Self::IntoIter {
let primary = self
.primary
.read()
.expect("Failed to acquire lock on ShardAddrs")
.clone();
let replicas = self
.replicas
.read()
.expect("Failed to acquire lock on ShardAddrs")
.clone();
let primary = self.primary.read().expect(READ_LK_ERR_SHARDADDRS).clone();
let replicas = self.replicas.read().expect(READ_LK_ERR_SHARDADDRS).clone();

std::iter::once(primary).chain(replicas)
}
Expand Down

0 comments on commit 58564f6

Please sign in to comment.