Skip to content

Commit

Permalink
grpc/loki: support dynamic headers
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Szakacs <[email protected]>
  • Loading branch information
alltilla committed Oct 12, 2024
1 parent 885051d commit 96ec7a2
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
2 changes: 1 addition & 1 deletion modules/grpc/loki/loki-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ DestinationDriver::DestinationDriver(GrpcDestDriver *s)
: syslogng::grpc::DestDriver(s), timestamp(LM_TS_PROCESSED)
{
this->url = "localhost:9095";
this->flush_on_key_change = true;
this->enable_dynamic_headers();
}

DestinationDriver::~DestinationDriver()
Expand Down
17 changes: 10 additions & 7 deletions modules/grpc/loki/loki-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ DestinationWorker::prepare_batch()
this->current_batch = logproto::PushRequest{};
this->current_batch.add_streams();
this->current_batch_bytes = 0;
this->client_context.reset();
}

bool
Expand Down Expand Up @@ -205,6 +206,14 @@ DestinationWorker::insert(LogMessage *msg)
this->current_batch_bytes += message->len;
log_threaded_dest_driver_insert_msg_length_stats(super->super.owner, message->len);

if (!this->client_context.get())
{
this->client_context = std::make_unique<::grpc::ClientContext>();
this->prepare_context_dynamic(*this->client_context, msg);
if (!owner_->tenant_id.empty())
client_context->AddMetadata("x-scope-orgid", owner_->tenant_id);
}

msg_trace("Message added to Loki batch", log_pipe_location_tag((LogPipe *) this->super->super.owner));

if (this->should_initiate_flush())
Expand All @@ -224,13 +233,7 @@ DestinationWorker::flush(LogThreadedFlushMode mode)
LogThreadedResult result;
logproto::PushResponse response{};

::grpc::ClientContext ctx;
this->prepare_context(ctx);

if (!owner_->tenant_id.empty())
ctx.AddMetadata("x-scope-orgid", owner_->tenant_id);

::grpc::Status status = this->stub->Push(&ctx, this->current_batch, &response);
::grpc::Status status = this->stub->Push(client_context.get(), this->current_batch, &response);
this->get_owner()->metrics.insert_grpc_request_stats(status);

if (!status.ok())
Expand Down
1 change: 1 addition & 0 deletions modules/grpc/loki/loki-worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class DestinationWorker final: public syslogng::grpc::DestWorker
bool connected;

std::shared_ptr<::grpc::Channel> channel;
std::unique_ptr<::grpc::ClientContext> client_context;
std::unique_ptr<logproto::Pusher::Stub> stub;
logproto::PushRequest current_batch;
size_t current_batch_bytes = 0;
Expand Down

0 comments on commit 96ec7a2

Please sign in to comment.