Skip to content

Commit

Permalink
Attempt to automatically create topics before subscribing
Browse files Browse the repository at this point in the history
  • Loading branch information
leviathan747 committed Feb 7, 2024
1 parent 6112fa9 commit 6b41d9c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
3 changes: 3 additions & 0 deletions core-cpp/kafka/include/kafka/Consumer.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef Kafka_Consumer_HH
#define Kafka_Consumer_HH

#include "cppkafka/consumer.h"
#include "cppkafka/message.h"

#include <condition_variable>
Expand Down Expand Up @@ -35,6 +36,8 @@ private:
MessageQueue messageQueue;

void handleMessage();

void createTopics(cppkafka::Consumer& consumer, std::vector<std::string> topics);
};

} // namespace Kafka
Expand Down
79 changes: 78 additions & 1 deletion core-cpp/kafka/src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
#include "kafka/ProcessHandler.hh"

#include "swa/CommandLine.hh"
#include "swa/Duration.hh"
#include "swa/Process.hh"
#include "swa/ProgramError.hh"
#include "swa/RealTimeSignalListener.hh"

#include "cppkafka/buffer.h"
#include "cppkafka/configuration.h"
#include "cppkafka/consumer.h"
#include "cppkafka/utils/consumer_dispatcher.h"

#include <uuid/uuid.h>
Expand Down Expand Up @@ -41,6 +42,12 @@ void Consumer::run() {
// Create the consumer
cppkafka::Consumer consumer(config);

// create topics if they don't already exist
createTopics(consumer, ProcessHandler::getInstance().getTopicNames());

// short delay to avoid race conditions if other processes initiated topic creation
SWA::delay(SWA::Duration::fromMillis(100));

// Subscribe to topics
consumer.subscribe(ProcessHandler::getInstance().getTopicNames());

Expand Down Expand Up @@ -89,6 +96,76 @@ void Consumer::handleMessage() {
}
}

void Consumer::createTopics(cppkafka::Consumer& consumer, std::vector<std::string> topics) {
// TODO clean up error handling in this routine
for (auto it = topics.begin(); it != topics.end(); it++) {

const char* topicname = (*it).data();
int partition_cnt = 1;
int replication_factor = 1;

rd_kafka_t *rk = consumer.get_handle();
rd_kafka_NewTopic_t *newt[1];
const size_t newt_cnt = 1;
rd_kafka_AdminOptions_t *options;
rd_kafka_queue_t *rkqu;
rd_kafka_event_t *rkev;
const rd_kafka_CreateTopics_result_t *res;
const rd_kafka_topic_result_t **terr;
int timeout_ms = 10000;
size_t res_cnt;
rd_kafka_resp_err_t err;
char errstr[512];

rkqu = rd_kafka_queue_new(rk);

newt[0] = rd_kafka_NewTopic_new(topicname, partition_cnt, replication_factor, errstr, sizeof(errstr));

if (newt[0] == NULL) {
throw SWA::ProgramError(errstr);
}

options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms, errstr, sizeof(errstr));

if (err) {
throw SWA::ProgramError(errstr);
}

rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu);

/* Wait for result */
rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);


if (rd_kafka_event_error(rkev)) {
throw SWA::ProgramError(rd_kafka_event_error_string(rkev));
}

res = rd_kafka_event_CreateTopics_result(rkev);

terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt);

if (!terr) {
throw SWA::ProgramError("terr is null");
}

if (res_cnt != newt_cnt) {
throw SWA::ProgramError("res_cnt != newt_cnt");
}

if (rd_kafka_topic_result_error(terr[0]) && rd_kafka_topic_result_error(terr[0]) != RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) {
throw SWA::ProgramError(std::string(rd_kafka_topic_result_name(terr[0])) + ": " + std::string(rd_kafka_topic_result_error_string(terr[0])));
}

rd_kafka_event_destroy(rkev);
rd_kafka_queue_destroy(rkqu);
rd_kafka_AdminOptions_destroy(options);
rd_kafka_NewTopic_destroy(newt[0]);

}
}

Consumer &Consumer::getInstance() {
static Consumer instance;
return instance;
Expand Down

0 comments on commit 6b41d9c

Please sign in to comment.