diff --git a/modules/grpc/loki/loki-dest.cpp b/modules/grpc/loki/loki-dest.cpp index 4739ff44c..292c99888 100644 --- a/modules/grpc/loki/loki-dest.cpp +++ b/modules/grpc/loki/loki-dest.cpp @@ -43,7 +43,7 @@ DestinationDriver::DestinationDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s), timestamp(LM_TS_PROCESSED) { this->url = "localhost:9095"; - this->flush_on_key_change = true; + this->enable_dynamic_headers(); } DestinationDriver::~DestinationDriver() diff --git a/modules/grpc/loki/loki-worker.cpp b/modules/grpc/loki/loki-worker.cpp index 89f2113ea..c0bd1e606 100644 --- a/modules/grpc/loki/loki-worker.cpp +++ b/modules/grpc/loki/loki-worker.cpp @@ -121,6 +121,7 @@ DestinationWorker::prepare_batch() this->current_batch = logproto::PushRequest{}; this->current_batch.add_streams(); this->current_batch_bytes = 0; + this->client_context.reset(); } bool @@ -205,6 +206,14 @@ DestinationWorker::insert(LogMessage *msg) this->current_batch_bytes += message->len; log_threaded_dest_driver_insert_msg_length_stats(super->super.owner, message->len); + if (!this->client_context.get()) + { + this->client_context = std::make_unique<::grpc::ClientContext>(); + this->prepare_context_dynamic(*this->client_context, msg); + if (!owner_->tenant_id.empty()) + client_context->AddMetadata("x-scope-orgid", owner_->tenant_id); + } + msg_trace("Message added to Loki batch", log_pipe_location_tag((LogPipe *) this->super->super.owner)); if (this->should_initiate_flush()) @@ -224,13 +233,7 @@ DestinationWorker::flush(LogThreadedFlushMode mode) LogThreadedResult result; logproto::PushResponse response{}; - ::grpc::ClientContext ctx; - this->prepare_context(ctx); - - if (!owner_->tenant_id.empty()) - ctx.AddMetadata("x-scope-orgid", owner_->tenant_id); - - ::grpc::Status status = this->stub->Push(&ctx, this->current_batch, &response); + ::grpc::Status status = this->stub->Push(client_context.get(), this->current_batch, &response); this->get_owner()->metrics.insert_grpc_request_stats(status); if (!status.ok()) diff --git a/modules/grpc/loki/loki-worker.hpp b/modules/grpc/loki/loki-worker.hpp index 743d745a1..2e3e5cfe2 100644 --- a/modules/grpc/loki/loki-worker.hpp +++ b/modules/grpc/loki/loki-worker.hpp @@ -66,6 +66,7 @@ class DestinationWorker final: public syslogng::grpc::DestWorker bool connected; std::shared_ptr<::grpc::Channel> channel; + std::unique_ptr<::grpc::ClientContext> client_context; std::unique_ptr stub; logproto::PushRequest current_batch; size_t current_batch_bytes = 0;