Skip to content

Commit

Permalink
Merge pull request #44 from xtuml/feature/kafka-log-appender
Browse files Browse the repository at this point in the history
Feature/kafka log appender
  • Loading branch information
leviathan747 authored Mar 5, 2024
2 parents 5617faa + 84de394 commit b71186a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
9 changes: 8 additions & 1 deletion core-cpp/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
find_package(CppKafka CONFIG REQUIRED QUIET)
find_package(RdKafka CONFIG REQUIRED QUIET)
find_package(libuuid REQUIRED)
find_package(log4cplus REQUIRED)
find_package(fmt REQUIRED)
find_package(nlohmann_json REQUIRED)

simple_add_shared_library (
NAME Kafka
Expand All @@ -11,12 +14,16 @@ simple_add_shared_library (
ProcessHandler.cc
Producer.cc
ServiceHandler.cc
LogAppender.cc
LINKS swa
CppKafka::cppkafka
RdKafka::rdkafka
libuuid::libuuid
fmt::fmt
log4cplus::log4cplus
nlohmann_json::nlohmann_json
EXPORT MaslCore
INCLUDES
INCLUDES
kafka/BufferedIO.hh
kafka/Consumer.hh
kafka/Kafka.hh
Expand Down
8 changes: 7 additions & 1 deletion core-cpp/kafka/src/Kafka.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "swa/CommandLine.hh"
#include "swa/Process.hh"

#include "LogAppender.hh"

#include <thread>

namespace Kafka {
Expand All @@ -25,7 +27,11 @@ bool startup() {

struct Init {
Init() {
// register command line arguments
// register Kafka log appender
log4cplus::spi::getAppenderFactoryRegistry().put(std::make_unique<Kafka::KafkaAppenderFactory>());


// register command line arguments
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(BrokersOption, std::string("Kafka Brokers"), true, "brokerList", true, false));
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(GroupIdOption, std::string("Kafka Group ID"), false, "groupId", true, false));
SWA::CommandLine::getInstance().registerOption(SWA::NamedOption(NamespaceOption, std::string("Kafka Topic Namespace"), false, "namespace", true, false));
Expand Down
57 changes: 57 additions & 0 deletions core-cpp/kafka/src/LogAppender.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "LogAppender.hh"

#include <log4cplus/initializer.h>
#include <cppkafka/producer.h>
#include <cppkafka/configuration.h>
#include <nlohmann/json.hpp>
#include <fmt/format.h>
#include <fmt/chrono.h>
#include "swa/CommandLine.hh"
#include "kafka/Kafka.hh"


namespace Kafka {
class KafkaAppender : public log4cplus::Appender {

public:
explicit KafkaAppender(const log4cplus::helpers::Properties &props)
: Appender(props),
config({{"metadata.broker.list", props.getProperty("broker", SWA::CommandLine::getInstance().getOption(BrokersOption))}}),
producer(config),
messageBuilder(props.getProperty("topic", "xtuml.logging")) {
}

void close() override {
producer.flush();
}

void append(const log4cplus::spi::InternalLoggingEvent &event) override {
std::string message = nlohmann::json(
{
{"timestamp", fmt::format("{:%FT%TZ}", event.getTimestamp())},
{"logger", event.getLoggerName()},
{"level", log4cplus::getLogLevelManager().toString(event.getLogLevel())},
{"file", event.getFile()},
{"function", event.getFunction()},
{"line", event.getLine()},
{"message", event.getMessage()},
}
).dump();
producer.produce(messageBuilder.payload(message));
}

~KafkaAppender() override {
destructorImpl();
}

private:
cppkafka::Configuration config;
cppkafka::Producer producer;
cppkafka::MessageBuilder messageBuilder;
};

log4cplus::SharedAppenderPtr KafkaAppenderFactory::createObject(const log4cplus::helpers::Properties &props) {
return log4cplus::SharedAppenderPtr(new KafkaAppender(props));
};

}
24 changes: 24 additions & 0 deletions core-cpp/kafka/src/LogAppender.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#ifndef Kafka_LogAppender_HH
#define Kafka_LogAppender_HH

#include <log4cplus/log4cplus.h>
#include <string>

namespace Kafka {

class KafkaAppenderFactory : public log4cplus::spi::AppenderFactory {
public:
[[nodiscard]] const log4cplus::tstring &getTypeName() const override {
return name;
}

log4cplus::SharedAppenderPtr createObject(const log4cplus::helpers::Properties &props) override;

private:
std::string name = "KafkaAppender";
};


}

#endif

0 comments on commit b71186a

Please sign in to comment.