Skip to content

Commit

Permalink
grpc/clickhouse: implement worker
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Szakacs <[email protected]>
  • Loading branch information
alltilla committed Oct 26, 2024
1 parent f8434d6 commit f156577
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
75 changes: 70 additions & 5 deletions modules/grpc/clickhouse/clickhouse-dest-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,38 @@
#include "clickhouse-dest-worker.hpp"
#include "clickhouse-dest.hpp"

#include <google/protobuf/util/delimited_message_util.h>

using syslogng::grpc::clickhouse::DestWorker;
using syslogng::grpc::clickhouse::DestDriver;

DestWorker::DestWorker(GrpcDestWorker *s)
: syslogng::grpc::DestWorker(s)
{
std::shared_ptr<::grpc::ChannelCredentials> credentials = this->create_credentials();
if (!credentials)
{
msg_error("Error querying ClickHouse credentials",
evt_tag_str("url", this->owner.get_url().c_str()),
log_pipe_location_tag(&this->super->super.owner->super.super.super));
throw std::runtime_error("Error querying ClickHouse credentials");
}

::grpc::ChannelArguments args = this->create_channel_args();

this->channel = ::grpc::CreateCustomChannel(this->owner.get_url(), credentials, args);
this->stub = ::clickhouse::grpc::ClickHouse::NewStub(this->channel);
}

bool
DestWorker::should_initiate_flush()
{
return this->current_batch_bytes >= this->get_owner()->batch_bytes;
}

LogThreadedResult
DestWorker::insert(LogMessage *msg)
{
<<<<<<< Updated upstream
=======
DestDriver *owner_ = this->get_owner();
std::streampos last_pos = this->query_data.tellp();
size_t row_bytes = 0;
Expand All @@ -63,7 +81,6 @@ DestWorker::insert(LogMessage *msg)
{
this->client_context = std::make_unique<::grpc::ClientContext>();
prepare_context_dynamic(*this->client_context, msg);
this->client_context->AddMetadata("param_table", owner_->get_table());
}

if (this->should_initiate_flush())
Expand Down Expand Up @@ -133,14 +150,62 @@ _map_grpc_status_to_log_threaded_result(const ::grpc::Status &status)
evt_tag_int("error_code", status.error_code()),
evt_tag_str("error_message", status.error_message().c_str()),
evt_tag_str("error_details", status.error_details().c_str()));
>>>>>>> Stashed changes
return LTR_NOT_CONNECTED;

permanent_error:
msg_error("ClickHouse server responded with a permanent error status code, dropping batch",
evt_tag_int("error_code", status.error_code()),
evt_tag_str("error_message", status.error_message().c_str()),
evt_tag_str("error_details", status.error_details().c_str()));
return LTR_DROP;
}

void
DestWorker::prepare_batch()
{
this->query_data.str("");
this->batch_size = 0;
this->current_batch_bytes = 0;
this->client_context.reset();
}

LogThreadedResult
DestWorker::flush(LogThreadedFlushMode mode)
{
return LTR_ERROR;
if (this->batch_size == 0)
return LTR_SUCCESS;

::clickhouse::grpc::QueryInfo query_info;
::clickhouse::grpc::Result query_result;

this->prepare_query_info(query_info);

::grpc::Status status = this->stub->ExecuteQuery(this->client_context.get(), query_info, &query_result);
LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status);
if (result != LTR_SUCCESS)
goto exit;

if (query_result.has_exception())
{
const ::clickhouse::grpc::Exception &exception = query_result.exception();
msg_error("ClickHouse server responded with an exception, dropping batch",
evt_tag_int("code", exception.code()),
evt_tag_str("name", exception.name().c_str()),
evt_tag_str("display_text", exception.display_text().c_str()),
evt_tag_str("stack_trace", exception.stack_trace().c_str()));
result = LTR_DROP;
goto exit;
}

log_threaded_dest_worker_written_bytes_add(&this->super->super, this->current_batch_bytes);
log_threaded_dest_driver_insert_batch_length_stats(this->super->super.owner, this->current_batch_bytes);

msg_debug("ClickHouse batch delivered", log_pipe_location_tag(&this->super->super.owner->super.super.super));

exit:
this->get_owner()->metrics.insert_grpc_request_stats(status);
this->prepare_batch();
return result;
}

DestDriver *
Expand Down
16 changes: 16 additions & 0 deletions modules/grpc/clickhouse/clickhouse-dest-worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#include "clickhouse-dest.hpp"
#include "grpc-dest-worker.hpp"

#include <sstream>

#include "clickhouse_grpc.grpc.pb.h"

namespace syslogng {
namespace grpc {
namespace clickhouse {
Expand All @@ -40,7 +44,19 @@ class DestWorker final : public syslogng::grpc::DestWorker
LogThreadedResult flush(LogThreadedFlushMode mode);

private:
bool should_initiate_flush();
void prepare_query_info(::clickhouse::grpc::QueryInfo &query_info);
void prepare_batch();
DestDriver *get_owner();

private:
std::shared_ptr<::grpc::Channel> channel;
std::unique_ptr<::clickhouse::grpc::ClickHouse::Stub> stub;
std::unique_ptr<::grpc::ClientContext> client_context;

std::ostringstream query_data;
size_t batch_size = 0;
size_t current_batch_bytes = 0;
};

}
Expand Down
1 change: 1 addition & 0 deletions modules/grpc/clickhouse/clickhouse-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DestDriver::DestDriver(GrpcDestDriver *s)
&this->template_options, &this->super->super.super.super.super)
{
this->url = "localhost:9100";
this->enable_dynamic_headers();
}

bool
Expand Down

0 comments on commit f156577

Please sign in to comment.