Skip to content

Commit

Permalink
Dont retry on connection errors where it is unclear if the server rec…
Browse files Browse the repository at this point in the history
…eived the request
  • Loading branch information
barshaul committed Oct 1, 2024
1 parent c690be4 commit 0398fc6
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 99 deletions.
34 changes: 19 additions & 15 deletions redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ where
&mut self,
item: SinkItem,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
self.send_recv(item, None, timeout).await
}

Expand All @@ -359,7 +359,7 @@ where
// If `None`, this is a single request, not a pipeline of multiple requests.
pipeline_response_count: Option<usize>,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
let (sender, receiver) = oneshot::channel();

self.sender
Expand All @@ -369,15 +369,25 @@ where
output: sender,
})
.await
.map_err(|_| None)?;
.map_err(|err| {
// If an error occurs here, it means the request never reached the server.
// Since the server did not receive the data, it is safe to retry the request.
RedisError::from((
crate::ErrorKind::IoErrorRetrySafe,
"Failed to send the request to the server",
format!("{err}"),
))
})?;
match Runtime::locate().timeout(timeout, receiver).await {
Ok(Ok(result)) => result.map_err(Some),
Ok(Ok(result)) => result,
Ok(Err(_)) => {
// The `sender` was dropped which likely means that the stream part
// failed for one reason or another
Err(None)
// failed for one reason or another.
// Since we don't know if the server received the request, retrying it isn't safe.
// Hence, we return an IoError instead of an IoErrorRetrySafe.
Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
}
Err(elapsed) => Err(Some(elapsed.into())),
Err(elapsed) => Err(elapsed.into()),
}
}

Expand Down Expand Up @@ -503,10 +513,7 @@ impl MultiplexedConnection {
let result = self
.pipeline
.send_single(cmd.get_packed_command(), self.response_timeout)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;
if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
if e.is_connection_dropped() {
Expand Down Expand Up @@ -537,10 +544,7 @@ impl MultiplexedConnection {
Some(offset + count),
self.response_timeout,
)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;

if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
Expand Down
3 changes: 2 additions & 1 deletion redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ where
.wait_time_for_retry(retries);
thread::sleep(sleep_time);
}
crate::types::RetryMethod::Reconnect => {
crate::types::RetryMethod::Reconnect
| crate::types::RetryMethod::ReconnectAndRetry => {
if *self.auto_reconnect.borrow() {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
Expand Down
31 changes: 21 additions & 10 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,22 @@ where
&self,
amount: usize,
conn_type: ConnectionType,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
) -> Option<impl Iterator<Item = ConnectionAndAddress<Connection>> + '_> {
if self.connection_map.is_empty() {
None
} else {
Some(
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
}),
)
}
}

pub(crate) fn replace_or_add_connection_for_address(
Expand Down Expand Up @@ -633,6 +639,7 @@ mod tests {

let random_connections: HashSet<_> = container
.random_connections(3, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();

Expand All @@ -651,6 +658,7 @@ mod tests {
0,
container
.random_connections(1, ConnectionType::User)
.expect("No connections found")
.count()
);
}
Expand All @@ -665,6 +673,7 @@ mod tests {
);
let random_connections: Vec<_> = container
.random_connections(1, ConnectionType::User)
.expect("No connections found")
.collect();

assert_eq!(vec![(address, 4)], random_connections);
Expand All @@ -675,6 +684,7 @@ mod tests {
let container = create_container();
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand All @@ -687,6 +697,7 @@ mod tests {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::PreferManagement)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand Down
108 changes: 81 additions & 27 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ enum Next<C> {
// if not set, then a reconnect should happen without sending a request afterwards
request: Option<PendingRequest<C>>,
target: String,
should_retry: bool,
},
RefreshSlots {
// if not set, then a slot refresh should happen without sending a request afterwards
Expand Down Expand Up @@ -860,6 +861,7 @@ impl<C> Future for Request<C> {
let request = this.request.as_mut().unwrap();
// TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
if request.retry >= this.retry_params.number_of_retries {
let retry_method = err.retry_method();
let next = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Next::ReconnectToInitialNodes { request: None }.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
Expand All @@ -870,11 +872,18 @@ impl<C> Future for Request<C> {
sleep_duration: None,
}
.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
} else if matches!(retry_method, crate::types::RetryMethod::Reconnect)
|| matches!(retry_method, crate::types::RetryMethod::ReconnectAndRetry)
{
if let OperationTarget::Node { address } = target {
let should_retry = matches!(
retry_method,
crate::types::RetryMethod::ReconnectAndRetry
);
Next::Reconnect {
request: None,
target: address,
should_retry,
}
.into()
} else {
Expand Down Expand Up @@ -949,14 +958,20 @@ impl<C> Future for Request<C> {
});
self.poll(cx)
}
crate::types::RetryMethod::Reconnect => {
crate::types::RetryMethod::Reconnect
| crate::types::RetryMethod::ReconnectAndRetry => {
let mut request = this.request.take().unwrap();
// TODO should we reset the redirect here?
request.info.reset_routing();
warn!("disconnected from {:?}", address);
let should_retry = matches!(
err.retry_method(),
crate::types::RetryMethod::ReconnectAndRetry
);
Next::Reconnect {
request: Some(request),
target: address,
should_retry,
}
.into()
}
Expand Down Expand Up @@ -1201,8 +1216,11 @@ where
Ok(connections.0)
}

fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
let inner = self.inner.clone();
// Reconnet to the initial nodes provided by the user in the creation of the client,
// and try to refresh the slots based on the initial connections.
// Being used when all cluster connections are unavailable.
fn reconnect_to_initial_nodes(inner: Arc<InnerCore<C>>) -> impl Future<Output = ()> {
let inner = inner.clone();
async move {
let connection_map = match Self::create_initial_connections(
&inner.initial_nodes,
Expand Down Expand Up @@ -1708,7 +1726,9 @@ where
Self::refresh_slots_inner(inner, curr_retry)
.await
.map_err(|err| {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES {
if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES
|| err.kind() == ErrorKind::AllConnectionsUnavailable
{
BackoffError::Permanent(err)
} else {
BackoffError::from(err)
Expand Down Expand Up @@ -1817,7 +1837,6 @@ where
core: Core<C>,
response_policy: Option<ResponsePolicy>,
) -> OperationResult {
trace!("execute_on_multiple_nodes");
let connections_container = core.conn_lock.read().await;
if connections_container.is_empty() {
return OperationResult::Err((
Expand Down Expand Up @@ -2101,14 +2120,20 @@ where
}
ConnectionCheck::RandomConnection => {
let read_guard = core.conn_lock.read().await;
let (random_address, random_conn_future) = read_guard
.random_connections(1, ConnectionType::User)
.next()
.ok_or(RedisError::from((
let random_conns_option = read_guard.random_connections(1, ConnectionType::User);
if let Some(mut random_connections) = random_conns_option {
let (random_address, random_conn_future) =
random_connections.next().ok_or(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
} else {
return Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
)));
}
}
};

Expand All @@ -2123,29 +2148,42 @@ where
ConnectionState::PollComplete => return Poll::Ready(Ok(())),
ConnectionState::Recover(future) => future,
};
match recover_future {
let (next_state, poll) = match recover_future {
RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
Ok(_) => {
trace!("Recovered!");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
(Some(ConnectionState::PollComplete), Poll::Ready(Ok(())))
}
Err(err) => {
trace!("Recover slots failed!");
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
));
Poll::Ready(Err(err))

let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Some(ConnectionState::Recover(RecoverFuture::Reconnect(
Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
self.inner.clone(),
)),
)))
} else {
Some(ConnectionState::Recover(RecoverFuture::RecoverSlots(
Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
)),
)))
};
(next_state, Poll::Ready(Err(err)))
}
},
RecoverFuture::Reconnect(ref mut future) => {
ready!(future.as_mut().poll(cx));
trace!("Reconnected connections");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
(Some(ConnectionState::PollComplete), Poll::Ready(Ok(())))
}
};
if let Some(state) = next_state {
self.state = state;
}
poll
}

async fn handle_loading_error(
Expand Down Expand Up @@ -2255,12 +2293,16 @@ where
}
}
Next::Reconnect {
request, target, ..
request,
target,
should_retry,
} => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target]));
if let Some(request) = request {
self.inner.pending_requests.lock().unwrap().push(request);
if should_retry {
self.inner.pending_requests.lock().unwrap().push(request);
}
}
}
Next::ReconnectToInitialNodes { request } => {
Expand Down Expand Up @@ -2399,7 +2441,7 @@ where
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.reconnect_to_initial_nodes(),
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)));
}
}
Expand Down Expand Up @@ -2441,8 +2483,20 @@ async fn calculate_topology_from_random_nodes<'a, C>(
where
C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
let requested_nodes =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement);
let requested_nodes = match read_guard
.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement)
{
Some(random_conns) => random_conns,
None => {
return (
Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No available connections to refresh slots from",
))),
vec![],
)
}
};
let topology_join_results =
futures::future::join_all(requested_nodes.map(|(addr, conn)| async move {
let mut conn: C = conn.await;
Expand Down
Loading

0 comments on commit 0398fc6

Please sign in to comment.