diff --git a/include/kafka/addons/KafkaRecoverableProducer.h b/include/kafka/addons/KafkaRecoverableProducer.h index 018d7c3f..e0d3b4cc 100644 --- a/include/kafka/addons/KafkaRecoverableProducer.h +++ b/include/kafka/addons/KafkaRecoverableProducer.h @@ -2,14 +2,12 @@ #include -#include -#include #include -#include -#include +#include +#include #include -#include +#include namespace KAFKA_API { namespace clients { namespace producer { @@ -20,8 +18,8 @@ class KafkaRecoverableProducer explicit KafkaRecoverableProducer(const Properties& properties) : _properties(properties), _running(true) { - _properties.put("enable.manual.events.poll", "true"); - _properties.put("error_cb", [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique(error); }); + _properties.put(Config::ENABLE_MANUAL_EVENTS_POLL, "true"); + _properties.put(Config::ERROR_CB, [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique(error); }); _producer = createProducer(); @@ -60,8 +58,8 @@ class KafkaRecoverableProducer { const std::lock_guard lock(_producerMutex); - _logLevel = level; - _producer->setLogLevel(*_logLevel); + _properties.put(Config::LOG_LEVEL, std::to_string(level)); + _producer->setLogLevel(level); } /** @@ -295,16 +293,11 @@ class KafkaRecoverableProducer std::unique_ptr createProducer() { - auto producer = std::make_unique(_properties); - - if (_logLevel) producer->setLogLevel(*_logLevel); - - return producer; + return std::make_unique(_properties); } // Configurations for producer - Properties _properties; - Optional _logLevel; + Properties _properties; std::unique_ptr _fatalError;