Skip to content

Commit c1af7e0

Browse files
igozalibenesch
authored andcommitted
Use num_enum to auto-generate reverse mappings for rd_kafka_resp_err_t
Signed-off-by: Ivan Gozali <[email protected]>
1 parent d159783 commit c1af7e0

File tree

7 files changed

+16
-171
lines changed

7 files changed

+16
-171
lines changed

examples/asynchronous_processing.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::thread;
22
use std::time::Duration;
33

4-
use clap::{App, Arg, value_t};
5-
use futures::{StreamExt, TryStreamExt};
4+
use clap::{value_t, App, Arg};
65
use futures::stream::FuturesUnordered;
6+
use futures::{StreamExt, TryStreamExt};
77
use log::info;
88

99
use rdkafka::config::ClientConfig;

rdkafka-sys/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ categories = ["external-ffi-bindings"]
1212
edition = "2018"
1313

1414
[dependencies]
15+
num_enum = "0.4.2"
1516
libc = "0.2.65"
1617
openssl-sys = { version = "0.9.48", optional = true }
1718
libz-sys = { version = "1.0", optional = true }

rdkafka-sys/src/bindings.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/* automatically generated by rust-bindgen */
22

3+
use num_enum::TryFromPrimitive;
34
type FILE = libc::FILE;
45
type sockaddr = libc::sockaddr;
56

@@ -97,7 +98,7 @@ pub struct rd_kafka_topic_result_s {
9798
}
9899
pub type rd_kafka_topic_result_t = rd_kafka_topic_result_s;
99100
#[repr(i32)]
100-
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
101+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, TryFromPrimitive)]
101102
pub enum rd_kafka_resp_err_t {
102103
RD_KAFKA_RESP_ERR__BEGIN = -200,
103104
RD_KAFKA_RESP_ERR__BAD_MSG = -199,

rdkafka-sys/src/helpers.rs

Lines changed: 0 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -3,154 +3,6 @@ use crate::types::RDKafkaError::*;
33
use crate::types::RDKafkaRespErr;
44
use crate::types::RDKafkaRespErr::*;
55

6-
/// This is not great. For legacy reasons some usage of rd_kafka_resp_err_t is
7-
/// passed as an integer in some places. There seems to be no easy way in Rust to
8-
/// match this the other way around so we do it manually.
9-
pub fn primitive_to_rd_kafka_resp_err_t(error: i32) -> Option<RDKafkaRespErr> {
10-
match error {
11-
-200 => Some(RD_KAFKA_RESP_ERR__BEGIN),
12-
-199 => Some(RD_KAFKA_RESP_ERR__BAD_MSG),
13-
-198 => Some(RD_KAFKA_RESP_ERR__BAD_COMPRESSION),
14-
-197 => Some(RD_KAFKA_RESP_ERR__DESTROY),
15-
-196 => Some(RD_KAFKA_RESP_ERR__FAIL),
16-
-195 => Some(RD_KAFKA_RESP_ERR__TRANSPORT),
17-
-194 => Some(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE),
18-
-193 => Some(RD_KAFKA_RESP_ERR__RESOLVE),
19-
-192 => Some(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT),
20-
-191 => Some(RD_KAFKA_RESP_ERR__PARTITION_EOF),
21-
-190 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION),
22-
-189 => Some(RD_KAFKA_RESP_ERR__FS),
23-
-188 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC),
24-
-187 => Some(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN),
25-
-186 => Some(RD_KAFKA_RESP_ERR__INVALID_ARG),
26-
-185 => Some(RD_KAFKA_RESP_ERR__TIMED_OUT),
27-
-184 => Some(RD_KAFKA_RESP_ERR__QUEUE_FULL),
28-
-183 => Some(RD_KAFKA_RESP_ERR__ISR_INSUFF),
29-
-182 => Some(RD_KAFKA_RESP_ERR__NODE_UPDATE),
30-
-181 => Some(RD_KAFKA_RESP_ERR__SSL),
31-
-180 => Some(RD_KAFKA_RESP_ERR__WAIT_COORD),
32-
-179 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP),
33-
-178 => Some(RD_KAFKA_RESP_ERR__IN_PROGRESS),
34-
-177 => Some(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS),
35-
-176 => Some(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION),
36-
-175 => Some(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS),
37-
-174 => Some(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS),
38-
-173 => Some(RD_KAFKA_RESP_ERR__CONFLICT),
39-
-172 => Some(RD_KAFKA_RESP_ERR__STATE),
40-
-171 => Some(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL),
41-
-170 => Some(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED),
42-
-169 => Some(RD_KAFKA_RESP_ERR__AUTHENTICATION),
43-
-168 => Some(RD_KAFKA_RESP_ERR__NO_OFFSET),
44-
-167 => Some(RD_KAFKA_RESP_ERR__OUTDATED),
45-
-166 => Some(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE),
46-
-165 => Some(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
47-
-164 => Some(RD_KAFKA_RESP_ERR__WAIT_CACHE),
48-
-163 => Some(RD_KAFKA_RESP_ERR__INTR),
49-
-162 => Some(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION),
50-
-161 => Some(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION),
51-
-160 => Some(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION),
52-
-159 => Some(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION),
53-
-158 => Some(RD_KAFKA_RESP_ERR__PARTIAL),
54-
-157 => Some(RD_KAFKA_RESP_ERR__READ_ONLY),
55-
-156 => Some(RD_KAFKA_RESP_ERR__NOENT),
56-
-155 => Some(RD_KAFKA_RESP_ERR__UNDERFLOW),
57-
-154 => Some(RD_KAFKA_RESP_ERR__INVALID_TYPE),
58-
-153 => Some(RD_KAFKA_RESP_ERR__RETRY),
59-
-152 => Some(RD_KAFKA_RESP_ERR__PURGE_QUEUE),
60-
-151 => Some(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT),
61-
-150 => Some(RD_KAFKA_RESP_ERR__FATAL),
62-
-149 => Some(RD_KAFKA_RESP_ERR__INCONSISTENT),
63-
-148 => Some(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE),
64-
-147 => Some(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED),
65-
-100 => Some(RD_KAFKA_RESP_ERR__END),
66-
-1 => Some(RD_KAFKA_RESP_ERR_UNKNOWN),
67-
0 => Some(RD_KAFKA_RESP_ERR_NO_ERROR),
68-
1 => Some(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE),
69-
2 => Some(RD_KAFKA_RESP_ERR_INVALID_MSG),
70-
3 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART),
71-
4 => Some(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE),
72-
5 => Some(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE),
73-
6 => Some(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION),
74-
7 => Some(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT),
75-
8 => Some(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE),
76-
9 => Some(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE),
77-
10 => Some(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE),
78-
11 => Some(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH),
79-
12 => Some(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE),
80-
13 => Some(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION),
81-
14 => Some(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS),
82-
15 => Some(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE),
83-
16 => Some(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP),
84-
17 => Some(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION),
85-
18 => Some(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE),
86-
19 => Some(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS),
87-
20 => Some(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND),
88-
21 => Some(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS),
89-
22 => Some(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION),
90-
23 => Some(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL),
91-
24 => Some(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID),
92-
25 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID),
93-
26 => Some(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT),
94-
27 => Some(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS),
95-
28 => Some(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE),
96-
29 => Some(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED),
97-
30 => Some(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED),
98-
31 => Some(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED),
99-
32 => Some(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP),
100-
33 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM),
101-
34 => Some(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE),
102-
35 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION),
103-
36 => Some(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS),
104-
37 => Some(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS),
105-
38 => Some(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR),
106-
39 => Some(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT),
107-
40 => Some(RD_KAFKA_RESP_ERR_INVALID_CONFIG),
108-
41 => Some(RD_KAFKA_RESP_ERR_NOT_CONTROLLER),
109-
42 => Some(RD_KAFKA_RESP_ERR_INVALID_REQUEST),
110-
43 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT),
111-
44 => Some(RD_KAFKA_RESP_ERR_POLICY_VIOLATION),
112-
45 => Some(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER),
113-
46 => Some(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER),
114-
47 => Some(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH),
115-
48 => Some(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE),
116-
49 => Some(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING),
117-
50 => Some(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT),
118-
51 => Some(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS),
119-
52 => Some(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED),
120-
53 => Some(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED),
121-
54 => Some(RD_KAFKA_RESP_ERR_SECURITY_DISABLED),
122-
55 => Some(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED),
123-
56 => Some(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR),
124-
57 => Some(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND),
125-
58 => Some(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED),
126-
59 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID),
127-
60 => Some(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS),
128-
61 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED),
129-
62 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND),
130-
63 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH),
131-
64 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED),
132-
65 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED),
133-
66 => Some(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED),
134-
67 => Some(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE),
135-
68 => Some(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP),
136-
69 => Some(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND),
137-
70 => Some(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND),
138-
71 => Some(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH),
139-
72 => Some(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND),
140-
73 => Some(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED),
141-
74 => Some(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH),
142-
75 => Some(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH),
143-
76 => Some(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE),
144-
77 => Some(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH),
145-
78 => Some(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE),
146-
79 => Some(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED),
147-
80 => Some(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE),
148-
81 => Some(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED),
149-
// END ALL
150-
_ => None,
151-
}
152-
}
153-
1546
pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError {
1557
match err {
1568
RD_KAFKA_RESP_ERR__BEGIN => Begin,
@@ -298,18 +150,3 @@ pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError
298150
RD_KAFKA_RESP_ERR_END_ALL => EndAll,
299151
}
300152
}
301-
302-
#[cfg(test)]
303-
mod tests {
304-
use super::*;
305-
306-
#[test]
307-
fn test_conversion() {
308-
for error_code in -299..300 {
309-
if let Some(resp_err) = primitive_to_rd_kafka_resp_err_t(error_code) {
310-
let kafka_error = rd_kafka_resp_err_t_to_rdkafka_error(resp_err);
311-
assert_eq!(error_code, kafka_error as i32);
312-
}
313-
}
314-
}
315-
}

rdkafka-sys/src/types.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
//! This module contains type aliases for types defined in the auto-generated bindings.
2+
3+
use std::convert::TryFrom;
24
use std::ffi::CStr;
35
use std::{error, fmt};
46

@@ -395,14 +397,14 @@ impl From<RDKafkaRespErr> for RDKafkaError {
395397

396398
impl fmt::Display for RDKafkaError {
397399
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398-
let description = match helpers::primitive_to_rd_kafka_resp_err_t(*self as i32) {
399-
Some(err) => {
400+
let description = match RDKafkaRespErr::try_from(*self as i32) {
401+
Ok(err) => {
400402
let cstr = unsafe { bindings::rd_kafka_err2str(err) };
401403
unsafe { CStr::from_ptr(cstr) }
402404
.to_string_lossy()
403405
.into_owned()
404406
}
405-
None => "Unknown error".to_owned(),
407+
Err(_) => "Unknown error".to_owned(),
406408
};
407409

408410
write!(f, "{:?} ({})", self, description)

rdkafka-sys/update-bindings.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ bindgen \
1414
--whitelist-var "rd_kafka.*|RD_KAFKA_.*" \
1515
--no-recursive-whitelist \
1616
--blacklist-function "rd_kafka_conf_set_open_cb" \
17+
--raw-line "use num_enum::TryFromPrimitive;" \
1718
--raw-line "type FILE = libc::FILE;" \
1819
--raw-line "type sockaddr = libc::sockaddr;" \
1920
librdkafka/src/rdkafka.h -o src/bindings.rs
21+
22+
# Derive TryFromPrimitive for rd_kafka_resp_err_t.
23+
perl -i -p0e 's/#\[derive\((.*)\)\]\npub enum rd_kafka_resp_err_t/#\[derive($1, TryFromPrimitive)\]\npub enum rd_kafka_resp_err_t/s' src/bindings.rs

src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Common client functionalities.
22
3+
use std::convert::TryFrom;
34
use std::ffi::{CStr, CString};
45
use std::mem;
56
use std::os::raw::c_char;
@@ -420,8 +421,7 @@ pub(crate) unsafe extern "C" fn native_error_cb<C: ClientContext>(
420421
reason: *const c_char,
421422
opaque: *mut c_void,
422423
) {
423-
let err = rdsys::primitive_to_rd_kafka_resp_err_t(err)
424-
.expect("global error not an rd_kafka_resp_err_t");
424+
let err = RDKafkaRespErr::try_from(err).expect("global error not an rd_kafka_resp_err_t");
425425
let error = KafkaError::Global(err.into());
426426
let reason = CStr::from_ptr(reason).to_string_lossy();
427427

0 commit comments

Comments
 (0)