Skip to content

Auto-generate reverse mappings for rd_kafka_resp_err_t #196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/asynchronous_processing.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::thread;
use std::time::Duration;

use clap::{App, Arg, value_t};
use futures::{StreamExt, TryStreamExt};
use clap::{value_t, App, Arg};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use log::info;

use rdkafka::config::ClientConfig;
Expand Down
1 change: 1 addition & 0 deletions rdkafka-sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ categories = ["external-ffi-bindings"]
edition = "2018"

[dependencies]
num_enum = "0.4.2"
libc = "0.2.65"
openssl-sys = { version = "0.9.48", optional = true }
libz-sys = { version = "1.0", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion rdkafka-sys/src/bindings.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* automatically generated by rust-bindgen */

use num_enum::TryFromPrimitive;
type FILE = libc::FILE;
type sockaddr = libc::sockaddr;

Expand Down Expand Up @@ -97,7 +98,7 @@ pub struct rd_kafka_topic_result_s {
}
pub type rd_kafka_topic_result_t = rd_kafka_topic_result_s;
#[repr(i32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, TryFromPrimitive)]
pub enum rd_kafka_resp_err_t {
RD_KAFKA_RESP_ERR__BEGIN = -200,
RD_KAFKA_RESP_ERR__BAD_MSG = -199,
Expand Down
163 changes: 0 additions & 163 deletions rdkafka-sys/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,154 +3,6 @@ use crate::types::RDKafkaError::*;
use crate::types::RDKafkaRespErr;
use crate::types::RDKafkaRespErr::*;

/// This is not great. For legacy reasons some usage of rd_kafka_resp_err_t is
/// passed as an integer in some places. There seems to be no easy way in Rust to
/// match this the other way around so we do it manually.
pub fn primitive_to_rd_kafka_resp_err_t(error: i32) -> Option<RDKafkaRespErr> {
match error {
-200 => Some(RD_KAFKA_RESP_ERR__BEGIN),
-199 => Some(RD_KAFKA_RESP_ERR__BAD_MSG),
-198 => Some(RD_KAFKA_RESP_ERR__BAD_COMPRESSION),
-197 => Some(RD_KAFKA_RESP_ERR__DESTROY),
-196 => Some(RD_KAFKA_RESP_ERR__FAIL),
-195 => Some(RD_KAFKA_RESP_ERR__TRANSPORT),
-194 => Some(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE),
-193 => Some(RD_KAFKA_RESP_ERR__RESOLVE),
-192 => Some(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT),
-191 => Some(RD_KAFKA_RESP_ERR__PARTITION_EOF),
-190 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION),
-189 => Some(RD_KAFKA_RESP_ERR__FS),
-188 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC),
-187 => Some(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN),
-186 => Some(RD_KAFKA_RESP_ERR__INVALID_ARG),
-185 => Some(RD_KAFKA_RESP_ERR__TIMED_OUT),
-184 => Some(RD_KAFKA_RESP_ERR__QUEUE_FULL),
-183 => Some(RD_KAFKA_RESP_ERR__ISR_INSUFF),
-182 => Some(RD_KAFKA_RESP_ERR__NODE_UPDATE),
-181 => Some(RD_KAFKA_RESP_ERR__SSL),
-180 => Some(RD_KAFKA_RESP_ERR__WAIT_COORD),
-179 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP),
-178 => Some(RD_KAFKA_RESP_ERR__IN_PROGRESS),
-177 => Some(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS),
-176 => Some(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION),
-175 => Some(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS),
-174 => Some(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS),
-173 => Some(RD_KAFKA_RESP_ERR__CONFLICT),
-172 => Some(RD_KAFKA_RESP_ERR__STATE),
-171 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL),
-170 => Some(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED),
-169 => Some(RD_KAFKA_RESP_ERR__AUTHENTICATION),
-168 => Some(RD_KAFKA_RESP_ERR__NO_OFFSET),
-167 => Some(RD_KAFKA_RESP_ERR__OUTDATED),
-166 => Some(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE),
-165 => Some(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
-164 => Some(RD_KAFKA_RESP_ERR__WAIT_CACHE),
-163 => Some(RD_KAFKA_RESP_ERR__INTR),
-162 => Some(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION),
-161 => Some(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION),
-160 => Some(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION),
-159 => Some(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION),
-158 => Some(RD_KAFKA_RESP_ERR__PARTIAL),
-157 => Some(RD_KAFKA_RESP_ERR__READ_ONLY),
-156 => Some(RD_KAFKA_RESP_ERR__NOENT),
-155 => Some(RD_KAFKA_RESP_ERR__UNDERFLOW),
-154 => Some(RD_KAFKA_RESP_ERR__INVALID_TYPE),
-153 => Some(RD_KAFKA_RESP_ERR__RETRY),
-152 => Some(RD_KAFKA_RESP_ERR__PURGE_QUEUE),
-151 => Some(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT),
-150 => Some(RD_KAFKA_RESP_ERR__FATAL),
-149 => Some(RD_KAFKA_RESP_ERR__INCONSISTENT),
-148 => Some(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE),
-147 => Some(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED),
-100 => Some(RD_KAFKA_RESP_ERR__END),
-1 => Some(RD_KAFKA_RESP_ERR_UNKNOWN),
0 => Some(RD_KAFKA_RESP_ERR_NO_ERROR),
1 => Some(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE),
2 => Some(RD_KAFKA_RESP_ERR_INVALID_MSG),
3 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART),
4 => Some(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE),
5 => Some(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE),
6 => Some(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION),
7 => Some(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT),
8 => Some(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE),
9 => Some(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE),
10 => Some(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE),
11 => Some(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH),
12 => Some(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE),
13 => Some(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION),
14 => Some(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS),
15 => Some(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE),
16 => Some(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP),
17 => Some(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION),
18 => Some(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE),
19 => Some(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS),
20 => Some(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND),
21 => Some(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS),
22 => Some(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION),
23 => Some(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL),
24 => Some(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID),
25 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID),
26 => Some(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT),
27 => Some(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS),
28 => Some(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE),
29 => Some(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED),
30 => Some(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED),
31 => Some(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED),
32 => Some(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP),
33 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM),
34 => Some(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE),
35 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION),
36 => Some(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS),
37 => Some(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS),
38 => Some(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR),
39 => Some(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT),
40 => Some(RD_KAFKA_RESP_ERR_INVALID_CONFIG),
41 => Some(RD_KAFKA_RESP_ERR_NOT_CONTROLLER),
42 => Some(RD_KAFKA_RESP_ERR_INVALID_REQUEST),
43 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT),
44 => Some(RD_KAFKA_RESP_ERR_POLICY_VIOLATION),
45 => Some(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER),
46 => Some(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER),
47 => Some(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH),
48 => Some(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE),
49 => Some(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING),
50 => Some(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT),
51 => Some(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS),
52 => Some(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED),
53 => Some(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED),
54 => Some(RD_KAFKA_RESP_ERR_SECURITY_DISABLED),
55 => Some(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED),
56 => Some(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR),
57 => Some(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND),
58 => Some(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED),
59 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID),
60 => Some(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS),
61 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED),
62 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND),
63 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH),
64 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED),
65 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED),
66 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED),
67 => Some(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE),
68 => Some(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP),
69 => Some(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND),
70 => Some(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND),
71 => Some(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH),
72 => Some(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND),
73 => Some(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED),
74 => Some(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH),
75 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH),
76 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE),
77 => Some(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH),
78 => Some(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE),
79 => Some(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED),
80 => Some(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE),
81 => Some(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED),
// END ALL
_ => None,
}
}

pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError {
match err {
RD_KAFKA_RESP_ERR__BEGIN => Begin,
Expand Down Expand Up @@ -298,18 +150,3 @@ pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError
RD_KAFKA_RESP_ERR_END_ALL => EndAll,
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_conversion() {
for error_code in -299..300 {
if let Some(resp_err) = primitive_to_rd_kafka_resp_err_t(error_code) {
let kafka_error = rd_kafka_resp_err_t_to_rdkafka_error(resp_err);
assert_eq!(error_code, kafka_error as i32);
}
}
}
}
8 changes: 5 additions & 3 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! This module contains type aliases for types defined in the auto-generated bindings.

use std::convert::TryFrom;
use std::ffi::CStr;
use std::{error, fmt};

Expand Down Expand Up @@ -395,14 +397,14 @@ impl From<RDKafkaRespErr> for RDKafkaError {

impl fmt::Display for RDKafkaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let description = match helpers::primitive_to_rd_kafka_resp_err_t(*self as i32) {
Some(err) => {
let description = match RDKafkaRespErr::try_from(*self as i32) {
Ok(err) => {
let cstr = unsafe { bindings::rd_kafka_err2str(err) };
unsafe { CStr::from_ptr(cstr) }
.to_string_lossy()
.into_owned()
}
None => "Unknown error".to_owned(),
Err(_) => "Unknown error".to_owned(),
};

write!(f, "{:?} ({})", self, description)
Expand Down
4 changes: 4 additions & 0 deletions rdkafka-sys/update-bindings.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ bindgen \
--whitelist-var "rd_kafka.*|RD_KAFKA_.*" \
--no-recursive-whitelist \
--blacklist-function "rd_kafka_conf_set_open_cb" \
--raw-line "use num_enum::TryFromPrimitive;" \
--raw-line "type FILE = libc::FILE;" \
--raw-line "type sockaddr = libc::sockaddr;" \
librdkafka/src/rdkafka.h -o src/bindings.rs

# Derive TryFromPrimitive for rd_kafka_resp_err_t.
perl -i -p0e 's/#\[derive\((.*)\)\]\npub enum rd_kafka_resp_err_t/#\[derive($1, TryFromPrimitive)\]\npub enum rd_kafka_resp_err_t/s' src/bindings.rs
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Common client functionalities.

use std::convert::TryFrom;
use std::ffi::{CStr, CString};
use std::mem;
use std::os::raw::c_char;
Expand Down Expand Up @@ -420,8 +421,7 @@ pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
reason: *const c_char,
opaque: *mut c_void,
) {
let err = rdsys::primitive_to_rd_kafka_resp_err_t(err)
.expect("global error not an rd_kafka_resp_err_t");
let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
let error = KafkaError::Global(err.into());
let reason = CStr::from_ptr(reason).to_string_lossy();

Expand Down