diff --git a/modules/grpc/clickhouse/clickhouse-dest-worker.cpp b/modules/grpc/clickhouse/clickhouse-dest-worker.cpp index 9820510de..4641128c9 100644 --- a/modules/grpc/clickhouse/clickhouse-dest-worker.cpp +++ b/modules/grpc/clickhouse/clickhouse-dest-worker.cpp @@ -24,6 +24,7 @@ #include "clickhouse-dest-worker.hpp" #include "clickhouse-dest.hpp" +#include using syslogng::grpc::clickhouse::DestWorker; using syslogng::grpc::clickhouse::DestDriver; @@ -31,13 +32,30 @@ using syslogng::grpc::clickhouse::DestDriver; DestWorker::DestWorker(GrpcDestWorker *s) : syslogng::grpc::DestWorker(s) { + std::shared_ptr<::grpc::ChannelCredentials> credentials = this->create_credentials(); + if (!credentials) + { + msg_error("Error querying ClickHouse credentials", + evt_tag_str("url", this->owner.get_url().c_str()), + log_pipe_location_tag(&this->super->super.owner->super.super.super)); + throw std::runtime_error("Error querying ClickHouse credentials"); + } + + ::grpc::ChannelArguments args = this->create_channel_args(); + + this->channel = ::grpc::CreateCustomChannel(this->owner.get_url(), credentials, args); + this->stub = ::clickhouse::grpc::ClickHouse::NewStub(this->channel); +} + +bool +DestWorker::should_initiate_flush() +{ + return this->current_batch_bytes >= this->get_owner()->batch_bytes; } LogThreadedResult DestWorker::insert(LogMessage *msg) { -<<<<<<< Updated upstream -======= DestDriver *owner_ = this->get_owner(); std::streampos last_pos = this->query_data.tellp(); size_t row_bytes = 0; @@ -63,7 +81,6 @@ DestWorker::insert(LogMessage *msg) { this->client_context = std::make_unique<::grpc::ClientContext>(); prepare_context_dynamic(*this->client_context, msg); - this->client_context->AddMetadata("param_table", owner_->get_table()); } if (this->should_initiate_flush()) @@ -133,14 +150,62 @@ _map_grpc_status_to_log_threaded_result(const ::grpc::Status &status) evt_tag_int("error_code", status.error_code()), evt_tag_str("error_message", status.error_message().c_str()), evt_tag_str("error_details", status.error_details().c_str())); ->>>>>>> Stashed changes return LTR_NOT_CONNECTED; + +permanent_error: + msg_error("ClickHouse server responded with a permanent error status code, dropping batch", + evt_tag_int("error_code", status.error_code()), + evt_tag_str("error_message", status.error_message().c_str()), + evt_tag_str("error_details", status.error_details().c_str())); + return LTR_DROP; +} + +void +DestWorker::prepare_batch() +{ + this->query_data.str(""); + this->batch_size = 0; + this->current_batch_bytes = 0; + this->client_context.reset(); } LogThreadedResult DestWorker::flush(LogThreadedFlushMode mode) { - return LTR_ERROR; + if (this->batch_size == 0) + return LTR_SUCCESS; + + ::clickhouse::grpc::QueryInfo query_info; + ::clickhouse::grpc::Result query_result; + + this->prepare_query_info(query_info); + + ::grpc::Status status = this->stub->ExecuteQuery(this->client_context.get(), query_info, &query_result); + LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status); + if (result != LTR_SUCCESS) + goto exit; + + if (query_result.has_exception()) + { + const ::clickhouse::grpc::Exception &exception = query_result.exception(); + msg_error("ClickHouse server responded with an exception, dropping batch", + evt_tag_int("code", exception.code()), + evt_tag_str("name", exception.name().c_str()), + evt_tag_str("display_text", exception.display_text().c_str()), + evt_tag_str("stack_trace", exception.stack_trace().c_str())); + result = LTR_DROP; + goto exit; + } + + log_threaded_dest_worker_written_bytes_add(&this->super->super, this->current_batch_bytes); + log_threaded_dest_driver_insert_batch_length_stats(this->super->super.owner, this->current_batch_bytes); + + msg_debug("ClickHouse batch delivered", log_pipe_location_tag(&this->super->super.owner->super.super.super)); + +exit: + this->get_owner()->metrics.insert_grpc_request_stats(status); + this->prepare_batch(); + return result; } DestDriver * diff --git a/modules/grpc/clickhouse/clickhouse-dest-worker.hpp b/modules/grpc/clickhouse/clickhouse-dest-worker.hpp index 051c3df92..22ba8fe43 100644 --- a/modules/grpc/clickhouse/clickhouse-dest-worker.hpp +++ b/modules/grpc/clickhouse/clickhouse-dest-worker.hpp @@ -27,6 +27,10 @@ #include "clickhouse-dest.hpp" #include "grpc-dest-worker.hpp" +#include + +#include "clickhouse_grpc.grpc.pb.h" + namespace syslogng { namespace grpc { namespace clickhouse { @@ -40,7 +44,19 @@ class DestWorker final : public syslogng::grpc::DestWorker LogThreadedResult flush(LogThreadedFlushMode mode); private: + bool should_initiate_flush(); + void prepare_query_info(::clickhouse::grpc::QueryInfo &query_info); + void prepare_batch(); DestDriver *get_owner(); + +private: + std::shared_ptr<::grpc::Channel> channel; + std::unique_ptr<::clickhouse::grpc::ClickHouse::Stub> stub; + std::unique_ptr<::grpc::ClientContext> client_context; + + std::ostringstream query_data; + size_t batch_size = 0; + size_t current_batch_bytes = 0; }; } diff --git a/modules/grpc/clickhouse/clickhouse-dest.cpp b/modules/grpc/clickhouse/clickhouse-dest.cpp index cb018f1f3..4a5138d6a 100644 --- a/modules/grpc/clickhouse/clickhouse-dest.cpp +++ b/modules/grpc/clickhouse/clickhouse-dest.cpp @@ -40,6 +40,7 @@ DestDriver::DestDriver(GrpcDestDriver *s) &this->template_options, &this->super->super.super.super.super) { this->url = "localhost:9100"; + this->enable_dynamic_headers(); } bool