Skip to content

Commit 9e99112

Browse files
committed
Create an abstraction for the tokio runtime to allow for different ownership configurations.
1 parent 235581c commit 9e99112

File tree

12 files changed

+214
-83
lines changed

12 files changed

+214
-83
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ tokio = { version = "1", optional = true, features = [
4343
"macros",
4444
"net",
4545
"rt",
46+
"rt-multi-thread",
4647
"signal",
4748
"sync",
4849
"time",

examples/hello/src/main.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33

44
pub mod ffi;
55

6-
use libevent::{EventCallbackCtx, EventCallbackFlags, EvutilSocket};
6+
use libevent::{tokio_backend::Runtime, EventCallbackCtx, EventCallbackFlags, EvutilSocket};
77

88
extern "C" fn hello_callback(
99
_fd: EvutilSocket,
@@ -15,19 +15,21 @@ extern "C" fn hello_callback(
1515

1616
#[cfg(feature = "tokio_backend")]
1717
fn inject_tokio(base: &Base) {
18-
let runtime = tokio::runtime::Builder::new_current_thread()
19-
.enable_all()
20-
.build()
21-
.expect("failed to build a tokio runtime");
22-
23-
runtime.spawn(async {
24-
loop {
25-
tokio::time::sleep(Duration::from_secs(5)).await;
26-
println!("'Hello, world' from a tokio task!");
27-
}
28-
});
18+
let runtime =
19+
libevent::tokio_backend::TokioRuntime::new().expect("failed to build a tokio runtime");
20+
21+
{
22+
let _guard = runtime.enter();
23+
24+
tokio::spawn(async {
25+
loop {
26+
tokio::time::sleep(Duration::from_secs(3)).await;
27+
println!("'Hello, world' from a tokio task!");
28+
}
29+
});
30+
}
2931

30-
base.inject_tokio(runtime);
32+
base.inject_tokio(Box::new(runtime));
3133
}
3234

3335
fn main() {

libevent-sys/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
#![allow(clippy::redundant_static_lifetimes)]
1111
#![allow(clippy::too_many_arguments)]
1212
#![allow(clippy::unreadable_literal)]
13+
#![allow(deref_nullptr)]
1314
#![allow(non_camel_case_types)]
1415
#![allow(non_snake_case)]
1516
#![allow(non_upper_case_globals)]
17+
#![allow(unaligned_references)]
1618

1719
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
1820

@@ -23,6 +25,6 @@ mod tests {
2325
#[test]
2426
fn constant_access() {
2527
assert_eq!(EVENT_LOG_MSG, 1);
26-
assert_eq!(IPPORT_RESERVED, 1024);
28+
assert_eq!(IPV6PORT_RESERVED, 1024);
2729
}
2830
}

sample/bench.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*/
3535

3636
#include "event2/event-config.h"
37+
#include "event2/thread.h"
3738

3839
#include <sys/types.h>
3940
#include <sys/stat.h>
@@ -173,6 +174,10 @@ main(int argc, char **argv)
173174
struct timeval *tv;
174175
evutil_socket_t *cp;
175176

177+
#ifndef USE_TOKIO
178+
evthread_use_pthreads();
179+
#endif
180+
176181
num_pipes = 100;
177182
num_active = 1;
178183
num_writes = num_pipes;

src/base.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
#![allow(dead_code)]
22

3+
use super::{event::*, tokio_backend::Runtime};
4+
use crate::EventCallbackWrapper;
35
use bitflags::bitflags;
46
use std::io;
57
use std::os::raw::{c_int, c_short, c_void};
68
use std::ptr::NonNull;
79
use std::time::Duration;
810

9-
use super::event::*;
10-
use crate::EventCallbackWrapper;
11-
1211
/// A file descriptor in libevent.
1312
pub type EvutilSocket = c_int;
1413

@@ -51,9 +50,9 @@ impl Base {
5150
}
5251
}
5352

54-
/// Replaces the standard libevent backend with tokio
53+
/// Replaces the standard libevent backend with an owned tokio runtime
5554
#[cfg(feature = "tokio_backend")]
56-
pub fn inject_tokio(&self, runtime: tokio::runtime::Runtime) {
55+
pub fn inject_tokio(&self, runtime: Box<dyn Runtime>) {
5756
super::tokio_backend::inject_tokio(self.base, runtime)
5857
}
5958

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub use base::{
1313
};
1414

1515
#[cfg(feature = "tokio_backend")]
16-
mod tokio_backend;
16+
pub mod tokio_backend;
1717

1818
/// The context passed into `handle_wrapped_callback`, which handles event-type
1919
/// specific metadata for trampolining into the user-supplied closure.

src/tokio_backend/api.rs

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use super::{backend::TokioBackend, io::IoType, BaseWrapper};
1+
use super::{
2+
backend::TokioBackend,
3+
io::IoType,
4+
runtime::{Runtime, TokioRuntime},
5+
BaseWrapper,
6+
};
27
use std::{
38
ffi::c_void,
49
os::{
@@ -9,40 +14,34 @@ use std::{
914
time::Duration,
1015
};
1116

17+
const EVSEL: libevent_sys::eventop = libevent_sys::eventop {
18+
name: "tokio".as_ptr().cast(),
19+
init: Some(tokio_backend_init),
20+
add: Some(tokio_backend_add),
21+
del: Some(tokio_backend_del),
22+
dispatch: Some(tokio_backend_dispatch),
23+
dealloc: Some(tokio_backend_dealloc),
24+
need_reinit: 1,
25+
features: libevent_sys::event_method_feature_EV_FEATURE_FDS,
26+
fdinfo_len: std::mem::size_of::<RawFd>() as u64,
27+
};
28+
const EVSIGSEL: libevent_sys::eventop = libevent_sys::eventop {
29+
name: "tokio_signal".as_ptr().cast(),
30+
init: None,
31+
add: Some(tokio_signal_backend_add),
32+
del: Some(tokio_signal_backend_del),
33+
dispatch: None,
34+
dealloc: None,
35+
need_reinit: 0,
36+
features: 0,
37+
fdinfo_len: 0,
38+
};
39+
1240
/// Injects a tokio backend with the given runtime into the given libevent instance.
1341
///
1442
/// The libevent instance will already have an initialized backend. This
1543
/// exisiting backend is deallocated before being replaced.
16-
///
17-
/// A tracing-subscriber may also be initialized if the feature is activated
18-
/// to enable tracing output when linked to a C program.
19-
pub fn inject_tokio(mut base: NonNull<libevent_sys::event_base>, runtime: tokio::runtime::Runtime) {
20-
const EVSEL: libevent_sys::eventop = libevent_sys::eventop {
21-
name: "tokio".as_ptr().cast(),
22-
init: Some(tokio_backend_init),
23-
add: Some(tokio_backend_add),
24-
del: Some(tokio_backend_del),
25-
dispatch: Some(tokio_backend_dispatch),
26-
dealloc: Some(tokio_backend_dealloc),
27-
need_reinit: 1,
28-
features: libevent_sys::event_method_feature_EV_FEATURE_FDS,
29-
fdinfo_len: std::mem::size_of::<RawFd>() as u64,
30-
};
31-
const EVSIGSEL: libevent_sys::eventop = libevent_sys::eventop {
32-
name: "tokio_signal".as_ptr().cast(),
33-
init: None,
34-
add: Some(tokio_signal_backend_add),
35-
del: Some(tokio_signal_backend_del),
36-
dispatch: None,
37-
dealloc: None,
38-
need_reinit: 0,
39-
features: 0,
40-
fdinfo_len: 0,
41-
};
42-
43-
#[cfg(feature = "tracing_subscriber")]
44-
tracing_subscriber::fmt::init();
45-
44+
pub fn inject_tokio(mut base: NonNull<libevent_sys::event_base>, runtime: Box<dyn Runtime>) {
4645
let backend = Box::new(TokioBackend::new(runtime));
4746
let base = unsafe { base.as_mut() };
4847

@@ -65,16 +64,18 @@ pub unsafe extern "C" fn tokio_event_base_new() -> *mut libevent_sys::event_base
6564
let base = NonNull::new(libevent_sys::event_base_new());
6665

6766
match base {
68-
Some(base) => {
69-
let runtime = tokio::runtime::Builder::new_current_thread()
70-
.enable_all()
71-
.build()
72-
.expect("failed to build a tokio runtime");
67+
Some(base) => match TokioRuntime::new() {
68+
Ok(runtime) => {
69+
inject_tokio(base, Box::new(runtime));
7370

74-
inject_tokio(base, runtime);
71+
base.as_ptr()
72+
}
73+
Err(error) => {
74+
tracing::error!(?error, "failed to create a new Tokio runtime");
7575

76-
base.as_ptr()
77-
}
76+
std::ptr::null_mut()
77+
}
78+
},
7879
None => std::ptr::null_mut(),
7980
}
8081
}

src/tokio_backend/backend.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
use super::{
22
io::{IoMap, IoType},
3+
runtime::Runtime,
34
signal::SignalMap,
45
BaseWrapper,
56
};
7+
use crate::tokio_backend::runtime::{JoinFuture, YieldFuture};
68
use std::{os::raw::c_int, sync::Arc, time::Duration};
7-
use tokio::sync::Notify;
9+
use tokio::{sync::Notify, task::JoinHandle};
810

911
/// Implements a libevent backend using a tokio runtime
10-
#[derive(Debug)]
1112
pub struct TokioBackend {
12-
/// Tokio runtime for driving I/O and signal events
13-
runtime: tokio::runtime::Runtime,
13+
/// Option tokio runtime to maintain ownership
14+
runtime: Box<dyn Runtime>,
1415
/// Notifies the dispatch loop that it should yield back to libevent
1516
dispatch_notify: Arc<Notify>,
1617
/// Map of futures for registered I/O events
@@ -20,8 +21,8 @@ pub struct TokioBackend {
2021
}
2122

2223
impl TokioBackend {
23-
/// Create a new tokio libevent backend using the provided runtime
24-
pub fn new(runtime: tokio::runtime::Runtime) -> Self {
24+
/// Create a new libevent backend using the provided runtime
25+
pub fn new(runtime: Box<dyn Runtime>) -> Self {
2526
let dispatch_notify = Arc::new(Notify::new());
2627
let io_map = IoMap::new();
2728
let signal_map = SignalMap::new();
@@ -62,14 +63,19 @@ impl TokioBackend {
6263
}
6364
}
6465

66+
/// Blocks on the given join handle
67+
fn join(&self, join_handle: JoinHandle<()>) {
68+
let future = JoinFuture::new(join_handle);
69+
70+
self.runtime.join(future);
71+
}
72+
6573
/// Terminates an active I/O task
6674
pub fn del_io(&mut self, fd: c_int) -> c_int {
6775
tracing::debug!(fd, "delete an I/O event");
6876

6977
if let Ok(join_handle) = self.io_map.del(fd) {
70-
self.runtime.block_on(async move {
71-
let _ = join_handle.await;
72-
});
78+
self.join(join_handle);
7379
0
7480
} else {
7581
-1
@@ -104,12 +110,10 @@ impl TokioBackend {
104110

105111
/// Terminates an active signal task
106112
pub fn del_signal(&mut self, signum: c_int) -> c_int {
107-
tracing::debug!(signum, "delete an signal event");
113+
tracing::debug!(signum, "delete a signal event");
108114

109115
if let Ok(join_handle) = self.signal_map.del(signum) {
110-
self.runtime.block_on(async move {
111-
let _ = join_handle.await;
112-
});
116+
self.join(join_handle);
113117
0
114118
} else {
115119
-1
@@ -120,19 +124,26 @@ impl TokioBackend {
120124
pub fn dispatch(&mut self, _base: *mut libevent_sys::event_base, timeout: Option<Duration>) {
121125
tracing::trace!(?timeout, "dispatch events");
122126

123-
let dispatch_notify = self.dispatch_notify.clone();
127+
let _guard = self.runtime.enter();
124128

125-
self.runtime.block_on(async move {
126-
if let Some(duration) = timeout {
129+
match timeout {
130+
Some(duration) => {
127131
if duration.is_zero() {
128-
tokio::task::yield_now().await;
129-
tokio::task::yield_now().await;
132+
let future = YieldFuture::default();
133+
134+
self.runtime.dispatch_yield(future);
130135
} else {
131-
let _ = tokio::time::timeout(duration, dispatch_notify.notified()).await;
136+
let future = self.dispatch_notify.notified();
137+
let future = tokio::time::timeout(duration, future);
138+
139+
self.runtime.dispatch_timeout(future);
132140
}
133-
} else {
134-
dispatch_notify.notified().await;
135141
}
136-
})
142+
None => {
143+
let future = self.dispatch_notify.notified();
144+
145+
self.runtime.dispatch_notify(future);
146+
}
147+
}
137148
}
138149
}

src/tokio_backend/io.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
use super::BaseWrapper;
22
use std::{collections::HashMap, os::unix::prelude::RawFd, sync::Arc};
3-
use tokio::{io::{unix::AsyncFd, Interest}, sync::Notify, task::JoinHandle};
3+
use tokio::{
4+
io::{unix::AsyncFd, Interest},
5+
sync::Notify,
6+
task::JoinHandle,
7+
};
48

59
/// Manages adding and removing I/O event tasks
610
#[derive(Debug)]

src/tokio_backend/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
mod api;
22
mod backend;
33
mod io;
4+
mod runtime;
45
mod signal;
56

67
pub use api::inject_tokio;
8+
pub use runtime::{JoinFuture, Runtime, TokioRuntime, YieldFuture};
79

810
/// Wrapper to allow sending of raw event_base pointers to tokio tasks.
911
///

0 commit comments

Comments
 (0)