Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-12954
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Sep 24, 2023
1 parent b5a0c96 commit 5cd421a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 139 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
25
26
154 changes: 42 additions & 112 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "fmt/format.h"
#include "framework/MicroServiceFuncs.h"
#include "cppkafka/utils/consumer_dispatcher.h"

namespace OpenWifi {

Expand Down Expand Up @@ -159,45 +160,49 @@ namespace OpenWifi {
}
});

bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);
// bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
// auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);

Types::StringVec Topics;
KafkaManager()->Topics(Topics);
std::for_each(Topics_.begin(),Topics_.end(),
[&](const std::string & T) { Topics.emplace_back(T); });
Consumer.subscribe(Topics);

Running_ = true;
std::vector<cppkafka::Message> MsgVec;
while (Running_) {
try {
MsgVec.clear();
MsgVec.reserve(BatchSize);
MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000));
for (auto const &Msg : MsgVec) {
if (!Msg)
continue;
if (Msg.get_error()) {
if (!Msg.is_eof()) {
poco_error(Logger_,
fmt::format("Error: {}", Msg.get_error().to_string()));

Dispatcher_ = std::make_unique<cppkafka::ConsumerDispatcher>(Consumer);

Dispatcher_->run(
// Callback executed whenever a new message is consumed
[&](cppkafka::Message msg) {
// Print the key (if any)
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(msg.get_topic());
if (It != Notifiers_.end()) {
const auto &FL = It->second;
for (const auto &[CallbackFunc, _] : FL) {
try {
CallbackFunc(msg.get_key(), msg.get_payload());
} catch(const Poco::Exception &E) {

} catch(...) {

}
if (!AutoCommit)
Consumer.commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
fmt::format("Caught a Kafka exception (consumer): {}", E.what()));
} catch (const Poco::Exception &E) {
Logger_.log(E);
} catch (...) {
poco_error(Logger_, "std::exception");
Consumer.commit(msg);
},
// Whenever there's an error (other than the EOF soft error)
[&Logger_](cppkafka::Error error) {
poco_warning(Logger_,fmt::format("Error: {}", error.to_string()));
},
// Whenever EOF is reached on a partition, print this
[&Logger_](cppkafka::ConsumerDispatcher::EndOfFile, const cppkafka::TopicPartition& topic_partition) {
poco_debug(Logger_,fmt::format("Partition {} EOF", topic_partition.get_partition()));
}
}
);

Consumer.unsubscribe();
poco_information(Logger_, "Stopped...");
}
Expand Down Expand Up @@ -225,37 +230,23 @@ namespace OpenWifi {

void KafkaConsumer::Start() {
if (!Running_) {
Running_ = true;
Worker_.start(*this);
}
}

void KafkaConsumer::Stop() {
if (Running_) {
Running_ = false;
Worker_.wakeUp();
Worker_.join();
}
}

void KafkaDispatcher::Start() {
if (!Running_) {
Running_ = true;
Worker_.start(*this);
}
}

void KafkaDispatcher::Stop() {
if (Running_) {
Running_ = false;
Queue_.wakeUpAll();
if(Dispatcher_) {
Dispatcher_->stop();
}
Worker_.join();
}
}

auto KafkaDispatcher::RegisterTopicWatcher(const std::string &Topic,
std::uint64_t KafkaConsumer::RegisterTopicWatcher(const std::string &Topic,
Types::TopicNotifyFunction &F) {
std::lock_guard G(Mutex_);
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(Topic);
if (It == Notifiers_.end()) {
Types::TopicNotifyFunctionList L;
Expand All @@ -264,11 +255,12 @@ namespace OpenWifi {
} else {
It->second.emplace(It->second.end(), std::make_pair(F, FunctionId_));
}
Topics_.insert(Topic);
return FunctionId_++;
}

void KafkaDispatcher::UnregisterTopicWatcher(const std::string &Topic, int Id) {
std::lock_guard G(Mutex_);
void KafkaConsumer::UnregisterTopicWatcher(const std::string &Topic, int Id) {
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
Types::TopicNotifyFunctionList &L = It->second;
Expand All @@ -280,56 +272,17 @@ namespace OpenWifi {
}
}

void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key,
const std::string & Payload) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
}
}

void KafkaDispatcher::run() {
Poco::Logger &Logger_ =
Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel());
poco_information(Logger_, "Starting...");
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
Utils::SetThreadName("kafka:dispatch");
while (Note && Running_) {
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
if (Msg != nullptr) {
auto It = Notifiers_.find(Msg->Topic());
if (It != Notifiers_.end()) {
const auto &FL = It->second;
for (const auto &[CallbackFunc, _] : FL) {
CallbackFunc(Msg->Key(), Msg->Payload());
}
}
}
Note = Queue_.waitDequeueNotification();
}
poco_information(Logger_, "Stopped...");
}

void KafkaDispatcher::Topics(std::vector<std::string> &T) {
T.clear();
for (const auto &[TopicName, _] : Notifiers_)
T.push_back(TopicName);
}

int KafkaManager::Start() {
if (!KafkaEnabled_)
return 0;
ConsumerThr_.Start();
ProducerThr_.Start();
Dispatcher_.Start();
return 0;
}

void KafkaManager::Stop() {
if (KafkaEnabled_) {
poco_information(Logger(), "Stopping...");
Dispatcher_.Stop();
ProducerThr_.Stop();
ConsumerThr_.Stop();
poco_information(Logger(), "Stopped...");
Expand All @@ -353,36 +306,13 @@ namespace OpenWifi {
}
}


void KafkaManager::Dispatch(const char *Topic, const std::string &Key,
const std::string &Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
}

[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
return fmt::format( R"lit({{ "system" : {{ "id" : {},
"host" : "{}" }},
"payload" : {} }})lit", MicroServiceID(),
MicroServicePrivateEndPoint(), PayLoad ) ;
}

uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,
Types::TopicNotifyFunction &F) {
if (KafkaEnabled_) {
return Dispatcher_.RegisterTopicWatcher(Topic, F);
} else {
return 0;
}
}

void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
if (KafkaEnabled_) {
Dispatcher_.UnregisterTopicWatcher(Topic, Id);
}
}

void KafkaManager::Topics(std::vector<std::string> &T) { Dispatcher_.Topics(T); }

void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList &partitions) {
poco_information(
Logger(), fmt::format("Partition assigned: {}...", partitions.front().get_partition()));
Expand Down
42 changes: 16 additions & 26 deletions src/framework/KafkaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,41 +39,30 @@ namespace OpenWifi {
void Produce(const char *Topic, const std::string &Key, const std::string & Payload);

private:
std::recursive_mutex Mutex_;
std::mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_ = false;
Poco::NotificationQueue Queue_;
};

class KafkaConsumer : public Poco::Runnable {
public:
void run() override;
void Start();
void Stop();

private:
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
std::mutex ConsumerMutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_ = false;
};
uint64_t FunctionId_ = 1;
std::unique_ptr<cppkafka::ConsumerDispatcher> Dispatcher_;
std::set<std::string> Topics_;

class KafkaDispatcher : public Poco::Runnable {
public:
void Start();
void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, int Id);
void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload);
void run() override;
void Topics(std::vector<std::string> &T);

private:
std::recursive_mutex Mutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_ = false;
uint64_t FunctionId_ = 1;
Poco::NotificationQueue Queue_;
friend class KafkaManager;
std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, int Id);
};

class KafkaManager : public SubSystemServer {
Expand All @@ -96,19 +85,20 @@ namespace OpenWifi {
void PostMessage(const char *topic, const std::string &key,
const Poco::JSON::Object &Object, bool WrapMessage = true);

void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload);
[[nodiscard]] std::string WrapSystemId(const std::string & PayLoad);
[[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; }
uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id);
void Topics(std::vector<std::string> &T);
inline std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
return ConsumerThr_.RegisterTopicWatcher(Topic,F);
}
inline void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
return ConsumerThr_.UnregisterTopicWatcher(Topic,Id);
}

private:
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
KafkaDispatcher Dispatcher_;

void PartitionAssignment(const cppkafka::TopicPartitionList &partitions);
void PartitionRevocation(const cppkafka::TopicPartitionList &partitions);
Expand Down

0 comments on commit 5cd421a

Please sign in to comment.