Skip to content

Commit

Permalink
Merge pull request #63 from arpunk/remove-gen_logger
Browse files Browse the repository at this point in the history
Remove gen_logger and use native Erlang logging functions
  • Loading branch information
turtleDeng authored Aug 31, 2018
2 parents cffa0aa + 9ccff9b commit 9be166f
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 102 deletions.
17 changes: 17 additions & 0 deletions include/emqttc_packet.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@
%%% @end
%%%-----------------------------------------------------------------------------

%%------------------------------------------------------------------------------
%% Logging wrappers
%%------------------------------------------------------------------------------

%% this macro only exists in OTP21 and above where logger already exists
-ifdef(OTP_RELEASE).
-define(debug(Message, Opts), logger:debug(Message, Opts)).
-define(info(Message, Opts), logger:info(Message, Opts)).
-define(warn(Message, Opts), logger:warning(Message, Opts)).
-define(error(Message, Opts), logger:error(Message, Opts)).
-else.
-define(debug(Message, Opts), error_logger:info_msg(Message, Opts)).
-define(info(Message, Opts), error_logger:info_msg(Message, Opts)).
-define(warn(Message, Opts), error_logger:warning_msg(Message, Opts)).
-define(error(Message, Opts), error_logger:error_msg(Message, Opts)).
-endif.

%%------------------------------------------------------------------------------
%% MQTT Protocol Version and Levels
%%------------------------------------------------------------------------------
Expand Down
8 changes: 3 additions & 5 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
{xref_checks, [undefined_function_calls]}.
{cover_enabled, false}.

{edoc_opts, [{dialyzer_specs, all},
{edoc_opts, [{dialyzer_specs, all},
{report_missing_type, true},
{report_type_mismatch, true},
{report_type_mismatch, true},
{pretty_print, erl_pp},
{preprocess, true}]}.

{validate_app_modules, true}.

{deps, [
{gen_logger, ".*", {git, "https://github.com/emqtt/gen_logger.git", {branch, "master"}}}
]}.
{deps, []}.
1 change: 1 addition & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[].
2 changes: 1 addition & 1 deletion src/emqttc.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
{registered, []},
{applications, [kernel,
stdlib]},
{included_applications, [gen_logger]},
{included_applications, []},
{env, []}
]}.
132 changes: 59 additions & 73 deletions src/emqttc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
| {puback_timeout, pos_integer()}
| {suback_timeout, pos_integer()}
| ssl | {ssl, [ssl:ssloption()]}
| {logger, atom() | {atom(), atom()}}
| force_ping | {force_ping, boolean()}
| auto_resub | {auto_resub, boolean()}
| {reconnect, non_neg_integer() | {non_neg_integer(), non_neg_integer()} | false}.
Expand Down Expand Up @@ -115,7 +114,6 @@
connack_tref :: reference(),
transport = tcp :: tcp | ssl,
reconnector :: emqttc_reconnector:reconnector() | undefined,
logger :: gen_logger:logmod(),
tcp_opts :: [gen_tcp:connect_option()],
ssl_opts :: [ssl:ssloption()]}).

Expand Down Expand Up @@ -389,31 +387,25 @@ init([Name, Recipient, MqttOpts, TcpOpts]) ->

process_flag(trap_exit, true),

Logger = gen_logger:new(get_value(logger, MqttOpts, {console, debug})),

MqttOpts1 = proplists:delete(logger, MqttOpts),

case get_value(client_id, MqttOpts1) of
undefined -> Logger:warning("ClientId is NULL!");
case get_value(client_id, MqttOpts) of
undefined -> ?warn("ClientId is NULL!", []);
_ -> ok
end,

ProtoState = emqttc_protocol:init(
emqttc_opts:merge([{logger, Logger},
{keepalive, ?KEEPALIVE}], MqttOpts1)),

State = init(MqttOpts1, #state{name = Name,
recipient = Recipient,
host = "127.0.0.1",
port = 1883,
proto_state = ProtoState,
keepalive_after = ?KEEPALIVE,
connack_timeout = ?CONNACK_TIMEOUT,
puback_timeout = ?PUBACK_TIMEOUT,
suback_timeout = ?SUBACK_TIMEOUT,
logger = Logger,
tcp_opts = TcpOpts,
ssl_opts = []}),
emqttc_opts:merge([{keepalive, ?KEEPALIVE}], MqttOpts)),

State = init(MqttOpts, #state{name = Name,
recipient = Recipient,
host = "127.0.0.1",
port = 1883,
proto_state = ProtoState,
keepalive_after = ?KEEPALIVE,
connack_timeout = ?CONNACK_TIMEOUT,
puback_timeout = ?PUBACK_TIMEOUT,
suback_timeout = ?SUBACK_TIMEOUT,
tcp_opts = TcpOpts,
ssl_opts = []}),

{ok, connecting, State, 0}.

Expand All @@ -437,8 +429,6 @@ init([{force_ping, Cfg} | Opts], State) when is_boolean(Cfg) ->
init(Opts, State#state{force_ping = Cfg});
init([force_ping | Opts], State) ->
init(Opts, State#state{force_ping = true});
init([{logger, Cfg} | Opts], State) ->
init(Opts, State#state{logger = gen_logger:new(Cfg)});
init([{keepalive, Time} | Opts], State) ->
init(Opts, State#state{keepalive_after = Time});
init([{connack_timeout, Timeout}| Opts], State) ->
Expand Down Expand Up @@ -478,9 +468,8 @@ waiting_for_connack(?CONNACK_PACKET(?CONNACK_ACCEPT), State = #state{
pubsub_map = PubsubMap,
proto_state = ProtoState,
keepalive = KeepAlive,
connack_tref = TRef,
logger = Logger}) ->
Logger:info("[Client ~s] RECV: CONNACK_ACCEPT", [Name]),
connack_tref = TRef}) ->
?info("[Client ~s] RECV: CONNACK_ACCEPT", [Name]),

%% Cancel connack timer
if
Expand Down Expand Up @@ -518,13 +507,13 @@ waiting_for_connack(?CONNACK_PACKET(?CONNACK_ACCEPT), State = #state{
{stop, {shutdown, Error}, State}
end;

waiting_for_connack(?CONNACK_PACKET(ReturnCode), State = #state{name = Name, logger = Logger}) ->
waiting_for_connack(?CONNACK_PACKET(ReturnCode), State = #state{name = Name}) ->
ErrConnAck = emqttc_packet:connack_name(ReturnCode),
Logger:debug("[Client ~s] RECV: ~s", [Name, ErrConnAck]),
?debug("[Client ~s] RECV: ~s", [Name, ErrConnAck]),
{stop, {shutdown, {connack_error, ErrConnAck}}, State};

waiting_for_connack(Packet = ?PACKET(_Type), State = #state{name = Name, logger = Logger}) ->
Logger:error("[Client ~s] RECV: ~s, when waiting for connack!", [Name, emqttc_packet:dump(Packet)]),
waiting_for_connack(Packet = ?PACKET(_Type), State = #state{name = Name}) ->
?error("[Client ~s] RECV: ~s, when waiting for connack!", [Name, emqttc_packet:dump(Packet)]),
next_state(waiting_for_connack, State);

waiting_for_connack(Event = {publish, _Msg}, State) ->
Expand All @@ -539,21 +528,21 @@ waiting_for_connack(disconnect, State=#state{receiver = Receiver, proto_state =
emqttc_socket:stop(Receiver),
{stop, normal, State#state{socket = undefined, receiver = undefined}};

waiting_for_connack({timeout, TRef, connack}, State = #state{name = Name, logger = Logger, connack_tref = TRef}) ->
Logger:error("[Client ~s] CONNACK Timeout!", [Name]),
waiting_for_connack({timeout, TRef, connack}, State = #state{name = Name, connack_tref = TRef}) ->
?error("[Client ~s] CONNACK Timeout!", [Name]),
{stop, {shutdown, connack_timeout}, State};

waiting_for_connack(Event, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Event: ~p, when waiting for connack!", [Name, Event]),
waiting_for_connack(Event, State = #state{name = Name}) ->
?warn("[Client ~s] Unexpected Event: ~p, when waiting for connack!", [Name, Event]),
{next_state, waiting_for_connack, State}.

%%------------------------------------------------------------------------------
%% @private
%% @doc Sync Event Handler for state that waiting_for_connack from MQTT broker.
%% @end
%%------------------------------------------------------------------------------
waiting_for_connack(Event, _From, State = #state{name = Name, logger = Logger}) ->
Logger:error("[Client ~s] Event when waiting_for_connack: ~p", [Name, Event]),
waiting_for_connack(Event, _From, State = #state{name = Name}) ->
?error("[Client ~s] Event when waiting_for_connack: ~p", [Name, Event]),
{reply, {error, waiting_for_connack}, waiting_for_connack, State}.

%%------------------------------------------------------------------------------
Expand All @@ -567,8 +556,7 @@ connected({publish, Msg}, State=#state{proto_state = ProtoState}) ->

connected({subscribe, SubPid, Topics}, State = #state{subscribers = Subscribers,
pubsub_map = PubSubMap,
proto_state = ProtoState,
logger = Logger}) ->
proto_state = ProtoState}) ->

{ok, MsgId, ProtoState1} = emqttc_protocol:subscribe(Topics, ProtoState),

Expand All @@ -592,7 +580,7 @@ connected({subscribe, SubPid, Topics}, State = #state{subscribers = Subscribers,
Qos =:= OldQos ->
Map;
true ->
Logger:error("Subscribe topic '~s' with different qos: old=~p, new=~p", [Topic, OldQos, Qos]),
?error("Subscribe topic '~s' with different qos: old=~p, new=~p", [Topic, OldQos, Qos]),
maps:put(Topic, {Qos, Subs}, Map)
end;
false ->
Expand Down Expand Up @@ -655,13 +643,13 @@ connected(disconnect, State=#state{receiver = Receiver, proto_state = ProtoState
emqttc_socket:stop(Receiver),
{stop, normal, State#state{socket = undefined, receiver = undefined}};

connected(Packet = ?PACKET(_Type), State = #state{name = Name, logger = Logger}) ->
% Logger:debug("[Client ~s] RECV: ~s", [Name, emqttc_packet:dump(Packet)]),
connected(Packet = ?PACKET(_Type), State = #state{name = Name}) ->
% ?debug("[Client ~s] RECV: ~s", [Name, emqttc_packet:dump(Packet)]),
{ok, NewState} = received(Packet, State),
next_state(connected, NewState);

connected(Event, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Event: ~p, when broker connected!", [Name, Event]),
connected(Event, State = #state{name = Name}) ->
?warn("[Client ~s] Unexpected Event: ~p, when broker connected!", [Name, Event]),
next_state(connected, State).

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -707,8 +695,8 @@ connected({Pid, ping}, From, State = #state{ping_reqs = PingReqs, proto_state =
end,
{next_state, connected, State#state{ping_reqs = PingReqs1}};

connected(Event, _From, State = #state{name = Name, logger = Logger}) ->
Logger:error("[Client ~s] Unexpected Sync Event when connected: ~p", [Name, Event]),
connected(Event, _From, State = #state{name = Name}) ->
?error("[Client ~s] Unexpected Sync Event when connected: ~p", [Name, Event]),
{reply, {error, unexpected_event}, connected, State}.

%%------------------------------------------------------------------------------
Expand All @@ -726,17 +714,17 @@ disconnected(Event = {Tag, _From, _Topics}, State) when
disconnected(disconnect, State) ->
{stop, normal, State};

disconnected(Event, State = #state{name = Name, logger = Logger}) ->
Logger:error("[Client ~s] Unexpected Event: ~p, when disconnected from broker!", [Name, Event]),
disconnected(Event, State = #state{name = Name}) ->
?error("[Client ~s] Unexpected Event: ~p, when disconnected from broker!", [Name, Event]),
next_state(disconnected, State).

%%------------------------------------------------------------------------------
%% @private
%% @doc Sync Event Handler for state that disconnected from MQTT broker.
%% @end
%%------------------------------------------------------------------------------
disconnected(Event, _From, State = #state{name = Name, logger = Logger}) ->
Logger:error("Client ~s] Unexpected Sync Event: ~p, when disconnected from broker!", [Name, Event]),
disconnected(Event, _From, State = #state{name = Name}) ->
?error("Client ~s] Unexpected Sync Event: ~p, when disconnected from broker!", [Name, Event]),
{reply, {error, disonnected}, disconnected, State}.

%%------------------------------------------------------------------------------
Expand All @@ -755,14 +743,14 @@ disconnected(Event, _From, State = #state{name = Name, logger = Logger}) ->
timeout() | hibernate} |
{stop, Reason :: term(), NewStateData :: #state{}}).

handle_event({frame_error, Error}, _StateName, State = #state{name = Name, logger = Logger}) ->
Logger:error("[Client ~s] Frame Error: ~p", [Name, Error]),
handle_event({frame_error, Error}, _StateName, State = #state{name = Name}) ->
?error("[Client ~s] Frame Error: ~p", [Name, Error]),
{stop, {shutdown, {frame_error, Error}}, State};

handle_event({connection_lost, Reason}, StateName, State = #state{recipient = Recipient, name = Name, keepalive = KeepAlive, connack_tref = TRef, logger = Logger})
handle_event({connection_lost, Reason}, StateName, State = #state{recipient = Recipient, name = Name, keepalive = KeepAlive, connack_tref = TRef})
when StateName =:= connected; StateName =:= waiting_for_connack ->

Logger:warning("[Client ~s] Connection lost for: ~p", [Name, Reason]),
?warn("[Client ~s] Connection lost for: ~p", [Name, Reason]),

%% cancel connack timer first, if connection lost when waiting for connack.
case {StateName, TRef} of
Expand All @@ -779,8 +767,8 @@ handle_event({connection_lost, Reason}, StateName, State = #state{recipient = Re

try_reconnect(Reason, State#state{socket = undefined, connack_tref = TRef});

handle_event(Event, StateName, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Event when ~s: ~p", [Name, StateName, Event]),
handle_event(Event, StateName, State = #state{name = Name}) ->
?warn("[Client ~s] Unexpected Event when ~s: ~p", [Name, StateName, Event]),
{next_state, StateName, State}.

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -859,19 +847,18 @@ handle_info({'EXIT', Receiver, normal}, StateName, State = #state{receiver = Rec

handle_info({'EXIT', Receiver, Reason}, _StateName,
State = #state{name = Name, receiver = Receiver,
keepalive = KeepAlive, logger = Logger}) ->
keepalive = KeepAlive}) ->
%% event occured when receiver error
Logger:error("[Client ~s] receiver exit: ~p", [Name, Reason]),
?error("[Client ~s] receiver exit: ~p", [Name, Reason]),
emqttc_keepalive:cancel(KeepAlive),
try_reconnect({receiver, Reason}, State#state{receiver = undefined, socket = undefined});

handle_info(Down = {'DOWN', MonRef, process, Pid, _Why}, StateName,
State = #state{name = Name,
subscribers = Subscribers,
pubsub_map = PubSubMap,
ping_reqs = PingReqs,
logger = Logger}) ->
Logger:warning("[Client ~s] Process DOWN: ~p", [Name, Down]),
ping_reqs = PingReqs}) ->
?warn("[Client ~s] Process DOWN: ~p", [Name, Down]),

%% ping?
PingReqs1 = lists:keydelete(MonRef, 2, PingReqs),
Expand Down Expand Up @@ -899,8 +886,8 @@ handle_info({inet_reply, Socket, ok}, StateName, State = #state{socket = Socket}
%socket send reply.
next_state(StateName, State);

handle_info(Info, StateName, State = #state{name = Name, logger = Logger}) ->
Logger:error("[Client ~s] Unexpected Info when ~s: ~p", [Name, StateName, Info]),
handle_info(Info, StateName, State = #state{name = Name}) ->
?error("[Client ~s] Unexpected Info when ~s: ~p", [Name, StateName, Info]),
{next_state, StateName, State}.

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -950,37 +937,36 @@ connect(State = #state{name = Name,
keepalive_after = KeepAliveTime,
connack_timeout = ConnAckTimeout,
transport = Transport,
logger = Logger,
tcp_opts = TcpOpts,
ssl_opts = SslOpts}) ->
Logger:info("[Client ~s]: connecting to ~s:~p", [Name, Host, Port]),
?info("[Client ~s]: connecting to ~s:~p", [Name, Host, Port]),
case emqttc_socket:connect(self(), Transport, Host, Port, TcpOpts, SslOpts) of
{ok, Socket, Receiver} ->
ProtoState1 = emqttc_protocol:set_socket(ProtoState, Socket),
emqttc_protocol:connect(ProtoState1),
KeepAlive = emqttc_keepalive:new({Socket, send_oct}, KeepAliveTime, {keepalive, timeout}),
TRef = gen_fsm:start_timer(ConnAckTimeout*1000, connack),
Logger:info("[Client ~s] connected with ~s:~p", [Name, Host, Port]),
?info("[Client ~s] connected with ~s:~p", [Name, Host, Port]),
{next_state, waiting_for_connack, State#state{socket = Socket,
receiver = Receiver,
keepalive = KeepAlive,
connack_tref = TRef,
proto_state = ProtoState1}};
{error, Reason} ->
Logger:info("[Client ~s] connection failure: ~p", [Name, Reason]),
?info("[Client ~s] connection failure: ~p", [Name, Reason]),
try_reconnect(Reason, State)
end.

try_reconnect(Reason, State = #state{reconnector = undefined}) ->
{stop, {shutdown, Reason}, State};

try_reconnect(Reason, State = #state{name = Name, reconnector = Reconnector, logger = Logger}) ->
Logger:info("[Client ~s] try reconnecting...", [Name]),
try_reconnect(Reason, State = #state{name = Name, reconnector = Reconnector}) ->
?info("[Client ~s] try reconnecting...", [Name]),
case emqttc_reconnector:execute(Reconnector, {reconnect, timeout}) of
{ok, Reconnector1} ->
{next_state, disconnected, State#state{reconnector = Reconnector1}};
{stop, Error} ->
Logger:error("[Client ~s] reconect error: ~p", [Name, Error]),
?error("[Client ~s] reconect error: ~p", [Name, Error]),
{stop, {shutdown, Reason}, State}
end.

Expand Down Expand Up @@ -1069,14 +1055,14 @@ reply({PubSub, ReqId}, Reply, State = #state{inflight_reqs = InflightReqs}) ->
end,
State#state{inflight_reqs = InflightReqs1}.

reply_timeout({Ack, ReqId}, State=#state{inflight_reqs = InflightReqs, logger = Logger}) ->
reply_timeout({Ack, ReqId}, State=#state{inflight_reqs = InflightReqs}) ->
InflightReqs1 =
case maps:find(ReqId, InflightReqs) of
{ok, {_Pubsub, From, _MRef}} ->
gen_fsm:reply(From, {error, ack_timeout}),
maps:remove(ReqId, InflightReqs);
error ->
Logger:error("~s timeout, cannot find inflight reqid: ~p", [Ack, ReqId]),
?error("~s timeout, cannot find inflight reqid: ~p", [Ack, ReqId]),
InflightReqs
end,
State#state{inflight_reqs = InflightReqs1}.
Expand Down
Loading

0 comments on commit 9be166f

Please sign in to comment.