From 2a17eb7cbbb46d7cbc59cb2eaaf0b9eb02e8f197 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Mon, 12 Feb 2024 20:31:44 -0800 Subject: [PATCH] Add wake up function to packet loop --- picoquic/picoquic_packet_loop.h | 71 +++++-- picoquic/sockloop.c | 315 +++++++++++++++++++++++++------- 2 files changed, 303 insertions(+), 83 deletions(-) diff --git a/picoquic/picoquic_packet_loop.h b/picoquic/picoquic_packet_loop.h index 09c08ac78..838f55beb 100644 --- a/picoquic/picoquic_packet_loop.h +++ b/picoquic/picoquic_packet_loop.h @@ -24,6 +24,7 @@ #include "picosocks.h" #include "picoquic.h" +#include "picoquic_utils.h" #ifdef __cplusplus extern "C" { @@ -75,9 +76,19 @@ typedef enum { picoquic_packet_loop_after_receive, /* Argument type size_t*: nb packets received */ picoquic_packet_loop_after_send, /* Argument type size_t*: nb packets sent */ picoquic_packet_loop_port_update, /* argument type struct_sockaddr*: new address for wakeup */ - picoquic_packet_loop_time_check /* argument type . Optional. */ + picoquic_packet_loop_time_check, /* argument type packet_loop_time_check_arg_t*. Optional. */ + picoquic_packet_loop_wake_up /* no argument (void* NULL). Used when loop wakeup is supported */ } picoquic_packet_loop_cb_enum; +/* The time check option passes as argument a pointer to a structure specifying +* the current time and the proposed delta. The application uses the specified +* current time to compute an updated delta. +*/ +typedef struct st_packet_loop_time_check_arg_t { + uint64_t current_time; + int64_t delta_t; +} packet_loop_time_check_arg_t; + typedef int (*picoquic_packet_loop_cb_fn)(picoquic_quic_t * quic, picoquic_packet_loop_cb_enum cb_mode, void * callback_ctx, void * callback_argv); /* Packet loop option list shows support by application of optional features. 
@@ -88,15 +99,6 @@ typedef struct st_picoquic_packet_loop_options_t { int do_time_check : 1; /* App should be polled for next time before sock select */ } picoquic_packet_loop_options_t; -/* The time check option passes as argument a pointer to a structure specifying - * the current time and the proposed delta. The application uses the specified - * current time to compute an updated delta. - */ -typedef struct st_packet_loop_time_check_arg_t { - uint64_t current_time; - int64_t delta_t; -} packet_loop_time_check_arg_t; - /* Version 2 of packet loop, works in progress. * Parameters are set in a struct, for future * extensibility. @@ -117,8 +119,53 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, picoquic_packet_loop_cb_fn loop_callback, void * loop_callback_ctx); -/* Two versions of the packet loop, one portable and one speciailezed - * for winsock. +/* Threaded version of packet loop, when running picoquic in a background thread. +* +* Thread is started by calling picoquic_start_network_thread, which +* returns a pointer to a context of type picoquic_network_thread_ctx_t. Returns a NULL +* pointer if the thread could not be created. +* +* If the application needs to post new data or otherwise interact with +* the quic connections, it should call picoquic_wake_up_network_thread, +* passing the thread context as an argument. This will trigger a +* callback of type `picoquic_packet_loop_wake_up`, which executes +* in the context of the network thread. Picoquic APIs can be called +* in this context without worrying about concurrency issues. +* +* If the application wants to close the network thread, it calls +* picoquic_delete_network_thread, passing the thread context as an argument. +* The network thread context will be freed during that call.
+*/ + +typedef struct st_picoquic_network_thread_ctx_t { + picoquic_quic_t* quic; + picoquic_packet_loop_param_t* param; + picoquic_packet_loop_cb_fn loop_callback; + void* loop_callback_ctx; + picoquic_thread_t thread_id; +#ifdef _WINDOWS + HANDLE wake_up_event; +#else + int wake_up_pipe_fd[2]; +#endif + int wake_up_defined; + volatile int thread_should_close; +} picoquic_network_thread_ctx_t; + +picoquic_network_thread_ctx_t* picoquic_start_network_thread( + picoquic_quic_t* quic, + picoquic_packet_loop_param_t* param, + picoquic_packet_loop_cb_fn loop_callback, + void* loop_callback_ctx, + int * ret); + +int picoquic_wake_up_network_thread(picoquic_network_thread_ctx_t* thread_ctx); +void picoquic_delete_network_thread(picoquic_network_thread_ctx_t* thread_ctx); + + +/* Legacy versions of the packet loop, one portable and one specialized + * for winsock. Keeping these APIs for compatibility, but the implementation + * redirects to picoquic_packet_loop_v2. */ int picoquic_packet_loop(picoquic_quic_t* quic, int local_port, diff --git a/picoquic/sockloop.c b/picoquic/sockloop.c index ad2193d39..17177a1c5 100644 --- a/picoquic/sockloop.c +++ b/picoquic/sockloop.c @@ -456,8 +456,6 @@ int picoquic_packet_loop_open_sockets(uint16_t local_port, int local_af, int soc } /* -* TODO: accomodate both Windows and Unix style. -* * Windows: use asynchronous receive. Asynchronous receive requires * declaring an overlap context and event per socket, as well as a * buffer per socket. This should probably be an option. Instead of @@ -487,51 +485,71 @@ int picoquic_packet_loop_wait(picoquic_socket_ctx_t* s_ctx, unsigned char* received_ecn, uint8_t** received_buffer, int64_t delta_t, + int* is_wake_up_event, + picoquic_network_thread_ctx_t * thread_ctx, int* socket_rank) { int bytes_recv = 0; - HANDLE events[4]; + HANDLE events[5]; DWORD ret_event; DWORD nb_events = 0; + int wake_up_event_rank = -1; DWORD dwDeltaT = (DWORD)((delta_t <= 0)?
0: (delta_t / 1000)); for (int i = 0; i < 4 && i < nb_sockets; i++) { events[i] = s_ctx[i].overlap.hEvent; nb_events++; } + *is_wake_up_event = 0; + if (thread_ctx->wake_up_defined) { + wake_up_event_rank = nb_events; + events[nb_events] = thread_ctx->wake_up_event; + nb_events++; + } - ret_event = WSAWaitForMultipleEvents(nb_sockets, events, FALSE, dwDeltaT, TRUE); + ret_event = WSAWaitForMultipleEvents(nb_events, events, FALSE, dwDeltaT, TRUE); if (ret_event == WSA_WAIT_FAILED) { DBG_PRINTF("WSAWaitForMultipleEvents fails, error 0x%x", WSAGetLastError()); bytes_recv = -1; } else if (ret_event >= WSA_WAIT_EVENT_0) { - *socket_rank = ret_event - WSA_WAIT_EVENT_0; - /* if received data on a socket, process it. */ - if (*socket_rank < nb_sockets) { - /* Received data on socket i */ - int ret = picoquic_win_recvmsg_async_finish(&s_ctx[*socket_rank]); - ResetEvent(s_ctx[*socket_rank].overlap.hEvent); - - if (ret != 0) { - DBG_PRINTF("%s", "Cannot finish async recv"); - bytes_recv = -1; - } - else { - bytes_recv = s_ctx[*socket_rank].bytes_recv; - *received_ecn = s_ctx[*socket_rank].received_ecn; - *received_buffer = s_ctx[*socket_rank].recv_buffer; - picoquic_store_addr(addr_dest, (struct sockaddr*)&s_ctx[*socket_rank].addr_dest); - picoquic_store_addr(addr_from, (struct sockaddr*)&s_ctx[*socket_rank].addr_from); - /* Document incoming port */ - if (addr_dest->ss_family == AF_INET6) { - ((struct sockaddr_in6*) addr_dest)->sin6_port = htons(s_ctx[*socket_rank].port); + int event_rank = ret_event - WSA_WAIT_EVENT_0; + + if (event_rank < nb_sockets) { + *socket_rank = event_rank; + /* if received data on a socket, process it. 
*/ + if (*socket_rank < nb_sockets) { + /* Received data on socket i */ + int ret = picoquic_win_recvmsg_async_finish(&s_ctx[*socket_rank]); + ResetEvent(s_ctx[*socket_rank].overlap.hEvent); + + if (ret != 0) { + DBG_PRINTF("%s", "Cannot finish async recv"); + bytes_recv = -1; } - else if (addr_dest->ss_family == AF_INET) { - ((struct sockaddr_in*) addr_dest)->sin_port = htons(s_ctx[*socket_rank].port); + else { + bytes_recv = s_ctx[*socket_rank].bytes_recv; + *received_ecn = s_ctx[*socket_rank].received_ecn; + *received_buffer = s_ctx[*socket_rank].recv_buffer; + picoquic_store_addr(addr_dest, (struct sockaddr*)&s_ctx[*socket_rank].addr_dest); + picoquic_store_addr(addr_from, (struct sockaddr*)&s_ctx[*socket_rank].addr_from); + /* Document incoming port */ + if (addr_dest->ss_family == AF_INET6) { + ((struct sockaddr_in6*)addr_dest)->sin6_port = htons(s_ctx[*socket_rank].port); + } + else if (addr_dest->ss_family == AF_INET) { + ((struct sockaddr_in*)addr_dest)->sin_port = htons(s_ctx[*socket_rank].port); + } } } } + else if (event_rank == wake_up_event_rank) { + *is_wake_up_event = 1; + if (ResetEvent(thread_ctx->wake_up_event) == 0) { + DBG_PRINTF("Cannot reset network event, error 0x%x", GetLastError()); + bytes_recv = -1; + } + } } return bytes_recv; } @@ -544,6 +562,8 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, unsigned char * received_ecn, uint8_t* buffer, int buffer_max, int64_t delta_t, + int * is_wake_up_event, + picoquic_network_thread_ctx_t * thread_ctx, int * socket_rank) { fd_set readfds; @@ -565,6 +585,14 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, FD_SET(s_ctx[i].fd, &readfds); } + *is_wake_up_event = 0; + if (thread_ctx->wake_up_defined) { + if (sockmax < (int)thread_ctx->wake_up_pipe_fd[0]) { + sockmax = (int)thread_ctx->wake_up_pipe_fd[0]; + } + FD_SET(thread_ctx->wake_up_pipe_fd[0], &readfds); + } + if (delta_t <= 0) { tv.tv_sec = 0; tv.tv_usec = 0; @@ -584,35 +612,51 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, bytes_recv = -1;
DBG_PRINTF("Error: select returns %d\n", ret_select); } else if (ret_select > 0) { - for (int i = 0; i < nb_sockets; i++) { - if (FD_ISSET(s_ctx[i].fd, &readfds)) { - *socket_rank = i; - bytes_recv = picoquic_recvmsg(s_ctx[i].fd, addr_from, - addr_dest, dest_if, received_ecn, - buffer, buffer_max); - - if (bytes_recv <= 0) { -#ifdef _WINDOWS - int last_error = WSAGetLastError(); - - if (last_error == WSAECONNRESET || last_error == WSAEMSGSIZE) { - bytes_recv = 0; - continue; - } -#endif - DBG_PRINTF("Could not receive packet on UDP socket[%d]= %d!\n", - i, (int)s_ctx[i].fd); + /* Check if the read end of the 'wake up' pipe is readable. If it is, read the data on it, + * set the is_wake_up_event flag, and ignore the other file descriptors. */ + if (thread_ctx->wake_up_defined && FD_ISSET(thread_ctx->wake_up_pipe_fd[0], &readfds)) { + /* Something was written on the "wakeup" pipe. Read it. */ + uint8_t eventbuf[8]; + int pipe_recv; + if ((pipe_recv = read(thread_ctx->wake_up_pipe_fd[0], eventbuf, sizeof(eventbuf))) <= 0) { + bytes_recv = -1; + DBG_PRINTF("Error: read pipe returns %d\n", (pipe_recv == 0)?EPIPE:errno); + } + else { + *is_wake_up_event = 1; + } + } + else + { + for (int i = 0; i < nb_sockets; i++) { + if (FD_ISSET(s_ctx[i].fd, &readfds)) { + *socket_rank = i; + bytes_recv = picoquic_recvmsg(s_ctx[i].fd, addr_from, + addr_dest, dest_if, received_ecn, + buffer, buffer_max); + + if (bytes_recv <= 0) { + int last_error = WSAGetLastError(); + + if (last_error == WSAECONNRESET || last_error == WSAEMSGSIZE) { + bytes_recv = 0; + continue; + } + DBG_PRINTF("Could not receive packet on UDP socket[%d]= %d!\n", + i, (int)s_ctx[i].fd); - break; - } else { - /* Document incoming port */ - if (addr_dest->ss_family == AF_INET6) { - ((struct sockaddr_in6*) addr_dest)->sin6_port = htons(s_ctx[i].port); + break; } - else if (addr_dest->ss_family == AF_INET) { - ((struct sockaddr_in*) addr_dest)->sin_port = htons(s_ctx[i].port); + else { + /* Document incoming port */ + if (addr_dest->ss_family ==
AF_INET6) { + ((struct sockaddr_in6*)addr_dest)->sin6_port = htons(s_ctx[i].port); + } + else if (addr_dest->ss_family == AF_INET) { + ((struct sockaddr_in*)addr_dest)->sin_port = htons(s_ctx[i].port); + } + break; } - break; } } } @@ -621,18 +665,17 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, return bytes_recv; } #endif - -/* TODO: replace select by windows like structure. -* Step 1, just use synchronous sendmsg on both. -* Step 2, consider using asynchronous sendmsg on Windows, -* which requires managing a list of buffers. - */ - -int picoquic_packet_loop_v2(picoquic_quic_t* quic, - picoquic_packet_loop_param_t * param, - picoquic_packet_loop_cb_fn loop_callback, - void * loop_callback_ctx) +#ifdef _WINDOWS +DWORD WINAPI picoquic_packet_loop_v3(LPVOID v_ctx) +#else +int picoquic_packet_loop_v3(void * v_ctx) +#endif { + picoquic_network_thread_ctx_t* thread_ctx = (picoquic_network_thread_ctx_t*)v_ctx; + picoquic_quic_t* quic = thread_ctx->quic; + picoquic_packet_loop_param_t* param = thread_ctx->param; + picoquic_packet_loop_cb_fn loop_callback = thread_ctx->loop_callback; + void* loop_callback_ctx = thread_ctx->loop_callback_ctx; int ret = 0; uint64_t current_time = picoquic_get_quic_time(quic); int64_t delay_max = 10000000; @@ -652,14 +695,11 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, picoquic_socket_ctx_t s_ctx[4]; int nb_sockets = 0; int nb_sockets_available = 0; -#if 0 - int testing_migration = 0; /* Hook for the migration test */ - uint16_t next_port = 0; /* Data for the migration test */ -#endif picoquic_cnx_t* last_cnx = NULL; int loop_immediate = 0; picoquic_packet_loop_options_t options = { 0 }; uint64_t next_send_time = current_time + PICOQUIC_PACKET_LOOP_SEND_DELAY_MAX; + int is_wake_up_event; #ifdef _WINDOWS WSADATA wsaData = { 0 }; (void)WSA_START(MAKEWORD(2, 2), &wsaData); @@ -700,7 +740,7 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, /* Wait for packets */ /* TODO: add stopping condition, was && (!just_once 
|| !connection_done) */ /* Actually, no, rely on the callback return code for that? */ - while (ret == 0) { + while (ret == 0 && !thread_ctx->thread_should_close) { int socket_rank = -1; int64_t delta_t = 0; uint8_t received_ecn; @@ -724,18 +764,22 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, #ifdef _WINDOWS bytes_recv = picoquic_packet_loop_wait(s_ctx, nb_sockets_available, &addr_from, &addr_to, &if_index_to, &received_ecn, &received_buffer, - delta_t, &socket_rank); + delta_t, &is_wake_up_event, thread_ctx, &socket_rank); #else bytes_recv = picoquic_packet_loop_select(s_ctx, nb_sockets_available, &addr_from, &addr_to, &if_index_to, &received_ecn, buffer, sizeof(buffer), - delta_t, &socket_rank); + delta_t, &is_wake_up_event, thread_ctx, &socket_rank); received_buffer = buffer; #endif current_time = picoquic_current_time(); if (bytes_recv < 0) { - ret = -1; + /* The interrupt error is expected if the loop is closing. */ + ret = (thread_ctx->thread_should_close)?PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP:-1; + } + else if (bytes_recv == 0 && is_wake_up_event) { + ret = loop_callback(quic, picoquic_packet_loop_wake_up, loop_callback_ctx, NULL); } else { uint64_t loop_time = current_time; @@ -937,6 +981,22 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, return ret; } +int picoquic_packet_loop_v2(picoquic_quic_t* quic, + picoquic_packet_loop_param_t* param, + picoquic_packet_loop_cb_fn loop_callback, + void* loop_callback_ctx) +{ + picoquic_network_thread_ctx_t thread_ctx = { 0 }; + + thread_ctx.quic = quic; + thread_ctx.param = param; + thread_ctx.loop_callback = loop_callback; + thread_ctx.loop_callback_ctx = loop_callback_ctx; + return picoquic_packet_loop_v3((void*)&thread_ctx); +} + +/* Support for legacy API */ + int picoquic_packet_loop(picoquic_quic_t* quic, int local_port, int local_af, @@ -955,4 +1015,117 @@ int picoquic_packet_loop(picoquic_quic_t* quic, param.do_not_use_gso = do_not_use_gso; return picoquic_packet_loop_v2(quic, &param,
loop_callback, loop_callback_ctx); +} + +/* Management of background thread: wake up pipe (Unix) or event (Windows), thread start. */ + +static void picoquic_close_network_wake_up(picoquic_network_thread_ctx_t* thread_ctx) +{ + if (thread_ctx->wake_up_defined) { +#ifdef _WINDOWS + CloseHandle(thread_ctx->wake_up_event); +#else + for (int i = 0; i < 2; i++) { + (void)close(thread_ctx->wake_up_pipe_fd[i]); + } +#endif + thread_ctx->wake_up_defined = 0; + } +} + +static void picoquic_open_network_wake_up(picoquic_network_thread_ctx_t* thread_ctx, int *ret) +{ + thread_ctx->wake_up_defined = 0; +#ifdef _WINDOWS + thread_ctx->wake_up_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (thread_ctx->wake_up_event == NULL) { + *ret = GetLastError(); + } + else { + thread_ctx->wake_up_defined = 1; + } +#else + if (pipe(thread_ctx->wake_up_pipe_fd) != 0) { + *ret = errno; + } + else + { + thread_ctx->wake_up_defined = 1; + } +#endif +} + +picoquic_network_thread_ctx_t* picoquic_start_network_thread(picoquic_quic_t* quic, + picoquic_packet_loop_param_t* param, picoquic_packet_loop_cb_fn loop_callback, void* loop_callback_ctx, int* ret) +{ + picoquic_network_thread_ctx_t* thread_ctx = (picoquic_network_thread_ctx_t*)calloc(1, sizeof(picoquic_network_thread_ctx_t)); + *ret = 0; + + if (thread_ctx == NULL) { + *ret = -1; /* Error, no memory: report it instead of returning NULL with *ret == 0 */ + } + else { + /* Fill the arguments in the context; calloc already cleared thread_should_close, which the loop reads */ + thread_ctx->quic = quic; + thread_ctx->param = param; + thread_ctx->loop_callback = loop_callback; + thread_ctx->loop_callback_ctx = loop_callback_ctx; + /* Open the wake up pipe or event; on failure *ret holds the system error code */ + picoquic_open_network_wake_up(thread_ctx, ret); + /* Start thread at specified entry point, unless the wake up could not be opened */ + if (!thread_ctx->wake_up_defined || + (*ret = picoquic_create_thread(&thread_ctx->thread_id, picoquic_packet_loop_v3, (void*)thread_ctx)) != 0) { + /* Free the context and return NULL if the wake up + * or the thread could not be created */ + picoquic_close_network_wake_up(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; + } + } + return thread_ctx; +} + +int
picoquic_wake_up_network_thread(picoquic_network_thread_ctx_t* thread_ctx) +{ + int ret = 0; + + if (thread_ctx->wake_up_defined) { +#ifdef _WINDOWS + if (SetEvent(thread_ctx->wake_up_event) == 0) { + DWORD err = GetLastError(); + DBG_PRINTF("Set network event fails, error 0x%x", err); + ret = (int)err; + } +#else + /* Write a single byte on the pipe; the packet loop selects on the read end */ + ssize_t written = 0; + if ((written = write(thread_ctx->wake_up_pipe_fd[1], &ret, 1)) != 1) { + if (written == 0) { + ret = EPIPE; + } + else { + ret = errno; + } + } +#endif + } + else { + ret = -1; + } + return ret; +} + +void picoquic_delete_network_thread(picoquic_network_thread_ctx_t* thread_ctx) +{ + /* set the should_close flag, so the thread knows the loop should stop */ + thread_ctx->thread_should_close = 1; + /* Signal the wake up pipe or event instead of closing it: closing a + * descriptor or handle that another thread is waiting on is not a reliable + * way to interrupt the wait. The thread wakes up, notices the flag, and exits. */ + (void)picoquic_wake_up_network_thread(thread_ctx); + /* delete the thread */ + picoquic_delete_thread(thread_ctx->thread_id); + /* Release the wake up pipe or event after the thread is deleted, then free the context */ + picoquic_close_network_wake_up(thread_ctx); + free(thread_ctx); } \ No newline at end of file