Skip to content

Commit

Permalink
grpc/clickhouse: implement worker
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Szakacs <[email protected]>
  • Loading branch information
alltilla committed Oct 22, 2024
1 parent 4e99781 commit 342617b
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 1 deletion.
165 changes: 164 additions & 1 deletion modules/grpc/clickhouse/clickhouse-dest-worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,188 @@
#include "clickhouse-dest-worker.hpp"
#include "clickhouse-dest.hpp"

#include <google/protobuf/util/delimited_message_util.h>

using syslogng::grpc::clickhouse::DestWorker;
using syslogng::grpc::clickhouse::DestDriver;

DestWorker::DestWorker(GrpcDestWorker *s)
: syslogng::grpc::DestWorker(s)
{
std::shared_ptr<::grpc::ChannelCredentials> credentials = this->create_credentials();
if (!credentials)
{
msg_error("Error querying ClickHouse credentials",
evt_tag_str("url", this->owner.get_url().c_str()),
log_pipe_location_tag(&this->super->super.owner->super.super.super));
throw std::runtime_error("Error querying ClickHouse credentials");
}

::grpc::ChannelArguments args = this->create_channel_args();

this->channel = ::grpc::CreateCustomChannel(this->owner.get_url(), credentials, args);
this->stub = ::clickhouse::grpc::ClickHouse::NewStub(this->channel);
}

bool
DestWorker::should_initiate_flush()
{
return this->current_batch_bytes >= this->get_owner()->batch_bytes;
}

LogThreadedResult
DestWorker::insert(LogMessage *msg)
{
DestDriver *owner_ = this->get_owner();
std::streampos last_pos = this->query_data.tellp();
size_t row_bytes = 0;

google::protobuf::Message *message = owner_->schema.format(msg, this->super->super.seq_num);
if (!message)
goto drop;

this->batch_size++;

if (!google::protobuf::util::SerializeDelimitedToOstream(*message, &this->query_data))
goto drop;

row_bytes = this->query_data.tellp() - last_pos;
this->current_batch_bytes += row_bytes;
log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, row_bytes);

msg_trace("Message added to ClickHouse batch", log_pipe_location_tag(&this->super->super.owner->super.super.super));

delete message;

if (!this->client_context.get())
{
this->client_context = std::make_unique<::grpc::ClientContext>();
prepare_context_dynamic(*this->client_context, msg);
}

if (this->should_initiate_flush())
return log_threaded_dest_worker_flush(&this->super->super, LTF_FLUSH_NORMAL);

return LTR_QUEUED;

drop:
if (!(owner_->template_options.on_error & ON_ERROR_SILENT))
{
msg_error("Failed to format message for ClickHouse, dropping message",
log_pipe_location_tag(&this->super->super.owner->super.super.super));
}

/* LTR_DROP currently drops the entire batch */
return LTR_QUEUED;
}

void
DestWorker::prepare_query_info(::clickhouse::grpc::QueryInfo &query_info)
{
DestDriver *owner_ = this->get_owner();

query_info.set_database(owner_->get_database());
query_info.set_user_name(owner_->get_user());
query_info.set_password(owner_->get_password());
query_info.set_query(owner_->get_query());
query_info.set_input_data(this->query_data.str());
}

static LogThreadedResult
_map_grpc_status_to_log_threaded_result(const ::grpc::Status &status)
{
// TODO: this is based on OTLP, we should check how the ClickHouse gRPC server behaves

switch (status.error_code())
{
case ::grpc::StatusCode::OK:
return LTR_SUCCESS;
case ::grpc::StatusCode::UNAVAILABLE:
case ::grpc::StatusCode::CANCELLED:
case ::grpc::StatusCode::DEADLINE_EXCEEDED:
case ::grpc::StatusCode::ABORTED:
case ::grpc::StatusCode::OUT_OF_RANGE:
case ::grpc::StatusCode::DATA_LOSS:
goto temporary_error;
case ::grpc::StatusCode::UNKNOWN:
case ::grpc::StatusCode::INVALID_ARGUMENT:
case ::grpc::StatusCode::NOT_FOUND:
case ::grpc::StatusCode::ALREADY_EXISTS:
case ::grpc::StatusCode::PERMISSION_DENIED:
case ::grpc::StatusCode::UNAUTHENTICATED:
case ::grpc::StatusCode::FAILED_PRECONDITION:
case ::grpc::StatusCode::UNIMPLEMENTED:
case ::grpc::StatusCode::INTERNAL:
goto permanent_error;
case ::grpc::StatusCode::RESOURCE_EXHAUSTED:
if (status.error_details().length() > 0)
goto temporary_error;
goto permanent_error;
default:
g_assert_not_reached();
}

temporary_error:
msg_debug("ClickHouse server responded with a temporary error status code, retrying after time-reopen() seconds",
evt_tag_int("error_code", status.error_code()),
evt_tag_str("error_message", status.error_message().c_str()),
evt_tag_str("error_details", status.error_details().c_str()));
return LTR_NOT_CONNECTED;

permanent_error:
msg_error("ClickHouse server responded with a permanent error status code, dropping batch",
evt_tag_int("error_code", status.error_code()),
evt_tag_str("error_message", status.error_message().c_str()),
evt_tag_str("error_details", status.error_details().c_str()));
return LTR_DROP;
}

void
DestWorker::prepare_batch()
{
this->query_data.str("");
this->batch_size = 0;
this->current_batch_bytes = 0;
this->client_context.reset();
}

LogThreadedResult
DestWorker::flush(LogThreadedFlushMode mode)
{
return LTR_ERROR;
if (this->batch_size == 0)
return LTR_SUCCESS;

::clickhouse::grpc::QueryInfo query_info;
::clickhouse::grpc::Result query_result;

this->prepare_query_info(query_info);

::grpc::Status status = this->stub->ExecuteQuery(this->client_context.get(), query_info, &query_result);
LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status);
if (result != LTR_SUCCESS)
goto exit;

if (query_result.has_exception())
{
const ::clickhouse::grpc::Exception &exception = query_result.exception();
msg_error("ClickHouse server responded with an exception, dropping batch",
evt_tag_int("code", exception.code()),
evt_tag_str("name", exception.name().c_str()),
evt_tag_str("display_text", exception.display_text().c_str()),
evt_tag_str("stack_trace", exception.stack_trace().c_str()));
result = LTR_DROP;
goto exit;
}

log_threaded_dest_worker_written_bytes_add(&this->super->super, this->current_batch_bytes);
log_threaded_dest_driver_insert_batch_length_stats(this->super->super.owner, this->current_batch_bytes);

msg_debug("ClickHouse batch delivered", log_pipe_location_tag(&this->super->super.owner->super.super.super));

exit:
this->get_owner()->metrics.insert_grpc_request_stats(status);
this->prepare_batch();
return result;
}

DestDriver *
Expand Down
16 changes: 16 additions & 0 deletions modules/grpc/clickhouse/clickhouse-dest-worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#include "clickhouse-dest.hpp"
#include "grpc-dest-worker.hpp"

#include <sstream>

#include "clickhouse_grpc.grpc.pb.h"

namespace syslogng {
namespace grpc {
namespace clickhouse {
Expand All @@ -40,7 +44,19 @@ class DestWorker final : public syslogng::grpc::DestWorker
LogThreadedResult flush(LogThreadedFlushMode mode);

private:
bool should_initiate_flush();
void prepare_query_info(::clickhouse::grpc::QueryInfo &query_info);
void prepare_batch();
DestDriver *get_owner();

private:
std::shared_ptr<::grpc::Channel> channel;
std::unique_ptr<::clickhouse::grpc::ClickHouse::Stub> stub;
std::unique_ptr<::grpc::ClientContext> client_context;

std::ostringstream query_data;
size_t batch_size = 0;
size_t current_batch_bytes = 0;
};

}
Expand Down
1 change: 1 addition & 0 deletions modules/grpc/clickhouse/clickhouse-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DestDriver::DestDriver(GrpcDestDriver *s)
&this->template_options, &this->super->super.super.super.super)
{
this->url = "localhost:9100";
this->enable_dynamic_headers();
}

bool
Expand Down

0 comments on commit 342617b

Please sign in to comment.