Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace legacy impl with Poll #329

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if [ "${NO_RUN}" != "1" ] && [ "${NO_RUN}" != "true" ]; then
export CARGO_NET_RETRY=5
export CARGO_NET_TIMEOUT=10

cargo install cross --git "https://github.com/cross-rs/cross" --rev "7b79041c9278769eca57fae10c74741f5aa5c14b"
cargo install cross --git "https://github.com/cross-rs/cross" --rev "c7dee4d008475ce1c140773cbcd6078f4b86c2aa"
CARGO=cross

cargo clean
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::slice::{Iter, IterMut};

use mio::Token;
use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY;

Expand Down Expand Up @@ -117,4 +119,30 @@ impl Events {
*status = unsafe { std::mem::zeroed() };
}
}

pub fn iter(&self) -> Iter<'_, Event> {
self.events.iter()
}

pub fn iter_mut(&mut self) -> IterMut<'_, Event> {
self.events.iter_mut()
}
}

impl<'a> IntoIterator for &'a Events {
type Item = &'a Event;
type IntoIter = Iter<'a, Event>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a> IntoIterator for &'a mut Events {
type Item = &'a mut Event;
type IntoIter = IterMut<'a, Event>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod waker;
pub use core::*;
use std::{
collections::VecDeque,
os::windows::prelude::RawSocket,
os::windows::prelude::{AsRawHandle, RawHandle, RawSocket},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -229,7 +229,7 @@ impl Poller {
const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

let mut afd_group = self.afd.lock().unwrap();
if afd_group.len() == 0 {
if afd_group.is_empty() {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
Expand Down Expand Up @@ -286,6 +286,12 @@ impl Drop for Poller {
}
}

impl AsRawHandle for Poller {
fn as_raw_handle(&self) -> RawHandle {
self.cp.as_raw_handle()
}
}

pub fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
let sock_ptr: *const Mutex<SockState> = ptr as *const _;
unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
Expand Down
File renamed without changes.
File renamed without changes.
23 changes: 9 additions & 14 deletions monoio/src/driver/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ use super::{
};
use crate::utils::slab::Slab;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(windows)]
pub(super) mod iocp;

#[cfg(feature = "sync")]
mod waker;
#[cfg(feature = "sync")]
Expand All @@ -32,9 +28,9 @@ pub(crate) struct LegacyInner {
#[cfg(unix)]
poll: mio::Poll,
#[cfg(windows)]
events: iocp::Events,
events: crate::driver::iocp::Events,
#[cfg(windows)]
poll: iocp::Poller,
poll: crate::driver::iocp::Poller,

#[cfg(feature = "sync")]
shared_waker: std::sync::Arc<waker::EventWaker>,
Expand Down Expand Up @@ -69,18 +65,17 @@ impl LegacyDriver {
#[cfg(unix)]
let poll = mio::Poll::new()?;
#[cfg(windows)]
let poll = iocp::Poller::new()?;
let poll = crate::driver::iocp::Poller::new()?;

#[cfg(all(unix, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
poll.registry(),
TOKEN_WAKEUP,
)?));
#[cfg(all(windows, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new(
&poll,
TOKEN_WAKEUP,
)?));
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
));
#[cfg(feature = "sync")]
let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
#[cfg(feature = "sync")]
Expand All @@ -93,7 +88,7 @@ impl LegacyDriver {
#[cfg(unix)]
poll,
#[cfg(windows)]
events: iocp::Events::with_capacity(entries as usize),
events: crate::driver::iocp::Events::with_capacity(entries as usize),
#[cfg(windows)]
poll,
#[cfg(feature = "sync")]
Expand Down Expand Up @@ -178,7 +173,7 @@ impl LegacyDriver {
#[cfg(windows)]
pub(crate) fn register(
this: &Rc<UnsafeCell<LegacyInner>>,
state: &mut iocp::SocketState,
state: &mut crate::driver::iocp::SocketState,
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
Expand All @@ -198,7 +193,7 @@ impl LegacyDriver {
pub(crate) fn deregister(
this: &Rc<UnsafeCell<LegacyInner>>,
token: usize,
state: &mut iocp::SocketState,
state: &mut crate::driver::iocp::SocketState,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/legacy/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::driver::unpark::Unpark;
pub(crate) struct EventWaker {
// raw waker
#[cfg(windows)]
waker: super::iocp::Waker,
waker: crate::driver::iocp::Waker,
#[cfg(unix)]
waker: mio::Waker,
// Atomic awake status
Expand All @@ -20,7 +20,7 @@ impl EventWaker {
}

#[cfg(windows)]
pub(crate) fn new(waker: super::iocp::Waker) -> Self {
pub(crate) fn new(waker: crate::driver::iocp::Waker) -> Self {
Self {
waker,
awake: std::sync::atomic::AtomicBool::new(true),
Expand Down
6 changes: 5 additions & 1 deletion monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Monoio Driver.
#[allow(dead_code)]
pub(crate) mod op;
#[cfg(all(feature = "poll-io", unix))]
#[cfg(feature = "poll-io")]
pub(crate) mod poll;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
pub(crate) mod ready;
Expand All @@ -17,6 +17,10 @@ mod legacy;
#[cfg(all(target_os = "linux", feature = "iouring"))]
mod uring;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
pub(crate) mod iocp;

mod util;

use std::{
Expand Down
99 changes: 88 additions & 11 deletions monoio/src/driver/poll.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
use std::{io, task::Context, time::Duration};
use std::{
io,
ops::{Deref, DerefMut},
task::Context,
time::Duration,
};

#[cfg(unix)]
use mio::{event::Source, Events};
use mio::{Interest, Token};

use super::{op::MaybeFd, ready::Direction, scheduled_io::ScheduledIo};
#[cfg(windows)]
use crate::driver::iocp::{Events, Poller, SocketState};
use crate::{driver::op::CompletionMeta, utils::slab::Slab};

/// Poller with io dispatch.
// TODO: replace legacy impl with this Poll.
pub(crate) struct Poll {
pub(crate) io_dispatch: Slab<ScheduledIo>,
#[cfg(unix)]
poll: mio::Poll,
events: mio::Events,
#[cfg(unix)]
events: Events,
#[cfg(windows)]
poll: Poller,
#[cfg(windows)]
events: Events,
}

impl Poll {
#[inline]
pub(crate) fn with_capacity(capacity: usize) -> io::Result<Self> {
Ok(Self {
io_dispatch: Slab::new(),
#[cfg(unix)]
poll: mio::Poll::new()?,
events: mio::Events::with_capacity(capacity),
#[cfg(windows)]
poll: Poller::new()?,
events: Events::with_capacity(capacity),
})
}

Expand All @@ -41,14 +60,15 @@ impl Poll {
Ok(())
}

#[cfg(unix)]
pub(crate) fn register(
&mut self,
source: &mut impl mio::event::Source,
interest: mio::Interest,
source: &mut impl Source,
interest: Interest,
) -> io::Result<usize> {
let token = self.io_dispatch.insert(ScheduledIo::new());
let registry = self.poll.registry();
match registry.register(source, mio::Token(token), interest) {
match registry.register(source, Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
self.io_dispatch.remove(token);
Expand All @@ -57,11 +77,24 @@ impl Poll {
}
}

pub(crate) fn deregister(
#[cfg(windows)]
pub(crate) fn register(
&mut self,
source: &mut impl mio::event::Source,
token: usize,
) -> io::Result<()> {
source: &mut SocketState,
interest: Interest,
) -> io::Result<usize> {
let token = self.io_dispatch.insert(ScheduledIo::new());
match self.poll.register(source, Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
self.io_dispatch.remove(token);
Err(e)
}
}
}

#[cfg(unix)]
pub(crate) fn deregister(&mut self, source: &mut impl Source, token: usize) -> io::Result<()> {
match self.poll.registry().deregister(source) {
Ok(_) => {
self.io_dispatch.remove(token);
Expand All @@ -71,6 +104,18 @@ impl Poll {
}
}

#[cfg(windows)]
pub(crate) fn deregister(&mut self, source: &mut SocketState, token: usize) -> io::Result<()> {
match self.poll.deregister(source) {
Ok(_) => {
self.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
}

#[allow(dead_code)]
#[inline]
pub(crate) fn poll_syscall(
&mut self,
Expand Down Expand Up @@ -107,3 +152,35 @@ impl std::os::fd::AsRawFd for Poll {
self.poll.as_raw_fd()
}
}

#[cfg(unix)]
impl Deref for Poll {
type Target = mio::Poll;

fn deref(&self) -> &Self::Target {
&self.poll
}
}

#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for Poll {
#[inline]
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.poll.as_raw_handle()
}
}

#[cfg(windows)]
impl Deref for Poll {
type Target = Poller;

fn deref(&self) -> &Self::Target {
&self.poll
}
}

impl DerefMut for Poll {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.poll
}
}
2 changes: 1 addition & 1 deletion monoio/src/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Ready {
pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED);

#[cfg(windows)]
pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready {
pub(crate) fn from_mio(event: &crate::driver::iocp::Event) -> Ready {
let mut ready = Ready::EMPTY;

if event.is_readable() {
Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/shared_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::os::windows::io::{
};
use std::{cell::UnsafeCell, io, rc::Rc};

#[cfg(windows)]
use super::legacy::iocp::SocketState as RawFd;
use super::CURRENT;
#[cfg(windows)]
use crate::driver::iocp::SocketState as RawFd;

// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
Expand Down
9 changes: 6 additions & 3 deletions monoio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub async fn write<P: AsRef<Path>, C: IoBuf>(path: P, contents: C) -> (io::Resul
/// ```
#[cfg(feature = "unlinkat")]
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
Op::unlink(path)?.await.meta.result?;
crate::driver::op::Op::unlink(path)?.await.meta.result?;
Ok(())
}

Expand Down Expand Up @@ -250,7 +250,7 @@ pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// ```
#[cfg(feature = "unlinkat")]
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
Op::rmdir(path)?.await.meta.result?;
crate::driver::op::Op::rmdir(path)?.await.meta.result?;
Ok(())
}

Expand Down Expand Up @@ -281,6 +281,9 @@ pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// ```
#[cfg(feature = "renameat")]
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
Op::rename(from.as_ref(), to.as_ref())?.await.meta.result?;
crate::driver::op::Op::rename(from.as_ref(), to.as_ref())?
.await
.meta
.result?;
Ok(())
}
Loading
Loading