From 877467c46bd5dbd4fe25936efc16381d696bb46b Mon Sep 17 00:00:00 2001 From: shifter Date: Wed, 15 Jan 2025 08:33:18 +0100 Subject: [PATCH 1/4] grpc: fix grpc sources dir Signed-off-by: shifter --- modules/grpc/pubsub/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/grpc/pubsub/CMakeLists.txt b/modules/grpc/pubsub/CMakeLists.txt index 6971c7901..d138757f5 100644 --- a/modules/grpc/pubsub/CMakeLists.txt +++ b/modules/grpc/pubsub/CMakeLists.txt @@ -21,7 +21,7 @@ add_module( TARGET pubsub-cpp SOURCES ${PUBSUB_CPP_SOURCES} DEPENDS ${MODULE_GRPC_LIBS} grpc-protos grpc-common-cpp - INCLUDES ${PUBSUB_PROTO_BUILDDIR} ${PROJECT_SOURCE_DIR}/modules/grpc + INCLUDES ${GOOGLEAPIS_PROTO_GRPC_SOURCES} ${PROJECT_SOURCE_DIR}/modules/grpc LIBRARY_TYPE STATIC ) From c1eaaff019e42ddc6d47b6e34b8ebdccf4f63c51 Mon Sep 17 00:00:00 2001 From: shifter Date: Wed, 15 Jan 2025 08:39:51 +0100 Subject: [PATCH 2/4] filterx: add google pubsub protobuf object wrapper The implementation follows the existing patterns and designs of the OpenTelemetry (OTel) filterx modules for consistency. Signed-off-by: shifter --- modules/grpc/pubsub/CMakeLists.txt | 3 + modules/grpc/pubsub/Makefile.am | 8 +- modules/grpc/pubsub/filterx/CMakeLists.txt | 21 ++ modules/grpc/pubsub/filterx/Makefile.am | 34 ++ .../pubsub/filterx/object-pubsub-message.cpp | 343 ++++++++++++++++++ .../pubsub/filterx/object-pubsub-message.hpp | 76 ++++ modules/grpc/pubsub/filterx/object-pubsub.h | 59 +++ modules/grpc/pubsub/pubsub-plugin.c | 3 + 8 files changed, 545 insertions(+), 2 deletions(-) create mode 100644 modules/grpc/pubsub/filterx/CMakeLists.txt create mode 100644 modules/grpc/pubsub/filterx/Makefile.am create mode 100644 modules/grpc/pubsub/filterx/object-pubsub-message.cpp create mode 100644 modules/grpc/pubsub/filterx/object-pubsub-message.hpp create mode 100644 modules/grpc/pubsub/filterx/object-pubsub.h diff --git a/modules/grpc/pubsub/CMakeLists.txt b/modules/grpc/pubsub/CMakeLists.txt index d138757f5..5a4772dec 100644 --- a/modules/grpc/pubsub/CMakeLists.txt +++ b/modules/grpc/pubsub/CMakeLists.txt @@ -35,3 +35,6 @@ add_module( ) set_target_properties(pubsub PROPERTIES INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib;${CMAKE_INSTALL_PREFIX}/lib/syslog-ng") + +add_subdirectory(filterx) +target_link_libraries(pubsub PRIVATE pubsub_filterx_message_cpp) diff --git a/modules/grpc/pubsub/Makefile.am b/modules/grpc/pubsub/Makefile.am index 3854cd9a2..6d410bac4 100644 --- a/modules/grpc/pubsub/Makefile.am +++ b/modules/grpc/pubsub/Makefile.am @@ -42,7 +42,8 @@ modules_grpc_pubsub_libpubsub_la_LIBADD = \ $(MODULE_DEPS_LIBS) \ $(GRPC_COMMON_LIBS) \ $(top_builddir)/modules/grpc/protos/libgrpc-protos.la \ - $(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la + $(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la \ + $(top_builddir)/modules/grpc/pubsub/filterx/libfilterx.la nodist_EXTRA_modules_grpc_pubsub_libpubsub_la_SOURCES = force-cpp-linker-with-default-stdlib.cpp @@ -51,7 +52,8 @@ EXTRA_modules_grpc_pubsub_libpubsub_la_DEPENDENCIES = \ $(MODULE_DEPS_LIBS) \ $(GRPC_COMMON_LIBS) \ $(top_builddir)/modules/grpc/protos/libgrpc-protos.la \ - $(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la + $(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la \ + $(top_builddir)/modules/grpc/pubsub/filterx/libfilterx.la modules/grpc/pubsub modules/grpc/pubsub/ mod-pubsub: modules/grpc/pubsub/libpubsub.la @@ -71,3 +73,5 @@ EXTRA_DIST += \ modules/grpc/pubsub/CMakeLists.txt .PHONY: modules/grpc/pubsub/ mod-pubsub + +include modules/grpc/pubsub/filterx/Makefile.am diff --git a/modules/grpc/pubsub/filterx/CMakeLists.txt b/modules/grpc/pubsub/filterx/CMakeLists.txt new file mode 100644 index 000000000..7d8a6ec82 --- /dev/null +++ b/modules/grpc/pubsub/filterx/CMakeLists.txt @@ -0,0 +1,21 @@ +if(NOT ENABLE_CPP) + return() +endif() + +# C++ code + +set(PUBSUB_CPP_SOURCES + object-pubsub.h + object-pubsub-message.cpp + object-pubsub-message.hpp +) + +add_module( + TARGET pubsub_filterx_message_cpp + SOURCES ${PUBSUB_CPP_SOURCES} + DEPENDS ${MODULE_GRPC_LIBS} grpc-protos + INCLUDES ${GOOGLEAPIS_PROTO_GRPC_SOURCES} ${PROJECT_SOURCE_DIR}/modules/grpc + LIBRARY_TYPE STATIC +) + +set_target_properties(pubsub_filterx_message_cpp PROPERTIES COMPILE_FLAGS "-Wno-deprecated-declarations") diff --git a/modules/grpc/pubsub/filterx/Makefile.am b/modules/grpc/pubsub/filterx/Makefile.am new file mode 100644 index 000000000..64adf230d --- /dev/null +++ b/modules/grpc/pubsub/filterx/Makefile.am @@ -0,0 +1,34 @@ +if ENABLE_GRPC + +# noinst: Built as part of a larger libpubsub module as static library +noinst_LTLIBRARIES += modules/grpc/pubsub/filterx/libfilterx.la + +modules_grpc_pubsub_filterx_libfilterx_la_SOURCES = \ + modules/grpc/pubsub/filterx/object-pubsub.h \ + modules/grpc/pubsub/filterx/object-pubsub-message.cpp \ + modules/grpc/pubsub/filterx/object-pubsub-message.hpp + +modules_grpc_pubsub_filterx_libfilterx_la_CXXFLAGS = \ + $(AM_CXXFLAGS) \ + $(PROTOBUF_CFLAGS) \ + $(GRPCPP_CFLAGS) \ + -I$(GOOGLEAPIS_PROTO_BUILDDIR) \ + -I$(top_srcdir)/modules/grpc/pubsub/filterx \ + -I$(top_builddir)/modules/grpc/pubsub/filterx \ + -Wno-deprecated-declarations + +modules_grpc_pubsub_filterx_libfilterx_la_LIBADD = \ + $(MODULE_DEPS_LIBS) \ + $(PROTOBUF_LIBS) \ + $(top_builddir)/modules/grpc/protos/libgrpc-protos.la + +modules_grpc_pubsub_filterx_libfilterx_la_LDFLAGS = $(MODULE_LDFLAGS) + +EXTRA_modules_grpc_pubsub_filterx_libfilterx_la_DEPENDENCIES = \ + $(MODULE_DEPS_LIBS) \ + $(top_builddir)/modules/grpc/protos/libgrpc-protos.la + +endif + +EXTRA_DIST += \ + modules/grpc/pubsub/filterx/CMakeLists.txt diff --git a/modules/grpc/pubsub/filterx/object-pubsub-message.cpp b/modules/grpc/pubsub/filterx/object-pubsub-message.cpp new file mode 100644 index 000000000..054504def --- /dev/null +++ b/modules/grpc/pubsub/filterx/object-pubsub-message.cpp @@ -0,0 +1,343 @@ +/* + * Copyright (c) 2025 Axoflow + * Copyright (c) 2025 shifter + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "object-pubsub-message.hpp" + +#include "compat/cpp-start.h" + +#include "filterx/object-extractor.h" +#include "filterx/object-string.h" +#include "filterx/object-datetime.h" +#include "filterx/object-primitive.h" +#include "filterx/filterx-object-istype.h" +#include "filterx/filterx-ref.h" +#include "scratch-buffers.h" +#include "generic-number.h" + +#include "compat/cpp-end.h" + +#include +#include +#include +#include +#include + +using namespace syslogng::grpc::pubsub::filterx; + +/* C++ Implementations */ + +Message::Message(FilterXPubSubMessage *super_) : super(super_) +{ +} + +Message::Message(FilterXPubSubMessage *super_, const std::string data, + const std::map &attributes) : super(super_) +{ + message.set_data(data); + for (const auto &pair : attributes) + { + const auto &key = pair.first; + const auto &value = pair.second; + + (*message.mutable_attributes())[key] = value; + } +} + +Message::Message(FilterXPubSubMessage *super_, FilterXObject *protobuf_object) : super(super_) +{ + const gchar *value; + gsize length; + if (!filterx_object_extract_protobuf_ref(protobuf_object, &value, &length)) + throw std::runtime_error("Argument is not a protobuf object"); + + if (!message.ParsePartialFromArray(value, length)) + throw std::runtime_error("Failed to parse from protobuf object"); +} + +Message::Message(const Message &o, FilterXPubSubMessage *super_) : super(super_), + message(o.message) +{ +} + +std::string +Message::marshal(void) +{ + std::string serializedString = this->message.SerializePartialAsString(); + return serializedString; +} + +const google::pubsub::v1::PubsubMessage & +Message::get_value() const +{ + return this->message; +} + +bool +Message::set_attribute(const std::string &key, const std::string &value) +{ + try + { + if (key.empty()) + { + throw std::invalid_argument("Key cannot be empty"); + } + if (value.empty()) + { + throw std::invalid_argument("Value cannot be empty"); + } + auto attributes = message.mutable_attributes(); + if (attributes) + { + (*attributes)[key] = value; + } + else + { + throw std::runtime_error("Failed to access mutable attributes"); + } + } + catch (const std::exception &e) + { + msg_error("FilterX: Unable to set Pub/Sub attribute", evt_tag_str("error", e.what())); + return false; + } + return true; +} + +bool +Message::set_data(const std::string &data) +{ + try + { + auto mdata = message.mutable_data(); + *mdata = data; + } + catch (const std::exception &e) + { + msg_error("FilterX: Unable to set Pub/Sub data", evt_tag_str("error", e.what())); + return false; + } + return true; +} + +// /* C Wrappers */ + +static void +_free(FilterXObject *s) +{ + FilterXPubSubMessage *self = (FilterXPubSubMessage *) s; + + delete self->cpp; + self->cpp = NULL; + + filterx_object_free_method(s); +} + +static gboolean +_truthy(FilterXObject *s) +{ + return TRUE; +} + +static gboolean +_marshal(FilterXObject *s, GString *repr, LogMessageValueType *t) +{ + FilterXPubSubMessage *self = (FilterXPubSubMessage *) s; + + std::string serialized = self->cpp->marshal(); + + g_string_append_len(repr, serialized.c_str(), serialized.length()); + *t = LM_VT_PROTOBUF; + return TRUE; +} + +static gboolean +_map_to_json(FilterXObject *s, struct json_object **object, FilterXObject **assoc_object) +{ + FilterXPubSubMessage *self = (FilterXPubSubMessage *) s; + + *object = json_object_new_object(); + + const std::string data_val = self->cpp->get_value().data(); + + json_object_object_add(*object, "data", json_object_new_string(data_val.c_str())); + + struct json_object *attributes = json_object_new_object(); + + for (const auto &pair : self->cpp->get_value().attributes()) + { + const std::string &key = pair.first; + const std::string &value = pair.second; + json_object_object_add(attributes, key.c_str(), json_object_new_string(value.c_str())); + } + + json_object_object_add(*object, "attributes", attributes); + + *assoc_object = filterx_json_new_from_object(json_object_get(*object)); + return TRUE; +} + +static void +_init_instance(FilterXPubSubMessage *self) +{ + filterx_object_init_instance(&self->super, &FILTERX_TYPE_NAME(pubsub_message)); +} + +FilterXObject * +_filterx_pubsub_message_clone(FilterXObject *s) +{ + FilterXPubSubMessage *self = (FilterXPubSubMessage *) s; + + FilterXPubSubMessage *clone = g_new0(FilterXPubSubMessage, 1); + _init_instance(clone); + + try + { + clone->cpp = new Message(*self->cpp, clone); + } + catch (const std::runtime_error &) + { + g_assert_not_reached(); + } + + return &clone->super; +} + +gboolean +_get_repr(FilterXObject *obj, std::string &str, GString *repr) +{ + const gchar *buf = NULL; + gsize len; + if (filterx_object_extract_string_ref(obj, &buf, &len)) + { + str = std::string(buf, len); + } + else + { + g_string_truncate(repr, 0); + if (!filterx_object_repr(obj, repr)) + return FALSE; + str = std::string(repr->str, repr->len); + } + return TRUE; +} + +gboolean +_build_map(FilterXObject *key, FilterXObject *val, gpointer user_data) +{ + + auto *msg = static_cast(static_cast(user_data)[0]); + GString *buf = static_cast(static_cast(user_data)[1]); + + std::string key_cpp; + std::string val_cpp; + + if (!_get_repr(key, key_cpp, buf)) + return FALSE; + if (!_get_repr(val, val_cpp, buf)) + return FALSE; + + return msg->set_attribute(key_cpp, val_cpp); +} + +FilterXObject * +filterx_pubsub_message_new_from_args(FilterXExpr *s, FilterXObject *args[], gsize args_len) +{ + FilterXPubSubMessage *self = g_new0(FilterXPubSubMessage, 1); + _init_instance(self); + ScratchBuffersMarker m; + GString *buf = scratch_buffers_alloc_and_mark(&m); + try + { + if (!args || args_len == 0) + { + self->cpp = new Message(self); + } + else if (args_len == 2) + { + FilterXObject *data = args[0]; + FilterXObject *attributes = args[1]; + FilterXObject *attributes_arg = filterx_ref_unwrap_ro(attributes); + if (filterx_object_is_type(attributes_arg, &FILTERX_TYPE_NAME(dict))) + { + self->cpp = new Message(self); + std::string data_cpp; + gsize len = 0; + const gchar *data_str; + if (filterx_object_extract_string_ref(data, &data_str, &len) || + filterx_object_extract_bytes_ref(data, &data_str, &len) || + filterx_object_extract_protobuf_ref(data, &data_str, &len) + ) + { + data_cpp = std::string(data_str, len); + } + else + { + LogMessageValueType lmvt; + if (!filterx_object_marshal(data, buf, &lmvt)) + { + throw std::runtime_error("unable to parse first argument as string! current type:" + std::string( + log_msg_value_type_to_str(lmvt))); + } + data_cpp = std::string(buf->str, buf->len); + } + self->cpp->set_data(data_cpp); + + gpointer user_data[] = {static_cast(self->cpp), static_cast(buf)}; + if (!filterx_dict_iter(attributes_arg, _build_map, user_data)) + { + throw std::runtime_error("dictionary argument iterator resulted with some error"); + } + } + else + { + throw std::runtime_error("Invalid type of arguments"); + } + } + else + { + throw std::runtime_error("Invalid number of arguments"); + } + scratch_buffers_reclaim_marked(m); + } + catch (const std::runtime_error &e) + { + scratch_buffers_reclaim_marked(m); + msg_error("FilterX: Failed to create Pub/Sub Message object", evt_tag_str("error", e.what())); + filterx_object_unref(&self->super); + return NULL; + } + + scratch_buffers_reclaim_marked(m); + return &self->super; +} + +FILTERX_SIMPLE_FUNCTION(pubsub_message, filterx_pubsub_message_new_from_args); + +FILTERX_DEFINE_TYPE(pubsub_message, FILTERX_TYPE_NAME(object), + .is_mutable = TRUE, + .marshal = _marshal, + .clone = _filterx_pubsub_message_clone, + .map_to_json = _map_to_json, + .truthy = _truthy, + .free_fn = _free, + ); diff --git a/modules/grpc/pubsub/filterx/object-pubsub-message.hpp b/modules/grpc/pubsub/filterx/object-pubsub-message.hpp new file mode 100644 index 000000000..6134c14b4 --- /dev/null +++ b/modules/grpc/pubsub/filterx/object-pubsub-message.hpp @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2025 Axoflow + * Copyright (c) 2025 shifter + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef OBJECT_PUBSUB_MESSAGE_HPP +#define OBJECT_PUBSUB_MESSAGE_HPP + +#include "syslog-ng.h" + +#include "compat/cpp-start.h" +#include "filterx/object-dict-interface.h" +#include "object-pubsub.h" +#include "compat/cpp-end.h" + +#include "google/pubsub/v1/pubsub.grpc.pb.h" + +typedef struct FilterXPubSubMessage_ FilterXPubSubMessage; + +FilterXObject *_filterx_pubsub_message_clone(FilterXObject *s); + +namespace syslogng { +namespace grpc { +namespace pubsub { +namespace filterx { + +class Message +{ +public: + Message(FilterXPubSubMessage *super); + Message(FilterXPubSubMessage *super, const std::string data, const std::map &attributes); + Message(FilterXPubSubMessage *super, FilterXObject *protobuf_object); + Message(Message &o) = delete; + Message(Message &&o) = delete; + std::string marshal(void); + const google::pubsub::v1::PubsubMessage &get_value() const; + + bool set_attribute(const std::string &key, const std::string &value); + bool set_data(const std::string &data); +private: + FilterXPubSubMessage *super; + google::pubsub::v1::PubsubMessage message; + Message(const Message &o, FilterXPubSubMessage *super); + friend FilterXObject *::_filterx_pubsub_message_clone(FilterXObject *s); +}; + +} +} +} +} + +struct FilterXPubSubMessage_ +{ + FilterXObject super; + syslogng::grpc::pubsub::filterx::Message *cpp; +}; + +#endif diff --git a/modules/grpc/pubsub/filterx/object-pubsub.h b/modules/grpc/pubsub/filterx/object-pubsub.h new file mode 100644 index 000000000..c590a75f6 --- /dev/null +++ b/modules/grpc/pubsub/filterx/object-pubsub.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2025 Axoflow + * Copyright (c) 2025 shifter + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef OBJECT_PUBSUB_H +#define OBJECT_PUBSUB_H + +#include "syslog-ng.h" + +#include "compat/cpp-start.h" +#include "filterx/filterx-object.h" +#include "filterx/expr-function.h" + +FILTERX_SIMPLE_FUNCTION_DECLARE(pubsub_message); + +FilterXObject *filterx_pubsub_message_new_from_args(FilterXExpr *s, FilterXObject *args[], gsize args_len); + +static inline FilterXObject * +filterx_pubsub_message_new(void) +{ + return filterx_pubsub_message_new_from_args(NULL, NULL, 0); +} + +FILTERX_DECLARE_TYPE(pubsub_message); + +static inline void +pubsub_filterx_objects_global_init(void) +{ + static gboolean initialized = FALSE; + + if (!initialized) + { + filterx_type_init(&FILTERX_TYPE_NAME(pubsub_message)); + initialized = TRUE; + } +} + +#include "compat/cpp-end.h" + +#endif diff --git a/modules/grpc/pubsub/pubsub-plugin.c b/modules/grpc/pubsub/pubsub-plugin.c index f87c3a656..55680fdce 100644 --- a/modules/grpc/pubsub/pubsub-plugin.c +++ b/modules/grpc/pubsub/pubsub-plugin.c @@ -25,6 +25,7 @@ #include "plugin.h" #include "plugin-types.h" #include "protos/apphook.h" +#include "filterx/object-pubsub.h" extern CfgParser pubsub_parser; @@ -35,11 +36,13 @@ static Plugin pubsub_plugins[] = .name = "google_pubsub_grpc", .parser = &pubsub_parser, }, + FILTERX_SIMPLE_FUNCTION_PLUGIN(pubsub_message), }; gboolean google_pubsub_grpc_module_init(PluginContext *context, CfgArgs *args) { + pubsub_filterx_objects_global_init(); plugin_register(context, pubsub_plugins, G_N_ELEMENTS(pubsub_plugins)); grpc_register_global_initializers(); return TRUE; From ab56249485682c462e698f27b5553ef6ed1e1137 Mon Sep 17 00:00:00 2001 From: shifter Date: Wed, 15 Jan 2025 08:49:38 +0100 Subject: [PATCH 3/4] grpc: extends pubsub destination with protovar option The protovar option in the Pub/Sub destination allows users to directly access the Protobuf byte representation created by a FilterX function on the source side. The Protobuf serialized variable is used to set the Pub/Sub message's content. This enables users to fully control and manage the outgoing messages of the Pub/Sub destination from within FilterX. By leveraging this option, users can customize the message payload dynamically, based on their specific requirements. The data() and attributes() options cannot be used with protovar(). These options are mutually exclusive. Signed-off-by: shifter --- modules/grpc/pubsub/pubsub-dest-worker.cpp | 64 ++++++++++++++++++++-- modules/grpc/pubsub/pubsub-dest-worker.hpp | 3 + modules/grpc/pubsub/pubsub-dest.cpp | 27 +++++++-- modules/grpc/pubsub/pubsub-dest.h | 1 + modules/grpc/pubsub/pubsub-dest.hpp | 8 +++ modules/grpc/pubsub/pubsub-grammar.ym | 3 + modules/grpc/pubsub/pubsub-parser.c | 1 + 7 files changed, 97 insertions(+), 10 deletions(-) diff --git a/modules/grpc/pubsub/pubsub-dest-worker.cpp b/modules/grpc/pubsub/pubsub-dest-worker.cpp index 4781d6a4c..ee3e7aea0 100644 --- a/modules/grpc/pubsub/pubsub-dest-worker.cpp +++ b/modules/grpc/pubsub/pubsub-dest-worker.cpp @@ -94,17 +94,14 @@ DestWorker::format_template(LogTemplate *tmpl, LogMessage *msg, GString *value, return Slice{value->str, value->len}; } -LogThreadedResult -DestWorker::insert(LogMessage *msg) +bool +DestWorker::handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes) { DestDriver *owner_ = this->get_owner(); ScratchBuffersMarker m; GString *buf = scratch_buffers_alloc_and_mark(&m); Slice buf_slice; - size_t message_bytes = 0; - - ::google::pubsub::v1::PubsubMessage *message = this->request.add_messages(); buf_slice = this->format_template(owner_->data, msg, buf, NULL, this->super->super.seq_num); message->set_data(buf_slice.str, buf_slice.len); @@ -119,10 +116,65 @@ DestWorker::insert(LogMessage *msg) } scratch_buffers_reclaim_marked(m); + return true; +} +bool +DestWorker::handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes) +{ + DestDriver *owner_ = this->get_owner(); + + LogMessageValueType lmvt; + gssize len; + const gchar *proto = log_template_get_trivial_value_and_type(owner_->protovar, msg, &len, &lmvt); + if (lmvt != LM_VT_PROTOBUF) + { + msg_error("Error loggmessage type is not protobuf", + evt_tag_int("expected_type", LM_VT_PROTOBUF), + evt_tag_int("current_type", lmvt)); + return false; + } + + if (!message->ParsePartialFromArray(proto, len)) + { + msg_error("Unable to deserialize protobuf message", + evt_tag_int("proto_size", len)); + return false; + } + + message_bytes += message->data().length(); + + for (const auto &pair : message->attributes()) + { + const std::string &key = pair.first; + const std::string &value = pair.second; + + message_bytes += key.length() + value.length(); + } + return true; +} + +LogThreadedResult +DestWorker::insert(LogMessage *msg) +{ + DestDriver *owner_ = this->get_owner(); + + size_t message_bytes = 0; + + ::google::pubsub::v1::PubsubMessage *message = this->request.add_messages(); + + if (owner_->protovar) + { + if (!this->handle_protovar(msg, message, &message_bytes)) + return LTR_ERROR; + } + else + { + if (!this->handle_data_attributes(msg, message, &message_bytes)) + return LTR_ERROR; + } this->current_batch_bytes += message_bytes; log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, message_bytes); - this->batch_size++; if (!this->client_context.get()) diff --git a/modules/grpc/pubsub/pubsub-dest-worker.hpp b/modules/grpc/pubsub/pubsub-dest-worker.hpp index 848696ba4..1bbc352c2 100644 --- a/modules/grpc/pubsub/pubsub-dest-worker.hpp +++ b/modules/grpc/pubsub/pubsub-dest-worker.hpp @@ -64,6 +64,9 @@ class DestWorker final : public syslogng::grpc::DestWorker ::google::pubsub::v1::PublishRequest request; size_t batch_size = 0; size_t current_batch_bytes = 0; + + bool handle_protovar(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes); + bool handle_data_attributes(LogMessage *msg, ::google::pubsub::v1::PubsubMessage *message, size_t *message_bytes); }; } diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 5399bfa03..fde0af0f5 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -46,10 +46,9 @@ DestDriver::DestDriver(GrpcDestDriver *s) this->batch_bytes = MAX_BATCH_BYTES; GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super); - LogTemplate *default_data_template = log_template_new(cfg, NULL); - g_assert(log_template_compile(default_data_template, "$MESSAGE", NULL)); - this->set_data(default_data_template); - log_template_unref(default_data_template); + this->default_data_template = log_template_new(cfg, NULL); + g_assert(log_template_compile(this->default_data_template, "$MESSAGE", NULL)); + this->set_data(this->default_data_template); } DestDriver::~DestDriver() @@ -57,6 +56,8 @@ DestDriver::~DestDriver() log_template_unref(this->project); log_template_unref(this->topic); log_template_unref(this->data); + log_template_unref(this->protovar); + log_template_unref(this->default_data_template); } bool @@ -77,6 +78,12 @@ DestDriver::init() log_pipe_location_tag(&this->super->super.super.super.super)); return false; } + if ((!this->attributes.empty() || this->data != this->default_data_template) && this->protovar != nullptr) + { + msg_error("Error initializing Google Pub/Sub destination: 'attributes()' and 'data()' cannot be used together with 'protovar()'. Please use either 'attributes()' and 'data()', or 'protovar()', but not both.", + log_pipe_location_tag(&this->super->super.super.super.super)); + return false; + } this->extend_worker_partition_key(std::string("project=") + this->project->template_str); this->extend_worker_partition_key(std::string("topic=") + this->topic->template_str); @@ -151,6 +158,18 @@ pubsub_dd_set_data(LogDriver *d, LogTemplate *data) cpp->set_data(data); } +gboolean +pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto) +{ + if (!log_template_is_trivial(proto)) + return FALSE; + + GrpcDestDriver *self = (GrpcDestDriver *) d; + DestDriver *cpp = pubsub_dd_get_cpp(self); + cpp->set_protovar(proto); + return TRUE; +} + void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value) { diff --git a/modules/grpc/pubsub/pubsub-dest.h b/modules/grpc/pubsub/pubsub-dest.h index 12ae04880..0834cbb80 100644 --- a/modules/grpc/pubsub/pubsub-dest.h +++ b/modules/grpc/pubsub/pubsub-dest.h @@ -35,6 +35,7 @@ LogDriver *pubsub_dd_new(GlobalConfig *cfg); void pubsub_dd_set_project(LogDriver *d, LogTemplate *project); void pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic); void pubsub_dd_set_data(LogDriver *d, LogTemplate *data); +gboolean pubsub_dd_set_protovar(LogDriver *d, LogTemplate *proto); void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value); #include "compat/cpp-end.h" diff --git a/modules/grpc/pubsub/pubsub-dest.hpp b/modules/grpc/pubsub/pubsub-dest.hpp index 22ff14fd4..a106322fd 100644 --- a/modules/grpc/pubsub/pubsub-dest.hpp +++ b/modules/grpc/pubsub/pubsub-dest.hpp @@ -62,6 +62,12 @@ class DestDriver final : public syslogng::grpc::DestDriver this->data = log_template_ref(d); } + void set_protovar(LogTemplate *d) + { + log_template_unref(this->data); + this->protovar = log_template_ref(d); + } + void add_attribute(const std::string &name, LogTemplate *value) { this->attributes.push_back(NameValueTemplatePair{name, value}); @@ -74,6 +80,8 @@ class DestDriver final : public syslogng::grpc::DestDriver LogTemplate *project = nullptr; LogTemplate *topic = nullptr; LogTemplate *data = nullptr; + LogTemplate *protovar = nullptr; + LogTemplate *default_data_template = nullptr; std::vector attributes; }; diff --git a/modules/grpc/pubsub/pubsub-grammar.ym b/modules/grpc/pubsub/pubsub-grammar.ym index 7991bde78..8019ee44c 100644 --- a/modules/grpc/pubsub/pubsub-grammar.ym +++ b/modules/grpc/pubsub/pubsub-grammar.ym @@ -56,6 +56,7 @@ %token KW_TOPIC %token KW_DATA %token KW_ATTRIBUTES +%token KW_PROTOVAR %type pubsub_dest @@ -84,6 +85,8 @@ pubsub_dest_option | KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); } | KW_DATA '(' template_name_or_content ')' { pubsub_dd_set_data(last_driver, $3); log_template_unref($3); } | KW_ATTRIBUTES '(' pubsub_dest_attributes ')' + | KW_PROTOVAR '(' template_name_or_content ')' { CHECK_ERROR(pubsub_dd_set_protovar(last_driver, $3), @1, "format is not trivial"); + log_template_unref($3); } | grpc_dest_general_option ; diff --git a/modules/grpc/pubsub/pubsub-parser.c b/modules/grpc/pubsub/pubsub-parser.c index 5dd4b627f..54943da88 100644 --- a/modules/grpc/pubsub/pubsub-parser.c +++ b/modules/grpc/pubsub/pubsub-parser.c @@ -39,6 +39,7 @@ static CfgLexerKeyword pubsub_keywords[] = { "topic", KW_TOPIC }, { "data", KW_DATA }, { "attributes", KW_ATTRIBUTES }, + { "proto_var", KW_PROTOVAR }, { NULL } }; From 2a3dfc991b14de20f2a77e3a418900d34a5ba8e1 Mon Sep 17 00:00:00 2001 From: shifter Date: Wed, 15 Jan 2025 09:15:29 +0100 Subject: [PATCH 4/4] filterx: add light test for pubsub_message Signed-off-by: shifter --- .../functional_tests/filterx/test_filterx.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/light/functional_tests/filterx/test_filterx.py b/tests/light/functional_tests/filterx/test_filterx.py index e08de0cd7..c95bf0c02 100644 --- a/tests/light/functional_tests/filterx/test_filterx.py +++ b/tests/light/functional_tests/filterx/test_filterx.py @@ -2627,3 +2627,20 @@ def test_keys(config, syslog_ng): r""""direct_access":"foo"}""" + "\n" ) assert file_true.read_log() == exp + + +def test_pubsub_message(config, syslog_ng): + (file_true, file_false) = create_config( + config, r""" + $MSG = json(); + $MSG.msg = pubsub_message("my pubsub message", {"foo":"bar"}); + """, + ) + syslog_ng.start(config) + + assert file_true.get_stats()["processed"] == 1 + assert "processed" not in file_false.get_stats() + exp = ( + r"""{"msg":{"data":"my pubsub message","attributes":{"foo":"bar"}}}""" + "\n" + ) + assert file_true.read_log() == exp