Skip to content

Commit

Permalink
internal_send function
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed Sep 13, 2024
1 parent 2538684 commit b845b7b
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/async_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl AsyncSession {
}

pub async fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
self.internal_send(buf)
}

fn internal_send(&self, buf: &[u8]) -> std::io::Result<usize> {
let packet = self.session.allocate_send_packet(buf.len() as _)?;
packet.bytes.copy_from_slice(buf);
self.session.send_packet(packet);
Expand All @@ -94,11 +98,15 @@ impl AsyncSession {

impl AsyncRead for AsyncSession {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
use std::io::{Error, ErrorKind::Other};
loop {
match &mut self.read_state {
ReadState::Idle => match self.session.try_receive() {
Ok(Some(packet)) => {
let size = packet.bytes.len().min(buf.len());
let size = packet.bytes.len();
if buf.len() < size {
return Poll::Ready(Err(Error::new(Other, "Buffer too small")));
}
buf[..size].copy_from_slice(&packet.bytes[..size]);
return Poll::Ready(Ok(size));
}
Expand All @@ -122,8 +130,7 @@ impl AsyncRead for AsyncSession {
Ok(guard) => guard,
Err(e) => {
self.read_state = ReadState::Waiting(Some(task));
use std::io::{Error, ErrorKind};
return Poll::Ready(Err(Error::new(ErrorKind::Other, format!("Lock task failed: {}", e))));
return Poll::Ready(Err(Error::new(Other, format!("Lock task failed: {}", e))));
}
};
self.read_state = match Pin::new(&mut *task_guard).poll(cx) {
Expand All @@ -143,10 +150,7 @@ impl AsyncRead for AsyncSession {

impl AsyncWrite for AsyncSession {
fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
let packet = self.session.allocate_send_packet(buf.len() as _)?;
packet.bytes.copy_from_slice(buf);
self.session.send_packet(packet);
Poll::Ready(Ok(buf.len()))
Poll::Ready(Ok(self.internal_send(buf)?))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Expand Down

0 comments on commit b845b7b

Please sign in to comment.