From 59cc5adc0ac91b9b757ad31a2ff0ff60e3a6d6ef Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Tue, 29 Oct 2024 09:09:31 +0100 Subject: [PATCH 01/13] lib/logmsg: remove unused function declaration The definition was removed in a276841c51f33fd5028752d140214f9ad7513c46, but the declaration was not. Signed-off-by: Szilard Parrag --- lib/logmsg/logmsg.h | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logmsg/logmsg.h b/lib/logmsg/logmsg.h index c513eb43f..194309d0b 100644 --- a/lib/logmsg/logmsg.h +++ b/lib/logmsg/logmsg.h @@ -552,7 +552,6 @@ void log_msg_registry_init(void); void log_msg_registry_deinit(void); void log_msg_global_init(void); void log_msg_global_deinit(void); -void log_msg_stats_global_init(void); void log_msg_registry_foreach(GHFunc func, gpointer user_data); gint log_msg_lookup_time_stamp_name(const gchar *name); From 8ee08ba3aa3f5f16a496238fd9b40150d245920d Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Mon, 4 Nov 2024 13:33:09 +0100 Subject: [PATCH 02/13] lib/messages: add LogMessage rcptid field to MsgContext This was required as the aim is to centralize logging. Signed-off-by: Szilard Parrag --- lib/messages.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/messages.c b/lib/messages.c index 2134f7b64..b7268aa43 100644 --- a/lib/messages.c +++ b/lib/messages.c @@ -50,6 +50,7 @@ typedef struct _MsgContext guint16 recurse_state; guint recurse_warning:1; gchar recurse_trigger[128]; + guint64 original_msg_rcptid; } MsgContext; static gint active_log_level = -1; @@ -84,6 +85,8 @@ msg_set_context(LogMessage *msg) { MsgContext *context = msg_get_context(); + context->original_msg_rcptid = msg ? msg->rcptid : 0; + if (msg && (msg->flags & LF_INTERNAL)) { if (msg->recursed) From 3394f130b42d143281b09ebd74c9a00316ed9fbd Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Mon, 4 Nov 2024 14:07:39 +0100 Subject: [PATCH 03/13] lib/messages: add msg rcptid printing to msg_event_create This is needed as there has to be central place where this is logged. Signed-off-by: Szilard Parrag --- lib/messages.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/messages.c b/lib/messages.c index b7268aa43..84d58df0e 100644 --- a/lib/messages.c +++ b/lib/messages.c @@ -243,6 +243,12 @@ msg_event_create(gint prio, const gchar *desc, EVTTAG *tag1, ...) evt_rec_add_tagsv(e, va); va_end(va); } + MsgContext *msg_context = msg_get_context(); + if (msg_context->original_msg_rcptid != 0) + { + EVTTAG *rcptid_tag = evt_tag_printf("rcptid", "%" G_GUINT64_FORMAT, msg_context->original_msg_rcptid); + evt_rec_add_tag(e, rcptid_tag); + } g_mutex_unlock(&evtlog_lock); return e; } From 3a259883f0b7398aa72eeb910816f0c39a1a9349 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Mon, 4 Nov 2024 14:40:19 +0100 Subject: [PATCH 04/13] all: remove duplicate rcptid printing This is the followup for the previous patch, making evt_tag_msg_reference obsolete. This patch also removes printing memory addresses. Signed-off-by: Szilard Parrag --- lib/filter/filter-call.c | 3 +- lib/filter/filter-cmp.c | 3 +- lib/filter/filter-in-list.c | 3 +- lib/filter/filter-netmask.c | 3 +- lib/filter/filter-netmask6.c | 3 +- lib/filter/filter-pipe.c | 6 ++-- lib/filter/filter-pri.c | 6 ++-- lib/filter/filter-re.c | 9 ++---- lib/filter/filter-tags.c | 9 ++---- lib/filterx/filterx-pipe.c | 6 ++-- lib/logmsg/logmsg.c | 14 ++++----- lib/logreader.c | 5 +-- lib/logsource.c | 3 +- lib/logthrsource/logthrsourcedrv.c | 3 +- lib/parser/parser-expr.c | 6 ++-- lib/rewrite/rewrite-expr.c | 9 ++---- lib/rewrite/rewrite-set-facility.c | 3 +- lib/rewrite/rewrite-set-pri.c | 3 +- lib/rewrite/rewrite-set-severity.c | 3 +- .../add-contextual-data/add-contextual-data.c | 3 +- modules/afsnmp/snmptrapd-parser.c | 3 +- modules/correlation/dbparser.c | 4 +-- modules/csvparser/csvparser.c | 3 +- modules/geoip2/geoip-parser.c | 3 +- modules/grpc/otel/otel-dest-worker.cpp | 3 +- modules/grpc/otel/otel-protobuf-parser.cpp | 31 +++++-------------- modules/json/json-parser.c | 16 ++++++---- modules/kvformat/kv-parser.c | 3 +- modules/map-value-pairs/map-value-pairs.c | 3 +- modules/metrics-probe/metrics-probe.c | 3 +- modules/python/python-logparser.c | 3 +- modules/python/python-tf.c | 11 ++++--- modules/regexp-parser/regexp-parser.c | 3 +- modules/syslogformat/sdata-parser.c | 3 +- modules/syslogformat/syslog-format.c | 3 +- modules/syslogformat/syslog-parser.c | 6 ++-- modules/systemd-journal/journal-reader.c | 5 +-- modules/tagsparser/tags-parser.c | 3 +- modules/timestamp/date-parser.c | 4 +-- modules/xml/windows-eventlog-xml-parser.c | 3 +- modules/xml/xml.c | 3 +- 41 files changed, 83 insertions(+), 139 deletions(-) diff --git a/lib/filter/filter-call.c b/lib/filter/filter-call.c index d309fed9c..07b40ce18 100644 --- a/lib/filter/filter-call.c +++ b/lib/filter/filter-call.c @@ -51,8 +51,7 @@ filter_call_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate stats_counter_inc(self->super.not_matched); msg_trace("filter() evaluation started", - evt_tag_str("called-rule", self->rule), - evt_tag_msg_reference(msgs[num_msg - 1])); + evt_tag_str("called-rule", self->rule)); return res ^ s->comp; } diff --git a/lib/filter/filter-cmp.c b/lib/filter/filter-cmp.c index bc2670e74..b51be61ce 100644 --- a/lib/filter/filter-cmp.c +++ b/lib/filter/filter-cmp.c @@ -302,8 +302,7 @@ fop_cmp_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEval evt_tag_str("compare_mode", _compare_mode_to_string(self->compare_mode)), evt_tag_str("left_type", log_msg_value_type_to_str(left_type)), evt_tag_str("right_type", log_msg_value_type_to_str(right_type)), - evt_tag_int("result", result), - evt_tag_msg_reference(msgs[num_msg - 1])); + evt_tag_int("result", result)); scratch_buffers_reclaim_marked(marker); return result ^ s->comp; diff --git a/lib/filter/filter-in-list.c b/lib/filter/filter-in-list.c index 59edb2d83..e908578cc 100644 --- a/lib/filter/filter-in-list.c +++ b/lib/filter/filter-in-list.c @@ -51,8 +51,7 @@ filter_in_list_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTempl gboolean result = (g_tree_lookup(self->tree, value) != NULL); msg_trace("in-list() evaluation started", - evt_tag_str("value", value), - evt_tag_msg_reference(msg)); + evt_tag_str("value", value)); return result ^ s->comp; } diff --git a/lib/filter/filter-netmask.c b/lib/filter/filter-netmask.c index 1d42f908e..665990536 100644 --- a/lib/filter/filter-netmask.c +++ b/lib/filter/filter-netmask.c @@ -66,8 +66,7 @@ filter_netmask_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTempl msg_trace("netmask() evaluation started", evt_tag_inaddr("msg_address", addr), evt_tag_inaddr("address", &self->address), - evt_tag_inaddr("netmask", &self->netmask), - evt_tag_msg_reference(msg)); + evt_tag_inaddr("netmask", &self->netmask)); return res ^ s->comp; } diff --git a/lib/filter/filter-netmask6.c b/lib/filter/filter-netmask6.c index 3d5523d96..71bdbf508 100644 --- a/lib/filter/filter-netmask6.c +++ b/lib/filter/filter-netmask6.c @@ -131,8 +131,7 @@ _eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEvalOptions msg_trace("netmask6() evaluation started", evt_tag_inaddr6("msg_address", address), evt_tag_inaddr6("address", &self->address), - evt_tag_int("prefix", self->prefix), - evt_tag_msg_reference(msg)); + evt_tag_int("prefix", self->prefix)); return result ^ s->comp; } diff --git a/lib/filter/filter-pipe.c b/lib/filter/filter-pipe.c index 22e7b06c8..8eb9c1624 100644 --- a/lib/filter/filter-pipe.c +++ b/lib/filter/filter-pipe.c @@ -61,16 +61,14 @@ log_filter_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op msg_trace(">>>>>> filter rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); res = filter_expr_eval_root(self->expr, &msg, path_options); msg_trace("<<<<<< filter rule evaluation result", evt_tag_str("result", res ? "matched" : "unmatched"), evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); if (res) { diff --git a/lib/filter/filter-pri.c b/lib/filter/filter-pri.c index a8c759e6a..c5121ffd3 100644 --- a/lib/filter/filter-pri.c +++ b/lib/filter/filter-pri.c @@ -51,8 +51,7 @@ filter_facility_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemp } msg_trace("facility() evaluation started", evt_tag_int("fac", fac_num), - evt_tag_printf("valid_fac", "%08x", self->valid), - evt_tag_msg_reference(msg)); + evt_tag_printf("valid_fac", "%08x", self->valid)); return res ^ s->comp; } @@ -81,8 +80,7 @@ filter_severity_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemp msg_trace("severity() evaluation started", evt_tag_int("pri", pri), - evt_tag_printf("valid_pri", "%08x", self->valid), - evt_tag_msg_reference(msg)); + evt_tag_printf("valid_pri", "%08x", self->valid)); return res ^ s->comp; } diff --git a/lib/filter/filter-re.c b/lib/filter/filter-re.c index b340c0a57..93315f80e 100644 --- a/lib/filter/filter-re.c +++ b/lib/filter/filter-re.c @@ -46,8 +46,7 @@ filter_re_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEv msg_trace("match() evaluation started against a name-value pair", evt_tag_msg_value_name("name", self->value_handle), evt_tag_msg_value("value", msg, self->value_handle), - evt_tag_str("pattern", self->matcher->pattern), - evt_tag_msg_reference(msg)); + evt_tag_str("pattern", self->matcher->pattern)); result = log_matcher_match_value(self->matcher, msg, self->value_handle); return result ^ s->comp; } @@ -180,8 +179,7 @@ filter_match_eval_against_program_pid_msg(FilterExprNode *s, LogMessage **msgs, msg_trace("match() evaluation started against constructed $PROGRAM[$PID]: $MESSAGE string for compatibility", evt_tag_printf("input", "%s", str), - evt_tag_str("pattern", self->super.matcher->pattern), - evt_tag_msg_reference(msg)); + evt_tag_str("pattern", self->super.matcher->pattern)); result = log_matcher_match_buffer(self->super.matcher, msg, str, -1); @@ -198,8 +196,7 @@ filter_match_eval_against_template(FilterExprNode *s, LogMessage **msgs, gint nu msg_trace("match() evaluation started against template", evt_tag_template("input", self->template, msg, options), evt_tag_str("pattern", self->super.matcher->pattern), - evt_tag_str("template", self->template->template_str), - evt_tag_msg_reference(msg)); + evt_tag_str("template", self->template->template_str)); gboolean result = log_matcher_match_template(self->super.matcher, msg, self->template, options); return result ^ s->comp; } diff --git a/lib/filter/filter-tags.c b/lib/filter/filter-tags.c index 79d1d6b9c..003449bbc 100644 --- a/lib/filter/filter-tags.c +++ b/lib/filter/filter-tags.c @@ -46,8 +46,7 @@ filter_tags_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate if (log_msg_is_tag_by_id(msg, tag_id)) { msg_trace("tags() evaluation result, matching tag is found", - evt_tag_str("tag", log_tags_get_by_id(tag_id)), - evt_tag_msg_reference(msg)); + evt_tag_str("tag", log_tags_get_by_id(tag_id))); res = TRUE; return res ^ s->comp; @@ -56,13 +55,11 @@ filter_tags_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate { msg_trace("tags() evaluation progress, tag is not set", evt_tag_str("tag", log_tags_get_by_id(tag_id)), - evt_tag_int("value", log_msg_is_tag_by_id(msg, tag_id)), - evt_tag_msg_reference(msg)); + evt_tag_int("value", log_msg_is_tag_by_id(msg, tag_id))); } } - msg_trace("tags() evaluation result, none of the tags is present", - evt_tag_msg_reference(msg)); + msg_trace("tags() evaluation result, none of the tags is present"); res = FALSE; return res ^ s->comp; } diff --git a/lib/filterx/filterx-pipe.c b/lib/filterx/filterx-pipe.c index cc4ccea9e..24ae9fdf9 100644 --- a/lib/filterx/filterx-pipe.c +++ b/lib/filterx/filterx-pipe.c @@ -61,8 +61,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o msg_trace(">>>>>> filterx rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); NVTable *payload = nv_table_ref(msg->payload); eval_res = filterx_eval_exec(&eval_context, self->block, msg); @@ -71,8 +70,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o filterx_format_eval_result(eval_res), evt_tag_str("rule", self->name), log_pipe_location_tag(s), - evt_tag_int("dirty", filterx_scope_is_dirty(eval_context.scope)), - evt_tag_msg_reference(msg)); + evt_tag_int("dirty", filterx_scope_is_dirty(eval_context.scope))); local_path_options.filterx_context = &eval_context; switch (eval_res) diff --git a/lib/logmsg/logmsg.c b/lib/logmsg/logmsg.c index 67fce48f2..977e98c39 100644 --- a/lib/logmsg/logmsg.c +++ b/lib/logmsg/logmsg.c @@ -633,8 +633,7 @@ log_msg_set_value_with_type(LogMessage *self, NVHandle handle, msg_trace("Setting value", evt_tag_str("name", name), evt_tag_mem("value", value, value_len), - evt_tag_str("type", log_msg_value_type_to_str(type)), - evt_tag_msg_reference(self)); + evt_tag_str("type", log_msg_value_type_to_str(type))); } if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD)) @@ -689,8 +688,7 @@ log_msg_unset_value(LogMessage *self, NVHandle handle) if (_log_name_value_updates(self)) { msg_trace("Unsetting value", - evt_tag_str("name", log_msg_get_value_name(handle, NULL)), - evt_tag_msg_reference(self)); + evt_tag_str("name", log_msg_get_value_name(handle, NULL))); } if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD)) @@ -754,8 +752,7 @@ log_msg_set_value_indirect_with_type(LogMessage *self, NVHandle handle, evt_tag_str("type", log_msg_value_type_to_str(type)), evt_tag_int("ref_handle", ref_handle), evt_tag_int("ofs", ofs), - evt_tag_int("len", len), - evt_tag_msg_reference(self)); + evt_tag_int("len", len)); } if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD)) @@ -788,6 +785,7 @@ log_msg_set_value_indirect_with_type(LogMessage *self, NVHandle handle, if (new_entry) log_msg_update_sdata(self, handle, name, name_len); log_msg_update_num_matches(self, handle); + } void @@ -1510,8 +1508,7 @@ log_msg_clone_cow(LogMessage *msg, const LogPathOptions *path_options) msg->allocated_bytes = allocated_bytes; msg_trace("Message was cloned", - evt_tag_printf("original_msg", "%p", msg), - evt_tag_msg_reference(self)); + evt_tag_printf("original_msg", "%p", msg)); /* every field _must_ be initialized explicitly if its direct * copying would cause problems (like copying a pointer by value) */ @@ -1537,6 +1534,7 @@ log_msg_clone_cow(LogMessage *msg, const LogPathOptions *path_options) if (self->num_tags == 0) self->flags |= LF_STATE_OWN_TAGS; + return self; } diff --git a/lib/logreader.c b/lib/logreader.c index 35fd7de28..139c04485 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -459,9 +459,9 @@ log_reader_handle_line(LogReader *self, const guchar *line, gint length, LogTran LogMessage *m; m = msg_format_construct_message(&self->options->parse_options, line, length); + msg_set_context(m); msg_debug("Incoming log entry", - evt_tag_mem("input", line, length), - evt_tag_msg_reference(m)); + evt_tag_mem("input", line, length)); msg_format_parse_into(&self->options->parse_options, m, line, length); @@ -487,6 +487,7 @@ log_reader_handle_line(LogReader *self, const guchar *line, gint length, LogTran log_transport_aux_data_foreach(aux, _add_aux_nvpair, m); log_source_post(&self->super, m); + msg_set_context(NULL); log_msg_refcache_stop(); return log_source_free_to_send(&self->super); } diff --git a/lib/logsource.c b/lib/logsource.c index 69116e3fe..908998d40 100644 --- a/lib/logsource.c +++ b/lib/logsource.c @@ -636,8 +636,7 @@ log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options msg_set_context(msg); msg_diagnostics(">>>>>> Source side message processing begin", - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); /* $HOST setup */ log_source_mangle_hostname(self, msg); diff --git a/lib/logthrsource/logthrsourcedrv.c b/lib/logthrsource/logthrsourcedrv.c index e495fccf7..4289909ff 100644 --- a/lib/logthrsource/logthrsourcedrv.c +++ b/lib/logthrsource/logthrsourcedrv.c @@ -411,8 +411,7 @@ log_threaded_source_worker_post(LogThreadedSourceWorker *self, LogMessage *msg) msg_debug("Incoming log message", evt_tag_str("input", log_msg_get_value(msg, LM_V_MESSAGE, NULL)), evt_tag_str("driver", self->control->super.super.id), - evt_tag_int("worker_index", self->worker_index), - evt_tag_msg_reference(msg)); + evt_tag_int("worker_index", self->worker_index)); _apply_message_attributes(self->control, msg); log_source_post(&self->super, msg); diff --git a/lib/parser/parser-expr.c b/lib/parser/parser-expr.c index 5dc3926ff..8bf2746f4 100644 --- a/lib/parser/parser-expr.c +++ b/lib/parser/parser-expr.c @@ -93,16 +93,14 @@ log_parser_queue_method(LogPipe *s, LogMessage *msg, const LogPathOptions *path_ msg_trace(">>>>>> parser rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); success = log_parser_process_message(self, &msg, path_options); msg_trace("<<<<<< parser rule evaluation result", evt_tag_str("result", success ? "accepted" : "rejected"), evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); if (success) { diff --git a/lib/rewrite/rewrite-expr.c b/lib/rewrite/rewrite-expr.c index aa28988cb..5cf26434c 100644 --- a/lib/rewrite/rewrite-expr.c +++ b/lib/rewrite/rewrite-expr.c @@ -39,15 +39,13 @@ log_rewrite_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_option msg_trace(">>>>>> rewrite rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); if (self->condition && !filter_expr_eval_root(self->condition, &msg, path_options)) { msg_trace("Rewrite condition unmatched, skipping rewrite", evt_tag_str("value", log_msg_get_value_name(self->value_handle, NULL)), evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); } else { @@ -55,8 +53,7 @@ log_rewrite_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_option } msg_trace("<<<<<< rewrite rule evaluation finished", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); log_pipe_forward_msg(s, msg, path_options); } diff --git a/lib/rewrite/rewrite-set-facility.c b/lib/rewrite/rewrite-set-facility.c index d9e11b170..519097961 100644 --- a/lib/rewrite/rewrite-set-facility.c +++ b/lib/rewrite/rewrite-set-facility.c @@ -101,8 +101,7 @@ log_rewrite_set_facility_process(LogRewrite *s, LogMessage **pmsg, const LogPath msg_trace("Setting syslog facility", evt_tag_int("old_facility", (*pmsg)->pri & SYSLOG_FACMASK), - evt_tag_int("new_facility", facility), - evt_tag_msg_reference(*pmsg)); + evt_tag_int("new_facility", facility)); _set_msg_facility(*pmsg, facility); error: diff --git a/lib/rewrite/rewrite-set-pri.c b/lib/rewrite/rewrite-set-pri.c index 4ec9abb4e..8625fdca3 100644 --- a/lib/rewrite/rewrite-set-pri.c +++ b/lib/rewrite/rewrite-set-pri.c @@ -75,8 +75,7 @@ log_rewrite_set_pri_process(LogRewrite *s, LogMessage **pmsg, const LogPathOptio msg_trace("Setting syslog pri", evt_tag_int("old_pri", (*pmsg)->pri), - evt_tag_int("new_pri", pri), - evt_tag_msg_reference(*pmsg)); + evt_tag_int("new_pri", pri)); (*pmsg)->pri = pri; } diff --git a/lib/rewrite/rewrite-set-severity.c b/lib/rewrite/rewrite-set-severity.c index f50ab784c..b57f39212 100644 --- a/lib/rewrite/rewrite-set-severity.c +++ b/lib/rewrite/rewrite-set-severity.c @@ -102,8 +102,7 @@ log_rewrite_set_severity_process(LogRewrite *s, LogMessage **pmsg, const LogPath msg_trace("Setting syslog severity", evt_tag_int("old_severity", SYSLOG_PRI((*pmsg)->pri)), - evt_tag_int("new_severity", severity), - evt_tag_msg_reference(*pmsg)); + evt_tag_int("new_severity", severity)); _set_msg_severity(*pmsg, severity); error: diff --git a/modules/add-contextual-data/add-contextual-data.c b/modules/add-contextual-data/add-contextual-data.c index cef440074..8fc06fdc6 100644 --- a/modules/add-contextual-data/add-contextual-data.c +++ b/modules/add-contextual-data/add-contextual-data.c @@ -123,8 +123,7 @@ _process(LogParser *s, LogMessage **pmsg, msg_trace("add-contextual-data(): message lookup finished", evt_tag_str("message", input), evt_tag_str("resolved_selector", resolved_selector), - evt_tag_str("selector", selector), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("selector", selector)); if (selector) context_info_db_foreach_record(self->context_info_db, selector, diff --git a/modules/afsnmp/snmptrapd-parser.c b/modules/afsnmp/snmptrapd-parser.c index 6c280df32..63c83f919 100644 --- a/modules/afsnmp/snmptrapd-parser.c +++ b/modules/afsnmp/snmptrapd-parser.c @@ -178,8 +178,7 @@ snmptrapd_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions * log_msg_make_writable(pmsg, path_options); msg_trace("snmptrapd-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix->str), - evt_tag_msg_reference(*pmsg)); + evt_tag_str ("prefix", self->prefix->str)); APPEND_ZERO(input, input, input_len); diff --git a/modules/correlation/dbparser.c b/modules/correlation/dbparser.c index a55a24d77..28bf8f87f 100644 --- a/modules/correlation/dbparser.c +++ b/modules/correlation/dbparser.c @@ -202,8 +202,7 @@ log_db_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat { log_msg_make_writable(pmsg, path_options); msg_trace("db-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); if (G_UNLIKELY(self->super.super.template_obj)) matched = pattern_db_process_with_custom_message(self->db, *pmsg, input, input_len); else @@ -220,6 +219,7 @@ log_db_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat matched = TRUE; if (self->super.inject_mode == LDBP_IM_AGGREGATE_ONLY) matched = FALSE; + return matched; } diff --git a/modules/csvparser/csvparser.c b/modules/csvparser/csvparser.c index a5451859b..3cc25d4cf 100644 --- a/modules/csvparser/csvparser.c +++ b/modules/csvparser/csvparser.c @@ -282,8 +282,7 @@ csv_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_o msg_trace("csv-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix), - evt_tag_msg_reference(msg)); + evt_tag_str ("prefix", self->prefix)); CSVScanner scanner; csv_scanner_init(&scanner, &self->options, input); diff --git a/modules/geoip2/geoip-parser.c b/modules/geoip2/geoip-parser.c index d7a243837..86a1a8ecb 100644 --- a/modules/geoip2/geoip-parser.c +++ b/modules/geoip2/geoip-parser.c @@ -95,8 +95,7 @@ maxminddb_parser_process(LogParser *s, LogMessage **pmsg, LogMessage *msg = log_msg_make_writable(pmsg, path_options); msg_trace("geoip2-parser message processing started", evt_tag_str("input", input), - evt_tag_str("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("prefix", self->prefix)); MMDB_entry_data_list_s *entry_data_list; if (!_mmdb_load_entry_data_list(self, input, &entry_data_list)) diff --git a/modules/grpc/otel/otel-dest-worker.cpp b/modules/grpc/otel/otel-dest-worker.cpp index d25d0c3a8..902d28103 100644 --- a/modules/grpc/otel/otel-dest-worker.cpp +++ b/modules/grpc/otel/otel-dest-worker.cpp @@ -357,8 +357,7 @@ DestWorker::insert(LogMessage *msg) drop: msg_error("OpenTelemetry: Failed to insert message, dropping message", - log_pipe_location_tag(&owner.super->super.super.super.super), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(&owner.super->super.super.super.super)); /* LTR_DROP currently drops the entire batch */ return LTR_QUEUED; diff --git a/modules/grpc/otel/otel-protobuf-parser.cpp b/modules/grpc/otel/otel-protobuf-parser.cpp index 3b8928dd7..4c14ec521 100644 --- a/modules/grpc/otel/otel-protobuf-parser.cpp +++ b/modules/grpc/otel/otel-protobuf-parser.cpp @@ -58,7 +58,6 @@ _get_string_field(LogMessage *msg, NVHandle handle, gssize *len) if (type != LM_VT_STRING) { msg_error("OpenTelemetry: unexpected LogMessage type, while getting string field", - evt_tag_msg_reference(msg), evt_tag_str("name", log_msg_get_value_name(handle, NULL)), evt_tag_str("type", log_msg_value_type_to_str(type))); return nullptr; @@ -76,9 +75,8 @@ _get_protobuf_field(LogMessage *msg, NVHandle handle, gssize *len) if (type != LM_VT_PROTOBUF) { msg_error("OpenTelemetry: unexpected LogMessage type, while getting protobuf field", - evt_tag_msg_reference(msg), evt_tag_str("name", log_msg_get_value_name(handle, NULL)), - evt_tag_str("type", log_msg_value_type_to_str(type))); + evt_tag_str("type", log_msg_value_type_to_str(type)), NULL); return nullptr; } @@ -283,8 +281,7 @@ _parse_metadata(LogMessage *msg, bool set_hostname) Resource resource; if (!resource.ParsePartialFromArray(value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.resource", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.resource"); return false; } @@ -310,8 +307,7 @@ _parse_metadata(LogMessage *msg, bool set_hostname) InstrumentationScope scope; if (!scope.ParsePartialFromArray(value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.scope", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.scope"); return false; } @@ -390,8 +386,7 @@ _parse_log_record(LogMessage *msg) LogRecord log_record; if (!log_record.ParsePartialFromArray(raw_value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.log", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.log"); return false; } @@ -935,8 +930,7 @@ _parse_metric(LogMessage *msg) Metric metric; if (!metric.ParsePartialFromArray(raw_value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.metric", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.metric"); return false; } @@ -968,8 +962,7 @@ _parse_span(LogMessage *msg) Span span; if (!span.ParsePartialFromArray(raw_value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.span", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.span"); return false; } @@ -1186,7 +1179,6 @@ _value_case_equals_or_error(LogMessage *msg, const KeyValue &kv, const AnyValue: if (kv.value().value_case() != expected_value_case) { msg_error("OpenTelemetry: unexpected attribute value type, skipping", - evt_tag_msg_reference(msg), evt_tag_str("name", kv.key().c_str()), evt_tag_int("type", kv.value().value_case())); return false; @@ -1205,14 +1197,12 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_nv_pairs(LogMessage *msg, co if (!log_msg_value_type_from_str(type_as_str.c_str(), &log_msg_type)) { msg_debug("OpenTelemetry: unexpected attribute logmsg type, skipping", - evt_tag_msg_reference(msg), evt_tag_str("type", type_as_str.c_str())); continue; } if (nv_pairs_by_type.value().value_case() != AnyValue::kKvlistValue) { msg_debug("OpenTelemetry: unexpected attribute, skipping", - evt_tag_msg_reference(msg), evt_tag_str("key", type_as_str.c_str())); continue; } @@ -1245,7 +1235,6 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_macros(LogMessage *msg, cons else { msg_error("OpenTelemetry: unexpected attribute value type, skipping", - evt_tag_msg_reference(msg), evt_tag_str("name", macro.key().c_str()), evt_tag_int("type", macro.value().value_case())); } @@ -1271,7 +1260,6 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_macros(LogMessage *msg, cons else { msg_debug("OpenTelemetry: unexpected attribute macro, skipping", - evt_tag_msg_reference(msg), evt_tag_str("name", name.c_str())); } } @@ -1352,7 +1340,6 @@ syslogng::grpc::otel::ProtobufParser::store_syslog_ng(LogMessage *msg, const Log if (attr.value().value_case() != AnyValue::kKvlistValue) { msg_debug("OpenTelemetry: unexpected attribute, skipping", - evt_tag_msg_reference(msg), evt_tag_str("key", key.c_str())); continue; } @@ -1377,7 +1364,6 @@ syslogng::grpc::otel::ProtobufParser::store_syslog_ng(LogMessage *msg, const Log else { msg_debug("OpenTelemetry: unexpected attribute, skipping", - evt_tag_msg_reference(msg), evt_tag_str("key", key.c_str())); } } @@ -1395,8 +1381,7 @@ syslogng::grpc::otel::ProtobufParser::is_syslog_ng_log_record(const Resource &re bool syslogng::grpc::otel::ProtobufParser::process(LogMessage *msg) { - msg_trace("OpenTelemetry: message processing started", - evt_tag_msg_reference(msg)); + msg_trace("OpenTelemetry: message processing started"); gssize len; LogMessageValueType log_msg_type; @@ -1413,7 +1398,6 @@ syslogng::grpc::otel::ProtobufParser::process(LogMessage *msg) if (log_msg_type != LM_VT_STRING) { msg_error("OpenTelemetry: unexpected .otel_raw.type LogMessage type", - evt_tag_msg_reference(msg), evt_tag_str("log_msg_type", log_msg_value_type_to_str(log_msg_type))); return false; } @@ -1439,7 +1423,6 @@ syslogng::grpc::otel::ProtobufParser::process(LogMessage *msg) else { msg_error("OpenTelemetry: unexpected .otel_raw.type", - evt_tag_msg_reference(msg), evt_tag_str("type", type.c_str())); return false; } diff --git a/modules/json/json-parser.c b/modules/json/json-parser.c index fa284be2e..27e7f75dd 100644 --- a/modules/json/json-parser.c +++ b/modules/json/json-parser.c @@ -332,12 +332,12 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ JSONParser *self = (JSONParser *) s; struct json_object *jso; struct json_tokener *tok; + gboolean success = TRUE; msg_trace("json-parser message processing started", evt_tag_str("input", input), evt_tag_str("prefix", self->prefix), - evt_tag_str("marker", self->marker), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("marker", self->marker)); if (self->marker) { if (strncmp(input, self->marker, self->marker_len) != 0) @@ -345,7 +345,8 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ msg_debug("json-parser(): no marker at the beginning of the message, skipping JSON parsing ", evt_tag_str("input", input), evt_tag_str("marker", self->marker)); - return FALSE; + success = FALSE; + goto exit; } input += self->marker_len; @@ -361,7 +362,8 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ evt_tag_str("input", input), tok->err != json_tokener_success ? evt_tag_str ("json_error", json_tokener_error_desc(tok->err)) : NULL); json_tokener_free (tok); - return FALSE; + success = FALSE; + goto exit; } json_tokener_free(tok); @@ -372,11 +374,13 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ evt_tag_str("input", input), evt_tag_str("extract_prefix", self->extract_prefix)); json_object_put(jso); - return FALSE; + success = FALSE; + goto exit; } json_object_put(jso); - return TRUE; +exit: + return success; } static LogPipe * diff --git a/modules/kvformat/kv-parser.c b/modules/kvformat/kv-parser.c index 5f213d6e3..1b1eec36d 100644 --- a/modules/kvformat/kv-parser.c +++ b/modules/kvformat/kv-parser.c @@ -112,8 +112,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co log_msg_make_writable(pmsg, path_options); msg_trace("kv-parser message processing started", evt_tag_str("input", input), - evt_tag_str("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("prefix", self->prefix)); /* FIXME: input length */ kv_scanner_input(&kv_scanner, input); while (kv_scanner_scan_next(&kv_scanner)) diff --git a/modules/map-value-pairs/map-value-pairs.c b/modules/map-value-pairs/map-value-pairs.c index 62a75ebf4..5f2d6f081 100644 --- a/modules/map-value-pairs/map-value-pairs.c +++ b/modules/map-value-pairs/map-value-pairs.c @@ -42,8 +42,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, GlobalConfig *cfg = log_pipe_get_config(&s->super); LogMessage *msg = log_msg_make_writable(pmsg, path_options); msg_trace("value-pairs message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); LogTemplateEvalOptions options = {&cfg->template_options, LTZ_LOCAL, 0, NULL, LM_VT_STRING}; value_pairs_foreach(self->value_pairs, _map_name_values, diff --git a/modules/metrics-probe/metrics-probe.c b/modules/metrics-probe/metrics-probe.c index 9239de334..c46aa3245 100644 --- a/modules/metrics-probe/metrics-probe.c +++ b/modules/metrics-probe/metrics-probe.c @@ -95,8 +95,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co MetricsProbe *self = (MetricsProbe *) s; msg_trace("metrics-probe message processing started", - evt_tag_str("key", self->metrics_template->key), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("key", self->metrics_template->key)); if (!dyn_metrics_template_is_enabled(self->metrics_template)) return TRUE; diff --git a/modules/python/python-logparser.c b/modules/python/python-logparser.c index 93fa01087..f477d4b13 100644 --- a/modules/python/python-logparser.c +++ b/modules/python/python-logparser.c @@ -203,8 +203,7 @@ python_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat msg_trace("python-parser message processing started", evt_tag_str("input", input), evt_tag_str("parser", self->super.name), - evt_tag_str("class", self->binding.class), - evt_tag_msg_reference(msg)); + evt_tag_str("class", self->binding.class)); PyObject *msg_object = py_log_message_new(msg, cfg); result = _py_invoke_parser_process(self, msg_object); diff --git a/modules/python/python-tf.c b/modules/python/python-tf.c index 9fd51e29a..846fd8c07 100644 --- a/modules/python/python-tf.c +++ b/modules/python/python-tf.c @@ -58,7 +58,8 @@ static PyObject * _py_invoke_template_function(PythonTfState *state, const gchar *function_name, LogMessage *msg, gint argc, GString *const *argv) { - PyObject *callable, *ret, *args; + PyObject *callable, *args; + PyObject *ret = NULL; callable = _py_resolve_qualified_name(function_name); if (!callable) @@ -69,12 +70,11 @@ _py_invoke_template_function(PythonTfState *state, const gchar *function_name, L evt_tag_str("function", function_name), evt_tag_str("exception", _py_format_exception_text(buf, sizeof(buf)))); _py_finish_exception_handling(); - return NULL; + goto exit; } msg_debug("$(python): Invoking Python template function", - evt_tag_str("function", function_name), - evt_tag_msg_reference(msg)); + evt_tag_str("function", function_name)); args = _py_construct_args_tuple(state, msg, argc, argv); ret = PyObject_CallObject(callable, args); @@ -89,8 +89,9 @@ _py_invoke_template_function(PythonTfState *state, const gchar *function_name, L evt_tag_str("function", function_name), evt_tag_str("exception", _py_format_exception_text(buf, sizeof(buf)))); _py_finish_exception_handling(); - return NULL; + goto exit; } +exit: return ret; } diff --git a/modules/regexp-parser/regexp-parser.c b/modules/regexp-parser/regexp-parser.c index 4910d829e..8b3f2f796 100644 --- a/modules/regexp-parser/regexp-parser.c +++ b/modules/regexp-parser/regexp-parser.c @@ -103,8 +103,7 @@ regexp_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat log_msg_make_writable(pmsg, path_options); msg_trace("regexp-parser message processing started", evt_tag_str("input", input), - evt_tag_str("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("prefix", self->prefix)); gboolean result = FALSE; for (GList *item = self->matchers; item; item = item->next) diff --git a/modules/syslogformat/sdata-parser.c b/modules/syslogformat/sdata-parser.c index 6522f1df5..d6c35e253 100644 --- a/modules/syslogformat/sdata-parser.c +++ b/modules/syslogformat/sdata-parser.c @@ -33,8 +33,7 @@ sdata_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path msg = log_msg_make_writable(pmsg, path_options); msg_trace("sdata-parser() message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); const guchar *data = (const guchar *) input; gint data_len = input_len; diff --git a/modules/syslogformat/syslog-format.c b/modules/syslogformat/syslog-format.c index 102d82e1b..2e3a1a640 100644 --- a/modules/syslogformat/syslog-format.c +++ b/modules/syslogformat/syslog-format.c @@ -953,8 +953,7 @@ _syslog_format_check_framing(LogMessage *msg, const guchar **data, gint *length) /* we did indeed find a series of digits that look like framing, that's * probably not what was intended. */ msg_debug("RFC5425 style octet count was found at the start of the message, this is probably not what was intended", - evt_tag_mem("data", *data, src - (*data)), - evt_tag_msg_reference(msg)); + evt_tag_mem("data", *data, src - (*data))); log_msg_set_tag_by_id(msg, LM_T_SYSLOG_UNEXPECTED_FRAMING); *data = src; *length = left; diff --git a/modules/syslogformat/syslog-parser.c b/modules/syslogformat/syslog-parser.c index f2071673c..3cdc9e278 100644 --- a/modules/syslogformat/syslog-parser.c +++ b/modules/syslogformat/syslog-parser.c @@ -40,13 +40,13 @@ syslog_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat msg = log_msg_make_writable(pmsg, path_options); msg_trace("syslog-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); if (self->drop_invalid) { gsize problem_position = 0; - return msg_format_try_parse_into(&self->parse_options, msg, (guchar *) input, input_len, &problem_position); + gboolean res = msg_format_try_parse_into(&self->parse_options, msg, (guchar *) input, input_len, &problem_position); + return res; } else { diff --git a/modules/systemd-journal/journal-reader.c b/modules/systemd-journal/journal-reader.c index 1dd4e296b..c543d70d6 100644 --- a/modules/systemd-journal/journal-reader.c +++ b/modules/systemd-journal/journal-reader.c @@ -311,11 +311,12 @@ _handle_message(JournalReader *self) _set_program(self->options, msg); _set_transport(msg); + msg_set_context(msg); msg_debug("Incoming log entry from journal", - evt_tag_printf("input", "%s", log_msg_get_value(msg, LM_V_MESSAGE, NULL)), - evt_tag_msg_reference(msg)); + evt_tag_printf("input", "%s", log_msg_get_value(msg, LM_V_MESSAGE, NULL))); log_source_post(&self->super, msg); + msg_set_context(NULL); return log_source_free_to_send(&self->super); } diff --git a/modules/tagsparser/tags-parser.c b/modules/tagsparser/tags-parser.c index 8d09c8639..4cfc791ed 100644 --- a/modules/tagsparser/tags-parser.c +++ b/modules/tagsparser/tags-parser.c @@ -39,8 +39,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co { LogMessage *msg = log_msg_make_writable(pmsg, path_options); msg_trace("tags-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); ListScanner scanner; list_scanner_init(&scanner); diff --git a/modules/timestamp/date-parser.c b/modules/timestamp/date-parser.c index 428b920d3..a1287dd66 100644 --- a/modules/timestamp/date-parser.c +++ b/modules/timestamp/date-parser.c @@ -171,8 +171,7 @@ date_parser_process(LogParser *s, UnixTime time_stamp; msg_trace("date-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); /* this macro ensures zero termination by copying input to a * g_alloca()-d buffer if necessary. In most cases it's not though. @@ -186,7 +185,6 @@ date_parser_process(LogParser *s, if (res) _store_timestamp(self, msg, &time_stamp); - return res; } diff --git a/modules/xml/windows-eventlog-xml-parser.c b/modules/xml/windows-eventlog-xml-parser.c index 208afb705..0df281022 100644 --- a/modules/xml/windows-eventlog-xml-parser.c +++ b/modules/xml/windows-eventlog-xml-parser.c @@ -88,8 +88,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co XMLScanner xml_scanner; msg_trace("windows-eventlog-xml-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str ("prefix", self->prefix)); PushParams push_params = {.msg = msg, .create_lists = self->create_lists, .prefix = self->prefix}; xml_scanner_init(&xml_scanner, &self->options, &scanner_push_function, &push_params, self->prefix); diff --git a/modules/xml/xml.c b/modules/xml/xml.c index ddf3ab214..81cf22444 100644 --- a/modules/xml/xml.c +++ b/modules/xml/xml.c @@ -106,8 +106,7 @@ xml_parser_process(LogParser *s, LogMessage **pmsg, XMLScanner xml_scanner; msg_trace("xml-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str ("prefix", self->prefix)); PushParams push_params = {.msg = msg, .create_lists = self->create_lists}; xml_scanner_init(&xml_scanner, &self->options, &scanner_push_function, &push_params, self->prefix); From 525a26b1b4799572f15703dc5c8332ecb619dcc9 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 15:16:05 +0100 Subject: [PATCH 05/13] lib: remove msg_set_context(NULL) calls It's done as their values will be overwritten anyway. Signed-off-by: Szilard Parrag --- lib/logsource.c | 2 -- lib/logthrdest/logthrdestdrv.c | 1 - 2 files changed, 3 deletions(-) diff --git a/lib/logsource.c b/lib/logsource.c index 908998d40..930f3dfa7 100644 --- a/lib/logsource.c +++ b/lib/logsource.c @@ -685,8 +685,6 @@ log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options log_pipe_location_tag(s), evt_tag_printf("msg", "%p", msg), evt_tag_printf("rcptid", "%" G_GUINT64_FORMAT, rcptid)); - - msg_set_context(NULL); } static void diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 83a0d15c3..1fe888274 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -496,7 +496,6 @@ _perform_inserts(LogThreadedDestWorker *self) _perform_flush(self); log_msg_unref(msg); - msg_set_context(NULL); log_msg_refcache_stop(); flush_error: From 984582269fa6865e689dde297e6b94662eac858b Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 16:08:08 +0100 Subject: [PATCH 06/13] modules/systemd-journal: move msg_set_context sooner It's done to ensure the following logging calls have the correct rcptid. Signed-off-by: Szilard Parrag --- modules/systemd-journal/journal-reader.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/systemd-journal/journal-reader.c b/modules/systemd-journal/journal-reader.c index c543d70d6..2077b4c73 100644 --- a/modules/systemd-journal/journal-reader.c +++ b/modules/systemd-journal/journal-reader.c @@ -300,6 +300,7 @@ static gboolean _handle_message(JournalReader *self) { LogMessage *msg = log_msg_new_empty(); + msg_set_context(msg); msg->pri = self->options->default_pri; @@ -311,7 +312,6 @@ _handle_message(JournalReader *self) _set_program(self->options, msg); _set_transport(msg); - msg_set_context(msg); msg_debug("Incoming log entry from journal", evt_tag_printf("input", "%s", log_msg_get_value(msg, LM_V_MESSAGE, NULL))); From 759ed69c56d152d7e9fa39cedc96c2712abc0390 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 16:07:37 +0100 Subject: [PATCH 07/13] all: consolidate msg_set_context calls This patch adds msg_set_context calls to places where LogMessages are popped from the queue. This happens after a message has been "read"/"consumed" (from the source side) but before it is sent (destination pops from the queue). Signed-off-by: Szilard Parrag --- lib/logqueue-fifo.c | 1 + modules/diskq/logqueue-disk-non-reliable.c | 1 + modules/diskq/logqueue-disk-reliable.c | 1 + 3 files changed, 3 insertions(+) diff --git a/lib/logqueue-fifo.c b/lib/logqueue-fifo.c index 5e90ea2b6..6fb122cd8 100644 --- a/lib/logqueue-fifo.c +++ b/lib/logqueue-fifo.c @@ -489,6 +489,7 @@ log_queue_fifo_pop_head(LogQueue *s, LogPathOptions *path_options) if (!node->flow_control_requested) self->backlog_queue.non_flow_controlled_len++; + msg_set_context(msg); return msg; } diff --git a/modules/diskq/logqueue-disk-non-reliable.c b/modules/diskq/logqueue-disk-non-reliable.c index 9a5e8a2b7..25f6ab4ea 100644 --- a/modules/diskq/logqueue-disk-non-reliable.c +++ b/modules/diskq/logqueue-disk-non-reliable.c @@ -345,6 +345,7 @@ _pop_head(LogQueue *s, LogPathOptions *path_options) if (stats_update) log_queue_queued_messages_dec(s); + msg_set_context(msg); return msg; } diff --git a/modules/diskq/logqueue-disk-reliable.c b/modules/diskq/logqueue-disk-reliable.c index 9ae17630b..eb7e77f14 100644 --- a/modules/diskq/logqueue-disk-reliable.c +++ b/modules/diskq/logqueue-disk-reliable.c @@ -310,6 +310,7 @@ _pop_head(LogQueue *s, LogPathOptions *path_options) log_queue_disk_restart_corrupted(&self->super); g_mutex_unlock(&s->lock); + msg_set_context(msg); return msg; } From 467f8f5d97f8e0e84f3dc8a241e5db59b7288afe Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 16:08:55 +0100 Subject: [PATCH 08/13] lib/logscheduler: call msg_set_context This is required here as the LogMessage popping is done rather oddly. Signed-off-by: Szilard Parrag --- lib/logscheduler.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logscheduler.c b/lib/logscheduler.c index 5ff89ae11..62f847755 100644 --- a/lib/logscheduler.c +++ b/lib/logscheduler.c @@ -87,6 +87,7 @@ _work(gpointer s, gpointer arg) iv_list_del(&node->list); LogMessage *msg = log_msg_ref(node->msg); + msg_set_context(msg); LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; path_options.ack_needed = node->ack_needed; @@ -96,6 +97,7 @@ _work(gpointer s, gpointer arg) log_msg_refcache_start_consumer(msg, &path_options); _reinject_message(partition->front_pipe, msg, &path_options); + msg_set_context(NULL); log_msg_unref(msg); log_msg_refcache_stop(); } From a18d0aa475b323d886355961368842f99c94b302 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 17:30:01 +0100 Subject: [PATCH 09/13] lib: remove msg_set_context calls at sending side This is done as the previous commits set the context when the messages are popped from the queue. Signed-off-by: Szilard Parrag --- lib/logthrdest/logthrdestdrv.c | 1 - lib/logwriter.c | 1 - 2 files changed, 2 deletions(-) diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 1fe888274..a98404125 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -484,7 +484,6 @@ _perform_inserts(LogThreadedDestWorker *self) break; } - msg_set_context(msg); log_msg_refcache_start_consumer(msg, &path_options); self->batch_size++; diff --git a/lib/logwriter.c b/lib/logwriter.c index 236b04683..2a5433ac5 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -1218,7 +1218,6 @@ log_writer_write_message(LogWriter *self, LogMessage *msg, LogPathOptions *path_ *write_error = FALSE; log_msg_refcache_start_consumer(msg, path_options); - msg_set_context(msg); log_writer_format_log(self, msg, self->line_buffer); From a5449054f67e08959defab629334c1a8728dcf36 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 16:15:34 +0100 Subject: [PATCH 10/13] all: add msg_set_context() calls for LogThreadedSourceDriver subclasses Signed-off-by: Szilard Parrag --- modules/darwinosl/darwinosl-source.m | 1 + .../sources/random-choice-generator/random-choice-generator.cpp | 1 + .../threaded-random-generator/threaded-random-generator.c | 1 + modules/grpc/common/grpc-source-worker.cpp | 1 + modules/grpc/otel/otel-source-services.hpp | 1 + modules/python/python-source.c | 1 + 6 files changed, 6 insertions(+) diff --git a/modules/darwinosl/darwinosl-source.m b/modules/darwinosl/darwinosl-source.m index bf5cdbb2d..5feaf36c1 100644 --- a/modules/darwinosl/darwinosl-source.m +++ b/modules/darwinosl/darwinosl-source.m @@ -281,6 +281,7 @@ void darwinosl_sd_options_defaults(DarwinOSLogSourceOptions *self, LogMessage *msg; gsize msg_len = _log_message_from_string(log_string, self->options.format_options, &msg); + msg_set_context(msg); log_msg_set_value_to_string(msg, LM_V_TRANSPORT, "local+darwinoslog"); LogThreadedFetchResult result = {THREADED_FETCH_SUCCESS, msg}; _log_reader_insert_msg_length_stats(self, msg_len); diff --git a/modules/examples/sources/random-choice-generator/random-choice-generator.cpp b/modules/examples/sources/random-choice-generator/random-choice-generator.cpp index a2fd8ed38..dc5c7e0b2 100644 --- a/modules/examples/sources/random-choice-generator/random-choice-generator.cpp +++ b/modules/examples/sources/random-choice-generator/random-choice-generator.cpp @@ -102,6 +102,7 @@ SourceWorker::run() std::string random_choice = driver.choices[rand() % driver.choices.size()]; LogMessage *msg = log_msg_new_empty(); + msg_set_context(msg); log_msg_set_value(msg, LM_V_MESSAGE, random_choice.c_str(), -1); log_threaded_source_worker_blocking_post(&super->super, msg); diff --git a/modules/examples/sources/threaded-random-generator/threaded-random-generator.c b/modules/examples/sources/threaded-random-generator/threaded-random-generator.c index 327a21386..f832dcde6 100644 --- a/modules/examples/sources/threaded-random-generator/threaded-random-generator.c +++ b/modules/examples/sources/threaded-random-generator/threaded-random-generator.c @@ -86,6 +86,7 @@ _worker_run(LogThreadedSourceWorker *w) format_hex_string(random_bytes, control->bytes, random_hex_str, random_hex_str_size); LogMessage *msg = log_msg_new_empty(); + msg_set_context(msg); log_msg_set_value(msg, LM_V_MESSAGE, random_hex_str, -1); log_threaded_source_worker_blocking_post(w, msg); diff --git a/modules/grpc/common/grpc-source-worker.cpp b/modules/grpc/common/grpc-source-worker.cpp index 9eab33005..e4ec8030c 100644 --- a/modules/grpc/common/grpc-source-worker.cpp +++ b/modules/grpc/common/grpc-source-worker.cpp @@ -35,6 +35,7 @@ SourceWorker::SourceWorker(GrpcSourceWorker *s, SourceDriver &d) void SourceWorker::post(LogMessage *msg) { + msg_set_context(msg); log_threaded_source_worker_blocking_post(&super->super, msg); } diff --git a/modules/grpc/otel/otel-source-services.hpp b/modules/grpc/otel/otel-source-services.hpp index 91ad61a9f..e5b77da68 100644 --- a/modules/grpc/otel/otel-source-services.hpp +++ b/modules/grpc/otel/otel-source-services.hpp @@ -123,6 +123,7 @@ syslogng::grpc::otel::TraceServiceCall::Proceed(bool ok) } LogMessage *msg = log_msg_new_empty(); + msg_set_context(msg); ProtobufParser::store_raw_metadata(msg, ctx.peer(), resource, resource_spans_schema_url, scope, scope_spans_schema_url); ProtobufParser::store_raw(msg, span); diff --git a/modules/python/python-source.c b/modules/python/python-source.c index f643fc69c..64ddd7819 100644 --- a/modules/python/python-source.c +++ b/modules/python/python-source.c @@ -591,6 +591,7 @@ py_log_source_post(PyObject *s, PyObject *args, PyObject *kwrds) /* keep a reference until the PyLogMessage instance is freed */ LogMessage *message = log_msg_ref(pymsg->msg); + msg_set_context(message); sd->post_message(sd, message); Py_RETURN_NONE; From e3c8d2b967a4ec078e6ba0ebcf50a0eefdbcd0ff Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 16:18:42 +0100 Subject: [PATCH 11/13] all: add msg_set_context() calls for LogThreadedFetcherDriver subclasses Signed-off-by: Szilard Parrag --- .../sources/threaded-diskq-source/threaded-diskq-source.c | 1 + modules/mqtt/source/mqtt-source.c | 1 + modules/python/python-fetcher.c | 1 + 3 files changed, 3 insertions(+) diff --git a/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c b/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c index 7fc57cfd8..50688c15e 100644 --- a/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c +++ b/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c @@ -151,6 +151,7 @@ _fetch(LogThreadedFetcherDriver *s) gint64 remaining_messages = log_queue_get_length(self->queue); LogMessage *msg = log_queue_pop_head(self->queue, &local_options); + msg_set_context(msg); if (!msg) { diff --git a/modules/mqtt/source/mqtt-source.c b/modules/mqtt/source/mqtt-source.c index afb08c4ec..4bc9cd0be 100644 --- a/modules/mqtt/source/mqtt-source.c +++ b/modules/mqtt/source/mqtt-source.c @@ -191,6 +191,7 @@ _fetch(LogThreadedFetcherDriver *s) if (result == THREADED_FETCH_SUCCESS) { msg = log_msg_new_empty(); + msg_set_context(msg); log_msg_set_value(msg, LM_V_MESSAGE, (gchar *)message->payload, message->payloadlen); log_msg_set_value(msg, handle_mqtt_topic, topicName, topicLen); log_msg_set_value_to_string(msg, LM_V_TRANSPORT, "mqtt"); diff --git a/modules/python/python-fetcher.c b/modules/python/python-fetcher.c index bb43723d9..c3157f5bf 100644 --- a/modules/python/python-fetcher.c +++ b/modules/python/python-fetcher.c @@ -236,6 +236,7 @@ _py_invoke_fetch(PythonFetcherDriver *self, LogMessage **msg) /* keep a reference until the PyLogMessage instance is freed */ *msg = log_msg_ref(pymsg->msg); + msg_set_context(*msg); } Py_XDECREF(ret); From 0743300910e12fa4559c973d102782781ad4ce49 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Fri, 8 Nov 2024 17:31:36 +0100 Subject: [PATCH 12/13] lib/logsource: remove msg_set_context() call This is done as the subclasses already set it themselves, making this call redundant. Signed-off-by: Szilard Parrag --- lib/logsource.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/logsource.c b/lib/logsource.c index 930f3dfa7..76c6ea346 100644 --- a/lib/logsource.c +++ b/lib/logsource.c @@ -633,8 +633,6 @@ log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options LogSource *self = (LogSource *) s; gint i; - msg_set_context(msg); - msg_diagnostics(">>>>>> Source side message processing begin", log_pipe_location_tag(s)); From 02093f90188cd0113f79cdaddddca7f6db66ca7c Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Tue, 5 Nov 2024 09:20:58 +0100 Subject: [PATCH 13/13] lib/logmsg: remove obsolete evt_tag_msg_reference macro Signed-off-by: Szilard Parrag --- lib/logmsg/logmsg.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/logmsg/logmsg.h b/lib/logmsg/logmsg.h index 194309d0b..76593b78b 100644 --- a/lib/logmsg/logmsg.h +++ b/lib/logmsg/logmsg.h @@ -558,10 +558,6 @@ gint log_msg_lookup_time_stamp_name(const gchar *name); gssize log_msg_get_size(LogMessage *self); -#define evt_tag_msg_reference(msg) \ - evt_tag_printf("msg", "%p", (msg)), \ - evt_tag_printf("rcptid", "%" G_GUINT64_FORMAT, (msg)->rcptid) - static inline EVTTAG * evt_tag_msg_value(const gchar *name, LogMessage *msg, NVHandle value_handle) {