diff --git a/modules/grpc/pubsub/pubsub-dest-worker.cpp b/modules/grpc/pubsub/pubsub-dest-worker.cpp index 4781d6a4c..ee3e7aea0 100644 --- a/modules/grpc/pubsub/pubsub-dest-worker.cpp +++ b/modules/grpc/pubsub/pubsub-dest-worker.cpp @@ -94,17 +94,14 @@ DestWorker::format_template(LogTemplate *tmpl, LogMessage *msg, GString *value, return Slice{value->str, value->len}; } -LogThreadedResult -DestWorker::insert(LogMessage *msg) +bool +DestWorker::handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes) { DestDriver *owner_ = this->get_owner(); ScratchBuffersMarker m; GString *buf = scratch_buffers_alloc_and_mark(&m); Slice buf_slice; - size_t message_bytes = 0; - - ::google::pubsub::v1::PubsubMessage *message = this->request.add_messages(); buf_slice = this->format_template(owner_->data, msg, buf, NULL, this->super->super.seq_num); message->set_data(buf_slice.str, buf_slice.len); @@ -119,10 +116,65 @@ DestWorker::insert(LogMessage *msg) } scratch_buffers_reclaim_marked(m); + return true; +} +bool +DestWorker::handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes) +{ + DestDriver *owner_ = this->get_owner(); + + LogMessageValueType lmvt; + gssize len; + const gchar *proto = log_template_get_trivial_value_and_type(owner_->protovar, msg, &len, &lmvt); + if (lmvt != LM_VT_PROTOBUF) + { + msg_error("Error loggmessage type is not protobuf", + evt_tag_int("expected_type", LM_VT_PROTOBUF), + evt_tag_int("current_type", lmvt)); + return false; + } + + if (!message->ParsePartialFromArray(proto, len)) + { + msg_error("Unable to deserialize protobuf message", + evt_tag_int("proto_size", len)); + return false; + } + + message_bytes += message->data().length(); + + for (const auto &pair : message->attributes()) + { + const std::string &key = pair.first; + const std::string &value = pair.second; + + message_bytes += key.length() + value.length(); + } + return true; +} + +LogThreadedResult +DestWorker::insert(LogMessage *msg) +{ + DestDriver *owner_ = this->get_owner(); + + size_t message_bytes = 0; + + ::google::pubsub::v1::PubsubMessage *message = this->request.add_messages(); + + if (owner_->protovar) + { + if (!this->handle_protovar(msg, message, &message_bytes)) + return LTR_ERROR; + } + else + { + if (!this->handle_data_attributes(msg, message, &message_bytes)) + return LTR_ERROR; + } this->current_batch_bytes += message_bytes; log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, message_bytes); - this->batch_size++; if (!this->client_context.get()) diff --git a/modules/grpc/pubsub/pubsub-dest-worker.hpp b/modules/grpc/pubsub/pubsub-dest-worker.hpp index 848696ba4..1bbc352c2 100644 --- a/modules/grpc/pubsub/pubsub-dest-worker.hpp +++ b/modules/grpc/pubsub/pubsub-dest-worker.hpp @@ -64,6 +64,9 @@ class DestWorker final : public syslogng::grpc::DestWorker ::google::pubsub::v1::PublishRequest request; size_t batch_size = 0; size_t current_batch_bytes = 0; + + bool handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes); + bool handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes); }; } diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 5399bfa03..fde0af0f5 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -46,10 +46,9 @@ DestDriver::DestDriver(GrpcDestDriver *s) this->batch_bytes = MAX_BATCH_BYTES; GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super); - LogTemplate *default_data_template = log_template_new(cfg, NULL); - g_assert(log_template_compile(default_data_template, "$MESSAGE", NULL)); - this->set_data(default_data_template); - log_template_unref(default_data_template); + this->default_data_template = log_template_new(cfg, NULL); + g_assert(log_template_compile(this->default_data_template, "$MESSAGE", NULL)); + this->set_data(this->default_data_template); } DestDriver::~DestDriver() @@ -57,6 +56,8 @@ DestDriver::~DestDriver() log_template_unref(this->project); log_template_unref(this->topic); log_template_unref(this->data); + log_template_unref(this->protovar); + log_template_unref(this->default_data_template); } bool @@ -77,6 +78,12 @@ DestDriver::init() log_pipe_location_tag(&this->super->super.super.super.super)); return false; } + if ((!this->attributes.empty() || this->data != this->default_data_template) && this->protovar != nullptr) + { + msg_error("Error initializing Google Pub/Sub destination: 'attributes()' and 'data()' cannot be used together with 'protovar()'. Please use either 'attributes()' and 'data()', or 'protovar()', but not both.", + log_pipe_location_tag(&this->super->super.super.super.super)); + return false; + } this->extend_worker_partition_key(std::string("project=") + this->project->template_str); this->extend_worker_partition_key(std::string("topic=") + this->topic->template_str); @@ -151,6 +158,18 @@ pubsub_dd_set_data(LogDriver *d, LogTemplate *data) cpp->set_data(data); } +gboolean +pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto) +{ + if (!log_template_is_trivial(proto)) + return FALSE; + + GrpcDestDriver *self = (GrpcDestDriver *) d; + DestDriver *cpp = pubsub_dd_get_cpp(self); + cpp->set_protovar(proto); + return TRUE; +} + void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value) { diff --git a/modules/grpc/pubsub/pubsub-dest.h b/modules/grpc/pubsub/pubsub-dest.h index 12ae04880..0834cbb80 100644 --- a/modules/grpc/pubsub/pubsub-dest.h +++ b/modules/grpc/pubsub/pubsub-dest.h @@ -35,6 +35,7 @@ LogDriver *pubsub_dd_new(GlobalConfig *cfg); void pubsub_dd_set_project(LogDriver *d, LogTemplate *project); void pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic); void pubsub_dd_set_data(LogDriver *d, LogTemplate *data); +gboolean pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto); void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value); #include "compat/cpp-end.h" diff --git a/modules/grpc/pubsub/pubsub-dest.hpp b/modules/grpc/pubsub/pubsub-dest.hpp index 22ff14fd4..a106322fd 100644 --- a/modules/grpc/pubsub/pubsub-dest.hpp +++ b/modules/grpc/pubsub/pubsub-dest.hpp @@ -62,6 +62,12 @@ class DestDriver final : public syslogng::grpc::DestDriver this->data = log_template_ref(d); } + void set_protovar(LogTemplate *d) + { + log_template_unref(this->data); + this->protovar = log_template_ref(d); + } + void add_attribute(const std::string &name, LogTemplate *value) { this->attributes.push_back(NameValueTemplatePair{name, value}); @@ -74,6 +80,8 @@ class DestDriver final : public syslogng::grpc::DestDriver LogTemplate *project = nullptr; LogTemplate *topic = nullptr; LogTemplate *data = nullptr; + LogTemplate *protovar = nullptr; + LogTemplate *default_data_template = nullptr; std::vector attributes; }; diff --git a/modules/grpc/pubsub/pubsub-grammar.ym b/modules/grpc/pubsub/pubsub-grammar.ym index 7991bde78..8019ee44c 100644 --- a/modules/grpc/pubsub/pubsub-grammar.ym +++ b/modules/grpc/pubsub/pubsub-grammar.ym @@ -56,6 +56,7 @@ %token KW_TOPIC %token KW_DATA %token KW_ATTRIBUTES +%token KW_PROTOVAR %type pubsub_dest @@ -84,6 +85,8 @@ pubsub_dest_option | KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); } | KW_DATA '(' template_name_or_content ')' { pubsub_dd_set_data(last_driver, $3); log_template_unref($3); } | KW_ATTRIBUTES '(' pubsub_dest_attributes ')' + | KW_PROTOVAR '(' template_name_or_content ')' { CHECK_ERROR(pubsub_dd_set_protovar(last_driver, $3), @1, "format is not trivial"); + log_template_unref($3); } | grpc_dest_general_option ; diff --git a/modules/grpc/pubsub/pubsub-parser.c b/modules/grpc/pubsub/pubsub-parser.c index 5dd4b627f..54943da88 100644 --- a/modules/grpc/pubsub/pubsub-parser.c +++ b/modules/grpc/pubsub/pubsub-parser.c @@ -39,6 +39,7 @@ static CfgLexerKeyword pubsub_keywords[] = { "topic", KW_TOPIC }, { "data", KW_DATA }, { "attributes", KW_ATTRIBUTES }, + { "proto_var", KW_PROTOVAR }, { NULL } };