diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 94a56e76..9be7230e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -42,8 +42,6 @@ if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux") add_compile_options(-DHAVE_URING=1) include_directories(${LIBURING_INCLUDE_DIRS}) link_libraries(${LIBURING_LIBRARY}) - else () - add_compile_options(-DHAVE_URING=0) endif() add_compile_options(-DHAVE_EPOLL=1) @@ -113,6 +111,8 @@ else() ${CJSON_INCLUDE_DIRS} ) + add_compile_options(-DHAVE_KQUEUE=1) + # # Library directories # diff --git a/src/libpgagroal/ev.c b/src/libpgagroal/ev.c index 194cb294..2c7167a0 100644 --- a/src/libpgagroal/ev.c +++ b/src/libpgagroal/ev.c @@ -48,9 +48,15 @@ #include #include #endif +#if HAVE_EPOLL #include +#endif #include #include +#if HAVE_KQUEUE +#include +#include +#endif #define FALLBACK_BACKEND "epoll" #define TYPEOF(watcher) watcher->io->type @@ -460,15 +466,7 @@ int pgagroal_ev_signal_start(struct ev_loop* ev, struct ev_signal* w) { sigaddset(&ev->sigset, w->signum); - - // if (pthread_sigmask(SIG_BLOCK, &ev->sigset, NULL) == -1) - // { - // pgagroal_log_error("%s: pthread_sigmask failed\n", __func__); - // return EV_ERROR; - // } - list_add(w, ev->shead.next); - return EV_OK; } @@ -536,23 +534,6 @@ pgagroal_ev_periodic_stop(struct ev_loop* ev, struct ev_periodic* target) return ret; } -/* - * TODO: when developing config, make sure that the user has the backend that she claims to have. - * When it gets to this point, checks will not be made any longer, unless inside specific __ev_function. - * These functions will contain preprocessor checks so that the compilation does not break... - * - * int (*init)(struct ev_loop *); - * int (*loop)(struct ev_loop *); - * int (*io_start)(struct ev_loop*, struct ev_io*); - * int (*io_stop)(struct ev_loop*, struct ev_io*); - * int (*signal_init)(struct ev_loop*, struct ev_signal*); - * int (*signal_start)(struct ev_loop*, struct ev_signal*); - * int (*signal_stop)(struct ev_loop*, struct ev_signal*); - * int (*periodic_init)(struct ev_loop*, struct ev_periodic*); - * int (*periodic_start)(struct ev_loop*, struct ev_periodic*); - * int (*periodic_stop)(struct ev_loop*, struct ev_periodic*); - * - */ static int setup_ops(struct ev_loop* ev) { @@ -635,7 +616,7 @@ setup_context(struct ev_context* ctx, struct configuration* config) ctx->backend = backend_value(config->ev_backend); if (!ctx->backend) { - pgagroal_log_warn("ev_backend '%s' not supported", config->ev_backend); + pgagroal_log_warn("ev_backend option '%s' not recognized", config->ev_backend); pgagroal_log_warn("Selected default: '%s'", FALLBACK_BACKEND); strcpy(config->ev_backend, FALLBACK_BACKEND); ctx->backend = EV_BACKEND_EPOLL; @@ -669,16 +650,7 @@ setup_context(struct ev_context* ctx, struct configuration* config) if (ctx->backend == EV_BACKEND_IO_URING) { #if HAVE_URING - /* set opts */ - // ctx->napi = config.napi; - // ctx->sqpoll = config.sqpoll; - // ctx->use_huge = config.use_huge; - // ctx->defer_tw = config.defer_tw; - // ctx->snd_ring = config.snd_ring; - // ctx->snd_bundle = config.snd_bundle; - // ctx->fixed_files = config.fixed_files; - // ctx->no_use_buffers = config.no_use_buffers; - // ctx->buf_count = config.buf_count; + /* TODO set opts */ /* asserts */ if (ctx->defer_tw && ctx->sqpoll) @@ -1698,3 +1670,400 @@ __epoll_send_handler(struct ev_loop* ev, struct ev_io* w) } #endif +#if HAVE_KQUEUE + +int +__kqueue_loop(struct ev_loop* ev) +{ + int ret; + int nfds; + struct kevent events[MAX_EVENTS]; + struct timespec timeout; + timeout.tv_sec = 0; + timeout.tv_nsec = 10000000; // 10 ms + + set_running(ev); + while (is_running(ev)) + { + nfds = kevent(ev->kqueuefd, NULL, 0, events, MAX_EVENTS, &timeout); + + if (nfds == -1) + { + if (errno == EINTR) + { + continue; + } + pgagroal_log_error("kevent"); + return EV_ERROR; + } + + if (!is_running(ev)) + { + break; + } + for (int i = 0; i < nfds; i++) + { + ret = __kqueue_handler(ev, &events[i]); + if (ret == EV_ERROR) + { + pgagroal_log_error("handler error"); + return EV_ERROR; + } + } + } + return EV_OK; +} + +static int +__kqueue_init(struct ev_loop* ev) +{ + ev->buffer = malloc(sizeof(char) * (MAX_BUFFER_SIZE)); + ev->kqueuefd = kqueue(); + if (ev->kqueuefd == -1) + { + pgagroal_log_error("kqueue init error"); + return EV_ERROR; + } + return EV_OK; +} + +static int +__kqueue_fork(struct ev_loop** parent_loop) +{ + /* TODO: Destroy everything related to loop */ + close((*parent_loop)->kqueuefd); + return EV_OK; +} + +static int +__kqueue_destroy(struct ev_loop* ev) +{ + close(ev->kqueuefd); + free(ev->buffer); + free(ev); + return EV_OK; +} + +static int +__kqueue_handler(struct ev_loop* ev, void* udata) +{ + struct kevent* kev = (struct kevent*)udata; + + if (kev->filter == EVFILT_TIMER) + { + return __kqueue_periodic_handler(ev, kev); + } + else if (kev->filter == EVFILT_SIGNAL) + { + return __kqueue_signal_handler(ev, kev); + } + else if (kev->filter == EVFILT_READ || kev->filter == EVFILT_WRITE) + { + return __kqueue_io_handler(ev, kev); + } + else + { + pgagroal_log_error("Unknown filter in handler"); + return EV_ERROR; + } +} + +static int +__kqueue_signal_start(struct ev_loop* ev, struct ev_signal* w) +{ + struct kevent kev; + + EV_SET(&kev, w->signum, EVFILT_SIGNAL, EV_ADD, 0, 0, w); + if (kevent(ev->kqueuefd, &kev, 1, NULL, 0, NULL) == -1) + { + pgagroal_log_error("kevent: signal add"); + return EV_ERROR; + } + return EV_OK; +} + +static int +__kqueue_signal_stop(struct ev_loop* ev, struct ev_signal* w) +{ + struct kevent kev; + + EV_SET(&kev, w->signum, EVFILT_SIGNAL, EV_DELETE, 0, 0, w); + if (kevent(ev->kqueuefd, &kev, 1, NULL, 0, NULL) == -1) + { + pgagroal_log_error("kevent: signal delete"); + return EV_ERROR; + } + return EV_OK; +} + +static int +__kqueue_signal_handler(struct ev_loop* ev, struct kevent* kev) +{ + struct ev_signal* w = (struct ev_signal*)kev->udata; + + if (w->signum == (int)kev->ident) + { + w->cb(ev, w, 0); + return EV_OK; + } + else + { + pgagroal_log_error("No handler found for signal %d", (int)kev->ident); + return EV_ERROR; + } +} + +static int +__kqueue_periodic_init(struct ev_periodic* w, int msec) +{ + w->interval = msec; // Store interval in milliseconds + return EV_OK; +} + +static int +__kqueue_periodic_start(struct ev_loop* ev, struct ev_periodic* w) +{ + static int timer_id = 1; // Start from 1 to avoid using 0 + struct kevent kev; + + w->fd = timer_id++; // Assign unique id to w->fd + + EV_SET(&kev, w->fd, EVFILT_TIMER, EV_ADD | EV_ENABLE, NOTE_MSECONDS, w->interval, w); + + if (kevent(ev->kqueuefd, &kev, 1, NULL, 0, NULL) == -1) + { + pgagroal_log_error("kevent: timer add"); + return EV_ERROR; + } + + return EV_OK; +} + +static int +__kqueue_periodic_stop(struct ev_loop* ev, struct ev_periodic* w) +{ + struct kevent kev; + + EV_SET(&kev, w->fd, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + + if (kevent(ev->kqueuefd, &kev, 1, NULL, 0, NULL) == -1) + { + pgagroal_log_error("kevent: timer delete"); + return EV_ERROR; + } + + return EV_OK; +} + +static int +__kqueue_periodic_handler(struct ev_loop* ev, struct kevent* kev) +{ + struct ev_periodic* w = (struct ev_periodic*)kev->udata; + + w->cb(ev, w, 0); + + return EV_OK; +} + +static int +__kqueue_io_start(struct ev_loop* ev, struct ev_io* w) +{ + struct kevent kev; + int filter; + + switch (w->type) + { + case EV_ACCEPT: + case EV_RECEIVE: + filter = EVFILT_READ; + break; + case EV_SEND: + filter = EVFILT_WRITE; + break; + default: + pgagroal_log_fatal("%s: unknown event type: %d\n", __func__, w->type); + return EV_ERROR; + } + + if (set_non_blocking(w->fd)) + { + pgagroal_log_fatal("%s: set_non_blocking", __func__); + return EV_ERROR; + } + + EV_SET(&kev, w->fd, filter, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, w); + + if (kevent(ev->kqueuefd, &kev, 1, NULL, 0, NULL) == -1) + { + pgagroal_log_error("%s: kevent add failed", __func__); + return EV_ERROR; + } + + return EV_OK; +} + +static int +__kqueue_io_stop(struct ev_loop* ev, struct ev_io* w) +{ + struct kevent kev; + int filter; + + switch (w->type) + { + case EV_ACCEPT: + case EV_RECEIVE: + filter = EVFILT_READ; + break; + case EV_SEND: + filter = EVFILT_WRITE; + break; + default: + pgagroal_log_fatal("%s: unknown event type: %d\n", __func__, w->type); + return EV_ERROR; + } + + EV_SET(&kev, w->fd, filter, EV_DELETE, 0, 0, NULL); + + if (kevent(ev->kqueuefd, &kev, 1, NULL, 0, NULL) == -1) + { + pgagroal_log_error("%s: kevent delete failed", __func__); + return EV_ERROR; + } + + return EV_OK; +} + +static int +__kqueue_io_handler(struct ev_loop* ev, struct kevent* kev) +{ + struct ev_io* w = (struct ev_io*)kev->udata; + int ret = EV_OK; + + switch (w->type) + { + case EV_ACCEPT: + ret = __kqueue_accept_handler(ev, w); + break; + case EV_SEND: + ret = __kqueue_send_handler(ev, w); + break; + case EV_RECEIVE: + ret = __kqueue_receive_handler(ev, w); + if (ret == EV_CLOSE_FD) + { + __kqueue_io_stop(ev, w); + } + break; + default: + pgagroal_log_fatal("%s: unknown value for event type %d\n", __func__, w->type); + ret = EV_ERROR; + break; + } + + return ret; +} + +static int +__kqueue_receive_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + ssize_t nrecv = 0; + size_t total_recv = 0; + void* buf = ev->buffer; + + if (!buf) + { + pgagroal_log_error("malloc error"); + return EV_ALLOC_ERROR; + } + + if (!w->ssl) + { + while (1) + { + nrecv = recv(w->fd, buf + total_recv, MAX_BUFFER_SIZE - total_recv, 0); + if (nrecv == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + break; + } + else + { + pgagroal_log_error("receive_handler: recv"); + ret = EV_ERROR; + break; + } + } + else if (nrecv == 0) + { + ret = EV_CLOSE_FD; + pgagroal_log_info("Connection closed fd=%d client_fd=%d\n", w->fd, w->client_fd); + break; + } + + total_recv += nrecv; + + if (total_recv >= MAX_BUFFER_SIZE) + { + pgagroal_log_error("receive_handler: buffer overflow"); + ret = EV_ERROR; + break; + } + } + + w->data = buf; + w->size = total_recv; + } + + w->cb(ev, w, ret); + return ret; +} + +static int +__kqueue_accept_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + int listen_fd = w->fd; + + while (1) + { + w->client_fd = accept(listen_fd, NULL, NULL); + if (w->client_fd == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + ret = EV_OK; + break; + } + else + { + pgagroal_log_error("accept_handler: accept"); + ret = EV_ERROR; + break; + } + } + else + { + w->cb(ev, w, ret); + } + } + + return ret; +} + +static int +__kqueue_send_handler(struct ev_loop* ev, struct ev_io* w) +{ + int ret = EV_OK; + ssize_t nsent; + size_t total_sent = 0; + int fd = w->fd; + void* buf = w->data; + size_t buf_len = w->size; + /* TODO ? */ + return ret; +} + +#endif +