Skip to content

Commit

Permalink
dekaf: Log all topic names in use
Browse files Browse the repository at this point in the history
We need to clear out a bunch of junk topics from MSK, and this is the best way we could think of to figure out which ones to keep
  • Loading branch information
jshearer committed Dec 2, 2024
1 parent 37d7222 commit a2ee6f5
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,11 @@ impl Session {
consumer_protocol_subscription_msg
.topics
.iter_mut()
.for_each(|topic| *topic = self.encrypt_topic_name(topic.to_owned().into()).into());
.for_each(|topic| {
let transformed = self.encrypt_topic_name(topic.to_owned().into()).into();
tracing::info!(topic_name = ?topic, encrypted_name=?transformed, "Joining group");
*topic = transformed;
});

let mut new_protocol_subscription = BytesMut::new();

Expand Down Expand Up @@ -950,6 +954,7 @@ impl Session {
.into_iter()
.map(|part| {
let transformed_topic = self.encrypt_topic_name(part.topic.to_owned());
tracing::info!(topic_name = ?part.topic, encrypted_name=?transformed_topic, "Syncing group");
part.with_topic(transformed_topic)
})
.collect();
Expand Down Expand Up @@ -1036,7 +1041,9 @@ impl Session {
) -> anyhow::Result<messages::OffsetCommitResponse> {
let mut mutated_req = req.clone();
for topic in &mut mutated_req.topics {
topic.name = self.encrypt_topic_name(topic.name.clone())
let encrypted = self.encrypt_topic_name(topic.name.clone());
tracing::info!(topic_name = ?topic.name, encrypted_name=?encrypted, "Committing offset");
topic.name = encrypted;
}

let client = self
Expand Down Expand Up @@ -1118,7 +1125,8 @@ impl Session {
let mut mutated_req = req.clone();
if let Some(ref mut topics) = mutated_req.topics {
for topic in topics {
topic.name = self.encrypt_topic_name(topic.name.clone())
let encrypted = self.encrypt_topic_name(topic.name.clone());
tracing::info!(topic_name = ?topic.name, encrypted_name = ?encrypted, "Fetching offset");
}
}

Expand Down

0 comments on commit a2ee6f5

Please sign in to comment.