From 1d9c9239d6da662609834f5cf5b64cd9b2144c2c Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 06:10:40 +0800 Subject: [PATCH 01/14] update ringbuf to rc version --- Cargo.toml | 2 +- src/msg_sender.rs | 43 +++++++++++----------- src/write.rs | 92 +++++++++++++++-------------------------------- 3 files changed, 50 insertions(+), 87 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3124c1f..4ecea6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" crate-type = ["staticlib"] [dependencies] -async-ringbuf = { version="0.1.3", features = ["tokio", "impl-tokio"] } +async-ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "master" } dashmap = "5.4.0" futures = { version = "0.3.28", default-features = false, features = ["async-await"] } libc = "0.2.146" diff --git a/src/msg_sender.rs b/src/msg_sender.rs index c243806..45e02b3 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -1,8 +1,10 @@ -use async_ringbuf::{AsyncHeapConsumer, AsyncHeapProducer, AsyncHeapRb}; -use std::future::Future; -use std::pin::pin; -use std::task::Poll::Ready; -use std::task::{ready, Context, Poll, Waker}; +use async_ringbuf::halves::{AsyncCons, AsyncProd}; +use async_ringbuf::producer::AsyncProducer; +use async_ringbuf::traits::{AsyncObserver, Observer, Producer, Split}; +use async_ringbuf::AsyncHeapRb; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Poll, Waker}; use tokio::runtime::Handle; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -14,6 +16,9 @@ pub(crate) enum SendCommand { Flush, } +pub type AsyncHeapProducer = AsyncProd>>; +pub type AsyncHeapConsumer = AsyncCons>>; + pub(crate) fn make_sender(handle: Handle) -> (MsgSender, MsgRcv) { let (cmd, cmd_recv) = unbounded_channel::(); let (rings_prd, rings) = unbounded_channel::>(); @@ -55,7 +60,7 @@ fn burst_write( bytes: &[u8], ) -> BurstWriteState { loop { - let n = buf.as_mut_base().push_slice(&bytes[*offset..]); + let n = buf.push_slice(&bytes[*offset..]); if n == 0 { // no bytes read, return break BurstWriteState::Pending; @@ -84,7 +89,7 @@ impl MsgSender { // unfinished, enter into future self.handle.clone().block_on(async { loop { - self.ring_buf.wait_free(1).await; + self.ring_buf.wait_vacant(1).await; // check if closed if self.ring_buf.is_closed() { break Err(std::io::Error::new( @@ -119,7 +124,7 @@ impl MsgSender { // allocate new ring buffer if unable to write the entire message. let new_buf_size = RING_BUFFER_SIZE.max(bytes.len() - offset); let (mut ring_buf, ring) = AsyncHeapRb::::new(new_buf_size).split(); - ring_buf.as_mut_base().push_slice(&bytes[offset..]); + ring_buf.push_slice(&bytes[offset..]); self.rings_prd.send(ring).map_err(|e| { std::io::Error::new( std::io::ErrorKind::WriteZero, @@ -143,23 +148,15 @@ impl MsgSender { // attempt to write as much as possible burst_write(&mut offset, &mut self.ring_buf, bytes); if offset > 0 { - return Ready(Ok(offset)); + break Ready(Ok(offset)); } - ready!(pin!(self.ring_buf.wait_free(1)).poll(&mut Context::from_waker(&waker))); - // if wait_free returns ready, try again. - if self.ring_buf.is_closed() { - return Ready(Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "connection closed", - ))); + // offset = 0, prepare to wait + self.ring_buf.register_read_waker(&waker); + // check again that the ring_buf is not empty + // to prevent deadlock + if !self.ring_buf.is_empty() { + break Pending; } - // New data is available, must try to consume again, - // otherwise, we risk to block forever. - // i.e., There is a tiny chance that the ring buffer - // becomes full *before the waker is registered*, - // and no notification will be received by us. - // However, the `poll` will have returned OK(()), - // indicating that we should try to consume again. } } diff --git a/src/write.rs b/src/write.rs index 3a63357..bf04b94 100644 --- a/src/write.rs +++ b/src/write.rs @@ -1,6 +1,8 @@ use crate::conn::ConnConfig; use crate::msg_sender::MsgRcv; -use async_ringbuf::AsyncHeapConsumer; +use crate::read::MIN_MSG_BUFFER_SIZE; +use crate::AsyncHeapConsumer; +use async_ringbuf::traits::{AsyncConsumer, AsyncObserver, Consumer, Observer}; use std::time::Duration; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; @@ -29,8 +31,6 @@ async fn handle_writer_auto_flush( mut stop: oneshot::Sender<()>, ) -> std::io::Result<()> { debug_assert!(!duration.is_zero()); - let send_buf_size = socket2::SockRef::from(write.as_ref()).send_buffer_size()?; - tracing::trace!("send buffer size: {}", send_buf_size); let mut flush_tick = tokio::time::interval(duration); flush_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -45,45 +45,41 @@ async fn handle_writer_auto_flush( Some(ring) => ring, None => break 'close, }; - let chunk_size = send_buf_size.min(ring.capacity()); let mut has_data = true; 'ring: loop { tokio::select! { biased; - // buf threshold - _ = ring.wait(chunk_size) => { + // !has_data => wait for has_data + // has_data => wait for write_threshold + _ = ring.wait_occupied(if !has_data {1} else {MIN_MSG_BUFFER_SIZE}) => { if ring.is_closed() { break 'ring; } - flush(&mut ring, &mut write, chunk_size).await?; - has_data = false; + has_data = true; + if ring.occupied_len() >= MIN_MSG_BUFFER_SIZE { + flush(&mut ring, &mut write).await?; + has_data = false + } } // flush cmd = recv.cmd_recv.recv() => { // always flush, including if sender is dropped - flush(&mut ring, &mut write, chunk_size).await?; + flush(&mut ring, &mut write).await?; if cmd.is_none() { break 'close; } has_data = false; } - _ = ring.wait(1), if !has_data => { - if ring.is_closed() { - break 'ring; - } - // got data, no writing, enable ticking - has_data = true; - } // tick flush _ = flush_tick.tick(), if has_data => { - flush(&mut ring, &mut write, chunk_size).await?; + flush(&mut ring, &mut write).await?; has_data = false; } _ = stop.closed() => break 'close, } } // always clear the old ring_buf before reading the next - flush(&mut ring, &mut write, chunk_size).await?; + flush(&mut ring, &mut write).await?; } tracing::debug!("connection stopped"); write.shutdown().await?; @@ -95,9 +91,6 @@ async fn handle_writer_no_auto_flush( mut recv: MsgRcv, mut stop: oneshot::Sender<()>, ) -> std::io::Result<()> { - let send_buf_size = socket2::SockRef::from(write.as_ref()).send_buffer_size()?; - tracing::trace!("send buffer size: {}", send_buf_size); - 'close: loop { // obtain a ring buffer let ring = tokio::select! { @@ -109,21 +102,20 @@ async fn handle_writer_no_auto_flush( Some(ring) => ring, None => break 'close, }; - let chunk_size = send_buf_size.min(ring.capacity()); 'ring: loop { tokio::select! { biased; // buf threshold - _ = ring.wait(chunk_size) => { + _ = ring.wait_occupied(MIN_MSG_BUFFER_SIZE) => { if ring.is_closed() { break 'ring; } - flush(&mut ring, &mut write, chunk_size).await?; + flush(&mut ring, &mut write).await?; } // flush cmd = recv.cmd_recv.recv() => { // always flush, including if sender is dropped - flush(&mut ring, &mut write, chunk_size).await?; + flush(&mut ring, &mut write).await?; if cmd.is_none() { break 'close; } @@ -132,7 +124,7 @@ async fn handle_writer_no_auto_flush( } } // always clear the old ring_buf before reading the next - flush(&mut ring, &mut write, chunk_size).await?; + flush(&mut ring, &mut write).await?; } tracing::debug!("connection stopped"); write.shutdown().await?; @@ -144,43 +136,17 @@ async fn handle_writer_no_auto_flush( async fn flush( ring_buf: &mut AsyncHeapConsumer, write: &mut OwnedWriteHalf, - chunk_size: usize, -) -> std::io::Result { - let mut n = 0; +) -> std::io::Result<()> { loop { - let written = write_chunk(ring_buf, write, chunk_size).await?; - if written == 0 { - break; - } - n += written; - } - Ok(n) -} - -#[inline] -async fn write_chunk( - ring_buf: &mut AsyncHeapConsumer, - write: &mut OwnedWriteHalf, - chunk_size: usize, -) -> std::io::Result { - debug_assert!(chunk_size > 0); - let (left, right) = ring_buf.as_base().as_slices(); - - // precompute all lengths to reduce cpu branching - let left_written = left.len().min(chunk_size); - let remaining = chunk_size - left_written; - let right_written = right.len().min(remaining); - let total = left_written + right_written; - - // execute write - if left_written > 0 { - write.write_all(&left[..left_written]).await?; - } - if right_written > 0 { - write.write_all(&right[..right_written]).await?; + let (left, right) = ring_buf.as_slices(); + let count = if !left.is_empty() { + write.write(left).await? + } else if !right.is_empty() { + write.write(right).await? + } else { + // both empty, break + return Ok(()); + }; + unsafe { ring_buf.advance_read_index(count) }; } - - // update ring_buf - unsafe { ring_buf.as_mut_base().advance(total) }; - Ok(total) } From ad273f2ed9dca6685f5419df3b73b397a1b340ca Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 06:27:32 +0800 Subject: [PATCH 02/14] only need `left` in flush --- src/write.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/write.rs b/src/write.rs index bf04b94..af7a4d7 100644 --- a/src/write.rs +++ b/src/write.rs @@ -138,15 +138,13 @@ async fn flush( write: &mut OwnedWriteHalf, ) -> std::io::Result<()> { loop { - let (left, right) = ring_buf.as_slices(); - let count = if !left.is_empty() { - write.write(left).await? - } else if !right.is_empty() { - write.write(right).await? + let (left, _) = ring_buf.as_slices(); + if !left.is_empty() { + let count = write.write(left).await?; + unsafe { ring_buf.advance_read_index(count) }; } else { // both empty, break return Ok(()); }; - unsafe { ring_buf.advance_read_index(count) }; } } From 07edca88bd6442aceefab5dd05303b2f4d93e0d1 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 07:17:29 +0800 Subject: [PATCH 03/14] direct allocation for large message --- Cargo.toml | 1 + src/msg_sender.rs | 27 +++++++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4ecea6c..e0e1802 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ crate-type = ["staticlib"] [dependencies] async-ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "master" } +ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "master" } dashmap = "5.4.0" futures = { version = "0.3.28", default-features = false, features = ["async-await"] } libc = "0.2.146" diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 45e02b3..da1dc19 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -2,6 +2,7 @@ use async_ringbuf::halves::{AsyncCons, AsyncProd}; use async_ringbuf::producer::AsyncProducer; use async_ringbuf::traits::{AsyncObserver, Observer, Producer, Split}; use async_ringbuf::AsyncHeapRb; +use ringbuf::SharedRb; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::task::{Poll, Waker}; @@ -113,18 +114,28 @@ impl MsgSender { /// It caches all received bytes in memory /// (efficiently using a chain of ring buffers). pub fn send_nonblock(&mut self, bytes: &[u8]) -> std::io::Result<()> { - if bytes.is_empty() { + let msg_size = bytes.len(); + if msg_size == 0 { return Ok(()); } let mut offset = 0usize; - // attempt to write the entire message without new allocation - if let BurstWriteState::Finished = burst_write(&mut offset, &mut self.ring_buf, bytes) { - return Ok(()); + if msg_size <= RING_BUFFER_SIZE { + // attempt direct write for small message + if let BurstWriteState::Finished = burst_write(&mut offset, &mut self.ring_buf, bytes) { + return Ok(()); + } } - // allocate new ring buffer if unable to write the entire message. - let new_buf_size = RING_BUFFER_SIZE.max(bytes.len() - offset); - let (mut ring_buf, ring) = AsyncHeapRb::::new(new_buf_size).split(); - ring_buf.push_slice(&bytes[offset..]); + let (ring_buf, ring) = if msg_size <= RING_BUFFER_SIZE { + let mut rb = AsyncHeapRb::::new(RING_BUFFER_SIZE); + rb.push_slice(&bytes[offset..]); + rb + } else { + // direct allocation for large message + AsyncHeapRb::::from(unsafe { + SharedRb::from_raw_parts(std::mem::transmute(bytes.to_vec()), 0, msg_size) + }) + }.split(); + // send new allocation self.rings_prd.send(ring).map_err(|e| { std::io::Error::new( std::io::ErrorKind::WriteZero, From 21ea99322589954ac4c04659ea4dd3b443568765 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 08:37:41 +0800 Subject: [PATCH 04/14] revert back send_nonblock --- Cargo.toml | 1 - src/msg_sender.rs | 27 ++++++++------------------- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e0e1802..4ecea6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ crate-type = ["staticlib"] [dependencies] async-ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "master" } -ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "master" } dashmap = "5.4.0" futures = { version = "0.3.28", default-features = false, features = ["async-await"] } libc = "0.2.146" diff --git a/src/msg_sender.rs b/src/msg_sender.rs index da1dc19..45e02b3 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -2,7 +2,6 @@ use async_ringbuf::halves::{AsyncCons, AsyncProd}; use async_ringbuf::producer::AsyncProducer; use async_ringbuf::traits::{AsyncObserver, Observer, Producer, Split}; use async_ringbuf::AsyncHeapRb; -use ringbuf::SharedRb; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::task::{Poll, Waker}; @@ -114,28 +113,18 @@ impl MsgSender { /// It caches all received bytes in memory /// (efficiently using a chain of ring buffers). pub fn send_nonblock(&mut self, bytes: &[u8]) -> std::io::Result<()> { - let msg_size = bytes.len(); - if msg_size == 0 { + if bytes.is_empty() { return Ok(()); } let mut offset = 0usize; - if msg_size <= RING_BUFFER_SIZE { - // attempt direct write for small message - if let BurstWriteState::Finished = burst_write(&mut offset, &mut self.ring_buf, bytes) { - return Ok(()); - } + // attempt to write the entire message without new allocation + if let BurstWriteState::Finished = burst_write(&mut offset, &mut self.ring_buf, bytes) { + return Ok(()); } - let (ring_buf, ring) = if msg_size <= RING_BUFFER_SIZE { - let mut rb = AsyncHeapRb::::new(RING_BUFFER_SIZE); - rb.push_slice(&bytes[offset..]); - rb - } else { - // direct allocation for large message - AsyncHeapRb::::from(unsafe { - SharedRb::from_raw_parts(std::mem::transmute(bytes.to_vec()), 0, msg_size) - }) - }.split(); - // send new allocation + // allocate new ring buffer if unable to write the entire message. + let new_buf_size = RING_BUFFER_SIZE.max(bytes.len() - offset); + let (mut ring_buf, ring) = AsyncHeapRb::::new(new_buf_size).split(); + ring_buf.push_slice(&bytes[offset..]); self.rings_prd.send(ring).map_err(|e| { std::io::Error::new( std::io::ErrorKind::WriteZero, From 811f7d362136a217e957f61a804eb664c459b151 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 09:11:46 +0800 Subject: [PATCH 05/14] use a larger ring buffer size for send_nonblock --- src/msg_sender.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 45e02b3..90d118b 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -113,6 +113,7 @@ impl MsgSender { /// It caches all received bytes in memory /// (efficiently using a chain of ring buffers). pub fn send_nonblock(&mut self, bytes: &[u8]) -> std::io::Result<()> { + const NONBLOCK_RING_BUFFER_SIZE: usize = 1024 * 1024; if bytes.is_empty() { return Ok(()); } @@ -122,7 +123,7 @@ impl MsgSender { return Ok(()); } // allocate new ring buffer if unable to write the entire message. - let new_buf_size = RING_BUFFER_SIZE.max(bytes.len() - offset); + let new_buf_size = NONBLOCK_RING_BUFFER_SIZE.max(bytes.len() - offset); let (mut ring_buf, ring) = AsyncHeapRb::::new(new_buf_size).split(); ring_buf.push_slice(&bytes[offset..]); self.rings_prd.send(ring).map_err(|e| { From f11f39ef049e9a9416b855001c04521dd6cfceed Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 09:20:54 +0800 Subject: [PATCH 06/14] bump version to v0.3.1 --- CMakeLists.txt | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 23799b9..7900e6d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_TOOLCHAIN_FILE ${CMAKE_SOURCE_DIR}/toolchain.cmake) # define project -project(socket_manager LANGUAGES C CXX VERSION 0.3.0) +project(socket_manager LANGUAGES C CXX VERSION 0.3.1) # set default build type as shared option(BUILD_SHARED_LIBS "Build using shared libraries" ON) diff --git a/Cargo.toml b/Cargo.toml index 4ecea6c..e5b215b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-socket-manager" -version = "0.1.0" +version = "0.3.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From c931250fd89ef24326750565370307b1a8637d59 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 09:43:46 +0800 Subject: [PATCH 07/14] must manually close old ring_buf when new --- src/msg_sender.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 90d118b..66ab68e 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -5,6 +5,7 @@ use async_ringbuf::AsyncHeapRb; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::task::{Poll, Waker}; +use futures::SinkExt; use tokio::runtime::Handle; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -132,6 +133,9 @@ impl MsgSender { format!("connection closed: {e}"), ) })?; + // must manually close old ring_buf + // since it is not implemented in drop + self.ring_buf.close(); // set head to new ring_buf self.ring_buf = ring_buf; Ok(()) @@ -170,3 +174,10 @@ impl MsgSender { }) } } + +impl Drop for MsgSender { + fn drop(&mut self) { + // close ring_buf on drop + self.ring_buf.close(); + } +} From 76901ff2ce0bc3a1a2d61aa9b7d80f2aa30e70ad Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Fri, 28 Jul 2023 09:43:46 +0800 Subject: [PATCH 08/14] must manually close old ring_buf when new --- src/msg_sender.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 90d118b..2c42992 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -132,6 +132,9 @@ impl MsgSender { format!("connection closed: {e}"), ) })?; + // must manually close old ring_buf + // since it is not implemented in drop + self.ring_buf.close(); // set head to new ring_buf self.ring_buf = ring_buf; Ok(()) @@ -170,3 +173,10 @@ impl MsgSender { }) } } + +impl Drop for MsgSender { + fn drop(&mut self) { + // close ring_buf on drop + self.ring_buf.close(); + } +} From 4558300a2d07cf1d1bd5ddae28d919f504eb4595 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 10:55:24 +0800 Subject: [PATCH 09/14] Revert "use a larger ring buffer size for send_nonblock" This reverts commit 811f7d362136a217e957f61a804eb664c459b151. --- src/msg_sender.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 66ab68e..e9dc49a 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -114,7 +114,6 @@ impl MsgSender { /// It caches all received bytes in memory /// (efficiently using a chain of ring buffers). pub fn send_nonblock(&mut self, bytes: &[u8]) -> std::io::Result<()> { - const NONBLOCK_RING_BUFFER_SIZE: usize = 1024 * 1024; if bytes.is_empty() { return Ok(()); } @@ -124,7 +123,7 @@ impl MsgSender { return Ok(()); } // allocate new ring buffer if unable to write the entire message. - let new_buf_size = NONBLOCK_RING_BUFFER_SIZE.max(bytes.len() - offset); + let new_buf_size = RING_BUFFER_SIZE.max(bytes.len() - offset); let (mut ring_buf, ring) = AsyncHeapRb::::new(new_buf_size).split(); ring_buf.push_slice(&bytes[offset..]); self.rings_prd.send(ring).map_err(|e| { From 16cf570336554d4c9d51ed18b741a1d20348b521 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 11:45:56 +0800 Subject: [PATCH 10/14] merge from master --- src/msg_sender.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/src/msg_sender.rs b/src/msg_sender.rs index e9dc49a..65563aa 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -5,7 +5,6 @@ use async_ringbuf::AsyncHeapRb; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::task::{Poll, Waker}; -use futures::SinkExt; use tokio::runtime::Handle; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -132,9 +131,6 @@ impl MsgSender { format!("connection closed: {e}"), ) })?; - // must manually close old ring_buf - // since it is not implemented in drop - self.ring_buf.close(); // set head to new ring_buf self.ring_buf = ring_buf; Ok(()) @@ -156,9 +152,13 @@ impl MsgSender { } // offset = 0, prepare to wait self.ring_buf.register_read_waker(&waker); - // check again that the ring_buf is not empty - // to prevent deadlock - if !self.ring_buf.is_empty() { + // check the pending state ensues. + if self.ring_buf.is_closed() { + break Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "connection closed", + ))); + } else if self.ring_buf.is_full() { break Pending; } } @@ -173,10 +173,3 @@ impl MsgSender { }) } } - -impl Drop for MsgSender { - fn drop(&mut self) { - // close ring_buf on drop - self.ring_buf.close(); - } -} From bda848f0e3da14163b432aa0d7e89ac2059a5977 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sat, 29 Jul 2023 12:53:12 +0800 Subject: [PATCH 11/14] Bump version to v0.3.2 --- CMakeLists.txt | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7900e6d..993ef1a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_TOOLCHAIN_FILE ${CMAKE_SOURCE_DIR}/toolchain.cmake) # define project -project(socket_manager LANGUAGES C CXX VERSION 0.3.1) +project(socket_manager LANGUAGES C CXX VERSION 0.3.2) # set default build type as shared option(BUILD_SHARED_LIBS "Build using shared libraries" ON) diff --git a/Cargo.toml b/Cargo.toml index e5b215b..e249920 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-socket-manager" -version = "0.3.1" +version = "0.3.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From bcc111e4e75a41cb34c4eb6eba296b551b9f04ad Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sun, 30 Jul 2023 08:37:09 +0800 Subject: [PATCH 12/14] use main branch of async-ringbuf private fork --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e249920..8f09895 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" crate-type = ["staticlib"] [dependencies] -async-ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "master" } +async-ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "main" } dashmap = "5.4.0" futures = { version = "0.3.28", default-features = false, features = ["async-await"] } libc = "0.2.146" From 53f97f730a1123cf6fec7df4138e42ae77f0921c Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sun, 30 Jul 2023 10:02:40 +0800 Subject: [PATCH 13/14] simplify send_async retry logic --- src/msg_sender.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 19575a3..41a03f1 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -1,6 +1,5 @@ use async_ringbuf::halves::{AsyncCons, AsyncProd}; -use async_ringbuf::producer::AsyncProducer; -use async_ringbuf::traits::{AsyncObserver, Observer, Producer, Split}; +use async_ringbuf::traits::{AsyncObserver, AsyncProducer, Producer, Split}; use async_ringbuf::AsyncHeapRb; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; @@ -143,23 +142,28 @@ impl MsgSender { return Ready(Ok(0)); } let mut offset = 0usize; + let mut waker_registered = false; loop { + // check if closed + if self.ring_buf.is_closed() { + break Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "connection closed", + ))); + } // attempt to write as much as possible burst_write(&mut offset, &mut self.ring_buf, bytes); if offset > 0 { break Ready(Ok(offset)); } // offset = 0, prepare to wait - self.ring_buf.register_read_waker(&waker); - // check the pending state ensues. - if self.ring_buf.is_closed() { - break Ready(Err(std::io::Error::new( - std::io::ErrorKind::Other, - "connection closed", - ))); - } else if self.ring_buf.is_full() { + if waker_registered { break Pending; } + // register waker + self.ring_buf.register_read_waker(&waker); + waker_registered = true; + // try again to ensure no missing wake } } From 702582019c7bc6d2644f6134c143080ff0d4c4e1 Mon Sep 17 00:00:00 2001 From: Congyuwang Date: Sun, 30 Jul 2023 19:15:05 +0800 Subject: [PATCH 14/14] update async-ringbuf to v0.2.0-rc.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 930f780..e6f2691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" crate-type = ["staticlib"] [dependencies] -async-ringbuf = { git = "https://github.com/Congyuwang/ringbuf", branch = "main" } +async-ringbuf = "0.2.0-rc.1" dashmap = { version = "5.4.0", features = ["inline"] } libc = "0.2.146" socket2 = "0.5.3"