From e50e33d3cc3983dba982722e340380dce7249431 Mon Sep 17 00:00:00 2001 From: Marc Worrell Date: Mon, 15 Jul 2024 12:20:08 +0200 Subject: [PATCH 1/4] Extra formatting of logstash reports as json data. Fallback to 'text' field in report if no 'msg' field available. --- src/logstasher_h.erl | 189 ++++++++++++++++++++++++++++++++------ test/logstasher_SUITE.erl | 34 ++++++- 2 files changed, 190 insertions(+), 33 deletions(-) diff --git a/src/logstasher_h.erl b/src/logstasher_h.erl index a1a8f7c..caa43c9 100644 --- a/src/logstasher_h.erl +++ b/src/logstasher_h.erl @@ -3,9 +3,15 @@ %% logger callbacks -export([log/2]). +%% Testing +-export([log_data/1]). + %% Xref ignores -ignore_xref([log/2]). +%% Truncate binary values beyond this size. +-define(LOG_BINARY_SIZE, 2000). + %%============================================================================== %% API %%============================================================================== @@ -14,61 +20,186 @@ log(#{level := info, meta := #{error_logger := #{type := progress}}}, _Config) -> % Ignore supervisor progress reports ok; -log(#{level := Level, msg := EventData, meta := Meta}, _Config) -> - {Msg, MsgFields} = format_msg(EventData), - Fields = [{severity, Level}] ++ safe_meta(Meta) ++ MsgFields, - Data = #{fields => Fields, '@timestamp' => format_timestamp(Meta), message => Msg}, - _ = logstasher:send(jsx:encode(Data)), - ok. +log(LogEvent, _Config) -> + try + Data = log_data(LogEvent), + _ = logstasher:send(jsx:encode(Data)), + ok + catch + _:_ -> + % Ignore crashes on unexpected data, as that would remove the log + % handler from the logger and stop logging. + ok + end. %%============================================================================== %% Internal functions %%============================================================================== --spec format_msg(Data) -> {binary(), [{binary() | atom(), jsx:json_term()}]} when - Data :: {io:format(), [term()]} - | {report, logger:report()} - | {string, unicode:chardata()}. +log_data(#{level := Level, msg := EventData, meta := Meta}) -> + {Msg, MsgFields} = format_msg(EventData), + {Msg1, MsgFields1} = maybe_extract_message(Msg, MsgFields), + Fields = maps:merge(safe_meta(Meta), MsgFields1), + Fields1 = Fields#{ severity => Level }, + #{ + fields => Fields1, + '@timestamp' => format_timestamp(Meta), + message => Msg1 + }. + +%% @doc If there is no message, try to extract the 'text' fields from the message fields +%% and use that as the message. +-spec maybe_extract_message(Message, MsgFields) -> {Message1, MsgFields1} when + Message :: null | binary(), + Message1 :: null | binary(), + MsgFields :: map(), + MsgFields1 :: map(). +maybe_extract_message(null, #{ text := Text } = MsgFields) when is_binary(Text) -> + {Text, maps:remove(text, MsgFields)}; +maybe_extract_message(Msg, MsgFields) -> + {Msg, MsgFields}. + + +-spec format_msg(Data) -> {Message, #{ Key => Value } } when + Data :: {io:format(), [ term() ]} + | {report, logger:report()} + | {string, unicode:chardata()}, + Message :: binary() | null, + Key :: binary() | atom(), + Value :: jsx:json_term(). format_msg({string, Message}) -> - {unicode:characters_to_binary(Message), []}; + {unicode:characters_to_binary(Message), #{}}; format_msg({report, Report}) when is_map(Report) -> - format_msg({report, maps:to_list(Report)}); + {maps:get(msg, Report, null), safe_fields(Report)}; format_msg({report, Report}) when is_list(Report) -> - {proplists:get_value(msg, Report, null), safe_fields(Report)}; -format_msg({Format, Params}) -> - {unicode:characters_to_binary(io_lib:format(Format, Params)), []}. + format_msg({report, maps:from_list(Report)}); +format_msg({"Error in process ~p on node ~p with exit value:~n~p~n", [_, _, {undef, Undef}]}) -> + format_undef(Undef); +format_msg({Format, Params}) when is_list(Format), is_list(Params) -> + {unicode:characters_to_binary(io_lib:format(Format, Params)), #{}}; +format_msg(Other) -> + {unicode:characters_to_binary(io_lib:format("~p", [ Other ])), #{}}. + +format_undef([ {Module, Function, Args, _} | _ ] = Stack) when is_list(Args) -> + Arity = length(Args), + Message = io_lib:format("Undefined function ~p:~p/~p", [Module, Function, Arity]), + Report = #{ + result => error, + reason => undef, + module => Module, + function => Function, + args => Args, + stack => Stack + }, + {unicode:characters_to_binary(Message), safe_fields(Report)}. -spec format_timestamp(logger:metadata()) -> binary(). format_timestamp(#{time := Ts}) -> list_to_binary(calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}])). --spec safe_meta(logger:metadata()) -> [{binary() | atom(), jsx:json_term()}]. +-spec safe_meta(logger:metadata()) -> #{ Key => Term } when + Key :: binary() | atom(), + Term :: jsx:json_term(). safe_meta(Meta) -> - safe_fields(maps:to_list(Meta)). + safe_fields(Meta). --spec safe_fields([{term(), term()}]) -> [{binary() | atom(), jsx:json_term()}]. +-spec safe_fields(map()) -> map(). safe_fields(Terms) -> - lists:map(fun safe_field/1, Terms). + maps:fold( + fun(K, V, Acc) -> + {K1, V1} = safe_field(K, V), + Acc#{ K1 => V1 } + end, + #{}, + Terms). --spec safe_field({atom() | binary() | atom(), term()}) -> {atom() | binary(), jsx:json_term()}. -safe_field({Key, Value}) when is_atom(Key); is_binary(Key) -> +-spec safe_field(atom() | binary() | string(), term()) -> {atom() | binary(), jsx:json_term()}. +safe_field(stack, Stack) when is_list(Stack) -> + {stack, safe_stack(Stack)}; +safe_field(file, Filename) when is_list(Filename) -> + {file, unicode:characters_to_binary(Filename)}; +safe_field(Key, Value) when is_atom(Key); is_binary(Key) -> {Key, safe_value(Value)}; -safe_field({Key, Value}) when is_list(Key) -> - safe_field({list_to_binary(Key), Value}). +safe_field(Key, Value) when is_list(Key) -> + safe_field(unicode:characters_to_binary(Key), Value). + +safe_stack(Stack) -> + lists:map(fun safe_stack_entry/1, Stack). + +safe_stack_entry({Mod, Fun, Args, _}) when is_atom(Mod), is_atom(Fun), is_list(Args) -> + Arity = length(Args), + Function = io_lib:format("~p:~p/~p", [Mod, Fun, Arity]), + #{ + function => unicode:characters_to_binary(Function) + }; +safe_stack_entry({Mod, Fun, Arity, Loc}) when is_atom(Mod), is_atom(Fun), is_integer(Arity) -> + Function = io_lib:format("~p:~p/~p", [ Mod, Fun, Arity ]), + #{ + function => unicode:characters_to_binary(Function), + at => unicode:characters_to_binary([stack_file(Loc), $:, integer_to_binary(stack_line(Loc))]) + }; +safe_stack_entry(Entry) -> + safe_value(Entry). + +stack_file(Loc) when is_list(Loc) -> proplists:get_value(file, Loc, ""); +stack_file({File, _}) -> File; +stack_file({File, _, _}) -> File; +stack_file(_) -> "". + +stack_line([ {_, _} | _ ] = Loc) -> proplists:get_value(line, Loc, ""); +stack_line({_, Line}) -> Line; +stack_line({_, Line, _}) -> Line; +stack_line(_) -> 0. -spec safe_value(term()) -> jsx:json_term(). safe_value(Pid) when is_pid(Pid) -> list_to_binary(pid_to_list(Pid)); +safe_value([]) -> + []; safe_value(List) when is_list(List) -> - case io_lib:char_list(List) of - true -> - list_to_binary(List); + case is_proplist(List) of + true -> safe_value(map_from_proplist(List)); false -> - lists:map(fun safe_value/1, List) + case is_ascii_list(List) of + true -> unicode:characters_to_binary(List); + false -> lists:map(fun safe_value/1, List) + end end; +safe_value(Map) when is_map(Map) -> + safe_fields(Map); safe_value(undefined) -> null; -safe_value(Val) when is_binary(Val); is_atom(Val); is_integer(Val) -> +safe_value(Val) when is_atom(Val); is_number(Val) -> Val; +safe_value(Val) when is_binary(Val) -> + maybe_truncate(Val); safe_value(Val) -> - unicode:characters_to_binary(io_lib:format("~p", [Val])). + maybe_truncate(unicode:characters_to_binary(io_lib:format("~p", [Val]))). + +% Map a proplists to a map +map_from_proplist(L) -> + lists:foldl( + fun + ({K,V}, Acc) -> Acc#{ K => V }; + (K, Acc) -> Acc#{ K => true } + end, + #{}, + L). + +% If something is a proplist, then we will display it as a map. +is_proplist([]) -> true; +is_proplist([ {K, _} | T ]) when is_atom(K); is_binary(K) -> is_proplist(T); +is_proplist([ K | T ]) when is_atom(K) -> is_proplist(T); +is_proplist(_) -> false. + +% Simple ASCII character string, typically SQL statements, filenames or literal texts. +is_ascii_list([]) -> true; +is_ascii_list([ C | T ]) when C >= 32, C =< 127 -> is_ascii_list(T); +is_ascii_list([ C | T ]) when C =:= $\n, C =:= $\t -> is_ascii_list(T); +is_ascii_list(_) -> false. + +maybe_truncate(Bin) when size(Bin) >= ?LOG_BINARY_SIZE -> + <> = Bin, + <>; +maybe_truncate(Bin) -> + Bin. diff --git a/test/logstasher_SUITE.erl b/test/logstasher_SUITE.erl index 03dfd7f..c2acd3c 100644 --- a/test/logstasher_SUITE.erl +++ b/test/logstasher_SUITE.erl @@ -9,7 +9,7 @@ -export([all/0, groups/0, init_per_testcase/2, end_per_testcase/2]). --export([logstasher_udp/1, logstasher_tcp/1]). +-export([logstasher_udp/1, logstasher_tcp/1, logstasher_message/1]). -spec all() -> [ct_suite:ct_test_def(), ...]. all() -> @@ -17,15 +17,17 @@ all() -> -spec groups() -> [ct_suite:ct_group_def(), ...]. groups() -> - [{logstasher, [sequence], [logstasher_udp, logstasher_tcp]}]. + [{logstasher, [sequence], [ + logstasher_udp, + logstasher_tcp, + logstasher_message + ]}]. -spec init_per_testcase(ct_suite:ct_testname(), ct_suite:ct_config()) -> ct_suite:ct_config() | {fail, term()} | {skip, term()}. init_per_testcase(_Name, Config) -> ok = logger:add_handler(logstash, logstasher_h, #{level => info}), ok = logger:update_primary_config(#{level => all}), - Config; -init_per_testcase(_Name, Config) -> Config. -spec end_per_testcase(ct_suite:ct_testname(), ct_suite:ct_config()) -> @@ -105,3 +107,27 @@ logstasher_tcp(_Config) -> end, ?assertEqual(list_to_binary(ErrorMsg), Msg). + + +-spec logstasher_message(ct_suite:ct_config()) -> ok | no_return(). +logstasher_message(_Config) -> + {ok, _Started} = application:ensure_all_started(logstasher), + #{ + message := <<"Hello">>, + fields := Fields1 + } = logstasher_h:log_data(#{ + level => info, + msg => {report, #{ msg => <<"Hello">> }}, + meta => #{ time => 0 } + }), + true = maps:is_key(msg, Fields1), + #{ + message := <<"Hello">>, + fields := Fields2 + } = logstasher_h:log_data(#{ + level => info, + msg => {report, #{ text => <<"Hello">> }}, + meta => #{ time => 0 } + }), + false = maps:is_key(text, Fields2), + ok. From 0f3e78d818b279ee575a1d3f59fe5aa48f1aa510 Mon Sep 17 00:00:00 2001 From: Marc Worrell Date: Mon, 15 Jul 2024 12:33:01 +0200 Subject: [PATCH 2/4] Fix types, add newer OTP releases --- .github/workflows/ci.yaml | 2 +- .github/workflows/hex.yaml | 27 --------------------------- src/logstasher.erl | 27 +++++++++++++++++---------- src/logstasher_h.erl | 11 +++-------- 4 files changed, 21 insertions(+), 46 deletions(-) delete mode 100644 .github/workflows/hex.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8d25e96..f32fcfe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,7 +15,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - otp: [22.3, 23.3, 24.2] + otp: [24.3, 25, 26] rebar: [3.18.0] steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/hex.yaml b/.github/workflows/hex.yaml deleted file mode 100644 index 0449297..0000000 --- a/.github/workflows/hex.yaml +++ /dev/null @@ -1,27 +0,0 @@ -name: Publish to hex.pm - -on: - push: - tags: - - '*' - -jobs: - publish: - name: "Ubuntu" - runs-on: ubuntu-latest - strategy: - matrix: - otp: [24.2] - rebar: [3.18.0] - steps: - - uses: actions/checkout@v2 - - uses: erlef/setup-beam@v1 - with: - otp-version: ${{matrix.otp}} - rebar3-version: ${{matrix.rebar}} - - name: Publish to Hex.pm - env: - HEX_API_KEY: ${{ secrets.HEX_API_KEY }} - run: | - rebar3 edoc - rebar3 hex publish -r hexpm --yes diff --git a/src/logstasher.erl b/src/logstasher.erl index d0d8993..ac84c56 100644 --- a/src/logstasher.erl +++ b/src/logstasher.erl @@ -59,26 +59,33 @@ start_link() -> %% Supervisor callbacks %%============================================================================== --spec init(term()) -> {ok, maps:map()} | {stop, maps:map()}. +-type state() :: #{ + transport := module(), + host := string() | binary() | tuple(), + port := pos_integer(), + socket => gen_udp:socket() | gen_tcp:socket() | undefined + }. + +-spec init(_) -> {ok, state()}. init(_) -> Transport = application:get_env(?MODULE, transport, ?LOGSTASH_TRANSPORT), Host = application:get_env(?MODULE, host, ?LOGSTASH_HOST), Port = application:get_env(?MODULE, port, ?LOGSTASH_PORT), - Opts = #{transport => Transport, host => Host, port => Port}, - State = Opts#{socket => connect(Opts)}, + Opts = #{ transport => Transport, host => Host, port => Port }, + State = Opts#{ socket => connect(Opts) }, {ok, State}. --spec handle_call({send, binary()}, any(), maps:map()) -> - {reply, ok | {error, atom() | {timeout, binary()}}, maps:map()}. +-spec handle_call({send, binary()}, _, state()) -> + {reply, ok | {error, atom() | {timeout, binary()}}, state()}. handle_call({send, Data}, _, State) -> Result = maybe_send(Data, State), {reply, Result, State}. --spec handle_cast(term(), maps:map()) -> {noreply, maps:map()}. +-spec handle_cast(_, state()) -> {noreply, state()}. handle_cast(_, State) -> {noreply, State}. --spec terminate(term(), maps:map()) -> ok. +-spec terminate(_, state()) -> ok. terminate(_, #{transport := tcp, socket := Socket}) -> gen_tcp:close(Socket); terminate(_, #{transport := udp, socket := Socket}) -> @@ -90,7 +97,7 @@ terminate(_, #{transport := console}) -> %% Internal functions %%============================================================================== --spec connect(maps:map()) -> gen_udp:socket() | gen_tcp:socket() | undefined. +-spec connect(state()) -> gen_udp:socket() | gen_tcp:socket() | undefined. connect(#{transport := tcp, host := Host, port := Port}) -> Opts = [binary, {active, false}, {keepalive, true}], case gen_tcp:connect(Host, Port, Opts, ?TCP_CONNECT_TIMEOUT) of @@ -112,7 +119,7 @@ connect(#{transport := udp}) -> connect(#{transport := console}) -> undefined. --spec maybe_send(binary(), maps:map()) -> ok | {error, atom()}. +-spec maybe_send(binary(), map()) -> ok | {error, atom()}. maybe_send(Data, #{transport := console} = State) -> send(Data, State); maybe_send(Data, #{socket := undefined} = State) -> @@ -124,7 +131,7 @@ maybe_send(Data, State) -> {error, _} = Error -> Error end. --spec send(binary(), maps:map()) -> ok | {error, atom()}. +-spec send(binary(), map()) -> ok | {error, atom()}. send(Data, #{transport := console}) -> io:put_chars([ Data, "\n"]); send(_Data, #{socket := undefined}) -> diff --git a/src/logstasher_h.erl b/src/logstasher_h.erl index caa43c9..56d3559 100644 --- a/src/logstasher_h.erl +++ b/src/logstasher_h.erl @@ -7,7 +7,7 @@ -export([log_data/1]). %% Xref ignores --ignore_xref([log/2]). +-ignore_xref([log/2, log_data/1]). %% Truncate binary values beyond this size. -define(LOG_BINARY_SIZE, 2000). @@ -49,11 +49,6 @@ log_data(#{level := Level, msg := EventData, meta := Meta}) -> %% @doc If there is no message, try to extract the 'text' fields from the message fields %% and use that as the message. --spec maybe_extract_message(Message, MsgFields) -> {Message1, MsgFields1} when - Message :: null | binary(), - Message1 :: null | binary(), - MsgFields :: map(), - MsgFields1 :: map(). maybe_extract_message(null, #{ text := Text } = MsgFields) when is_binary(Text) -> {Text, maps:remove(text, MsgFields)}; maybe_extract_message(Msg, MsgFields) -> @@ -195,11 +190,11 @@ is_proplist(_) -> false. % Simple ASCII character string, typically SQL statements, filenames or literal texts. is_ascii_list([]) -> true; is_ascii_list([ C | T ]) when C >= 32, C =< 127 -> is_ascii_list(T); -is_ascii_list([ C | T ]) when C =:= $\n, C =:= $\t -> is_ascii_list(T); +is_ascii_list([ C | T ]) when C =:= $\n; C =:= $\t -> is_ascii_list(T); is_ascii_list(_) -> false. maybe_truncate(Bin) when size(Bin) >= ?LOG_BINARY_SIZE -> <> = Bin, - <>; + <>; maybe_truncate(Bin) -> Bin. From 4b9d7937dc6c53e2f55e7279fcde0c7e3c5d05a0 Mon Sep 17 00:00:00 2001 From: Marc Worrell Date: Mon, 15 Jul 2024 12:58:29 +0200 Subject: [PATCH 3/4] Update test --- test/logstasher_SUITE.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/logstasher_SUITE.erl b/test/logstasher_SUITE.erl index c2acd3c..11bfe7b 100644 --- a/test/logstasher_SUITE.erl +++ b/test/logstasher_SUITE.erl @@ -114,13 +114,12 @@ logstasher_message(_Config) -> {ok, _Started} = application:ensure_all_started(logstasher), #{ message := <<"Hello">>, - fields := Fields1 + fields := #{ msg := <<"Hello">>, severity := info } } = logstasher_h:log_data(#{ level => info, msg => {report, #{ msg => <<"Hello">> }}, meta => #{ time => 0 } }), - true = maps:is_key(msg, Fields1), #{ message := <<"Hello">>, fields := Fields2 From 117f71f882a5d48337a74263b757fac7c81eb8bc Mon Sep 17 00:00:00 2001 From: Marc Worrell Date: Mon, 15 Jul 2024 12:59:53 +0200 Subject: [PATCH 4/4] Send newline after data --- src/logstasher.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/logstasher.erl b/src/logstasher.erl index ac84c56..7000c5e 100644 --- a/src/logstasher.erl +++ b/src/logstasher.erl @@ -137,6 +137,6 @@ send(Data, #{transport := console}) -> send(_Data, #{socket := undefined}) -> {error, closed}; send(Data, #{transport := tcp, socket := Socket}) -> - gen_tcp:send(Socket, Data); + gen_tcp:send(Socket, [ Data, "\n" ]); send(Data, #{transport := udp, socket := Socket, host := Host, port := Port}) -> - gen_udp:send(Socket, Host, Port, Data). + gen_udp:send(Socket, Host, Port, [ Data, "\n" ]).