Skip to content

Commit

Permalink
Merge pull request #338 from alltilla/grpc-schema-common
Browse files Browse the repository at this point in the history
grpc: factor out schema related functionality from bigquery to common
  • Loading branch information
OverOrion authored Oct 22, 2024
2 parents bda0770 + 9667e86 commit 3d33f33
Show file tree
Hide file tree
Showing 20 changed files with 680 additions and 490 deletions.
237 changes: 25 additions & 212 deletions modules/grpc/bigquery/bigquery-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,128 +31,61 @@
#include "messages.h"
#include "compat/cpp-end.h"

#include <absl/strings/string_view.h>

#include <cstring>
#include <strings.h>

using syslogng::grpc::bigquery::DestinationDriver;

/*
 * Element destructor handed to g_list_free_full(): drops one reference on
 * the LogTemplate stored in a list node.
 */
static void
_template_unref(gpointer data)
{
  log_template_unref(static_cast<LogTemplate *>(data));
}

namespace {

/*
 * Routes the diagnostics produced while importing a protobuf-schema() file
 * into the syslog-ng internal log: errors are reported with msg_error(),
 * warnings with msg_warning().
 */
class ErrorCollector : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
  ErrorCollector() {}
  ~ErrorCollector() override {}

  // override is missing for compatibility with older protobuf versions
  void RecordError(absl::string_view filename, int line, int column, absl::string_view message)
  {
    /* a string_view is not necessarily NUL-terminated, copy before taking c_str() */
    std::string file{filename};
    std::string msg{message};

    msg_error("Error parsing protobuf-schema() file",
              evt_tag_str("filename", file.c_str()), evt_tag_int("line", line), evt_tag_int("column", column),
              evt_tag_str("error", msg.c_str()));
  }

  // override is missing for compatibility with older protobuf versions
  void RecordWarning(absl::string_view filename, int line, int column, absl::string_view message)
  {
    /* a string_view is not necessarily NUL-terminated, copy before taking c_str() */
    std::string file{filename};
    std::string msg{message};

    /* a warning does not abort the import, so it must not be logged at error severity */
    msg_warning("Warning during parsing protobuf-schema() file",
                evt_tag_str("filename", file.c_str()), evt_tag_int("line", line), evt_tag_int("column", column),
                evt_tag_str("warning", msg.c_str()));
  }

private:
  /* deprecated interface, forwarded to the Record*() variants above */
  void AddError(const std::string &filename, int line, int column, const std::string &message)
  {
    this->RecordError(filename, line, column, message);
  }

  void AddWarning(const std::string &filename, int line, int column, const std::string &message)
  {
    this->RecordWarning(filename, line, column, message);
  }
};

}

DestinationDriver::DestinationDriver(GrpcDestDriver *s)
: syslogng::grpc::DestDriver(s)
: syslogng::grpc::DestDriver(s),
schema(2, "bigquery_record.proto", "BigQueryRecord", map_schema_type,
&this->template_options, &this->super->super.super.super.super)
{
this->url = "bigquerystorage.googleapis.com";
this->credentials_builder.set_mode(GCAM_ADC);
}

/*
 * Releases the value templates received through set_protobuf_schema():
 * every list element is unreferenced and the list itself is freed.
 */
DestinationDriver::~DestinationDriver()
{
  GList *owned_values = this->protobuf_schema.values;

  g_list_free_full(owned_values, _template_unref);
}

bool
DestinationDriver::add_field(std::string name, std::string type, LogTemplate *value)
DestinationDriver::map_schema_type(const std::string &type_in, google::protobuf::FieldDescriptorProto::Type &type_out)
{
/* https://cloud.google.com/bigquery/docs/write-api#data_type_conversions */

google::protobuf::FieldDescriptorProto::Type proto_type;
const char *type_str = type.c_str();
if (type.empty() || strcasecmp(type_str, "STRING") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
const char *type_str = type_in.c_str();
if (type_in.empty() || strcasecmp(type_str, "STRING") == 0)
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "BYTES") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_BYTES;
type_out = google::protobuf::FieldDescriptorProto::TYPE_BYTES;
else if (strcasecmp(type_str, "INTEGER") == 0 || strcasecmp(type_str, "INT64") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT64;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT64;
else if (strcasecmp(type_str, "FLOAT") == 0 || strcasecmp(type_str, "FLOAT64") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_DOUBLE;
type_out = google::protobuf::FieldDescriptorProto::TYPE_DOUBLE;
else if (strcasecmp(type_str, "BOOLEAN") == 0 || strcasecmp(type_str, "BOOL") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_BOOL;
type_out = google::protobuf::FieldDescriptorProto::TYPE_BOOL;
else if (strcasecmp(type_str, "TIMESTAMP") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT64;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT64;
else if (strcasecmp(type_str, "DATE") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT32;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT32;
else if (strcasecmp(type_str, "TIME") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "DATETIME") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "JSON") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "NUMERIC") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_INT64;
type_out = google::protobuf::FieldDescriptorProto::TYPE_INT64;
else if (strcasecmp(type_str, "BIGNUMERIC") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "GEOGRAPHY") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else if (strcasecmp(type_str, "RECORD") == 0 || strcasecmp(type_str, "STRUCT") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_MESSAGE;
type_out = google::protobuf::FieldDescriptorProto::TYPE_MESSAGE;
else if (strcasecmp(type_str, "INTERVAL") == 0)
proto_type = google::protobuf::FieldDescriptorProto::TYPE_STRING;
type_out = google::protobuf::FieldDescriptorProto::TYPE_STRING;
else
return false;

this->fields.push_back(Field{name, proto_type, value});

return true;
}

/*
 * Stores the location of the protobuf-schema() file together with the list
 * of value templates that belong to its fields.
 *
 * proto_path: path of the .proto file to be imported by load_protobuf_schema()
 * values:     GList of LogTemplate pointers; the driver takes over the list
 *             and releases it (and the previously stored one) with
 *             g_list_free_full()
 */
void
DestinationDriver::set_protobuf_schema(std::string proto_path, GList *values)
{
  this->protobuf_schema.proto_path = proto_path;

  g_list_free_full(this->protobuf_schema.values, _template_unref);
  this->protobuf_schema.values = values;

  /*
   * Anything loaded earlier was built from the previous path and value list
   * (the templates of which have just been released), so make the next
   * init() import the schema again instead of reusing the stale state.
   */
  this->protobuf_schema.loaded = false;
}

bool
DestinationDriver::init()
{
Expand All @@ -164,15 +97,10 @@ DestinationDriver::init()
return false;
}

if (this->protobuf_schema.proto_path.empty())
this->construct_schema_prototype();
else
{
if (!this->protobuf_schema.loaded && !this->load_protobuf_schema())
return false;
}
if (!this->schema.init())
return false;

if (this->fields.size() == 0)
if (this->schema.empty())
{
msg_error("Error initializing BigQuery destination, schema() or protobuf-schema() is empty",
log_pipe_location_tag(&this->super->super.super.super.super));
Expand Down Expand Up @@ -224,105 +152,6 @@ DestinationDriver::construct_worker(int worker_index)
return &worker->super;
}

/*
 * Builds the protobuf message type used for rows from the configured fields
 * (this->fields): a "BigQueryRecord" message inside a synthetic
 * "bigquery_record.proto" file, one protobuf field per configured field,
 * numbered from 1 in configuration order. On return schema_descriptor,
 * schema_prototype and each field's field_desc refer to the freshly built type.
 */
void
DestinationDriver::construct_schema_prototype()
{
/* start from a fresh message factory and descriptor pool */
this->msg_factory = std::make_unique<google::protobuf::DynamicMessageFactory>();
/* the pool is re-created in place: explicit destructor call followed by placement new */
/* NOTE(review): presumably done this way because DescriptorPool cannot be assigned -- confirm */
this->descriptor_pool.~DescriptorPool();
new (&this->descriptor_pool) google::protobuf::DescriptorPool();

/* describe the synthetic .proto file and its single message type */
google::protobuf::FileDescriptorProto file_descriptor_proto;
file_descriptor_proto.set_name("bigquery_record.proto");
file_descriptor_proto.set_syntax("proto2");
google::protobuf::DescriptorProto *descriptor_proto = file_descriptor_proto.add_message_type();
descriptor_proto->set_name("BigQueryRecord");

/* field numbers are assigned sequentially, starting at 1 */
int32_t num = 1;
for (auto &field : this->fields)
{
google::protobuf::FieldDescriptorProto *field_desc_proto = descriptor_proto->add_field();
field_desc_proto->set_name(field.nv.name);
field_desc_proto->set_type(field.type);
field_desc_proto->set_number(num++);
}


/* NOTE(review): the BuildFile() result is dereferenced without a NULL check; */
/* confirm that an invalid schema (e.g. duplicate field names) cannot reach this point */
const google::protobuf::FileDescriptor *file_descriptor = this->descriptor_pool.BuildFile(file_descriptor_proto);
this->schema_descriptor = file_descriptor->message_type(0);

/* descriptor fields were added in the order of this->fields, so the indices line up */
for (int i = 0; i < this->schema_descriptor->field_count(); ++i)
{
this->fields[i].field_desc = this->schema_descriptor->field(i);
}

this->schema_prototype = this->msg_factory->GetPrototype(this->schema_descriptor);
}

bool
DestinationDriver::load_protobuf_schema()
{
this->protobuf_schema.loaded = false;
this->msg_factory = std::make_unique<google::protobuf::DynamicMessageFactory>();
this->protobuf_schema.importer.reset(nullptr);

this->protobuf_schema.src_tree = std::make_unique<google::protobuf::compiler::DiskSourceTree>();
this->protobuf_schema.src_tree->MapPath(this->protobuf_schema.proto_path, this->protobuf_schema.proto_path);

this->protobuf_schema.error_coll = std::make_unique<ErrorCollector>();

this->protobuf_schema.importer =
std::make_unique<google::protobuf::compiler::Importer>(this->protobuf_schema.src_tree.get(),
this->protobuf_schema.error_coll.get());

const google::protobuf::FileDescriptor *file_descriptor =
this->protobuf_schema.importer->Import(this->protobuf_schema.proto_path);

if (!file_descriptor || file_descriptor->message_type_count() == 0)
{
msg_error("Error initializing BigQuery destination, protobuf-schema() file can't be loaded",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}

this->schema_descriptor = file_descriptor->message_type(0);

this->fields.clear();

GList *current_value = this->protobuf_schema.values;
for (int i = 0; i < this->schema_descriptor->field_count(); ++i)
{
auto field = this->schema_descriptor->field(i);

if (!current_value)
{
msg_error("Error initializing BigQuery destination, protobuf-schema() file has more fields than "
"values listed in the config",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}

LogTemplate *value = (LogTemplate *) current_value->data;

this->fields.push_back(Field{field->name(), (google::protobuf::FieldDescriptorProto::Type) field->type(), value});
this->fields[i].field_desc = field;

current_value = current_value->next;
}

if (current_value)
{
msg_error("Error initializing BigQuery destination, protobuf-schema() file has less fields than "
"values listed in the config",
log_pipe_location_tag(&this->super->super.super.super.super));
return false;
}


this->schema_prototype = this->msg_factory->GetPrototype(this->schema_descriptor);
this->protobuf_schema.loaded = true;
return true;
}


/* C Wrappers */

Expand Down Expand Up @@ -355,22 +184,6 @@ void bigquery_dd_set_table(LogDriver *d, const gchar *table)
cpp->set_table(table);
}

/*
 * C wrapper around DestinationDriver::add_field().
 * A NULL type is forwarded as an empty string; the result of add_field()
 * is returned to the caller.
 */
gboolean
bigquery_dd_add_field(LogDriver *d, const gchar *name, const gchar *type, LogTemplate *value)
{
  const gchar *type_name = (type != nullptr) ? type : "";
  DestinationDriver *driver = bigquery_dd_get_cpp((GrpcDestDriver *) d);

  return driver->add_field(name, type_name, value);
}

/*
 * C wrapper around DestinationDriver::set_protobuf_schema(): hands the
 * .proto path and the list of value templates over to the C++ driver.
 */
void
bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *values)
{
  DestinationDriver *driver = bigquery_dd_get_cpp((GrpcDestDriver *) d);

  driver->set_protobuf_schema(proto_path, values);
}

LogDriver *
bigquery_dd_new(GlobalConfig *cfg)
{
Expand Down
3 changes: 0 additions & 3 deletions modules/grpc/bigquery/bigquery-dest.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ void bigquery_dd_set_project(LogDriver *d, const gchar *project);
void bigquery_dd_set_dataset(LogDriver *d, const gchar *dataset);
void bigquery_dd_set_table(LogDriver *d, const gchar *table);

gboolean bigquery_dd_add_field(LogDriver *d, const gchar *name, const gchar *type, LogTemplate *value);
void bigquery_dd_set_protobuf_schema(LogDriver *d, const gchar *proto_path, GList *values);

#include "compat/cpp-end.h"

#endif
Loading

0 comments on commit 3d33f33

Please sign in to comment.