From 2d1db7717a75c61136b469e7b8f6048d1e8a3ec7 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Fri, 11 Oct 2024 14:45:52 +0200 Subject: [PATCH 01/13] logthrdest: fix leaking worker_partition_key template Signed-off-by: Attila Szakacs --- lib/logthrdest/logthrdestdrv.c | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 134358422..83a0d15c3 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -1482,6 +1482,7 @@ log_threaded_dest_driver_free(LogPipe *s) LogThreadedDestDriver *self = (LogThreadedDestDriver *)s; g_free(self->workers); + log_template_unref(self->worker_partition_key); log_dest_driver_free((LogPipe *)self); } From 695ef4aab093d10de64bdae948417d66fa2db0e5 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Fri, 11 Oct 2024 14:45:13 +0200 Subject: [PATCH 02/13] grpc/metrics: fix leaking stats cluster key Signed-off-by: Attila Szakacs --- modules/grpc/common/metrics/grpc-metrics.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/grpc/common/metrics/grpc-metrics.cpp b/modules/grpc/common/metrics/grpc-metrics.cpp index 4085032f4..2220a20c4 100644 --- a/modules/grpc/common/metrics/grpc-metrics.cpp +++ b/modules/grpc/common/metrics/grpc-metrics.cpp @@ -99,6 +99,8 @@ DestDriverMetrics::create_grpc_request_cluster(::grpc::StatusCode response_code) StatsCounterItem *counter; cluster = stats_register_counter(stats_level, sc_key, SC_TYPE_SINGLE_VALUE, &counter); + + stats_cluster_key_free(sc_key); } stats_cluster_key_builder_pop(kb); From 61d21347560087b9522a7f9898732870371ac9d6 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Fri, 11 Oct 2024 14:44:35 +0200 Subject: [PATCH 03/13] grpc: add missing worker super deinit() Signed-off-by: Attila Szakacs --- modules/grpc/common/grpc-dest-worker.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/grpc/common/grpc-dest-worker.cpp b/modules/grpc/common/grpc-dest-worker.cpp index d11652b97..52189d94b 100644 --- a/modules/grpc/common/grpc-dest-worker.cpp +++ b/modules/grpc/common/grpc-dest-worker.cpp @@ -78,6 +78,7 @@ DestWorker::init() void DestWorker::deinit() { + log_threaded_dest_worker_deinit_method(&super->super); } bool From e246b5f84ca522d5c8cc92fd0f4ab19ae018ff06 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Thu, 10 Oct 2024 22:23:48 +0200 Subject: [PATCH 04/13] grpc/loki: flush on worker-partition-key change Out intention is to batch messages that correspond to the same label set. If between two messages the label set changes, but they arrive to the same worker, we should flush the previous batch. Signed-off-by: Attila Szakacs --- modules/grpc/loki/loki-dest.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/modules/grpc/loki/loki-dest.cpp b/modules/grpc/loki/loki-dest.cpp index ac6dff79a..38fa21433 100644 --- a/modules/grpc/loki/loki-dest.cpp +++ b/modules/grpc/loki/loki-dest.cpp @@ -93,9 +93,14 @@ DestinationDriver::init() } if (log_template_is_literal_string(worker_partition_key)) - log_template_unref(worker_partition_key); + { + log_template_unref(worker_partition_key); + } else - log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key); + { + log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key); + log_threaded_dest_driver_set_flush_on_worker_key_change(&this->super->super.super.super, TRUE); + } return syslogng::grpc::DestDriver::init(); } From fc7423de1a45a2fe7837df37c0230ec3eb5086c1 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Thu, 10 Oct 2024 22:52:05 +0200 Subject: [PATCH 05/13] grpc: add worker_partition_key related functions to base class Signed-off-by: Attila Szakacs --- modules/grpc/common/grpc-dest.cpp | 33 ++++++++++++++++++++++++++++++- modules/grpc/common/grpc-dest.hpp | 14 +++++++++++++ modules/grpc/loki/loki-dest.cpp | 31 ++--------------------------- 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/modules/grpc/common/grpc-dest.cpp b/modules/grpc/common/grpc-dest.cpp index 587b9f9a5..560500f2b 100644 --- a/modules/grpc/common/grpc-dest.cpp +++ b/modules/grpc/common/grpc-dest.cpp @@ -31,11 +31,39 @@ using namespace syslogng::grpc; DestDriver::DestDriver(GrpcDestDriver *s) : super(s), compression(false), batch_bytes(4 * 1000 * 1000), - keepalive_time(-1), keepalive_timeout(-1), keepalive_max_pings_without_data(-1) + keepalive_time(-1), keepalive_timeout(-1), keepalive_max_pings_without_data(-1), + flush_on_key_change(false) { credentials_builder_wrapper.self = &credentials_builder; } +bool +DestDriver::set_worker_partition_key() +{ + GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super); + + LogTemplate *worker_partition_key_tpl = log_template_new(cfg, NULL); + if (!log_template_compile(worker_partition_key_tpl, this->worker_partition_key.str().c_str(), NULL)) + { + msg_error("Error compiling worker partition key template", + evt_tag_str("template", this->worker_partition_key.str().c_str())); + return false; + } + + if (log_template_is_literal_string(worker_partition_key_tpl)) + { + log_template_unref(worker_partition_key_tpl); + } + else + { + log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key_tpl); + log_threaded_dest_driver_set_flush_on_worker_key_change(&this->super->super.super.super, + this->flush_on_key_change); + } + + return true; +} + bool DestDriver::init() { @@ -51,6 +79,9 @@ DestDriver::init() return false; } + if (this->worker_partition_key.rdbuf()->in_avail() && !this->set_worker_partition_key()) + return false; + if (!log_threaded_dest_driver_init_method(&this->super->super.super.super.super)) return false; diff --git a/modules/grpc/common/grpc-dest.hpp b/modules/grpc/common/grpc-dest.hpp index 4667269aa..261ccf27c 100644 --- a/modules/grpc/common/grpc-dest.hpp +++ b/modules/grpc/common/grpc-dest.hpp @@ -37,6 +37,7 @@ #include #include +#include namespace syslogng { namespace grpc { @@ -118,11 +119,21 @@ class DestDriver this->headers.push_back(std::make_pair(name, value)); } + void extend_worker_partition_key(const std::string &extension) + { + if (this->worker_partition_key.rdbuf()->in_avail()) + this->worker_partition_key << ","; + this->worker_partition_key << extension; + } + GrpcClientCredentialsBuilderW *get_credentials_builder_wrapper() { return &this->credentials_builder_wrapper; } +private: + bool set_worker_partition_key(); + public: GrpcDestDriver *super; DestDriverMetrics metrics; @@ -139,6 +150,9 @@ class DestDriver int keepalive_timeout; int keepalive_max_pings_without_data; + std::stringstream worker_partition_key; + bool flush_on_key_change; + std::list> int_extra_channel_args; std::list> string_extra_channel_args; diff --git a/modules/grpc/loki/loki-dest.cpp b/modules/grpc/loki/loki-dest.cpp index 38fa21433..cb48086db 100644 --- a/modules/grpc/loki/loki-dest.cpp +++ b/modules/grpc/loki/loki-dest.cpp @@ -43,6 +43,7 @@ DestinationDriver::DestinationDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s), timestamp(LM_TS_PROCESSED) { this->url = "localhost:9095"; + this->flush_on_key_change = true; log_template_options_defaults(&this->template_options); } @@ -71,36 +72,8 @@ DestinationDriver::init() log_template_options_init(&this->template_options, cfg); - LogTemplate *worker_partition_key = log_template_new(cfg, NULL); - - std::stringstream template_str; - bool comma_needed = false; for (const auto &label : this->labels) - { - if (comma_needed) - template_str << ","; - template_str << label.name << "=" << label.value->template_str; - - comma_needed = true; - } - - std::string worker_partition_key_str = template_str.str(); - if (!log_template_compile(worker_partition_key, worker_partition_key_str.c_str(), NULL)) - { - msg_error("Error compiling worker partition key template", - evt_tag_str("template", worker_partition_key_str.c_str())); - return false; - } - - if (log_template_is_literal_string(worker_partition_key)) - { - log_template_unref(worker_partition_key); - } - else - { - log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key); - log_threaded_dest_driver_set_flush_on_worker_key_change(&this->super->super.super.super, TRUE); - } + this->extend_worker_partition_key(label.name + "=" + label.value->template_str); return syslogng::grpc::DestDriver::init(); } From ce6b2d9ef87cf59e11554df460b314bb1d83e07a Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Fri, 11 Oct 2024 11:02:47 +0200 Subject: [PATCH 06/13] grpc: factor out loki::Label class as grpc::NameValueTemplatePair Will be useful for templated headers as well. Signed-off-by: Attila Szakacs --- modules/grpc/common/grpc-dest.hpp | 27 +++++++++++++++++++++++++++ modules/grpc/loki/loki-dest.cpp | 2 +- modules/grpc/loki/loki-dest.hpp | 29 +---------------------------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/modules/grpc/common/grpc-dest.hpp b/modules/grpc/common/grpc-dest.hpp index 261ccf27c..7f42f788f 100644 --- a/modules/grpc/common/grpc-dest.hpp +++ b/modules/grpc/common/grpc-dest.hpp @@ -42,6 +42,33 @@ namespace syslogng { namespace grpc { +struct NameValueTemplatePair +{ + std::string name; + LogTemplate *value; + + NameValueTemplatePair(std::string name_, LogTemplate *value_) + : name(name_), value(log_template_ref(value_)) {} + + NameValueTemplatePair(const NameValueTemplatePair &a) + : name(a.name), value(log_template_ref(a.value)) {} + + NameValueTemplatePair &operator=(const NameValueTemplatePair &a) + { + name = a.name; + log_template_unref(value); + value = log_template_ref(a.value); + + return *this; + } + + ~NameValueTemplatePair() + { + log_template_unref(value); + } + +}; + class DestDriver { public: diff --git a/modules/grpc/loki/loki-dest.cpp b/modules/grpc/loki/loki-dest.cpp index cb48086db..8fe0601f9 100644 --- a/modules/grpc/loki/loki-dest.cpp +++ b/modules/grpc/loki/loki-dest.cpp @@ -56,7 +56,7 @@ DestinationDriver::~DestinationDriver() void DestinationDriver::add_label(std::string name, LogTemplate *value) { - this->labels.push_back(Label{name, value}); + this->labels.push_back(NameValueTemplatePair{name, value}); } bool diff --git a/modules/grpc/loki/loki-dest.hpp b/modules/grpc/loki/loki-dest.hpp index 8b4f70d44..5b56ea6f5 100644 --- a/modules/grpc/loki/loki-dest.hpp +++ b/modules/grpc/loki/loki-dest.hpp @@ -43,33 +43,6 @@ namespace syslogng { namespace grpc { namespace loki { -struct Label -{ - std::string name; - LogTemplate *value; - - Label(std::string name_, LogTemplate *value_) - : name(name_), value(log_template_ref(value_)) {} - - Label(const Label &a) - : name(a.name), value(log_template_ref(a.value)) {} - - Label &operator=(const Label &a) - { - name = a.name; - log_template_unref(value); - value = log_template_ref(a.value); - - return *this; - } - - ~Label() - { - log_template_unref(value); - } - -}; - class DestinationDriver final : public syslogng::grpc::DestDriver { public: @@ -118,7 +91,7 @@ class DestinationDriver final : public syslogng::grpc::DestDriver std::string tenant_id; LogTemplate *message = nullptr; - std::vector