KafkaSinkCluster: route DescribeProducers (#1814)
rukai authored Nov 15, 2024
1 parent 0310cbf commit 65ce7cb
Showing 5 changed files with 267 additions and 17 deletions.
45 changes: 45 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1144,6 +1144,49 @@ pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConn
}
}

async fn describe_producers(admin: &KafkaAdmin) {
let producers = admin
.describe_producers(&[
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 0,
},
])
.await;

// producer ID is random so just assert that the producer exists, regardless of its fields.
assert_eq!(producers.len(), 2);
assert_eq!(
producers
.get(&TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
})
.unwrap()
.len(),
// this partition has 1 registered producer
1
);
assert_eq!(
producers
.get(&TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 0,
})
.unwrap()
.len(),
// this partition has no registered producer
0
);

// I'm not sure exactly why partitions1 has a registered producer while partitions3_case1 does not.
// I think it's up to the driver whether it sends an InitProducerId request or not.
}

async fn produce_consume_transactions_with_abort(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
for i in 0..5 {
@@ -1545,6 +1588,8 @@ async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

// misc other tests
describe_producers(&admin).await;
list_offsets(&admin).await;
}

136 changes: 126 additions & 10 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -16,6 +16,8 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic;
use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult;
use kafka_protocol::messages::describe_producers_request::TopicRequest;
use kafka_protocol::messages::describe_producers_response::TopicResponse;
use kafka_protocol::messages::fetch_request::FetchTopic;
use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
@@ -30,13 +32,14 @@ use kafka_protocol::messages::produce_response::{
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse,
ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse, EndTxnRequest,
FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -52,9 +55,10 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter,
ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
@@ -697,6 +701,14 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, topic.topic.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeProducers(body),
..
})) => {
for topic in &body.topics {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
@@ -894,6 +906,12 @@ impl KafkaSinkCluster {
})) => {
self.split_and_route_request::<DeleteRecordsRequestSplitAndRouter>(request)?
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeProducers(_),
..
})) => {
self.split_and_route_request::<DescribeProducersRequestSplitAndRouter>(request)?
}

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
@@ -1404,6 +1422,57 @@ The connection to the client has been closed."
Ok(())
}

/// This method removes all topics from the DescribeProducers request and returns them split up by their destination.
/// If any topics are unroutable, they will have their BrokerId set to -1.
fn split_describe_producers_request_by_destination(
&mut self,
body: &mut DescribeProducersRequest,
) -> HashMap<BrokerId, Vec<TopicRequest>> {
let mut result: HashMap<BrokerId, Vec<TopicRequest>> = Default::default();

for mut topic in body.topics.drain(..) {
let topic_name = &topic.name;
if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
for partition_index in std::mem::take(&mut topic.partition_indexes) {
let destination = if let Some(partition) =
topic_meta.partitions.get(partition_index as usize)
{
if partition.leader_id == -1 {
tracing::warn!(
"leader_id is unknown for {topic_name:?} at partition index {partition_index}",
);
}
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
"Routing DescribeProducers request portion of partition {partition_index} in {topic_name:?} to broker {}",
destination.0
);
let dest_topics = result.entry(destination).or_default();
if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name)
{
dest_topic.partition_indexes.push(partition_index);
} else {
let mut topic = topic.clone();
topic.partition_indexes.push(partition_index);
dest_topics.push(topic);
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
}
}

result
}
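
For reference, the grouping this method performs can be sketched standalone. The following is an illustrative sketch only (not part of this commit), using toy stand-ins for the kafka_protocol BrokerId and TopicRequest types and a caller-supplied leader lookup; split_by_leader is a hypothetical name:

use std::collections::HashMap;

// Toy stand-ins for the kafka_protocol types (illustration only).
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
struct BrokerId(i32);

#[derive(Clone, Debug)]
struct TopicRequest {
    name: String,
    partition_indexes: Vec<i32>,
}

// Group each (topic, partition) under the partition's leader broker, falling
// back to BrokerId(-1) for unroutable partitions, mirroring the method above.
fn split_by_leader(
    topics: Vec<TopicRequest>,
    leader_of: impl Fn(&str, i32) -> Option<BrokerId>,
) -> HashMap<BrokerId, Vec<TopicRequest>> {
    let mut result: HashMap<BrokerId, Vec<TopicRequest>> = HashMap::new();
    for topic in topics {
        for partition in topic.partition_indexes {
            let destination = leader_of(&topic.name, partition).unwrap_or(BrokerId(-1));
            let dest_topics = result.entry(destination).or_default();
            // Merge into an existing per-broker entry for the same topic if present.
            if let Some(existing) = dest_topics.iter_mut().find(|t| t.name == topic.name) {
                existing.partition_indexes.push(partition);
            } else {
                dest_topics.push(TopicRequest {
                    name: topic.name.clone(),
                    partition_indexes: vec![partition],
                });
            }
        }
    }
    result
}

Called with a metadata lookup closure, this yields one Vec<TopicRequest> per destination broker, which the RequestSplitAndRouter machinery then dispatches as separate sub-requests.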

/// This method removes all topics from the list offsets request and returns them split up by their destination.
/// If any topics are unroutable, they will have their BrokerId set to -1.
fn split_list_offsets_request_by_destination(
@@ -2154,6 +2223,10 @@ The connection to the client has been closed."
body: ResponseBody::OffsetFetch(base),
..
})) => Self::combine_offset_fetch(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeProducers(base),
..
})) => Self::combine_describe_producers(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(base),
..
@@ -2444,6 +2517,49 @@ The connection to the client has been closed."
Ok(())
}

fn combine_describe_producers(
base: &mut DescribeProducersResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
let mut base_responses: HashMap<TopicName, TopicResponse> =
std::mem::take(&mut base.topics)
.into_iter()
.map(|response| (response.name.clone(), response))
.collect();
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeProducers(next),
..
})) = next.frame()
{
for next_response in std::mem::take(&mut next.topics) {
if let Some(base_response) = base_responses.get_mut(&next_response.name) {
for next_partition in &next_response.partitions {
for base_partition in &base_response.partitions {
if next_partition.partition_index == base_partition.partition_index
{
tracing::warn!("Duplicate partition indexes in combined DescribeProducers response, if this ever occurs we should investigate the repercussions")
}
}
}
// A partition can only be contained in one response so there is no risk of duplicating partitions
base_response.partitions.extend(next_response.partitions)
} else {
base_responses.insert(next_response.name.clone(), next_response);
}
}
} else {
return Err(anyhow!(
"Combining DescribeProducers responses but received another message type"
));
}
}

base.topics.extend(base_responses.into_values());

Ok(())
}
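
The merge is the mirror image of the split. Below is an illustrative sketch only (not part of this commit) of the fold combine_describe_producers performs, again with a toy stand-in type and a hypothetical combine function; the real code additionally warns on duplicate partition indexes:

use std::collections::HashMap;

// Toy stand-in for the kafka_protocol TopicResponse (illustration only);
// partitions stands in for the per-partition producer states.
#[derive(Clone, Debug)]
struct TopicResponse {
    name: String,
    partitions: Vec<i32>,
}

// Fold the per-broker responses back into one response, merging topics by name.
// A partition is only ever answered by one broker, so extending cannot duplicate it.
fn combine(mut base: Vec<TopicResponse>, others: Vec<Vec<TopicResponse>>) -> Vec<TopicResponse> {
    let mut by_name: HashMap<String, TopicResponse> = base
        .drain(..)
        .map(|topic| (topic.name.clone(), topic))
        .collect();
    for response in others {
        for topic in response {
            if let Some(existing) = by_name.get_mut(&topic.name) {
                existing.partitions.extend(topic.partitions);
            } else {
                by_name.insert(topic.name.clone(), topic);
            }
        }
    }
    by_name.into_values().collect()
}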

fn combine_list_transactions(
base_list_transactions: &mut ListTransactionsResponse,
drain: impl Iterator<Item = Message>,
39 changes: 34 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -8,12 +8,13 @@ use crate::{
};
use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
delete_records_request::DeleteRecordsTopic, list_offsets_request::ListOffsetsTopic,
offset_fetch_request::OffsetFetchRequestGroup,
delete_records_request::DeleteRecordsTopic, describe_producers_request::TopicRequest,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, GroupId,
ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeProducersRequest, GroupId, ListGroupsRequest, ListOffsetsRequest,
ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest,
TopicName,
};
use std::collections::HashMap;

@@ -168,6 +169,34 @@ impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter {
}
}

pub struct DescribeProducersRequestSplitAndRouter;

impl RequestSplitAndRouter for DescribeProducersRequestSplitAndRouter {
type Request = DescribeProducersRequest;
type SubRequests = Vec<TopicRequest>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_describe_producers_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeProducers(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.topics = item;
}
}
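
As with the other implementations of this trait, the pieces compose into one generic flow: split_by_destination drains the request's topics and buckets them per broker, then reassemble writes one bucket back into a clone of the now-empty request before it is dispatched. The following is an illustrative sketch of that driver loop with a simplified stand-in trait (hypothetical names and signatures, not the real ones):

use std::collections::HashMap;

// Simplified stand-in for RequestSplitAndRouter (illustration only).
trait SplitAndRoute {
    type Request: Clone;
    type SubRequests;

    // Drains the request's routable items, grouped by destination broker id.
    fn split(request: &mut Self::Request) -> HashMap<i32, Self::SubRequests>;
    // Writes one destination's share back into a request.
    fn reassemble(request: &mut Self::Request, sub: Self::SubRequests);
}

// Produce one request per broker, each carrying only that broker's share.
fn split_and_route<T: SplitAndRoute>(mut request: T::Request) -> Vec<(i32, T::Request)> {
    let routing = T::split(&mut request); // empties the request's topic list
    routing
        .into_iter()
        .map(|(broker, sub)| {
            let mut sub_request = request.clone(); // cheap-ish: topics already drained
            T::reassemble(&mut sub_request, sub);
            (broker, sub_request)
        })
        .collect()
}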

pub struct DeleteGroupsSplitAndRouter;

impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
47 changes: 45 additions & 2 deletions test-helpers/src/connection/kafka/java.rs
@@ -1,8 +1,9 @@
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, PartitionReassignment, ProduceResult, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo,
OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete,
ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
TopicPartitionInfo,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
@@ -442,6 +443,48 @@ impl KafkaAdminJava {
self.create_topics_fallible(topics).await.unwrap();
}

pub async fn describe_producers(
&self,
topic_partitions: &[TopicPartition],
) -> HashMap<TopicPartition, Vec<ProducerState>> {
let topic_partitions_java = self.jvm.new_set(
"org.apache.kafka.common.TopicPartition",
topic_partitions
.iter()
.map(|topic_partition| topic_partition_to_java(&self.jvm, topic_partition))
.collect(),
);

let describe_results = self
.admin
.call("describeProducers", vec![topic_partitions_java])
.call_async_fallible("all", vec![])
.await
.unwrap();

map_iterator(describe_results)
.map(|(topic_partition, producer_states)| {
let producer_states = producer_states.cast(
"org.apache.kafka.clients.admin.DescribeProducersResult$PartitionProducerState",
);
(
topic_partition_to_rust(topic_partition),
producer_states
.call("activeProducers", vec![])
.call("iterator", vec![])
.into_iter()
.map(|producer_state| ProducerState {
producer_id: producer_state
.cast("org.apache.kafka.clients.admin.ProducerState")
.call("producerId", vec![])
.into_rust(),
})
.collect(),
)
})
.collect()
}

pub async fn describe_topics(&self, topic_names: &[&str]) -> Result<Vec<TopicDescription>> {
let topics = self.jvm.new_list(
"java.lang.String",
17 changes: 17 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -372,6 +372,18 @@ impl KafkaAdmin {
KafkaAdmin::Java(java) => java.create_topics(topics).await,
}
}
pub async fn describe_producers(
&self,
topic_partitions: &[TopicPartition],
) -> HashMap<TopicPartition, Vec<ProducerState>> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support describe_producers")
}
Self::Java(java) => java.describe_producers(topic_partitions).await,
}
}

pub async fn create_topics_and_wait(&self, topics: &[NewTopic<'_>]) {
self.create_topics(topics).await;
@@ -749,3 +761,8 @@ pub struct PartitionReassignment {
pub struct NewPartitionReassignment {
pub replica_broker_ids: Vec<i32>,
}

#[derive(PartialEq, Debug)]
pub struct ProducerState {
pub producer_id: i64,
}
