Skip to content

Commit

Permalink
added error check for partition list (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
accelerated authored and mfontanini committed Jun 26, 2018
1 parent 6158d93 commit 577bbb0
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
2 changes: 2 additions & 0 deletions include/cppkafka/kafka_handle_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class CPPKAFKA_API KafkaHandleBase {

void set_handle(rd_kafka_t* handle);
void check_error(rd_kafka_resp_err_t error) const;
void check_error(rd_kafka_resp_err_t error,
const rd_kafka_topic_partition_list_t* list_ptr) const;
rd_kafka_conf_t* get_configuration_handle();
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
Expand Down
32 changes: 21 additions & 11 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,16 @@ void Consumer::unsubscribe() {
}

void Consumer::assign(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
// If the list is empty, then we need to use a null pointer
auto handle = topic_partitions.empty() ? nullptr : topic_list_handle.get();
rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle);
check_error(error);
rd_kafka_resp_err_t error;
if (topic_partitions.empty()) {
error = rd_kafka_assign(get_handle(), nullptr);
check_error(error);
}
else {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
check_error(error, topic_list_handle.get());
}
}

void Consumer::unassign() {
Expand Down Expand Up @@ -181,15 +186,15 @@ Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) cons
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
static_cast<int>(get_timeout().count()));
check_error(error);
check_error(error, topic_list_handle.get());
return convert(topic_list_handle);
}

TopicPartitionList
Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get());
check_error(error);
check_error(error, topic_list_handle.get());
return convert(topic_list_handle);
}

Expand Down Expand Up @@ -287,10 +292,15 @@ void Consumer::commit(const Message& msg, bool async) {

void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
rd_kafka_resp_err_t error;
error = rd_kafka_commit(get_handle(),
!topic_partitions ? nullptr : convert(*topic_partitions).get(),
async ? 1 : 0);
check_error(error);
if (topic_partitions == nullptr) {
error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0);
check_error(error);
}
else {
TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions);
error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0);
check_error(error, topic_list_handle.get());
}
}

void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
Expand Down
21 changes: 18 additions & 3 deletions src/kafka_handle_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(),
topic_list_handle.get());
check_error(error);
check_error(error, topic_list_handle.get());
}

void KafkaHandleBase::pause(const std::string& topic) {
Expand All @@ -72,7 +72,7 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
topic_list_handle.get());
check_error(error);
check_error(error, topic_list_handle.get());
}

void KafkaHandleBase::resume(const std::string& topic) {
Expand Down Expand Up @@ -153,7 +153,7 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri
const int timeout = static_cast<int>(timeout_ms_.count());
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
timeout);
check_error(result);
check_error(result, topic_list_handle.get());
return convert(topic_list_handle);
}

Expand Down Expand Up @@ -228,6 +228,21 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const {
}
}

void KafkaHandleBase::check_error(rd_kafka_resp_err_t error,
const rd_kafka_topic_partition_list_t* list_ptr) const {
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw HandleException(error);
}
if (list_ptr) {
//check if any partition has errors
for (int i = 0; i < list_ptr->cnt; ++i) {
if (list_ptr->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw HandleException(error);
}
}
}
}

rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
return config_.get_handle();
}
Expand Down

0 comments on commit 577bbb0

Please sign in to comment.