From a7e29603ef5441ae4c131147b72a63e8c07ffc63 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 6 Jan 2025 22:44:34 +0100 Subject: [PATCH] feat(Client): Added a new AMQP binding handler Signed-off-by: Charles d'Avernas --- Neuroglia.AsyncApi.sln | 7 + ...IAsyncApiClientOptionsBuilderExtensions.cs | 1 + ...roglia.AsyncApi.Client.Bindings.All.csproj | 1 + .../AmqpBindingHandler.cs | 190 +++++++++++++ .../AmqpPublishOperationResult.cs | 31 +++ .../AmqpSubscribeOperationResult.cs | 35 +++ .../AmqpSubscription.cs | 250 ++++++++++++++++++ .../AmqpBindingHandlerOptions.cs | 25 ++ ...ncApiConfigurationBuilderAmqpExtensions.cs | 35 +++ ...oglia.AsyncApi.Client.Bindings.Amqp.csproj | 40 +++ .../Usings.cs | 21 ++ .../ChunkedJsonMessageStream.cs | 2 +- .../NewlineDelimitedJsonMessageStream.cs | 2 +- .../ServerSentEventMessageStream.cs | 2 +- .../KafkaBindingHandler.cs | 11 +- .../KafkaSubscription.cs | 2 +- .../MqttSubscription.cs | 2 +- .../NatsSubscription.cs | 2 +- .../RedisSubscription.cs | 2 +- .../Amqp/AmqpChannelBindingDefinition.cs | 2 +- .../Bindings/AmqpBindingHandlerTests.cs | 219 +++++++++++++++ .../Containers/AmqpContainerBuilder.cs | 36 +++ 22 files changed, 905 insertions(+), 13 deletions(-) create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpBindingHandler.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpPublishOperationResult.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscribeOperationResult.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscription.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Configuration/AmqpBindingHandlerOptions.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Extensions/IAsyncApiConfigurationBuilderAmqpExtensions.cs create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj create mode 100644 src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Usings.cs create mode 100644 tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/AmqpBindingHandlerTests.cs create mode 100644 tests/Neuroglia.AsyncApi.UnitTests/Containers/AmqpContainerBuilder.cs diff --git a/Neuroglia.AsyncApi.sln b/Neuroglia.AsyncApi.sln index d71270a..673a562 100644 --- a/Neuroglia.AsyncApi.sln +++ b/Neuroglia.AsyncApi.sln @@ -67,6 +67,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.B EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Kafka", "src\Neuroglia.AsyncApi.Client.Bindings.Kafka\Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj", "{84AAE014-BD45-4B6F-96CE-01E364B802EB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Amqp", "src\Neuroglia.AsyncApi.Client.Bindings.Amqp\Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj", "{40A5544D-723E-4329-98A6-E512EC367F62}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -145,6 +147,10 @@ Global {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Debug|Any CPU.Build.0 = Debug|Any CPU {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.ActiveCfg = Release|Any CPU {84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.Build.0 = Release|Any CPU + {40A5544D-723E-4329-98A6-E512EC367F62}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {40A5544D-723E-4329-98A6-E512EC367F62}.Debug|Any CPU.Build.0 = Debug|Any CPU + {40A5544D-723E-4329-98A6-E512EC367F62}.Release|Any CPU.ActiveCfg = Release|Any CPU + {40A5544D-723E-4329-98A6-E512EC367F62}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -170,6 +176,7 @@ Global {D7665DAF-42CD-47E7-8A77-52C82B696D9B} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} {DCADC636-53BA-408A-A870-CC07FD63A3F3} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} {84AAE014-BD45-4B6F-96CE-01E364B802EB} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} + {40A5544D-723E-4329-98A6-E512EC367F62} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DC433DEB-01E5-4328-B0BB-6FFFE8C7363F} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs b/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs index 6e55b69..2be1b09 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.All/Extensions/IAsyncApiClientOptionsBuilderExtensions.cs @@ -28,6 +28,7 @@ public static class IAsyncApiClientOptionsBuilderExtensions /// The configured public static IAsyncApiClientOptionsBuilder AddAllBindingHandlers(this IAsyncApiClientOptionsBuilder builder) { + builder.AddAmqpBindingHandler(); builder.AddHttpBindingHandler(); builder.AddKafkaBindingHandler(); builder.AddMqttBindingHandler(); diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj b/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj index a4e2209..1e4b10e 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj +++ b/src/Neuroglia.AsyncApi.Client.Bindings.All/Neuroglia.AsyncApi.Client.Bindings.All.csproj @@ -29,6 +29,7 @@ + diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpBindingHandler.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpBindingHandler.cs new file mode 100644 index 0000000..85634ca --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpBindingHandler.cs @@ -0,0 +1,190 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Microsoft.Extensions.DependencyInjection; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Text; + +namespace Neuroglia.AsyncApi.Client.Bindings.Amqp; + +/// +/// Represents the default Amqp implementation of the interface +/// +/// The current +/// The service used to perform logging +/// The service used to access the current +/// The service used to provide s +public class AmqpBindingHandler(IServiceProvider serviceProvider, ILogger logger, IOptions options, ISerializerProvider serializerProvider) + : IBindingHandler +{ + + /// + /// Gets the current + /// + protected IServiceProvider ServiceProvider { get; } = serviceProvider; + + /// + /// Gets the service used to perform logging + /// + protected ILogger Logger { get; } = logger; + + /// + /// Gets the current + /// + protected AmqpBindingHandlerOptions Options { get; } = options.Value; + + /// + /// Gets the service used to provide s + /// + protected ISerializerProvider SerializerProvider { get; } = serializerProvider; + + /// + public virtual bool Supports(string protocol, string? protocolVersion) => protocol.Equals(AsyncApiProtocol.Amqp, StringComparison.OrdinalIgnoreCase); + + /// + public virtual async Task PublishAsync(AsyncApiPublishOperationContext context, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(context); + try + { + var channelBinding = context.ChannelBinding as AmqpChannelBindingDefinition; + var operationBinding = context.OperationBinding as AmqpOperationBindingDefinition; + var messageBinding = context.MessageBinding as AmqpMessageBindingDefinition; + var channelType = channelBinding?.Type ?? AmqpChannelType.Queue; + var virtualHost = channelBinding?.Type switch + { + AmqpChannelType.Queue => channelBinding?.Queue?.VirtualHost, + AmqpChannelType.RoutingKey => channelBinding?.Exchange?.VirtualHost, + _ => null + }; + var hostComponents = context.Host.Split(':', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + var host = hostComponents[0]; + var port = 5672; + if (hostComponents.Length > 1) port = int.Parse(hostComponents[1]); + var connectionFactory = new ConnectionFactory + { + HostName = host, + Port = port + }; + if(!string.IsNullOrWhiteSpace(virtualHost)) connectionFactory.VirtualHost = virtualHost; + using var connection = await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false); + using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + try + { + switch (channelType) + { + case AmqpChannelType.Queue: + var queueDeclareResult = await channel.QueueDeclareAsync(channelBinding?.Queue?.Name ?? string.Empty, channelBinding?.Queue?.Durable ?? false, channelBinding?.Queue?.Exclusive ?? false, channelBinding?.Queue?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false); + break; + case AmqpChannelType.RoutingKey: + await channel.ExchangeDeclareAsync(channelBinding?.Exchange?.Name ?? string.Empty, EnumHelper.Stringify(channelBinding?.Exchange?.Type ?? AmqpExchangeType.Default), channelBinding?.Exchange?.Durable ?? false, channelBinding?.Exchange?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false); + break; + default: + throw new NotSupportedException($"The specified AMQP channel type '{channelType}' is not supported"); + } + } + catch { } + var exchangeName = channelBinding?.Exchange?.Name ?? string.Empty; + var serializer = SerializerProvider.GetSerializersFor(context.ContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{context.ContentType}'"); + using var stream = new MemoryStream(); + serializer.Serialize(context.Payload ?? new { }, stream); + await stream.FlushAsync(cancellationToken).ConfigureAwait(false); + stream.Position = 0; + var deliveryMode = operationBinding?.DeliveryMode switch + { + AmqpDeliveryMode.Transient => (DeliveryModes?)DeliveryModes.Transient, + AmqpDeliveryMode.Persistent => DeliveryModes.Persistent, + _ => null + }; + var properties = new BasicProperties() + { + ContentType = context.ContentType, + UserId = operationBinding?.UserId, + Priority = operationBinding?.Priority ?? 0, + ReplyTo = operationBinding?.ReplyTo, + Headers = context.Headers.ToDictionary()! + }; + if(deliveryMode.HasValue) properties.DeliveryMode = deliveryMode.Value; + var payload = stream.ToArray(); + await channel.BasicPublishAsync(exchangeName, context.Channel!, operationBinding?.Mandatory ?? false, properties, payload, cancellationToken).ConfigureAwait(false); + return new AmqpPublishOperationResult(); + } + catch (Exception ex) + { + return new AmqpPublishOperationResult() + { + Exception = ex + }; + } + } + + /// + public virtual async Task SubscribeAsync(AsyncApiSubscribeOperationContext context, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(context); + try + { + var channelBinding = context.ChannelBinding as AmqpChannelBindingDefinition; + var operationBinding = context.OperationBinding as AmqpOperationBindingDefinition; + var channelType = channelBinding?.Type ?? AmqpChannelType.Queue; + var virtualHost = channelBinding?.Type switch + { + AmqpChannelType.Queue => channelBinding?.Queue?.VirtualHost, + AmqpChannelType.RoutingKey => channelBinding?.Exchange?.VirtualHost, + _ => null + }; + var hostComponents = context.Host.Split(':', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + var host = hostComponents[0]; + var port = 5672; + if (hostComponents.Length > 1) port = int.Parse(hostComponents[1]); + var connectionFactory = new ConnectionFactory + { + HostName = host, + Port = port + }; + if (!string.IsNullOrWhiteSpace(virtualHost)) connectionFactory.VirtualHost = virtualHost; + var connection = await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false); + var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + var queueName = channelBinding?.Queue?.Name ?? context.Channel ?? string.Empty; + var exchangeName = channelBinding?.Exchange?.Name ?? string.Empty; + try + { + switch (channelType) + { + case AmqpChannelType.Queue: + var queueDeclareResult = await channel.QueueDeclareAsync(queueName, channelBinding?.Queue?.Durable ?? false, channelBinding?.Queue?.Exclusive ?? false, channelBinding?.Queue?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false); + break; + case AmqpChannelType.RoutingKey: + await channel.ExchangeDeclareAsync(exchangeName, EnumHelper.Stringify(channelBinding?.Exchange?.Type ?? AmqpExchangeType.Default), channelBinding?.Exchange?.Durable ?? false, channelBinding?.Exchange?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false); + break; + default: + throw new NotSupportedException($"The specified AMQP channel type '{channelType}' is not supported"); + } + } + catch { } + var consumer = new AsyncEventingBasicConsumer(channel); + var subscription = ActivatorUtilities.CreateInstance(ServiceProvider, context.Document, context.Messages, context.DefaultContentType, connection, channel, consumer); + await channel.BasicConsumeAsync(queueName, !(operationBinding?.Ack ?? false), consumer, cancellationToken).ConfigureAwait(false); + return new AmqpSubscribeOperationResult(subscription); + } + catch (Exception ex) + { + return new AmqpSubscribeOperationResult() + { + Exception = ex + }; + } + } + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpPublishOperationResult.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpPublishOperationResult.cs new file mode 100644 index 0000000..837aba5 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpPublishOperationResult.cs @@ -0,0 +1,31 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings.Amqp; + +/// +/// Represents an object used to describe the result of a Amqp publish operation +/// +public class AmqpPublishOperationResult + : AsyncApiPublishOperationResult +{ + + /// + /// Gets/sets the , if any, that occurred during publishing + /// + public virtual Exception? Exception { get; init; } + + /// + public override bool IsSuccessful => Exception == null; + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscribeOperationResult.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscribeOperationResult.cs new file mode 100644 index 0000000..fa0eaa0 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscribeOperationResult.cs @@ -0,0 +1,35 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings.Amqp; + +/// +/// Represents an object used to describe the result of a Amqp subscribe operation +/// +/// An , if any, used to observe incoming s +public class AmqpSubscribeOperationResult(IObservable? messages = null) + : AsyncApiSubscribeOperationResult +{ + + /// + /// Gets/sets the , if any, that occurred during subscription + /// + public virtual Exception? Exception { get; init; } + + /// + public override IObservable? Messages { get; } = messages; + + /// + public override bool IsSuccessful => Exception == null; + +} \ No newline at end of file diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscription.cs new file mode 100644 index 0000000..7a908d2 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpSubscription.cs @@ -0,0 +1,250 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.AsyncApi.v3; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +namespace Neuroglia.AsyncApi.Client.Bindings.Amqp; + +/// +/// Represents a subscription to a Amqp channel, used to stream s +/// +public class AmqpSubscription + : IObservable, IDisposable, IAsyncDisposable +{ + + bool _disposed; + + /// + /// Initializes a new + /// + /// The service used to perform logging + /// The current AMQP connection + /// The current AMQP channel + /// The service used to consume AMQP messages + /// The content type of consumed messages + /// The service used to evaluate runtime expressions + /// The service used to provide s + /// The service used to provide s + /// The that defines the operation for which to consume MQTT messages + /// An containing the definitions of all messages that can potentially be consumed + public AmqpSubscription(ILogger logger, IConnection connection, IChannel channel, IAsyncBasicConsumer consumer, string messageContentType, IRuntimeExpressionEvaluator runtimeExpressionEvaluator, ISchemaHandlerProvider schemaHandlerProvider, ISerializerProvider serializerProvider, V3AsyncApiDocument document, IEnumerable messageDefinitions) + { + Logger = logger; + Connection = connection; + Channel = channel; + Consumer = consumer; + if (consumer is AsyncEventingBasicConsumer asyncEventingBasicConsumer) asyncEventingBasicConsumer.ReceivedAsync += OnAmqpMessageAsync; + MessageContentType = messageContentType; + RuntimeExpressionEvaluator = runtimeExpressionEvaluator; + SchemaHandlerProvider = schemaHandlerProvider; + SerializerProvider = serializerProvider; + Document = document; + MessageDefinitions = messageDefinitions; + } + + /// + /// Gets the service used to perform logging + /// + protected ILogger Logger { get; } + + /// + /// Gets the current AMQP connection + /// + protected IConnection Connection { get; } + + /// + /// Gets the current AMQP channel + /// + protected IChannel Channel { get; } + + /// + /// Gets the service used to consume AMQP messages + /// + protected IAsyncBasicConsumer Consumer { get; } + + /// + /// Gets the content type of consumed messages + /// + protected string MessageContentType { get; } + + /// + /// Gets the service used to evaluate runtime expressions + /// + protected IRuntimeExpressionEvaluator RuntimeExpressionEvaluator { get; } + + /// + /// Gets the service used to provide s + /// + protected ISchemaHandlerProvider SchemaHandlerProvider { get; } + + /// + /// Gets the service used to provide s + /// + protected ISerializerProvider SerializerProvider { get; } + + /// + /// Gets the that defines the operation for which to consume MQTT messages + /// + protected V3AsyncApiDocument Document { get; } + + /// + /// Gets an containing the definitions of all messages that can potentially be consumed + /// + protected IEnumerable MessageDefinitions { get; } + + /// + /// Gets the 's + /// + protected CancellationTokenSource CancellationTokenSource { get; } = new(); + + /// + /// Gets the used to observe consumed + /// + protected Subject Subject { get; } = new(); + + /// + public virtual IDisposable Subscribe(IObserver observer) => Subject.Subscribe(observer); + + /// + /// Handles the consumption of an AMQP message + /// + /// The sender of the message + /// The that wraps the consumed AMQP message + /// A new awaitable + protected virtual async Task OnAmqpMessageAsync(object sender, BasicDeliverEventArgs e) + { + try + { + var buffer = e.Body.ToArray(); + using var stream = new MemoryStream(buffer); + var contentType = e.BasicProperties.ContentType ?? MessageContentType; + var serializer = SerializerProvider.GetSerializersFor(contentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{contentType}'"); + var payload = serializer.Deserialize(stream); + var headers = e.BasicProperties.Headers; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); + var correlationId = string.Empty; + if (messageDefinition.CorrelationId != null) + { + var correlationIdDefinition = messageDefinition.CorrelationId.IsReference ? Document.DereferenceCorrelationId(messageDefinition.CorrelationId.Reference!) : messageDefinition.CorrelationId; + correlationId = await RuntimeExpressionEvaluator.EvaluateAsync(correlationIdDefinition.Location, payload, headers, CancellationTokenSource.Token).ConfigureAwait(false); + } + var message = new AsyncApiMessage(contentType, payload, headers, correlationId); + Subject.OnNext(message); + } + catch (Exception ex) + { + Logger.LogError("An error occurred while consuming a NATS message: {ex}", ex); + } + } + + /// + /// Determines whether or not the specified payload matches the specified + /// + /// The message's payload, if any + /// The message's headers, if any + /// The to check + /// A + /// A boolean indicating whether or not the specified matches the specified + protected virtual async Task MessageMatchesAsync(object? payload, object? headers, V3MessageDefinition message, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(message); + if (message.Payload != null) + { + var schemaDefinition = message.Payload.IsReference ? Document.DereferenceSchema(message.Payload.Reference!) : message.Payload; + var schemaFormat = message.Payload.SchemaFormat ?? SchemaFormat.AsyncApi; + var schemaHandler = SchemaHandlerProvider.GetHandler(schemaFormat); + if (schemaHandler == null) this.Logger.LogWarning("Failed to find an handler used to validate the specified schema format '{schemaFormat}", schemaFormat); + else + { + var result = await schemaHandler.ValidateAsync(payload ?? new { }, schemaDefinition.Schema, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccess()) return false; + } + } + if (message.Headers != null) + { + var schemaDefinition = message.Headers.IsReference ? Document.DereferenceSchema(message.Headers.Reference!) : message.Headers; + var schemaFormat = message.Headers.SchemaFormat ?? SchemaFormat.AsyncApi; + var schemaHandler = SchemaHandlerProvider.GetHandler(schemaFormat); + if (schemaHandler == null) this.Logger.LogWarning("Failed to find an handler used to validate the specified schema format '{schemaFormat}", schemaFormat); + else + { + var result = await schemaHandler.ValidateAsync(headers ?? new { }, schemaDefinition.Schema, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccess()) return false; + } + } + if (message.CorrelationId != null) + { + var correlationIdDefinition = message.CorrelationId.IsReference ? Document.DereferenceCorrelationId(message.CorrelationId.Reference!) : message.CorrelationId; + var correlationId = await RuntimeExpressionEvaluator.EvaluateAsync(correlationIdDefinition.Location, payload, headers, cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(correlationId)) return false; + } + return true; + } + + /// + /// Disposes of the + /// + /// A boolean indicating whether or not the is being disposed of + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + CancellationTokenSource.Dispose(); + Channel.Dispose(); + Connection.Dispose(); + Subject.Dispose(); + } + _disposed = true; + } + } + + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// Disposes of the + /// + /// A boolean indicating whether or not the is being disposed of + protected virtual async ValueTask DisposeAsync(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + CancellationTokenSource.Dispose(); + await Channel.DisposeAsync().ConfigureAwait(false); + await Connection.DisposeAsync().ConfigureAwait(false); + Subject.Dispose(); + } + _disposed = true; + } + } + + /// + public async ValueTask DisposeAsync() + { + await DisposeAsync(disposing: true).ConfigureAwait(false); + GC.SuppressFinalize(this); + } + +} \ No newline at end of file diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Configuration/AmqpBindingHandlerOptions.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Configuration/AmqpBindingHandlerOptions.cs new file mode 100644 index 0000000..09d1aa4 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Configuration/AmqpBindingHandlerOptions.cs @@ -0,0 +1,25 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings.Amqp.Configuration; + +/// +/// Represents the options used to configure a +/// +public class AmqpBindingHandlerOptions + : BindingHandlerOptions +{ + + + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Extensions/IAsyncApiConfigurationBuilderAmqpExtensions.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Extensions/IAsyncApiConfigurationBuilderAmqpExtensions.cs new file mode 100644 index 0000000..b420b42 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Extensions/IAsyncApiConfigurationBuilderAmqpExtensions.cs @@ -0,0 +1,35 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.AsyncApi.Client.Bindings; + +/// +/// Defines extensions for s +/// +public static class IAsyncApiConfigurationBuilderAmqpExtensions +{ + + /// + /// Adds and configures an used to handle Amqp operations + /// + /// The extended + /// An , if any, used to setup the 's options + /// The configured + public static IAsyncApiClientOptionsBuilder AddAmqpBindingHandler(this IAsyncApiClientOptionsBuilder builder, Action? setup = null) + { + setup ??= _ => { }; + builder.AddBindingHandler(setup); + return builder; + } + +} diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj new file mode 100644 index 0000000..0889ee2 --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj @@ -0,0 +1,40 @@ + + + + net9.0 + enable + enable + True + Neuroglia SRL + Copyright © 2023-Present Neuroglia SRL. All rights reserved. + Neuroglia SRL + https://github.com/neuroglia-io/asyncapi + git + neuroglia asyncapi async api client binding amqp + 3.0.1 + en + Apache-2.0 + true + true + logo_white_on_blue_256.png + true + embedded + + + + + True + \ + + + + + + + + + + + + + diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Usings.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Usings.cs new file mode 100644 index 0000000..80be89f --- /dev/null +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Amqp/Usings.cs @@ -0,0 +1,21 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +global using Microsoft.Extensions.Logging; +global using Microsoft.Extensions.Options; +global using Neuroglia.AsyncApi.Bindings.Amqp; +global using Neuroglia.AsyncApi.Client.Bindings.Amqp; +global using Neuroglia.AsyncApi.Client.Bindings.Amqp.Configuration; +global using Neuroglia.AsyncApi.Client.Configuration; +global using Neuroglia.AsyncApi.Client.Services; +global using Neuroglia.Serialization; diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Http/ChunkedJsonMessageStream.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Http/ChunkedJsonMessageStream.cs index 9434945..477976e 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Http/ChunkedJsonMessageStream.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Http/ChunkedJsonMessageStream.cs @@ -38,7 +38,7 @@ protected override async Task ReadAsync() await foreach(var payload in JsonSerializer.DeserializeAsyncEnumerable(Stream, CancellationTokenSource.Token).ConfigureAwait(false)) { var headers = (object?)null; - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Http/NewlineDelimitedJsonMessageStream.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Http/NewlineDelimitedJsonMessageStream.cs index 8dcc803..69d406c 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Http/NewlineDelimitedJsonMessageStream.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Http/NewlineDelimitedJsonMessageStream.cs @@ -47,7 +47,7 @@ protected override async Task ReadAsync() object? payload; object? headers = null; payload = JsonSerializer.Deserialize(json); - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Http/ServerSentEventMessageStream.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Http/ServerSentEventMessageStream.cs index ea40442..575dadd 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Http/ServerSentEventMessageStream.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Http/ServerSentEventMessageStream.cs @@ -67,7 +67,7 @@ protected override async Task ReadAsync() { payload = JsonSerializer.Deserialize(@$"""json"""); } - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs index d4fb043..89beefa 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs @@ -14,7 +14,6 @@ using Confluent.Kafka; using Json.Schema; using Microsoft.Extensions.DependencyInjection; -using System.Text; namespace Neuroglia.AsyncApi.Client.Bindings.Kafka; @@ -113,14 +112,16 @@ public virtual Task SubscribeAsync(AsyncApiSu var serverBinding = context.ServerBinding as KafkaServerBindingDefinition; var channelBinding = context.ChannelBinding as KafkaChannelBindingDefinition; var operationBinding = context.OperationBinding as KafkaOperationBindingDefinition; - var groupId = operationBinding?.GroupId?.GetKeyword()?.Value ?? operationBinding?.GroupId?.GetKeyword()?.Values?[0]; - var clientId = operationBinding?.ClientId?.GetKeyword()?.Value ?? operationBinding?.ClientId?.GetKeyword()?.Values?[0]; + var groupIdJson = operationBinding?.GroupId?.GetKeyword()?.Value ?? operationBinding?.GroupId?.GetKeyword()?.Values?[0]; + var groupId = groupIdJson == null ? "default" : JsonSerializer.Deserialize(groupIdJson); + var clientIdJson = operationBinding?.ClientId?.GetKeyword()?.Value ?? operationBinding?.ClientId?.GetKeyword()?.Values?[0]; + var clientId = clientIdJson == null ? "rdkafka" : JsonSerializer.Deserialize(clientIdJson); var topic = channelBinding?.Topic ?? context.Channel!; var consumerConfig = new ConsumerConfig { BootstrapServers = $"{context.Host}{context.Path}", - GroupId = groupId == null ? null : JsonSerializer.Deserialize(groupId), - ClientId = clientId == null ? null : JsonSerializer.Deserialize(clientId) + GroupId = groupId, + ClientId = clientId }; var consumer = new ConsumerBuilder(consumerConfig).Build(); consumer.Subscribe(topic); diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs index e17c95e..05c74ec 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs @@ -123,7 +123,7 @@ protected virtual async Task ReadAsync() var serializer = SerializerProvider.GetSerializersFor(MessageContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{MessageContentType}'"); var payload = serializer.Deserialize(stream); var headers = consumeResult.Message.Headers?.ToDictionary(kvp => kvp.Key, kvp => serializer.Deserialize(kvp.GetValueBytes())); - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Mqtt/MqttSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Mqtt/MqttSubscription.cs index 616bd58..c3b9079 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Mqtt/MqttSubscription.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Mqtt/MqttSubscription.cs @@ -114,7 +114,7 @@ protected virtual async Task OnMessageReceivedAsync(MqttApplicationMessageReceiv var serializer = SerializerProvider.GetSerializersFor(e.ApplicationMessage.ContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{e.ApplicationMessage.ContentType}'"); var payload = serializer.Deserialize(stream); var headers = e.ApplicationMessage.UserProperties?.ToDictionary(kvp => kvp.Name, kvp => kvp.Value); - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs index ffc06dd..c2e633b 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Nats/NatsSubscription.cs @@ -126,7 +126,7 @@ protected virtual async Task ReadAsync() var serializer = SerializerProvider.GetSerializersFor(MessageContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{MessageContentType}'"); var payload = serializer.Deserialize(stream); var headers = natsMessage.Headers?.ToDictionary(kvp => kvp.Key, kvp => string.Join(',', (string?)kvp.Value)); - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Client.Bindings.Redis/RedisSubscription.cs b/src/Neuroglia.AsyncApi.Client.Bindings.Redis/RedisSubscription.cs index 1096670..27a2d16 100644 --- a/src/Neuroglia.AsyncApi.Client.Bindings.Redis/RedisSubscription.cs +++ b/src/Neuroglia.AsyncApi.Client.Bindings.Redis/RedisSubscription.cs @@ -119,7 +119,7 @@ protected virtual async void OnMessageReceivedAsync(RedisChannel channel, RedisV var serializer = SerializerProvider.GetSerializersFor(MessageContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{MessageContentType}'"); var payload = serializer.Deserialize(stream); var headers = (object?)null; - var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ; + var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); var correlationId = string.Empty; if (messageDefinition.CorrelationId != null) { diff --git a/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelBindingDefinition.cs b/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelBindingDefinition.cs index a815f86..d8e8ef2 100644 --- a/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelBindingDefinition.cs +++ b/src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelBindingDefinition.cs @@ -25,7 +25,7 @@ public record AmqpChannelBindingDefinition /// Gets/sets AMQP channel's type /// [DataMember(Order = 1, Name = "is"), JsonPropertyOrder(1), JsonPropertyName("is"), YamlMember(Order = 1, Alias = "is")] - public virtual AmqpChannelType Type { get; set; } + public virtual AmqpChannelType? Type { get; set; } /// /// Gets/sets the object used to configure the AMQP exchange, when is set to diff --git a/tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/AmqpBindingHandlerTests.cs b/tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/AmqpBindingHandlerTests.cs new file mode 100644 index 0000000..a770889 --- /dev/null +++ b/tests/Neuroglia.AsyncApi.UnitTests/Cases/Client/Bindings/AmqpBindingHandlerTests.cs @@ -0,0 +1,219 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.AsyncApi.Bindings.Amqp; +using Neuroglia.AsyncApi.Client; +using Neuroglia.AsyncApi.Client.Bindings; +using Neuroglia.AsyncApi.UnitTests.Containers; +using RabbitMQ.Client; +using System.Security.Authentication.ExtendedProtection; +using System.Threading; + +namespace Neuroglia.AsyncApi.UnitTests.Cases.Client.Bindings; + +public class AmqpBindingHandlerTests + : BindingHandlerTestsBase +{ + public AmqpBindingHandlerTests() + : base(builder => builder.AddAmqpBindingHandler(), ConfigureServices) + { + + } + + [Fact] + public async Task Publish_Should_Work() + { + //arrange + var serverId = "amqp-server"; + var channelId = "cloud-events"; + var operationId = "publishCloudEvent"; + var messageId = "cloudEvent"; + var stringSchema = new JsonSchemaBuilder().Type(SchemaValueType.String).Build(); + var objectSchema = new JsonSchemaBuilder().Type(SchemaValueType.Object).AdditionalProperties(true).Build(); + var document = DocumentBuilder + .UsingAsyncApiV3() + .WithTitle("Test AMQP API") + .WithVersion("1.0.0") + .WithServer(serverId, server => server + .WithHost($"localhost:{ServiceProvider.GetRequiredKeyedService("amqp").GetMappedPublicPort(AmqpContainerBuilder.PublicPort)}") + .WithProtocol(AsyncApiProtocol.Amqp) + .WithBinding(new AmqpServerBindingDefinition())) + .WithChannel(channelId, channel => channel + .WithAddress("cloud-event") + .WithServer($"#/servers/{serverId}") + .WithMessage(messageId, message => message + .WithContentType(CloudEventContentType.Json) + .WithPayloadSchema(schemaDefinition => schemaDefinition + .WithJsonSchema(schema => schema + .Type(SchemaValueType.Object) + .Properties(new Dictionary() + { + { CloudEventAttributes.SpecVersion, stringSchema }, + { CloudEventAttributes.Id, stringSchema }, + { CloudEventAttributes.Time, stringSchema }, + { CloudEventAttributes.Source, stringSchema }, + { CloudEventAttributes.Type, stringSchema }, + { CloudEventAttributes.Subject, stringSchema }, + { CloudEventAttributes.DataSchema, stringSchema }, + { CloudEventAttributes.DataContentType, stringSchema }, + { CloudEventAttributes.Data, objectSchema }, + }) + .Required(CloudEventAttributes.GetRequiredAttributes()) + .AdditionalProperties(true))) + .WithBinding(new AmqpMessageBindingDefinition())) + .WithBinding(new AmqpChannelBindingDefinition())) + .WithOperation(operationId, operation => operation + .WithAction(v3.V3OperationAction.Receive) + .WithChannel($"#/channels/{channelId}") + .WithMessage($"#/channels/{channelId}/messages/{messageId}") + .WithBinding(new AmqpOperationBindingDefinition())) + .Build(); + await using var client = ClientFactory.CreateFor(document); + + //act + var e = new CloudEvent() + { + Id = Guid.NewGuid().ToString(), + SpecVersion = CloudEventSpecVersion.V1.Version, + Source = new("https://unit-tests.v3.asyncapi.neuroglia.io"), + Type = "io.neuroglia.asyncapi.v3.test.v1", + DataContentType = MediaTypeNames.Application.Json, + Data = new + { + Greetings = "Hello, World!" + } + }; + var parameters = new AsyncApiPublishOperationParameters(operationId) + { + Payload = e + }; + await using var result = await client.PublishAsync(parameters); + + //assert + result.IsSuccessful.Should().BeTrue(); + } + + [Fact] + public async Task Subscribe_Should_Work() + { + //arrange + var serverId = "amqp-server"; + var host = "localhost"; + var port = ServiceProvider.GetRequiredKeyedService("amqp").GetMappedPublicPort(AmqpContainerBuilder.PublicPort); + var serverAddress = $"{host}:{port}"; + var channelId = "cloud-events"; + var channelAddress = "cloud-event"; + var operationId = "subscribeToCloudEvents"; + var messageId = "cloud-event"; + var stringSchema = new JsonSchemaBuilder().Type(SchemaValueType.String).Build(); + var objectSchema = new JsonSchemaBuilder().Type(SchemaValueType.Object).AdditionalProperties(true).Build(); + var document = DocumentBuilder + .UsingAsyncApiV3() + .WithTitle("Test AMQP API") + .WithVersion("1.0.0") + .WithServer(serverId, server => server + .WithHost(serverAddress) + .WithProtocol(AsyncApiProtocol.Amqp) + .WithBinding(new AmqpServerBindingDefinition())) + .WithChannel(channelId, channel => channel + .WithAddress(channelAddress) + .WithServer($"#/servers/{serverId}") + .WithMessage(messageId, message => message + .WithContentType(CloudEventContentType.Json) + .WithPayloadSchema(schemaDefinition => schemaDefinition + .WithJsonSchema(schema => schema + .Type(SchemaValueType.Object) + .Properties(new Dictionary() + { + { CloudEventAttributes.SpecVersion, stringSchema }, + { CloudEventAttributes.Id, stringSchema }, + { CloudEventAttributes.Time, stringSchema }, + { CloudEventAttributes.Source, stringSchema }, + { CloudEventAttributes.Type, stringSchema }, + { CloudEventAttributes.Subject, stringSchema }, + { CloudEventAttributes.DataSchema, stringSchema }, + { CloudEventAttributes.DataContentType, stringSchema }, + { CloudEventAttributes.Data, objectSchema }, + }) + .Required(CloudEventAttributes.GetRequiredAttributes()) + .AdditionalProperties(true))) + .WithBinding(new AmqpMessageBindingDefinition())) + .WithBinding(new AmqpChannelBindingDefinition())) + .WithOperation(operationId, operation => operation + .WithAction(v3.V3OperationAction.Send) + .WithChannel($"#/channels/{channelId}") + .WithMessage($"#/channels/{channelId}/messages/{messageId}") + .WithBinding(new AmqpOperationBindingDefinition())) + .Build(); + await using var client = ClientFactory.CreateFor(document); + + //act + var parameters = new AsyncApiSubscribeOperationParameters(operationId); + await using var result = await client.SubscribeAsync(parameters); + var messageCount = 10; + var messagesToSend = new List>(); + for (var i = 0; i < messageCount; i++) + { + var e = new CloudEvent() + { + Id = Guid.NewGuid().ToString(), + SpecVersion = CloudEventSpecVersion.V1.Version, + Source = new("https://unit-tests.v3.asyncapi.neuroglia.io"), + Subject = i.ToString(), + Type = "io.neuroglia.asyncapi.v3.test.v1", + DataContentType = MediaTypeNames.Application.Json, + Data = new + { + Greetings = "Hello, World!" + } + }; + var payload = JsonSerializer.Default.SerializeToByteArray(e)!; + var headers = new Dictionary() + { + { "HeaderValue1", 69 }, + { "HeaderValue2", "Lorem Ipsum" }, + { "HeaderValue3", true } + }; + var properties = new BasicProperties() + { + ContentType = MediaTypeNames.Application.Json, + Headers = headers + }; + messagesToSend.Add(new(properties, payload)); + } + var messagesReceived = new List(); + var subscription = result.Messages?.Subscribe(messagesReceived.Add); + var connectionFactory = new ConnectionFactory + { + HostName = host, + Port = port + }; + using var connection = await connectionFactory.CreateConnectionAsync(); + using var channel = await connection.CreateChannelAsync(); + foreach (var message in messagesToSend) await channel.BasicPublishAsync(string.Empty, channelAddress, false, message.Item1, message.Item2); + await Task.Delay(3500); + subscription?.Dispose(); + + //assert + result.IsSuccessful.Should().BeTrue(); + result.Messages.Should().NotBeNull(); + messagesReceived.Should().NotBeEmpty(); + } + static void ConfigureServices(IServiceCollection services) + { + services.AddKeyedSingleton("amqp", AmqpContainerBuilder.Build()); + services.AddSingleton(provider => provider.GetRequiredKeyedService("amqp")); + services.AddHostedService(); + } + +} \ No newline at end of file diff --git a/tests/Neuroglia.AsyncApi.UnitTests/Containers/AmqpContainerBuilder.cs b/tests/Neuroglia.AsyncApi.UnitTests/Containers/AmqpContainerBuilder.cs new file mode 100644 index 0000000..3ab92d9 --- /dev/null +++ b/tests/Neuroglia.AsyncApi.UnitTests/Containers/AmqpContainerBuilder.cs @@ -0,0 +1,36 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; + +namespace Neuroglia.AsyncApi.UnitTests.Containers; + +public static class AmqpContainerBuilder +{ + + public const int PublicPort = 5672; + + public static IContainer Build() + { + return new ContainerBuilder() + .WithName($"amqp-{Guid.NewGuid():N}") + .WithImage("rabbitmq") + .WithPortBinding(PublicPort, true) + .WithWaitStrategy(Wait + .ForUnixContainer() + .UntilMessageIsLogged(".* Time to start RabbitMQ: .*")) + .Build(); + } + +}