Skip to content

Commit

Permalink
feat: expose low-level metadata to user
Browse files Browse the repository at this point in the history
Helpful for debugging and certain admin operations like topic
re-assignment and leader election.
  • Loading branch information
crepererum committed Jul 21, 2022
1 parent eea2003 commit 0c5b67d
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 0 deletions.
50 changes: 50 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use thiserror::Error;
use crate::{
client::partition::PartitionClient,
connection::{BrokerConnector, TlsConfig},
metadata::{Metadata, MetadataBroker, MetadataPartition, MetadataTopic},
protocol::primitives::Boolean,
topic::Topic,
};
Expand Down Expand Up @@ -145,4 +146,53 @@ impl Client {
})
.collect())
}

/// Return cluster-wide metadata.
pub async fn metadata(&self) -> Result<Metadata> {
let response = self.brokers.request_metadata(None, None).await?;

Ok(Metadata {
brokers: response
.brokers
.into_iter()
.map(|response| MetadataBroker {
node_id: response.node_id.0,
host: response.host.0,
port: response.port.0,
rack: response.rack.and_then(|s| s.0),
})
.collect(),
controller_id: response.controller_id.map(|id| id.0),
topics: response
.topics
.into_iter()
.map(|response| MetadataTopic {
name: response.name.0,
is_internal: response.is_internal.map(|b| b.0),
partitions: response
.partitions
.into_iter()
.map(|response| MetadataPartition {
partition_index: response.partition_index.0,
leader_id: response.leader_id.0,
replica_nodes: response
.replica_nodes
.0
.unwrap_or_default()
.into_iter()
.map(|i| i.0)
.collect(),
isr_nodes: response
.isr_nodes
.0
.unwrap_or_default()
.into_iter()
.map(|i| i.0)
.collect(),
})
.collect(),
})
.collect(),
})
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ mod backoff;
pub mod client;

mod connection;

#[cfg(feature = "unstable-fuzzing")]
pub mod messenger;
#[cfg(not(feature = "unstable-fuzzing"))]
mod messenger;

pub mod metadata;

#[cfg(feature = "unstable-fuzzing")]
pub mod protocol;
#[cfg(not(feature = "unstable-fuzzing"))]
Expand Down
59 changes: 59 additions & 0 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Cluster-wide Kafka metadata.
/// Metadata container for the entire cluster.
#[derive(Debug, PartialEq)]
pub struct Metadata {
/// Brokers.
pub brokers: Vec<MetadataBroker>,

/// The ID of the controller broker.
pub controller_id: Option<i32>,

/// Topics.
pub topics: Vec<MetadataTopic>,
}

/// Metadata for a certain broker.
#[derive(Debug, PartialEq)]
pub struct MetadataBroker {
/// The broker ID
pub node_id: i32,

/// The broker hostname
pub host: String,

/// The broker port
pub port: i32,

/// Rack.
pub rack: Option<String>,
}

/// Metadata for a certain topic.
#[derive(Debug, PartialEq)]
pub struct MetadataTopic {
/// The topic name
pub name: String,

/// True if the topic is internal
pub is_internal: Option<bool>,

/// Each partition in the topic
pub partitions: Vec<MetadataPartition>,
}

/// Metadata for a certain partition.
#[derive(Debug, PartialEq)]
pub struct MetadataPartition {
/// The partition index
pub partition_index: i32,

/// The ID of the leader broker
pub leader_id: i32,

/// The set of all nodes that host this partition
pub replica_nodes: Vec<i32>,

/// The set of all nodes that are in sync with the leader for this partition
pub isr_nodes: Vec<i32>,
}
34 changes: 34 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,40 @@ async fn test_delete_records() {
);
}

#[tokio::test]
async fn test_metadata() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(connection).build().await.unwrap();

let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
.await
.unwrap();

let md = client.metadata().await.unwrap();
assert!(!md.brokers.is_empty());

// topic metadata might take a while to converge
tokio::time::timeout(Duration::from_millis(1_000), async {
loop {
let md = client.metadata().await.unwrap();
let topic = md.topics.into_iter().find(|topic| topic.name == topic_name);
if topic.is_some() {
return;
}

tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
}

#[tokio::test]
async fn test_reassign_partitions() {
maybe_start_logging();
Expand Down

0 comments on commit 0c5b67d

Please sign in to comment.