diff --git a/.travis.yml b/.travis.yml index cb011d8..afc0f32 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: rust os: linux dist: bionic rust: - - 1.35.0 # msrv + - 1.54.0 # msrv - stable addons: @@ -21,8 +21,8 @@ addons: script: # Try bundled build first - - cargo build --features "bundled,openssl_bundled" - - cargo run --manifest-path examples/hello/Cargo.toml --features "bundled,openssl_bundled" -- 5 + - cargo build --features "tracing_subscriber,tokio_backend" + - cargo run --manifest-path examples/hello/Cargo.toml --features "tracing_subscriber,tokio_backend" -- 5 # Now pkg-config build - sudo apt-get install -y libssl-dev libevent-dev - cargo clean diff --git a/Cargo.toml b/Cargo.toml index 4d47f0e..d455c54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,11 @@ [package] name = "libevent" -version = "0.1.0" -authors = ["Jon Magnuson ", - "Grant Elbert "] +version = "0.2.0" +authors = [ + "Jon Magnuson ", + "Grant Elbert ", + "Judge Maygarden ", +] description = "Rust bindings to the libevent async I/O framework" documentation = "https://docs.rs/libevent" repository = "https://github.com/jmagnuson/libevent-rs" @@ -12,22 +15,38 @@ edition = "2018" categories = ["api-bindings", "asynchronous"] keywords = ["libevent", "bindings", "async", "io"] +[lib] +crate-type = ["lib", "staticlib"] + [workspace] -members = ['examples/hello'] +members = ["examples/hello"] [features] -default = [ "pkgconfig", "openssl", "threading", "buildtime_bindgen" ] -static = [ "libevent-sys/static" ] -pkgconfig = [ "libevent-sys/pkgconfig" ] -bundled = [ "static", "libevent-sys/bundled" ] -buildtime_bindgen = [ "libevent-sys/buildtime_bindgen" ] -openssl = [ "libevent-sys/openssl" ] -openssl_bundled = [ "libevent-sys/openssl_bundled", "threading" ] -threading = [ "libevent-sys/threading" ] +default = ["pkgconfig", "openssl", "threading", "buildtime_bindgen"] +static = ["libevent-sys/static"] +pkgconfig = ["libevent-sys/pkgconfig"] +bundled = ["static", "libevent-sys/bundled"] +buildtime_bindgen = ["libevent-sys/buildtime_bindgen"] +openssl = ["libevent-sys/openssl"] +openssl_bundled = ["libevent-sys/openssl_bundled", "threading"] +threading = ["libevent-sys/threading"] +tokio_backend = ["bundled", "libevent-sys/tokio_backend", "tokio"] +tracing_subscriber = ["tracing-subscriber"] # features for development -verbose_build = [ "libevent-sys/verbose_build" ] +verbose_build = ["libevent-sys/verbose_build"] [dependencies] bitflags = "1.2" -libevent-sys = { version = "0.2", path = "libevent-sys", default-features = false } +libevent-sys = { version = "0.3", path = "libevent-sys", default-features = false } +tokio = { version = "1", optional = true, features = [ + "macros", + "net", + "rt", + "rt-multi-thread", + "signal", + "sync", + "time", +] } +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.2", optional = true } diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e5b64f0 --- /dev/null +++ b/Makefile @@ -0,0 +1,48 @@ +CARGO_FLAGS = --release --features tracing_subscriber,tokio_backend +CFLAGS = -Wall -Werror +SAMPLE_DIR = sample +OUT_DIR = target/release +LIBS = \ + -L$(OUT_DIR) \ + -llibevent +SAMPLES_SRC = \ + $(SAMPLE_DIR)/bench.c \ + $(SAMPLE_DIR)/dns-example.c \ + $(SAMPLE_DIR)/event-read-fifo.c \ + $(SAMPLE_DIR)/hello-world.c \ + $(SAMPLE_DIR)/tokio-time-test.c +SAMPLES_BIN = $(patsubst %.c,%,$(SAMPLES_SRC)) + +all: $(SAMPLES_BIN) + +%: %.c $(OUT_DIR)/liblibevent.a + $(CC) $(CFLAGS) -o $@ $(LIBS) $< + +$(OUT_DIR)/liblibevent.a: FORCE + cargo build $(CARGO_FLAGS) + +$(OUT_DIR)/bench-kqueue: $(SAMPLE_DIR)/bench.c $(OUT_DIR)/liblibevent.a + $(CC) $(CFLAGS) -o $(OUT_DIR)/bench-kqueue $(LIBS) $(SAMPLE_DIR)/bench.c + +$(OUT_DIR)/bench-tokio: $(SAMPLE_DIR)/bench.c $(OUT_DIR)/liblibevent.a + $(CC) $(CFLAGS) -o $(OUT_DIR)/bench-tokio $(LIBS) $(SAMPLE_DIR)/bench.c -DUSE_TOKIO + +plot-bench: $(OUT_DIR)/bench-kqueue $(OUT_DIR)/bench-tokio + $(OUT_DIR)/bench-kqueue > $(OUT_DIR)/kqueue.csv + $(OUT_DIR)/bench-tokio > $(OUT_DIR)/tokio.csv + ./plot-bench.py + +prof-bench: $(OUT_DIR)/bench-kqueue $(OUT_DIR)/bench-tokio + sudo flamegraph -o kqueue-flamegraph.svg target/release/bench-kqueue + sudo flamegraph -o tokio-flamegraph.svg target/release/bench-tokio + +clippy: FORCE + cargo clippy $(CARGO_FLAGS) + +run-hello-world: $(SAMPLE_DIR)/hello-world + RUST_BACKTRACE=1 RUST_LOG=debug ./$< + +FORCE: ; + +clean: + $(RM) $(SAMPLES_BIN) diff --git a/README.md b/README.md index ef5121a..c0b55f0 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,23 @@ base.run(); when `buildtime_bindgen` is not enabled, and it is only applicable in this case. +## Tokio Backend for libevent + +A optional tokio backend for handling libevent I/O and signal readiness is +optionally provided. It is not patched into libevent directly, but is +substituted at run time with a call to `libevent::inject_tokio`. The primary +motivation for this feature is to allow native tokio and libevent tasks to +co-exist with a single event loop on the same thread. This feature is +especially useful when gradually migrating a C/libevent project to Rust/tokio +when use of FFI between the C and Rust code prevents running the event loops +on separate threads. + +## Samples + +Versions of libevent samples modified to make use of an injected tokio +backend are located in the ./sample directory. There is a Makefile provided +for building these C programs linked to the Rust libevent crate. + ## Minimum Supported Rust Version (MSRV) This crate is guaranteed to compile on stable Rust 1.35.0 and up. It might compile diff --git a/examples/hello/Cargo.toml b/examples/hello/Cargo.toml index 2e13365..d487032 100644 --- a/examples/hello/Cargo.toml +++ b/examples/hello/Cargo.toml @@ -1,17 +1,32 @@ [package] name = "hello" version = "0.1.0" -authors = ["Jon Magnuson "] +authors = [ + "Jon Magnuson ", + "Judge Maygarden ", +] edition = "2018" -build="build.rs" +build = "build.rs" [features] -default = [ "bundled", "openssl", "buildtime_bindgen" ] -bundled = ["libevent/bundled", "libevent-sys/bundled" ] +default = [ + "bundled", + "openssl", + "buildtime_bindgen", + "tokio_backend", + "tracing_subscriber", +] +bundled = ["libevent/bundled", "libevent-sys/bundled"] openssl = ["libevent/openssl"] -openssl_bundled = [ "libevent/openssl_bundled" ] +openssl_bundled = ["libevent/openssl_bundled"] pkgconfig = ["libevent/pkgconfig"] buildtime_bindgen = ["libevent/buildtime_bindgen"] +tokio_backend = ["libevent/tokio_backend", "tokio"] +tracing_subscriber = ["tracing-subscriber"] + +[dependencies] +tokio = { version = "1", optional = true, features = ["rt"] } +tracing-subscriber = { version = "0.2", optional = true } [dependencies.libevent] path = "../../" diff --git a/examples/hello/src/main.rs b/examples/hello/src/main.rs index 72a4bf3..4845551 100644 --- a/examples/hello/src/main.rs +++ b/examples/hello/src/main.rs @@ -3,7 +3,7 @@ use std::time::Duration; pub mod ffi; -use libevent::{EventCallbackCtx, EventCallbackFlags, EvutilSocket}; +use libevent::{tokio_backend::Runtime, EventCallbackCtx, EventCallbackFlags, EvutilSocket}; extern "C" fn hello_callback( _fd: EvutilSocket, @@ -13,7 +13,29 @@ extern "C" fn hello_callback( println!("callback: rust fn (interval: 2s)"); } +#[cfg(feature = "tokio_backend")] +fn inject_tokio(base: &Base) { + let runtime = + libevent::tokio_backend::TokioRuntime::new().expect("failed to build a tokio runtime"); + + { + let _guard = runtime.enter(); + + tokio::spawn(async { + loop { + tokio::time::sleep(Duration::from_secs(3)).await; + println!("'Hello, world' from a tokio task!"); + } + }); + } + + base.inject_tokio(Box::new(runtime)); +} + fn main() { + #[cfg(feature = "tracing_subscriber")] + tracing_subscriber::fmt::init(); + let run_duration = std::env::args().nth(1).map(|val_s| { Duration::from_secs( val_s @@ -24,6 +46,9 @@ fn main() { let mut base = Base::new().unwrap_or_else(|e| panic!("{:?}", e)); + #[cfg(feature = "tokio_backend")] + inject_tokio(&base); + let ret = unsafe { ffi::helloc_init(base.as_raw().as_ptr()) }; assert_eq!(ret, 0); diff --git a/libevent-sys/Cargo.toml b/libevent-sys/Cargo.toml index 6660ce0..6cbe647 100644 --- a/libevent-sys/Cargo.toml +++ b/libevent-sys/Cargo.toml @@ -1,9 +1,12 @@ [package] name = "libevent-sys" -version = "0.2.4" -authors = ["Steven vanZyl ", - "Jon Magnuson ", - "Grant Elbert "] +version = "0.3.0" +authors = [ + "Steven vanZyl ", + "Jon Magnuson ", + "Grant Elbert ", + "Judge Maygarden ", +] repository = "https://github.com/jmagnuson/libevent-rs" edition = "2018" readme = "README.md" @@ -16,14 +19,15 @@ links = "event" build = "build.rs" [features] -default = [ "pkgconfig", "openssl", "threading", "buildtime_bindgen" ] +default = ["pkgconfig", "openssl", "threading", "buildtime_bindgen"] static = [] -pkgconfig = [ "pkg-config" ] -bundled = [ "static", "cmake" ] -buildtime_bindgen = [ "bindgen" ] -openssl = [ "openssl-sys" ] -openssl_bundled = [ "openssl-sys/vendored", "threading" ] +pkgconfig = ["pkg-config"] +bundled = ["static", "cmake"] +buildtime_bindgen = ["bindgen"] +openssl = ["openssl-sys"] +openssl_bundled = ["openssl-sys/vendored", "threading"] threading = [] +tokio_backend = ["buildtime_bindgen"] # features for development verbose_build = [] @@ -33,6 +37,6 @@ version = "0.9" optional = true [build-dependencies] -bindgen = { version = "0.53", optional = true } +bindgen = { version = "0.59", optional = true } cmake = { version = "0.1", optional = true } pkg-config = { version = "0.3", optional = true } diff --git a/libevent-sys/build.rs b/libevent-sys/build.rs index f9ec4b1..d135d63 100644 --- a/libevent-sys/build.rs +++ b/libevent-sys/build.rs @@ -105,7 +105,10 @@ fn run_pkg_config() -> Option> { let mut include_paths = HashSet::new(); - if let Ok(mut lib) = pkg.probe("libevent_core").or_else(|_| pkg.probe("libevent")) { + if let Ok(mut lib) = pkg + .probe("libevent_core") + .or_else(|_| pkg.probe("libevent")) + { include_paths.extend(lib.include_paths.drain(..)); } else { return None; @@ -166,7 +169,6 @@ fn find_libevent() -> Option> { #[cfg(feature = "buildtime_bindgen")] fn generate_bindings(include_paths: Vec, out_path: impl AsRef) { - println!("cargo:rerun-if-changed=libevent"); println!("cargo:rerun-if-changed=wrapper.h"); let target = env::var("TARGET").unwrap(); @@ -188,12 +190,23 @@ fn generate_bindings(include_paths: Vec, out_path: impl AsRef) { builder = builder.clang_arg(format!("-I{}", path)); } + // Some of the libevent internals need to be exposed to inject a tokio backend. + if cfg!(feature = "tokio_backend") { + builder = builder + .clang_arg("-Ilibevent") + .clang_arg(format!("-I{}/build/include", out_path.as_ref().display())) + .header("libevent/event-internal.h") + .header("libevent/evmap-internal.h") + .blocklist_item(".*voucher.*") + .blocklist_item("strto.*"); + } + let bindings = builder .header("wrapper.h") // Enable for more readable bindings // .rustfmt_bindings(true) // Fixes a bug with a duplicated const - .blacklist_item("IPPORT_RESERVED") + .blocklist_item("IPPORT_RESERVED") .generate() .expect("Failed to generate bindings"); diff --git a/libevent-sys/src/lib.rs b/libevent-sys/src/lib.rs index 1f7d2a3..b1cb9f7 100644 --- a/libevent-sys/src/lib.rs +++ b/libevent-sys/src/lib.rs @@ -7,11 +7,14 @@ //! - Functions are named the same as the C code and don't follow Rust naming schemes. //! - Uses C strings. See `CStr` in the Rust standard library. -#![allow(non_upper_case_globals)] +#![allow(clippy::redundant_static_lifetimes)] +#![allow(clippy::too_many_arguments)] +#![allow(clippy::unreadable_literal)] +#![allow(deref_nullptr)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] -#![allow(clippy::unreadable_literal)] -#![allow(clippy::redundant_static_lifetimes)] +#![allow(non_upper_case_globals)] +#![allow(unaligned_references)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); @@ -22,6 +25,6 @@ mod tests { #[test] fn constant_access() { assert_eq!(EVENT_LOG_MSG, 1); - assert_eq!(IPPORT_RESERVED, 1024); + assert_eq!(IPV6PORT_RESERVED, 1024); } } diff --git a/plot-bench.py b/plot-bench.py new file mode 100755 index 0000000..cc98f41 --- /dev/null +++ b/plot-bench.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 + +import matplotlib.pyplot as pyplot +import pandas + + +def main(): + df = pandas.concat([pandas.read_csv("target/release/kqueue.csv", names=["kqueue"]), + pandas.read_csv("target/release/tokio.csv", names=["tokio"])], + axis=1) + + kqueue_mean = df["kqueue"].mean() + tokio_mean = df["tokio"].mean() + + print( + f"kqueue mean: {round(kqueue_mean)}, tokio mean: {round(tokio_mean)}, ratio: {round(100 * kqueue_mean / tokio_mean)}%") + + df.plot() + pyplot.title("Libevent Backend Benchmark (kqueue vs tokio)") + pyplot.xlabel("Test Run") + pyplot.ylabel("Time (µs)") + pyplot.show() + + +if __name__ == "__main__": + main() diff --git a/sample/bench.c b/sample/bench.c new file mode 100644 index 0000000..d02956e --- /dev/null +++ b/sample/bench.c @@ -0,0 +1,238 @@ +/* + * Copyright 2003-2007 Niels Provos + * Copyright 2007-2012 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 4. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * + * Mon 03/10/2003 - Modified by Davide Libenzi + * + * Added chain event propagation to improve the sensitivity of + * the measure respect to the event loop efficency. + * + * + */ + +#include "event2/event-config.h" +#include "event2/thread.h" + +#include +#include +#ifdef EVENT__HAVE_SYS_TIME_H +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#ifdef EVENT__HAVE_UNISTD_H +#include +#endif +#include + +#include +#include + +#ifdef USE_TOKIO +#include "tokio_event_base.h" +#endif + +extern struct event_base *event_global_current_base_; + +#if defined(__STDC__) && defined(__STDC_VERSION__) && !defined(__MINGW64_VERSION_MAJOR) +#if (__STDC_VERSION__ >= 199901L) +#define EV_SIZE_FMT "%zu" +#define EV_SSIZE_FMT "%zd" +#define EV_SIZE_ARG(x) (x) +#define EV_SSIZE_ARG(x) (x) +#endif +#endif + +#ifndef EV_SIZE_FMT +#if (EVENT__SIZEOF_SIZE_T <= EVENT__SIZEOF_LONG) +#define EV_SIZE_FMT "%lu" +#define EV_SSIZE_FMT "%ld" +#define EV_SIZE_ARG(x) ((unsigned long)(x)) +#define EV_SSIZE_ARG(x) ((long)(x)) +#else +#define EV_SIZE_FMT EV_U64_FMT +#define EV_SSIZE_FMT EV_I64_FMT +#define EV_SIZE_ARG(x) EV_U64_ARG(x) +#define EV_SSIZE_ARG(x) EV_I64_ARG(x) +#endif +#endif + +static ev_ssize_t count, fired; +static int writes, failures; +static evutil_socket_t *pipes; +static int num_pipes, num_active, num_writes; +static struct event *events; + + +static void +read_cb(evutil_socket_t fd, short which, void *arg) +{ + ev_intptr_t idx = (ev_intptr_t) arg, widx = idx + 1; + unsigned char ch; + ev_ssize_t n; + + n = recv(fd, (char*)&ch, sizeof(ch), 0); + if (n >= 0) + count += n; + else + failures++; + if (writes) { + if (widx >= num_pipes) + widx -= num_pipes; + n = send(pipes[2 * widx + 1], "e", 1, 0); + if (n != 1) + failures++; + writes--; + fired++; + } + + if (count == fired) { + event_loopbreak(); + } +} + +static struct timeval * +run_once(void) +{ + evutil_socket_t *cp, space; + long i; + static struct timeval ts, te; + + for (cp = pipes, i = 0; i < num_pipes; i++, cp += 2) { + if (event_initialized(&events[i])) + event_del(&events[i]); + event_set(&events[i], cp[0], EV_READ | EV_PERSIST, read_cb, (void *)(ev_intptr_t) i); + event_add(&events[i], NULL); + } + + event_loop(EVLOOP_ONCE | EVLOOP_NONBLOCK); + + fired = 0; + space = num_pipes / num_active; + space = space * 2; + for (i = 0; i < num_active; i++, fired++) + (void) send(pipes[i * space + 1], "e", 1, 0); + + count = 0; + writes = num_writes; + { + int xcount = 0; + evutil_gettimeofday(&ts, NULL); + do { + event_loop(EVLOOP_ONCE | EVLOOP_NONBLOCK); + xcount++; + } while (count != fired); + evutil_gettimeofday(&te, NULL); + + if (xcount != count) + { + fprintf(stderr, "Xcount: %d, Rcount: " EV_SSIZE_FMT ", Failures: %d\n", + xcount, count, failures); + } + } + + evutil_timersub(&te, &ts, &te); + + return (&te); +} + +int +main(int argc, char **argv) +{ +#ifdef EVENT__HAVE_SETRLIMIT + struct rlimit rl; +#endif + int i, c; + struct timeval *tv; + evutil_socket_t *cp; + +#ifndef USE_TOKIO + evthread_use_pthreads(); +#endif + + num_pipes = 100; + num_active = 1; + num_writes = num_pipes; + while ((c = getopt(argc, argv, "n:a:w:")) != -1) { + switch (c) { + case 'n': + num_pipes = atoi(optarg); + break; + case 'a': + num_active = atoi(optarg); + break; + case 'w': + num_writes = atoi(optarg); + break; + default: + fprintf(stderr, "Illegal argument \"%c\"\n", c); + exit(1); + } + } + +#ifdef EVENT__HAVE_SETRLIMIT + rl.rlim_cur = rl.rlim_max = num_pipes * 2 + 50; + if (setrlimit(RLIMIT_NOFILE, &rl) == -1) { + perror("setrlimit"); + exit(1); + } +#endif + + events = calloc(num_pipes, sizeof(struct event)); + pipes = calloc(num_pipes * 2, sizeof(evutil_socket_t)); + if (events == NULL || pipes == NULL) { + perror("malloc"); + exit(1); + } + +#ifdef USE_TOKIO + event_global_current_base_ = tokio_event_base_new(); +#else + event_global_current_base_ = event_base_new(); +#endif + + for (cp = pipes, i = 0; i < num_pipes; i++, cp += 2) { + if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, cp) == -1) { + perror("pipe"); + exit(1); + } + } + + for (i = 0; i < 100; i++) { + tv = run_once(); + if (tv == NULL) + exit(1); + fprintf(stdout, "%ld\n", + tv->tv_sec * 1000000L + tv->tv_usec); + } + + exit(0); +} diff --git a/sample/dns-example.c b/sample/dns-example.c new file mode 100644 index 0000000..bd4a440 --- /dev/null +++ b/sample/dns-example.c @@ -0,0 +1,263 @@ +/* + This example code shows how to use the high-level, low-level, and + server-level interfaces of evdns. + + XXX It's pretty ugly and should probably be cleaned up. + */ + +#include + +#include + +#ifdef EVENT__HAVE_UNISTD_H +#include +#endif + +#ifdef _WIN32 +#include +#include +#include +#else +#include +#include +#include +#endif + +#include +#include +#include +#include + +#ifdef EVENT__HAVE_NETINET_IN6_H +#include +#endif + +#include +#include +#include + +#include "tokio_event_base.h" + +#define u32 ev_uint32_t +#define u8 ev_uint8_t + +static const char * +debug_ntoa(u32 address) +{ + static char buf[32]; + u32 a = ntohl(address); + evutil_snprintf(buf, sizeof(buf), "%d.%d.%d.%d", + (int)(u8)((a>>24)&0xff), + (int)(u8)((a>>16)&0xff), + (int)(u8)((a>>8 )&0xff), + (int)(u8)((a )&0xff)); + return buf; +} + +static void +main_callback(int result, char type, int count, int ttl, + void *addrs, void *orig) { + char *n = (char*)orig; + int i; + for (i = 0; i < count; ++i) { + if (type == DNS_IPv4_A) { + printf("%s: %s\n", n, debug_ntoa(((u32*)addrs)[i])); + } else if (type == DNS_PTR) { + printf("%s: %s\n", n, ((char**)addrs)[i]); + } + } + if (!count) { + printf("%s: No answer (%d)\n", n, result); + } + fflush(stdout); +} + +static void +gai_callback(int err, struct evutil_addrinfo *ai, void *arg) +{ + const char *name = arg; + int i; + struct evutil_addrinfo *first_ai = ai; + + if (err) { + printf("%s: %s\n", name, evutil_gai_strerror(err)); + } + if (ai && ai->ai_canonname) + printf(" %s ==> %s\n", name, ai->ai_canonname); + for (i=0; ai; ai = ai->ai_next, ++i) { + char buf[128]; + if (ai->ai_family == PF_INET) { + struct sockaddr_in *sin = + (struct sockaddr_in*)ai->ai_addr; + evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, + sizeof(buf)); + printf("[%d] %s: %s\n",i,name,buf); + } else { + struct sockaddr_in6 *sin6 = + (struct sockaddr_in6*)ai->ai_addr; + evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, + sizeof(buf)); + printf("[%d] %s: %s\n",i,name,buf); + } + } + + if (first_ai) + evutil_freeaddrinfo(first_ai); +} + +static void +evdns_server_callback(struct evdns_server_request *req, void *data) +{ + int i, r; + (void)data; + /* dummy; give 192.168.11.11 as an answer for all A questions, + * give foo.bar.example.com as an answer for all PTR questions. */ + for (i = 0; i < req->nquestions; ++i) { + u32 ans = htonl(0xc0a80b0bUL); + if (req->questions[i]->type == EVDNS_TYPE_A && + req->questions[i]->dns_question_class == EVDNS_CLASS_INET) { + printf(" -- replying for %s (A)\n", req->questions[i]->name); + r = evdns_server_request_add_a_reply(req, req->questions[i]->name, + 1, &ans, 10); + if (r<0) + printf("eeep, didn't work.\n"); + } else if (req->questions[i]->type == EVDNS_TYPE_PTR && + req->questions[i]->dns_question_class == EVDNS_CLASS_INET) { + printf(" -- replying for %s (PTR)\n", req->questions[i]->name); + r = evdns_server_request_add_ptr_reply(req, NULL, req->questions[i]->name, + "foo.bar.example.com", 10); + if (r<0) + printf("ugh, no luck"); + } else { + printf(" -- skipping %s [%d %d]\n", req->questions[i]->name, + req->questions[i]->type, req->questions[i]->dns_question_class); + } + } + + r = evdns_server_request_respond(req, 0); + if (r<0) + printf("eeek, couldn't send reply.\n"); +} + +static int verbose = 0; + +static void +logfn(int is_warn, const char *msg) { + if (!is_warn && !verbose) + return; + fprintf(stderr, "%s: %s\n", is_warn?"WARN":"INFO", msg); +} + +int +main(int c, char **v) { + struct options { + int reverse; + int use_getaddrinfo; + int servertest; + const char *resolv_conf; + const char *ns; + }; + struct options o; + int opt; + struct event_base *event_base = NULL; + struct evdns_base *evdns_base = NULL; + + memset(&o, 0, sizeof(o)); + + if (c < 2) { + fprintf(stderr, "syntax: %s [-x] [-v] [-c resolv.conf] [-s ns] hostname\n", v[0]); + fprintf(stderr, "syntax: %s [-T]\n", v[0]); + return 1; + } + + while ((opt = getopt(c, v, "xvc:Ts:g")) != -1) { + switch (opt) { + case 'x': o.reverse = 1; break; + case 'v': ++verbose; break; + case 'g': o.use_getaddrinfo = 1; break; + case 'T': o.servertest = 1; break; + case 'c': o.resolv_conf = optarg; break; + case 's': o.ns = optarg; break; + default : fprintf(stderr, "Unknown option %c\n", opt); break; + } + } + +#ifdef _WIN32 + { + WSADATA WSAData; + WSAStartup(0x101, &WSAData); + } +#endif + + event_base = tokio_event_base_new(); + evdns_base = evdns_base_new(event_base, EVDNS_BASE_DISABLE_WHEN_INACTIVE); + evdns_set_log_fn(logfn); + + if (o.servertest) { + evutil_socket_t sock; + struct sockaddr_in my_addr; + sock = socket(PF_INET, SOCK_DGRAM, 0); + if (sock == -1) { + perror("socket"); + exit(1); + } + evutil_make_socket_nonblocking(sock); + my_addr.sin_family = AF_INET; + my_addr.sin_port = htons(10053); + my_addr.sin_addr.s_addr = INADDR_ANY; + if (bind(sock, (struct sockaddr*)&my_addr, sizeof(my_addr))<0) { + perror("bind"); + exit(1); + } + evdns_add_server_port_with_base(event_base, sock, 0, evdns_server_callback, NULL); + } + if (optind < c) { + int res; +#ifdef _WIN32 + if (o.resolv_conf == NULL && !o.ns) + res = evdns_base_config_windows_nameservers(evdns_base); + else +#endif + if (o.ns) + res = evdns_base_nameserver_ip_add(evdns_base, o.ns); + else + res = evdns_base_resolv_conf_parse(evdns_base, + DNS_OPTION_NAMESERVERS, o.resolv_conf); + + if (res < 0) { + fprintf(stderr, "Couldn't configure nameservers"); + return 1; + } + } + + printf("EVUTIL_AI_CANONNAME in example = %d\n", EVUTIL_AI_CANONNAME); + for (; optind < c; ++optind) { + if (o.reverse) { + struct in_addr addr; + if (evutil_inet_pton(AF_INET, v[optind], &addr)!=1) { + fprintf(stderr, "Skipping non-IP %s\n", v[optind]); + continue; + } + fprintf(stderr, "resolving %s...\n",v[optind]); + evdns_base_resolve_reverse(evdns_base, &addr, 0, main_callback, v[optind]); + } else if (o.use_getaddrinfo) { + struct evutil_addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = EVUTIL_AI_CANONNAME; + fprintf(stderr, "resolving (fwd) %s...\n",v[optind]); + evdns_getaddrinfo(evdns_base, v[optind], NULL, &hints, + gai_callback, v[optind]); + } else { + fprintf(stderr, "resolving (fwd) %s...\n",v[optind]); + evdns_base_resolve_ipv4(evdns_base, v[optind], 0, main_callback, v[optind]); + } + } + fflush(stdout); + event_base_dispatch(event_base); + evdns_base_free(evdns_base, 1); + event_base_free(event_base); + return 0; +} + diff --git a/sample/event-read-fifo.c b/sample/event-read-fifo.c new file mode 100644 index 0000000..dd22837 --- /dev/null +++ b/sample/event-read-fifo.c @@ -0,0 +1,164 @@ +/* + * This sample code shows how to use Libevent to read from a named pipe. + * XXX This code could make better use of the Libevent interfaces. + * + * XXX This does not work on Windows; ignore everything inside the _WIN32 block. + * + * On UNIX, compile with: + * cc -I/usr/local/include -o event-read-fifo event-read-fifo.c \ + * -L/usr/local/lib -levent + */ + +#include + +#include +#include +#ifndef _WIN32 +#include +#include +#include +#include +#else +#include +#include +#endif +#include +#include +#include +#include +#include + +#include + +#include "tokio_event_base.h" + +static void +fifo_read(evutil_socket_t fd, short event, void *arg) +{ + char buf[255]; + int len; + struct event *ev = arg; +#ifdef _WIN32 + DWORD dwBytesRead; +#endif + + fprintf(stderr, "fifo_read called with fd: %d, event: %d, arg: %p\n", + (int)fd, event, arg); +#ifdef _WIN32 + len = ReadFile((HANDLE)fd, buf, sizeof(buf) - 1, &dwBytesRead, NULL); + + /* Check for end of file. */ + if (len && dwBytesRead == 0) { + fprintf(stderr, "End Of File"); + event_del(ev); + return; + } + + buf[dwBytesRead] = '\0'; +#else + len = read(fd, buf, sizeof(buf) - 1); + + if (len <= 0) { + if (len == -1) + perror("read"); + else if (len == 0) + fprintf(stderr, "Connection closed\n"); + event_del(ev); + event_base_loopbreak(event_get_base(ev)); + return; + } + + buf[len] = '\0'; +#endif + fprintf(stdout, "Read: %s\n", buf); +} + +/* On Unix, cleanup event.fifo if SIGINT is received. */ +#ifndef _WIN32 +static void +signal_cb(evutil_socket_t fd, short event, void *arg) +{ + struct event_base *base = arg; + event_base_loopbreak(base); +} +#endif + +int +main(int argc, char **argv) +{ + struct event *evfifo; + struct event_base* base; +#ifdef _WIN32 + HANDLE socket; + /* Open a file. */ + socket = CreateFileA("test.txt", /* open File */ + GENERIC_READ, /* open for reading */ + 0, /* do not share */ + NULL, /* no security */ + OPEN_EXISTING, /* existing file only */ + FILE_ATTRIBUTE_NORMAL, /* normal file */ + NULL); /* no attr. template */ + + if (socket == INVALID_HANDLE_VALUE) + return 1; + +#else + struct event *signal_int; + struct stat st; + const char *fifo = "event.fifo"; + int socket; + + if (lstat(fifo, &st) == 0) { + if ((st.st_mode & S_IFMT) == S_IFREG) { + errno = EEXIST; + perror("lstat"); + exit(1); + } + } + + unlink(fifo); + if (mkfifo(fifo, 0600) == -1) { + perror("mkfifo"); + exit(1); + } + + socket = open(fifo, O_RDONLY | O_NONBLOCK, 0); + + if (socket == -1) { + perror("open"); + exit(1); + } + + fprintf(stderr, "Write data to %s\n", fifo); +#endif + /* Initalize the event library */ + base = tokio_event_base_new(); + + /* Initalize one event */ +#ifdef _WIN32 + evfifo = event_new(base, (evutil_socket_t)socket, EV_READ|EV_PERSIST, fifo_read, + event_self_cbarg()); +#else + /* catch SIGINT so that event.fifo can be cleaned up */ + signal_int = evsignal_new(base, SIGINT, signal_cb, base); + event_add(signal_int, NULL); + + evfifo = event_new(base, socket, EV_READ|EV_PERSIST, fifo_read, + event_self_cbarg()); +#endif + + /* Add it to the active events, without a timeout */ + event_add(evfifo, NULL); + + event_base_dispatch(base); + event_base_free(base); +#ifdef _WIN32 + CloseHandle(socket); +#else + close(socket); + unlink(fifo); +#endif + libevent_global_shutdown(); + return (0); +} + diff --git a/sample/hello-world.c b/sample/hello-world.c new file mode 100644 index 0000000..0aeeb12 --- /dev/null +++ b/sample/hello-world.c @@ -0,0 +1,143 @@ +/* + This example program provides a trivial server program that listens for TCP + connections on port 9995. When they arrive, it writes a short message to + each client connection, and closes each connection once it is flushed. + + Where possible, it exits cleanly in response to a SIGINT (ctrl-c). +*/ + + +#include +#include +#include +#include +#ifndef _WIN32 +#include +# ifdef _XOPEN_SOURCE_EXTENDED +# include +# endif +#include +#endif + +#include +#include +#include +#include +#include + +#include "tokio_event_base.h" + +static const char MESSAGE[] = "Hello, World!\n"; + +static const int PORT = 9995; + +static void listener_cb(struct evconnlistener *, evutil_socket_t, + struct sockaddr *, int socklen, void *); +static void conn_writecb(struct bufferevent *, void *); +static void conn_eventcb(struct bufferevent *, short, void *); +static void signal_cb(evutil_socket_t, short, void *); + +int +main(int argc, char **argv) +{ + struct event_base *base; + struct evconnlistener *listener; + struct event *signal_event; + + struct sockaddr_in sin; +#ifdef _WIN32 + WSADATA wsa_data; + WSAStartup(0x0201, &wsa_data); +#endif + + base = tokio_event_base_new(); + if (!base) { + fprintf(stderr, "Could not initialize libevent!\n"); + return 1; + } + + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(PORT); + + listener = evconnlistener_new_bind(base, listener_cb, (void *)base, + LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, + (struct sockaddr*)&sin, + sizeof(sin)); + + if (!listener) { + fprintf(stderr, "Could not create a listener!\n"); + return 1; + } + + signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); + + if (!signal_event || event_add(signal_event, NULL)<0) { + fprintf(stderr, "Could not create/add a signal event!\n"); + return 1; + } + + event_base_dispatch(base); + + evconnlistener_free(listener); + event_free(signal_event); + event_base_free(base); + + printf("done\n"); + return 0; +} + +static void +listener_cb(struct evconnlistener *listener, evutil_socket_t fd, + struct sockaddr *sa, int socklen, void *user_data) +{ + struct event_base *base = user_data; + struct bufferevent *bev; + + bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); + if (!bev) { + fprintf(stderr, "Error constructing bufferevent!"); + event_base_loopbreak(base); + return; + } + bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL); + bufferevent_enable(bev, EV_WRITE); + bufferevent_disable(bev, EV_READ); + + bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); +} + +static void +conn_writecb(struct bufferevent *bev, void *user_data) +{ + struct evbuffer *output = bufferevent_get_output(bev); + if (evbuffer_get_length(output) == 0) { + printf("flushed answer\n"); + bufferevent_free(bev); + } +} + +static void +conn_eventcb(struct bufferevent *bev, short events, void *user_data) +{ + if (events & BEV_EVENT_EOF) { + printf("Connection closed.\n"); + } else if (events & BEV_EVENT_ERROR) { + printf("Got an error on the connection: %s\n", + strerror(errno));/*XXX win32*/ + } + /* None of the other events can happen here, since we haven't enabled + * timeouts */ + bufferevent_free(bev); +} + +static void +signal_cb(evutil_socket_t sig, short events, void *user_data) +{ + struct event_base *base = user_data; + struct timeval delay = { 2, 0 }; + + printf("Caught an interrupt signal; exiting cleanly in two seconds.\n"); + + event_base_loopexit(base, &delay); +} diff --git a/sample/tokio-time-test.c b/sample/tokio-time-test.c new file mode 100644 index 0000000..d4c8e6c --- /dev/null +++ b/sample/tokio-time-test.c @@ -0,0 +1,146 @@ +/* + * XXX This sample code was once meant to show how to use the basic Libevent + * interfaces, but it never worked on non-Unix platforms, and some of the + * interfaces have changed since it was first written. It should probably + * be removed or replaced with something better. + * + * Compile with: + * cc -I/usr/local/include -o time-test time-test.c -L/usr/local/lib -levent + */ + +#include + +#include + +#include +#include +#include +#include +#ifdef EVENT__HAVE_SYS_TIME_H +#include +#endif +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "tokio_event_base.h" + +struct timeval lasttime; +struct timeval lasttime_sigalrm; + +int event_is_persistent; + +struct event sigalrm_event; +struct event short_timeout_event; + +static double +get_elapsed(struct timeval *lasttime) +{ + struct timeval newtime, difference; + double elapsed; + + evutil_gettimeofday(&newtime, NULL); + evutil_timersub(&newtime, lasttime, &difference); + elapsed = difference.tv_sec + + (difference.tv_usec / 1.0e6); + *lasttime = newtime; + + return elapsed; +} + +static void +long_timeout_cb(evutil_socket_t fd, short event, void *arg) +{ + struct timeval newtime, difference; + struct event *timeout = arg; + double elapsed; + + evutil_gettimeofday(&newtime, NULL); + evutil_timersub(&newtime, &lasttime, &difference); + elapsed = difference.tv_sec + + (difference.tv_usec / 1.0e6); + + printf("long_timeout_cb called at %d: %.3f seconds elapsed.\n", + (int)newtime.tv_sec, elapsed); + lasttime = newtime; + + if (! event_is_persistent) { + struct timeval tv; + evutil_timerclear(&tv); + tv.tv_sec = 10; + event_add(timeout, &tv); + } +} + +static void +short_timeout_cb(evutil_socket_t fd, short event, void *arg) +{ + double elapsed; + + elapsed = get_elapsed(&lasttime_sigalrm); + printf("short_timeout_cb called at %d: %.3f seconds elapsed.\n", + (int)lasttime_sigalrm.tv_sec, + elapsed); + + event_add(&sigalrm_event, NULL); + alarm(1); +} + +static void +sigalrm_cb(evutil_socket_t nsignal, short event, void *arg) +{ + struct timeval tv; + + evutil_gettimeofday(&lasttime_sigalrm, NULL); + printf("siglarm_cb called at %d\n", (int)lasttime_sigalrm.tv_sec); + + evutil_timerclear(&tv); + event_add(&short_timeout_event, &tv); +} + +int +main(int argc, char **argv) +{ + struct event long_timeout_event; + struct timeval tv; + struct event_base *base; + int flags; + + if (argc == 2 && !strcmp(argv[1], "-p")) { + event_is_persistent = 1; + flags = EV_PERSIST; + } else { + event_is_persistent = 0; + flags = 0; + } + + /* Initalize the event library */ + base = tokio_event_base_new(); + + /* Initalize one event */ + event_assign(&long_timeout_event, base, -1, flags, long_timeout_cb, &long_timeout_event); + event_assign(&short_timeout_event, base, -1, flags, short_timeout_cb, NULL); + event_assign(&sigalrm_event, base, SIGALRM, EV_SIGNAL, sigalrm_cb, NULL); + + evutil_timerclear(&tv); + tv.tv_sec = 10; + event_add(&long_timeout_event, &tv); + event_add(&sigalrm_event, NULL); + + evutil_gettimeofday(&lasttime, NULL); + alarm(1); + + setbuf(stdout, NULL); + setbuf(stderr, NULL); + + event_base_dispatch(base); + + return (0); +} + diff --git a/sample/tokio_event_base.h b/sample/tokio_event_base.h new file mode 100644 index 0000000..4c20fdd --- /dev/null +++ b/sample/tokio_event_base.h @@ -0,0 +1,7 @@ +#pragma once + +/* + * Creates a new event_base and injects a single-threaded tokio runtime into + * it as the backend. + */ +extern struct event_base* tokio_event_base_new(void); diff --git a/src/base.rs b/src/base.rs index ab37c4e..136fdb2 100644 --- a/src/base.rs +++ b/src/base.rs @@ -1,14 +1,13 @@ #![allow(dead_code)] +use super::event::*; +use crate::EventCallbackWrapper; use bitflags::bitflags; use std::io; use std::os::raw::{c_int, c_short, c_void}; use std::ptr::NonNull; use std::time::Duration; -use super::event::*; -use crate::EventCallbackWrapper; - /// A file descriptor in libevent. pub type EvutilSocket = c_int; @@ -51,6 +50,12 @@ impl Base { } } + /// Replaces the standard libevent backend with an owned tokio runtime + #[cfg(feature = "tokio_backend")] + pub fn inject_tokio(&self, runtime: Box) { + super::tokio_backend::inject_tokio(self.base, runtime) + } + /// Creates a new instance of `Base` using a raw, non-null `event_base` /// pointer. /// diff --git a/src/lib.rs b/src/lib.rs index 66bb03b..f2887c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,9 @@ pub use base::{ Base, EventCallbackCtx, EventCallbackFlags, EventFlags, EvutilSocket, ExitReason, LoopFlags, }; +#[cfg(feature = "tokio_backend")] +pub mod tokio_backend; + /// The context passed into `handle_wrapped_callback`, which handles event-type /// specific metadata for trampolining into the user-supplied closure. pub(crate) struct EventCallbackWrapper { diff --git a/src/tokio_backend/api.rs b/src/tokio_backend/api.rs new file mode 100644 index 0000000..69ae2ec --- /dev/null +++ b/src/tokio_backend/api.rs @@ -0,0 +1,201 @@ +use super::{ + backend::TokioBackend, + io::IoType, + runtime::{Runtime, TokioRuntime}, + BaseWrapper, +}; +use libevent_sys::size_t; +use std::{ + ffi::c_void, + os::{ + raw::{c_int, c_short}, + unix::io::RawFd, + }, + ptr::NonNull, + time::Duration, +}; + +const EVSEL: libevent_sys::eventop = libevent_sys::eventop { + name: "tokio".as_ptr().cast(), + init: Some(tokio_backend_init), + add: Some(tokio_backend_add), + del: Some(tokio_backend_del), + dispatch: Some(tokio_backend_dispatch), + dealloc: Some(tokio_backend_dealloc), + need_reinit: 1, + features: libevent_sys::event_method_feature_EV_FEATURE_FDS, + fdinfo_len: std::mem::size_of::() as size_t, +}; +const EVSIGSEL: libevent_sys::eventop = libevent_sys::eventop { + name: "tokio_signal".as_ptr().cast(), + init: None, + add: Some(tokio_signal_backend_add), + del: Some(tokio_signal_backend_del), + dispatch: None, + dealloc: None, + need_reinit: 0, + features: 0, + fdinfo_len: 0, +}; + +/// Injects a tokio backend with the given runtime into the given libevent instance. +/// +/// The libevent instance will already have an initialized backend. This +/// exisiting backend is deallocated before being replaced. +pub fn inject_tokio(mut base: NonNull, runtime: Box) { + let backend = Box::new(TokioBackend::new(runtime)); + let base = unsafe { base.as_mut() }; + + if let Some(evsel) = unsafe { base.evsel.as_ref() } { + if let Some(dealloc) = evsel.dealloc { + unsafe { + dealloc(base); + } + } + } + + base.evsel = &EVSEL; + base.evsigsel = &EVSIGSEL; + base.evbase = Box::into_raw(backend).cast(); +} + +/// Convenience method to allow injecting C programs with a tokio backend +#[no_mangle] +pub unsafe extern "C" fn tokio_event_base_new() -> *mut libevent_sys::event_base { + let base = NonNull::new(libevent_sys::event_base_new()); + + match base { + Some(base) => match TokioRuntime::new() { + Ok(runtime) => { + inject_tokio(base, Box::new(runtime)); + + base.as_ptr() + } + Err(error) => { + tracing::error!(?error, "failed to create a new Tokio runtime"); + + std::ptr::null_mut() + } + }, + None => std::ptr::null_mut(), + } +} + +/// libevent callback to initialize the backend +/// +/// This function would normally be called in `event_base_new`, but the tokio +/// backend is inject after that call. Therefore, this call would only happen +/// if the process is forked. That functionality is not currently supported. +#[no_mangle] +pub unsafe extern "C" fn tokio_backend_init(_base: *mut libevent_sys::event_base) -> *mut c_void { + unimplemented!("forking with a tokio backend") +} + +/// libevent callback to add an I/O event +#[no_mangle] +pub unsafe extern "C" fn tokio_backend_add( + eb: *mut libevent_sys::event_base, + fd: c_int, + _old: c_short, + events: c_short, + _fdinfo: *mut c_void, +) -> c_int { + if let Some(io_type) = IoType::from_events(events as u32) { + if let Some(base) = eb.as_ref() { + if let Some(backend) = (base.evbase as *mut TokioBackend).as_mut() { + return backend.add_io(BaseWrapper(eb), fd, io_type); + } + } + } + + -1 +} + +/// libevent callback to remove an I/O event +#[no_mangle] +unsafe extern "C" fn tokio_backend_del( + base: *mut libevent_sys::event_base, + fd: c_int, + _old: c_short, + events: c_short, + _fdinfo: *mut c_void, +) -> c_int { + if let Some(base) = base.as_ref() { + if let Some(backend) = (base.evbase as *mut TokioBackend).as_mut() { + if let Some(io_type) = IoType::from_events(events as u32) { + return backend.del_io(fd, io_type); + } + } + } + + -1 +} + +/// libevent callback to drive the event loop +#[no_mangle] +unsafe extern "C" fn tokio_backend_dispatch( + eb: *mut libevent_sys::event_base, + tv: *mut libevent_sys::timeval, +) -> c_int { + if let Some(base) = eb.as_ref() { + if let Some(backend) = (base.evbase as *mut TokioBackend).as_mut() { + let timeout = tv.as_ref().map(|tv| { + Duration::from_secs(tv.tv_sec as u64) + .saturating_add(Duration::from_micros(tv.tv_usec as u64)) + }); + + backend.dispatch(eb, timeout); + + return 0; + } + } + + -1 +} + +/// libevent callback to deallocate the backend +#[no_mangle] +pub unsafe extern "C" fn tokio_backend_dealloc(base: *mut libevent_sys::event_base) { + if let Some(base) = base.as_mut() { + Box::from_raw(base.evbase); + base.evbase = std::ptr::null_mut(); + } +} + +/// libevent callback to add a signal event +#[no_mangle] +pub unsafe extern "C" fn tokio_signal_backend_add( + eb: *mut libevent_sys::event_base, + signum: c_int, + _old: c_short, + events: c_short, + _fdinfo: *mut c_void, +) -> c_int { + if events as u32 & libevent_sys::EV_SIGNAL != 0 { + if let Some(base) = eb.as_ref() { + if let Some(backend) = (base.evbase as *mut TokioBackend).as_mut() { + return backend.add_signal(BaseWrapper(eb), signum); + } + } + } + + -1 +} + +/// libevent callback to remove a signal event +#[no_mangle] +unsafe extern "C" fn tokio_signal_backend_del( + base: *mut libevent_sys::event_base, + fd: c_int, + _old: c_short, + _events: c_short, + _fdinfo: *mut c_void, +) -> c_int { + if let Some(base) = base.as_ref() { + if let Some(backend) = (base.evbase as *mut TokioBackend).as_mut() { + return backend.del_signal(fd); + } + } + + -1 +} diff --git a/src/tokio_backend/backend.rs b/src/tokio_backend/backend.rs new file mode 100644 index 0000000..4c39a34 --- /dev/null +++ b/src/tokio_backend/backend.rs @@ -0,0 +1,152 @@ +use super::{ + io::{IoMap, IoType}, + runtime::Runtime, + signal::SignalMap, + BaseWrapper, +}; +use crate::tokio_backend::runtime::{JoinFuture, YieldFuture}; +use std::{os::raw::c_int, sync::Arc, time::Duration}; +use tokio::{sync::Notify, task::JoinHandle}; + +/// Implements a libevent backend using a tokio runtime +pub struct TokioBackend { + /// Option tokio runtime to maintain ownership + runtime: Box, + /// Notifies the dispatch loop that it should yield back to libevent + dispatch_notify: Arc, + /// Map of futures for registered I/O events + io_map: IoMap, + /// Map of futures for registered signals + signal_map: SignalMap, +} + +impl TokioBackend { + /// Create a new libevent backend using the provided runtime + pub fn new(runtime: Box) -> Self { + let dispatch_notify = Arc::new(Notify::new()); + let io_map = IoMap::new(); + let signal_map = SignalMap::new(); + + Self { + runtime, + dispatch_notify, + io_map, + signal_map, + } + } + + /// Creates a task to service a libevent I/O request + /// + /// A task must continue to service the file descriptor events until + /// explicitly removed. Space is allocated by libevent that is used + /// to store an Arc object for clean shutdown of the created + /// task. + /// + /// AsyncFd is used to assess read and write readiness of the + /// file descriptor. All higher level funcitonality like socket listening + /// and DNS request rely on these readiness notifications, but they + /// otherwise function using unchanged libevent code. + pub(crate) fn add_io(&mut self, base: BaseWrapper, fd: c_int, io_type: IoType) -> c_int { + tracing::debug!(fd, ?io_type, "add an I/O event"); + + let _guard = self.runtime.enter(); + + match self + .io_map + .add(base, fd, io_type, self.dispatch_notify.clone()) + { + Ok(_) => 0, + Err(error) => { + tracing::error!(?error); + -1 + } + } + } + + /// Blocks on the given join handle + fn join(&self, join_handle: JoinHandle<()>) { + let future = JoinFuture::new(join_handle); + + self.runtime.join(future); + } + + /// Terminates an active I/O task + pub fn del_io(&mut self, fd: c_int, io_type: IoType) -> c_int { + tracing::debug!(fd, ?io_type, "delete an I/O event"); + + if let Ok(result) = self.io_map.del(fd, io_type) { + if let Some(join_handle) = result { + self.join(join_handle); + } + + 0 + } else { + -1 + } + } + + /// Creates a task to service a libevent signal request + /// + /// A task must continue to provide signal notifications until explicitly + /// removed. Note that libevent does not provide user data per signal + /// event. Therefore, signals are mapped to notifications in TokioBackend + /// to allow for clean task shutdown. + /// + /// Since the tokio signal handler is installed globally. It is safe to + /// handle signals with both libevent and using tokio directly. + pub(crate) fn add_signal(&mut self, base: BaseWrapper, signum: c_int) -> c_int { + tracing::debug!(signum, "add a signal event"); + + let _guard = self.runtime.enter(); + + match self + .signal_map + .add(base, signum, self.dispatch_notify.clone()) + { + Ok(_) => 0, + Err(error) => { + tracing::error!(?error); + -1 + } + } + } + + /// Terminates an active signal task + pub fn del_signal(&mut self, signum: c_int) -> c_int { + tracing::debug!(signum, "delete a signal event"); + + if let Ok(join_handle) = self.signal_map.del(signum) { + self.join(join_handle); + 0 + } else { + -1 + } + } + + /// Drive the tokio runtime with an optional duration for timout events + pub fn dispatch(&mut self, _base: *mut libevent_sys::event_base, timeout: Option) { + tracing::trace!(?timeout, "dispatch events"); + + let _guard = self.runtime.enter(); + + match timeout { + Some(duration) => { + if duration.is_zero() { + let future = YieldFuture::default(); + + self.runtime.dispatch_yield(future); + } else { + let future = self.dispatch_notify.notified(); + let future = tokio::time::timeout(duration, future); + + self.runtime.dispatch_timeout(future); + } + } + None => { + let future = self.dispatch_notify.notified(); + + self.runtime.dispatch_notify(future); + } + } + } +} diff --git a/src/tokio_backend/io.rs b/src/tokio_backend/io.rs new file mode 100644 index 0000000..d6977c1 --- /dev/null +++ b/src/tokio_backend/io.rs @@ -0,0 +1,207 @@ +use super::BaseWrapper; +use std::{ + collections::HashMap, + os::unix::prelude::*, + sync::{ + atomic::{AtomicI32, Ordering}, + Arc, + }, +}; +use tokio::{ + io::{unix::AsyncFd, Interest}, + sync::Notify, + task::JoinHandle, +}; + +/// Manages adding and removing I/O event tasks +#[derive(Debug)] +pub struct IoMap { + inner: HashMap, +} + +#[derive(Clone, Debug)] +pub enum IoType { + Read, + ReadWrite, + Write, +} + +#[derive(Debug)] +struct IoContext { + notify: Notify, + nread: AtomicI32, + nwrite: AtomicI32, +} + +#[derive(Debug)] +struct IoEntry { + context: Arc, + join_handle: JoinHandle<()>, +} + +impl IoMap { + pub(crate) fn new() -> Self { + Self { + inner: HashMap::new(), + } + } + + pub(crate) fn add( + &mut self, + base: BaseWrapper, + fd: RawFd, + io_type: IoType, + dispatch_notify: Arc, + ) -> std::io::Result<()> { + match self.inner.get(&fd) { + Some(entry) => { + if io_type.is_read() { + entry.context.nread.fetch_add(1, Ordering::AcqRel); + } + + if io_type.is_write() { + entry.context.nwrite.fetch_add(1, Ordering::AcqRel); + } + + entry.context.notify.notify_one(); + } + None => { + let context = Arc::new(IoContext { + notify: Notify::new(), + nread: AtomicI32::new(if io_type.is_read() { 1 } else { 0 }), + nwrite: AtomicI32::new(if io_type.is_write() { 1 } else { 0 }), + }); + let async_fd = AsyncFd::new(fd)?; + let join_handle = tokio::spawn(io_task( + async_fd, + base, + context.clone(), + dispatch_notify.clone(), + )); + let entry = IoEntry { + context, + join_handle, + }; + + self.inner.insert(fd, entry); + } + } + + Ok(()) + } + + pub fn del(&mut self, fd: RawFd, io_type: IoType) -> Result>, ()> { + let total = { + let entry = self.inner.get_mut(&fd).ok_or_else(|| ())?; + + let nread = if io_type.is_read() { + entry.context.nread.fetch_sub(1, Ordering::AcqRel) + } else { + entry.context.nread.load(Ordering::Acquire) + }; + assert!(nread >= 0); + + let nwrite = if io_type.is_write() { + entry.context.nwrite.fetch_sub(1, Ordering::AcqRel) + } else { + entry.context.nread.load(Ordering::Acquire) + }; + assert!(nwrite >= 0); + + entry.context.notify.notify_one(); + + nread + nwrite + }; + + Ok(if total > 0 { + let entry = self.inner.remove(&fd).unwrap(); + + Some(entry.join_handle) + } else { + None + }) + } +} + +impl IoType { + pub fn from_events(events: u32) -> Option { + let is_read = events & libevent_sys::EV_READ != 0; + let is_write = events & libevent_sys::EV_WRITE != 0; + + if is_read && is_write { + Some(IoType::ReadWrite) + } else if is_read { + Some(IoType::Read) + } else if is_write { + Some(IoType::Write) + } else { + None + } + } + + pub fn is_read(&self) -> bool { + match self { + IoType::Read => true, + IoType::ReadWrite => true, + IoType::Write => false, + } + } + + pub fn is_write(&self) -> bool { + match self { + IoType::Read => false, + IoType::ReadWrite => true, + IoType::Write => true, + } + } +} + +impl From for Interest { + fn from(io_type: IoType) -> Self { + match io_type { + IoType::Read => Interest::READABLE, + IoType::Write => Interest::WRITABLE, + IoType::ReadWrite => Interest::READABLE.add(Interest::WRITABLE), + } + } +} + +async fn io_task( + async_fd: AsyncFd, + base: BaseWrapper, + context: Arc, + dispatch_notify: Arc, +) { + let fd = async_fd.as_raw_fd(); + + loop { + tokio::select! { + _ = context.notify.notified() => { + let total = context.nread.load(Ordering::Acquire) + context.nwrite.load(Ordering::Acquire); + + if total == 0 { + tracing::debug!(fd, "I/O task removed"); + return; + } + }, + result = async_fd.readable(), if context.nread.load(Ordering::Acquire) > 0 => { + if let Ok(mut guard) = result { + unsafe { + libevent_sys::evmap_io_active_(base.0, fd, libevent_sys::EV_READ as i16); + } + guard.clear_ready(); + dispatch_notify.notify_one(); + } + }, + result = async_fd.writable(), if context.nwrite.load(Ordering::Acquire) > 0 => { + if let Ok(mut guard) = result { + unsafe { + libevent_sys::evmap_io_active_(base.0, fd, libevent_sys::EV_WRITE as i16); + } + guard.clear_ready(); + dispatch_notify.notify_one(); + } + }, + } + } +} diff --git a/src/tokio_backend/mod.rs b/src/tokio_backend/mod.rs new file mode 100644 index 0000000..2d5bb5a --- /dev/null +++ b/src/tokio_backend/mod.rs @@ -0,0 +1,16 @@ +mod api; +mod backend; +mod io; +mod runtime; +mod signal; + +pub use api::inject_tokio; +pub use runtime::{JoinFuture, Runtime, TokioRuntime, YieldFuture}; + +/// Wrapper to allow sending of raw event_base pointers to tokio tasks. +/// +/// This is safe because libevent performs locking internally. +#[derive(Debug)] +pub(crate) struct BaseWrapper(*mut libevent_sys::event_base); + +unsafe impl Send for BaseWrapper {} diff --git a/src/tokio_backend/runtime.rs b/src/tokio_backend/runtime.rs new file mode 100644 index 0000000..fef5da9 --- /dev/null +++ b/src/tokio_backend/runtime.rs @@ -0,0 +1,100 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::{ + sync::futures::Notified, + task::{JoinError, JoinHandle}, + time::Timeout, +}; + +/// Runtime interface for dealing with various runtime ownership scenarios +pub trait Runtime { + fn enter(&self) -> tokio::runtime::EnterGuard<'_>; + fn join(&self, future: JoinFuture); + fn dispatch_yield(&self, future: YieldFuture); + fn dispatch_notify(&self, future: Notified<'_>); + fn dispatch_timeout(&self, future: Timeout>); +} + +pub struct JoinFuture { + join_handle: JoinHandle<()>, +} + +impl JoinFuture { + pub fn new(join_handle: JoinHandle<()>) -> Self { + Self { join_handle } + } +} + +impl Future for JoinFuture { + type Output = Result<(), JoinError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let join_handle = Pin::new(&mut self.get_mut().join_handle); + + join_handle.poll(cx) + } +} + +pub struct YieldFuture(u8); + +impl Default for YieldFuture { + fn default() -> Self { + Self(0) + } +} + +impl Future for YieldFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let counter = self.0; + + if counter < 2 { + self.get_mut().0 += 1; + cx.waker().wake_by_ref(); + + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +pub struct TokioRuntime { + inner: tokio::runtime::Runtime, +} + +impl TokioRuntime { + pub fn new() -> std::io::Result { + let inner = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + Ok(Self { inner }) + } +} + +impl Runtime for TokioRuntime { + fn enter(&self) -> tokio::runtime::EnterGuard<'_> { + self.inner.enter() + } + + fn join(&self, future: JoinFuture) { + let _ = self.inner.block_on(future); + } + + fn dispatch_yield(&self, future: YieldFuture) { + self.inner.block_on(future); + } + + fn dispatch_notify(&self, future: Notified<'_>) { + self.inner.block_on(future); + } + + fn dispatch_timeout(&self, future: Timeout>) { + let _ = self.inner.block_on(future); + } +} diff --git a/src/tokio_backend/signal.rs b/src/tokio_backend/signal.rs new file mode 100644 index 0000000..6bf69b5 --- /dev/null +++ b/src/tokio_backend/signal.rs @@ -0,0 +1,65 @@ +use super::BaseWrapper; +use std::{collections::HashMap, os::raw::c_int, sync::Arc}; +use tokio::{ + signal::unix::{signal, SignalKind}, + sync::Notify, + task::JoinHandle, +}; + +/// Manages adding and removing signal event tasks +#[derive(Debug)] +pub struct SignalMap { + inner: HashMap, JoinHandle<()>)>, +} + +impl SignalMap { + pub(crate) fn new() -> Self { + Self { + inner: HashMap::new(), + } + } + + pub(crate) fn add( + &mut self, + base: BaseWrapper, + signum: c_int, + dispatch_notify: Arc, + ) -> std::io::Result<()> { + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let mut stream = signal(SignalKind::from_raw(signum))?; + let join_handle = tokio::spawn(async move { + loop { + tokio::select! { + result = stream.recv() => { + if result.is_some() { + unsafe { + libevent_sys::evmap_signal_active_(base.0, signum, 1); + } + dispatch_notify.notify_one(); + } else { + tracing::error!("signal stream has closed"); + break; + } + }, + _ = notify.notified() => { + break; + } + } + } + }); + + self.inner.insert(signum, (notify_clone, join_handle)); + + Ok(()) + } + + pub fn del(&mut self, signum: c_int) -> Result, ()> { + if let Some((notify, join_handle)) = self.inner.remove(&signum) { + notify.notify_one(); + Ok(join_handle) + } else { + Err(()) + } + } +}