From 41b37169a4568f36127559a4d47efa8043299539 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 20 May 2024 23:33:47 +0200 Subject: [PATCH] feat: first attempt to enable buffering signals during ownership handoff --- CMakeLists.txt | 2 + c_src/quicer_ctx.c | 1 + c_src/quicer_ctx.h | 3 + c_src/quicer_nif.c | 6 +- c_src/quicer_owner_queue.c | 76 ++++++++++++++++++ c_src/quicer_owner_queue.h | 50 ++++++++++++ c_src/quicer_stream.c | 136 ++++++++++++++++++++++++++++++++- c_src/quicer_stream.h | 11 +++ src/quicer_nif.erl | 19 ++++- test/prop_stream_sig_queue.erl | 110 ++++++++++++++++++++++++++ 10 files changed, 411 insertions(+), 3 deletions(-) create mode 100644 c_src/quicer_owner_queue.c create mode 100644 c_src/quicer_owner_queue.h create mode 100644 test/prop_stream_sig_queue.erl diff --git a/CMakeLists.txt b/CMakeLists.txt index 56292f0e..6f0e943e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,6 +112,8 @@ set(SOURCES c_src/quicer_config.h c_src/quicer_queue.c c_src/quicer_queue.h + c_src/quicer_owner_queue.c + c_src/quicer_owner_queue.h c_src/quicer_ctx.c c_src/quicer_ctx.h c_src/quicer_listener.c diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index 7cc1410f..20e9f6bb 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -270,6 +270,7 @@ init_s_ctx() s_ctx->is_recv_pending = FALSE; s_ctx->is_closed = TRUE; // init s_ctx->event_mask = 0; + s_ctx->sig_queue = NULL; return s_ctx; } diff --git a/c_src/quicer_ctx.h b/c_src/quicer_ctx.h index b1f454c1..5d5aa32d 100644 --- a/c_src/quicer_ctx.h +++ b/c_src/quicer_ctx.h @@ -18,6 +18,7 @@ limitations under the License. #define __QUICER_CTX_H_ #include "quicer_nif.h" +#include "quicer_owner_queue.h" #include "quicer_queue.h" #include #include @@ -137,6 +138,8 @@ typedef struct QuicerStreamCTX // Track lifetime of Stream handle CXPLAT_REF_COUNT ref_count; uint32_t event_mask; + // for ownership handoff + OWNER_SIGNAL_QUEUE *sig_queue; void *reserved1; void *reserved2; void *reserved3; diff --git a/c_src/quicer_nif.c b/c_src/quicer_nif.c index 91eec7a2..928d8137 100644 --- a/c_src/quicer_nif.c +++ b/c_src/quicer_nif.c @@ -1584,7 +1584,11 @@ static ErlNifFunc nif_funcs[] = { { "get_connections", 1, get_connectionsX, 0}, { "get_conn_owner", 1, get_conn_owner1, 0}, { "get_stream_owner", 1, get_stream_owner1, 0}, - { "get_listener_owner", 1, get_listener_owner1, 0} + { "get_listener_owner", 1, get_listener_owner1, 0}, + /* for testing */ + {"buffer_sig", 3, buffer_sig, 0}, + {"enable_sig_buffer", 1, enable_sig_buffer, 0}, + {"flush_stream_buffered_sigs", 1, flush_stream_buffered_sigs, 0} // clang-format on }; diff --git a/c_src/quicer_owner_queue.c b/c_src/quicer_owner_queue.c new file mode 100644 index 00000000..9c0afc44 --- /dev/null +++ b/c_src/quicer_owner_queue.c @@ -0,0 +1,76 @@ +/*-------------------------------------------------------------------- +Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +-------------------------------------------------------------------*/ +#include "quicer_owner_queue.h" + +OWNER_SIGNAL_QUEUE * +OwnerSignalQueueNew() +{ + OWNER_SIGNAL_QUEUE *sig + = CxPlatAlloc(sizeof(OWNER_SIGNAL_QUEUE), QUICER_OWNER_SIGNAL); + return sig; +} + +void +OwnerSignalQueueInit(OWNER_SIGNAL_QUEUE *queue) +{ + queue->env = enif_alloc_env(); + CxPlatListInitializeHead(&queue->List); +} + +void +OwnerSignalQueueDestroy(OWNER_SIGNAL_QUEUE *queue) +{ + CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&queue->List)); + enif_free_env(queue->env); + CxPlatFree(queue, QUICER_OWNER_SIGNAL); +} + +OWNER_SIGNAL * +OwnerSignalAlloc() +{ + OWNER_SIGNAL *sig = CxPlatAlloc(sizeof(OWNER_SIGNAL), QUICER_OWNER_SIGNAL); + sig->Link.Flink = NULL; + return sig; +} + +void +OwnerSignalFree(OWNER_SIGNAL *sig) +{ + CXPLAT_FREE(sig, QUICER_OWNER_SIGNAL); +} + +void +OwnerSignalEnqueue(_In_ OWNER_SIGNAL_QUEUE *queue, _In_ OWNER_SIGNAL *sig) +{ + CxPlatListInsertTail(&queue->List, &sig->Link); +} + +OWNER_SIGNAL * +OwnerSignalDequeue(_In_ OWNER_SIGNAL_QUEUE *queue) +{ + OWNER_SIGNAL *sig; + if (CxPlatListIsEmpty(&queue->List)) + { + sig = NULL; + } + else + { + sig = CXPLAT_CONTAINING_RECORD( + CxPlatListRemoveHead(&queue->List), OWNER_SIGNAL, Link); + } + + return sig; +} diff --git a/c_src/quicer_owner_queue.h b/c_src/quicer_owner_queue.h new file mode 100644 index 00000000..800ccfce --- /dev/null +++ b/c_src/quicer_owner_queue.h @@ -0,0 +1,50 @@ +/*-------------------------------------------------------------------- +Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +-------------------------------------------------------------------*/ +#ifndef QUICER_OWNER_QUEUE_H_ +#define QUICER_OWNER_QUEUE_H_ + +#include +#include +#include +#include + + +#define QUICER_OWNER_SIGNAL 'E0rQ' // 'Er0d' QUICER_OWNER_SIGNAL + +// Owner Signal Queue +typedef struct OWNER_SIGNAL_QUEUE +{ + ErlNifEnv *env; + CXPLAT_LIST_ENTRY List; +} OWNER_SIGNAL_QUEUE; + +typedef struct OWNER_SIGNAL +{ + CXPLAT_LIST_ENTRY Link; + ERL_NIF_TERM msg; // resides in `env` of OWNER_SIGNAL_QUEUE + ERL_NIF_TERM orig_owner; // owner when msg is generated +} OWNER_SIGNAL; + +OWNER_SIGNAL_QUEUE *OwnerSignalQueueNew(); +OWNER_SIGNAL *OwnerSignalAlloc(); +void OwnerSignalQueueInit(OWNER_SIGNAL_QUEUE *queue); +void OwnerSignalQueueDestroy(OWNER_SIGNAL_QUEUE *queue); +void OwnerSignalFree(OWNER_SIGNAL *sig); +void OwnerSignalEnqueue(_In_ OWNER_SIGNAL_QUEUE *queue, + _In_ OWNER_SIGNAL *sig); +OWNER_SIGNAL *OwnerSignalDequeue(_In_ OWNER_SIGNAL_QUEUE *queue); + +#endif // QUICER_OWNER_QUEUE_H_ diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index 0acbadcf..eb1b4658 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -59,6 +59,9 @@ handle_stream_event_send_shutdown_complete(QuicerStreamCTX *s_ctx, static void reset_stream_recv(QuicerStreamCTX *s_ctx); +static int +signal_or_buffer(QuicerStreamCTX *s_ctx, ErlNifPid *owner, ERL_NIF_TERM sig); + QUIC_STATUS ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event) { @@ -1212,7 +1215,8 @@ handle_stream_event_send_shutdown_complete(QuicerStreamCTX *s_ctx, ATOM_SEND_SHUTDOWN_COMPLETE, enif_make_copy(env, s_ctx->eHandle), ATOM_BOOLEAN(is_graceful)); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1268,6 +1272,136 @@ get_stream_owner1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return res; } +// s_ctx MUST be locked +int +signal_or_buffer(QuicerStreamCTX *s_ctx, + ErlNifPid *owner_pid, + ERL_NIF_TERM msg) +{ + if (s_ctx->sig_queue != NULL) + { // Ongoing handoff... buffering + CXPLAT_FRE_ASSERT(ACCEPTOR_RECV_MODE_PASSIVE == s_ctx->owner->active); + ErlNifEnv *q_env = s_ctx->sig_queue->env; + OWNER_SIGNAL *sig = OwnerSignalAlloc(); + sig->msg = enif_make_copy(q_env, msg); + sig->orig_owner = enif_make_pid(q_env, owner_pid); + OwnerSignalEnqueue(s_ctx->sig_queue, sig); + return TRUE; + } + else + { + return enif_send(NULL, owner_pid, NULL, msg); + } +} + +// s_ctx MUST be locked +int +flush_sig_buffer(__unused_parm__ ErlNifEnv *env, QuicerStreamCTX *s_ctx) +{ + OWNER_SIGNAL *sig = NULL; + if (!s_ctx->sig_queue) + { + return FALSE; + } + + while ((sig = OwnerSignalDequeue(s_ctx->sig_queue))) + { + // if send failed, msg will be cleared in `OwnerSignalQueueDestroy` + enif_send(NULL, &(s_ctx->owner->Pid), NULL, sig->msg); + + OwnerSignalFree(sig); + } + OwnerSignalQueueDestroy(s_ctx->sig_queue); + s_ctx->sig_queue = NULL; + return TRUE; +} + +ERL_NIF_TERM +buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerStreamCTX *s_ctx; + ErlNifPid orig_pid; + ERL_NIF_TERM res = ATOM_OK; + + CXPLAT_FRE_ASSERT(argc == 3); + + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + + if (!enif_get_local_pid(env, argv[1], &orig_pid)) + { + return ERROR_TUPLE_2(ATOM_BAD_PID); + } + + enif_mutex_lock(s_ctx->lock); + if (!s_ctx->sig_queue) + { + res = ERROR_TUPLE_2(ATOM_NONE); + goto Exit; + } + + if (!signal_or_buffer(s_ctx, &orig_pid, argv[2])) + { + res = ERROR_TUPLE_2(ATOM_FALSE); + } +Exit: + enif_mutex_unlock(s_ctx->lock); + return res; +} + +ERL_NIF_TERM +flush_stream_buffered_sigs(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerStreamCTX *s_ctx = NULL; + ERL_NIF_TERM res = ATOM_OK; + + CXPLAT_FRE_ASSERT(argc == 1); + + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + enif_mutex_lock(s_ctx->lock); + if (!flush_sig_buffer(env, s_ctx)) + { + res = ERROR_TUPLE_2(ATOM_NONE); + } + enif_mutex_unlock(s_ctx->lock); + return res; +} + +/* +** Enable signal buffering. +** Signals are buffered instead of being sent to the owner. +** call `flush_stream_buffered_sigs` to flush the buffer. +*/ +ERL_NIF_TERM +enable_sig_buffer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerStreamCTX *s_ctx = NULL; + ERL_NIF_TERM res = ATOM_OK; + + CXPLAT_FRE_ASSERT(argc == 1); + + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + + enif_mutex_lock(s_ctx->lock); + if (!s_ctx->sig_queue) + { + s_ctx->owner->active = ACCEPTOR_RECV_MODE_PASSIVE; + s_ctx->sig_queue = OwnerSignalQueueNew(); + OwnerSignalQueueInit(s_ctx->sig_queue); + } + enif_mutex_unlock(s_ctx->lock); + + return res; +} + ///_* Emacs ///==================================================================== /// Local Variables: diff --git a/c_src/quicer_stream.h b/c_src/quicer_stream.h index 4037d68a..3b712ffb 100644 --- a/c_src/quicer_stream.h +++ b/c_src/quicer_stream.h @@ -66,3 +66,14 @@ get_stream_rid1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]); ERL_NIF_TERM get_stream_owner1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM +buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM +flush_stream_buffered_sigs(ErlNifEnv *env, + int argc, + const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM +enable_sig_buffer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); diff --git a/src/quicer_nif.erl b/src/quicer_nif.erl index 4bb0ca03..fe8eeebc 100644 --- a/src/quicer_nif.erl +++ b/src/quicer_nif.erl @@ -62,7 +62,11 @@ get_connections/1, get_conn_owner/1, get_stream_owner/1, - get_listener_owner/1 + get_listener_owner/1, + buffer_sig/3, + %% @TODO move to API + flush_stream_buffered_sigs/1, + enable_sig_buffer/1 ]). -export([abi_version/0]). @@ -373,6 +377,19 @@ get_connections() -> get_connections(_RegHandle) -> erlang:nif_error(nif_library_not_loaded). +-spec enable_sig_buffer(stream_handle()) -> ok. +enable_sig_buffer(_H) -> + erlang:nif_error(nif_library_not_loaded). + +-spec buffer_sig(stream_handle(), OrigOwner :: pid(), term()) -> + ok | {error, false | none | bad_pid | bad_arg}. +buffer_sig(_StreamHandle, _OrigOwner, _Msg) -> + erlang:nif_error(nif_library_not_loaded). + +-spec flush_stream_buffered_sigs(stream_handle()) -> ok | {error, badarg | none}. +flush_stream_buffered_sigs(_H) -> + erlang:nif_error(nif_library_not_loaded). + %% Internals -spec locate_lib(file:name(), file:name()) -> {ok, file:filename()} | {error, not_found}. diff --git a/test/prop_stream_sig_queue.erl b/test/prop_stream_sig_queue.erl new file mode 100644 index 00000000..5b6eb84f --- /dev/null +++ b/test/prop_stream_sig_queue.erl @@ -0,0 +1,110 @@ +-module(prop_stream_sig_queue). +-include_lib("proper/include/proper.hrl"). +-include_lib("quicer/include/quicer_types.hrl"). +-include("prop_quic_types.hrl"). + +%%%%%%%%%%%%%%%%%% +%%% Properties %%% +%%%%%%%%%%%%%%%%%% +prop_buffer_sig_err_none() -> + ?FORALL( + {#prop_handle{handle = S}, Pid, Term}, + {valid_stream(), pid(), term()}, + begin + Res = quicer_nif:buffer_sig(S, Pid, Term), + Res == {error, none} + end + ). + +prop_enable_sig_queue() -> + ?FORALL( + #prop_handle{type = stream, handle = S, destructor = Destructor}, + valid_stream(), + begin + ok = quicer:setopt(S, active, 100), + ok = quicer_nif:enable_sig_buffer(S), + Res = quicer:getopt(S, active), + Destructor(), + Res == {ok, false} + end + ). + +prop_buffer_sig_success() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, Term}, + {valid_stream(), pid(), term()}, + begin + ok = quicer_nif:enable_sig_buffer(S), + Res = quicer_nif:buffer_sig(S, Pid, Term), + Destructor(), + Res == ok + end + ). + +prop_flush_buffered_sig_no_owner_change() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, TermList}, + {valid_stream(), pid(), list(term())}, + begin + ok = quicer_nif:enable_sig_buffer(S), + Ref = erlang:make_ref(), + lists:foreach( + fun(Term) -> + quicer_nif:buffer_sig(S, Pid, {Ref, Term}) + end, + TermList + ), + ok = quicer_nif:flush_stream_buffered_sigs(S), + Destructor(), + Rcvd = receive_n(length(TermList), Ref), + Rcvd == TermList + end + ). + +prop_flush_buffered_sig_success() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, TermList}, + {valid_stream(), pid(), list(integer())}, + begin + ok = quicer_nif:enable_sig_buffer(S), + Ref = erlang:make_ref(), + lists:foreach( + fun(Term) -> + ok = quicer_nif:buffer_sig(S, Pid, {Ref, Term}) + end, + TermList + ), + ok = quicer:controlling_process(S, self()), + {ok, NewOwner} = quicer:get_stream_owner(S), + NewOwner = self(), + ok = quicer_nif:flush_stream_buffered_sigs(S), + Res = receive_n(length(TermList), Ref), + Destructor(), + Res == TermList + end + ). + +%%%%%%%%%%%%%%% +%%% Helpers %%% +%%%%%%%%%%%%%%% +receive_n(N, Ref) -> + receive_n(N, Ref, []). +receive_n(0, _Ref, Acc) -> + lists:reverse(Acc); +receive_n(N, Ref, Acc) -> + receive + {Ref, X} -> + receive_n(N - 1, Ref, [X | Acc]); + {quic, _, _, _} = _Drop -> + receive_n(N, Ref, Acc) + after 1000 -> + {timeout, N} + end. + +%%%%%%%%%%%%%%%%%% +%%% Generators %%% +%%%%%%%%%%%%%%%%%% +valid_stream() -> quicer_prop_gen:valid_stream_handle(). + +pid() -> + quicer_prop_gen:pid().