Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-12954
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Sep 23, 2023
1 parent 4f1fa18 commit e5a22a1
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,19 @@ namespace OpenWifi {
});

bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);

Types::StringVec Topics;
KafkaManager()->Topics(Topics);
Consumer.subscribe(Topics);

Running_ = true;
std::vector<cppkafka::Message> MsgVec;
while (Running_) {
try {
std::vector<cppkafka::Message> MsgVec =
Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100));
MsgVec.clear();
MsgVec.reserve(BatchSize);
MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000));
for (auto const &Msg : MsgVec) {
if (!Msg)
continue;
Expand All @@ -180,12 +182,12 @@ namespace OpenWifi {
fmt::format("Error: {}", Msg.get_error().to_string()));
}
if (!AutoCommit)
Consumer.async_commit(Msg);
Consumer.commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.async_commit(Msg);
Consumer.commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
Expand Down

0 comments on commit e5a22a1

Please sign in to comment.