diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index a4c2406250..bf7281ef66 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -809,7 +809,11 @@ impl Session { consumer_protocol_subscription_msg .topics .iter_mut() - .for_each(|topic| *topic = self.encrypt_topic_name(topic.to_owned().into()).into()); + .for_each(|topic| { + let transformed = self.encrypt_topic_name(topic.to_owned().into()).into(); + tracing::info!(topic_name = ?topic, encrypted_name=?transformed, "Joining group"); + *topic = transformed; + }); let mut new_protocol_subscription = BytesMut::new(); @@ -950,6 +954,7 @@ impl Session { .into_iter() .map(|part| { let transformed_topic = self.encrypt_topic_name(part.topic.to_owned()); + tracing::info!(topic_name = ?part.topic, encrypted_name=?transformed_topic, "Syncing group"); part.with_topic(transformed_topic) }) .collect(); @@ -1036,7 +1041,9 @@ impl Session { ) -> anyhow::Result { let mut mutated_req = req.clone(); for topic in &mut mutated_req.topics { - topic.name = self.encrypt_topic_name(topic.name.clone()) + let encrypted = self.encrypt_topic_name(topic.name.clone()); + tracing::info!(topic_name = ?topic.name, encrypted_name=?encrypted, "Committing offset"); + topic.name = encrypted; } let client = self @@ -1118,7 +1125,8 @@ impl Session { let mut mutated_req = req.clone(); if let Some(ref mut topics) = mutated_req.topics { for topic in topics { - topic.name = self.encrypt_topic_name(topic.name.clone()) + let encrypted = self.encrypt_topic_name(topic.name.clone()); + tracing::info!(topic_name = ?topic.name, encrypted_name = ?encrypted, "Fetching offset"); } }