Skip to content

Commit

Permalink
chore: switch to tp_ignore_side_effects_in_prod of snabbkaffe 1.0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
qzhuyan committed Sep 5, 2023
1 parent b6d3507 commit b56cff6
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[{"(linux|darwin|solaris)", compile, "make build-nif"}
]}.

{deps, [{snabbkaffe, "1.0.1"}
{deps, [{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.9"}}}
]}.

{profiles,
Expand Down
8 changes: 4 additions & 4 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ accept_stream(Conn, Opts, Timeout) when is_map(Opts) ->
{quic, new_stream, Stream, _StreamProps} ->
{ok, Stream};
{quic, closed, undefined, undefined} ->
?tp(debug, stream_acceptor_conn_closed, #{ conn => Conn }),
?tp_ignore_side_effects_in_prod(stream_acceptor_conn_closed, #{ conn => Conn }),
{error, closed}
after Timeout ->
{error, timeout}
Expand Down Expand Up @@ -997,7 +997,7 @@ handoff_stream(Stream, NewOwner, HandoffData) when NewOwner == self() ->
NewOwner ! {handoff_done, Stream, HandoffData},
ok;
handoff_stream(Stream, NewOwner, HandoffData) ->
?tp(debug, #{event=>?FUNCTION_NAME, module=>?MODULE, stream=>Stream, owner=>NewOwner}),
?tp_ignore_side_effects_in_prod(debug, #{event=>?FUNCTION_NAME, module=>?MODULE, stream=>Stream, owner=>NewOwner}),
case quicer:getopt(Stream, active) of
{ok, ActiveN} ->
ActiveN =/= false andalso quicer:setopt(Stream, active, false),
Expand Down Expand Up @@ -1135,10 +1135,10 @@ do_forward_stream_msgs(Stream, Owner, MRef) ->
Owner ! Msg,
do_forward_stream_msgs(Stream, Owner, MRef);
{'DOWN', MRef, process, Owner, _} ->
?tp(debug, do_forward_stream_msg_fail, #{stream => Stream, owner => Owner}),
?tp_ignore_side_effects_in_prod(do_forward_stream_msg_fail, #{stream => Stream, owner => Owner}),
{error, owner_down}
after 0 ->
?tp(debug, do_forward_stream_msg_done, #{stream => Stream, owner => Owner}),
?tp_ignore_side_effects_in_prod(do_forward_stream_msg_done, #{stream => Stream, owner => Owner}),
erlang:demonitor(MRef),
ok
end.
Expand Down
28 changes: 14 additions & 14 deletions src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ handle_call(get_handle, _From, #{ conn := Connection } = State) ->
{reply, Connection, State};
handle_call({stream_send, Callback, Data, SendFlags, Opts}, _From,
#{ callback_state := _CbState, conn := Conn } = State) ->
?tp(debug, #{module => ?MODULE, event => stream_send, conn => Conn}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, event => stream_send, conn => Conn}),
case quicer_stream:start_link(Callback, Conn, Opts) of
{ok, StreamPid} ->
try quicer_stream:send(StreamPid, Data, SendFlags) of
Expand Down Expand Up @@ -319,7 +319,7 @@ handle_cast(_Request, State) ->
{stop, Reason :: normal | term(), NewState :: term()}.
handle_info({quic, new_conn, C, Props},
#{callback := M, sup := Sup, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, conn=>C, props=>Props, event=>new_conn}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, conn=>C, props=>Props, event=>new_conn}),
%% I become the connection owner, I should start an new acceptor.
Sup =/= undefined andalso (catch supervisor:start_child(Sup, [Sup])),
default_cb_ret(M:new_conn(C, Props, CBState), State#{conn := C});
Expand All @@ -328,7 +328,7 @@ handle_info({quic, connected, C, #{is_resumed := IsResumed} = Props},
#{ conn := C
, callback := M
, callback_state := CbState} = State) ->
?tp(debug, #{module=>?MODULE, conn=>C, props=>Props, event => connected}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, conn=>C, props=>Props, event => connected}),
%% @TODO add option to unlink from supervisor
default_cb_ret(M:connected(C, Props, CbState), State#{is_resumed => IsResumed});

Expand All @@ -337,35 +337,35 @@ handle_info({quic, transport_shutdown, C, DownInfo},
, callback := M
, callback_state := CbState
} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => transport_shutdown}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => transport_shutdown}),
default_cb_ret(M:transport_shutdown(C, DownInfo, CbState), State);

handle_info({quic, shutdown, C, ErrorCode},
#{ conn := C
, callback := M
, callback_state := CbState
} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => shutdown}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => shutdown}),
default_cb_ret(M:shutdown(C, ErrorCode, CbState), State);

handle_info({quic, closed, C, #{is_app_closing := false} = Flags},
#{conn := C, callback := M,
callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, conn=>C, event => closed}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, conn=>C, event => closed}),
default_cb_ret(M:closed(C, Flags, CBState), State);

handle_info({quic, local_address_changed, C, NewAddr},
#{ conn := C
, callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => local_address_changed, new_addr => NewAddr}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => local_address_changed, new_addr => NewAddr}),
default_cb_ret(M:local_address_changed(C, NewAddr, CBState), State);

handle_info({quic, peer_address_changed, C, NewAddr},
#{ conn := C
, callback := M
, callback_state := CbState} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => peer_address_changed, new_addr => NewAddr}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => peer_address_changed, new_addr => NewAddr}),
default_cb_ret(M:peer_address_changed(C, NewAddr, CbState), State);

handle_info({quic, new_stream, Stream, Props},
Expand All @@ -378,15 +378,15 @@ handle_info({quic, new_stream, Stream, Props},
%% AND the stream acceptor should accept new stream so it will likely pick up the control stream
%% note, by desgin, control stream doesn't have to be the first stream initiated.
%% here, it handles new stream when there is no available stream acceptor for the connection.
?tp(debug, #{module=>?MODULE, conn=>C, stream=>Stream, event => new_stream}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, conn=>C, stream=>Stream, event => new_stream}),
default_cb_ret(M:new_stream(Stream, Props, CbState), State);

handle_info({quic, streams_available, C, #{ bidi_streams := BidirStreams
, unidi_streams := UnidirStreams}},
#{ conn := C
, callback := M
, callback_state := CbState} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => streams_available,
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => streams_available,
bidir_cnt => BidirStreams, unidir_cnt => UnidirStreams}),
default_cb_ret(M:streams_available(C, {BidirStreams, UnidirStreams}, CbState), State);

Expand All @@ -395,18 +395,18 @@ handle_info({quic, peer_needs_streams, C, Needs},
#{ conn := C
, callback := M
, callback_state := CbState} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => peer_needs_streams}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => peer_needs_streams}),
default_cb_ret(M:peer_needs_streams(C, Needs, CbState), State);

handle_info({quic, connection_resumed, C, ResumeData},
#{callback := M, callback_state := CBState} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => connection_resumed, data => ResumeData}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => connection_resumed, data => ResumeData}),
default_cb_ret(M:resumed(C, ResumeData, CBState), State);

%% Client Only
handle_info({quic, nst_received, C, TicketBin},
#{callback := M, callback_state := CBState} = State) ->
?tp(debug, #{module => ?MODULE, conn => C, event => nst_received, ticket => TicketBin}),
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => nst_received, ticket => TicketBin}),
default_cb_ret(M:nst_received(C, TicketBin, CBState), State);

%%% ==============================================================
Expand Down Expand Up @@ -434,7 +434,7 @@ handle_info(OtherInfo, #{callback := M,
{stop, Reason :: normal | term(), NewState :: term()}.
handle_continue(Cont, #{callback := M,
callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event=>continue, stream=>maps:get(stream, State)}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event=>continue, stream=>maps:get(stream, State)}),
default_cb_ret(M:handle_continue(Cont, CBState), State).
%%--------------------------------------------------------------------
%% @private
Expand Down
36 changes: 18 additions & 18 deletions src/quicer_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,20 @@ handle_cast(_Request, State) ->
handle_info({quic, closed, undefined, undefined},
#{ is_local := false
, stream := undefined
, conn := Conn
, conn := _Conn
} = S) ->
?tp(debug, acceptor_recv_conn_stop, #{conn => Conn, module => ?MODULE, pid => self()}),
?tp_ignore_side_effects_in_prod(acceptor_recv_conn_stop, #{conn => _Conn, module => ?MODULE, pid => self()}),
{stop, normal, S};

%% For acceptor
handle_info({quic, new_stream, Stream, #{flags := Flags, is_orphan := false} = Props},
handle_info({quic, new_stream, Stream, #{flags := _Flags, is_orphan := false} = Props},
#{ stream_opts := Options
, stream := undefined
, conn := Conn
, callback := CallbackModule
, callback_state := undefined
} = State) ->
?tp(debug, new_stream, #{module=>?MODULE, stream=>Stream, stream_flags => Flags}),
?tp_ignore_side_effects_in_prod(new_stream, #{module=>?MODULE, stream=>Stream, stream_flags => _Flags}),
try CallbackModule:new_stream(Stream, maps:merge(Options, Props), Conn) of
{ok, CallbackState} ->
{noreply, State#{stream := Stream, callback_state := CallbackState}};
Expand All @@ -348,15 +348,15 @@ handle_info({quic, Bin, Stream, Props},
, callback_state := CallbackState } = State)
when is_binary(Bin) ->
%% FPbuffer is disabled, callback module should handle out of order delivery
?tp(debug, stream_data, #{module=>?MODULE, stream=>Stream}),
?tp_ignore_side_effects_in_prod(stream_data, #{module=>?MODULE, stream=>Stream}),
default_cb_ret(M:handle_stream_data(Stream, Bin, Props, CallbackState), State);
handle_info({quic, Bin, Stream, Props} = Evt,
#{ stream := Stream, callback := M
, fpbuffer := Buffer
, callback_state := CallbackState } = State)
when is_binary(Bin) andalso Buffer =/= disabled ->
%% FPbuffer is enabled, callback module get ordered data
?tp(debug, stream_data, #{module=>?MODULE, stream=>Stream, buffer => Buffer}),
?tp_ignore_side_effects_in_prod(stream_data, #{module=>?MODULE, stream=>Stream, buffer => Buffer}),
case quicer:update_fpbuffer(quicer:quic_data(Evt), Buffer) of
{[], NewBuffer} ->
{noreply, State#{ fpbuffer := NewBuffer} };
Expand All @@ -375,61 +375,61 @@ handle_info({quic, start_completed, Stream,
}
, #{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => start_completed, props => Props}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => start_completed, props => Props}),
default_cb_ret(M:start_completed(Stream, Props, CBState), State);

handle_info({quic, send_complete, Stream, IsSendCanceled},
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event=>send_complete, is_canceled=>IsSendCanceled}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event=>send_complete, is_canceled=>IsSendCanceled}),
default_cb_ret(M:send_complete(Stream, IsSendCanceled, CBState), State);

handle_info({quic, peer_send_shutdown, Stream, undefined},
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => peer_send_shutdown}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => peer_send_shutdown}),
default_cb_ret(M:peer_send_shutdown(Stream, undefined, CBState), State);

handle_info({quic, peer_send_aborted, Stream, ErrorCode},
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => peer_send_aborted, error_code => ErrorCode}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => peer_send_aborted, error_code => ErrorCode}),
default_cb_ret(M:peer_send_aborted(Stream, ErrorCode, CBState), State);

handle_info({quic, peer_receive_aborted, Stream, ErrorCode},
#{ callback := M,
callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => peer_receive_aborted, error_code => ErrorCode}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => peer_receive_aborted, error_code => ErrorCode}),
default_cb_ret(M:peer_receive_aborted(Stream, ErrorCode, CBState), State);

handle_info({quic, send_shutdown_complete, Stream, IsGraceful},
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => send_shutdown_complete, is_graceful => IsGraceful}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => send_shutdown_complete, is_graceful => IsGraceful}),
default_cb_ret(M:send_shutdown_complete(Stream, IsGraceful, CBState), State);

handle_info({quic, stream_closed, Stream, Flags},
#{ callback := M
, conn := C
, callback_state := CbState} = State) when C =/= undefined andalso is_map(Flags) ->
?tp(debug, #{module=>?MODULE, conn=>C, stream=>Stream, event=>stream_closed, flags=>Flags}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, conn=>C, stream=>Stream, event=>stream_closed, flags=>Flags}),
default_cb_ret(M:stream_closed(Stream, Flags, CbState), State);

handle_info({quic, peer_accepted, Stream, undefined},
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => peer_accepted}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => peer_accepted}),
default_cb_ret(M:peer_accepted(Stream, undefined, CBState), State);

handle_info({quic, passive, Stream, undefined},
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => passive}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => passive}),
default_cb_ret(M:passive(Stream, undefined, CBState), State);
handle_info(Info,
#{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event => info}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event => info}),
default_cb_ret(M:handle_info(Info, CBState), State).

%% @TODO handle_info({EXIT....
Expand All @@ -449,7 +449,7 @@ handle_info(Info,
handle_continue({?post_init, PrevOwner}, #{ is_owner := false, stream := Stream
, callback_state := CBState
, callback := M} = State) ->
?tp(debug, #{event=>?post_init, module=>?MODULE, stream=>Stream}),
?tp_ignore_side_effects_in_prod(debug, #{event=>?post_init, module=>?MODULE, stream=>Stream}),
case wait_for_handoff(PrevOwner, Stream) of
{ok, PostInfo}->
case erlang:function_exported(M, post_handoff, 3) of
Expand All @@ -463,7 +463,7 @@ handle_continue({?post_init, PrevOwner}, #{ is_owner := false, stream := Stream
end;
handle_continue(Other, #{ callback := M
, callback_state := CBState} = State) ->
?tp(debug, #{module=>?MODULE, event=>continue, stream=>maps:get(stream, State)}),
?tp_ignore_side_effects_in_prod(debug, #{module=>?MODULE, event=>continue, stream=>maps:get(stream, State)}),
default_cb_ret(M:handle_continue(Other, CBState), State).
%%--------------------------------------------------------------------
%% @private
Expand Down

0 comments on commit b56cff6

Please sign in to comment.