diff --git a/include/emqtt.hrl b/include/emqtt.hrl index f4ba9e6d..1fd4a764 100644 --- a/include/emqtt.hrl +++ b/include/emqtt.hrl @@ -17,14 +17,6 @@ -ifndef(EMQTT_HRL). -define(EMQTT_HRL, true). -%%-------------------------------------------------------------------- -%% MQTT SockOpts -%%-------------------------------------------------------------------- - --define(MQTT_SOCKOPTS, [binary, {packet, raw}, - {backlog, 512}, - {nodelay, true}]). - %%-------------------------------------------------------------------- %% MQTT Protocol Version and Names %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index e5d22823..762a976e 100644 --- a/rebar.config +++ b/rebar.config @@ -7,10 +7,10 @@ warn_obsolete_guard ]}. -{deps, [{cowlib, "2.6.0"}, - {gun, "1.3.0"}, +{deps, [{cowlib, "2.8.0"}, + {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.4"}}}, {getopt, "1.0.1"}, - {esasl, {git, "https://github.com/emqx/esasl", {tag, "master"}}} + esasl, {git, "https://github.com/emqx/esasl", {tag, "master"}}} ]}. {escript_name, emqtt_cli}. diff --git a/src/emqtt.appup.src b/src/emqtt.appup.src new file mode 100644 index 00000000..c7e63aae --- /dev/null +++ b/src/emqtt.appup.src @@ -0,0 +1,13 @@ +%% -*-: erlang -*- +{"1.2.3", + [{<<"1.2.*">>, [ + {load_module, emqtt_sock, brutal_purge, soft_purge, []}, + {load_module, emqtt, brutal_purge, soft_purge, []} + ]} + ], + [{<<"1.2.*">>, [ + {load_module, emqtt_sock, brutal_purge, soft_purge, []}, + {load_module, emqtt, brutal_purge, soft_purge, []} + ]} + ] +}. diff --git a/src/emqtt.erl b/src/emqtt.erl index 13c3b32e..fd33fd34 100644 --- a/src/emqtt.erl +++ b/src/emqtt.erl @@ -102,9 +102,12 @@ %% Message handler is a set of callbacks defined to handle MQTT messages %% as well as the disconnect event. -define(NO_MSG_HDLR, undefined). --type(msg_handler() :: #{puback := fun((_) -> any()), - publish := fun((emqx_types:message()) -> any()), - disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) + +-type(mfas() :: {module(), atom(), list()} | {function(), list()}). + +-type(msg_handler() :: #{puback := fun((_) -> any()) | mfas(), + publish := fun((emqx_types:message()) -> any()) | mfas(), + disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) | mfas() }). -type(option() :: {name, atom()} @@ -1067,6 +1070,8 @@ connected(info, {timeout, TRef, keepalive}, true -> case send(?PACKET(?PINGREQ), State) of {ok, NewState} -> + {ok, [{send_oct, Val}]} = ConnMod:getstat(Sock, [send_oct]), + put(send_oct, Val), {keep_state, ensure_keepalive_timer(NewState), [hibernate]}; Error -> {stop, Error} end; @@ -1359,9 +1364,22 @@ eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, ok; eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) -> F = maps:get(Kind, Handler), - _ = F(Msg), + _ = apply_handler_function(F, Msg), ok. +apply_handler_function(F, Msg) + when is_function(F) -> + erlang:apply(F, [Msg]); +apply_handler_function({F, A}, Msg) + when is_function(F), + is_list(A) -> + erlang:apply(F, [Msg] ++ A); +apply_handler_function({M, F, A}, Msg) + when is_atom(M), + is_atom(F), + is_list(A) -> + erlang:apply(M, F, [Msg] ++ A). + packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, diff --git a/src/emqtt_sock.erl b/src/emqtt_sock.erl index 8de3c35b..f5ea9062 100644 --- a/src/emqtt_sock.erl +++ b/src/emqtt_sock.erl @@ -38,7 +38,7 @@ -export_type([socket/0, option/0]). -define(DEFAULT_TCP_OPTIONS, [binary, {packet, raw}, {active, false}, - {nodelay, true}, {reuseaddr, true}]). + {nodelay, true}]). -spec(connect(inet:ip_address() | inet:hostname(), inet:port_number(), [option()], timeout())