diff --git a/Kudu.Core/Functions/KafkaTriggerKedaAuthProvider.cs b/Kudu.Core/Functions/KafkaTriggerKedaAuthProvider.cs index bf0fdcec..7b70f5a5 100644 --- a/Kudu.Core/Functions/KafkaTriggerKedaAuthProvider.cs +++ b/Kudu.Core/Functions/KafkaTriggerKedaAuthProvider.cs @@ -12,27 +12,22 @@ namespace Kudu.Core.Functions { public class KafkaTriggerKedaAuthProvider : IKedaAuthRefProvider { - // private readonly IKubernetes _kubernetesClient; - - // public KafkaTriggerKedaAuthProvider(IKubernetes kubernetesClient) - // { - // _kubernetesClient = kubernetesClient; - // } public IDictionary PopulateAuthenticationRef(JToken bindings, string functionName) { IDictionary functionData = bindings.ToObject>() .Where(i => i.Value.Type == JTokenType.String) - .ToDictionary(k => k.Key, v => v.Value.ToString()); + .ToDictionary(k => k.Key, v => v.Value.ToString(), StringComparer.OrdinalIgnoreCase); + + if (!IsTriggerAuthRequired(functionData, functionName)) { + return null; + } + //map of secret keys to keda params required for trigger auth IDictionary secretKeyToKedaParam = new Dictionary(); IDictionary secretsForAppSettings = new Dictionary(); - //creates the map of secret keys to keda params required for trigger auth - if (functionData.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL) - && (functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("SaslSsl", StringComparison.OrdinalIgnoreCase) - || functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("SaslPlaintext", StringComparison.OrdinalIgnoreCase) - && functionData.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE) - && !functionData[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE_NOT_SET, StringComparison.OrdinalIgnoreCase))) + if ((functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SASL_SSL_PROTOCOL, StringComparison.OrdinalIgnoreCase) + || functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SASL_PLAINTEXT_PROTOCOL, StringComparison.OrdinalIgnoreCase))) { secretKeyToKedaParam.Add(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE, getKedaProperty(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE)); secretKeyToKedaParam.Add(TriggerAuthConstants.KAFKA_TRIGGER_USERNAME, getKedaProperty(TriggerAuthConstants.KAFKA_TRIGGER_USERNAME)); @@ -44,8 +39,8 @@ public IDictionary PopulateAuthenticationRef(JToken bindings, st } if (functionData.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL) - && (functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("SaslSsl", StringComparison.OrdinalIgnoreCase) - || functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals("Ssl", StringComparison.OrdinalIgnoreCase))) { + && (functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SASL_SSL_PROTOCOL, StringComparison.OrdinalIgnoreCase) + || functionData[TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL].Equals(TriggerAuthConstants.KAFKA_TRIGGER_SSL_PROTOCOL, StringComparison.OrdinalIgnoreCase))) { secretKeyToKedaParam.Add(TriggerAuthConstants.KAFKA_TRIGGER_TLS, getKedaProperty(TriggerAuthConstants.KAFKA_TRIGGER_TLS)); secretsForAppSettings.Add(TriggerAuthConstants.KAFKA_TRIGGER_TLS, "enable"); } @@ -69,13 +64,11 @@ public IDictionary PopulateAuthenticationRef(JToken bindings, st } //step 1: add the required trigger auth data as secrets in appsetting secrets file - string appNamespace = System.Environment.GetEnvironmentVariable("K8SE_APPS_NAMESPACE"); - try + try { - //add data as appsettings - K8SEDeploymentHelper.UpdateKubernetesSecrets(secretsForAppSettings, functionName + "-secrets", appNamespace); - } - catch (Exception ex) + AddTriggerAuthAppSettingsSecrets(secretsForAppSettings, functionName); + + } catch (Exception ex) { //logging and continuing as keda handles if secret expected is not found Console.WriteLine("Error while adding secrets required for trigger auth ", ex.ToString()); @@ -91,20 +84,25 @@ public IDictionary PopulateAuthenticationRef(JToken bindings, st catch (Exception ex) { Console.WriteLine("Error while creating Trigger Authentication Ref, function name : {0} ", functionName, ex.ToString()); + Console.WriteLine("KEDA might not to able to scale the app {0} due to missing Trigger Authentication details", functionName, ex.ToString()); return null; } - return authRef; } internal virtual void CreateTriggerAuthenticationRef(IDictionary secretKeyToKedaParam, string functionName) { string secretKeyToKedaParamMap = System.Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(JsonConvert.SerializeObject(secretKeyToKedaParam))); - // functionName + "-secrets" is the filename for appsettings secrets K8SEDeploymentHelper.CreateTriggerAuthenticationRef(functionName + "-secrets", secretKeyToKedaParamMap, functionName); } + internal virtual void AddTriggerAuthAppSettingsSecrets(IDictionary secretsForAppSettings, string functionName) + { + string appNamespace = System.Environment.GetEnvironmentVariable("K8SE_APPS_NAMESPACE"); + K8SEDeploymentHelper.UpdateKubernetesSecrets(secretsForAppSettings, functionName + "-secrets", appNamespace); + } + internal string getKedaProperty(string triggerBinding) { if (triggerBinding == null) @@ -113,5 +111,22 @@ internal string getKedaProperty(string triggerBinding) } return TriggerAuthConstants.KafkaTriggerBindingToKedaProperty.GetValueOrDefault(triggerBinding); } + + internal Boolean IsTriggerAuthRequired(IDictionary functionBindings, string functionName) { + + if (!functionBindings.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL) || !functionBindings.ContainsKey(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE)) { + return false; + } + + if (functionBindings[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals(TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE_NOT_SET, StringComparison.OrdinalIgnoreCase) + || functionBindings[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals(TriggerAuthConstants.KAFKA_TRIGGER_PROTOCOL_NOT_SET, StringComparison.OrdinalIgnoreCase)) { + return false; + } + + if (functionBindings[TriggerAuthConstants.KAFKA_TRIGGER_AUTH_MODE].Equals("Gssapi", StringComparison.OrdinalIgnoreCase)) { + Console.WriteLine("Gssapi as Authentication Mode is not supported in Keda, function app {0} might not be able to scale", functionName); + } + return true; + } } } diff --git a/Kudu.Core/Functions/TriggerAuthConstants.cs b/Kudu.Core/Functions/TriggerAuthConstants.cs index f9b1ac89..00e9b5f5 100644 --- a/Kudu.Core/Functions/TriggerAuthConstants.cs +++ b/Kudu.Core/Functions/TriggerAuthConstants.cs @@ -12,9 +12,9 @@ public static class TriggerAuthConstants public const string KAFKA_TRIGGER_AUTH_MODE = "authenticationMode"; public const string KAFKA_TRIGGER_USERNAME = "username"; public const string KAFKA_TRIGGER_PASSWORD = "password"; - public const string KAFKA_TRIGGER_SSL_CA_LOCATION = "SslCaLocation"; - public const string KAFKA_TRIGGER_SSL_CERT_LOCATION = "SslCertificateLocation"; - public const string KAFKA_TRIGGER_SSL_KEY_LOCATION = "SslKeyLocation"; + public const string KAFKA_TRIGGER_SSL_CA_LOCATION = "sslCaLocation"; + public const string KAFKA_TRIGGER_SSL_CERT_LOCATION = "sslCertificateLocation"; + public const string KAFKA_TRIGGER_SSL_KEY_LOCATION = "sslKeyLocation"; public const string KAFKA_TRIGGER_TLS = "tls"; public const string KAFKA_TRIGGER_AUTH_MODE_NOT_SET = "NotSet"; @@ -28,6 +28,12 @@ public static class TriggerAuthConstants public const string KAFKA_KEDA_PARAM_KEY_LOCATION = "key"; public const string KAFKA_KEDA_PARAM_TLS = "tls"; + //Below protocol values must be same as BrokerProtocol values from kafka extension + public const string KAFKA_TRIGGER_SASL_SSL_PROTOCOL = "SaslSsl"; + public const string KAFKA_TRIGGER_SSL_PROTOCOL = "Ssl"; + public const string KAFKA_TRIGGER_SASL_PLAINTEXT_PROTOCOL = "SaslPlaintext"; + public const string KAFKA_TRIGGER_PLAINTEXT_PROTOCOL = "Plaintext"; + public static readonly Dictionary KafkaTriggerBindingToKedaProperty = new Dictionary() { { KAFKA_TRIGGER_AUTH_MODE, KAFKA_KEDA_PARAM_AUTH_MODE }, diff --git a/Kudu.Tests/Core/Function/KafkaTriggerKedaAuthProviderTest.cs b/Kudu.Tests/Core/Function/KafkaTriggerKedaAuthProviderTest.cs index 168e9df1..ff3dabe1 100644 --- a/Kudu.Tests/Core/Function/KafkaTriggerKedaAuthProviderTest.cs +++ b/Kudu.Tests/Core/Function/KafkaTriggerKedaAuthProviderTest.cs @@ -9,16 +9,33 @@ namespace Kudu.Tests.Core.Function { public class KafkaTriggerKedaAuthProviderTest { + [Fact] + public void TestPopulateAuthenticationRef_Is_BindingCaseInsensitive() + { + KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload(); + string jsonText = @" + { + ""Protocol"": ""SaslSsl"", + ""authenticationMode"": ""Plain"", + ""username"": ""test"", + ""password"": ""test"" + }"; + + JToken jsonObj = JToken.Parse(jsonText); + IDictionary authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName"); + Assert.Equal(1, authRef.Count); + } + [Fact] public void PopulateAuthenticationRef_With_ProtocolData() { KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload(); string jsonText = @" { - ""Protocol"": ""SASL_SSL"", - ""AuthenticationMode"": ""PLAINTEXT"", - ""Username"": ""test"", - ""Password"": ""test"" + ""protocol"": ""SaslSsl"", + ""authenticationMode"": ""Plain"", + ""username"": ""test"", + ""password"": ""test"" }"; JToken jsonObj = JToken.Parse(jsonText); @@ -32,10 +49,10 @@ public void PopulateAuthenticationRef_Fails_When_TriggerAuthCreationFails() KafkaTriggerKedaAuthProviderErrorMock kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderErrorMock(); string jsonText = @" { - ""Protocol"": ""SASL_SSL"", - ""AuthenticationMode"": ""PLAINTEXT"", - ""Username"": ""test"", - ""Password"": ""test"" + ""protocol"": ""SaslSsl"", + ""authenticationMode"": ""Plain"", + ""username"": ""test"", + ""password"": ""test"" }"; JToken jsonObj = JToken.Parse(jsonText); @@ -43,6 +60,56 @@ public void PopulateAuthenticationRef_Fails_When_TriggerAuthCreationFails() Assert.Null(authRef); } + [Fact] + public void PopulateAuthenticationRef_Continues_When_AddSecretsFails() + { + KafkaTriggerKedaAuthProviderErrorMock kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderErrorMock(); + string jsonText = @" + { + ""protocol"": ""SaslSsl"", + ""authenticationMode"": ""Plain"", + ""username"": ""test"", + ""password"": ""test"" + }"; + + JToken jsonObj = JToken.Parse(jsonText); + IDictionary authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName"); + Assert.Equal(1, authRef.Count); + } + + [Fact] + public void TestIFTriggerAuthIsNull_With_NoAuthenticationMode() + { + KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload(); + string jsonText = @" + { + ""protocol"": ""SaslSsl"", + ""username"": ""test"", + ""password"": ""test"" + }"; + + JToken jsonObj = JToken.Parse(jsonText); + IDictionary authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName"); + Assert.Null(authRef); + } + + [Fact] + public void TestIFTriggerAuthIsNull_With_NoProtocol() + { + KafkaTriggerKedaAuthProviderOverload kafkaTriggerKedaAuthProvider = new KafkaTriggerKedaAuthProviderOverload(); + string jsonText = @" + { + ""authenticationMode"": ""Plain"", + ""username"": ""test"", + ""password"": ""test"" + }"; + + JToken jsonObj = JToken.Parse(jsonText); + IDictionary authRef = kafkaTriggerKedaAuthProvider.PopulateAuthenticationRef(jsonObj, "testFunctionName"); + Assert.Null(authRef); + } + + private class KafkaTriggerKedaAuthProviderOverload : KafkaTriggerKedaAuthProvider { internal override void CreateTriggerAuthenticationRef(IDictionary secretKeyToKedaParam, string functionName) @@ -58,6 +125,11 @@ internal override void CreateTriggerAuthenticationRef(IDictionary secretsForAppSettings, string functionName) + { + throw new Exception("exception for unit test"); + } } } }