Skip to content

Commit

Permalink
grpc/otel: support dynamic headers
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Szakacs <[email protected]>
  • Loading branch information
alltilla committed Oct 12, 2024
1 parent 1237e18 commit 885051d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
24 changes: 11 additions & 13 deletions modules/grpc/otel/otel-dest-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ DestWorker::insert(LogMessage *msg)
g_assert_not_reached();
}

if (!client_context.get())
{
client_context = std::make_unique<::grpc::ClientContext>();
prepare_context_dynamic(*client_context, msg);
}

if (should_initiate_flush())
return log_threaded_dest_worker_flush(&super->super, LTF_FLUSH_NORMAL);

Expand Down Expand Up @@ -408,11 +414,8 @@ _map_grpc_status_to_log_threaded_result(const ::grpc::Status &status)
LogThreadedResult
DestWorker::flush_log_records()
{
::grpc::ClientContext client_context;
prepare_context(client_context);

logs_service_response.Clear();
::grpc::Status status = logs_service_stub->Export(&client_context, logs_service_request,
::grpc::Status status = logs_service_stub->Export(client_context.get(), logs_service_request,
&logs_service_response);
owner.metrics.insert_grpc_request_stats(status);
LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status);
Expand All @@ -429,11 +432,8 @@ DestWorker::flush_log_records()
LogThreadedResult
DestWorker::flush_metrics()
{
::grpc::ClientContext client_context;
prepare_context(client_context);

metrics_service_response.Clear();
::grpc::Status status = metrics_service_stub->Export(&client_context, metrics_service_request,
::grpc::Status status = metrics_service_stub->Export(client_context.get(), metrics_service_request,
&metrics_service_response);
owner.metrics.insert_grpc_request_stats(status);
LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status);
Expand All @@ -450,11 +450,8 @@ DestWorker::flush_metrics()
LogThreadedResult
DestWorker::flush_spans()
{
::grpc::ClientContext client_context;
prepare_context(client_context);

trace_service_response.Clear();
::grpc::Status status = trace_service_stub->Export(&client_context, trace_service_request,
::grpc::Status status = trace_service_stub->Export(client_context.get(), trace_service_request,
&trace_service_response);
owner.metrics.insert_grpc_request_stats(status);
LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status);
Expand All @@ -471,7 +468,7 @@ DestWorker::flush_spans()
LogThreadedResult
DestWorker::flush(LogThreadedFlushMode mode)
{
LogThreadedResult result;
LogThreadedResult result = LTR_SUCCESS;

if (mode == LTF_FLUSH_EXPEDITE)
return LTR_RETRY;
Expand All @@ -498,6 +495,7 @@ DestWorker::flush(LogThreadedFlushMode mode)
}

exit:
client_context.reset();
logs_service_request.Clear();
metrics_service_request.Clear();
trace_service_request.Clear();
Expand Down
1 change: 1 addition & 0 deletions modules/grpc/otel/otel-dest-worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class DestWorker : public syslogng::grpc::DestWorker

protected:
std::shared_ptr<::grpc::Channel> channel;
std::unique_ptr<::grpc::ClientContext> client_context;
std::unique_ptr<LogsService::Stub> logs_service_stub;
std::unique_ptr<MetricsService::Stub> metrics_service_stub;
std::unique_ptr<TraceService::Stub> trace_service_stub;
Expand Down
6 changes: 6 additions & 0 deletions modules/grpc/otel/otel-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ using namespace syslogng::grpc::otel;

/* C++ Implementations */

DestDriver::DestDriver(GrpcDestDriver *s) :
syslogng::grpc::DestDriver(s)
{
this->enable_dynamic_headers();
}

const char *
DestDriver::generate_persist_name()
{
Expand Down
2 changes: 1 addition & 1 deletion modules/grpc/otel/otel-dest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace otel {
class DestDriver: public syslogng::grpc::DestDriver
{
public:
DestDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s) {};
DestDriver(GrpcDestDriver *s);

const char *format_stats_key(StatsClusterKeyBuilder *kb);
const char *generate_persist_name();
Expand Down

0 comments on commit 885051d

Please sign in to comment.