Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge fixes from upstream #165

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 83 additions & 46 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,26 +606,31 @@ impl<C> RequestInfo<C> {
}
}

fn reset_redirect(&mut self) {
fn reset_routing(&mut self) {
let fix_route = |route: &mut InternalSingleNodeRouting<C>| {
match route {
InternalSingleNodeRouting::Redirect {
previous_routing, ..
} => {
let previous_routing = std::mem::take(previous_routing.as_mut());
*route = previous_routing;
}
// If a specific connection is specified, then reconnecting without resetting the routing
// will mean that the request is still routed to the old connection.
InternalSingleNodeRouting::Connection { address, .. } => {
*route = InternalSingleNodeRouting::ByAddress(address.to_string());
}
_ => {}
}
};
match &mut self.cmd {
CmdArg::Cmd { routing, .. } => {
if let InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::Redirect {
previous_routing,
..
}) = routing
{
let previous_routing = std::mem::take(previous_routing.as_mut());
*routing = previous_routing.into();
if let InternalRoutingInfo::SingleNode(route) = routing {
fix_route(route);
}
}
CmdArg::Pipeline { route, .. } => {
if let InternalSingleNodeRouting::Redirect {
previous_routing, ..
} = route
{
let previous_routing = std::mem::take(previous_routing.as_mut());
*route = previous_routing;
}
fix_route(route);
}
// cluster_scan is sent as a normal command internally so we will not reach that point.
CmdArg::ClusterScan { .. } => {
Expand Down Expand Up @@ -675,15 +680,18 @@ enum Next<C> {
address: ArcStr,
},
Reconnect {
request: PendingRequest<C>,
// if not set, then a reconnect should happen without sending a request afterwards
request: Option<PendingRequest<C>>,
target: ArcStr,
},
RefreshSlots {
request: PendingRequest<C>,
// if not set, then a slot refresh should happen without sending a request afterwards
request: Option<PendingRequest<C>>,
sleep_duration: Option<Duration>,
},
ReconnectToInitialNodes {
request: PendingRequest<C>,
// if not set, then a reconnect should happen without sending a request afterwards
request: Option<PendingRequest<C>>,
},
Done,
}
Expand Down Expand Up @@ -714,16 +722,39 @@ impl<C> Future for Request<C> {
}
Err((target, err)) => {
let request = this.request.as_mut().unwrap();

// TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
if request.retry >= this.retry_params.number_of_retries {
let next = if err.kind() == ErrorKind::ClusterConnectionNotFound {
Next::ReconnectToInitialNodes { request: None }.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
|| matches!(target, OperationTarget::NotFound)
{
Next::RefreshSlots {
request: None,
sleep_duration: None,
}
.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
if let OperationTarget::Node { address } = target {
Next::Reconnect {
request: None,
target: address,
}
.into()
} else {
Next::Done.into()
}
} else {
Next::Done.into()
};
self.respond(Err(err));
return Next::Done.into();
return next;
}
request.retry = request.retry.saturating_add(1);

if err.kind() == ErrorKind::ClusterConnectionNotFound {
return Next::ReconnectToInitialNodes {
request: this.request.take().unwrap(),
request: Some(this.request.take().unwrap()),
}
.into();
}
Expand All @@ -742,9 +773,9 @@ impl<C> Future for Request<C> {
OperationTarget::NotFound => {
// TODO - this is essentially a repeat of the retirable error. probably can remove duplication.
let mut request = this.request.take().unwrap();
request.info.reset_redirect();
request.info.reset_routing();
return Next::RefreshSlots {
request,
request: Some(request),
sleep_duration: Some(sleep_duration),
}
.into();
Expand All @@ -768,7 +799,7 @@ impl<C> Future for Request<C> {
.map(|(node, _slot)| Redirect::Moved(node.to_string())),
);
Next::RefreshSlots {
request,
request: Some(request),
sleep_duration: None,
}
.into()
Expand All @@ -784,10 +815,10 @@ impl<C> Future for Request<C> {
crate::types::RetryMethod::Reconnect => {
let mut request = this.request.take().unwrap();
// TODO should we reset the redirect here?
request.info.reset_redirect();
request.info.reset_routing();
warn!("disconnected from {:?}", address);
Next::Reconnect {
request,
request: Some(request),
target: address,
}
.into()
Expand Down Expand Up @@ -1843,36 +1874,42 @@ where
} => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
let future: RequestState<
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
> = match sleep_duration {
Some(sleep_duration) => RequestState::Sleep {
sleep: boxed_sleep(sleep_duration),
},
None => RequestState::Future {
future: Box::pin(Self::try_request(
request.info.clone(),
self.inner.clone(),
)),
},
};
self.in_flight_requests.push(Box::pin(Request {
retry_params: self.inner.cluster_params.retry_params.clone(),
request: Some(request),
future,
}));
if let Some(request) = request {
let future: RequestState<
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
> = match sleep_duration {
Some(sleep_duration) => RequestState::Sleep {
sleep: boxed_sleep(sleep_duration),
},
None => RequestState::Future {
future: Box::pin(Self::try_request(
request.info.clone(),
self.inner.clone(),
)),
},
};
self.in_flight_requests.push(Box::pin(Request {
retry_params: self.inner.cluster_params.retry_params.clone(),
request: Some(request),
future,
}));
}
}
Next::Reconnect {
request, target, ..
} => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target]));
self.inner.pending_requests.lock().unwrap().push(request);
if let Some(request) = request {
self.inner.pending_requests.lock().unwrap().push(request);
}
}
Next::ReconnectToInitialNodes { request } => {
poll_flush_action = poll_flush_action
.change_state(PollFlushAction::ReconnectFromInitialConnections);
self.inner.pending_requests.lock().unwrap().push(request);
if let Some(request) = request {
self.inner.pending_requests.lock().unwrap().push(request);
}
}
}
}
Expand Down
Loading