Skip to content

Commit

Permalink
Refresh OAuth token in the background
Browse files Browse the repository at this point in the history
So that OAUTHBEARER authentication works even when poll isn't being
called regularly (e.g., in a client just used for fetching metadata).
  • Loading branch information
benesch committed Aug 4, 2024
1 parent 1e27332 commit 8a13235
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
14 changes: 12 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use rdkafka_sys::types::*;

use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::groups::GroupList;
use crate::log::{debug, error, info, trace, warn};
use crate::metadata::Metadata;
Expand Down Expand Up @@ -247,7 +247,8 @@ impl<C: ClientContext + 'static> Client<C> {
rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb(
native_config.ptr(),
Some(native_oauth_refresh_cb::<C>),
)
);
rdkafka_sys::rd_kafka_conf_enable_sasl_queue(native_config.ptr(), 1);
};
}

Expand All @@ -268,6 +269,15 @@ impl<C: ClientContext + 'static> Client<C> {

unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };

if C::ENABLE_REFRESH_OAUTH_TOKEN {
let ret = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_sasl_background_callbacks_enable(client_ptr))
};
if ret.is_error() {
return Err(KafkaError::OAuthConfig(ret));
}
}

Ok(Client {
native: unsafe { NativeClient::from_ptr(client_ptr) },
context,
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ pub enum KafkaError {
NoMessageReceived,
/// Unexpected null pointer
Nul(ffi::NulError),
/// OAuth configuration failed.
OAuthConfig(RDKafkaError),
/// Offset fetch failed.
OffsetFetch(RDKafkaErrorCode),
/// End of partition reached.
Expand Down Expand Up @@ -218,6 +220,7 @@ impl fmt::Debug for KafkaError {
write!(f, "No message received within the given poll interval")
}
KafkaError::Nul(_) => write!(f, "FFI null error"),
KafkaError::OAuthConfig(err) => write!(f, "KafkaError (OAuth config error: {})", err),
KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
KafkaError::PauseResume(ref err) => {
Expand Down Expand Up @@ -259,6 +262,7 @@ impl fmt::Display for KafkaError {
write!(f, "No message received within the given poll interval")
}
KafkaError::Nul(_) => write!(f, "FFI nul error"),
KafkaError::OAuthConfig(err) => write!(f, "OAuth config error: {}", err),
KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
Expand Down Expand Up @@ -288,6 +292,7 @@ impl Error for KafkaError {
KafkaError::MetadataFetch(err) => Some(err),
KafkaError::NoMessageReceived => None,
KafkaError::Nul(_) => None,
KafkaError::OAuthConfig(err) => Some(err),
KafkaError::OffsetFetch(err) => Some(err),
KafkaError::PartitionEOF(_) => None,
KafkaError::PauseResume(_) => None,
Expand Down Expand Up @@ -325,6 +330,7 @@ impl KafkaError {
KafkaError::MetadataFetch(err) => Some(*err),
KafkaError::NoMessageReceived => None,
KafkaError::Nul(_) => None,
KafkaError::OAuthConfig(err) => Some(err.code()),
KafkaError::OffsetFetch(err) => Some(*err),
KafkaError::PartitionEOF(_) => None,
KafkaError::PauseResume(_) => None,
Expand Down

0 comments on commit 8a13235

Please sign in to comment.