From 8a132352ea3124f2707f704cd4691e8d347b6ed5 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Sun, 4 Aug 2024 12:59:52 -0400 Subject: [PATCH] Refresh OAuth token in the background So that OAUTHBEARER authentication works even when poll isn't being called regularly (e.g., in a client just used for fetching metadata). --- src/client.rs | 14 ++++++++++++-- src/error.rs | 6 ++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 76115d324..f65736887 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,7 +28,7 @@ use rdkafka_sys::types::*; use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel}; use crate::consumer::RebalanceProtocol; -use crate::error::{IsError, KafkaError, KafkaResult}; +use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError}; use crate::groups::GroupList; use crate::log::{debug, error, info, trace, warn}; use crate::metadata::Metadata; @@ -247,7 +247,8 @@ impl Client { rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb( native_config.ptr(), Some(native_oauth_refresh_cb::), - ) + ); + rdkafka_sys::rd_kafka_conf_enable_sasl_queue(native_config.ptr(), 1); }; } @@ -268,6 +269,15 @@ impl Client { unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) }; + if C::ENABLE_REFRESH_OAUTH_TOKEN { + let ret = unsafe { + RDKafkaError::from_ptr(rdsys::rd_kafka_sasl_background_callbacks_enable(client_ptr)) + }; + if ret.is_error() { + return Err(KafkaError::OAuthConfig(ret)); + } + } + Ok(Client { native: unsafe { NativeClient::from_ptr(client_ptr) }, context, diff --git a/src/error.rs b/src/error.rs index 4940c21dd..b2518487a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -163,6 +163,8 @@ pub enum KafkaError { NoMessageReceived, /// Unexpected null pointer Nul(ffi::NulError), + /// OAuth configuration failed. + OAuthConfig(RDKafkaError), /// Offset fetch failed. OffsetFetch(RDKafkaErrorCode), /// End of partition reached. @@ -218,6 +220,7 @@ impl fmt::Debug for KafkaError { write!(f, "No message received within the given poll interval") } KafkaError::Nul(_) => write!(f, "FFI null error"), + KafkaError::OAuthConfig(err) => write!(f, "KafkaError (OAuth config error: {})", err), KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err), KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n), KafkaError::PauseResume(ref err) => { @@ -259,6 +262,7 @@ impl fmt::Display for KafkaError { write!(f, "No message received within the given poll interval") } KafkaError::Nul(_) => write!(f, "FFI nul error"), + KafkaError::OAuthConfig(err) => write!(f, "OAuth config error: {}", err), KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err), KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n), KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err), @@ -288,6 +292,7 @@ impl Error for KafkaError { KafkaError::MetadataFetch(err) => Some(err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, + KafkaError::OAuthConfig(err) => Some(err), KafkaError::OffsetFetch(err) => Some(err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None, @@ -325,6 +330,7 @@ impl KafkaError { KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, + KafkaError::OAuthConfig(err) => Some(err.code()), KafkaError::OffsetFetch(err) => Some(*err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None,