Skip to content

Commit

Permalink
Merge pull request axoflow#456 from bshifter/fx-pubsub
Browse files Browse the repository at this point in the history
filterx pubsub message
alltilla authored Jan 21, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents e814ffb + 2a3dfc9 commit 7a1ef06
Showing 16 changed files with 660 additions and 13 deletions.
5 changes: 4 additions & 1 deletion modules/grpc/pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ add_module(
TARGET pubsub-cpp
SOURCES ${PUBSUB_CPP_SOURCES}
DEPENDS ${MODULE_GRPC_LIBS} grpc-protos grpc-common-cpp
INCLUDES ${PUBSUB_PROTO_BUILDDIR} ${PROJECT_SOURCE_DIR}/modules/grpc
INCLUDES ${GOOGLEAPIS_PROTO_GRPC_SOURCES} ${PROJECT_SOURCE_DIR}/modules/grpc
LIBRARY_TYPE STATIC
)

@@ -35,3 +35,6 @@ add_module(
)

set_target_properties(pubsub PROPERTIES INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib;${CMAKE_INSTALL_PREFIX}/lib/syslog-ng")

add_subdirectory(filterx)
target_link_libraries(pubsub PRIVATE pubsub_filterx_message_cpp)
8 changes: 6 additions & 2 deletions modules/grpc/pubsub/Makefile.am
Original file line number Diff line number Diff line change
@@ -42,7 +42,8 @@ modules_grpc_pubsub_libpubsub_la_LIBADD = \
$(MODULE_DEPS_LIBS) \
$(GRPC_COMMON_LIBS) \
$(top_builddir)/modules/grpc/protos/libgrpc-protos.la \
$(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la
$(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la \
$(top_builddir)/modules/grpc/pubsub/filterx/libfilterx.la

nodist_EXTRA_modules_grpc_pubsub_libpubsub_la_SOURCES = force-cpp-linker-with-default-stdlib.cpp

@@ -51,7 +52,8 @@ EXTRA_modules_grpc_pubsub_libpubsub_la_DEPENDENCIES = \
$(MODULE_DEPS_LIBS) \
$(GRPC_COMMON_LIBS) \
$(top_builddir)/modules/grpc/protos/libgrpc-protos.la \
$(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la
$(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la \
$(top_builddir)/modules/grpc/pubsub/filterx/libfilterx.la

modules/grpc/pubsub modules/grpc/pubsub/ mod-pubsub: modules/grpc/pubsub/libpubsub.la

@@ -71,3 +73,5 @@ EXTRA_DIST += \
modules/grpc/pubsub/CMakeLists.txt

.PHONY: modules/grpc/pubsub/ mod-pubsub

include modules/grpc/pubsub/filterx/Makefile.am
21 changes: 21 additions & 0 deletions modules/grpc/pubsub/filterx/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
if(NOT ENABLE_CPP)
return()
endif()

# C++ code

set(PUBSUB_CPP_SOURCES
object-pubsub.h
object-pubsub-message.cpp
object-pubsub-message.hpp
)

add_module(
TARGET pubsub_filterx_message_cpp
SOURCES ${PUBSUB_CPP_SOURCES}
DEPENDS ${MODULE_GRPC_LIBS} grpc-protos
INCLUDES ${GOOGLEAPIS_PROTO_GRPC_SOURCES} ${PROJECT_SOURCE_DIR}/modules/grpc
LIBRARY_TYPE STATIC
)

set_target_properties(pubsub_filterx_message_cpp PROPERTIES COMPILE_FLAGS "-Wno-deprecated-declarations")
34 changes: 34 additions & 0 deletions modules/grpc/pubsub/filterx/Makefile.am
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
if ENABLE_GRPC

# noinst: Built as part of a larger libpubsub module as static library
noinst_LTLIBRARIES += modules/grpc/pubsub/filterx/libfilterx.la

modules_grpc_pubsub_filterx_libfilterx_la_SOURCES = \
modules/grpc/pubsub/filterx/object-pubsub.h \
modules/grpc/pubsub/filterx/object-pubsub-message.cpp \
modules/grpc/pubsub/filterx/object-pubsub-message.hpp

modules_grpc_pubsub_filterx_libfilterx_la_CXXFLAGS = \
$(AM_CXXFLAGS) \
$(PROTOBUF_CFLAGS) \
$(GRPCPP_CFLAGS) \
-I$(GOOGLEAPIS_PROTO_BUILDDIR) \
-I$(top_srcdir)/modules/grpc/pubsub/filterx \
-I$(top_builddir)/modules/grpc/pubsub/filterx \
-Wno-deprecated-declarations

modules_grpc_pubsub_filterx_libfilterx_la_LIBADD = \
$(MODULE_DEPS_LIBS) \
$(PROTOBUF_LIBS) \
$(top_builddir)/modules/grpc/protos/libgrpc-protos.la

modules_grpc_pubsub_filterx_libfilterx_la_LDFLAGS = $(MODULE_LDFLAGS)

EXTRA_modules_grpc_pubsub_filterx_libfilterx_la_DEPENDENCIES = \
$(MODULE_DEPS_LIBS) \
$(top_builddir)/modules/grpc/protos/libgrpc-protos.la

endif

EXTRA_DIST += \
modules/grpc/pubsub/filterx/CMakeLists.txt
343 changes: 343 additions & 0 deletions modules/grpc/pubsub/filterx/object-pubsub-message.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
/*
* Copyright (c) 2025 Axoflow
* Copyright (c) 2025 shifter
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#include "object-pubsub-message.hpp"

#include "compat/cpp-start.h"

#include "filterx/object-extractor.h"
#include "filterx/object-string.h"
#include "filterx/object-datetime.h"
#include "filterx/object-primitive.h"
#include "filterx/filterx-object-istype.h"
#include "filterx/filterx-ref.h"
#include "scratch-buffers.h"
#include "generic-number.h"

#include "compat/cpp-end.h"

#include <unistd.h>
#include <cstdio>
#include <memory>
#include <stdexcept>
#include <vector>

using namespace syslogng::grpc::pubsub::filterx;

/* C++ Implementations */

Message::Message(FilterXPubSubMessage *super_) : super(super_)
{
}

Message::Message(FilterXPubSubMessage *super_, const std::string data,
const std::map<std::string, std::string> &attributes) : super(super_)
{
message.set_data(data);
for (const auto &pair : attributes)
{
const auto &key = pair.first;
const auto &value = pair.second;

(*message.mutable_attributes())[key] = value;
}
}

Message::Message(FilterXPubSubMessage *super_, FilterXObject *protobuf_object) : super(super_)
{
const gchar *value;
gsize length;
if (!filterx_object_extract_protobuf_ref(protobuf_object, &value, &length))
throw std::runtime_error("Argument is not a protobuf object");

if (!message.ParsePartialFromArray(value, length))
throw std::runtime_error("Failed to parse from protobuf object");
}

Message::Message(const Message &o, FilterXPubSubMessage *super_) : super(super_),
message(o.message)
{
}

std::string
Message::marshal(void)
{
std::string serializedString = this->message.SerializePartialAsString();
return serializedString;
}

const google::pubsub::v1::PubsubMessage &
Message::get_value() const
{
return this->message;
}

bool
Message::set_attribute(const std::string &key, const std::string &value)
{
try
{
if (key.empty())
{
throw std::invalid_argument("Key cannot be empty");
}
if (value.empty())
{
throw std::invalid_argument("Value cannot be empty");
}
auto attributes = message.mutable_attributes();
if (attributes)
{
(*attributes)[key] = value;
}
else
{
throw std::runtime_error("Failed to access mutable attributes");
}
}
catch (const std::exception &e)
{
msg_error("FilterX: Unable to set Pub/Sub attribute", evt_tag_str("error", e.what()));
return false;
}
return true;
}

bool
Message::set_data(const std::string &data)
{
try
{
auto mdata = message.mutable_data();
*mdata = data;
}
catch (const std::exception &e)
{
msg_error("FilterX: Unable to set Pub/Sub data", evt_tag_str("error", e.what()));
return false;
}
return true;
}

// /* C Wrappers */

static void
_free(FilterXObject *s)
{
FilterXPubSubMessage *self = (FilterXPubSubMessage *) s;

delete self->cpp;
self->cpp = NULL;

filterx_object_free_method(s);
}

static gboolean
_truthy(FilterXObject *s)
{
return TRUE;
}

static gboolean
_marshal(FilterXObject *s, GString *repr, LogMessageValueType *t)
{
FilterXPubSubMessage *self = (FilterXPubSubMessage *) s;

std::string serialized = self->cpp->marshal();

g_string_append_len(repr, serialized.c_str(), serialized.length());
*t = LM_VT_PROTOBUF;
return TRUE;
}

static gboolean
_map_to_json(FilterXObject *s, struct json_object **object, FilterXObject **assoc_object)
{
FilterXPubSubMessage *self = (FilterXPubSubMessage *) s;

*object = json_object_new_object();

const std::string data_val = self->cpp->get_value().data();

json_object_object_add(*object, "data", json_object_new_string(data_val.c_str()));

struct json_object *attributes = json_object_new_object();

for (const auto &pair : self->cpp->get_value().attributes())
{
const std::string &key = pair.first;
const std::string &value = pair.second;
json_object_object_add(attributes, key.c_str(), json_object_new_string(value.c_str()));
}

json_object_object_add(*object, "attributes", attributes);

*assoc_object = filterx_json_new_from_object(json_object_get(*object));
return TRUE;
}

static void
_init_instance(FilterXPubSubMessage *self)
{
filterx_object_init_instance(&self->super, &FILTERX_TYPE_NAME(pubsub_message));
}

FilterXObject *
_filterx_pubsub_message_clone(FilterXObject *s)
{
FilterXPubSubMessage *self = (FilterXPubSubMessage *) s;

FilterXPubSubMessage *clone = g_new0(FilterXPubSubMessage, 1);
_init_instance(clone);

try
{
clone->cpp = new Message(*self->cpp, clone);
}
catch (const std::runtime_error &)
{
g_assert_not_reached();
}

return &clone->super;
}

gboolean
_get_repr(FilterXObject *obj, std::string &str, GString *repr)
{
const gchar *buf = NULL;
gsize len;
if (filterx_object_extract_string_ref(obj, &buf, &len))
{
str = std::string(buf, len);
}
else
{
g_string_truncate(repr, 0);
if (!filterx_object_repr(obj, repr))
return FALSE;
str = std::string(repr->str, repr->len);
}
return TRUE;
}

gboolean
_build_map(FilterXObject *key, FilterXObject *val, gpointer user_data)
{

auto *msg = static_cast<syslogng::grpc::pubsub::filterx::Message *>(static_cast<gpointer *>(user_data)[0]);
GString *buf = static_cast<GString *>(static_cast<gpointer *>(user_data)[1]);

std::string key_cpp;
std::string val_cpp;

if (!_get_repr(key, key_cpp, buf))
return FALSE;
if (!_get_repr(val, val_cpp, buf))
return FALSE;

return msg->set_attribute(key_cpp, val_cpp);
}

FilterXObject *
filterx_pubsub_message_new_from_args(FilterXExpr *s, FilterXObject *args[], gsize args_len)
{
FilterXPubSubMessage *self = g_new0(FilterXPubSubMessage, 1);
_init_instance(self);
ScratchBuffersMarker m;
GString *buf = scratch_buffers_alloc_and_mark(&m);
try
{
if (!args || args_len == 0)
{
self->cpp = new Message(self);
}
else if (args_len == 2)
{
FilterXObject *data = args[0];
FilterXObject *attributes = args[1];
FilterXObject *attributes_arg = filterx_ref_unwrap_ro(attributes);
if (filterx_object_is_type(attributes_arg, &FILTERX_TYPE_NAME(dict)))
{
self->cpp = new Message(self);
std::string data_cpp;
gsize len = 0;
const gchar *data_str;
if (filterx_object_extract_string_ref(data, &data_str, &len) ||
filterx_object_extract_bytes_ref(data, &data_str, &len) ||
filterx_object_extract_protobuf_ref(data, &data_str, &len)
)
{
data_cpp = std::string(data_str, len);
}
else
{
LogMessageValueType lmvt;
if (!filterx_object_marshal(data, buf, &lmvt))
{
throw std::runtime_error("unable to parse first argument as string! current type:" + std::string(
log_msg_value_type_to_str(lmvt)));
}
data_cpp = std::string(buf->str, buf->len);
}
self->cpp->set_data(data_cpp);

gpointer user_data[] = {static_cast<gpointer>(self->cpp), static_cast<gpointer>(buf)};
if (!filterx_dict_iter(attributes_arg, _build_map, user_data))
{
throw std::runtime_error("dictionary argument iterator resulted with some error");
}
}
else
{
throw std::runtime_error("Invalid type of arguments");
}
}
else
{
throw std::runtime_error("Invalid number of arguments");
}
scratch_buffers_reclaim_marked(m);
}
catch (const std::runtime_error &e)
{
scratch_buffers_reclaim_marked(m);
msg_error("FilterX: Failed to create Pub/Sub Message object", evt_tag_str("error", e.what()));
filterx_object_unref(&self->super);
return NULL;
}

scratch_buffers_reclaim_marked(m);
return &self->super;
}

FILTERX_SIMPLE_FUNCTION(pubsub_message, filterx_pubsub_message_new_from_args);

FILTERX_DEFINE_TYPE(pubsub_message, FILTERX_TYPE_NAME(object),
.is_mutable = TRUE,
.marshal = _marshal,
.clone = _filterx_pubsub_message_clone,
.map_to_json = _map_to_json,
.truthy = _truthy,
.free_fn = _free,
);
76 changes: 76 additions & 0 deletions modules/grpc/pubsub/filterx/object-pubsub-message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2025 Axoflow
* Copyright (c) 2025 shifter
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#ifndef OBJECT_PUBSUB_MESSAGE_HPP
#define OBJECT_PUBSUB_MESSAGE_HPP

#include "syslog-ng.h"

#include "compat/cpp-start.h"
#include "filterx/object-dict-interface.h"
#include "object-pubsub.h"
#include "compat/cpp-end.h"

#include "google/pubsub/v1/pubsub.grpc.pb.h"

typedef struct FilterXPubSubMessage_ FilterXPubSubMessage;

FilterXObject *_filterx_pubsub_message_clone(FilterXObject *s);

namespace syslogng {
namespace grpc {
namespace pubsub {
namespace filterx {

class Message
{
public:
Message(FilterXPubSubMessage *super);
Message(FilterXPubSubMessage *super, const std::string data, const std::map<std::string, std::string> &attributes);
Message(FilterXPubSubMessage *super, FilterXObject *protobuf_object);
Message(Message &o) = delete;
Message(Message &&o) = delete;
std::string marshal(void);
const google::pubsub::v1::PubsubMessage &get_value() const;

bool set_attribute(const std::string &key, const std::string &value);
bool set_data(const std::string &data);
private:
FilterXPubSubMessage *super;
google::pubsub::v1::PubsubMessage message;
Message(const Message &o, FilterXPubSubMessage *super);
friend FilterXObject *::_filterx_pubsub_message_clone(FilterXObject *s);
};

}
}
}
}

struct FilterXPubSubMessage_
{
FilterXObject super;
syslogng::grpc::pubsub::filterx::Message *cpp;
};

#endif
59 changes: 59 additions & 0 deletions modules/grpc/pubsub/filterx/object-pubsub.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2025 Axoflow
* Copyright (c) 2025 shifter
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#ifndef OBJECT_PUBSUB_H
#define OBJECT_PUBSUB_H

#include "syslog-ng.h"

#include "compat/cpp-start.h"
#include "filterx/filterx-object.h"
#include "filterx/expr-function.h"

FILTERX_SIMPLE_FUNCTION_DECLARE(pubsub_message);

FilterXObject *filterx_pubsub_message_new_from_args(FilterXExpr *s, FilterXObject *args[], gsize args_len);

static inline FilterXObject *
filterx_pubsub_message_new(void)
{
return filterx_pubsub_message_new_from_args(NULL, NULL, 0);
}

FILTERX_DECLARE_TYPE(pubsub_message);

static inline void
pubsub_filterx_objects_global_init(void)
{
static gboolean initialized = FALSE;

if (!initialized)
{
filterx_type_init(&FILTERX_TYPE_NAME(pubsub_message));
initialized = TRUE;
}
}

#include "compat/cpp-end.h"

#endif
64 changes: 58 additions & 6 deletions modules/grpc/pubsub/pubsub-dest-worker.cpp
Original file line number Diff line number Diff line change
@@ -94,17 +94,14 @@ DestWorker::format_template(LogTemplate *tmpl, LogMessage *msg, GString *value,
return Slice{value->str, value->len};
}

LogThreadedResult
DestWorker::insert(LogMessage *msg)
bool
DestWorker::handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes)
{
DestDriver *owner_ = this->get_owner();

ScratchBuffersMarker m;
GString *buf = scratch_buffers_alloc_and_mark(&m);
Slice buf_slice;
size_t message_bytes = 0;

::google::pubsub::v1::PubsubMessage *message = this->request.add_messages();

buf_slice = this->format_template(owner_->data, msg, buf, NULL, this->super->super.seq_num);
message->set_data(buf_slice.str, buf_slice.len);
@@ -119,10 +116,65 @@ DestWorker::insert(LogMessage *msg)
}

scratch_buffers_reclaim_marked(m);
return true;
}

bool
DestWorker::handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes)
{
DestDriver *owner_ = this->get_owner();

LogMessageValueType lmvt;
gssize len;
const gchar *proto = log_template_get_trivial_value_and_type(owner_->protovar, msg, &len, &lmvt);
if (lmvt != LM_VT_PROTOBUF)
{
msg_error("Error loggmessage type is not protobuf",
evt_tag_int("expected_type", LM_VT_PROTOBUF),
evt_tag_int("current_type", lmvt));
return false;
}

if (!message->ParsePartialFromArray(proto, len))
{
msg_error("Unable to deserialize protobuf message",
evt_tag_int("proto_size", len));
return false;
}

message_bytes += message->data().length();

for (const auto &pair : message->attributes())
{
const std::string &key = pair.first;
const std::string &value = pair.second;

message_bytes += key.length() + value.length();
}
return true;
}

LogThreadedResult
DestWorker::insert(LogMessage *msg)
{
DestDriver *owner_ = this->get_owner();

size_t message_bytes = 0;

::google::pubsub::v1::PubsubMessage *message = this->request.add_messages();

if (owner_->protovar)
{
if (!this->handle_protovar(msg, message, &message_bytes))
return LTR_ERROR;
}
else
{
if (!this->handle_data_attributes(msg, message, &message_bytes))
return LTR_ERROR;
}
this->current_batch_bytes += message_bytes;
log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, message_bytes);

this->batch_size++;

if (!this->client_context.get())
3 changes: 3 additions & 0 deletions modules/grpc/pubsub/pubsub-dest-worker.hpp
Original file line number Diff line number Diff line change
@@ -64,6 +64,9 @@ class DestWorker final : public syslogng::grpc::DestWorker
::google::pubsub::v1::PublishRequest request;
size_t batch_size = 0;
size_t current_batch_bytes = 0;

bool handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes);
bool handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes);
};

}
27 changes: 23 additions & 4 deletions modules/grpc/pubsub/pubsub-dest.cpp
Original file line number Diff line number Diff line change
@@ -46,17 +46,18 @@ DestDriver::DestDriver(GrpcDestDriver *s)
this->batch_bytes = MAX_BATCH_BYTES;

GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super);
LogTemplate *default_data_template = log_template_new(cfg, NULL);
g_assert(log_template_compile(default_data_template, "$MESSAGE", NULL));
this->set_data(default_data_template);
log_template_unref(default_data_template);
this->default_data_template = log_template_new(cfg, NULL);
g_assert(log_template_compile(this->default_data_template, "$MESSAGE", NULL));
this->set_data(this->default_data_template);
}

DestDriver::~DestDriver()
{
log_template_unref(this->project);
log_template_unref(this->topic);
log_template_unref(this->data);
log_template_unref(this->protovar);
log_template_unref(this->default_data_template);
}

bool
@@ -77,6 +78,12 @@ DestDriver::init()
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}
if ((!this->attributes.empty() || this->data != this->default_data_template) && this->protovar != nullptr)
{
msg_error("Error initializing Google Pub/Sub destination: 'attributes()' and 'data()' cannot be used together with 'protovar()'. Please use either 'attributes()' and 'data()', or 'protovar()', but not both.",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}

this->extend_worker_partition_key(std::string("project=") + this->project->template_str);
this->extend_worker_partition_key(std::string("topic=") + this->topic->template_str);
@@ -151,6 +158,18 @@ pubsub_dd_set_data(LogDriver *d, LogTemplate *data)
cpp->set_data(data);
}

gboolean
pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto)
{
if (!log_template_is_trivial(proto))
return FALSE;

GrpcDestDriver *self = (GrpcDestDriver *) d;
DestDriver *cpp = pubsub_dd_get_cpp(self);
cpp->set_protovar(proto);
return TRUE;
}

void
pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value)
{
1 change: 1 addition & 0 deletions modules/grpc/pubsub/pubsub-dest.h
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ LogDriver *pubsub_dd_new(GlobalConfig *cfg);
void pubsub_dd_set_project(LogDriver *d, LogTemplate *project);
void pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic);
void pubsub_dd_set_data(LogDriver *d, LogTemplate *data);
gboolean pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto);
void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value);

#include "compat/cpp-end.h"
8 changes: 8 additions & 0 deletions modules/grpc/pubsub/pubsub-dest.hpp
Original file line number Diff line number Diff line change
@@ -62,6 +62,12 @@ class DestDriver final : public syslogng::grpc::DestDriver
this->data = log_template_ref(d);
}

void set_protovar(LogTemplate *d)
{
log_template_unref(this->data);
this->protovar = log_template_ref(d);
}

void add_attribute(const std::string &name, LogTemplate *value)
{
this->attributes.push_back(NameValueTemplatePair{name, value});
@@ -74,6 +80,8 @@ class DestDriver final : public syslogng::grpc::DestDriver
LogTemplate *project = nullptr;
LogTemplate *topic = nullptr;
LogTemplate *data = nullptr;
LogTemplate *protovar = nullptr;
LogTemplate *default_data_template = nullptr;
std::vector<NameValueTemplatePair> attributes;
};

3 changes: 3 additions & 0 deletions modules/grpc/pubsub/pubsub-grammar.ym
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@
%token KW_TOPIC
%token KW_DATA
%token KW_ATTRIBUTES
%token KW_PROTOVAR

%type <ptr> pubsub_dest

@@ -84,6 +85,8 @@ pubsub_dest_option
| KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); }
| KW_DATA '(' template_name_or_content ')' { pubsub_dd_set_data(last_driver, $3); log_template_unref($3); }
| KW_ATTRIBUTES '(' pubsub_dest_attributes ')'
| KW_PROTOVAR '(' template_name_or_content ')' { CHECK_ERROR(pubsub_dd_set_protovar(last_driver, $3), @1, "format is not trivial");
log_template_unref($3); }
| grpc_dest_general_option
;

1 change: 1 addition & 0 deletions modules/grpc/pubsub/pubsub-parser.c
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ static CfgLexerKeyword pubsub_keywords[] =
{ "topic", KW_TOPIC },
{ "data", KW_DATA },
{ "attributes", KW_ATTRIBUTES },
{ "proto_var", KW_PROTOVAR },
{ NULL }
};

3 changes: 3 additions & 0 deletions modules/grpc/pubsub/pubsub-plugin.c
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
#include "plugin.h"
#include "plugin-types.h"
#include "protos/apphook.h"
#include "filterx/object-pubsub.h"

extern CfgParser pubsub_parser;

@@ -35,11 +36,13 @@ static Plugin pubsub_plugins[] =
.name = "google_pubsub_grpc",
.parser = &pubsub_parser,
},
FILTERX_SIMPLE_FUNCTION_PLUGIN(pubsub_message),
};

gboolean
google_pubsub_grpc_module_init(PluginContext *context, CfgArgs *args)
{
pubsub_filterx_objects_global_init();
plugin_register(context, pubsub_plugins, G_N_ELEMENTS(pubsub_plugins));
grpc_register_global_initializers();
return TRUE;
17 changes: 17 additions & 0 deletions tests/light/functional_tests/filterx/test_filterx.py
Original file line number Diff line number Diff line change
@@ -2627,3 +2627,20 @@ def test_keys(config, syslog_ng):
r""""direct_access":"foo"}""" + "\n"
)
assert file_true.read_log() == exp


def test_pubsub_message(config, syslog_ng):
(file_true, file_false) = create_config(
config, r"""
$MSG = json();
$MSG.msg = pubsub_message("my pubsub message", {"foo":"bar"});
""",
)
syslog_ng.start(config)

assert file_true.get_stats()["processed"] == 1
assert "processed" not in file_false.get_stats()
exp = (
r"""{"msg":{"data":"my pubsub message","attributes":{"foo":"bar"}}}""" + "\n"
)
assert file_true.read_log() == exp

0 comments on commit 7a1ef06

Please sign in to comment.