Confluent Kafka SetSaslCredentials is not working as expected #2306

Open
4 of 8 tasks
arunprakashn opened this issue Sep 2, 2024 · 2 comments

arunprakashn commented Sep 2, 2024

Description

I am trying to use the new "SetSaslCredentials" enhancement, and I think it is not working as expected.

How to reproduce

  1. Create a producer builder with a known-good configuration (a working API key and secret).
  2. Call producer.SetSaslCredentials("dummykey", "dummysecret").
  3. Send a message with producer.ProduceAsync. It throws an auth error, which is expected.
  4. Call producer.SetSaslCredentials("workingKey", "workingSecret").
  5. Send a message. It gets delivered, which is expected.
  6. On the next line, call producer.SetSaslCredentials("dummykey", "dummysecret") once again.
  7. Send a message. The message gets delivered, but since the credentials are now wrong, it should throw an auth error.
  8. Follow-up question: if I have two sets of credentials and keep switching between them across calls without rebuilding the producer, I believe the producer won't switch the auth properly. Once the auth is successful and works, it won't switch again even if you call "producer.SetSaslCredentials".

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version. - 2.5.2
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.
arunprakashn (Author) commented Sep 2, 2024

```csharp
using System.Text;
using Confluent.Kafka;

Console.WriteLine("Hello, World!");
string apiKey = "WorkingKey";
string secret = "WorkingSecret";
string bootstrapUrl = "dummy.azure.confluent.cloud:9092";

var config = new ProducerConfig
{
    BootstrapServers = bootstrapUrl,
    SaslMechanism = SaslMechanism.Plain,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslUsername = apiKey,
    SaslPassword = secret,
    RetryBackoffMaxMs = 2000,
    MessageTimeoutMs = 10000,
};

// Dispose the producer on exit so native resources are released.
using var producer = new ProducerBuilder<string, byte[]>(config)
    .SetErrorHandler((p, error) =>
    {
        // Prefix fatal errors so they stand out in the console output.
        var severity = error.IsFatal ? "FATAL : " : "";
        Console.WriteLine($"Confluent Kafka Producer Error Handler : {severity}Error Code: {error.Code} {error.Reason}");
    })
    .SetLogHandler((p, log) =>
    {
        Console.WriteLine(
            $"Confluent Kafka Producer Log Handler : {log.Level.ToString().ToUpper()}|{DateTime.UtcNow}|{log.Facility}|{log.Name}|{log.Message}");
    })
    .Build();

// Step 1: wrong credentials. Producing fails, which is expected.
producer.SetSaslCredentials("hello", "hello");
for (int i = 0; i < 1; i++)
{
    try
    {
        var result = await producer.ProduceAsync("arun.poc", new Message<string, byte[]> { Key = "key", Value = Encoding.UTF8.GetBytes("value") });
        Console.WriteLine($"Message sent to Partition: {result.Partition} with Offset: {result.Offset}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

// Step 2: working credentials. Producing succeeds, which is expected.
producer.SetSaslCredentials(apiKey, secret);
for (int i = 0; i < 10; i++)
{
    var result = await producer.ProduceAsync("arun.poc", new Message<string, byte[]> { Key = "key", Value = Encoding.UTF8.GetBytes("value") });
    Console.WriteLine($"Message sent to Partition: {result.Partition} with Offset: {result.Offset}");
}

// Step 3: wrong credentials again. This should fail because "hello" is not valid,
// but the message is still delivered.
producer.SetSaslCredentials("hello", "hello");
for (int i = 0; i < 1; i++)
{
    try
    {
        var result = await producer.ProduceAsync("arun.poc", new Message<string, byte[]> { Key = "key", Value = Encoding.UTF8.GetBytes("value") });
        Console.WriteLine($"Message sent to Partition: {result.Partition} with Offset: {result.Offset}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}
```

milindl (Contributor) commented Oct 22, 2024

> Once the auth is successful and works, it won't switch again even if you call "producer.SetSaslCredentials"

Currently, authentication happens only when the client connects to a broker. Once a connection is established, changing the credentials afterward to something incorrect doesn't cause any issue as long as that connection stays intact (connections are expected to be persistent). The new credentials are only used the next time the client has to authenticate.

Depending on where you are running the broker, you can check the broker property "connections.max.reauth.ms", which forces successfully authenticated connections to reauthenticate within this duration.
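
As a rough illustration of that broker setting, here is a minimal sketch of a server.properties fragment. It assumes a self-managed broker whose configuration you can edit; a managed service may not expose this property. The one-hour value is only an example, not a recommendation.

```properties
# Assumption: self-managed Apache Kafka broker, edited in server.properties.
# Require SASL connections to reauthenticate at least once per hour.
# The value is in milliseconds; the default of 0 disables reauthentication.
connections.max.reauth.ms=3600000
```

With a non-zero value, an established connection has to reauthenticate periodically, so credentials set through SetSaslCredentials should be picked up at the next reauthentication rather than only when a new connection is opened.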
