Skip to content

Commit

Permalink
Merge branch 'master' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Rory-Z authored Dec 21, 2020
2 parents 64f787f + 9e867b1 commit 9536d3f
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 16 deletions.
8 changes: 0 additions & 8 deletions include/emqtt.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@
-ifndef(EMQTT_HRL).
-define(EMQTT_HRL, true).

%%--------------------------------------------------------------------
%% MQTT SockOpts
%%--------------------------------------------------------------------

-define(MQTT_SOCKOPTS, [binary, {packet, raw},
{backlog, 512},
{nodelay, true}]).

%%--------------------------------------------------------------------
%% MQTT Protocol Version and Names
%%--------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
warn_obsolete_guard
]}.

{deps, [{cowlib, "2.6.0"},
{gun, "1.3.0"},
{deps, [{cowlib, "2.8.0"},
{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.4"}}},
{getopt, "1.0.1"},
{esasl, {git, "https://github.com/emqx/esasl", {tag, "master"}}}
esasl, {git, "https://github.com/emqx/esasl", {tag, "master"}}}
]}.

{escript_name, emqtt_cli}.
Expand Down
13 changes: 13 additions & 0 deletions src/emqtt.appup.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
%% -*-: erlang -*-
{"1.2.3",
[{<<"1.2.*">>, [
{load_module, emqtt_sock, brutal_purge, soft_purge, []},
{load_module, emqtt, brutal_purge, soft_purge, []}
]}
],
[{<<"1.2.*">>, [
{load_module, emqtt_sock, brutal_purge, soft_purge, []},
{load_module, emqtt, brutal_purge, soft_purge, []}
]}
]
}.
26 changes: 22 additions & 4 deletions src/emqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@
%% Message handler is a set of callbacks defined to handle MQTT messages
%% as well as the disconnect event.
-define(NO_MSG_HDLR, undefined).
-type(msg_handler() :: #{puback := fun((_) -> any()),
publish := fun((emqx_types:message()) -> any()),
disconnected := fun(({reason_code(), _Properties :: term()}) -> any())

-type(mfas() :: {module(), atom(), list()} | {function(), list()}).

-type(msg_handler() :: #{puback := fun((_) -> any()) | mfas(),
publish := fun((emqx_types:message()) -> any()) | mfas(),
disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) | mfas()
}).

-type(option() :: {name, atom()}
Expand Down Expand Up @@ -1067,6 +1070,8 @@ connected(info, {timeout, TRef, keepalive},
true ->
case send(?PACKET(?PINGREQ), State) of
{ok, NewState} ->
{ok, [{send_oct, Val}]} = ConnMod:getstat(Sock, [send_oct]),
put(send_oct, Val),
{keep_state, ensure_keepalive_timer(NewState), [hibernate]};
Error -> {stop, Error}
end;
Expand Down Expand Up @@ -1359,9 +1364,22 @@ eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
ok;
eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) ->
F = maps:get(Kind, Handler),
_ = F(Msg),
_ = apply_handler_function(F, Msg),
ok.

apply_handler_function(F, Msg)
when is_function(F) ->
erlang:apply(F, [Msg]);
apply_handler_function({F, A}, Msg)
when is_function(F),
is_list(A) ->
erlang:apply(F, [Msg] ++ A);
apply_handler_function({M, F, A}, Msg)
when is_atom(M),
is_atom(F),
is_list(A) ->
erlang:apply(M, F, [Msg] ++ A).

packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = Dup,
qos = QoS,
Expand Down
2 changes: 1 addition & 1 deletion src/emqtt_sock.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
-export_type([socket/0, option/0]).

-define(DEFAULT_TCP_OPTIONS, [binary, {packet, raw}, {active, false},
{nodelay, true}, {reuseaddr, true}]).
{nodelay, true}]).

-spec(connect(inet:ip_address() | inet:hostname(),
inet:port_number(), [option()], timeout())
Expand Down

0 comments on commit 9536d3f

Please sign in to comment.