Skip to content

Commit

Permalink
Merge pull request #334 from alltilla/grpc-dynamic-headers
Browse files Browse the repository at this point in the history
otel, loki: support dynamic `headers()`
  • Loading branch information
MrAnno authored Oct 21, 2024
2 parents 148b7b1 + a12a4a8 commit 5a31414
Show file tree
Hide file tree
Showing 25 changed files with 214 additions and 144 deletions.
1 change: 1 addition & 0 deletions lib/logthrdest/logthrdestdrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,7 @@ log_threaded_dest_driver_free(LogPipe *s)
LogThreadedDestDriver *self = (LogThreadedDestDriver *)s;

g_free(self->workers);
log_template_unref(self->worker_partition_key);
log_dest_driver_free((LogPipe *)self);
}

Expand Down
15 changes: 1 addition & 14 deletions modules/grpc/bigquery/bigquery-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,11 @@ DestinationDriver::DestinationDriver(GrpcDestDriver *s)
{
this->url = "bigquerystorage.googleapis.com";
this->credentials_builder.set_mode(GCAM_ADC);
log_template_options_defaults(&this->template_options);
}

DestinationDriver::~DestinationDriver()
{
g_list_free_full(this->protobuf_schema.values, _template_unref);
log_template_options_destroy(&this->template_options);
}

bool
Expand Down Expand Up @@ -166,9 +164,6 @@ DestinationDriver::init()
return false;
}

GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super);
log_template_options_init(&this->template_options, cfg);

if (this->protobuf_schema.proto_path.empty())
this->construct_schema_prototype();
else
Expand Down Expand Up @@ -246,7 +241,7 @@ DestinationDriver::construct_schema_prototype()
for (auto &field : this->fields)
{
google::protobuf::FieldDescriptorProto *field_desc_proto = descriptor_proto->add_field();
field_desc_proto->set_name(field.name);
field_desc_proto->set_name(field.nv.name);
field_desc_proto->set_type(field.type);
field_desc_proto->set_number(num++);
}
Expand Down Expand Up @@ -376,14 +371,6 @@ bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *va
cpp->set_protobuf_schema(proto_path, values);
}

LogTemplateOptions *
bigquery_dd_get_template_options(LogDriver *d)
{
GrpcDestDriver *self = (GrpcDestDriver *) d;
DestinationDriver *cpp = bigquery_dd_get_cpp(self);
return &cpp->get_template_options();
}

LogDriver *
bigquery_dd_new(GlobalConfig *cfg)
{
Expand Down
2 changes: 0 additions & 2 deletions modules/grpc/bigquery/bigquery-dest.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ void bigquery_dd_set_table(LogDriver *d, const gchar *table);
gboolean bigquery_dd_add_field(LogDriver *d, const gchar *name, const gchar *type, LogTemplate *value);
void bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *values);

LogTemplateOptions *bigquery_dd_get_template_options(LogDriver *d);

#include "compat/cpp-end.h"

#endif
23 changes: 4 additions & 19 deletions modules/grpc/bigquery/bigquery-dest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,25 @@ namespace bigquery {

struct Field
{
std::string name;
NameValueTemplatePair nv;
google::protobuf::FieldDescriptorProto::Type type;
LogTemplate *value;
const google::protobuf::FieldDescriptor *field_desc;

Field(std::string name_, google::protobuf::FieldDescriptorProto::Type type_, LogTemplate *value_)
: name(name_), type(type_), value(log_template_ref(value_)), field_desc(nullptr) {}
: nv(name_, value_), type(type_), field_desc(nullptr) {}

Field(const Field &a)
: name(a.name), type(a.type), value(log_template_ref(a.value)), field_desc(a.field_desc) {}
: nv(a.nv), type(a.type), field_desc(a.field_desc) {}

Field &operator=(const Field &a)
{
name = a.name;
nv = a.nv;
type = a.type;
log_template_unref(value);
value = log_template_ref(a.value);
field_desc = a.field_desc;

return *this;
}

~Field()
{
log_template_unref(value);
}

};

class DestinationDriver final : public syslogng::grpc::DestDriver
Expand All @@ -91,11 +83,6 @@ class DestinationDriver final : public syslogng::grpc::DestDriver
bool add_field(std::string name, std::string type, LogTemplate *value);
void set_protobuf_schema(std::string proto_path, GList *values);

LogTemplateOptions &get_template_options()
{
return this->template_options;
}

void set_project(std::string p)
{
this->project = p;
Expand Down Expand Up @@ -132,8 +119,6 @@ class DestinationDriver final : public syslogng::grpc::DestDriver
bool load_protobuf_schema();

private:
LogTemplateOptions template_options;

std::string project;
std::string dataset;
std::string table;
Expand Down
1 change: 0 additions & 1 deletion modules/grpc/bigquery/bigquery-grammar.ym
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ bigquery_dest_option
free($3);
}
| grpc_dest_option
| { last_template_options = bigquery_dd_get_template_options(last_driver); } template_option
;

bigquery_schema_fields
Expand Down
4 changes: 2 additions & 2 deletions modules/grpc/bigquery/bigquery-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ DestinationWorker::insert_field(const google::protobuf::Reflection *reflection,

LogMessageValueType type;

Slice value = this->format_template(field.value, msg, buf, &type);
Slice value = this->format_template(field.nv.value, msg, buf, &type);

if (type == LM_VT_NULL)
{
if (field.field_desc->is_required())
{
msg_error("Missing required field", evt_tag_str("field", field.name.c_str()));
msg_error("Missing required field", evt_tag_str("field", field.nv.name.c_str()));
goto error;
}

Expand Down
37 changes: 36 additions & 1 deletion modules/grpc/common/grpc-dest-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

#include "grpc-dest-worker.hpp"

#include "compat/cpp-start.h"
#include "scratch-buffers.h"
#include "compat/cpp-end.h"

using namespace syslogng::grpc;

/* C++ Implementations */
Expand Down Expand Up @@ -60,6 +64,7 @@ DestWorker::create_channel_args()
args.SetCompressionAlgorithm(GRPC_COMPRESS_GZIP);

args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);

for (auto nv : this->owner.int_extra_channel_args)
args.SetInt(nv.first, nv.second);
Expand All @@ -78,6 +83,7 @@ DestWorker::init()
void
DestWorker::deinit()
{
log_threaded_dest_worker_deinit_method(&super->super);
}

bool
Expand All @@ -94,8 +100,37 @@ DestWorker::disconnect()
void
DestWorker::prepare_context(::grpc::ClientContext &context)
{
g_assert(!this->owner.dynamic_headers_enabled);

for (auto nv : owner.headers)
context.AddMetadata(nv.name, nv.value->template_str);
}

void
DestWorker::prepare_context_dynamic(::grpc::ClientContext &context, LogMessage *msg)
{
g_assert(this->owner.dynamic_headers_enabled);

LogTemplateEvalOptions options = {&this->owner.template_options, LTZ_SEND, this->super->super.seq_num, NULL,
LM_VT_STRING
};

ScratchBuffersMarker marker;
GString *buf = scratch_buffers_alloc_and_mark(&marker);

for (auto nv : owner.headers)
context.AddMetadata(nv.first, nv.second);
{
if (log_template_is_literal_string(nv.value))
{
context.AddMetadata(nv.name, log_template_get_literal_value(nv.value, NULL));
continue;
}

log_template_format(nv.value, msg, &options, buf);
context.AddMetadata(nv.name, buf->str);
}

scratch_buffers_reclaim_marked(marker);
}

/* C Wrappers */
Expand Down
1 change: 1 addition & 0 deletions modules/grpc/common/grpc-dest-worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class DestWorker

protected:
void prepare_context(::grpc::ClientContext &context);
void prepare_context_dynamic(::grpc::ClientContext &context, LogMessage *msg);
std::shared_ptr<::grpc::ChannelCredentials> create_credentials();
::grpc::ChannelArguments create_channel_args();

Expand Down
56 changes: 52 additions & 4 deletions modules/grpc/common/grpc-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,50 @@ using namespace syslogng::grpc;

DestDriver::DestDriver(GrpcDestDriver *s)
: super(s), compression(false), batch_bytes(4 * 1000 * 1000),
keepalive_time(-1), keepalive_timeout(-1), keepalive_max_pings_without_data(-1)
keepalive_time(-1), keepalive_timeout(-1), keepalive_max_pings_without_data(-1),
flush_on_key_change(false), dynamic_headers_enabled(false)
{
log_template_options_defaults(&this->template_options);
credentials_builder_wrapper.self = &credentials_builder;
}

DestDriver::~DestDriver()
{
log_template_options_destroy(&this->template_options);
}

bool
DestDriver::set_worker_partition_key()
{
GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super);

LogTemplate *worker_partition_key_tpl = log_template_new(cfg, NULL);
if (!log_template_compile(worker_partition_key_tpl, this->worker_partition_key.str().c_str(), NULL))
{
msg_error("Error compiling worker partition key template",
evt_tag_str("template", this->worker_partition_key.str().c_str()));
return false;
}

if (log_template_is_literal_string(worker_partition_key_tpl))
{
log_template_unref(worker_partition_key_tpl);
}
else
{
log_threaded_dest_driver_set_worker_partition_key_ref(&this->super->super.super.super, worker_partition_key_tpl);
log_threaded_dest_driver_set_flush_on_worker_key_change(&this->super->super.super.super,
this->flush_on_key_change);
}

return true;
}

bool
DestDriver::init()
{
GlobalConfig *cfg = log_pipe_get_config(&this->super->super.super.super.super);

if (url.length() == 0)
{
msg_error("url() option is mandatory",
Expand All @@ -51,6 +87,11 @@ DestDriver::init()
return false;
}

if (this->worker_partition_key.rdbuf()->in_avail() && !this->set_worker_partition_key())
return false;

log_template_options_init(&this->template_options, cfg);

if (!log_threaded_dest_driver_init_method(&this->super->super.super.super.super))
return false;

Expand Down Expand Up @@ -128,11 +169,18 @@ grpc_dd_add_string_channel_arg(LogDriver *s, const gchar *name, const gchar *val
self->cpp->add_extra_channel_arg(name, value);
}

void
grpc_dd_add_header(LogDriver *s, const gchar *name, const gchar *value)
gboolean
grpc_dd_add_header(LogDriver *s, const gchar *name, LogTemplate *value)
{
GrpcDestDriver *self = (GrpcDestDriver *) s;
self->cpp->add_header(name, value);
return self->cpp->add_header(name, value);
}

LogTemplateOptions *
grpc_dd_get_template_options(LogDriver *d)
{
GrpcDestDriver *self = (GrpcDestDriver *) d;
return &self->cpp->get_template_options();
}

GrpcClientCredentialsBuilderW *
Expand Down
3 changes: 2 additions & 1 deletion modules/grpc/common/grpc-dest.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ void grpc_dd_set_keepalive_timeout(LogDriver *s, gint t);
void grpc_dd_set_keepalive_max_pings(LogDriver *s, gint p);
void grpc_dd_add_int_channel_arg(LogDriver *s, const gchar *name, glong value);
void grpc_dd_add_string_channel_arg(LogDriver *s, const gchar *name, const gchar *value);
void grpc_dd_add_header(LogDriver *s, const gchar *name, const gchar *value);
gboolean grpc_dd_add_header(LogDriver *s, const gchar *name, LogTemplate *value);
LogTemplateOptions *grpc_dd_get_template_options(LogDriver *d);
GrpcClientCredentialsBuilderW *grpc_dd_get_credentials_builder(LogDriver *s);

#include "compat/cpp-end.h"
Expand Down
Loading

0 comments on commit 5a31414

Please sign in to comment.