diff --git a/examples/asynchronous_processing.rs b/examples/asynchronous_processing.rs index 40c785b41..ddb562695 100644 --- a/examples/asynchronous_processing.rs +++ b/examples/asynchronous_processing.rs @@ -1,9 +1,9 @@ use std::thread; use std::time::Duration; -use clap::{App, Arg, value_t}; -use futures::{StreamExt, TryStreamExt}; +use clap::{value_t, App, Arg}; use futures::stream::FuturesUnordered; +use futures::{StreamExt, TryStreamExt}; use log::info; use rdkafka::config::ClientConfig; diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml index 47788665b..62c420537 100644 --- a/rdkafka-sys/Cargo.toml +++ b/rdkafka-sys/Cargo.toml @@ -12,6 +12,7 @@ categories = ["external-ffi-bindings"] edition = "2018" [dependencies] +num_enum = "0.4.2" libc = "0.2.65" openssl-sys = { version = "0.9.48", optional = true } libz-sys = { version = "1.0", optional = true } diff --git a/rdkafka-sys/src/bindings.rs b/rdkafka-sys/src/bindings.rs index ba3dde6b1..ed6a71bd3 100644 --- a/rdkafka-sys/src/bindings.rs +++ b/rdkafka-sys/src/bindings.rs @@ -1,5 +1,6 @@ /* automatically generated by rust-bindgen */ +use num_enum::TryFromPrimitive; type FILE = libc::FILE; type sockaddr = libc::sockaddr; @@ -97,7 +98,7 @@ pub struct rd_kafka_topic_result_s { } pub type rd_kafka_topic_result_t = rd_kafka_topic_result_s; #[repr(i32)] -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, TryFromPrimitive)] pub enum rd_kafka_resp_err_t { RD_KAFKA_RESP_ERR__BEGIN = -200, RD_KAFKA_RESP_ERR__BAD_MSG = -199, diff --git a/rdkafka-sys/src/helpers.rs b/rdkafka-sys/src/helpers.rs index 3e99ecca1..be46fe015 100644 --- a/rdkafka-sys/src/helpers.rs +++ b/rdkafka-sys/src/helpers.rs @@ -3,154 +3,6 @@ use crate::types::RDKafkaError::*; use crate::types::RDKafkaRespErr; use crate::types::RDKafkaRespErr::*; -/// This is not great. For legacy reasons some usage of rd_kafka_resp_err_t is -/// passed as an integer in some places. There seems to be no easy way in Rust to -/// match this the other way around so we do it manually. -pub fn primitive_to_rd_kafka_resp_err_t(error: i32) -> Option { - match error { - -200 => Some(RD_KAFKA_RESP_ERR__BEGIN), - -199 => Some(RD_KAFKA_RESP_ERR__BAD_MSG), - -198 => Some(RD_KAFKA_RESP_ERR__BAD_COMPRESSION), - -197 => Some(RD_KAFKA_RESP_ERR__DESTROY), - -196 => Some(RD_KAFKA_RESP_ERR__FAIL), - -195 => Some(RD_KAFKA_RESP_ERR__TRANSPORT), - -194 => Some(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE), - -193 => Some(RD_KAFKA_RESP_ERR__RESOLVE), - -192 => Some(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT), - -191 => Some(RD_KAFKA_RESP_ERR__PARTITION_EOF), - -190 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION), - -189 => Some(RD_KAFKA_RESP_ERR__FS), - -188 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC), - -187 => Some(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN), - -186 => Some(RD_KAFKA_RESP_ERR__INVALID_ARG), - -185 => Some(RD_KAFKA_RESP_ERR__TIMED_OUT), - -184 => Some(RD_KAFKA_RESP_ERR__QUEUE_FULL), - -183 => Some(RD_KAFKA_RESP_ERR__ISR_INSUFF), - -182 => Some(RD_KAFKA_RESP_ERR__NODE_UPDATE), - -181 => Some(RD_KAFKA_RESP_ERR__SSL), - -180 => Some(RD_KAFKA_RESP_ERR__WAIT_COORD), - -179 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP), - -178 => Some(RD_KAFKA_RESP_ERR__IN_PROGRESS), - -177 => Some(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS), - -176 => Some(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION), - -175 => Some(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS), - -174 => Some(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS), - -173 => Some(RD_KAFKA_RESP_ERR__CONFLICT), - -172 => Some(RD_KAFKA_RESP_ERR__STATE), - -171 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL), - -170 => Some(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED), - -169 => Some(RD_KAFKA_RESP_ERR__AUTHENTICATION), - -168 => Some(RD_KAFKA_RESP_ERR__NO_OFFSET), - -167 => Some(RD_KAFKA_RESP_ERR__OUTDATED), - -166 => Some(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE), - -165 => Some(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE), - -164 => Some(RD_KAFKA_RESP_ERR__WAIT_CACHE), - -163 => Some(RD_KAFKA_RESP_ERR__INTR), - -162 => Some(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION), - -161 => Some(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION), - -160 => Some(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION), - -159 => Some(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION), - -158 => Some(RD_KAFKA_RESP_ERR__PARTIAL), - -157 => Some(RD_KAFKA_RESP_ERR__READ_ONLY), - -156 => Some(RD_KAFKA_RESP_ERR__NOENT), - -155 => Some(RD_KAFKA_RESP_ERR__UNDERFLOW), - -154 => Some(RD_KAFKA_RESP_ERR__INVALID_TYPE), - -153 => Some(RD_KAFKA_RESP_ERR__RETRY), - -152 => Some(RD_KAFKA_RESP_ERR__PURGE_QUEUE), - -151 => Some(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT), - -150 => Some(RD_KAFKA_RESP_ERR__FATAL), - -149 => Some(RD_KAFKA_RESP_ERR__INCONSISTENT), - -148 => Some(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE), - -147 => Some(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED), - -100 => Some(RD_KAFKA_RESP_ERR__END), - -1 => Some(RD_KAFKA_RESP_ERR_UNKNOWN), - 0 => Some(RD_KAFKA_RESP_ERR_NO_ERROR), - 1 => Some(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE), - 2 => Some(RD_KAFKA_RESP_ERR_INVALID_MSG), - 3 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART), - 4 => Some(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE), - 5 => Some(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE), - 6 => Some(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION), - 7 => Some(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT), - 8 => Some(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE), - 9 => Some(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE), - 10 => Some(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE), - 11 => Some(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH), - 12 => Some(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE), - 13 => Some(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION), - 14 => Some(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS), - 15 => Some(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE), - 16 => Some(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP), - 17 => Some(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION), - 18 => Some(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE), - 19 => Some(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS), - 20 => Some(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND), - 21 => Some(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS), - 22 => Some(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION), - 23 => Some(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL), - 24 => Some(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID), - 25 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID), - 26 => Some(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT), - 27 => Some(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS), - 28 => Some(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE), - 29 => Some(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED), - 30 => Some(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED), - 31 => Some(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED), - 32 => Some(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP), - 33 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM), - 34 => Some(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE), - 35 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION), - 36 => Some(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS), - 37 => Some(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS), - 38 => Some(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR), - 39 => Some(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT), - 40 => Some(RD_KAFKA_RESP_ERR_INVALID_CONFIG), - 41 => Some(RD_KAFKA_RESP_ERR_NOT_CONTROLLER), - 42 => Some(RD_KAFKA_RESP_ERR_INVALID_REQUEST), - 43 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT), - 44 => Some(RD_KAFKA_RESP_ERR_POLICY_VIOLATION), - 45 => Some(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER), - 46 => Some(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER), - 47 => Some(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH), - 48 => Some(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE), - 49 => Some(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING), - 50 => Some(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT), - 51 => Some(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS), - 52 => Some(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED), - 53 => Some(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED), - 54 => Some(RD_KAFKA_RESP_ERR_SECURITY_DISABLED), - 55 => Some(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED), - 56 => Some(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR), - 57 => Some(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND), - 58 => Some(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED), - 59 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID), - 60 => Some(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS), - 61 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED), - 62 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND), - 63 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH), - 64 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED), - 65 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED), - 66 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED), - 67 => Some(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE), - 68 => Some(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP), - 69 => Some(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND), - 70 => Some(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND), - 71 => Some(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH), - 72 => Some(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND), - 73 => Some(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED), - 74 => Some(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH), - 75 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH), - 76 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE), - 77 => Some(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH), - 78 => Some(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE), - 79 => Some(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED), - 80 => Some(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE), - 81 => Some(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED), - // END ALL - _ => None, - } -} - pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError { match err { RD_KAFKA_RESP_ERR__BEGIN => Begin, @@ -298,18 +150,3 @@ pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError RD_KAFKA_RESP_ERR_END_ALL => EndAll, } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_conversion() { - for error_code in -299..300 { - if let Some(resp_err) = primitive_to_rd_kafka_resp_err_t(error_code) { - let kafka_error = rd_kafka_resp_err_t_to_rdkafka_error(resp_err); - assert_eq!(error_code, kafka_error as i32); - } - } - } -} diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 76a3ac471..6ecd691d8 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -1,4 +1,6 @@ //! This module contains type aliases for types defined in the auto-generated bindings. + +use std::convert::TryFrom; use std::ffi::CStr; use std::{error, fmt}; @@ -395,14 +397,14 @@ impl From for RDKafkaError { impl fmt::Display for RDKafkaError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let description = match helpers::primitive_to_rd_kafka_resp_err_t(*self as i32) { - Some(err) => { + let description = match RDKafkaRespErr::try_from(*self as i32) { + Ok(err) => { let cstr = unsafe { bindings::rd_kafka_err2str(err) }; unsafe { CStr::from_ptr(cstr) } .to_string_lossy() .into_owned() } - None => "Unknown error".to_owned(), + Err(_) => "Unknown error".to_owned(), }; write!(f, "{:?} ({})", self, description) diff --git a/rdkafka-sys/update-bindings.sh b/rdkafka-sys/update-bindings.sh index c40d2fb49..c9fb77eab 100755 --- a/rdkafka-sys/update-bindings.sh +++ b/rdkafka-sys/update-bindings.sh @@ -14,6 +14,10 @@ bindgen \ --whitelist-var "rd_kafka.*|RD_KAFKA_.*" \ --no-recursive-whitelist \ --blacklist-function "rd_kafka_conf_set_open_cb" \ + --raw-line "use num_enum::TryFromPrimitive;" \ --raw-line "type FILE = libc::FILE;" \ --raw-line "type sockaddr = libc::sockaddr;" \ librdkafka/src/rdkafka.h -o src/bindings.rs + +# Derive TryFromPrimitive for rd_kafka_resp_err_t. +perl -i -p0e 's/#\[derive\((.*)\)\]\npub enum rd_kafka_resp_err_t/#\[derive($1, TryFromPrimitive)\]\npub enum rd_kafka_resp_err_t/s' src/bindings.rs diff --git a/src/client.rs b/src/client.rs index 0e5333a78..eddbdeb77 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,6 @@ //! Common client functionalities. +use std::convert::TryFrom; use std::ffi::{CStr, CString}; use std::mem; use std::os::raw::c_char; @@ -420,8 +421,7 @@ pub(crate) unsafe extern "C" fn native_error_cb( reason: *const c_char, opaque: *mut c_void, ) { - let err = rdsys::primitive_to_rd_kafka_resp_err_t(err) - .expect("global error not an rd_kafka_resp_err_t"); + let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t"); let error = KafkaError::Global(err.into()); let reason = CStr::from_ptr(reason).to_string_lossy();