Skip to content

Commit

Permalink
feat: stop then start listener (#205)
Browse files Browse the repository at this point in the history
add API to stop and restart the listener with Listener handle
fix config resource leaks in listener
rename start_listener to spawn listener for supervised listener
rename stop_listener to terminate listener for supervised listener

* feat: stop then start listener
* api: rename listener start/stop to spawn/terminate
  • Loading branch information
qzhuyan authored Aug 22, 2023
1 parent 4a752f5 commit bd633e2
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 98 deletions.
19 changes: 9 additions & 10 deletions c_src/quicer_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,18 +726,17 @@ encode_parm_to_eterm(ErlNifEnv *env,
|| QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT == Param)))
{
if (BufferLength == sizeof(uint64_t))
{
res = SUCCESS(ETERM_UINT_64(*(uint64_t *)Buffer));
}
{
res = SUCCESS(ETERM_UINT_64(*(uint64_t *)Buffer));
}
else if (BufferLength == sizeof(uint32_t))
{
res = SUCCESS(ETERM_INT(*(uint32_t *)Buffer));
}
{
res = SUCCESS(ETERM_INT(*(uint32_t *)Buffer));
}
else if (BufferLength == sizeof(uint16_t))
{
res = SUCCESS(ETERM_INT(*(uint16_t *)Buffer));
}

{
res = SUCCESS(ETERM_INT(*(uint16_t *)Buffer));
}
}
else if ((QUICER_PARAM_HANDLE_TYPE_CONN == Type
&& (QUIC_PARAM_CONN_REMOTE_ADDRESS == Param
Expand Down
4 changes: 4 additions & 0 deletions c_src/quicer_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ deinit_l_ctx(QuicerListenerCTX *l_ctx)
void
destroy_l_ctx(QuicerListenerCTX *l_ctx)
{
// @note, Destroy config asap as it holds rundown
// ref count in registration
destroy_config_ctx(l_ctx->config_resource);
l_ctx->config_resource = NULL;
enif_release_resource(l_ctx);
}

Expand Down
1 change: 1 addition & 0 deletions c_src/quicer_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct QuicerListenerCTX
// Listener handle closed flag
// false means the handle is invalid
BOOLEAN is_closed;
BOOLEAN is_stopped;
BOOLEAN allow_insecure;
void *reserved1;
void *reserved2;
Expand Down
147 changes: 132 additions & 15 deletions c_src/quicer_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,23 +216,20 @@ ServerListenerCallback(__unused_parm__ HQUIC Listener,

case QUIC_LISTENER_EVENT_STOP_COMPLETE:
env = l_ctx->env;

// Close listener in NIF CTX leads to NULL Listener HQUIC
assert(l_ctx->Listener == NULL);

// Dummy call to prevent leakage if handle is not NULL
// @TODO they should be removed when we support ListenerStop call
MsQuic->ListenerClose(l_ctx->Listener);
l_ctx->Listener = NULL;

enif_send(NULL,
&(l_ctx->listenerPid),
NULL,
enif_make_tuple3(env,
ATOM_QUIC,
ATOM_LISTENER_STOPPED,
enif_make_resource(env, l_ctx)));
is_destroy = TRUE;
if (!l_ctx->Listener)
{
// @NOTE This callback is part of the listener *close* process
// Listener is already closing, we can destroy the l_ctx now.
assert(!l_ctx->is_stopped);
is_destroy = TRUE;
}
enif_clear_env(env);
break;
default:
Expand Down Expand Up @@ -284,7 +281,6 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[])
return ERROR_TUPLE_2(ATOM_BADARG);
}


// Build CredConfig
QUIC_CREDENTIAL_CONFIG CredConfig;
CxPlatZeroMemory(&CredConfig, sizeof(QUIC_CREDENTIAL_CONFIG));
Expand Down Expand Up @@ -452,6 +448,8 @@ listen2(ErlNifEnv *env, __unused_parm__ int argc, const ERL_NIF_TERM argv[])
l_ctx->Listener, alpn_buffers, alpn_buffer_length, &Address)))
{
TP_NIF_3(start_fail, (uintptr_t)(l_ctx->Listener), Status);
MsQuic->ListenerClose(l_ctx->Listener);
l_ctx->Listener = NULL;
destroy_l_ctx(l_ctx);
return ERROR_TUPLE_3(ATOM_LISTENER_START_ERROR, ATOM_STATUS(Status));
}
Expand All @@ -465,21 +463,140 @@ close_listener1(ErlNifEnv *env,
const ERL_NIF_TERM argv[])
{
QuicerListenerCTX *l_ctx;
BOOLEAN is_destroy = FALSE;
ERL_NIF_TERM ret = ATOM_OK;
if (!enif_get_resource(env, argv[0], ctx_listener_t, (void **)&l_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

enif_mutex_lock(l_ctx->lock);
if (l_ctx->is_closed)
{
enif_mutex_unlock(l_ctx->lock);
return ERROR_TUPLE_2(ATOM_CLOSED);
}
HQUIC l = l_ctx->Listener;
// set before destroy_l_ctx
l_ctx->Listener = NULL;
l_ctx->is_closed = TRUE;

// If is_stopped, it means the listener is already stopped.
// there will be no callback for QUIC_LISTENER_EVENT_STOP_COMPLETE
// so we need to destroy the l_ctx otherwise it will leak.
is_destroy = l_ctx->is_stopped;

enif_mutex_unlock(l_ctx->lock);

// It is safe to close it without holding the lock
// This also ensures no ongoing listener callbacks
// This is a blocking call. @TODO have async version or use dirty scheduler
MsQuic->ListenerClose(l);
if (is_destroy)
{
destroy_l_ctx(l_ctx);
}
return ret;
}

ERL_NIF_TERM
stop_listener1(ErlNifEnv *env,
__unused_parm__ int argc,
const ERL_NIF_TERM argv[])
{
QuicerListenerCTX *l_ctx;
ERL_NIF_TERM ret = ATOM_OK;
BOOLEAN is_stopped = FALSE;
assert(argc == 1);
if (!enif_get_resource(env, argv[0], ctx_listener_t, (void **)&l_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

enif_mutex_lock(l_ctx->lock);
HQUIC l = l_ctx->Listener;
is_stopped = l_ctx->is_stopped;
l_ctx->is_stopped = TRUE;
enif_mutex_unlock(l_ctx->lock);
if (!l)
{
return ERROR_TUPLE_2(ATOM_CLOSED);
}
else if (!is_stopped)
{
// void return
MsQuic->ListenerStop(l);
}
return ret;
}

ERL_NIF_TERM
start_listener3(ErlNifEnv *env,
__unused_parm__ int argc,
const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM listener_handle = argv[0];
ERL_NIF_TERM elisten_on = argv[1];
ERL_NIF_TERM options = argv[2];

QuicerListenerCTX *l_ctx;
unsigned alpn_buffer_length = 0;
QUIC_BUFFER alpn_buffers[MAX_ALPN];
QUIC_ADDR Address = {};
int UdpPort = 0;

// Return value
ERL_NIF_TERM ret = ATOM_OK;
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;

if (!enif_get_resource(
env, listener_handle, ctx_listener_t, (void **)&l_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

char listen_on[INET6_ADDRSTRLEN + 6] = { 0 };
if (enif_get_string(
env, elisten_on, listen_on, INET6_ADDRSTRLEN + 6, ERL_NIF_LATIN1)
> 0)
{
if (!(QuicAddr4FromString(listen_on, &Address)
|| QuicAddr6FromString(listen_on, &Address)))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
}
else if (enif_get_int(env, elisten_on, &UdpPort) && UdpPort >= 0)
{
QuicAddrSetFamily(&Address, QUIC_ADDRESS_FAMILY_UNSPEC);
QuicAddrSetPort(&Address, (uint16_t)UdpPort);
}
else
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!load_alpn(env, &options, &alpn_buffer_length, alpn_buffers))
{
return ERROR_TUPLE_2(ATOM_ALPN);
}

enif_mutex_lock(l_ctx->lock);
if (!l_ctx->Listener)
{
ret = ERROR_TUPLE_2(ATOM_CLOSED);
goto exit;
}

if (QUIC_FAILED(
Status = MsQuic->ListenerStart(
l_ctx->Listener, alpn_buffers, alpn_buffer_length, &Address)))
{
TP_NIF_3(start_fail, (uintptr_t)(l_ctx->Listener), Status);
ret = ERROR_TUPLE_3(ATOM_LISTENER_START_ERROR, ATOM_STATUS(Status));
goto exit;
}
l_ctx->is_stopped = FALSE;

exit:
enif_mutex_unlock(l_ctx->lock);

return ATOM_OK;
return ret;
}
7 changes: 7 additions & 0 deletions c_src/quicer_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ QUIC_STATUS ServerListenerCallback(HQUIC Listener,
QUIC_LISTENER_EVENT *Event);

ERL_NIF_TERM listen2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
start_listener3(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
stop_listener1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
close_listener1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

Expand Down
2 changes: 2 additions & 0 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,8 @@ static ErlNifFunc nif_funcs[] = {
{ "reg_open", 1, registration, 0 },
{ "reg_close", 0, deregistration, 0 },
{ "listen", 2, listen2, 0},
{ "start_listener", 3, start_listener3, 0},
{ "stop_listener", 1, stop_listener1, 0},
{ "close_listener", 1, close_listener1, 0},
{ "open_connection", 0, open_connection0, 0},
{ "async_connect", 3, async_connect3, 0},
Expand Down
65 changes: 57 additions & 8 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@

%% Traffic APIs
-export([ listen/2
, stop_listener/1
, start_listener/3
, close_listener/1
, close_listener/2
, connect/4
, async_connect/3
, handshake/1
Expand Down Expand Up @@ -107,8 +110,8 @@
, open_connection/0
]).

-export([ start_listener/3 %% start application over quic
, stop_listener/1
-export([ spawn_listener/3 %% start application over quic
, terminate_listener/1
]).

-type connection_opts() :: proplists:proplist() | quicer_connection:opts().
Expand Down Expand Up @@ -168,17 +171,42 @@ reg_open(Profile) ->
reg_close() ->
quicer_nif:reg_close().

-spec start_listener(Appname :: atom(), listen_on(),
%% @doc Start a stopped listener with listener handle.
-spec start_listener(listener_handle(), listen_on(), listen_opts()) ->
{ok, pid()} | {error, any()}.
start_listener(Listener, Port, Options) when is_list(Options)->
start_listener(Listener, Port, maps:from_list(Options));
start_listener(Listener, Port, Options) ->
quicer_nif:start_listener(Listener, Port, Options).

%% @doc Stop a started listener which could be closed or restarted later.
-spec stop_listener(listener_handle()) -> ok.
stop_listener(Handle) ->
case quicer_nif:stop_listener(Handle) of
ok ->
receive
{quic, listener_stopped, Handle} ->
ok
end;
%% @TODO handle already stopped
{error, Reason} ->
{error, Reason}
end.


%% @doc start a listener process under supervisor tree
-spec spawn_listener(Appname :: atom() | listener_handle(), listen_on(),
{listener_opts(),
connection_opts(),
stream_opts() | user_opts()}
) ->
{ok, pid()} | {error, any()}.
start_listener(AppName, Port, Options) ->
spawn_listener(AppName, Port, Options) when is_atom(AppName) ->
quicer_listener:start_listener(AppName, Port, Options).

-spec stop_listener(atom()) -> ok.
stop_listener(AppName) ->
%% @doc terminate a listener process under supervisor tree
-spec terminate_listener(atom() | listener_handle()) -> ok.
terminate_listener(AppName) when is_atom(AppName)->
quicer_listener:stop_listener(AppName).

%% @doc Start listen on Port or "HOST:PORT".
Expand All @@ -202,9 +230,30 @@ listen(ListenOn, Opts) when is_map(Opts) ->
quicer_nif:listen(ListenOn, Opts).

%% @doc close listener with listener handle
-spec close_listener(listener_handle()) -> ok.
-spec close_listener(listener_handle()) -> ok | {error, badarg | closed | timeout}.
close_listener(Listener) ->
quicer_nif:close_listener(Listener).
close_listener(Listener, 5000).

-spec close_listener(listener_handle(), timer:time()) ->
ok | {error, badarg | closed | timeout}.
close_listener(Listener, Timeout) ->
case quicer_nif:close_listener(Listener) of
ok when Timeout == 0 ->
ok;
ok ->
receive
{quic, listener_stopped, Listener} ->
ok
after Timeout ->
{error, timeout}
end;
{error, closed} ->
%% already closed
%% follow OTP behavior
ok;
{error, _} = E->
E
end.

%% @doc
%% Initiate New Connection (Client)
Expand Down
12 changes: 11 additions & 1 deletion src/quicer_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
, reg_open/1
, reg_close/0
, listen/2
, start_listener/3
, stop_listener/1
, close_listener/1
, async_connect/3
, async_accept/2
Expand Down Expand Up @@ -113,10 +115,18 @@ reg_close() ->
listen(_ListenOn, _Options) ->
erlang:nif_error(nif_library_not_loaded).

-spec close_listener(listener_handle()) -> ok.
-spec start_listener(listener_handle(), listen_on(), listen_opts()) -> ok | {error, closed | badarg}.
start_listener(_Listener, _ListenOn, _Opts) ->
erlang:nif_error(nif_library_not_loaded).

-spec close_listener(listener_handle()) -> ok | {error, closed | badarg}.
close_listener(_Listener) ->
erlang:nif_error(nif_library_not_loaded).

-spec stop_listener(listener_handle()) -> ok | {error, closed | badarg}.
stop_listener(_Listener) ->
erlang:nif_error(nif_library_not_loaded).

-spec open_connection() -> {ok, connection_handle()} | {error, atom_reason()}.
open_connection() ->
erlang:nif_error(nif_library_not_loaded).
Expand Down
Loading

0 comments on commit bd633e2

Please sign in to comment.