Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msquic2.2 evt peer needs streams #217

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1355,20 +1355,12 @@ handle_connection_event_peer_needs_streams(
assert(QUIC_CONNECTION_EVENT_PEER_NEEDS_STREAMS == Event->Type);
assert(c_ctx->Connection);
ErlNifEnv *env = c_ctx->env;
/* reserved for the future upgrade
ERL_NIF_TERM props_name[] = { Event->PEER_NEEDS_STREAMS.Bidirectional ?
ATOM_BIDI_STREAMS : ATOM_UNIDI_STREAMS }; ERL_NIF_TERM props_value[] = {
enif_make_uint64(env, Event->PEER_NEEDS_STREAMS.StreamLimit) }; ERL_NIF_TERM
report = make_event_with_props(env, ATOM_PEER_NEEDS_STREAMS,
enif_make_resource(env, c_ctx),
props_name,
props_value,
1);
*/
ERL_NIF_TERM report = make_event(env,
ATOM_PEER_NEEDS_STREAMS,
enif_make_resource(env, c_ctx),
ATOM_UNDEFINED);
Event->PEER_NEEDS_STREAMS.Bidirectional
? ATOM_BIDI_STREAMS
: ATOM_UNIDI_STREAMS);

enif_send(NULL, &(c_ctx->owner->Pid), NULL, report);
return QUIC_STATUS_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions c_src/quicer_eterms.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ extern ERL_NIF_TERM ATOM_NEW_STREAM;
/*----------------------------------------------------------*/
extern ERL_NIF_TERM ATOM_SNABBKAFFE_COLLECTOR;
extern ERL_NIF_TERM ATOM_TRACE;
extern ERL_NIF_TERM ATOM_TIME;
// Trace point Context, nif for callback
extern ERL_NIF_TERM ATOM_CONTEXT;
extern ERL_NIF_TERM ATOM_NIF;
Expand Down
2 changes: 2 additions & 0 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ ERL_NIF_TERM ATOM_NEW_STREAM;
/*----------------------------------------------------------*/
ERL_NIF_TERM ATOM_SNABBKAFFE_COLLECTOR;
ERL_NIF_TERM ATOM_TRACE;
ERL_NIF_TERM ATOM_TIME;
// Trace point Context, nif for callback
ERL_NIF_TERM ATOM_CONTEXT;
ERL_NIF_TERM ATOM_NIF;
Expand Down Expand Up @@ -715,6 +716,7 @@ ERL_NIF_TERM ATOM_QUIC_DATAGRAM_SEND_CANCELED;
ATOM(ATOM_NEW_STREAM, new_stream); \
ATOM(ATOM_SNABBKAFFE_COLLECTOR, snabbkaffe_collector); \
ATOM(ATOM_TRACE, trace); \
ATOM(ATOM_TIME, time); \
ATOM(ATOM_CONTEXT, context); \
ATOM(ATOM_NIF, nif); \
ATOM(ATOM_CALLBACK, callback); \
Expand Down
16 changes: 15 additions & 1 deletion c_src/quicer_tp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define TRACEPOINT_DEFINE
#include "quicer_tp.h"

extern uint64_t CxPlatTimeUs64(void);

void
tp_snk(ErlNifEnv *env,
const char *ctx,
Expand All @@ -18,14 +20,26 @@ tp_snk(ErlNifEnv *env,
= { ATOM_SNK_KIND, ATOM_CONTEXT, ATOM_FUNCTION, ATOM_TAG,
ATOM_RESOURCE_ID, ATOM_MARK, ATOM_SNK_META };

ERL_NIF_TERM snk_evt_meta;
ERL_NIF_TERM snk_evt_meta_key_array[1] = { ATOM_TIME };
ERL_NIF_TERM snk_evt_meta_val_array[1]
= { enif_make_uint64(env, CxPlatTimeUs64()) };

// shall never fail
enif_make_map_from_arrays(env,
snk_evt_meta_key_array,
snk_evt_meta_val_array,
1,
&snk_evt_meta);

ERL_NIF_TERM snk_event_val_array[7] = {
ATOM_DEBUG, // snk_kind
enif_make_string(env, ctx, ERL_NIF_LATIN1), // context
enif_make_string(env, fun, ERL_NIF_LATIN1), // fun
enif_make_string(env, tag, ERL_NIF_LATIN1), // tag
enif_make_uint64(env, rid), // rid
enif_make_uint64(env, mark), // mark
enif_make_new_map(env) // snk_meta
snk_evt_meta // snk_meta
};

enif_make_map_from_arrays(
Expand Down
2 changes: 1 addition & 1 deletion docs/messages_to_owner.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ More streams are available due to flow control from the peer.

Peer wants to open more streams but cannot due to flow control
```erlang
{quic, peer_needs_streams, connection_handle(), undefined}
{quic, peer_needs_streams, connection_handle(), unidi_streams | bidi_streams}
```

### Ideal processor changed
Expand Down
2 changes: 1 addition & 1 deletion src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ handle_info({quic, peer_needs_streams, C, Needs},
#{ conn := C
, callback := M
, callback_state := CbState} = State) ->
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => peer_needs_streams}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => peer_needs_streams, needs => Needs}),
default_cb_ret(M:peer_needs_streams(C, Needs, CbState), State);

handle_info({quic, connection_resumed, C, ResumeData},
Expand Down
2 changes: 1 addition & 1 deletion src/quicer_server_conn_callback.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
, peer_bidi_stream_count => BidirCnt}}.

%% @doc May integrate with App flow control
peer_needs_streams(_C, undefined, S) ->
peer_needs_streams(_C, _UnidiOrBidi, S) ->
{ok, S}.

connected(Conn, _Flags, #{ slow_start := false, stream_opts := SOpts
Expand Down
7 changes: 4 additions & 3 deletions test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ local_address_changed(_C, _NewAddr, S) ->
streams_available(_C, {_BidirCnt, _UnidirCnt}, S) ->
{ok, S}.

peer_needs_streams(C, #{unidi_streams := Current}, S) ->
peer_needs_streams(C, unidi_streams, S) ->
{ok, Current} = quicer:getopt(C, param_conn_local_unidi_stream_count),
ok = quicer:setopt(C, param_conn_settings, #{peer_unidi_stream_count => Current + 1}),
{ok, S};
peer_needs_streams(C, #{bidi_streams := Current}, S) ->
ok = quicer:setopt(C, param_conn_settings, #{peer_bidi_stream_count => Current + 1}),
peer_needs_streams(_C, bidi_streams, S) ->
%% leave it for test case to unblock it, see tc_multi_streams_example_server_3
{ok, S};
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
Expand Down
14 changes: 4 additions & 10 deletions test/quicer_snb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1996,20 +1996,14 @@ tc_multi_streams_example_server_1(Config) ->
true = quicer:is_unidirectional(Flag),
Incoming
after 1000 ->
%%ct:fail("no incoming stream")
%% reenable the check when it is fixed.
%% https://github.com/microsoft/msquic/issues/3120
ok
ct:fail("no incoming stream")
end,
receive
{quic, Data, Stm3In, DFlag} ->
ct:pal("~p is received from ~p with flag: ~p", [Data, Stm3In, DFlag]),
?assertEqual(Data, <<"ping3">>)
after 1000 ->
%% ct:fail("no incoming data")
%% reenable the check when it is fixed.
%% https://github.com/microsoft/msquic/issues/3120
ok
ct:fail("no incoming data")
end,
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
receive
Expand All @@ -2020,7 +2014,7 @@ tc_multi_streams_example_server_1(Config) ->
end,
fun(_Result, Trace) ->
ct:pal("Trace is ~p", [Trace]),
?assertMatch([{pair, _, _}],
?assertMatch([{pair, _, _}, {pair, _, _}],
?find_pairs(
#{ ?snk_kind := debug
, event := handoff_stream
Expand All @@ -2033,7 +2027,7 @@ tc_multi_streams_example_server_1(Config) ->
, stream := _STREAM0
},
Trace)),
?assertMatch([{pair, _, _}],
?assertMatch([{pair, _, _}, {pair, _, _}],
?find_pairs( #{ ?snk_kind := debug
, event := handoff_stream
, module := quicer
Expand Down