Skip to content

Commit

Permalink
grpc: extends pubsub destination with protovar option
Browse files Browse the repository at this point in the history
The protovar option in the Pub/Sub destination allows users to directly access the Protobuf byte representation created by a FilterX function on the source side.

The Protobuf serialized variable is used to set the Pub/Sub message's content.
This enables users to fully control and manage the outgoing messages of the Pub/Sub destination from within FilterX.
By leveraging this option, users can customize the message payload dynamically, based on their specific requirements.
The data() and attributes() options cannot be used with protovar(). These options are mutually exclusive.

Signed-off-by: shifter <[email protected]>
  • Loading branch information
bshifter committed Jan 20, 2025
1 parent 2565ac7 commit dada488
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 10 deletions.
64 changes: 58 additions & 6 deletions modules/grpc/pubsub/pubsub-dest-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,14 @@ DestWorker::format_template(LogTemplate *tmpl, LogMessage *msg, GString *value,
return Slice{value->str, value->len};
}

LogThreadedResult
DestWorker::insert(LogMessage *msg)
bool
DestWorker::handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes)
{
DestDriver *owner_ = this->get_owner();

ScratchBuffersMarker m;
GString *buf = scratch_buffers_alloc_and_mark(&m);
Slice buf_slice;
size_t message_bytes = 0;

::google::pubsub::v1::PubsubMessage *message = this->request.add_messages();

buf_slice = this->format_template(owner_->data, msg, buf, NULL, this->super->super.seq_num);
message->set_data(buf_slice.str, buf_slice.len);
Expand All @@ -119,10 +116,65 @@ DestWorker::insert(LogMessage *msg)
}

scratch_buffers_reclaim_marked(m);
return true;
}

bool
DestWorker::handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes)
{
DestDriver *owner_ = this->get_owner();

LogMessageValueType lmvt;
gssize len;
const gchar *proto = log_template_get_trivial_value_and_type(owner_->protovar, msg, &len, &lmvt);
if (lmvt != LM_VT_PROTOBUF)
{
msg_error("Error loggmessage type is not protobuf",
evt_tag_int("expected_type", LM_VT_PROTOBUF),
evt_tag_int("current_type", lmvt));
return false;
}

if (!message->ParsePartialFromArray(proto, len))
{
msg_error("Unable to deserialize protobuf message",
evt_tag_int("proto_size", len));
return false;
}

message_bytes += message->data().length();

for (const auto &pair : message->attributes())
{
const std::string &key = pair.first;
const std::string &value = pair.second;

message_bytes += key.length() + value.length();
}
return true;
}

LogThreadedResult
DestWorker::insert(LogMessage *msg)
{
DestDriver *owner_ = this->get_owner();

size_t message_bytes = 0;

::google::pubsub::v1::PubsubMessage *message = this->request.add_messages();

if (owner_->protovar)
{
if (!this->handle_protovar(msg, message, &message_bytes))
return LTR_ERROR;
}
else
{
if (!this->handle_data_attributes(msg, message, &message_bytes))
return LTR_ERROR;
}
this->current_batch_bytes += message_bytes;
log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, message_bytes);

this->batch_size++;

if (!this->client_context.get())
Expand Down
3 changes: 3 additions & 0 deletions modules/grpc/pubsub/pubsub-dest-worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class DestWorker final : public syslogng::grpc::DestWorker
::google::pubsub::v1::PublishRequest request;
size_t batch_size = 0;
size_t current_batch_bytes = 0;

bool handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes);
bool handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes);
};

}
Expand Down
27 changes: 23 additions & 4 deletions modules/grpc/pubsub/pubsub-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ DestDriver::DestDriver(GrpcDestDriver *s)
this->batch_bytes = MAX_BATCH_BYTES;

GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super);
LogTemplate *default_data_template = log_template_new(cfg, NULL);
g_assert(log_template_compile(default_data_template, "$MESSAGE", NULL));
this->set_data(default_data_template);
log_template_unref(default_data_template);
this->default_data_template = log_template_new(cfg, NULL);
g_assert(log_template_compile(this->default_data_template, "$MESSAGE", NULL));
this->set_data(this->default_data_template);
}

DestDriver::~DestDriver()
{
log_template_unref(this->project);
log_template_unref(this->topic);
log_template_unref(this->data);
log_template_unref(this->protovar);
log_template_unref(this->default_data_template);
}

bool
Expand All @@ -77,6 +78,12 @@ DestDriver::init()
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}
if ((!this->attributes.empty() || this->data != this->default_data_template) && this->protovar != nullptr)
{
msg_error("Error initializing Google Pub/Sub destination: 'attributes()' and 'data()' cannot be used together with 'protovar()'. Please use either 'attributes()' and 'data()', or 'protovar()', but not both.",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}

this->extend_worker_partition_key(std::string("project=") + this->project->template_str);
this->extend_worker_partition_key(std::string("topic=") + this->topic->template_str);
Expand Down Expand Up @@ -151,6 +158,18 @@ pubsub_dd_set_data(LogDriver *d, LogTemplate *data)
cpp->set_data(data);
}

gboolean
pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto)
{
if (!log_template_is_trivial(proto))
return FALSE;

GrpcDestDriver *self = (GrpcDestDriver *) d;
DestDriver *cpp = pubsub_dd_get_cpp(self);
cpp->set_protovar(proto);
return TRUE;
}

void
pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value)
{
Expand Down
1 change: 1 addition & 0 deletions modules/grpc/pubsub/pubsub-dest.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ LogDriver *pubsub_dd_new(GlobalConfig *cfg);
void pubsub_dd_set_project(LogDriver *d, LogTemplate *project);
void pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic);
void pubsub_dd_set_data(LogDriver *d, LogTemplate *data);
gboolean pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto);
void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value);

#include "compat/cpp-end.h"
Expand Down
8 changes: 8 additions & 0 deletions modules/grpc/pubsub/pubsub-dest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ class DestDriver final : public syslogng::grpc::DestDriver
this->data = log_template_ref(d);
}

void set_protovar(LogTemplate *d)
{
log_template_unref(this->data);
this->protovar = log_template_ref(d);
}

void add_attribute(const std::string &name, LogTemplate *value)
{
this->attributes.push_back(NameValueTemplatePair{name, value});
Expand All @@ -74,6 +80,8 @@ class DestDriver final : public syslogng::grpc::DestDriver
LogTemplate *project = nullptr;
LogTemplate *topic = nullptr;
LogTemplate *data = nullptr;
LogTemplate *protovar = nullptr;
LogTemplate *default_data_template = nullptr;
std::vector<NameValueTemplatePair> attributes;
};

Expand Down
3 changes: 3 additions & 0 deletions modules/grpc/pubsub/pubsub-grammar.ym
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
%token KW_TOPIC
%token KW_DATA
%token KW_ATTRIBUTES
%token KW_PROTOVAR

%type <ptr> pubsub_dest

Expand Down Expand Up @@ -84,6 +85,8 @@ pubsub_dest_option
| KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); }
| KW_DATA '(' template_name_or_content ')' { pubsub_dd_set_data(last_driver, $3); log_template_unref($3); }
| KW_ATTRIBUTES '(' pubsub_dest_attributes ')'
| KW_PROTOVAR '(' template_name_or_content ')' { CHECK_ERROR(pubsub_dd_set_protovar(last_driver, $3), @1, "format is not trivial");
log_template_unref($3); }
| grpc_dest_general_option
;

Expand Down
1 change: 1 addition & 0 deletions modules/grpc/pubsub/pubsub-parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static CfgLexerKeyword pubsub_keywords[] =
{ "topic", KW_TOPIC },
{ "data", KW_DATA },
{ "attributes", KW_ATTRIBUTES },
{ "proto_var", KW_PROTOVAR },
{ NULL }
};

Expand Down

0 comments on commit dada488

Please sign in to comment.