From bf8924ae641863a937db3560892e3f125b0824a6 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Tue, 25 Apr 2023 22:17:03 -0500 Subject: [PATCH] fix: retry should drop state on RESET (#337) * fix: retry should drop state on RESET * test: retry should drop state on RESET --- Cargo.lock | 2 +- toad/src/step/retry.rs | 169 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 156 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 257243f..3bcc975 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1012,7 +1012,7 @@ dependencies = [ [[package]] name = "toad" -version = "0.18.0" +version = "0.19.0" dependencies = [ "embedded-time", "lazycell", diff --git a/toad/src/step/retry.rs b/toad/src/step/retry.rs index 4e6f81a..0fcc4fe 100644 --- a/toad/src/step/retry.rs +++ b/toad/src/step/retry.rs @@ -111,19 +111,13 @@ pub trait Buf

/// We saw an ACK and should transition the retry state for matching outbound /// CONs to the "acked" state fn mark_acked(&mut self, now: Instant, effects: &mut P::Effects, token: Token) { - let found = self.iter() - .enumerate() - .find(|(_, (_, msg))| msg.data().token == token); + let found = self.iter_mut().find(|(_, msg)| msg.data().token == token); - let (ix, new_timer) = match found { - | Some((ix, _)) if self[ix].1.data().code.kind() == CodeKind::Response => { - return self.forget(now, effects, token) + match found { + | Some((_, msg)) if msg.data().code.kind() == CodeKind::Response => { + self.forget(now, effects, token); }, - | Some((ix, - (state @ State::ConPreAck { post_ack_strategy, - post_ack_max_attempts, - .. }, - msg))) => { + | Some((state, msg)) if matches!(state, State::ConPreAck { .. }) => { let dbg = Self::debug(now, state, msg); log!(retry::Buf::mark_acked, effects, @@ -132,12 +126,50 @@ pub trait Buf

dbg.msg_short, dbg.since_last_attempt, dbg.since_first_attempt); - (ix, RetryTimer::new(now, *post_ack_strategy, *post_ack_max_attempts)) + + let timer = match state { + | State::ConPreAck { post_ack_strategy, + post_ack_max_attempts, + .. } => { + RetryTimer::new(now, *post_ack_strategy, *post_ack_max_attempts) + }, + | _ => unreachable!(), + }; + + *state = State::Just(timer); + }, + | _ => { + log!(retry::Buf::mark_acked, + effects, + log::Level::Info, + "ACK {:?} does not apply to any known messages", + token); }, - | _ => return, }; + } + + /// We saw a RESET regarding token `token` + fn mark_reset(&mut self, now: Instant, effects: &mut P::Effects, token: Token) { + let found = self.iter().find(|(_, msg)| msg.data().token == token); - self.get_mut(ix).unwrap().0 = State::Just(new_timer); + match found { + | Some((state, msg)) => { + let dbg = Self::debug(now, state, msg); + log!(retry::Buf::mark_reset, + effects, + log::Level::Debug, + "{} got RESET, dropping all retry state.", + dbg.msg_short); + self.forget(now, effects, token) + }, + | _ => { + log!(retry::Buf::mark_reset, + effects, + log::Level::Info, + "RESET {:?} does not correspond to any known messages", + token); + }, + }; } /// Called when a response of any kind to any request is @@ -150,6 +182,10 @@ pub trait Buf

msg: Addrd<&platform::Message

>) -> Result<(), Error> { match (msg.data().ty, msg.data().code.kind()) { + | (Type::Reset, _) => { + self.mark_reset(now, effects, msg.data().token); + Ok(()) + }, | (Type::Ack, CodeKind::Empty) => { log!(retry::Buf::maybe_seen_response, effects, log::Level::Trace, "ACK 0.00 {:?} means we should find the corresponding outbound CON and either forget (if CON response) or transition to expecting a response (if CON request). No following logs means the ACK was unexpected.", msg.data().token); self.mark_acked(now, effects, msg.data().token); @@ -603,6 +639,111 @@ mod tests { assert_eq!(sent!().len(), 2); } + /* + * | t | what | + * | ------ | ------------------------------------------------- | + * | 50 | CON request sent | + * | 100 | got RESET | + * | 10_000 | should not have retried | + */ + #[test] + fn when_outbound_message_reset_retry_should_not_retry() { + type Mock = test::MockStep<(), Addrd, Addrd, ()>; + let s = Retry::::default(); + + let token_a = Token(array_vec![1, 2, 3]); + let token_a: &'static Token = unsafe { core::mem::transmute::<_, _>(&token_a) }; + + let token_b = Token(array_vec![1, 2, 4]); + let token_b: &'static Token = unsafe { core::mem::transmute::<_, _>(&token_b) }; + + let token_c = Token(array_vec![1, 2, 5]); + let token_c: &'static Token = unsafe { core::mem::transmute::<_, _>(&token_c) }; + + s.inner() + .set_poll_resp(|_, Snapshot { time, .. }, _, token, _| { + let time: u64 = Milliseconds::try_from(time.duration_since_epoch()).unwrap() + .0; + + let mut rst = test::msg!(RESET x.x.x.x:0000); + rst.as_mut().token = token; + + match time { + | 150 => Some(Ok(rst.map(Resp::from))), + | _ => None, + } + }) + .set_poll_req(|_, Snapshot { time, .. }, _| { + let time: u64 = Milliseconds::try_from(time.duration_since_epoch()).unwrap() + .0; + + let mut rst = test::msg!(RESET x.x.x.x:0000); + rst.as_mut().token = *token_c; + + match time { + | 150 => Some(Ok(rst.map(Req::from))), + | _ => None, + } + }); + let cfg = config(200, 400); + let mut effs = Vec::::new(); + macro_rules! sent { + () => {{ + effs.iter().filter(|e| matches!(e, Effect::Log(_, _))).for_each(|e| match e { + Effect::Log(l, m) => println!("[{:?}] {}", l, m.as_str()), + _ => (), + }); + effs.iter().filter(|e| matches!(e, Effect::Send(_))).collect::>() + }}; + } + + let mut con_req = test::msg!(CON GET x.x.x.x:1111); + con_req.as_mut().token = *token_a; + + let mut non_req = test::msg!(NON GET x.x.x.x:1111); + non_req.as_mut().token = *token_b; + + let mut con_rep = test::msg!(CON {2 . 04} x.x.x.x:1111); + con_rep.as_mut().token = *token_c; + + s.on_message_sent(&snap_time(cfg, 50), &mut effs, &con_rep) + .unwrap(); + s.on_message_sent(&snap_time(cfg, 50), &mut effs, &con_req) + .unwrap(); + s.on_message_sent(&snap_time(cfg, 50), &mut effs, &non_req) + .unwrap(); + + let rep = s.poll_resp(&snap_time(cfg, 150), &mut effs, *token_a, con_req.addr()) + .unwrap() + .unwrap(); + assert_eq!(sent!().len(), 0); + assert_eq!(rep.data().msg().ty, Type::Reset); + + let rep = s.poll_resp(&snap_time(cfg, 150), &mut effs, *token_b, con_req.addr()) + .unwrap() + .unwrap(); + assert_eq!(sent!().len(), 0); + assert_eq!(rep.data().msg().ty, Type::Reset); + + let req = s.poll_req(&snap_time(cfg, 150), &mut effs) + .unwrap() + .unwrap(); + assert_eq!(sent!().len(), 0); + assert_eq!(req.data().msg().ty, Type::Reset); + + s.poll_resp(&snap_time(cfg, 10_000), &mut effs, *token_a, con_req.addr()) + .ok_or(()) + .unwrap_err(); + s.poll_resp(&snap_time(cfg, 10_000), &mut effs, *token_b, con_req.addr()) + .ok_or(()) + .unwrap_err(); + s.poll_req(&snap_time(cfg, 10_000), &mut effs) + .ok_or(()) + .unwrap_err(); + + assert_eq!(sent!().len(), 0); + } + /* * | t | what | * | ------ | ------------------------------------------------- |