Skip to content

Commit

Permalink
fix: retry should drop state on RESET (#337)
Browse files Browse the repository at this point in the history
* fix: retry should drop state on RESET

* test: retry should drop state on RESET
  • Loading branch information
cakekindel authored Apr 26, 2023
1 parent 932b168 commit bf8924a
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

169 changes: 155 additions & 14 deletions toad/src/step/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,13 @@ pub trait Buf<P>
/// We saw an ACK and should transition the retry state for matching outbound
/// CONs to the "acked" state
fn mark_acked(&mut self, now: Instant<P::Clock>, effects: &mut P::Effects, token: Token) {
let found = self.iter()
.enumerate()
.find(|(_, (_, msg))| msg.data().token == token);
let found = self.iter_mut().find(|(_, msg)| msg.data().token == token);

let (ix, new_timer) = match found {
| Some((ix, _)) if self[ix].1.data().code.kind() == CodeKind::Response => {
return self.forget(now, effects, token)
match found {
| Some((_, msg)) if msg.data().code.kind() == CodeKind::Response => {
self.forget(now, effects, token);
},
| Some((ix,
(state @ State::ConPreAck { post_ack_strategy,
post_ack_max_attempts,
.. },
msg))) => {
| Some((state, msg)) if matches!(state, State::ConPreAck { .. }) => {
let dbg = Self::debug(now, state, msg);
log!(retry::Buf::mark_acked,
effects,
Expand All @@ -132,12 +126,50 @@ pub trait Buf<P>
dbg.msg_short,
dbg.since_last_attempt,
dbg.since_first_attempt);
(ix, RetryTimer::new(now, *post_ack_strategy, *post_ack_max_attempts))

let timer = match state {
| State::ConPreAck { post_ack_strategy,
post_ack_max_attempts,
.. } => {
RetryTimer::new(now, *post_ack_strategy, *post_ack_max_attempts)
},
| _ => unreachable!(),
};

*state = State::Just(timer);
},
| _ => {
log!(retry::Buf::mark_acked,
effects,
log::Level::Info,
"ACK {:?} does not apply to any known messages",
token);
},
| _ => return,
};
}

/// We saw a RESET regarding token `token`
fn mark_reset(&mut self, now: Instant<P::Clock>, effects: &mut P::Effects, token: Token) {
let found = self.iter().find(|(_, msg)| msg.data().token == token);

self.get_mut(ix).unwrap().0 = State::Just(new_timer);
match found {
| Some((state, msg)) => {
let dbg = Self::debug(now, state, msg);
log!(retry::Buf::mark_reset,
effects,
log::Level::Debug,
"{} got RESET, dropping all retry state.",
dbg.msg_short);
self.forget(now, effects, token)
},
| _ => {
log!(retry::Buf::mark_reset,
effects,
log::Level::Info,
"RESET {:?} does not correspond to any known messages",
token);
},
};
}

/// Called when a response of any kind to any request is
Expand All @@ -150,6 +182,10 @@ pub trait Buf<P>
msg: Addrd<&platform::Message<P>>)
-> Result<(), Error<E>> {
match (msg.data().ty, msg.data().code.kind()) {
| (Type::Reset, _) => {
self.mark_reset(now, effects, msg.data().token);
Ok(())
},
| (Type::Ack, CodeKind::Empty) => {
log!(retry::Buf::maybe_seen_response, effects, log::Level::Trace, "ACK 0.00 {:?} means we should find the corresponding outbound CON and either forget (if CON response) or transition to expecting a response (if CON request). No following logs means the ACK was unexpected.", msg.data().token);
self.mark_acked(now, effects, msg.data().token);
Expand Down Expand Up @@ -603,6 +639,111 @@ mod tests {
assert_eq!(sent!().len(), 2);
}

/*
* | t | what |
* | ------ | ------------------------------------------------- |
* | 50 | CON request sent |
* | 100 | got RESET |
* | 10_000 | should not have retried |
*/
#[test]
fn when_outbound_message_reset_retry_should_not_retry() {
type Mock = test::MockStep<(), Addrd<test::Req>, Addrd<test::Resp>, ()>;
let s = Retry::<Mock>::default();

let token_a = Token(array_vec![1, 2, 3]);
let token_a: &'static Token = unsafe { core::mem::transmute::<_, _>(&token_a) };

let token_b = Token(array_vec![1, 2, 4]);
let token_b: &'static Token = unsafe { core::mem::transmute::<_, _>(&token_b) };

let token_c = Token(array_vec![1, 2, 5]);
let token_c: &'static Token = unsafe { core::mem::transmute::<_, _>(&token_c) };

s.inner()
.set_poll_resp(|_, Snapshot { time, .. }, _, token, _| {
let time: u64 = Milliseconds::try_from(time.duration_since_epoch()).unwrap()
.0;

let mut rst = test::msg!(RESET x.x.x.x:0000);
rst.as_mut().token = token;

match time {
| 150 => Some(Ok(rst.map(Resp::from))),
| _ => None,
}
})
.set_poll_req(|_, Snapshot { time, .. }, _| {
let time: u64 = Milliseconds::try_from(time.duration_since_epoch()).unwrap()
.0;

let mut rst = test::msg!(RESET x.x.x.x:0000);
rst.as_mut().token = *token_c;

match time {
| 150 => Some(Ok(rst.map(Req::from))),
| _ => None,
}
});
let cfg = config(200, 400);
let mut effs = Vec::<test::Effect>::new();
macro_rules! sent {
() => {{
effs.iter().filter(|e| matches!(e, Effect::Log(_, _))).for_each(|e| match e {
Effect::Log(l, m) => println!("[{:?}] {}", l, m.as_str()),
_ => (),
});
effs.iter().filter(|e| matches!(e, Effect::Send(_))).collect::<Vec<&test::Effect>>()
}};
}

let mut con_req = test::msg!(CON GET x.x.x.x:1111);
con_req.as_mut().token = *token_a;

let mut non_req = test::msg!(NON GET x.x.x.x:1111);
non_req.as_mut().token = *token_b;

let mut con_rep = test::msg!(CON {2 . 04} x.x.x.x:1111);
con_rep.as_mut().token = *token_c;

s.on_message_sent(&snap_time(cfg, 50), &mut effs, &con_rep)
.unwrap();
s.on_message_sent(&snap_time(cfg, 50), &mut effs, &con_req)
.unwrap();
s.on_message_sent(&snap_time(cfg, 50), &mut effs, &non_req)
.unwrap();

let rep = s.poll_resp(&snap_time(cfg, 150), &mut effs, *token_a, con_req.addr())
.unwrap()
.unwrap();
assert_eq!(sent!().len(), 0);
assert_eq!(rep.data().msg().ty, Type::Reset);

let rep = s.poll_resp(&snap_time(cfg, 150), &mut effs, *token_b, con_req.addr())
.unwrap()
.unwrap();
assert_eq!(sent!().len(), 0);
assert_eq!(rep.data().msg().ty, Type::Reset);

let req = s.poll_req(&snap_time(cfg, 150), &mut effs)
.unwrap()
.unwrap();
assert_eq!(sent!().len(), 0);
assert_eq!(req.data().msg().ty, Type::Reset);

s.poll_resp(&snap_time(cfg, 10_000), &mut effs, *token_a, con_req.addr())
.ok_or(())
.unwrap_err();
s.poll_resp(&snap_time(cfg, 10_000), &mut effs, *token_b, con_req.addr())
.ok_or(())
.unwrap_err();
s.poll_req(&snap_time(cfg, 10_000), &mut effs)
.ok_or(())
.unwrap_err();

assert_eq!(sent!().len(), 0);
}

/*
* | t | what |
* | ------ | ------------------------------------------------- |
Expand Down

0 comments on commit bf8924a

Please sign in to comment.