-
-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor to use new ResourceId from reactor crate #36
Changes from 1 commit
0fc9bb5
19ea32e
f86a594
b20dd94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,7 @@ use std::time::Duration; | |
use std::{fmt, io, net}; | ||
|
||
use reactor::poller::IoType; | ||
use reactor::{Io, Resource, WriteAtomic, WriteError}; | ||
use reactor::{Io, Resource, ResourceId, WriteAtomic, WriteError}; | ||
|
||
use crate::{Direction, NetConnection, NetListener, NetSession, READ_BUFFER_SIZE}; | ||
|
||
|
@@ -151,11 +151,8 @@ impl<L: NetListener<Stream = S::Connection>, S: NetSession> NetAccept<S, L> { | |
impl<L: NetListener<Stream = S::Connection>, S: NetSession> Resource for NetAccept<S, L> | ||
where S: Send | ||
{ | ||
type Id = net::SocketAddr; | ||
type Event = ListenerEvent<S>; | ||
|
||
fn id(&self) -> Self::Id { self.listener.local_addr() } | ||
|
||
fn interests(&self) -> IoType { IoType::read_only() } | ||
|
||
fn handle_io(&mut self, io: Io) -> Option<Self::Event> { | ||
|
@@ -202,6 +199,11 @@ pub enum TransportState { | |
Terminated, | ||
} | ||
|
||
/// Error indicating that method [`NetTransport::set_resource_id`] was called more than once. | ||
#[derive(Copy, Clone, Eq, PartialEq, Debug, Display, Error)] | ||
#[display("an attempt to re-assign resource id to {new} for net transport {current}.")] | ||
pub struct ResIdReassigned { current: ResourceId, new: ResourceId } | ||
|
||
/// Net transport is an adaptor around specific [`NetSession`] (implementing | ||
/// session management, including optional handshake, encoding etc) to be used | ||
/// as a transport resource in a [`reactor::Reactor`]. | ||
|
@@ -213,6 +215,8 @@ pub struct NetTransport<S: NetSession> { | |
write_intent: bool, | ||
read_buffer: Box<[u8; HEAP_BUFFER_SIZE]>, | ||
write_buffer: VecDeque<u8>, | ||
/// Resource id assigned by the reactor | ||
id: Option<ResourceId> | ||
} | ||
|
||
impl<S: NetSession> Display for NetTransport<S> { | ||
|
@@ -256,6 +260,7 @@ impl<S: NetSession> NetTransport<S> { | |
write_intent: true, | ||
read_buffer: Box::new([0u8; READ_BUFFER_SIZE]), | ||
write_buffer: empty!(), | ||
id: None, | ||
}) | ||
} | ||
|
||
|
@@ -295,9 +300,29 @@ impl<S: NetSession> NetTransport<S> { | |
write_intent: false, | ||
read_buffer: Box::new([0u8; READ_BUFFER_SIZE]), | ||
write_buffer: empty!(), | ||
id: None, | ||
}) | ||
} | ||
|
||
pub fn display(&self) -> impl Display { | ||
match self.id { | ||
None => self.session.display(), | ||
Some(id) => id.to_string() | ||
} | ||
} | ||
|
||
pub fn resource_id(&self) -> Option<ResourceId> { | ||
self.id | ||
} | ||
|
||
pub fn set_resource_id(&mut self, id: ResourceId) -> Result<(), ResIdReassigned> { | ||
if let Some(current) = self.id { | ||
return Err(ResIdReassigned { current, new: id }) | ||
} | ||
self.id = Some(id); | ||
Ok(()) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the logging above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean inside the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean |
||
|
||
pub fn state(&self) -> TransportState { self.state } | ||
pub fn is_active(&self) -> bool { self.state == TransportState::Active } | ||
|
||
|
@@ -345,7 +370,7 @@ impl<S: NetSession> NetTransport<S> { | |
.contains(&err.kind()) => | ||
{ | ||
#[cfg(feature = "log")] | ||
log::warn!(target: "transport", "Resource {} was not able to consume any data even though it has announced its write readiness", self.id()); | ||
log::warn!(target: "transport", "Resource {} was not able to consume any data even though it has announced its write readiness", self.display()); | ||
self.write_intent = true; | ||
None | ||
} | ||
|
@@ -379,7 +404,7 @@ impl<S: NetSession> NetTransport<S> { | |
fn flush_buffer(&mut self) -> io::Result<()> { | ||
let orig_len = self.write_buffer.len(); | ||
#[cfg(feature = "log")] | ||
log::trace!(target: "transport", "Resource {} is flushing its buffer of {orig_len} bytes", self.id()); | ||
log::trace!(target: "transport", "Resource {} is flushing its buffer of {orig_len} bytes", self.display()); | ||
let len = | ||
self.session.write(self.write_buffer.make_contiguous()).or_else(|err| { | ||
match err.kind() { | ||
|
@@ -388,23 +413,23 @@ impl<S: NetSession> NetTransport<S> { | |
| io::ErrorKind::WriteZero | ||
| io::ErrorKind::Interrupted => { | ||
#[cfg(feature = "log")] | ||
log::warn!(target: "transport", "Resource {} kernel buffer is fulled (system message is '{err}')", self.id()); | ||
log::warn!(target: "transport", "Resource {} kernel buffer is fulled (system message is '{err}')", self.display()); | ||
Ok(0) | ||
}, | ||
_ => { | ||
#[cfg(feature = "log")] | ||
log::error!(target: "transport", "Resource {} failed write operation with message '{err}'", self.id()); | ||
log::error!(target: "transport", "Resource {} failed write operation with message '{err}'", self.display()); | ||
Err(err) | ||
}, | ||
} | ||
})?; | ||
if orig_len > len { | ||
#[cfg(feature = "log")] | ||
log::debug!(target: "transport", "Resource {} was able to consume only a part of the buffered data ({len} of {orig_len} bytes)", self.id()); | ||
log::debug!(target: "transport", "Resource {} was able to consume only a part of the buffered data ({len} of {orig_len} bytes)", self.display()); | ||
self.write_intent = true; | ||
} else { | ||
#[cfg(feature = "log")] | ||
log::trace!(target: "transport", "Resource {} was able to consume all of the buffered data ({len} of {orig_len} bytes)", self.id()); | ||
log::trace!(target: "transport", "Resource {} was able to consume all of the buffered data ({len} of {orig_len} bytes)", self.display()); | ||
self.write_intent = false; | ||
} | ||
self.write_buffer.drain(..len); | ||
|
@@ -413,12 +438,8 @@ impl<S: NetSession> NetTransport<S> { | |
} | ||
|
||
impl<S: NetSession> Resource for NetTransport<S> { | ||
// TODO: Use S::Artifact instead | ||
type Id = RawFd; | ||
type Event = SessionEvent<S>; | ||
|
||
fn id(&self) -> Self::Id { self.session.as_connection().as_raw_fd() } | ||
|
||
fn interests(&self) -> IoType { | ||
match self.state { | ||
TransportState::Init => IoType::write_only(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to remove this once io-reactor patch is merged