diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 7e099aab0..7299c01a0 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1144,6 +1144,49 @@ pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConn } } +async fn describe_producers(admin: &KafkaAdmin) { + let producers = admin + .describe_producers(&[ + TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }, + TopicPartition { + topic_name: "partitions3_case1".to_owned(), + partition: 0, + }, + ]) + .await; + + // producer ID is random so just assert that the producer exists, regardless of its fields. + assert_eq!(producers.len(), 2); + assert_eq!( + producers + .get(&TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }) + .unwrap() + .len(), + // this partition has 1 registered producer + 1 + ); + assert_eq!( + producers + .get(&TopicPartition { + topic_name: "partitions3_case1".to_owned(), + partition: 0, + }) + .unwrap() + .len(), + // this partition has no registered producer + 0 + ); + + // I'm not sure why exactly partitions1 has a registered producer while partitions3_case1 does not. + // I think its up to the driver whether they send an InitProducerId request or not. +} + async fn produce_consume_transactions_with_abort(connection_builder: &KafkaConnectionBuilder) { let producer = connection_builder.connect_producer("1", 0).await; for i in 0..5 { @@ -1545,6 +1588,8 @@ async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) { .await; produce_consume_partitions1(connection_builder, "partitions1").await; + // misc other tests + describe_producers(&admin).await; list_offsets(&admin).await; } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 3a895e0bb..b4cd8e763 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -16,6 +16,8 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}; use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction; use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic; use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult; +use kafka_protocol::messages::describe_producers_request::TopicRequest; +use kafka_protocol::messages::describe_producers_response::TopicResponse; use kafka_protocol::messages::fetch_request::FetchTopic; use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch; use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; @@ -30,13 +32,14 @@ use kafka_protocol::messages::produce_response::{ use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, - FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, - LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, - ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, - OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, - ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, - SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse, EndTxnRequest, + FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, + HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, + ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, + MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, + OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, + RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, + SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -52,9 +55,10 @@ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, - DeleteRecordsRequestSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, - ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, - OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, + DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter, + ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, + OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, + ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::hash::Hasher; @@ -697,6 +701,14 @@ impl KafkaSinkCluster { self.store_topic_names(&mut topic_names, topic.topic.clone()); } } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DescribeProducers(body), + .. + })) => { + for topic in &body.topics { + self.store_topic_names(&mut topic_names, topic.name.clone()); + } + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Fetch(fetch), .. @@ -894,6 +906,12 @@ impl KafkaSinkCluster { })) => { self.split_and_route_request::(request)? } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DescribeProducers(_), + .. + })) => { + self.split_and_route_request::(request)? + } // route to group coordinator Some(Frame::Kafka(KafkaFrame::Request { @@ -1404,6 +1422,57 @@ The connection to the client has been closed." Ok(()) } + /// This method removes all topics from the DescribeProducers request and returns them split up by their destination. + /// If any topics are unroutable they will have their BrokerId set to -1 + fn split_describe_producers_request_by_destination( + &mut self, + body: &mut DescribeProducersRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for mut topic in body.topics.drain(..) { + let topic_name = &topic.name; + if let Some(topic_meta) = self.topic_by_name.get(topic_name) { + for partition_index in std::mem::take(&mut topic.partition_indexes) { + let destination = if let Some(partition) = + topic_meta.partitions.get(partition_index as usize) + { + if partition.leader_id == -1 { + tracing::warn!( + "leader_id is unknown for {topic_name:?} at partition index {partition_index}", + ); + } + partition.leader_id + } else { + let partition_len = topic_meta.partitions.len(); + tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + BrokerId(-1) + }; + tracing::debug!( + "Routing DescribeProducers request portion of partition {partition_index} in {topic_name:?} to broker {}", + destination.0 + ); + let dest_topics = result.entry(destination).or_default(); + if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name) + { + dest_topic.partition_indexes.push(partition_index); + } else { + let mut topic = topic.clone(); + topic.partition_indexes.push(partition_index); + dest_topics.push(topic); + } + } + } else { + tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + let destination = BrokerId(-1); + let dest_topics = result.entry(destination).or_default(); + dest_topics.push(topic); + } + } + + result + } + /// This method removes all topics from the list offsets request and returns them split up by their destination /// If any topics are unroutable they will have their BrokerId set to -1 fn split_list_offsets_request_by_destination( @@ -2154,6 +2223,10 @@ The connection to the client has been closed." body: ResponseBody::OffsetFetch(base), .. })) => Self::combine_offset_fetch(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DescribeProducers(base), + .. + })) => Self::combine_describe_producers(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::ListGroups(base), .. @@ -2444,6 +2517,49 @@ The connection to the client has been closed." Ok(()) } + fn combine_describe_producers( + base: &mut DescribeProducersResponse, + drain: impl Iterator, + ) -> Result<()> { + let mut base_responses: HashMap = + std::mem::take(&mut base.topics) + .into_iter() + .map(|response| (response.name.clone(), response)) + .collect(); + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DescribeProducers(next), + .. + })) = next.frame() + { + for next_response in std::mem::take(&mut next.topics) { + if let Some(base_response) = base_responses.get_mut(&next_response.name) { + for next_partition in &next_response.partitions { + for base_partition in &base_response.partitions { + if next_partition.partition_index == base_partition.partition_index + { + tracing::warn!("Duplicate partition indexes in combined DescribeProducers response, if this ever occurs we should investigate the repercussions") + } + } + } + // A partition can only be contained in one response so there is no risk of duplicating partitions + base_response.partitions.extend(next_response.partitions) + } else { + base_responses.insert(next_response.name.clone(), next_response); + } + } + } else { + return Err(anyhow!( + "Combining DescribeProducers responses but received another message type" + )); + } + } + + base.topics.extend(base_responses.into_values()); + + Ok(()) + } + fn combine_list_transactions( base_list_transactions: &mut ListTransactionsResponse, drain: impl Iterator, diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 7b34bc0c3..f4737cf92 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -8,12 +8,13 @@ use crate::{ }; use kafka_protocol::messages::{ add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - delete_records_request::DeleteRecordsTopic, list_offsets_request::ListOffsetsTopic, - offset_fetch_request::OffsetFetchRequestGroup, + delete_records_request::DeleteRecordsTopic, describe_producers_request::TopicRequest, + list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, - AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, GroupId, - ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, - OffsetForLeaderEpochRequest, ProduceRequest, TopicName, + AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, + DescribeProducersRequest, GroupId, ListGroupsRequest, ListOffsetsRequest, + ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, + TopicName, }; use std::collections::HashMap; @@ -168,6 +169,34 @@ impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter { } } +pub struct DescribeProducersRequestSplitAndRouter; + +impl RequestSplitAndRouter for DescribeProducersRequestSplitAndRouter { + type Request = DescribeProducersRequest; + type SubRequests = Vec; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap { + transform.split_describe_producers_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DescribeProducers(request), + .. + })) => Some(request), + _ => None, + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.topics = item; + } +} + pub struct DeleteGroupsSplitAndRouter; impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 2791f316b..70c912a2c 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,8 +1,9 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, - OffsetSpec, PartitionReassignment, ProduceResult, Record, RecordsToDelete, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo, + OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete, + ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, + TopicPartitionInfo, }; use crate::connection::java::{map_iterator, Jvm, Value}; use anyhow::Result; @@ -442,6 +443,48 @@ impl KafkaAdminJava { self.create_topics_fallible(topics).await.unwrap(); } + pub async fn describe_producers( + &self, + topic_partitions: &[TopicPartition], + ) -> HashMap> { + let topic_partitions_java = self.jvm.new_set( + "org.apache.kafka.common.TopicPartition", + topic_partitions + .iter() + .map(|topic_partition| topic_partition_to_java(&self.jvm, topic_partition)) + .collect(), + ); + + let describe_results = self + .admin + .call("describeProducers", vec![topic_partitions_java]) + .call_async_fallible("all", vec![]) + .await + .unwrap(); + + map_iterator(describe_results) + .map(|(topic_partition, producer_states)| { + let producer_states = producer_states.cast( + "org.apache.kafka.clients.admin.DescribeProducersResult$PartitionProducerState", + ); + ( + topic_partition_to_rust(topic_partition), + producer_states + .call("activeProducers", vec![]) + .call("iterator", vec![]) + .into_iter() + .map(|producer_state| ProducerState { + producer_id: producer_state + .cast("org.apache.kafka.clients.admin.ProducerState") + .call("producerId", vec![]) + .into_rust(), + }) + .collect(), + ) + }) + .collect() + } + pub async fn describe_topics(&self, topic_names: &[&str]) -> Result> { let topics = self.jvm.new_list( "java.lang.String", diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index ac12926c8..f2f7e3fef 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -372,6 +372,18 @@ impl KafkaAdmin { KafkaAdmin::Java(java) => java.create_topics(topics).await, } } + pub async fn describe_producers( + &self, + topic_partitions: &[TopicPartition], + ) -> HashMap> { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support describe_producers") + } + Self::Java(java) => java.describe_producers(topic_partitions).await, + } + } pub async fn create_topics_and_wait(&self, topics: &[NewTopic<'_>]) { self.create_topics(topics).await; @@ -749,3 +761,8 @@ pub struct PartitionReassignment { pub struct NewPartitionReassignment { pub replica_broker_ids: Vec, } + +#[derive(PartialEq, Debug)] +pub struct ProducerState { + pub producer_id: i64, +}