From 5cd421a4b9ed4320c9bc7242241b0a2d60081e09 Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 24 Sep 2023 10:52:22 -0700 Subject: [PATCH] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- build | 2 +- src/framework/KafkaManager.cpp | 154 +++++++++------------------------ src/framework/KafkaManager.h | 42 ++++----- 3 files changed, 59 insertions(+), 139 deletions(-) diff --git a/build b/build index 410b14d2c..978b4e8e5 100644 --- a/build +++ b/build @@ -1 +1 @@ -25 \ No newline at end of file +26 \ No newline at end of file diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index d90365291..938106538 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -6,6 +6,7 @@ #include "fmt/format.h" #include "framework/MicroServiceFuncs.h" +#include "cppkafka/utils/consumer_dispatcher.h" namespace OpenWifi { @@ -159,45 +160,49 @@ namespace OpenWifi { } }); - bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); - auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100); + // bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); + // auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100); Types::StringVec Topics; - KafkaManager()->Topics(Topics); + std::for_each(Topics_.begin(),Topics_.end(), + [&](const std::string & T) { Topics.emplace_back(T); }); Consumer.subscribe(Topics); Running_ = true; std::vector MsgVec; - while (Running_) { - try { - MsgVec.clear(); - MsgVec.reserve(BatchSize); - MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000)); - for (auto const &Msg : MsgVec) { - if (!Msg) - continue; - if (Msg.get_error()) { - if (!Msg.is_eof()) { - poco_error(Logger_, - fmt::format("Error: {}", Msg.get_error().to_string())); + + Dispatcher_ = std::make_unique(Consumer); + + Dispatcher_->run( + // Callback executed whenever a new message is consumed + [&](cppkafka::Message msg) { + // Print the key (if any) + std::lock_guard G(ConsumerMutex_); + auto It = Notifiers_.find(msg.get_topic()); + if (It != Notifiers_.end()) { + const auto &FL = It->second; + for (const auto &[CallbackFunc, _] : FL) { + try { + CallbackFunc(msg.get_key(), msg.get_payload()); + } catch(const Poco::Exception &E) { + + } catch(...) { + } - if (!AutoCommit) - Consumer.commit(Msg); - continue; } - KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload()); - if (!AutoCommit) - Consumer.commit(Msg); } - } catch (const cppkafka::HandleException &E) { - poco_warning(Logger_, - fmt::format("Caught a Kafka exception (consumer): {}", E.what())); - } catch (const Poco::Exception &E) { - Logger_.log(E); - } catch (...) { - poco_error(Logger_, "std::exception"); + Consumer.commit(msg); + }, + // Whenever there's an error (other than the EOF soft error) + [&Logger_](cppkafka::Error error) { + poco_warning(Logger_,fmt::format("Error: {}", error.to_string())); + }, + // Whenever EOF is reached on a partition, print this + [&Logger_](cppkafka::ConsumerDispatcher::EndOfFile, const cppkafka::TopicPartition& topic_partition) { + poco_debug(Logger_,fmt::format("Partition {} EOF", topic_partition.get_partition())); } - } + ); + Consumer.unsubscribe(); poco_information(Logger_, "Stopped..."); } @@ -225,7 +230,6 @@ namespace OpenWifi { void KafkaConsumer::Start() { if (!Running_) { - Running_ = true; Worker_.start(*this); } } @@ -233,29 +237,16 @@ namespace OpenWifi { void KafkaConsumer::Stop() { if (Running_) { Running_ = false; - Worker_.wakeUp(); - Worker_.join(); - } - } - - void KafkaDispatcher::Start() { - if (!Running_) { - Running_ = true; - Worker_.start(*this); - } - } - - void KafkaDispatcher::Stop() { - if (Running_) { - Running_ = false; - Queue_.wakeUpAll(); + if(Dispatcher_) { + Dispatcher_->stop(); + } Worker_.join(); } } - auto KafkaDispatcher::RegisterTopicWatcher(const std::string &Topic, + std::uint64_t KafkaConsumer::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { - std::lock_guard G(Mutex_); + std::lock_guard G(ConsumerMutex_); auto It = Notifiers_.find(Topic); if (It == Notifiers_.end()) { Types::TopicNotifyFunctionList L; @@ -264,11 +255,12 @@ namespace OpenWifi { } else { It->second.emplace(It->second.end(), std::make_pair(F, FunctionId_)); } + Topics_.insert(Topic); return FunctionId_++; } - void KafkaDispatcher::UnregisterTopicWatcher(const std::string &Topic, int Id) { - std::lock_guard G(Mutex_); + void KafkaConsumer::UnregisterTopicWatcher(const std::string &Topic, int Id) { + std::lock_guard G(ConsumerMutex_); auto It = Notifiers_.find(Topic); if (It != Notifiers_.end()) { Types::TopicNotifyFunctionList &L = It->second; @@ -280,56 +272,17 @@ namespace OpenWifi { } } - void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key, - const std::string & Payload) { - std::lock_guard G(Mutex_); - auto It = Notifiers_.find(Topic); - if (It != Notifiers_.end()) { - Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); - } - } - - void KafkaDispatcher::run() { - Poco::Logger &Logger_ = - Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel()); - poco_information(Logger_, "Starting..."); - Poco::AutoPtr Note(Queue_.waitDequeueNotification()); - Utils::SetThreadName("kafka:dispatch"); - while (Note && Running_) { - auto Msg = dynamic_cast(Note.get()); - if (Msg != nullptr) { - auto It = Notifiers_.find(Msg->Topic()); - if (It != Notifiers_.end()) { - const auto &FL = It->second; - for (const auto &[CallbackFunc, _] : FL) { - CallbackFunc(Msg->Key(), Msg->Payload()); - } - } - } - Note = Queue_.waitDequeueNotification(); - } - poco_information(Logger_, "Stopped..."); - } - - void KafkaDispatcher::Topics(std::vector &T) { - T.clear(); - for (const auto &[TopicName, _] : Notifiers_) - T.push_back(TopicName); - } - int KafkaManager::Start() { if (!KafkaEnabled_) return 0; ConsumerThr_.Start(); ProducerThr_.Start(); - Dispatcher_.Start(); return 0; } void KafkaManager::Stop() { if (KafkaEnabled_) { poco_information(Logger(), "Stopping..."); - Dispatcher_.Stop(); ProducerThr_.Stop(); ConsumerThr_.Stop(); poco_information(Logger(), "Stopped..."); @@ -353,12 +306,6 @@ namespace OpenWifi { } } - - void KafkaManager::Dispatch(const char *Topic, const std::string &Key, - const std::string &Payload) { - Dispatcher_.Dispatch(Topic, Key, Payload); - } - [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { return fmt::format( R"lit({{ "system" : {{ "id" : {}, "host" : "{}" }}, @@ -366,23 +313,6 @@ namespace OpenWifi { MicroServicePrivateEndPoint(), PayLoad ) ; } - uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, - Types::TopicNotifyFunction &F) { - if (KafkaEnabled_) { - return Dispatcher_.RegisterTopicWatcher(Topic, F); - } else { - return 0; - } - } - - void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { - if (KafkaEnabled_) { - Dispatcher_.UnregisterTopicWatcher(Topic, Id); - } - } - - void KafkaManager::Topics(std::vector &T) { Dispatcher_.Topics(T); } - void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList &partitions) { poco_information( Logger(), fmt::format("Partition assigned: {}...", partitions.front().get_partition())); diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index 31cf0932a..e6272d337 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -39,7 +39,7 @@ namespace OpenWifi { void Produce(const char *Topic, const std::string &Key, const std::string & Payload); private: - std::recursive_mutex Mutex_; + std::mutex Mutex_; Poco::Thread Worker_; mutable std::atomic_bool Running_ = false; Poco::NotificationQueue Queue_; @@ -47,33 +47,22 @@ namespace OpenWifi { class KafkaConsumer : public Poco::Runnable { public: - void run() override; void Start(); void Stop(); private: - std::recursive_mutex Mutex_; - Poco::Thread Worker_; + std::mutex ConsumerMutex_; + Types::NotifyTable Notifiers_; + Poco::Thread Worker_; mutable std::atomic_bool Running_ = false; - }; + uint64_t FunctionId_ = 1; + std::unique_ptr Dispatcher_; + std::set Topics_; - class KafkaDispatcher : public Poco::Runnable { - public: - void Start(); - void Stop(); - auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); - void UnregisterTopicWatcher(const std::string &Topic, int Id); - void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload); void run() override; - void Topics(std::vector &T); - - private: - std::recursive_mutex Mutex_; - Types::NotifyTable Notifiers_; - Poco::Thread Worker_; - mutable std::atomic_bool Running_ = false; - uint64_t FunctionId_ = 1; - Poco::NotificationQueue Queue_; + friend class KafkaManager; + std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); + void UnregisterTopicWatcher(const std::string &Topic, int Id); }; class KafkaManager : public SubSystemServer { @@ -96,19 +85,20 @@ namespace OpenWifi { void PostMessage(const char *topic, const std::string &key, const Poco::JSON::Object &Object, bool WrapMessage = true); - void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload); [[nodiscard]] std::string WrapSystemId(const std::string & PayLoad); [[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; } - uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); - void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id); - void Topics(std::vector &T); + inline std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { + return ConsumerThr_.RegisterTopicWatcher(Topic,F); + } + inline void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { + return ConsumerThr_.UnregisterTopicWatcher(Topic,Id); + } private: bool KafkaEnabled_ = false; std::string SystemInfoWrapper_; KafkaProducer ProducerThr_; KafkaConsumer ConsumerThr_; - KafkaDispatcher Dispatcher_; void PartitionAssignment(const cppkafka::TopicPartitionList &partitions); void PartitionRevocation(const cppkafka::TopicPartitionList &partitions);