Skip to content

Commit

Permalink
feat(Client): Added a new AMQP binding handler
Browse files Browse the repository at this point in the history
Signed-off-by: Charles d'Avernas <[email protected]>
  • Loading branch information
cdavernas committed Jan 6, 2025
1 parent ccacdb3 commit a7e2960
Show file tree
Hide file tree
Showing 22 changed files with 905 additions and 13 deletions.
7 changes: 7 additions & 0 deletions Neuroglia.AsyncApi.sln
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.B
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Kafka", "src\Neuroglia.AsyncApi.Client.Bindings.Kafka\Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj", "{84AAE014-BD45-4B6F-96CE-01E364B802EB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Amqp", "src\Neuroglia.AsyncApi.Client.Bindings.Amqp\Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj", "{40A5544D-723E-4329-98A6-E512EC367F62}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -145,6 +147,10 @@ Global
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.Build.0 = Release|Any CPU
{40A5544D-723E-4329-98A6-E512EC367F62}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{40A5544D-723E-4329-98A6-E512EC367F62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{40A5544D-723E-4329-98A6-E512EC367F62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{40A5544D-723E-4329-98A6-E512EC367F62}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -170,6 +176,7 @@ Global
{D7665DAF-42CD-47E7-8A77-52C82B696D9B} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
{DCADC636-53BA-408A-A870-CC07FD63A3F3} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
{84AAE014-BD45-4B6F-96CE-01E364B802EB} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
{40A5544D-723E-4329-98A6-E512EC367F62} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DC433DEB-01E5-4328-B0BB-6FFFE8C7363F}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static class IAsyncApiClientOptionsBuilderExtensions
/// <returns>The configured <see cref="IAsyncApiClientOptionsBuilder"/></returns>
public static IAsyncApiClientOptionsBuilder AddAllBindingHandlers(this IAsyncApiClientOptionsBuilder builder)
{
builder.AddAmqpBindingHandler();
builder.AddHttpBindingHandler();
builder.AddKafkaBindingHandler();
builder.AddMqttBindingHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Amqp\Neuroglia.AsyncApi.Client.Bindings.Amqp.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Http\Neuroglia.AsyncApi.Client.Bindings.Http.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Kafka\Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Mqtt\Neuroglia.AsyncApi.Client.Bindings.Mqtt.csproj" />
Expand Down
190 changes: 190 additions & 0 deletions src/Neuroglia.AsyncApi.Client.Bindings.Amqp/AmqpBindingHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

namespace Neuroglia.AsyncApi.Client.Bindings.Amqp;

/// <summary>
/// Represents the default Amqp implementation of the <see cref="IBindingHandler"/> interface
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
/// <param name="options">The service used to access the current <see cref="AmqpBindingHandlerOptions"/></param>
/// <param name="serializerProvider">The service used to provide <see cref="ISerializer"/>s</param>
public class AmqpBindingHandler(IServiceProvider serviceProvider, ILogger<AmqpBindingHandler> logger, IOptions<AmqpBindingHandlerOptions> options, ISerializerProvider serializerProvider)
: IBindingHandler<AmqpBindingHandlerOptions>
{

/// <summary>
/// Gets the current <see cref="IServiceProvider"/>
/// </summary>
protected IServiceProvider ServiceProvider { get; } = serviceProvider;

/// <summary>
/// Gets the service used to perform logging
/// </summary>
protected ILogger Logger { get; } = logger;

/// <summary>
/// Gets the current <see cref="AmqpBindingHandlerOptions"/>
/// </summary>
protected AmqpBindingHandlerOptions Options { get; } = options.Value;

/// <summary>
/// Gets the service used to provide <see cref="ISerializer"/>s
/// </summary>
protected ISerializerProvider SerializerProvider { get; } = serializerProvider;

/// <inheritdoc/>
public virtual bool Supports(string protocol, string? protocolVersion) => protocol.Equals(AsyncApiProtocol.Amqp, StringComparison.OrdinalIgnoreCase);

/// <inheritdoc/>
public virtual async Task<IAsyncApiPublishOperationResult> PublishAsync(AsyncApiPublishOperationContext context, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(context);
try
{
var channelBinding = context.ChannelBinding as AmqpChannelBindingDefinition;
var operationBinding = context.OperationBinding as AmqpOperationBindingDefinition;
var messageBinding = context.MessageBinding as AmqpMessageBindingDefinition;
var channelType = channelBinding?.Type ?? AmqpChannelType.Queue;
var virtualHost = channelBinding?.Type switch
{
AmqpChannelType.Queue => channelBinding?.Queue?.VirtualHost,
AmqpChannelType.RoutingKey => channelBinding?.Exchange?.VirtualHost,
_ => null
};
var hostComponents = context.Host.Split(':', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
var host = hostComponents[0];
var port = 5672;
if (hostComponents.Length > 1) port = int.Parse(hostComponents[1]);
var connectionFactory = new ConnectionFactory
{
HostName = host,
Port = port
};
if(!string.IsNullOrWhiteSpace(virtualHost)) connectionFactory.VirtualHost = virtualHost;
using var connection = await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
switch (channelType)
{
case AmqpChannelType.Queue:
var queueDeclareResult = await channel.QueueDeclareAsync(channelBinding?.Queue?.Name ?? string.Empty, channelBinding?.Queue?.Durable ?? false, channelBinding?.Queue?.Exclusive ?? false, channelBinding?.Queue?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false);
break;
case AmqpChannelType.RoutingKey:
await channel.ExchangeDeclareAsync(channelBinding?.Exchange?.Name ?? string.Empty, EnumHelper.Stringify(channelBinding?.Exchange?.Type ?? AmqpExchangeType.Default), channelBinding?.Exchange?.Durable ?? false, channelBinding?.Exchange?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false);
break;
default:
throw new NotSupportedException($"The specified AMQP channel type '{channelType}' is not supported");
}
}
catch { }
var exchangeName = channelBinding?.Exchange?.Name ?? string.Empty;
var serializer = SerializerProvider.GetSerializersFor(context.ContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{context.ContentType}'");
using var stream = new MemoryStream();
serializer.Serialize(context.Payload ?? new { }, stream);
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
stream.Position = 0;
var deliveryMode = operationBinding?.DeliveryMode switch
{
AmqpDeliveryMode.Transient => (DeliveryModes?)DeliveryModes.Transient,
AmqpDeliveryMode.Persistent => DeliveryModes.Persistent,
_ => null
};
var properties = new BasicProperties()
{
ContentType = context.ContentType,
UserId = operationBinding?.UserId,
Priority = operationBinding?.Priority ?? 0,
ReplyTo = operationBinding?.ReplyTo,
Headers = context.Headers.ToDictionary()!
};
if(deliveryMode.HasValue) properties.DeliveryMode = deliveryMode.Value;
var payload = stream.ToArray();
await channel.BasicPublishAsync(exchangeName, context.Channel!, operationBinding?.Mandatory ?? false, properties, payload, cancellationToken).ConfigureAwait(false);
return new AmqpPublishOperationResult();
}
catch (Exception ex)
{
return new AmqpPublishOperationResult()
{
Exception = ex
};
}
}

/// <inheritdoc/>
public virtual async Task<IAsyncApiSubscribeOperationResult> SubscribeAsync(AsyncApiSubscribeOperationContext context, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(context);
try
{
var channelBinding = context.ChannelBinding as AmqpChannelBindingDefinition;
var operationBinding = context.OperationBinding as AmqpOperationBindingDefinition;
var channelType = channelBinding?.Type ?? AmqpChannelType.Queue;
var virtualHost = channelBinding?.Type switch
{
AmqpChannelType.Queue => channelBinding?.Queue?.VirtualHost,
AmqpChannelType.RoutingKey => channelBinding?.Exchange?.VirtualHost,
_ => null
};
var hostComponents = context.Host.Split(':', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
var host = hostComponents[0];
var port = 5672;
if (hostComponents.Length > 1) port = int.Parse(hostComponents[1]);
var connectionFactory = new ConnectionFactory
{
HostName = host,
Port = port
};
if (!string.IsNullOrWhiteSpace(virtualHost)) connectionFactory.VirtualHost = virtualHost;
var connection = await connectionFactory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var queueName = channelBinding?.Queue?.Name ?? context.Channel ?? string.Empty;
var exchangeName = channelBinding?.Exchange?.Name ?? string.Empty;
try
{
switch (channelType)
{
case AmqpChannelType.Queue:
var queueDeclareResult = await channel.QueueDeclareAsync(queueName, channelBinding?.Queue?.Durable ?? false, channelBinding?.Queue?.Exclusive ?? false, channelBinding?.Queue?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false);
break;
case AmqpChannelType.RoutingKey:
await channel.ExchangeDeclareAsync(exchangeName, EnumHelper.Stringify(channelBinding?.Exchange?.Type ?? AmqpExchangeType.Default), channelBinding?.Exchange?.Durable ?? false, channelBinding?.Exchange?.AutoDelete ?? false, cancellationToken: cancellationToken).ConfigureAwait(false);
break;
default:
throw new NotSupportedException($"The specified AMQP channel type '{channelType}' is not supported");
}
}
catch { }
var consumer = new AsyncEventingBasicConsumer(channel);
var subscription = ActivatorUtilities.CreateInstance<AmqpSubscription>(ServiceProvider, context.Document, context.Messages, context.DefaultContentType, connection, channel, consumer);
await channel.BasicConsumeAsync(queueName, !(operationBinding?.Ack ?? false), consumer, cancellationToken).ConfigureAwait(false);
return new AmqpSubscribeOperationResult(subscription);
}
catch (Exception ex)
{
return new AmqpSubscribeOperationResult()
{
Exception = ex
};
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Client.Bindings.Amqp;

/// <summary>
/// Represents an object used to describe the result of a Amqp publish operation
/// </summary>
public class AmqpPublishOperationResult
: AsyncApiPublishOperationResult
{

/// <summary>
/// Gets/sets the <see cref="System.Exception"/>, if any, that occurred during publishing
/// </summary>
public virtual Exception? Exception { get; init; }

/// <inheritdoc/>
public override bool IsSuccessful => Exception == null;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Client.Bindings.Amqp;

/// <summary>
/// Represents an object used to describe the result of a Amqp subscribe operation
/// </summary>
/// <param name="messages">An <see cref="IObservable{T}"/>, if any, used to observe incoming <see cref="IAsyncApiMessage"/>s</param>
public class AmqpSubscribeOperationResult(IObservable<IAsyncApiMessage>? messages = null)
: AsyncApiSubscribeOperationResult
{

/// <summary>
/// Gets/sets the <see cref="System.Exception"/>, if any, that occurred during subscription
/// </summary>
public virtual Exception? Exception { get; init; }

/// <inheritdoc/>
public override IObservable<IAsyncApiMessage>? Messages { get; } = messages;

/// <inheritdoc/>
public override bool IsSuccessful => Exception == null;

}
Loading

0 comments on commit a7e2960

Please sign in to comment.