Skip to content

Commit

Permalink
feat(stream): signal_or_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
qzhuyan committed May 23, 2024
1 parent dbcc82a commit d24a795
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 40 deletions.
9 changes: 5 additions & 4 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1486,10 +1486,11 @@ stream_controlling_process(ErlNifEnv *env,
{
// rollback, must success
enif_self(env, &s_ctx->owner->Pid);
flush_sig_buffer(env, s_ctx);
enif_monitor_process(env, s_ctx, caller, &s_ctx->owner_mon);
return ERROR_TUPLE_2(ATOM_OWNER_DEAD);
}

flush_sig_buffer(env, s_ctx);
TP_NIF_3(exit, (uintptr_t)s_ctx->Stream, (uintptr_t)&s_ctx->owner->Pid);
return ATOM_OK;
}
Expand Down Expand Up @@ -1575,6 +1576,8 @@ static ErlNifFunc nif_funcs[] = {
{ "setopt", 4, setopt4, 0},
{ "controlling_process", 2, controlling_process, 0},
{ "peercert", 1, peercert1, 0},
{ "enable_sig_buffer", 1, enable_sig_buffer, 0},
{ "flush_stream_buffered_sigs", 1, flush_stream_buffered_sigs, 0},
/* for DEBUG */
{ "get_conn_rid", 1, get_conn_rid1, 1},
{ "get_stream_rid", 1, get_stream_rid1, 1},
Expand All @@ -1586,9 +1589,7 @@ static ErlNifFunc nif_funcs[] = {
{ "get_stream_owner", 1, get_stream_owner1, 0},
{ "get_listener_owner", 1, get_listener_owner1, 0},
/* for testing */
{"buffer_sig", 3, buffer_sig, 0},
{"enable_sig_buffer", 1, enable_sig_buffer, 0},
{"flush_stream_buffered_sigs", 1, flush_stream_buffered_sigs, 0}
{ "mock_buffer_sig", 3, mock_buffer_sig, 0}
// clang-format on
};

Expand Down
23 changes: 10 additions & 13 deletions c_src/quicer_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ static void reset_stream_recv(QuicerStreamCTX *s_ctx);
static int
signal_or_buffer(QuicerStreamCTX *s_ctx, ErlNifPid *owner, ERL_NIF_TERM sig);

static int flush_sig_buffer(ErlNifEnv *env, QuicerStreamCTX *s_ctx);

QUIC_STATUS
ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event)
{
Expand Down Expand Up @@ -1061,7 +1059,7 @@ handle_stream_event_start_complete(QuicerStreamCTX *s_ctx,
props_name,
props_value,
3);
enif_send(NULL, &(s_ctx->owner->Pid), NULL, report);
signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report);
}
return QUIC_STATUS_SUCCESS;
}
Expand All @@ -1080,7 +1078,7 @@ handle_stream_event_peer_send_shutdown(
enif_make_copy(env, s_ctx->eHandle),
ATOM_UNDEFINED);

enif_send(NULL, &(s_ctx->owner->Pid), NULL, report);
signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report);
return QUIC_STATUS_SUCCESS;
}

Expand All @@ -1101,7 +1099,7 @@ handle_stream_event_peer_send_aborted(QuicerStreamCTX *s_ctx,
enif_make_copy(env, s_ctx->eHandle),
enif_make_uint64(env, Event->PEER_SEND_ABORTED.ErrorCode));

enif_send(NULL, &(s_ctx->owner->Pid), NULL, report);
signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report);
return QUIC_STATUS_SUCCESS;
}

Expand All @@ -1121,7 +1119,7 @@ handle_stream_event_peer_receive_aborted(QuicerStreamCTX *s_ctx,
ATOM_PEER_RECEIVE_ABORTED,
enif_make_copy(env, s_ctx->eHandle),
enif_make_uint64(env, Event->PEER_RECEIVE_ABORTED.ErrorCode));
enif_send(NULL, &(s_ctx->owner->Pid), NULL, report);
signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report);
return QUIC_STATUS_SUCCESS;
}

Expand Down Expand Up @@ -1154,7 +1152,7 @@ handle_stream_event_shutdown_complete(QuicerStreamCTX *s_ctx,
props_name,
props_value,
6);
enif_send(NULL, &(s_ctx->owner->Pid), NULL, report);
signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report);
return QUIC_STATUS_SUCCESS;
}

Expand All @@ -1171,7 +1169,7 @@ handle_stream_event_peer_accepted(QuicerStreamCTX *s_ctx,
ATOM_PEER_ACCEPTED,
enif_make_copy(env, s_ctx->eHandle),
ATOM_UNDEFINED);
enif_send(NULL, &(s_ctx->owner->Pid), NULL, report);
signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report);
return QUIC_STATUS_SUCCESS;
}

Expand Down Expand Up @@ -1282,9 +1280,8 @@ signal_or_buffer(QuicerStreamCTX *s_ctx,
ErlNifPid *owner_pid,
ERL_NIF_TERM msg)
{
if (s_ctx->sig_queue != NULL)
{ // Ongoing handoff... buffering
CXPLAT_FRE_ASSERT(ACCEPTOR_RECV_MODE_PASSIVE == s_ctx->owner->active);
if (s_ctx && s_ctx->sig_queue != NULL)
{
ErlNifEnv *q_env = s_ctx->sig_queue->env;
OWNER_SIGNAL *sig = OwnerSignalAlloc();
sig->msg = enif_make_copy(q_env, msg);
Expand All @@ -1299,7 +1296,7 @@ signal_or_buffer(QuicerStreamCTX *s_ctx,
}

// s_ctx MUST be locked
int
BOOLEAN
flush_sig_buffer(ErlNifEnv *env, QuicerStreamCTX *s_ctx)
{
OWNER_SIGNAL *sig = NULL;
Expand All @@ -1321,7 +1318,7 @@ flush_sig_buffer(ErlNifEnv *env, QuicerStreamCTX *s_ctx)
}

ERL_NIF_TERM
buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
mock_buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
{
QuicerStreamCTX *s_ctx;
ErlNifPid orig_pid;
Expand Down
9 changes: 8 additions & 1 deletion c_src/quicer_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ limitations under the License.
#define __QUICER_STREAM_H_

#include "quicer_config.h"
#include "quicer_ctx.h"
#include "quicer_internal.h"
#include "quicer_nif.h"

#define UNSET_STREAMID 0xFFFFFFFFFFFFFFF

struct QuicerStreamCTX;

typedef enum QUICER_SEND_FLAGS
{
QUICER_SEND_FLAGS_SYNC = 0x1000
Expand Down Expand Up @@ -68,7 +71,7 @@ ERL_NIF_TERM
get_stream_owner1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
mock_buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
flush_stream_buffered_sigs(ErlNifEnv *env,
Expand All @@ -77,3 +80,7 @@ flush_stream_buffered_sigs(ErlNifEnv *env,

ERL_NIF_TERM
enable_sig_buffer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

typedef struct QuicerStreamCTX QuicerStreamCTX;
BOOLEAN
flush_sig_buffer(ErlNifEnv *env, QuicerStreamCTX *s_ctx);
16 changes: 11 additions & 5 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,9 @@ get_listener_owner(Listener) ->
quicer_nif:get_listener_owner(Listener).

%% @doc set controlling process for Connection/Stream.
%% For Stream, also flush the sig buffer to old owner if failed or new owner if succeeded.
%% mimic {@link ssl:controlling_process/2}
%% @see handoff_stream/2
%% @see wait_for_handoff/2
%% @end
-spec controlling_process(connection_handle() | stream_handle(), pid()) ->
Expand Down Expand Up @@ -1129,15 +1131,17 @@ handoff_stream(Stream, NewOwner) ->
handoff_stream(Stream, NewOwner, undefined).

%% @doc Used by Old stream owner to handoff to the new stream owner.
%% 1. The Stream will be put into passive mode.
%% 2. Stream messages in the current owners process messages queue will
%% 1. The Stream will be put into passive mode so the data is paused.
%% 2. The Stream signal buffer will be enabled, so the signal is paused.
%% 3. Stream messages (for both data and sig )in the current owners process messages queue will
%% be forwarded to the New Owner's mailbox in the same recv order.
%% 3. Set the control process of the stream to the new owner.
%% 4. A signal msg `{handoff_done, Stream, PostHandoff}' will be sent to the new owner.
%% 4. Set the control process of the stream to the new owner, signal buffer will be flushed to new owner if succeed, otherwise to the old owner
%% 5. A signal msg `{handoff_done, Stream, PostHandoff}' will be sent to the new owner.
%% The new owner should block for this message before handle any stream data to
%% ensure the ordering.
%% 5. Revert stream active mode whatever handoff fail or success.
%% 6. Revert stream active mode whatever handoff fail or success.
%% also @see wait_for_handoff/2
%% also @see controlling_process/2
%% @end
-spec handoff_stream(stream_handle(), pid(), term()) -> ok | {error, any()}.
handoff_stream(Stream, NewOwner, HandoffData) when NewOwner == self() ->
Expand All @@ -1150,13 +1154,15 @@ handoff_stream(Stream, NewOwner, HandoffData) ->
case quicer:getopt(Stream, active) of
{ok, ActiveN} ->
ActiveN =/= false andalso quicer:setopt(Stream, active, false),
ok = quicer_nif:enable_sig_buffer(Stream),
Res =
case forward_stream_msgs(Stream, NewOwner) of
ok ->
_ = quicer:controlling_process(Stream, NewOwner),
NewOwner ! {handoff_done, Stream, HandoffData},
ok;
{error, _} = Other ->
_ = quicer_nif:flush_stream_buffered_sigs(Stream),
Other
end,
ActiveN =/= false andalso quicer:setopt(Stream, active, ActiveN),
Expand Down
28 changes: 18 additions & 10 deletions src/quicer_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
getopt/3,
setopt/4,
controlling_process/2,
peercert/1
peercert/1,
enable_sig_buffer/1,
flush_stream_buffered_sigs/1
]).

-export([
Expand All @@ -63,10 +65,7 @@
get_conn_owner/1,
get_stream_owner/1,
get_listener_owner/1,
buffer_sig/3,
%% @TODO move to API
flush_stream_buffered_sigs/1,
enable_sig_buffer/1
mock_buffer_sig/3
]).

-export([abi_version/0]).
Expand Down Expand Up @@ -377,19 +376,28 @@ get_connections() ->
get_connections(_RegHandle) ->
erlang:nif_error(nif_library_not_loaded).

%% @doc enable signal buffering, used in stream handoff.
%% * not exposed API.
-spec enable_sig_buffer(stream_handle()) -> ok.
enable_sig_buffer(_H) ->
erlang:nif_error(nif_library_not_loaded).

-spec buffer_sig(stream_handle(), OrigOwner :: pid(), term()) ->
ok | {error, false | none | bad_pid | bad_arg}.
buffer_sig(_StreamHandle, _OrigOwner, _Msg) ->
erlang:nif_error(nif_library_not_loaded).

%% @doc flush buffered stream signals to the current owner
%% * not exposed API.
%% also @see quicer:controlling_process/2
%% @end
-spec flush_stream_buffered_sigs(stream_handle()) -> ok | {error, badarg | none}.
flush_stream_buffered_sigs(_H) ->
erlang:nif_error(nif_library_not_loaded).

%% @doc mock buffer a signal in sig_buffer.
%% for testing sig_buffer
%% @end
-spec mock_buffer_sig(stream_handle(), OrigOwner :: pid(), term()) ->
ok | {error, false | none | bad_pid | bad_arg}.
mock_buffer_sig(_StreamHandle, _OrigOwner, _Msg) ->
erlang:nif_error(nif_library_not_loaded).

%% Internals
-spec locate_lib(file:name(), file:name()) ->
{ok, file:filename()} | {error, not_found}.
Expand Down
18 changes: 15 additions & 3 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ new_stream(
%% Spawn new stream
case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
{ok, StreamOwner} ->
quicer:handoff_stream(Stream, StreamOwner),
{ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}};
case quicer:handoff_stream(Stream, StreamOwner) of
ok ->
{ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}};
{error, E} ->
%% record bad stream
{ok, CBState#{streams := [{E, Stream} | Streams]}}
end;
Other ->
Other
end.
Expand Down Expand Up @@ -152,5 +157,12 @@ handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when
Sig == peer_send_shutdown orelse Sig == stream_closed
->
{OwnerPid, Stream} = lists:keyfind(Stream, 2, Streams),
OwnerPid ! Msg,
NewS =
case OwnerPid == owner_down orelse OwnerPid == closed of
true ->
quicer:async_shutdown_stream(Stream),
S#{streams := lists:keydelete(Stream, 2, Streams)};
false ->
error(fixme)
end,
{ok, S}.
8 changes: 4 additions & 4 deletions test/prop_stream_sig_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ prop_buffer_sig_err_none() ->
{#prop_handle{handle = S, destructor = Destructor}, Pid, Term},
{valid_stream(), pid(), term()},
begin
Res = quicer_nif:buffer_sig(S, Pid, Term),
Res = quicer_nif:mock_buffer_sig(S, Pid, Term),
Destructor(),
Res == {error, none}
end
Expand All @@ -36,7 +36,7 @@ prop_buffer_sig_success() ->
{valid_stream(), pid(), term()},
begin
ok = quicer_nif:enable_sig_buffer(S),
Res = quicer_nif:buffer_sig(S, Pid, Term),
Res = quicer_nif:mock_buffer_sig(S, Pid, Term),
Destructor(),
Res == ok
end
Expand All @@ -51,7 +51,7 @@ prop_flush_buffered_sig_no_owner_change() ->
Ref = erlang:make_ref(),
lists:foreach(
fun(Term) ->
quicer_nif:buffer_sig(S, Pid, {Ref, Term})
quicer_nif:mock_buffer_sig(S, Pid, {Ref, Term})
end,
TermList
),
Expand All @@ -71,7 +71,7 @@ prop_flush_buffered_sig_success() ->
Ref = erlang:make_ref(),
lists:foreach(
fun(Term) ->
ok = quicer_nif:buffer_sig(S, Pid, {Ref, Term})
ok = quicer_nif:mock_buffer_sig(S, Pid, {Ref, Term})
end,
TermList
),
Expand Down

0 comments on commit d24a795

Please sign in to comment.