Skip to content

Commit

Permalink
fixup! dekaf: Remove unneccesary ensure_topics routine
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 7, 2024
1 parent 871a21a commit 990b986
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
55 changes: 55 additions & 0 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,4 +538,59 @@ impl KafkaApiClient {

Ok(version.to_owned())
}

#[instrument(skip_all)]
pub async fn ensure_topics(&self, topic_names: Vec<messages::TopicName>) -> anyhow::Result<()> {
let req = messages::MetadataRequest::default()
.with_topics(Some(
topic_names
.iter()
.map(|name| {
messages::metadata_request::MetadataRequestTopic::default()
.with_name(Some(name.clone()))
})
.collect(),
))
.with_allow_auto_topic_creation(true);

let coord = self.connect_to_controller().await?;
let resp = coord.send_request(req, None).await?;
tracing::debug!(metadata=?resp, "Got metadata response");

if resp
.topics
.iter()
.all(|(name, topic)| topic_names.contains(&name) && topic.error_code == 0)
{
return Ok(());
} else {
let mut topics_map = kafka_protocol::indexmap::IndexMap::new();
for topic_name in topic_names.into_iter() {
topics_map.insert(
topic_name,
messages::create_topics_request::CreatableTopic::default()
.with_replication_factor(2)
.with_num_partitions(-1),
);
}
let create_req = messages::CreateTopicsRequest::default().with_topics(topics_map);
let create_resp = coord.send_request(create_req, None).await?;
tracing::debug!(create_response=?create_resp, "Got create response");

for (name, topic) in create_resp.topics {
if topic.error_code > 0 {
let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
tracing::warn!(
topic = name.to_string(),
error = ?err,
message = topic.error_message.map(|m|m.to_string()),
"Failed to create topic"
);
bail!("Failed to create topic");
}
}

Ok(())
}
}
}
18 changes: 18 additions & 0 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,17 @@ impl Session {
.connect_to_group_coordinator(req.group_id.as_str())
.await?;

self.app
.kafka_client
.ensure_topics(
mutated_req
.topics
.iter()
.map(|t| t.name.to_owned())
.collect(),
)
.await?;

let mut resp = client.send_request(mutated_req, Some(header)).await?;

for topic in resp.topics.iter_mut() {
Expand Down Expand Up @@ -872,6 +883,13 @@ impl Session {
.connect_to_group_coordinator(req.group_id.as_str())
.await?;

if let Some(ref topics) = mutated_req.topics {
self.app
.kafka_client
.ensure_topics(topics.iter().map(|t| t.name.to_owned()).collect())
.await?;
}

let mut resp = client.send_request(mutated_req, Some(header)).await?;

for topic in resp.topics.iter_mut() {
Expand Down

0 comments on commit 990b986

Please sign in to comment.