From b99624ff6486687c127c354b8c13d7cbb4d1c27c Mon Sep 17 00:00:00 2001
From: Tomek Karwowski <to.karwowski@gmail.com>
Date: Fri, 19 Aug 2022 21:03:28 +0200
Subject: [PATCH 1/6] feat(common): bytes::Buf wrapper that notifies
 subscribers on EOS

---
 src/common/buf.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++++
 src/common/mod.rs |  1 +
 2 files changed, 67 insertions(+)
 create mode 100644 src/common/buf.rs

diff --git a/src/common/buf.rs b/src/common/buf.rs
new file mode 100644
index 00000000..3afd4cdc
--- /dev/null
+++ b/src/common/buf.rs
@@ -0,0 +1,66 @@
+use hyper::body::Buf;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use tokio::sync::Notify;
+
+#[derive(Clone)]
+pub struct EosSignaler {
+    notifier: Arc<Notify>,
+}
+
+impl EosSignaler {
+    fn notify_eos(&self) {
+        self.notifier.notify_waiters();
+    }
+
+    pub async fn wait_till_eos(self) {
+        self.notifier.notified().await;
+    }
+}
+
+pub struct AlertOnEos<B> {
+    inner: B,
+    signaler: EosSignaler,
+    // It'd be better if we consumed the signaler, making it inaccessible after notification.
+    // Unfortunately, that would require something like AtomicOption.
+    // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
+    // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
+    // but it requires both unsafe and heap allocation, which is not worth it.
+    has_already_signaled: AtomicBool,
+}
+
+impl<B> AlertOnEos<B> {
+    pub fn new(inner: B) -> (Self, EosSignaler) {
+        let signal = EosSignaler {
+            notifier: Arc::new(Notify::new()),
+        };
+        let this = Self {
+            inner,
+            signaler: signal.clone(),
+            has_already_signaled: AtomicBool::new(false),
+        };
+        (this, signal)
+    }
+}
+
+impl<B: Buf> Buf for AlertOnEos<B> {
+    fn remaining(&self) -> usize {
+        self.inner.remaining()
+    }
+
+    fn chunk(&self) -> &[u8] {
+        self.inner.chunk()
+    }
+
+    fn advance(&mut self, cnt: usize) {
+        self.inner.advance(cnt);
+        if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
+            self.signaler.notify_eos();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+}
\ No newline at end of file
diff --git a/src/common/mod.rs b/src/common/mod.rs
index 52b99174..3b7fbe8c 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -10,6 +10,7 @@ macro_rules! ready {
 }
 
 pub(crate) use ready;
+pub mod buf;
 pub(crate) mod exec;
 pub(crate) mod never;
 

From 03f7ca1aee058b799d14b57709bfe91f03cb1443 Mon Sep 17 00:00:00 2001
From: Tomek Karwowski <to.karwowski@gmail.com>
Date: Fri, 19 Aug 2022 21:42:08 +0200
Subject: [PATCH 2/6] test(common): add basic test for AlertOnEos

---
 src/common/buf.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/common/buf.rs b/src/common/buf.rs
index 3afd4cdc..4819b340 100644
--- a/src/common/buf.rs
+++ b/src/common/buf.rs
@@ -62,5 +62,15 @@ impl<B: Buf> Buf for AlertOnEos<B> {
 
 #[cfg(test)]
 mod tests {
+    use crate::common::buf::AlertOnEos;
+    use hyper::body::Bytes;
+    use std::time::Duration;
 
-}
\ No newline at end of file
+    #[tokio::test]
+    async fn test_get_notified() {
+        let buf = Bytes::from_static(b"");
+        let (_buf, signaler) = AlertOnEos::new(buf);
+        let result = tokio::time::timeout(Duration::from_secs(1), signaler.wait_till_eos()).await;
+        assert_eq!(result, Ok(()));
+    }
+}

From e00f6f148e53ce383459753e82839a44bf306eb4 Mon Sep 17 00:00:00 2001
From: Tomek Karwowski <to.karwowski@gmail.com>
Date: Sat, 20 Aug 2022 19:19:06 +0200
Subject: [PATCH 3/6] fix(common): replace usage of Notify::notify_waiters with
 Notify::notify_one

---
 src/common/buf.rs | 59 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 38 insertions(+), 21 deletions(-)

diff --git a/src/common/buf.rs b/src/common/buf.rs
index 4819b340..2c43eade 100644
--- a/src/common/buf.rs
+++ b/src/common/buf.rs
@@ -3,24 +3,28 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tokio::sync::Notify;
 
-#[derive(Clone)]
+/// Signaler returned as part of `NotifyOnEos::new` that can be polled to receive information,
+/// when the buffer gets advanced to the end.
+// Cannot be Clone due to usage of `Notify::notify_one` in `NotifyOnEos::advance`,
+// revisit once `Notify::notify_all` stabilizes.
 pub struct EosSignaler {
     notifier: Arc<Notify>,
 }
 
 impl EosSignaler {
-    fn notify_eos(&self) {
-        self.notifier.notify_waiters();
-    }
-
     pub async fn wait_till_eos(self) {
         self.notifier.notified().await;
     }
 }
 
-pub struct AlertOnEos<B> {
+/// Wrapper for `bytes::Buf` that returns a `EosSignaler` that can be polled to receive information,
+/// when the buffer gets advanced to the end.
+///
+/// NOTE: For the notification to work, caller must ensure that `Buf::advance` gets called
+/// enough times to advance to the end of the buffer (so that `Buf::has_remaining` afterwards returns `0`).
+pub struct NotifyOnEos<B> {
     inner: B,
-    signaler: EosSignaler,
+    notifier: Arc<Notify>,
     // It'd be better if we consumed the signaler, making it inaccessible after notification.
     // Unfortunately, that would require something like AtomicOption.
     // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
@@ -29,21 +33,20 @@ pub struct AlertOnEos<B> {
     has_already_signaled: AtomicBool,
 }
 
-impl<B> AlertOnEos<B> {
+impl<B> NotifyOnEos<B> {
     pub fn new(inner: B) -> (Self, EosSignaler) {
-        let signal = EosSignaler {
-            notifier: Arc::new(Notify::new()),
-        };
+        let notifier = Arc::new(Notify::new());
         let this = Self {
             inner,
-            signaler: signal.clone(),
+            notifier: notifier.clone(),
             has_already_signaled: AtomicBool::new(false),
         };
+        let signal = EosSignaler { notifier };
         (this, signal)
     }
 }
 
-impl<B: Buf> Buf for AlertOnEos<B> {
+impl<B: Buf> Buf for NotifyOnEos<B> {
     fn remaining(&self) -> usize {
         self.inner.remaining()
     }
@@ -55,22 +58,36 @@ impl<B: Buf> Buf for AlertOnEos<B> {
     fn advance(&mut self, cnt: usize) {
         self.inner.advance(cnt);
         if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
-            self.signaler.notify_eos();
+            // tokio::sync::Notify has private method `notify_all` that, once stabilized,
+            // would allow us to make `EosSignaler` Cloneable with better ergonomics
+            // to await EOS from multiple places.
+            self.notifier.notify_one();
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::common::buf::AlertOnEos;
-    use hyper::body::Bytes;
+    use crate::common::buf::NotifyOnEos;
+    use hyper::body::{Buf, Bytes};
     use std::time::Duration;
 
     #[tokio::test]
-    async fn test_get_notified() {
-        let buf = Bytes::from_static(b"");
-        let (_buf, signaler) = AlertOnEos::new(buf);
-        let result = tokio::time::timeout(Duration::from_secs(1), signaler.wait_till_eos()).await;
-        assert_eq!(result, Ok(()));
+    async fn test_get_notified_immediately() {
+        let buf = Bytes::from_static(b"abc");
+        let (mut buf, signaler) = NotifyOnEos::new(buf);
+        buf.advance(3);
+        signaler.wait_till_eos().await;
+    }
+
+    #[tokio::test]
+    async fn test_get_notified_after_1ms() {
+        let buf = Bytes::from_static(b"abc");
+        let (mut buf, signaler) = NotifyOnEos::new(buf);
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            buf.advance(3);
+        });
+        signaler.wait_till_eos().await;
     }
 }

From 5cd09b5dce88d5f48417f99fc95a75bd653a0e74 Mon Sep 17 00:00:00 2001
From: Tomek Karwowski <to.karwowski@gmail.com>
Date: Sat, 20 Aug 2022 19:25:09 +0200
Subject: [PATCH 4/6] chore: disable miri for tests

---
 src/common/buf.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/common/buf.rs b/src/common/buf.rs
index 2c43eade..673059e7 100644
--- a/src/common/buf.rs
+++ b/src/common/buf.rs
@@ -72,6 +72,7 @@ mod tests {
     use hyper::body::{Buf, Bytes};
     use std::time::Duration;
 
+    #[cfg(not(miri))]
     #[tokio::test]
     async fn test_get_notified_immediately() {
         let buf = Bytes::from_static(b"abc");
@@ -80,6 +81,7 @@ mod tests {
         signaler.wait_till_eos().await;
     }
 
+    #[cfg(not(miri))]
     #[tokio::test]
     async fn test_get_notified_after_1ms() {
         let buf = Bytes::from_static(b"abc");

From 129d90115da3d451fabae913e72d7d29177cb749 Mon Sep 17 00:00:00 2001
From: Tomek Karwowski <to.karwowski@gmail.com>
Date: Sat, 20 Aug 2022 19:33:27 +0200
Subject: [PATCH 5/6] style: make structs/methods linkable in doc-strings

---
 src/common/buf.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/common/buf.rs b/src/common/buf.rs
index 673059e7..d7565e44 100644
--- a/src/common/buf.rs
+++ b/src/common/buf.rs
@@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tokio::sync::Notify;
 
-/// Signaler returned as part of `NotifyOnEos::new` that can be polled to receive information,
+/// Signaler returned as part of [`NotifyOnEos::new`] that can be polled to receive information,
 /// when the buffer gets advanced to the end.
 // Cannot be Clone due to usage of `Notify::notify_one` in `NotifyOnEos::advance`,
 // revisit once `Notify::notify_all` stabilizes.
@@ -17,11 +17,11 @@ impl EosSignaler {
     }
 }
 
-/// Wrapper for `bytes::Buf` that returns a `EosSignaler` that can be polled to receive information,
+/// Wrapper for [`Buf`] that returns an [`EosSignaler`] that can be polled to receive information,
 /// when the buffer gets advanced to the end.
 ///
-/// NOTE: For the notification to work, caller must ensure that `Buf::advance` gets called
-/// enough times to advance to the end of the buffer (so that `Buf::has_remaining` afterwards returns `0`).
+/// NOTE: For the notification to work, caller must ensure that [`Buf::advance`] gets called
+/// enough times to advance to the end of the buffer (so that [`Buf::has_remaining`] afterwards returns `0`).
 pub struct NotifyOnEos<B> {
     inner: B,
     notifier: Arc<Notify>,

From 9150c0f6a28f18e179ac75fc0ab0ea2049770b65 Mon Sep 17 00:00:00 2001
From: Tomek Karwowski <to.karwowski@gmail.com>
Date: Tue, 23 Aug 2022 20:31:47 +0200
Subject: [PATCH 6/6] refactor(common): make notifier an Option instead of
 struct + boolean

We want to ensure that the notifier is only called once, when writing that earier I tried to force the notifier into an Arc<Option<...>>,
completely forgetting about the ownership-safe and easier Option<Arc<...>>.
---
 src/common/buf.rs | 26 +++++++++++---------------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/src/common/buf.rs b/src/common/buf.rs
index d7565e44..59aaa5c6 100644
--- a/src/common/buf.rs
+++ b/src/common/buf.rs
@@ -1,5 +1,4 @@
 use hyper::body::Buf;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tokio::sync::Notify;
 
@@ -24,13 +23,7 @@ impl EosSignaler {
 /// enough times to advance to the end of the buffer (so that [`Buf::has_remaining`] afterwards returns `0`).
 pub struct NotifyOnEos<B> {
     inner: B,
-    notifier: Arc<Notify>,
-    // It'd be better if we consumed the signaler, making it inaccessible after notification.
-    // Unfortunately, that would require something like AtomicOption.
-    // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
-    // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
-    // but it requires both unsafe and heap allocation, which is not worth it.
-    has_already_signaled: AtomicBool,
+    notifier: Option<Arc<Notify>>,
 }
 
 impl<B> NotifyOnEos<B> {
@@ -38,8 +31,7 @@ impl<B> NotifyOnEos<B> {
         let notifier = Arc::new(Notify::new());
         let this = Self {
             inner,
-            notifier: notifier.clone(),
-            has_already_signaled: AtomicBool::new(false),
+            notifier: Some(notifier.clone()),
         };
         let signal = EosSignaler { notifier };
         (this, signal)
@@ -57,11 +49,14 @@ impl<B: Buf> Buf for NotifyOnEos<B> {
 
     fn advance(&mut self, cnt: usize) {
         self.inner.advance(cnt);
-        if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
-            // tokio::sync::Notify has private method `notify_all` that, once stabilized,
-            // would allow us to make `EosSignaler` Cloneable with better ergonomics
-            // to await EOS from multiple places.
-            self.notifier.notify_one();
+        if !self.inner.has_remaining() {
+            // consume the notifier to ensure we only notify once
+            if let Some(notifier) = self.notifier.take() {
+                // tokio::sync::Notify has private method `notify_all` that, once stabilized,
+                // would allow us to make `EosSignaler` Cloneable with better ergonomics
+                // to await EOS from multiple places.
+                notifier.notify_one();
+            }
         }
     }
 }
@@ -83,6 +78,7 @@ mod tests {
 
     #[cfg(not(miri))]
     #[tokio::test]
+    /// Test against the foot-gun of using [`tokio::sync::Notify::notify_waiters`] instead of `notify_one`.
     async fn test_get_notified_after_1ms() {
         let buf = Bytes::from_static(b"abc");
         let (mut buf, signaler) = NotifyOnEos::new(buf);