From 2a17eb7cbbb46d7cbc59cb2eaaf0b9eb02e8f197 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Mon, 12 Feb 2024 20:31:44 -0800 Subject: [PATCH 1/6] Add wake up function to packet loop --- picoquic/picoquic_packet_loop.h | 71 +++++-- picoquic/sockloop.c | 315 +++++++++++++++++++++++++------- 2 files changed, 303 insertions(+), 83 deletions(-) diff --git a/picoquic/picoquic_packet_loop.h b/picoquic/picoquic_packet_loop.h index 09c08ac78..838f55beb 100644 --- a/picoquic/picoquic_packet_loop.h +++ b/picoquic/picoquic_packet_loop.h @@ -24,6 +24,7 @@ #include "picosocks.h" #include "picoquic.h" +#include "picoquic_utils.h" #ifdef __cplusplus extern "C" { @@ -75,9 +76,19 @@ typedef enum { picoquic_packet_loop_after_receive, /* Argument type size_t*: nb packets received */ picoquic_packet_loop_after_send, /* Argument type size_t*: nb packets sent */ picoquic_packet_loop_port_update, /* argument type struct_sockaddr*: new address for wakeup */ - picoquic_packet_loop_time_check /* argument type . Optional. */ + picoquic_packet_loop_time_check, /* argument type packet_loop_time_check_arg_t*. Optional. */ + picoquic_packet_loop_wake_up /* no argument (void* NULL). Used when loop wakeup is supported */ } picoquic_packet_loop_cb_enum; +/* The time check option passes as argument a pointer to a structure specifying +* the current time and the proposed delta. The application uses the specified +* current time to compute an updated delta. +*/ +typedef struct st_packet_loop_time_check_arg_t { + uint64_t current_time; + int64_t delta_t; +} packet_loop_time_check_arg_t; + typedef int (*picoquic_packet_loop_cb_fn)(picoquic_quic_t * quic, picoquic_packet_loop_cb_enum cb_mode, void * callback_ctx, void * callback_argv); /* Packet loop option list shows support by application of optional features. @@ -88,15 +99,6 @@ typedef struct st_picoquic_packet_loop_options_t { int do_time_check : 1; /* App should be polled for next time before sock select */ } picoquic_packet_loop_options_t; -/* The time check option passes as argument a pointer to a structure specifying - * the current time and the proposed delta. The application uses the specified - * current time to compute an updated delta. - */ -typedef struct st_packet_loop_time_check_arg_t { - uint64_t current_time; - int64_t delta_t; -} packet_loop_time_check_arg_t; - /* Version 2 of packet loop, works in progress. * Parameters are set in a struct, for future * extensibility. @@ -117,8 +119,53 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, picoquic_packet_loop_cb_fn loop_callback, void * loop_callback_ctx); -/* Two versions of the packet loop, one portable and one speciailezed - * for winsock. +/* Threaded version of packet loop, when running picoquic in a background thread. +* +* Thread is started by calling picoquic_start_network_thread, which +* returns an argument of type picoquic_network_thread_ctx_t. Returns a NULL +* pointer if the thread could not be created. +* +* If the application needs to post new data or otherwise interact with +* the quic connections, it should call picoquic_wake_up_network_thread, +* passing the thread context as an argument. This with trigger a +* callback of type `picoquic_packet_loop_wake_up`, which executes +* in the context of the network thread. Picoquic APIs can be called +* in this context without worrying about concurrency issues. +* +* If the application wants to close the network thread, it calls +* picoquic_close_network_thread, passing the thread context as an argument. +* The network thread context will be freed during that call. +*/ + +typedef struct st_picoquic_network_thread_ctx_t { + picoquic_quic_t* quic; + picoquic_packet_loop_param_t* param; + picoquic_packet_loop_cb_fn loop_callback; + void* loop_callback_ctx; + picoquic_thread_t thread_id; +#ifdef _WINDOWS + HANDLE wake_up_event; +#else + int wake_up_pipe_fd[2]; +#endif + int wake_up_defined; + volatile int thread_should_close; +} picoquic_network_thread_ctx_t; + +picoquic_network_thread_ctx_t* picoquic_start_network_thread( + picoquic_quic_t* quic, + picoquic_packet_loop_param_t* param, + picoquic_packet_loop_cb_fn loop_callback, + void* loop_callback_ctx, + int * ret); + +int picoquic_wake_up_network_thread(picoquic_network_thread_ctx_t* thread_ctx); +void picoquic_delete_network_thread(picoquic_network_thread_ctx_t* thread_ctx); + + +/* Legacy versions the packet loop, one portable and one specialized + * for winsock. Keeping these API for compatibility, but the implementation + * redirects to picoquic_packet_loop_v2. */ int picoquic_packet_loop(picoquic_quic_t* quic, int local_port, diff --git a/picoquic/sockloop.c b/picoquic/sockloop.c index ad2193d39..17177a1c5 100644 --- a/picoquic/sockloop.c +++ b/picoquic/sockloop.c @@ -456,8 +456,6 @@ int picoquic_packet_loop_open_sockets(uint16_t local_port, int local_af, int soc } /* -* TODO: accomodate both Windows and Unix style. -* * Windows: use asynchronous receive. Asynchronous receive requires * declaring an overlap context and event per socket, as well as a * buffer per socket. This should probably be an option. Instead of @@ -487,51 +485,71 @@ int picoquic_packet_loop_wait(picoquic_socket_ctx_t* s_ctx, unsigned char* received_ecn, uint8_t** received_buffer, int64_t delta_t, + int* is_wake_up_event, + picoquic_network_thread_ctx_t * thread_ctx, int* socket_rank) { int bytes_recv = 0; - HANDLE events[4]; + HANDLE events[5]; DWORD ret_event; DWORD nb_events = 0; + int wake_up_event_rank = -1; DWORD dwDeltaT = (DWORD)((delta_t <= 0)? 0: (delta_t / 1000)); for (int i = 0; i < 4 && i < nb_sockets; i++) { events[i] = s_ctx[i].overlap.hEvent; nb_events++; } + *is_wake_up_event = 0; + if (thread_ctx->wake_up_defined) { + wake_up_event_rank = nb_events; + events[nb_events] = thread_ctx->wake_up_event; + nb_events++; + } - ret_event = WSAWaitForMultipleEvents(nb_sockets, events, FALSE, dwDeltaT, TRUE); + ret_event = WSAWaitForMultipleEvents(nb_events, events, FALSE, dwDeltaT, TRUE); if (ret_event == WSA_WAIT_FAILED) { DBG_PRINTF("WSAWaitForMultipleEvents fails, error 0x%x", WSAGetLastError()); bytes_recv = -1; } else if (ret_event >= WSA_WAIT_EVENT_0) { - *socket_rank = ret_event - WSA_WAIT_EVENT_0; - /* if received data on a socket, process it. */ - if (*socket_rank < nb_sockets) { - /* Received data on socket i */ - int ret = picoquic_win_recvmsg_async_finish(&s_ctx[*socket_rank]); - ResetEvent(s_ctx[*socket_rank].overlap.hEvent); - - if (ret != 0) { - DBG_PRINTF("%s", "Cannot finish async recv"); - bytes_recv = -1; - } - else { - bytes_recv = s_ctx[*socket_rank].bytes_recv; - *received_ecn = s_ctx[*socket_rank].received_ecn; - *received_buffer = s_ctx[*socket_rank].recv_buffer; - picoquic_store_addr(addr_dest, (struct sockaddr*)&s_ctx[*socket_rank].addr_dest); - picoquic_store_addr(addr_from, (struct sockaddr*)&s_ctx[*socket_rank].addr_from); - /* Document incoming port */ - if (addr_dest->ss_family == AF_INET6) { - ((struct sockaddr_in6*) addr_dest)->sin6_port = htons(s_ctx[*socket_rank].port); + int event_rank = ret_event - WSA_WAIT_EVENT_0; + + if (event_rank < nb_sockets) { + *socket_rank = event_rank; + /* if received data on a socket, process it. */ + if (*socket_rank < nb_sockets) { + /* Received data on socket i */ + int ret = picoquic_win_recvmsg_async_finish(&s_ctx[*socket_rank]); + ResetEvent(s_ctx[*socket_rank].overlap.hEvent); + + if (ret != 0) { + DBG_PRINTF("%s", "Cannot finish async recv"); + bytes_recv = -1; } - else if (addr_dest->ss_family == AF_INET) { - ((struct sockaddr_in*) addr_dest)->sin_port = htons(s_ctx[*socket_rank].port); + else { + bytes_recv = s_ctx[*socket_rank].bytes_recv; + *received_ecn = s_ctx[*socket_rank].received_ecn; + *received_buffer = s_ctx[*socket_rank].recv_buffer; + picoquic_store_addr(addr_dest, (struct sockaddr*)&s_ctx[*socket_rank].addr_dest); + picoquic_store_addr(addr_from, (struct sockaddr*)&s_ctx[*socket_rank].addr_from); + /* Document incoming port */ + if (addr_dest->ss_family == AF_INET6) { + ((struct sockaddr_in6*)addr_dest)->sin6_port = htons(s_ctx[*socket_rank].port); + } + else if (addr_dest->ss_family == AF_INET) { + ((struct sockaddr_in*)addr_dest)->sin_port = htons(s_ctx[*socket_rank].port); + } } } } + else if (event_rank == wake_up_event_rank) { + *is_wake_up_event = 1; + if (ResetEvent(thread_ctx->wake_up_event) == 0) { + DBG_PRINTF("Cannot reset network event, error 0x%x", GetLastError()); + bytes_recv = -1; + } + } } return bytes_recv; } @@ -544,6 +562,8 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, unsigned char * received_ecn, uint8_t* buffer, int buffer_max, int64_t delta_t, + int * is_wake_up_event, + picoquic_network_thread_ctx_t * thread_ctx, int * socket_rank) { fd_set readfds; @@ -565,6 +585,14 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, FD_SET(s_ctx[i].fd, &readfds); } + *is_wake_up_event = 0; + if (thread_ctx->wake_up_defined) { + if (sockmax < (int)wake_up_fd[0]) { + sockmax = (int)wake_up_fd[0]; + } + FD_SET(wake_up_fd[0], &readfds); + } + if (delta_t <= 0) { tv.tv_sec = 0; tv.tv_usec = 0; @@ -584,35 +612,51 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, bytes_recv = -1; DBG_PRINTF("Error: select returns %d\n", ret_select); } else if (ret_select > 0) { - for (int i = 0; i < nb_sockets; i++) { - if (FD_ISSET(s_ctx[i].fd, &readfds)) { - *socket_rank = i; - bytes_recv = picoquic_recvmsg(s_ctx[i].fd, addr_from, - addr_dest, dest_if, received_ecn, - buffer, buffer_max); - - if (bytes_recv <= 0) { -#ifdef _WINDOWS - int last_error = WSAGetLastError(); - - if (last_error == WSAECONNRESET || last_error == WSAEMSGSIZE) { - bytes_recv = 0; - continue; - } -#endif - DBG_PRINTF("Could not receive packet on UDP socket[%d]= %d!\n", - i, (int)s_ctx[i].fd); + /* Check if the 'wake up' pipe is full. If it is, read the data on it, + * set the is_wake_up_event flag, and ignore the other file descriptors. */ + if (thread_ctx->wake_up_defined && FD_ISSET(thread_ctx->wake_up_fd[0], &readfds)) { + /* Something was written on the "wakeup" pipe. Read it. */ + uint8_t eventbuf[8]; + int pipe_recv; + if ((pipe_recv = read(thread_ctx->wake_up_fd[0], eventbuf, sizeof(eventbuf))) <= 0) { + bytes_recv = -1; + DBG_PRINTF("Error: read pipe returns %d\n", (pipe_recv == 0)?EPIPE:errno); + } + else { + *is_wake_up_event = 1; + } + } + else + { + for (int i = 0; i < nb_sockets; i++) { + if (FD_ISSET(s_ctx[i].fd, &readfds)) { + *socket_rank = i; + bytes_recv = picoquic_recvmsg(s_ctx[i].fd, addr_from, + addr_dest, dest_if, received_ecn, + buffer, buffer_max); + + if (bytes_recv <= 0) { + int last_error = WSAGetLastError(); + + if (last_error == WSAECONNRESET || last_error == WSAEMSGSIZE) { + bytes_recv = 0; + continue; + } + DBG_PRINTF("Could not receive packet on UDP socket[%d]= %d!\n", + i, (int)s_ctx[i].fd); - break; - } else { - /* Document incoming port */ - if (addr_dest->ss_family == AF_INET6) { - ((struct sockaddr_in6*) addr_dest)->sin6_port = htons(s_ctx[i].port); + break; } - else if (addr_dest->ss_family == AF_INET) { - ((struct sockaddr_in*) addr_dest)->sin_port = htons(s_ctx[i].port); + else { + /* Document incoming port */ + if (addr_dest->ss_family == AF_INET6) { + ((struct sockaddr_in6*)addr_dest)->sin6_port = htons(s_ctx[i].port); + } + else if (addr_dest->ss_family == AF_INET) { + ((struct sockaddr_in*)addr_dest)->sin_port = htons(s_ctx[i].port); + } + break; } - break; } } } @@ -621,18 +665,17 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, return bytes_recv; } #endif - -/* TODO: replace select by windows like structure. -* Step 1, just use synchronous sendmsg on both. -* Step 2, consider using asynchronous sendmsg on Windows, -* which requires managing a list of buffers. - */ - -int picoquic_packet_loop_v2(picoquic_quic_t* quic, - picoquic_packet_loop_param_t * param, - picoquic_packet_loop_cb_fn loop_callback, - void * loop_callback_ctx) +#ifdef _WINDOWS +DWORD WINAPI picoquic_packet_loop_v3(LPVOID v_ctx) +#else +int picoquic_packet_loop_v3(void * v_ctx) +#endif { + picoquic_network_thread_ctx_t* thread_ctx = (picoquic_network_thread_ctx_t*)v_ctx; + picoquic_quic_t* quic = thread_ctx->quic; + picoquic_packet_loop_param_t* param = thread_ctx->param; + picoquic_packet_loop_cb_fn loop_callback = thread_ctx->loop_callback; + void* loop_callback_ctx = thread_ctx->loop_callback_ctx; int ret = 0; uint64_t current_time = picoquic_get_quic_time(quic); int64_t delay_max = 10000000; @@ -652,14 +695,11 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, picoquic_socket_ctx_t s_ctx[4]; int nb_sockets = 0; int nb_sockets_available = 0; -#if 0 - int testing_migration = 0; /* Hook for the migration test */ - uint16_t next_port = 0; /* Data for the migration test */ -#endif picoquic_cnx_t* last_cnx = NULL; int loop_immediate = 0; picoquic_packet_loop_options_t options = { 0 }; uint64_t next_send_time = current_time + PICOQUIC_PACKET_LOOP_SEND_DELAY_MAX; + int is_wake_up_event; #ifdef _WINDOWS WSADATA wsaData = { 0 }; (void)WSA_START(MAKEWORD(2, 2), &wsaData); @@ -700,7 +740,7 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, /* Wait for packets */ /* TODO: add stopping condition, was && (!just_once || !connection_done) */ /* Actually, no, rely on the callback return code for that? */ - while (ret == 0) { + while (ret == 0 && !thread_ctx->thread_should_close) { int socket_rank = -1; int64_t delta_t = 0; uint8_t received_ecn; @@ -724,18 +764,22 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, #ifdef _WINDOWS bytes_recv = picoquic_packet_loop_wait(s_ctx, nb_sockets_available, &addr_from, &addr_to, &if_index_to, &received_ecn, &received_buffer, - delta_t, &socket_rank); + delta_t, &is_wake_up_event, thread_ctx, &socket_rank); #else bytes_recv = picoquic_packet_loop_select(s_ctx, nb_sockets_available, &addr_from, &addr_to, &if_index_to, &received_ecn, buffer, sizeof(buffer), - delta_t, &socket_rank); + delta_t, &is_wake_up_event, thread_ctx, &socket_rank); received_buffer = buffer; #endif current_time = picoquic_current_time(); if (bytes_recv < 0) { - ret = -1; + /* The interrupt error is expected if the loop is closing. */ + ret = (thread_ctx->thread_should_close)?PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP:-1; + } + else if (bytes_recv == 0 && is_wake_up_event) { + ret = loop_callback(quic, picoquic_packet_loop_wake_up, loop_callback_ctx, NULL); } else { uint64_t loop_time = current_time; @@ -937,6 +981,22 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, return ret; } +int picoquic_packet_loop_v2(picoquic_quic_t* quic, + picoquic_packet_loop_param_t* param, + picoquic_packet_loop_cb_fn loop_callback, + void* loop_callback_ctx) +{ + picoquic_network_thread_ctx_t thread_ctx = { 0 }; + + thread_ctx.quic = quic; + thread_ctx.param = param; + thread_ctx.loop_callback = loop_callback; + thread_ctx.loop_callback_ctx = loop_callback_ctx; + return picoquic_packet_loop_v3((void*)&thread_ctx); +} + +/* Support for legacy API */ + int picoquic_packet_loop(picoquic_quic_t* quic, int local_port, int local_af, @@ -955,4 +1015,117 @@ int picoquic_packet_loop(picoquic_quic_t* quic, param.do_not_use_gso = do_not_use_gso; return picoquic_packet_loop_v2(quic, ¶m, loop_callback, loop_callback_ctx); +} + +/* Management of background thread. */ + +static void picoquic_close_network_wake_up(picoquic_network_thread_ctx_t* thread_ctx) +{ + if (thread_ctx->wake_up_defined) { +#ifdef _WINDOWS + CloseHandle(thread_ctx->wake_up_event); +#else + for (int i = 0; i < 2; i++) { + (void)close(thread_ctx->wake_up_pipe_fd[i]); + } +#endif + thread_ctx->wake_up_defined = 0; + } +} + +static void picoquic_open_network_wake_up(picoquic_network_thread_ctx_t* thread_ctx, int *ret) +{ + thread_ctx->wake_up_defined = 0; +#ifdef _WINDOWS + thread_ctx->wake_up_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (thread_ctx->wake_up_event == NULL) { + *ret = GetLastError(); + } + else { + thread_ctx->wake_up_defined = 1; + } +#else + if (pipe(thread_ctx->wake_up_pipe_fd) != 0) { + *ret = errno; + } + else + { + thread_ctx->wake_up_defined = 1; + } +#endif +} + +picoquic_network_thread_ctx_t* picoquic_start_network_thread(picoquic_quic_t* quic, + picoquic_packet_loop_param_t* param, picoquic_packet_loop_cb_fn loop_callback, void* loop_callback_ctx, int* ret) +{ + picoquic_network_thread_ctx_t* thread_ctx = (picoquic_network_thread_ctx_t*)malloc(sizeof(picoquic_network_thread_ctx_t)); + *ret = 0; + + if (thread_ctx == NULL) { + /* Error, no memory */ + } + else { + /* Fill the arguments in the context */ + thread_ctx->quic = quic; + thread_ctx->param = param; + thread_ctx->loop_callback = loop_callback; + thread_ctx->loop_callback_ctx = loop_callback_ctx; + /* Open the wake up pipe or event */ + picoquic_open_network_wake_up(thread_ctx, ret); + /* Start thread at specified entry point */ + if (thread_ctx->wake_up_defined){ + if ((*ret = picoquic_create_thread(&thread_ctx->thread_id, picoquic_packet_loop_v3, (void*)thread_ctx)) != 0) { + /* Free the context and return error condition if something went wrong */ + picoquic_close_network_wake_up(thread_ctx); + free(thread_ctx); + thread_ctx = NULL; + } + } + } + return thread_ctx; +} + +int picoquic_wake_up_network_thread(picoquic_network_thread_ctx_t* thread_ctx) +{ + int ret = 0; + + if (thread_ctx->wake_up_defined) { +#ifdef _WINDOWS + if (SetEvent(thread_ctx->wake_up_event) == 0) { + DWORD err = WSAGetLastError(); + DBG_PRINTF("Set network event fails, error 0x%x", err); + ret = (int)err; + } +#else + /* TODO: write to network pipe */ + ssize_t written = 0; + if ((written = write(thread_ctx->wake_up_pipe_fd[1], &ret, 1)) != 1) { + if (written == 0) { + ret = EPIPE; + } + else { + ret = errno; + } + } +#endif + } + else { + ret = -1; + } + return ret; +} + +void picoquic_delete_network_thread(picoquic_network_thread_ctx_t* thread_ctx) +{ + /* set the should_close flag, so the thread knows the loop should stop */ + thread_ctx->thread_should_close = 1; + /* Delete the wake up event. This ought to create a fault + * in the wait for event call, causing the thread to wake up, + * notice the flag, and exit. + */ + picoquic_close_network_wake_up(thread_ctx); + /* delete the thread */ + picoquic_delete_thread(thread_ctx->thread_id); + /* Free the context */ + free(thread_ctx); } \ No newline at end of file From 453d5a0b05cb2d33c7415117b0cb546d75a8e96f Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Mon, 12 Feb 2024 21:06:09 -0800 Subject: [PATCH 2/6] Fix Unix compile issues --- picoquic/picoquic_packet_loop.h | 2 ++ picoquic/sockloop.c | 51 +++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/picoquic/picoquic_packet_loop.h b/picoquic/picoquic_packet_loop.h index 838f55beb..5bc2ddb19 100644 --- a/picoquic/picoquic_packet_loop.h +++ b/picoquic/picoquic_packet_loop.h @@ -148,8 +148,10 @@ typedef struct st_picoquic_network_thread_ctx_t { #else int wake_up_pipe_fd[2]; #endif + int is_threaded; int wake_up_defined; volatile int thread_should_close; + int return_code; } picoquic_network_thread_ctx_t; picoquic_network_thread_ctx_t* picoquic_start_network_thread( diff --git a/picoquic/sockloop.c b/picoquic/sockloop.c index 17177a1c5..45bbed5f9 100644 --- a/picoquic/sockloop.c +++ b/picoquic/sockloop.c @@ -587,10 +587,10 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, *is_wake_up_event = 0; if (thread_ctx->wake_up_defined) { - if (sockmax < (int)wake_up_fd[0]) { - sockmax = (int)wake_up_fd[0]; + if (sockmax < (int)wake_up_pipe_fd[0]) { + sockmax = (int)wake_up_pipe_fd[0]; } - FD_SET(wake_up_fd[0], &readfds); + FD_SET(wake_up_pipe_fd[0], &readfds); } if (delta_t <= 0) { @@ -614,11 +614,11 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, } else if (ret_select > 0) { /* Check if the 'wake up' pipe is full. If it is, read the data on it, * set the is_wake_up_event flag, and ignore the other file descriptors. */ - if (thread_ctx->wake_up_defined && FD_ISSET(thread_ctx->wake_up_fd[0], &readfds)) { + if (thread_ctx->wake_up_defined && FD_ISSET(thread_ctx->wake_up_pipe_fd[0], &readfds)) { /* Something was written on the "wakeup" pipe. Read it. */ uint8_t eventbuf[8]; int pipe_recv; - if ((pipe_recv = read(thread_ctx->wake_up_fd[0], eventbuf, sizeof(eventbuf))) <= 0) { + if ((pipe_recv = read(thread_ctx->wake_up_pipe_fd[0], eventbuf, sizeof(eventbuf))) <= 0) { bytes_recv = -1; DBG_PRINTF("Error: read pipe returns %d\n", (pipe_recv == 0)?EPIPE:errno); } @@ -636,15 +636,8 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, buffer, buffer_max); if (bytes_recv <= 0) { - int last_error = WSAGetLastError(); - - if (last_error == WSAECONNRESET || last_error == WSAEMSGSIZE) { - bytes_recv = 0; - continue; - } DBG_PRINTF("Could not receive packet on UDP socket[%d]= %d!\n", i, (int)s_ctx[i].fd); - break; } else { @@ -668,7 +661,7 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, #ifdef _WINDOWS DWORD WINAPI picoquic_packet_loop_v3(LPVOID v_ctx) #else -int picoquic_packet_loop_v3(void * v_ctx) +void* picoquic_packet_loop_v3(void* v_ctx) #endif { picoquic_network_thread_ctx_t* thread_ctx = (picoquic_network_thread_ctx_t*)v_ctx; @@ -770,13 +763,13 @@ int picoquic_packet_loop_v3(void * v_ctx) &addr_from, &addr_to, &if_index_to, &received_ecn, buffer, sizeof(buffer), - delta_t, &is_wake_up_event, thread_ctx, &socket_rank); + delta_t, &is_wake_up_event, thread_ctx, &socket_rank); received_buffer = buffer; #endif current_time = picoquic_current_time(); if (bytes_recv < 0) { /* The interrupt error is expected if the loop is closing. */ - ret = (thread_ctx->thread_should_close)?PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP:-1; + ret = (thread_ctx->thread_should_close) ? PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP : -1; } else if (bytes_recv == 0 && is_wake_up_event) { ret = loop_callback(quic, picoquic_packet_loop_wake_up, loop_callback_ctx, NULL); @@ -866,7 +859,7 @@ int picoquic_packet_loop_v3(void * v_ctx) * either IPv6, or IPv4, or both, and binding to a port number. * Find the first socket where: * - the destination AF is supported. - * - either the source port is not specified, or it matches the local port. + * - either the source port is not specified, or it matches the local port. */ SOCKET_TYPE send_socket = INVALID_SOCKET; uint16_t send_port = (peer_addr.ss_family == AF_INET) ? @@ -977,8 +970,17 @@ int picoquic_packet_loop_v3(void * v_ctx) if (send_buffer != NULL) { free(send_buffer); } - - return ret; + thread_ctx->return_code = ret; +#ifdef _WINDOWS + return (DWORD)ret; +#else + if (thread_ctx->is_threaded) { + pthread_exit((void*)&thread_ctx->return_code); + } + else { + return(NULL); + } +#endif } int picoquic_packet_loop_v2(picoquic_quic_t* quic, @@ -992,7 +994,9 @@ int picoquic_packet_loop_v2(picoquic_quic_t* quic, thread_ctx.param = param; thread_ctx.loop_callback = loop_callback; thread_ctx.loop_callback_ctx = loop_callback_ctx; - return picoquic_packet_loop_v3((void*)&thread_ctx); + + (void)picoquic_packet_loop_v3((void*)&thread_ctx); + return thread_ctx.return_code; } /* Support for legacy API */ @@ -1074,10 +1078,11 @@ picoquic_network_thread_ctx_t* picoquic_start_network_thread(picoquic_quic_t* qu picoquic_open_network_wake_up(thread_ctx, ret); /* Start thread at specified entry point */ if (thread_ctx->wake_up_defined){ + thread_ctx->is_threaded = 1; if ((*ret = picoquic_create_thread(&thread_ctx->thread_id, picoquic_packet_loop_v3, (void*)thread_ctx)) != 0) { /* Free the context and return error condition if something went wrong */ - picoquic_close_network_wake_up(thread_ctx); - free(thread_ctx); + thread_ctx->is_threaded = 0; + picoquic_delete_network_thread(thread_ctx); thread_ctx = NULL; } } @@ -1125,7 +1130,9 @@ void picoquic_delete_network_thread(picoquic_network_thread_ctx_t* thread_ctx) */ picoquic_close_network_wake_up(thread_ctx); /* delete the thread */ - picoquic_delete_thread(thread_ctx->thread_id); + if (thread_ctx->is_threaded) { + picoquic_delete_thread(thread_ctx->thread_id); + } /* Free the context */ free(thread_ctx); } \ No newline at end of file From 383c45feaa4c517d1c6067d318bd547167b906ca Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Mon, 12 Feb 2024 21:08:59 -0800 Subject: [PATCH 3/6] Fix pointer in delete thread --- picoquic/sockloop.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/picoquic/sockloop.c b/picoquic/sockloop.c index 45bbed5f9..982fb861f 100644 --- a/picoquic/sockloop.c +++ b/picoquic/sockloop.c @@ -1131,7 +1131,7 @@ void picoquic_delete_network_thread(picoquic_network_thread_ctx_t* thread_ctx) picoquic_close_network_wake_up(thread_ctx); /* delete the thread */ if (thread_ctx->is_threaded) { - picoquic_delete_thread(thread_ctx->thread_id); + picoquic_delete_thread(&thread_ctx->thread_id); } /* Free the context */ free(thread_ctx); From ba934059b60e0c106de8b8d8e8d678497808bcf4 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Mon, 12 Feb 2024 21:13:17 -0800 Subject: [PATCH 4/6] Fix 2 more Unix bugs. --- picoquic/sockloop.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/picoquic/sockloop.c b/picoquic/sockloop.c index 982fb861f..9d345b1ae 100644 --- a/picoquic/sockloop.c +++ b/picoquic/sockloop.c @@ -79,6 +79,8 @@ #include #include +#include + #ifndef SOCKET_TYPE #define SOCKET_TYPE int #endif @@ -587,10 +589,10 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx, *is_wake_up_event = 0; if (thread_ctx->wake_up_defined) { - if (sockmax < (int)wake_up_pipe_fd[0]) { - sockmax = (int)wake_up_pipe_fd[0]; + if (sockmax < (int)thread_ctx->wake_up_pipe_fd[0]) { + sockmax = (int)thread_ctx->wake_up_pipe_fd[0]; } - FD_SET(wake_up_pipe_fd[0], &readfds); + FD_SET(thread_ctx->wake_up_pipe_fd[0], &readfds); } if (delta_t <= 0) { @@ -977,9 +979,7 @@ void* picoquic_packet_loop_v3(void* v_ctx) if (thread_ctx->is_threaded) { pthread_exit((void*)&thread_ctx->return_code); } - else { - return(NULL); - } + return(NULL); #endif } From 70d33283792506d1ccc09d7431dccf4631decbeb Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Tue, 13 Feb 2024 08:28:57 -0800 Subject: [PATCH 5/6] Test of loop with background thread --- UnitTest1/unittest1.cpp | 7 +++ picoquic/picoquic_packet_loop.h | 2 + picoquic/picosocks.c | 2 - picoquic/sockloop.c | 13 +++++- picoquic_t/picoquic_t.c | 1 + picoquictest/picoquictest.h | 1 + picoquictest/sockloop_test.c | 78 +++++++++++++++++++++++++++++++-- 7 files changed, 97 insertions(+), 7 deletions(-) diff --git a/UnitTest1/unittest1.cpp b/UnitTest1/unittest1.cpp index 9a860d788..eb2e31800 100644 --- a/UnitTest1/unittest1.cpp +++ b/UnitTest1/unittest1.cpp @@ -153,6 +153,13 @@ namespace UnitTest1 Assert::AreEqual(ret, 0); } + TEST_METHOD(sockloop_thread) + { + int ret = sockloop_thread_test(); + + Assert::AreEqual(ret, 0); + } + TEST_METHOD(splay) { int ret = splay_test(); diff --git a/picoquic/picoquic_packet_loop.h b/picoquic/picoquic_packet_loop.h index 5bc2ddb19..c125dd73d 100644 --- a/picoquic/picoquic_packet_loop.h +++ b/picoquic/picoquic_packet_loop.h @@ -150,7 +150,9 @@ typedef struct st_picoquic_network_thread_ctx_t { #endif int is_threaded; int wake_up_defined; + volatile int thread_is_ready; volatile int thread_should_close; + volatile int thread_is_closed; int return_code; } picoquic_network_thread_ctx_t; diff --git a/picoquic/picosocks.c b/picoquic/picosocks.c index 969529afd..ae610ef3c 100644 --- a/picoquic/picosocks.c +++ b/picoquic/picosocks.c @@ -912,8 +912,6 @@ int picoquic_recvmsg_async_start(picoquic_recvmsg_async_ctx_t* ctx) } } else { - DBG_PRINTF("Receive async immediate (WSARecvMsg) on UDP socket %d -- %d bytes !\n", - (int)ctx->fd, numberOfBytesReceived); ctx->nb_immediate_receive++; } } while (should_retry); diff --git a/picoquic/sockloop.c b/picoquic/sockloop.c index 9d345b1ae..b5d44258a 100644 --- a/picoquic/sockloop.c +++ b/picoquic/sockloop.c @@ -213,8 +213,6 @@ int picoquic_win_recvmsg_async_start(picoquic_socket_ctx_t* ctx) } } else { - DBG_PRINTF("Receive async immediate (WSARecvMsg) on UDP socket %d -- %d bytes !\n", - (int)ctx->fd, numberOfBytesReceived); ctx->nb_immediate_receive++; } } while (should_retry); @@ -732,6 +730,13 @@ void* picoquic_packet_loop_v3(void* v_ctx) } } + if (ret == 0) { + thread_ctx->thread_is_ready = 1; + } + else { + DBG_PRINTF("%s", "Thread cannot run"); + } + /* Wait for packets */ /* TODO: add stopping condition, was && (!just_once || !connection_done) */ /* Actually, no, rely on the callback return code for that? */ @@ -959,6 +964,8 @@ void* picoquic_packet_loop_v3(void* v_ctx) } } + thread_ctx->thread_is_ready = 0; + if (ret == PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP) { /* Normal termination requested by the application, returns no error */ ret = 0; @@ -1069,6 +1076,7 @@ picoquic_network_thread_ctx_t* picoquic_start_network_thread(picoquic_quic_t* qu /* Error, no memory */ } else { + memset(thread_ctx, 0, sizeof(picoquic_network_thread_ctx_t)); /* Fill the arguments in the context */ thread_ctx->quic = quic; thread_ctx->param = param; @@ -1115,6 +1123,7 @@ int picoquic_wake_up_network_thread(picoquic_network_thread_ctx_t* thread_ctx) #endif } else { + DBG_PRINTF("%s", "Wake up event not defined."); ret = -1; } return ret; diff --git a/picoquic_t/picoquic_t.c b/picoquic_t/picoquic_t.c index cb3b0d462..c1d3c0ec1 100644 --- a/picoquic_t/picoquic_t.c +++ b/picoquic_t/picoquic_t.c @@ -57,6 +57,7 @@ static const picoquic_test_def_t test_table[] = { { "sockloop_ipv4", sockloop_ipv4_test }, { "sockloop_migration", sockloop_migration_test }, { "sockloop_nat", sockloop_nat_test }, + { "sockloop_thread", sockloop_thread_test }, { "splay", splay_test }, { "cnxcreation", cnxcreation_test }, { "parseheader", parseheadertest }, diff --git a/picoquictest/picoquictest.h b/picoquictest/picoquictest.h index afa6a9870..76fd1251b 100644 --- a/picoquictest/picoquictest.h +++ b/picoquictest/picoquictest.h @@ -174,6 +174,7 @@ int sockloop_errsock_test(); int sockloop_ipv4_test(); int sockloop_migration_test(); int sockloop_nat_test(); +int sockloop_thread_test(); int splay_test(); int TlsStreamFrameTest(); int draft17_vector_test(); diff --git a/picoquictest/sockloop_test.c b/picoquictest/sockloop_test.c index 189199fec..b8ab01e3a 100644 --- a/picoquictest/sockloop_test.c +++ b/picoquictest/sockloop_test.c @@ -30,6 +30,15 @@ #include "picoquic_packet_loop.h" #include "picosocks.h" + +#ifndef SLEEP +#ifdef _WINDOWS +#define SLEEP(x) Sleep(x) +#else +#define SLEEP(x) usleep((x)*1000) +#endif +#endif + /* * Testing the socket loop. * This requires sending packets through sockets, etc. We do that by using a @@ -223,6 +232,12 @@ int sockloop_test_cb(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode } break; } + case picoquic_packet_loop_wake_up: { + ret = picoquic_start_client_cnx(cnx_client); + DBG_PRINTF("Starting the client connection, returns: %d", ret); + break; + } + default: ret = PICOQUIC_ERROR_UNEXPECTED_ERROR; break; @@ -401,6 +416,7 @@ int sockloop_test_one(sockloop_test_spec_t *spec) sockloop_test_cb_t loop_cb = { 0 }; uint64_t current_time = picoquic_current_time(); picoquic_socket_ctx_t double_bind[2] = { 0 }; + picoquic_network_thread_ctx_t* thread_ctx = NULL; int nb_double_bind = 0; /* Create test context @@ -449,7 +465,9 @@ int sockloop_test_one(sockloop_test_spec_t *spec) if (ret == 0) { loop_cb.test_ctx = test_ctx; loop_cb.test_id = spec->test_id; - picoquic_start_client_cnx(test_ctx->cnx_client); + if (!spec->use_background_thread) { + picoquic_start_client_cnx(test_ctx->cnx_client); + } if (spec->test_id == 1) { #ifdef _WINDOWS_BUT_MAYBE_NOT ret = picoquic_packet_loop_win(test_ctx->qserver, spec->port, 0, 0, @@ -473,7 +491,48 @@ int sockloop_test_one(sockloop_test_spec_t *spec) loop_cb.force_migration = spec->force_migration; loop_cb.param = ¶m; - ret = picoquic_packet_loop_v2(test_ctx->qserver, ¶m, sockloop_test_cb, &loop_cb); + if (spec->use_background_thread) { + thread_ctx = picoquic_start_network_thread(test_ctx->qserver, ¶m, sockloop_test_cb, &loop_cb, &ret); + if (thread_ctx == NULL) { + if (ret == 0) { + ret = -1; + } + } + else { + for (int i = 0; i < 2000; i++) { + if (thread_ctx->thread_is_ready) { + DBG_PRINTF("Thread is ready after %dms", i); + break; + } + else { + SLEEP(1); + } + } + if (!thread_ctx->thread_is_ready) { + DBG_PRINTF("%s", "Cannot start the network thread in 2000ms"); + ret = -1; + } + else if (picoquic_wake_up_network_thread(thread_ctx) != 0) { + DBG_PRINTF("%s", "Cannot wakeup the network thread"); + ret = -1; + } + else { + for (int i = 0; i < 50; i++) { + if (sockloop_test_received_finished(test_ctx)) { + DBG_PRINTF("Receive finished after %dms", 100 * i); + break; + } + else { + SLEEP(100); + } + } + } + picoquic_delete_network_thread(thread_ctx); + } + } + else { + ret = picoquic_packet_loop_v2(test_ctx->qserver, ¶m, sockloop_test_cb, &loop_cb); + } } } /* Verify that the scenario worked. */ @@ -602,4 +661,17 @@ int sockloop_nat_test() spec.force_migration = 1; return(sockloop_test_one(&spec)); -} \ No newline at end of file +} + + +int sockloop_thread_test() +{ + sockloop_test_spec_t spec; + sockloop_test_set_spec(&spec, 7); + spec.socket_buffer_size = 0xffff; + spec.scenario = sockloop_test_scenario_1M; + spec.scenario_size = sizeof(sockloop_test_scenario_1M); + spec.use_background_thread = 1; + + return(sockloop_test_one(&spec)); +} From 1c3f5c73980201279e0d3d88ab1967eaddb7ac85 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Tue, 13 Feb 2024 08:53:21 -0800 Subject: [PATCH 6/6] Cleanup and bump version --- CMakeLists.txt | 2 +- picoquic/picoquic.h | 2 +- picoquic/tls_api.c | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ebfdb65e0..48fa09fdc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ else() endif() project(picoquic - VERSION 1.1.18.0 + VERSION 1.1.19.0 DESCRIPTION "picoquic library" LANGUAGES C CXX) diff --git a/picoquic/picoquic.h b/picoquic/picoquic.h index cf13aecf1..ace461aa0 100644 --- a/picoquic/picoquic.h +++ b/picoquic/picoquic.h @@ -40,7 +40,7 @@ extern "C" { #endif -#define PICOQUIC_VERSION "1.1.18.0" +#define PICOQUIC_VERSION "1.1.19.0" #define PICOQUIC_ERROR_CLASS 0x400 #define PICOQUIC_ERROR_DUPLICATE (PICOQUIC_ERROR_CLASS + 1) #define PICOQUIC_ERROR_AEAD_CHECK (PICOQUIC_ERROR_CLASS + 3) diff --git a/picoquic/tls_api.c b/picoquic/tls_api.c index d9a0fa8c6..e94ab6ed9 100644 --- a/picoquic/tls_api.c +++ b/picoquic/tls_api.c @@ -155,12 +155,12 @@ static int tls_api_is_init = 0; void picoquic_tls_api_init_providers(int unload) { if ((tls_api_init_flags & TLS_API_INIT_FLAGS_NO_MINICRYPTO) == 0) { - DBG_PRINTF("%s", "Loading minicrypto"); + DBG_PRINTF("%s", "%s minicrypto", (unload)?"Unloading":"Loading"); picoquic_ptls_minicrypto_load(unload); } #ifndef PTLS_WITHOUT_OPENSSL if ((tls_api_init_flags & TLS_API_INIT_FLAGS_NO_OPENSSL) == 0) { - DBG_PRINTF("%s", "Loading openssl"); + DBG_PRINTF("%s", "%s openssl", (unload)?"Unloading":"Loading"); picoquic_ptls_openssl_load(unload); } #else @@ -171,7 +171,7 @@ void picoquic_tls_api_init_providers(int unload) // picoquic_bcrypt_load(unload); #if (!defined(_WINDOWS) || defined(_WINDOWS64)) && !defined(PTLS_WITHOUT_FUSION) if ((tls_api_init_flags & TLS_API_INIT_FLAGS_NO_FUSION) == 0) { - DBG_PRINTF("%s", "Loading fusion"); + DBG_PRINTF("%s", "%s fusion", (unload)?"Unloading":"Loading"); picoquic_ptls_fusion_load(unload); } #else @@ -182,7 +182,7 @@ void picoquic_tls_api_init_providers(int unload) #ifdef PICOQUIC_WITH_MBEDTLS if ((tls_api_init_flags & TLS_API_INIT_FLAGS_NO_MBEDTLS) == 0) { - DBG_PRINTF("%s", "Loading MbedTLS"); + DBG_PRINTF("%s", "%s MbedTLS", (unload)?"Unloading":"Loading"); picoquic_mbedtls_load(unload); } #endif