From ccacdb32c4b25c51311adea727cfc72864327a73 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 6 Jan 2025 17:44:44 +0100 Subject: [PATCH] feat(Client): Added a new Kafka binding handler Signed-off-by: Charles d'Avernas --- Neuroglia.AsyncApi.sln | 7 + ...IAsyncApiClientOptionsBuilderExtensions.cs | 1 + ...roglia.AsyncApi.Client.Bindings.All.csproj | 1 + .../KafkaBindingHandlerOptions.cs | 25 ++ ...cApiConfigurationBuilderKafkaExtensions.cs | 35 +++ .../KafkaBindingHandler.cs | 136 ++++++++++ .../KafkaPublishOperationResult.cs | 58 +++++ .../KafkaSubscribeOperationResult.cs | 35 +++ .../KafkaSubscription.cs | 238 ++++++++++++++++++ ...glia.AsyncApi.Client.Bindings.Kafka.csproj | 40 +++ .../Usings.cs | 21 ++ .../NatsSubscription.cs | 1 - ...oglia.AsyncApi.Client.Bindings.Nats.csproj | 1 - .../Bindings/Amqp/AmqpChannelType.cs | 3 - .../Kafka/KafkaChannelBindingDefinition.cs | 26 +- .../Kafka/KafkaMessageBindingDefinition.cs | 3 - .../Kafka/KafkaOperationBindingDefinition.cs | 5 +- .../Kafka/KafkaServerBindingDefinition.cs | 14 +- .../Bindings/Kafka/KafkaTopicCleanupPolicy.cs | 42 ++++ .../Bindings/Kafka/KafkaTopicConfiguration.cs | 77 ++++++ .../Bindings/KafkaBindingHandlerTests.cs | 217 ++++++++++++++++ .../Containers/KafkaContainerBuilder.cs | 36 +++ 22 files changed, 1008 insertions(+), 14 deletions(-) create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Configuration/KafkaBindingHandlerOptions.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Extensions/IAsyncApiConfigurationBuilderKafkaExtensions.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaPublishOperationResult.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscribeOperationResult.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Usings.cs create mode 100644 src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicCleanupPolicy.cs create mode 100644 src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicConfiguration.cs create mode 100644 tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/KafkaBindingHandlerTests.cs create mode 100644 tests/Neuroglia.AsyncApi.UnitTests/Containers/KafkaContainerBuilder.cs diff --git a/Neuroglia.AsyncApi.sln b/Neuroglia.AsyncApi.sln index 71fc0db..d71270a 100644 --- a/Neuroglia.AsyncApi.sln +++ b/Neuroglia.AsyncApi.sln @@ -65,6 +65,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.B EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Nats", "src\Neuroglia.AsyncApi.Client.Bindings.Nats\Neuroglia.AsyncApi.Client.Bindings.Nats.csproj", "{DCADC636-53BA-408A-A870-CC07FD63A3F3}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Kafka", "src\Neuroglia.AsyncApi.Client.Bindings.Kafka\Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj", "{84AAE014-BD45-4B6F-96CE-01E364B802EB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -139,6 +141,10 @@ Global {DCADC636-53BA-408A-A870-CC07FD63A3F3}.Debug|Any CPU.Build.0 = Debug|Any CPU {DCADC636-53BA-408A-A870-CC07FD63A3F3}.Release|Any CPU.ActiveCfg = Release|Any CPU {DCADC636-53BA-408A-A870-CC07FD63A3F3}.Release|Any CPU.Build.0 = Release|Any CPU + {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -163,6 +169,7 @@ Global {8FD99A95-410A-4E38-AC7C-938894053E87} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} {D7665DAF-42CD-47E7-8A77-52C82B696D9B} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} {DCADC636-53BA-408A-A870-CC07FD63A3F3} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} + {84AAE014-BD45-4B6F-96CE-01E364B802EB} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DC433DEB-01E5-4328-B0BB-6FFFE8C7363F} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs b/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs index 797aadf..6e55b69 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs @@ -29,6 +29,7 @@ public static class IAsyncApiClientOptionsBuilderExtensions public static IAsyncApiClientOptionsBuilder AddAllBindingHandlers(this IAsyncApiClientOptionsBuilder builder) { builder.AddHttpBindingHandler(); + builder.AddKafkaBindingHandler(); builder.AddMqttBindingHandler(); builder.AddNatsBindingHandler(); builder.AddRedisBindingHandler(); diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj b/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj index 6b6c88a..a4e2209 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj +++ b/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj @@ -30,6 +30,7 @@ + diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Configuration/KafkaBindingHandlerOptions.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Configuration/KafkaBindingHandlerOptions.cs new file mode 100644 index 0000000..52ecdb7 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Configuration/KafkaBindingHandlerOptions.cs @@ -0,0 +1,25 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings.Kafka.Configuration; + +/// +/// Represents the options used to configure a +/// +public class KafkaBindingHandlerOptions + : BindingHandlerOptions +{ + + + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Extensions/IAsyncApiConfigurationBuilderKafkaExtensions.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Extensions/IAsyncApiConfigurationBuilderKafkaExtensions.cs new file mode 100644 index 0000000..8f825fe --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Extensions/IAsyncApiConfigurationBuilderKafkaExtensions.cs @@ -0,0 +1,35 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings; + +/// +/// Defines extensions for s +/// +public static class IAsyncApiConfigurationBuilderKafkaExtensions +{ + + /// + /// Adds and configures an used to handle Kafka operations + /// + /// The extended + /// An , if any, used to setup the 's options + /// The configured + public static IAsyncApiClientOptionsBuilder AddKafkaBindingHandler(this IAsyncApiClientOptionsBuilder builder, Action? setup = null) + { + setup ??= _ => { }; + builder.AddBindingHandler(setup); + return builder; + } + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs new file mode 100644 index 0000000..d4fb043 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs @@ -0,0 +1,136 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Confluent.Kafka; +using Json.Schema; +using Microsoft.Extensions.DependencyInjection; +using System.Text; + +namespace Neuroglia.AsyncApi.Client.Bindings.Kafka; + +/// +/// Represents the default NATS implementation of the interface +/// +/// The current +/// The service used to perform logging +/// The service used to access the current +/// The service used to provide s +/// The service used to serialize/deserialize data to/from JSON +public class KafkaBindingHandler(IServiceProvider serviceProvider, ILogger logger, IOptions options, ISerializerProvider serializerProvider, IJsonSerializer jsonSerializer) + : IBindingHandler +{ + + /// + /// Gets the current + /// + protected IServiceProvider ServiceProvider { get; } = serviceProvider; + + /// + /// Gets the service used to perform logging + /// + protected ILogger Logger { get; } = logger; + + /// + /// Gets the current + /// + protected KafkaBindingHandlerOptions Options { get; } = options.Value; + + /// + /// Gets the service used to provide s + /// + protected ISerializerProvider SerializerProvider { get; } = serializerProvider; + + /// + /// Gets the service used to serialize/deserialize data to/from JSON + /// + protected IJsonSerializer JsonSerializer { get; } = jsonSerializer; + + /// + public virtual bool Supports(string protocol, string? protocolVersion) => protocol.Equals(AsyncApiProtocol.Kafka, StringComparison.OrdinalIgnoreCase); + + /// + public virtual async Task PublishAsync(AsyncApiPublishOperationContext context, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(context); + try + { + var serverBinding = context.ServerBinding as KafkaServerBindingDefinition; + var channelBinding = context.ChannelBinding as KafkaChannelBindingDefinition; + var operationBinding = context.OperationBinding as KafkaOperationBindingDefinition; + var messageBinding = context.MessageBinding as KafkaMessageBindingDefinition; + var producerConfig = new ProducerConfig + { + BootstrapServers = $"{context.Host}{context.Path}", + AllowAutoCreateTopics = true + }; + using var producer = new ProducerBuilder(producerConfig).Build(); + using var stream = new MemoryStream(); + var serializer = SerializerProvider.GetSerializersFor(context.ContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{context.ContentType}'"); + serializer.Serialize(context.Payload ?? new { }, stream); + await stream.FlushAsync(cancellationToken).ConfigureAwait(false); + stream.Position = 0; + var payload = stream.ToArray(); + var headers = context.Headers == null ? null : new Headers(); + if (headers != null && context.Headers != null) foreach (var header in context.Headers.ToDictionary()!) headers.Add(header.Key, serializer.SerializeToByteArray(header.Value)); + var message = new Message() + { + Value = payload, + Headers = headers, + }; + var topic = channelBinding?.Topic ?? context.Channel!; + var result = await producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false); + return new KafkaPublishOperationResult() + { + PersistenceStatus = result.Status, + Partition = result.Partition, + Offset = result.Offset, + TopicPartition = result.TopicPartition, + TopicPartitionOffset = result.TopicPartitionOffset + }; + } + catch (Exception ex) + { + return new KafkaPublishOperationResult() { Exception = ex }; + } + } + + /// + public virtual Task SubscribeAsync(AsyncApiSubscribeOperationContext context, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(context); + try + { + var serverBinding = context.ServerBinding as KafkaServerBindingDefinition; + var channelBinding = context.ChannelBinding as KafkaChannelBindingDefinition; + var operationBinding = context.OperationBinding as KafkaOperationBindingDefinition; + var groupId = operationBinding?.GroupId?.GetKeyword()?.Value ?? operationBinding?.GroupId?.GetKeyword()?.Values?[0]; + var clientId = operationBinding?.ClientId?.GetKeyword()?.Value ?? operationBinding?.ClientId?.GetKeyword()?.Values?[0]; + var topic = channelBinding?.Topic ?? context.Channel!; + var consumerConfig = new ConsumerConfig + { + BootstrapServers = $"{context.Host}{context.Path}", + GroupId = groupId == null ? null : JsonSerializer.Deserialize(groupId), + ClientId = clientId == null ? null : JsonSerializer.Deserialize(clientId) + }; + var consumer = new ConsumerBuilder(consumerConfig).Build(); + consumer.Subscribe(topic); + var subscription = ActivatorUtilities.CreateInstance(ServiceProvider, context.Document, context.Messages, context.DefaultContentType, consumer); + return Task.FromResult(new KafkaSubscribeOperationResult(subscription)); + } + catch(Exception ex) + { + return Task.FromResult(new KafkaSubscribeOperationResult() { Exception = ex }); + } + } + +} \ No newline at end of file diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaPublishOperationResult.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaPublishOperationResult.cs new file mode 100644 index 0000000..44aa19d --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaPublishOperationResult.cs @@ -0,0 +1,58 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Confluent.Kafka; + +namespace Neuroglia.AsyncApi.Client.Bindings.Kafka; + +/// +/// Represents an object used to describe the result of a Kafka publish operation +/// +public class KafkaPublishOperationResult + : AsyncApiPublishOperationResult +{ + + /// + /// Gets the persistence status of the published message + /// + public virtual PersistenceStatus? PersistenceStatus { get; init; } + + /// + /// Gets the partition associated with the published message + /// + public virtual Partition? Partition { get; init; } + + /// + /// Gets the partition offset associated with the published message + /// + public virtual Offset? Offset { get; init; } + + /// + /// Gets the topic partition associated with the published message + /// + public virtual TopicPartition? TopicPartition { get; init; } + + /// + /// Gets the topic partition offset associated with the published message + /// + public virtual TopicPartitionOffset? TopicPartitionOffset { get; init; } + + /// + /// Gets/sets the , if any, that occurred during publishing + /// + public virtual Exception? Exception { get; init; } + + /// + public override bool IsSuccessful => Exception == null; + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscribeOperationResult.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscribeOperationResult.cs new file mode 100644 index 0000000..d3266db --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscribeOperationResult.cs @@ -0,0 +1,35 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings.Kafka; + +/// +/// Represents an object used to describe the result of a Kafka subscribe operation +/// +/// An , if any, used to observe incoming s +public class KafkaSubscribeOperationResult(IObservable? messages = null) + : AsyncApiSubscribeOperationResult +{ + + /// + /// Gets/sets the , if any, that occurred during subscription + /// + public virtual Exception? Exception { get; init; } + + /// + public override IObservable? Messages { get; } = messages; + + /// + public override bool IsSuccessful => Exception == null; + +} \ No newline at end of file diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs new file mode 100644 index 0000000..e17c95e --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs @@ -0,0 +1,238 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Confluent.Kafka; +using Neuroglia.AsyncApi.v3; +using System.Reactive.Subjects; +using System.Text; +using YamlDotNet.Core.Tokens; + +namespace Neuroglia.AsyncApi.Client.Bindings.Kafka; + +/// +/// Represents a subscription to a Kafka channel, used to stream s +/// +public class KafkaSubscription + : IObservable, IDisposable, IAsyncDisposable +{ + + bool _disposed; + + /// + /// Initializes a new + /// + /// The service used to perform logging + /// The service used to consume Kafka messages + /// The content type of consumed messages + /// The service used to evaluate runtime expressions + /// The service used to provide s + /// The service used to provide s + /// The that defines the operation for which to consume MQTT messages + /// An containing the definitions of all messages that can potentially be consumed + public KafkaSubscription(ILogger logger, IConsumer consumer, string messageContentType, IRuntimeExpressionEvaluator runtimeExpressionEvaluator, ISchemaHandlerProvider schemaHandlerProvider, ISerializerProvider serializerProvider, V3AsyncApiDocument document, IEnumerable messageDefinitions) + { + Logger = logger; + Consumer = consumer; + MessageContentType = messageContentType; + RuntimeExpressionEvaluator = runtimeExpressionEvaluator; + SchemaHandlerProvider = schemaHandlerProvider; + SerializerProvider = serializerProvider; + Document = document; + MessageDefinitions = messageDefinitions; + _ = Task.Run(ReadAsync); + } + + /// + /// Gets the service used to perform logging + /// + protected ILogger Logger { get; } + + /// + /// Gets the service used to consume Kafka messages + /// + protected IConsumer Consumer { get; } + + /// + /// Gets the content type of consumed messages + /// + protected string MessageContentType { get; } + + /// + /// Gets the service used to evaluate runtime expressions + /// + protected IRuntimeExpressionEvaluator RuntimeExpressionEvaluator { get; } + + /// + /// Gets the service used to provide s + /// + protected ISchemaHandlerProvider SchemaHandlerProvider { get; } + + /// + /// Gets the service used to provide s + /// + protected ISerializerProvider SerializerProvider { get; } + + /// + /// Gets the that defines the operation for which to consume MQTT messages + /// + protected V3AsyncApiDocument Document { get; } + + /// + /// Gets an containing the definitions of all messages that can potentially be consumed + /// + protected IEnumerable MessageDefinitions { get; } + + /// + /// Gets the 's + /// + protected CancellationTokenSource CancellationTokenSource { get; } = new(); + + /// + /// Gets the used to observe consumed + /// + protected Subject Subject { get; } = new(); + + /// + public virtual IDisposable Subscribe(IObserver observer) => Subject.Subscribe(observer); + + /// + /// Reads s from the underlying + /// + /// A new awaitable + protected virtual async Task ReadAsync() + { + while (!CancellationTokenSource.IsCancellationRequested) + { + try + { + var consumeResult = Consumer.Consume(CancellationTokenSource.Token); + if (consumeResult == null) continue; + var buffer = consumeResult.Message.Value; + if (buffer == null) return; + using var stream = new MemoryStream(buffer); + var serializer = SerializerProvider.GetSerializersFor(MessageContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{MessageContentType}'"); + var payload = serializer.Deserialize(stream); + var headers = consumeResult.Message.Headers?.ToDictionary(kvp => kvp.Key, kvp => serializer.Deserialize(kvp.GetValueBytes())); + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var correlationId = string.Empty; + if (messageDefinition.CorrelationId != null) + { + var correlationIdDefinition = messageDefinition.CorrelationId.IsReference ? Document.DereferenceCorrelationId(messageDefinition.CorrelationId.Reference!) : messageDefinition.CorrelationId; + correlationId = await RuntimeExpressionEvaluator.EvaluateAsync(correlationIdDefinition.Location, payload, headers, CancellationTokenSource.Token).ConfigureAwait(false); + } + var message = new AsyncApiMessage(MessageContentType, payload, headers, correlationId); + Subject.OnNext(message); + } + catch (Exception ex) + { + Logger.LogError("An error occurred while consuming a Kafka message: {ex}", ex); + } + } + } + + /// + /// Determines whether or not the specified payload matches the specified + /// + /// The message's payload, if any + /// The message's headers, if any + /// The to check + /// A + /// A boolean indicating whether or not the specified matches the specified + protected virtual async Task MessageMatchesAsync(object? payload, object? headers, V3MessageDefinition message, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(message); + if (message.Payload != null) + { + var schemaDefinition = message.Payload.IsReference ? Document.DereferenceSchema(message.Payload.Reference!) : message.Payload; + var schemaFormat = message.Payload.SchemaFormat ?? SchemaFormat.AsyncApi; + var schemaHandler = SchemaHandlerProvider.GetHandler(schemaFormat); + if (schemaHandler == null) this.Logger.LogWarning("Failed to find an handler used to validate the specified schema format '{schemaFormat}", schemaFormat); + else + { + var result = await schemaHandler.ValidateAsync(payload ?? new { }, schemaDefinition.Schema, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccess()) return false; + } + } + if (message.Headers != null) + { + var schemaDefinition = message.Headers.IsReference ? Document.DereferenceSchema(message.Headers.Reference!) : message.Headers; + var schemaFormat = message.Headers.SchemaFormat ?? SchemaFormat.AsyncApi; + var schemaHandler = SchemaHandlerProvider.GetHandler(schemaFormat); + if (schemaHandler == null) this.Logger.LogWarning("Failed to find an handler used to validate the specified schema format '{schemaFormat}", schemaFormat); + else + { + var result = await schemaHandler.ValidateAsync(headers ?? new { }, schemaDefinition.Schema, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccess()) return false; + } + } + if (message.CorrelationId != null) + { + var correlationIdDefinition = message.CorrelationId.IsReference ? Document.DereferenceCorrelationId(message.CorrelationId.Reference!) : message.CorrelationId; + var correlationId = await RuntimeExpressionEvaluator.EvaluateAsync(correlationIdDefinition.Location, payload, headers, cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(correlationId)) return false; + } + return true; + } + + /// + /// Disposes of the + /// + /// A boolean indicating whether or not the is being disposed of + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + CancellationTokenSource.Dispose(); + Consumer.Dispose(); + Subject.Dispose(); + } + _disposed = true; + } + } + + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// Disposes of the + /// + /// A boolean indicating whether or not the is being disposed of + protected virtual ValueTask DisposeAsync(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + CancellationTokenSource.Dispose(); + Consumer.Dispose(); + Subject.Dispose(); + } + _disposed = true; + } + return ValueTask.CompletedTask; + } + + /// + public async ValueTask DisposeAsync() + { + await DisposeAsync(disposing: true).ConfigureAwait(false); + GC.SuppressFinalize(this); + } + +} \ No newline at end of file diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj new file mode 100644 index 0000000..67b1e10 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj @@ -0,0 +1,40 @@ + + + + net9.0 + enable + enable + True + Neuroglia SRL + Copyright © 2023-Present Neuroglia SRL. All rights reserved. + Neuroglia SRL + https://github.com/neuroglia-io/asyncapi + git + neuroglia asyncapi async api client binding kafka + 3.0.1 + en + Apache-2.0 + true + true + logo_white_on_blue_256.png + true + embedded + + + + + True + \ + + + + + + + + + + + + + diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Usings.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Usings.cs new file mode 100644 index 0000000..0b4df1d --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Usings.cs @@ -0,0 +1,21 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Options; +global using Neuroglia.AsyncApi.Bindings.Kafka; +global using Neuroglia.AsyncApi.Client.Bindings.Kafka; +global using Neuroglia.AsyncApi.Client.Bindings.Kafka.Configuration; +global using Neuroglia.AsyncApi.Client.Configuration; +global using Neuroglia.AsyncApi.Client.Services; +global using Neuroglia.Serialization; diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs index 09e288a..ffc06dd 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using MQTTnet; using NATS.Client.Core; using Neuroglia.AsyncApi.v3; using System.Reactive.Subjects; diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Nats/Neuroglia.AsyncApi.Client.Bindings.Nats.csproj b/src/Neuroglia.AsyncApi.Client.Bindings.Nats/Neuroglia.AsyncApi.Client.Bindings.Nats.csproj index ea6d0ee..27488f5 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Nats/Neuroglia.AsyncApi.Client.Bindings.Nats.csproj +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Nats/Neuroglia.AsyncApi.Client.Bindings.Nats.csproj @@ -29,7 +29,6 @@ - diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelType.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelType.cs index 02346c0..1aa79ee 100644 --- a/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelType.cs +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelType.cs @@ -11,9 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using Neuroglia.Serialization.Json.Converters; -using System.ComponentModel; - namespace Neuroglia.AsyncApi.Bindings.Amqp; /// diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaChannelBindingDefinition.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaChannelBindingDefinition.cs index 89f9c77..f02390c 100644 --- a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaChannelBindingDefinition.cs +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaChannelBindingDefinition.cs @@ -21,10 +21,34 @@ public record KafkaChannelBindingDefinition : KafkaBindingDefinition, IChannelBindingDefinition { + /// + /// Gets/sets the topic name, if different from channel name. + /// + [DataMember(Order = 1, Name = "topic"), JsonPropertyOrder(1), JsonPropertyName("topic"), YamlMember(Order = 1, Alias = "topic")] + public virtual string? Topic { get; set; } + + /// + /// Gets/sets the number of partitions configured on this topic (useful to know how many parallel consumers you may run). + /// + [DataMember(Order = 2, Name = "partitions"), JsonPropertyOrder(2), JsonPropertyName("partitions"), YamlMember(Order = 2, Alias = "partitions")] + public virtual uint? Partitions { get; set; } + + /// + /// Gets/sets the number of replicas configured on this topic. + /// + [DataMember(Order = 3, Name = "replicas"), JsonPropertyOrder(3), JsonPropertyName("replicas"), YamlMember(Order = 3, Alias = "replicas")] + public virtual uint? Replicas { get; set; } + + /// + /// Gets/sets the topic configuration properties that are relevant for the API. + /// + [DataMember(Order = 4, Name = "topicConfiguration"), JsonPropertyOrder(4), JsonPropertyName("topicConfiguration"), YamlMember(Order = 4, Alias = "topicConfiguration")] + public virtual KafkaTopicConfiguration? TopicConfiguration { get; set; } + /// /// Gets/sets the version of this binding. Defaults to 'latest'. /// - [DataMember(Order = 1, Name = "bindingVersion"), JsonPropertyOrder(1), JsonPropertyName("bindingVersion"), YamlMember(Order = 1, Alias = "bindingVersion")] + [DataMember(Order = 5, Name = "bindingVersion"), JsonPropertyOrder(5), JsonPropertyName("bindingVersion"), YamlMember(Order = 5, Alias = "bindingVersion")] public virtual string BindingVersion { get; set; } = "latest"; } diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaMessageBindingDefinition.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaMessageBindingDefinition.cs index 141f7b8..de06d73 100644 --- a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaMessageBindingDefinition.cs +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaMessageBindingDefinition.cs @@ -11,9 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using Json.Schema; -using Neuroglia.AsyncApi.Bindings; - namespace Neuroglia.AsyncApi.Bindings.Kafka; /// diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaOperationBindingDefinition.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaOperationBindingDefinition.cs index 856a092..2d8a708 100644 --- a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaOperationBindingDefinition.cs +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaOperationBindingDefinition.cs @@ -11,9 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using Json.Schema; -using Neuroglia.AsyncApi.Bindings; - namespace Neuroglia.AsyncApi.Bindings.Kafka; /// @@ -39,7 +36,7 @@ public record KafkaOperationBindingDefinition /// /// Gets/sets the version of this binding. Defaults to 'latest'. /// - [DataMember(Order = 3, Name = "bindingVersion"), JsonPropertyOrder(3), JsonPropertyName("groubindingVersionpId"), YamlMember(Order = 3, Alias = "bindingVersion")] + [DataMember(Order = 3, Name = "bindingVersion"), JsonPropertyOrder(3), JsonPropertyName("bindingVersion"), YamlMember(Order = 3, Alias = "bindingVersion")] public virtual string BindingVersion { get; set; } = "latest"; } diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaServerBindingDefinition.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaServerBindingDefinition.cs index 0b828ce..c9b0001 100644 --- a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaServerBindingDefinition.cs +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaServerBindingDefinition.cs @@ -21,10 +21,22 @@ public record KafkaServerBindingDefinition : KafkaBindingDefinition, IServerBindingDefinition { + /// + /// Gets/sets the API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used). + /// + [DataMember(Order = 1, Name = "schemaRegistryUrl"), JsonPropertyOrder(1), JsonPropertyName("schemaRegistryUrl"), YamlMember(Order = 1, Alias = "schemaRegistryUrl")] + public virtual Uri? SchemaRegistryUrl { get; set; } + + /// + /// Gets/sets the vendor of Schema Registry and Kafka serdes library that should be used (e.g. apicurio, confluent, ibm, or karapace) + /// + [DataMember(Order = 2, Name = "schemaRegistryVendor"), JsonPropertyOrder(2), JsonPropertyName("schemaRegistryVendor"), YamlMember(Order = 2, Alias = "schemaRegistryVendor")] + public virtual string? SchemaRegistryVendor { get; set; } + /// /// Gets/sets the version of this binding. Defaults to 'latest'. /// - [DataMember(Order = 1, Name = "bindingVersion"), JsonPropertyOrder(1), JsonPropertyName("bindingVersion"), YamlMember(Order = 1, Alias = "bindingVersion")] + [DataMember(Order = 3, Name = "bindingVersion"), JsonPropertyOrder(3), JsonPropertyName("bindingVersion"), YamlMember(Order = 3, Alias = "bindingVersion")] public virtual string BindingVersion { get; set; } = "latest"; } diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicCleanupPolicy.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicCleanupPolicy.cs new file mode 100644 index 0000000..56d91d2 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicCleanupPolicy.cs @@ -0,0 +1,42 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Bindings.Kafka; + +/// +/// Enumerates all supported topic cleanup policies +/// +public static class KafkaTopicCleanupPolicy +{ + + /// + /// Gets the name of the policy used to discard old segments when their retention time or size limit has been reached + /// + public const string Delete = "delete"; + + /// + /// Gets the name of the policy used to enable log compaction, which retains the latest value for each key + /// + public const string Compact = "compact"; + + /// + /// Gets a new containing all supported topic cleanup policies + /// + /// A new containing all supported topic cleanup policies + public static IEnumerable AsEnumerable() + { + yield return Delete; + yield return Compact; + } + +} \ No newline at end of file diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicConfiguration.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicConfiguration.cs new file mode 100644 index 0000000..c728f7b --- /dev/null +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Kafka/KafkaTopicConfiguration.cs @@ -0,0 +1,77 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Bindings.Kafka; + +/// +/// Represents an object used to configure a Kafka topic +/// +[DataContract] +public record KafkaTopicConfiguration +{ + + /// + /// Gets/sets the cleanup policy, if any + /// + [DataMember(Order = 1, Name = "cleanup.policy"), JsonPropertyOrder(1), JsonPropertyName("cleanup.policy"), YamlMember(Order = 1, Alias = "cleanup.policy")] + public virtual string[]? CleanupPolicy { get; set; } + + /// + /// Gets/sets the retention duration in milliseconds, if any + /// + [DataMember(Order = 2, Name = "retention.ms"), JsonPropertyOrder(2), JsonPropertyName("retention.ms"), YamlMember(Order = 2, Alias = "retention.ms")] + public virtual long? RetentionMilliseconds { get; set; } + + /// + /// Gets/sets the retention bytes, if any + /// + [DataMember(Order = 3, Name = "retention.bytes"), JsonPropertyOrder(3), JsonPropertyName("retention.bytes"), YamlMember(Order = 3, Alias = "retention.bytes")] + public virtual long? RetentionBytes { get; set; } + + /// + /// Gets/sets the delete retention duration in milliseconds, if any + /// + [DataMember(Order = 4, Name = "delete.retention.ms"), JsonPropertyOrder(4), JsonPropertyName("delete.retention.ms"), YamlMember(Order = 4, Alias = "delete.retention.ms")] + public virtual long? DeleteRetentionMilliseconds { get; set; } + + /// + /// Gets/sets the maximum length in bytes, if any, for the topic's messages + /// + [DataMember(Order = 5, Name = "max.message.bytes"), JsonPropertyOrder(5), JsonPropertyName("max.message.bytes"), YamlMember(Order = 5, Alias = "max.message.bytes")] + public virtual int? MaxMessageBytes { get; set; } + + /// + /// Gets/sets a boolean indicating whether or not to validate the key schema. This configuration is specific to Confluent + /// + [DataMember(Order = 6, Name = "confluent.key.schema.validation"), JsonPropertyOrder(6), JsonPropertyName("confluent.key.schema.validation"), YamlMember(Order = 6, Alias = "confluent.key.schema.validation")] + public virtual bool ConfluentKeySchemaValidation { get; set; } + + /// + /// Gets/sets the name of the schema lookup strategy for the message key. This configuration is specific to Confluent + /// + [DataMember(Order = 7, Name = "confluent.key.subject.name.strategy"), JsonPropertyOrder(7), JsonPropertyName("confluent.key.subject.name.strategy"), YamlMember(Order = 7, Alias = "confluent.key.subject.name.strategy")] + public virtual bool ConfluentKeySubjectNameStrategy { get; set; } + + /// + /// Gets/sets a boolean indicating whether or not whether the schema validation for the message value is enabled. This configuration is specific to Confluent + /// + [DataMember(Order = 8, Name = "confluent.value.schema.validation"), JsonPropertyOrder(8), JsonPropertyName("confluent.value.schema.validation"), YamlMember(Order = 8, Alias = "confluent.value.schema.validation")] + public virtual bool ConfluentValueSchemaValidation { get; set; } + + /// + /// Gets/sets the name of the schema lookup strategy for the message key. This configuration is specific to Confluent + /// + [DataMember(Order = 9, Name = "confluent.value.subject.name.strategy"), JsonPropertyOrder(9), JsonPropertyName("confluent.value.subject.name.strategy"), YamlMember(Order = 9, Alias = "confluent.value.subject.name.strategy")] + public virtual bool ConfluentValueSubjectNameStrategy { get; set; } + +} diff --git a/tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/KafkaBindingHandlerTests.cs b/tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/KafkaBindingHandlerTests.cs new file mode 100644 index 0000000..99cff76 --- /dev/null +++ b/tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/KafkaBindingHandlerTests.cs @@ -0,0 +1,217 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Confluent.Kafka; +using Neuroglia.AsyncApi.Bindings.Kafka; +using Neuroglia.AsyncApi.Client; +using Neuroglia.AsyncApi.Client.Bindings; +using Neuroglia.AsyncApi.UnitTests.Containers; +using System.Text; + +namespace Neuroglia.AsyncApi.UnitTests.Cases.Client.Bindings; + +public class KafkaBindingHandlerTests + : BindingHandlerTestsBase +{ + public KafkaBindingHandlerTests() + : base(builder => builder.AddKafkaBindingHandler(), ConfigureServices) + { + + } + + [Fact] + public async Task Publish_Should_Work() + { + //arrange + var serverId = "kafka-server"; + var channelId = "cloud-events"; + var operationId = "publishCloudEvent"; + var messageId = "cloudEvent"; + var stringSchema = new JsonSchemaBuilder().Type(SchemaValueType.String).Build(); + var objectSchema = new JsonSchemaBuilder().Type(SchemaValueType.Object).AdditionalProperties(true).Build(); + var document = DocumentBuilder + .UsingAsyncApiV3() + .WithTitle("Test Kafka API") + .WithVersion("1.0.0") + .WithServer(serverId, server => server + .WithHost($"localhost:{ServiceProvider.GetRequiredKeyedService("kafka").GetMappedPublicPort(KafkaContainerBuilder.PublicPort)}") + .WithProtocol(AsyncApiProtocol.Kafka) + .WithBinding(new KafkaServerBindingDefinition())) + .WithChannel(channelId, channel => channel + .WithAddress("cloud-event") + .WithServer($"#/servers/{serverId}") + .WithMessage(messageId, message => message + .WithContentType(CloudEventContentType.Json) + .WithPayloadSchema(schemaDefinition => schemaDefinition + .WithJsonSchema(schema => schema + .Type(SchemaValueType.Object) + .Properties(new Dictionary() + { + { CloudEventAttributes.SpecVersion, stringSchema }, + { CloudEventAttributes.Id, stringSchema }, + { CloudEventAttributes.Time, stringSchema }, + { CloudEventAttributes.Source, stringSchema }, + { CloudEventAttributes.Type, stringSchema }, + { CloudEventAttributes.Subject, stringSchema }, + { CloudEventAttributes.DataSchema, stringSchema }, + { CloudEventAttributes.DataContentType, stringSchema }, + { CloudEventAttributes.Data, objectSchema }, + }) + .Required(CloudEventAttributes.GetRequiredAttributes()) + .AdditionalProperties(true))) + .WithBinding(new KafkaMessageBindingDefinition())) + .WithBinding(new KafkaChannelBindingDefinition())) + .WithOperation(operationId, operation => operation + .WithAction(v3.V3OperationAction.Receive) + .WithChannel($"#/channels/{channelId}") + .WithMessage($"#/channels/{channelId}/messages/{messageId}") + .WithBinding(new KafkaOperationBindingDefinition())) + .Build(); + await using var client = ClientFactory.CreateFor(document); + + //act + var e = new CloudEvent() + { + Id = Guid.NewGuid().ToString(), + SpecVersion = CloudEventSpecVersion.V1.Version, + Source = new("https://unit-tests.v3.asyncapi.neuroglia.io"), + Type = "io.neuroglia.asyncapi.v3.test.v1", + DataContentType = MediaTypeNames.Application.Json, + Data = new + { + Greetings = "Hello, World!" + } + }; + var parameters = new AsyncApiPublishOperationParameters(operationId) + { + Payload = e + }; + await using var result = await client.PublishAsync(parameters); + + //assert + result.IsSuccessful.Should().BeTrue(); + } + + [Fact] + public async Task Subscribe_Should_Work() + { + //arrange + var serverId = "kafka-server"; + var serverAddress = $"localhost:{ServiceProvider.GetRequiredKeyedService("kafka").GetMappedPublicPort(KafkaContainerBuilder.PublicPort)}"; + var channelId = "cloud-events"; + var channelAddress = "cloud-event"; + var operationId = "subscribeToCloudEvents"; + var messageId = "cloud-event"; + var stringSchema = new JsonSchemaBuilder().Type(SchemaValueType.String).Build(); + var objectSchema = new JsonSchemaBuilder().Type(SchemaValueType.Object).AdditionalProperties(true).Build(); + var document = DocumentBuilder + .UsingAsyncApiV3() + .WithTitle("Test Kafka API") + .WithVersion("1.0.0") + .WithServer(serverId, server => server + .WithHost(serverAddress) + .WithProtocol(AsyncApiProtocol.Kafka) + .WithBinding(new KafkaServerBindingDefinition())) + .WithChannel(channelId, channel => channel + .WithAddress(channelAddress) + .WithServer($"#/servers/{serverId}") + .WithMessage(messageId, message => message + .WithContentType(CloudEventContentType.Json) + .WithPayloadSchema(schemaDefinition => schemaDefinition + .WithJsonSchema(schema => schema + .Type(SchemaValueType.Object) + .Properties(new Dictionary() + { + { CloudEventAttributes.SpecVersion, stringSchema }, + { CloudEventAttributes.Id, stringSchema }, + { CloudEventAttributes.Time, stringSchema }, + { CloudEventAttributes.Source, stringSchema }, + { CloudEventAttributes.Type, stringSchema }, + { CloudEventAttributes.Subject, stringSchema }, + { CloudEventAttributes.DataSchema, stringSchema }, + { CloudEventAttributes.DataContentType, stringSchema }, + { CloudEventAttributes.Data, objectSchema }, + }) + .Required(CloudEventAttributes.GetRequiredAttributes()) + .AdditionalProperties(true))) + .WithBinding(new KafkaMessageBindingDefinition())) + .WithBinding(new KafkaChannelBindingDefinition())) + .WithOperation(operationId, operation => operation + .WithAction(v3.V3OperationAction.Send) + .WithChannel($"#/channels/{channelId}") + .WithMessage($"#/channels/{channelId}/messages/{messageId}") + .WithBinding(new KafkaOperationBindingDefinition() + { + GroupId = new JsonSchemaBuilder().Type(SchemaValueType.String).Default("test-consumer-group") + })) + .Build(); + await using var client = ClientFactory.CreateFor(document); + + //act + var parameters = new AsyncApiSubscribeOperationParameters(operationId); + await using var result = await client.SubscribeAsync(parameters); + var messageCount = 10; + var messagesToSend = new List>(); + for (var i = 0; i < messageCount; i++) + { + var e = new CloudEvent() + { + Id = Guid.NewGuid().ToString(), + SpecVersion = CloudEventSpecVersion.V1.Version, + Source = new("https://unit-tests.v3.asyncapi.neuroglia.io"), + Subject = i.ToString(), + Type = "io.neuroglia.asyncapi.v3.test.v1", + DataContentType = MediaTypeNames.Application.Json, + Data = new + { + Greetings = "Hello, World!" + } + }; + var payload = JsonSerializer.Default.SerializeToByteArray(e)!; + var headers = new Headers + { + { "Header1", JsonSerializer.Default.SerializeToByteArray("value1") }, + { "Header2", JsonSerializer.Default.SerializeToByteArray("value2") }, + { "Header3", JsonSerializer.Default.SerializeToByteArray("value3") } + }; + messagesToSend.Add(new() + { + Value = payload, + Headers = headers + }); + } + var messagesReceived = new List(); + var subscription = result.Messages?.Subscribe(messagesReceived.Add); + var producerConfig = new ProducerConfig() + { + BootstrapServers = serverAddress + }; + using var producer = new ProducerBuilder(producerConfig).Build(); + foreach (var message in messagesToSend) await producer.ProduceAsync(channelAddress, message); + await Task.Delay(3500); + subscription?.Dispose(); + + //assert + result.IsSuccessful.Should().BeTrue(); + result.Messages.Should().NotBeNull(); + messagesReceived.Should().NotBeEmpty(); + } + + static void ConfigureServices(IServiceCollection services) + { + services.AddKeyedSingleton("kafka", KafkaContainerBuilder.Build()); + services.AddSingleton(provider => provider.GetRequiredKeyedService("kafka")); + services.AddHostedService(); + } + +} \ No newline at end of file diff --git a/tests/Neuroglia.AsyncApi.UnitTests/Containers/KafkaContainerBuilder.cs b/tests/Neuroglia.AsyncApi.UnitTests/Containers/KafkaContainerBuilder.cs new file mode 100644 index 0000000..8198b65 --- /dev/null +++ b/tests/Neuroglia.AsyncApi.UnitTests/Containers/KafkaContainerBuilder.cs @@ -0,0 +1,36 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; + +namespace Neuroglia.AsyncApi.UnitTests.Containers; + +public static class KafkaContainerBuilder +{ + + public const int PublicPort = 9092; + + public static IContainer Build() + { + return new ContainerBuilder() + .WithName($"kafka-{Guid.NewGuid():N}") + .WithImage("confluentinc/confluent-local") + .WithPortBinding(PublicPort, PublicPort) + .WithWaitStrategy(Wait + .ForUnixContainer() + .UntilMessageIsLogged(".* Kafka Server started .*")) + .Build(); + } + +}