diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 1343584228..83a0d15c37 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -1482,6 +1482,7 @@ log_threaded_dest_driver_free(LogPipe *s) LogThreadedDestDriver *self = (LogThreadedDestDriver *)s; g_free(self->workers); + log_template_unref(self->worker_partition_key); log_dest_driver_free((LogPipe *)self); } diff --git a/modules/grpc/bigquery/bigquery-dest.cpp b/modules/grpc/bigquery/bigquery-dest.cpp index f8033ac0f3..e329449a38 100644 --- a/modules/grpc/bigquery/bigquery-dest.cpp +++ b/modules/grpc/bigquery/bigquery-dest.cpp @@ -92,13 +92,11 @@ DestinationDriver::DestinationDriver(GrpcDestDriver *s) { this->url = "bigquerystorage.googleapis.com"; this->credentials_builder.set_mode(GCAM_ADC); - log_template_options_defaults(&this->template_options); } DestinationDriver::~DestinationDriver() { g_list_free_full(this->protobuf_schema.values, _template_unref); - log_template_options_destroy(&this->template_options); } bool @@ -166,9 +164,6 @@ DestinationDriver::init() return false; } - GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super); - log_template_options_init(&this->template_options, cfg); - if (this->protobuf_schema.proto_path.empty()) this->construct_schema_prototype(); else @@ -246,7 +241,7 @@ DestinationDriver::construct_schema_prototype() for (auto &field : this->fields) { google::protobuf::FieldDescriptorProto *field_desc_proto = descriptor_proto->add_field(); - field_desc_proto->set_name(field.name); + field_desc_proto->set_name(field.nv.name); field_desc_proto->set_type(field.type); field_desc_proto->set_number(num++); } @@ -376,14 +371,6 @@ bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *va cpp->set_protobuf_schema(proto_path, values); } -LogTemplateOptions * -bigquery_dd_get_template_options(LogDriver *d) -{ - GrpcDestDriver *self = (GrpcDestDriver *) d; - DestinationDriver *cpp = bigquery_dd_get_cpp(self); - return &cpp->get_template_options(); -} - LogDriver * bigquery_dd_new(GlobalConfig *cfg) { diff --git a/modules/grpc/bigquery/bigquery-dest.h b/modules/grpc/bigquery/bigquery-dest.h index a63490493e..77af41f5bc 100644 --- a/modules/grpc/bigquery/bigquery-dest.h +++ b/modules/grpc/bigquery/bigquery-dest.h @@ -40,8 +40,6 @@ void bigquery_dd_set_table(LogDriver *d, const gchar *table); gboolean bigquery_dd_add_field(LogDriver *d, const gchar *name, const gchar *type, LogTemplate *value); void bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *values); -LogTemplateOptions *bigquery_dd_get_template_options(LogDriver *d); - #include "compat/cpp-end.h" #endif diff --git a/modules/grpc/bigquery/bigquery-dest.hpp b/modules/grpc/bigquery/bigquery-dest.hpp index a6e3f57c8a..d60fd42635 100644 --- a/modules/grpc/bigquery/bigquery-dest.hpp +++ b/modules/grpc/bigquery/bigquery-dest.hpp @@ -49,33 +49,25 @@ namespace bigquery { struct Field { - std::string name; + NameValueTemplatePair nv; google::protobuf::FieldDescriptorProto::Type type; - LogTemplate *value; const google::protobuf::FieldDescriptor *field_desc; Field(std::string name_, google::protobuf::FieldDescriptorProto::Type type_, LogTemplate *value_) - : name(name_), type(type_), value(log_template_ref(value_)), field_desc(nullptr) {} + : nv(name_, value_), type(type_), field_desc(nullptr) {} Field(const Field &a) - : name(a.name), type(a.type), value(log_template_ref(a.value)), field_desc(a.field_desc) {} + : nv(a.nv), type(a.type), field_desc(a.field_desc) {} Field &operator=(const Field &a) { - name = a.name; + nv = a.nv; type = a.type; - log_template_unref(value); - value = log_template_ref(a.value); field_desc = a.field_desc; return *this; } - ~Field() - { - log_template_unref(value); - } - }; class DestinationDriver final : public syslogng::grpc::DestDriver @@ -91,11 +83,6 @@ class DestinationDriver final : public syslogng::grpc::DestDriver bool add_field(std::string name, std::string type, LogTemplate *value); void set_protobuf_schema(std::string proto_path, GList *values); - LogTemplateOptions &get_template_options() - { - return this->template_options; - } - void set_project(std::string p) { this->project = p; @@ -132,8 +119,6 @@ class DestinationDriver final : public syslogng::grpc::DestDriver bool load_protobuf_schema(); private: - LogTemplateOptions template_options; - std::string project; std::string dataset; std::string table; diff --git a/modules/grpc/bigquery/bigquery-grammar.ym b/modules/grpc/bigquery/bigquery-grammar.ym index 8261721426..cef67cc111 100644 --- a/modules/grpc/bigquery/bigquery-grammar.ym +++ b/modules/grpc/bigquery/bigquery-grammar.ym @@ -90,7 +90,6 @@ bigquery_dest_option free($3); } | grpc_dest_option - | { last_template_options = bigquery_dd_get_template_options(last_driver); } template_option ; bigquery_schema_fields diff --git a/modules/grpc/bigquery/bigquery-worker.cpp b/modules/grpc/bigquery/bigquery-worker.cpp index bc908ccf2d..c2703ca28e 100644 --- a/modules/grpc/bigquery/bigquery-worker.cpp +++ b/modules/grpc/bigquery/bigquery-worker.cpp @@ -180,13 +180,13 @@ DestinationWorker::insert_field(const google::protobuf::Reflection *reflection, LogMessageValueType type; - Slice value = this->format_template(field.value, msg, buf, &type); + Slice value = this->format_template(field.nv.value, msg, buf, &type); if (type == LM_VT_NULL) { if (field.field_desc->is_required()) { - msg_error("Missing required field", evt_tag_str("field", field.name.c_str())); + msg_error("Missing required field", evt_tag_str("field", field.nv.name.c_str())); goto error; } diff --git a/modules/grpc/common/grpc-dest-worker.cpp b/modules/grpc/common/grpc-dest-worker.cpp index d11652b971..134b1cc21c 100644 --- a/modules/grpc/common/grpc-dest-worker.cpp +++ b/modules/grpc/common/grpc-dest-worker.cpp @@ -28,6 +28,10 @@ #include "grpc-dest-worker.hpp" +#include "compat/cpp-start.h" +#include "scratch-buffers.h" +#include "compat/cpp-end.h" + using namespace syslogng::grpc; /* C++ Implementations */ @@ -60,6 +64,7 @@ DestWorker::create_channel_args() args.SetCompressionAlgorithm(GRPC_COMPRESS_GZIP); args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1); for (auto nv : this->owner.int_extra_channel_args) args.SetInt(nv.first, nv.second); @@ -78,6 +83,7 @@ DestWorker::init() void DestWorker::deinit() { + log_threaded_dest_worker_deinit_method(&super->super); } bool @@ -94,8 +100,37 @@ DestWorker::disconnect() void DestWorker::prepare_context(::grpc::ClientContext &context) { + g_assert(!this->owner.dynamic_headers_enabled); + + for (auto nv : owner.headers) + context.AddMetadata(nv.name, nv.value->template_str); +} + +void +DestWorker::prepare_context_dynamic(::grpc::ClientContext &context, LogMessage *msg) +{ + g_assert(this->owner.dynamic_headers_enabled); + + LogTemplateEvalOptions options = {&this->owner.template_options, LTZ_SEND, this->super->super.seq_num, NULL, + LM_VT_STRING + }; + + ScratchBuffersMarker marker; + GString *buf = scratch_buffers_alloc_and_mark(&marker); + for (auto nv : owner.headers) - context.AddMetadata(nv.first, nv.second); + { + if (log_template_is_literal_string(nv.value)) + { + context.AddMetadata(nv.name, log_template_get_literal_value(nv.value, NULL)); + continue; + } + + log_template_format(nv.value, msg, &options, buf); + context.AddMetadata(nv.name, buf->str); + } + + scratch_buffers_reclaim_marked(marker); } /* C Wrappers */ diff --git a/modules/grpc/common/grpc-dest-worker.hpp b/modules/grpc/common/grpc-dest-worker.hpp index ea4ee43e61..169f72d59f 100644 --- a/modules/grpc/common/grpc-dest-worker.hpp +++ b/modules/grpc/common/grpc-dest-worker.hpp @@ -51,6 +51,7 @@ class DestWorker protected: void prepare_context(::grpc::ClientContext &context); + void prepare_context_dynamic(::grpc::ClientContext &context, LogMessage *msg); std::shared_ptr<::grpc::ChannelCredentials> create_credentials(); ::grpc::ChannelArguments create_channel_args(); diff --git a/modules/grpc/common/grpc-dest.cpp b/modules/grpc/common/grpc-dest.cpp index 587b9f9a5d..7887f25d91 100644 --- a/modules/grpc/common/grpc-dest.cpp +++ b/modules/grpc/common/grpc-dest.cpp @@ -31,14 +31,50 @@ using namespace syslogng::grpc; DestDriver::DestDriver(GrpcDestDriver *s) : super(s), compression(false), batch_bytes(4 * 1000 * 1000), - keepalive_time(-1), keepalive_timeout(-1), keepalive_max_pings_without_data(-1) + keepalive_time(-1), keepalive_timeout(-1), keepalive_max_pings_without_data(-1), + flush_on_key_change(false), dynamic_headers_enabled(false) { + log_template_options_defaults(&this->template_options); credentials_builder_wrapper.self = &credentials_builder; } +DestDriver::~DestDriver() +{ + log_template_options_destroy(&this->template_options); +} + +bool +DestDriver::set_worker_partition_key() +{ + GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super); + + LogTemplate *worker_partition_key_tpl = log_template_new(cfg, NULL); + if (!log_template_compile(worker_partition_key_tpl, this->worker_partition_key.str().c_str(), NULL)) + { + msg_error("Error compiling worker partition key template", + evt_tag_str("template", this->worker_partition_key.str().c_str())); + return false; + } + + if (log_template_is_literal_string(worker_partition_key_tpl)) + { + log_template_unref(worker_partition_key_tpl); + } + else + { + log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key_tpl); + log_threaded_dest_driver_set_flush_on_worker_key_change(&this->super->super.super.super, + this->flush_on_key_change); + } + + return true; +} + bool DestDriver::init() { + GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super); + if (url.length() == 0) { msg_error("url() option is mandatory", @@ -51,6 +87,11 @@ DestDriver::init() return false; } + if (this->worker_partition_key.rdbuf()->in_avail() && !this->set_worker_partition_key()) + return false; + + log_template_options_init(&this->template_options, cfg); + if (!log_threaded_dest_driver_init_method(&this->super->super.super.super.super)) return false; @@ -128,11 +169,18 @@ grpc_dd_add_string_channel_arg(LogDriver *s, const gchar *name, const gchar *val self->cpp->add_extra_channel_arg(name, value); } -void -grpc_dd_add_header(LogDriver *s, const gchar *name, const gchar *value) +gboolean +grpc_dd_add_header(LogDriver *s, const gchar *name, LogTemplate *value) { GrpcDestDriver *self = (GrpcDestDriver *) s; - self->cpp->add_header(name, value); + return self->cpp->add_header(name, value); +} + +LogTemplateOptions * +grpc_dd_get_template_options(LogDriver *d) +{ + GrpcDestDriver *self = (GrpcDestDriver *) d; + return &self->cpp->get_template_options(); } GrpcClientCredentialsBuilderW * diff --git a/modules/grpc/common/grpc-dest.h b/modules/grpc/common/grpc-dest.h index 9c051cb667..a1aaf791e2 100644 --- a/modules/grpc/common/grpc-dest.h +++ b/modules/grpc/common/grpc-dest.h @@ -42,7 +42,8 @@ void grpc_dd_set_keepalive_timeout(LogDriver *s, gint t); void grpc_dd_set_keepalive_max_pings(LogDriver *s, gint p); void grpc_dd_add_int_channel_arg(LogDriver *s, const gchar *name, glong value); void grpc_dd_add_string_channel_arg(LogDriver *s, const gchar *name, const gchar *value); -void grpc_dd_add_header(LogDriver *s, const gchar *name, const gchar *value); +gboolean grpc_dd_add_header(LogDriver *s, const gchar *name, LogTemplate *value); +LogTemplateOptions *grpc_dd_get_template_options(LogDriver *d); GrpcClientCredentialsBuilderW *grpc_dd_get_credentials_builder(LogDriver *s); #include "compat/cpp-end.h" diff --git a/modules/grpc/common/grpc-dest.hpp b/modules/grpc/common/grpc-dest.hpp index 4667269aa1..83242b62a9 100644 --- a/modules/grpc/common/grpc-dest.hpp +++ b/modules/grpc/common/grpc-dest.hpp @@ -37,15 +37,43 @@ #include #include +#include namespace syslogng { namespace grpc { +struct NameValueTemplatePair +{ + std::string name; + LogTemplate *value; + + NameValueTemplatePair(std::string name_, LogTemplate *value_) + : name(name_), value(log_template_ref(value_)) {} + + NameValueTemplatePair(const NameValueTemplatePair &a) + : name(a.name), value(log_template_ref(a.value)) {} + + NameValueTemplatePair &operator=(const NameValueTemplatePair &a) + { + name = a.name; + log_template_unref(value); + value = log_template_ref(a.value); + + return *this; + } + + ~NameValueTemplatePair() + { + log_template_unref(value); + } + +}; + class DestDriver { public: DestDriver(GrpcDestDriver *s); - virtual ~DestDriver() {}; + virtual ~DestDriver(); virtual bool init(); virtual bool deinit(); @@ -108,14 +136,42 @@ class DestDriver this->string_extra_channel_args.push_back(std::make_pair(name, value)); } - void add_header(std::string name, std::string value) + bool add_header(std::string name, LogTemplate *value) { + bool is_literal_string = log_template_is_literal_string(value); + + if (!this->dynamic_headers_enabled && !is_literal_string) + return false; + std::transform(name.begin(), name.end(), name.begin(), [](auto c) { return ::tolower(c); }); - this->headers.push_back(std::make_pair(name, value)); + this->headers.push_back(NameValueTemplatePair{name, value}); + + if (!is_literal_string) + this->extend_worker_partition_key(value->template_str); + + return true; + } + + void extend_worker_partition_key(const std::string &extension) + { + if (this->worker_partition_key.rdbuf()->in_avail()) + this->worker_partition_key << ","; + this->worker_partition_key << extension; + } + + void enable_dynamic_headers() + { + this->dynamic_headers_enabled = true; + this->flush_on_key_change = true; + } + + LogTemplateOptions &get_template_options() + { + return this->template_options; } GrpcClientCredentialsBuilderW *get_credentials_builder_wrapper() @@ -123,6 +179,9 @@ class DestDriver return &this->credentials_builder_wrapper; } +private: + bool set_worker_partition_key(); + public: GrpcDestDriver *super; DestDriverMetrics metrics; @@ -139,10 +198,16 @@ class DestDriver int keepalive_timeout; int keepalive_max_pings_without_data; + std::stringstream worker_partition_key; + bool flush_on_key_change; + std::list> int_extra_channel_args; std::list> string_extra_channel_args; - std::list> headers; + std::list headers; + bool dynamic_headers_enabled; + + LogTemplateOptions template_options; GrpcClientCredentialsBuilderW credentials_builder_wrapper; }; diff --git a/modules/grpc/common/grpc-grammar.ym b/modules/grpc/common/grpc-grammar.ym index 8d2511ee76..3d6fa6ea6a 100644 --- a/modules/grpc/common/grpc-grammar.ym +++ b/modules/grpc/common/grpc-grammar.ym @@ -101,6 +101,7 @@ grpc_dest_option | threaded_dest_driver_general_option | threaded_dest_driver_batch_option | threaded_dest_driver_workers_option + | { last_template_options = grpc_dd_get_template_options(last_driver); } template_option ; grpc_keepalive_options @@ -131,7 +132,12 @@ grpc_dest_headers ; grpc_dest_header - : string LL_ARROW string { grpc_dd_add_header(last_driver, $1, $3); free($1); free($3); } + : string LL_ARROW template_name_or_content + { + CHECK_ERROR(grpc_dd_add_header(last_driver, $1, $3), @3, "Failed to set header: templating is not supported"); + free($1); + log_template_unref($3); + } ; grpc_server_credentials_builder_option diff --git a/modules/grpc/common/grpc-source.cpp b/modules/grpc/common/grpc-source.cpp index 313f7e6a09..d4a3d4ed7c 100644 --- a/modules/grpc/common/grpc-source.cpp +++ b/modules/grpc/common/grpc-source.cpp @@ -74,6 +74,8 @@ SourceDriver::prepare_server_builder(::grpc::ServerBuilder &builder) builder.AddListeningPort(address, credentials_builder.build()); + builder.AddChannelArgument(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1); + for (auto nv : int_extra_channel_args) builder.AddChannelArgument(nv.first, nv.second); for (auto nv : string_extra_channel_args) diff --git a/modules/grpc/common/metrics/grpc-metrics.cpp b/modules/grpc/common/metrics/grpc-metrics.cpp index 4085032f4f..2220a20c4c 100644 --- a/modules/grpc/common/metrics/grpc-metrics.cpp +++ b/modules/grpc/common/metrics/grpc-metrics.cpp @@ -99,6 +99,8 @@ DestDriverMetrics::create_grpc_request_cluster(::grpc::StatusCode response_code) StatsCounterItem *counter; cluster = stats_register_counter(stats_level, sc_key, SC_TYPE_SINGLE_VALUE, &counter); + + stats_cluster_key_free(sc_key); } stats_cluster_key_builder_pop(kb); diff --git a/modules/grpc/loki/loki-dest.cpp b/modules/grpc/loki/loki-dest.cpp index ac6dff79aa..292c998884 100644 --- a/modules/grpc/loki/loki-dest.cpp +++ b/modules/grpc/loki/loki-dest.cpp @@ -43,19 +43,18 @@ DestinationDriver::DestinationDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s), timestamp(LM_TS_PROCESSED) { this->url = "localhost:9095"; - log_template_options_defaults(&this->template_options); + this->enable_dynamic_headers(); } DestinationDriver::~DestinationDriver() { - log_template_options_destroy(&this->template_options); log_template_unref(this->message); } void DestinationDriver::add_label(std::string name, LogTemplate *value) { - this->labels.push_back(Label{name, value}); + this->labels.push_back(NameValueTemplatePair{name, value}); } bool @@ -69,33 +68,8 @@ DestinationDriver::init() log_template_compile(this->message, DEFAULT_MESSAGE_TEMPLATE, NULL); } - log_template_options_init(&this->template_options, cfg); - - LogTemplate *worker_partition_key = log_template_new(cfg, NULL); - - std::stringstream template_str; - bool comma_needed = false; for (const auto &label : this->labels) - { - if (comma_needed) - template_str << ","; - template_str << label.name << "=" << label.value->template_str; - - comma_needed = true; - } - - std::string worker_partition_key_str = template_str.str(); - if (!log_template_compile(worker_partition_key, worker_partition_key_str.c_str(), NULL)) - { - msg_error("Error compiling worker partition key template", - evt_tag_str("template", worker_partition_key_str.c_str())); - return false; - } - - if (log_template_is_literal_string(worker_partition_key)) - log_template_unref(worker_partition_key); - else - log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key); + this->extend_worker_partition_key(label.name + "=" + label.value->template_str); return syslogng::grpc::DestDriver::init(); } @@ -171,14 +145,6 @@ loki_dd_set_tenant_id(LogDriver *d, const gchar *tid) return cpp->set_tenant_id(tid); } -LogTemplateOptions * -loki_dd_get_template_options(LogDriver *d) -{ - GrpcDestDriver *self = (GrpcDestDriver *) d; - DestinationDriver *cpp = loki_dd_get_cpp(self); - return &cpp->get_template_options(); -} - LogDriver * loki_dd_new(GlobalConfig *cfg) { diff --git a/modules/grpc/loki/loki-dest.h b/modules/grpc/loki/loki-dest.h index 2c9773e097..af38b14b03 100644 --- a/modules/grpc/loki/loki-dest.h +++ b/modules/grpc/loki/loki-dest.h @@ -40,8 +40,6 @@ void loki_dd_add_label(LogDriver *d, const gchar *name, LogTemplate *value); gboolean loki_dd_set_timestamp(LogDriver *d, const gchar *t); void loki_dd_set_tenant_id(LogDriver *d, const gchar *tid); -LogTemplateOptions *loki_dd_get_template_options(LogDriver *d); - #include "compat/cpp-end.h" #endif diff --git a/modules/grpc/loki/loki-dest.hpp b/modules/grpc/loki/loki-dest.hpp index 8b4f70d449..0d7b949d0c 100644 --- a/modules/grpc/loki/loki-dest.hpp +++ b/modules/grpc/loki/loki-dest.hpp @@ -43,33 +43,6 @@ namespace syslogng { namespace grpc { namespace loki { -struct Label -{ - std::string name; - LogTemplate *value; - - Label(std::string name_, LogTemplate *value_) - : name(name_), value(log_template_ref(value_)) {} - - Label(const Label &a) - : name(a.name), value(log_template_ref(a.value)) {} - - Label &operator=(const Label &a) - { - name = a.name; - log_template_unref(value); - value = log_template_ref(a.value); - - return *this; - } - - ~Label() - { - log_template_unref(value); - } - -}; - class DestinationDriver final : public syslogng::grpc::DestDriver { public: @@ -82,11 +55,6 @@ class DestinationDriver final : public syslogng::grpc::DestDriver void add_label(std::string name, LogTemplate *value); - LogTemplateOptions &get_template_options() - { - return this->template_options; - } - void set_message_template_ref(LogTemplate *msg) { log_template_unref(this->message); @@ -113,12 +81,11 @@ class DestinationDriver final : public syslogng::grpc::DestDriver private: friend class DestinationWorker; - LogTemplateOptions template_options; std::string tenant_id; LogTemplate *message = nullptr; - std::vector