diff --git a/Source/Euonia.Application/Euonia.Application.csproj b/Source/Euonia.Application/Euonia.Application.csproj index 8d407c7..2f6db60 100644 --- a/Source/Euonia.Application/Euonia.Application.csproj +++ b/Source/Euonia.Application/Euonia.Application.csproj @@ -6,10 +6,6 @@ disable - - - - diff --git a/Source/Euonia.Application/Seedwork/IRequestContextAccessor.cs b/Source/Euonia.Application/Seedwork/IRequestContextAccessor.cs index 0ba7962..fd633be 100644 --- a/Source/Euonia.Application/Seedwork/IRequestContextAccessor.cs +++ b/Source/Euonia.Application/Seedwork/IRequestContextAccessor.cs @@ -1,4 +1,6 @@ -using System.Security.Claims; +using System.Net; +using System.Security.Claims; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Primitives; namespace Nerosoft.Euonia.Application; @@ -8,35 +10,65 @@ namespace Nerosoft.Euonia.Application; /// public interface IRequestContextAccessor { - /// - /// Gets the authenticated user information for the current request. - /// - ClaimsPrincipal User { get; } - - /// - /// Gets the items for the current request. - /// - IDictionary Items { get; } - - /// - /// Gets the request trace identifier. - /// - string TraceIdentifier { get; } - - /// - /// Gets the injected service provider for the current request. - /// - IServiceProvider RequestServices { get; } - - /// - /// Gets the request cancellation token. - /// - CancellationToken RequestAborted { get; } - - /// - /// Gets the request headers. - /// - IDictionary RequestHeaders { get; } + /// + /// Gets a unique identifier to represent the connection. + /// + string ConnectionId { get; } + + /// + /// Gets the IP address of the remote target. Can be null. + /// + IPAddress RemoteIpAddress { get; } + + /// + /// Gets the port of the remote target. + /// + int RemotePort { get; } + + /// + /// + /// + bool IsWebSocketRequest { get; } + + /// + /// Gets the authenticated user information for the current request. + /// + ClaimsPrincipal User { get; } + + /// + /// Gets the items for the current request. + /// + IDictionary Items { get; } + + /// + /// Gets the request trace identifier. + /// + string TraceIdentifier { get; } + + /// + /// Gets the injected service provider for the current request. + /// + IServiceProvider RequestServices { get; } + + /// + /// Gets the request cancellation token. + /// + CancellationToken RequestAborted { get; } + + /// + /// Gets the request headers. + /// + IDictionary RequestHeaders { get; } + + /// + /// Gets the Authorization HTTP header. + /// + StringValues Authorization => RequestHeaders?.TryGetValue(nameof(Authorization)) ?? default; + + /// + /// Gets or sets the Request-Id HTTP header. + /// + StringValues RequestId => RequestHeaders?.TryGetValue("Request-Id") ?? default; } /// @@ -69,64 +101,72 @@ public interface IRequestContextAccessor /// public delegate IDictionary GetRequestHeadersDelegate(); +/// +/// Define delegation to get connection information of current request. +/// +/// +public delegate Tuple GetConnectionInfoDelegate(); + /// /// An implementation of using delegate methods. /// public class DelegateRequestContextAccessor : IRequestContextAccessor { - private readonly GetRequestUserDelegate _getUser; - private readonly GetRequestItemsDelegate _getItems; - private readonly GetRequestTraceIdentifierDelegate _getTraceIdentifier; - private readonly GetRequestServicesDelegate _getServices; - private readonly GetRequestAbortedDelegate _getAborted; - private readonly GetRequestHeadersDelegate _getHeaders; - - /// - /// Initializes a new instance of the class. - /// - public DelegateRequestContextAccessor() - { - } - - /// - public DelegateRequestContextAccessor(GetRequestUserDelegate getUser, GetRequestItemsDelegate getItems, GetRequestTraceIdentifierDelegate getTraceIdentifier, GetRequestServicesDelegate getServices, GetRequestAbortedDelegate getAborted, GetRequestHeadersDelegate getHeaders) - : this() - { - _getUser = getUser; - _getItems = getItems; - _getTraceIdentifier = getTraceIdentifier; - _getServices = getServices; - _getAborted = getAborted; - _getHeaders = getHeaders; - } - - /// - /// Gets the authenticated user information for the current request. - /// - public ClaimsPrincipal User => _getUser?.Invoke(); - - /// - /// Gets the items for the current request. - /// - public IDictionary Items => _getItems?.Invoke(); - - /// - /// Gets the request trace identifier. - /// - public string TraceIdentifier => _getTraceIdentifier?.Invoke(); - - /// - /// Gets the injected service provider for the current request. - /// - public IServiceProvider RequestServices => _getServices?.Invoke(); - - /// - /// Gets the request cancellation token. - /// - public CancellationToken RequestAborted => _getAborted?.Invoke() ?? default; - - /// - /// Gets the request headers. - /// - public IDictionary RequestHeaders => _getHeaders?.Invoke(); + + private readonly IServiceProvider _provider; + + /// + /// Initializes a new instance of the class. + /// + public DelegateRequestContextAccessor(IServiceProvider provider) + { + _provider = provider; + var (connectionId, remoteIpAddress, remotePort, isWebSocketRequest) = provider.GetService()?.Invoke() ?? default; + ConnectionId = connectionId; + RemoteIpAddress = remoteIpAddress; + RemotePort = remotePort; + IsWebSocketRequest = isWebSocketRequest; + } + + /// + public string ConnectionId { get; } + + /// + public IPAddress RemoteIpAddress { get; } + + /// + public int RemotePort { get; } + + /// + public bool IsWebSocketRequest { get; } + + /// + /// Gets the authenticated user information for the current request. + /// + public ClaimsPrincipal User => _provider.GetService()?.Invoke(); + + /// + /// Gets the items for the current request. + /// + public IDictionary Items => _provider.GetService()?.Invoke(); + + /// + /// Gets the request trace identifier. + /// + public string TraceIdentifier => _provider.GetService()?.Invoke(); + + /// + /// Gets the injected service provider for the current request. + /// + public IServiceProvider RequestServices => _provider.GetService()?.Invoke(); + + /// + /// Gets the request cancellation token. + /// + public CancellationToken RequestAborted => _provider.GetService()?.Invoke() ?? default; + + /// + /// Gets the request headers. + /// + public IDictionary RequestHeaders => _provider.GetService()?.Invoke(); } \ No newline at end of file diff --git a/Source/Euonia.Application/Services/BaseApplicationService.cs b/Source/Euonia.Application/Services/BaseApplicationService.cs index f10362b..0e3de55 100644 --- a/Source/Euonia.Application/Services/BaseApplicationService.cs +++ b/Source/Euonia.Application/Services/BaseApplicationService.cs @@ -22,4 +22,9 @@ public abstract class BaseApplicationService : IApplicationService /// Gets the current request user principal. /// protected virtual UserPrincipal User => LazyServiceProvider.GetService(); + + /// + /// Gets the current request context accessor. + /// + protected virtual IRequestContextAccessor HttpRequestAccessor => LazyServiceProvider.GetService(); } \ No newline at end of file diff --git a/Source/Euonia.Bus.Abstract/Contracts/IBusConfigurator.cs b/Source/Euonia.Bus.Abstract/Contracts/IBusConfigurator.cs index 0ca20c3..e215f1a 100644 --- a/Source/Euonia.Bus.Abstract/Contracts/IBusConfigurator.cs +++ b/Source/Euonia.Bus.Abstract/Contracts/IBusConfigurator.cs @@ -17,7 +17,7 @@ public interface IBusConfigurator /// /// /// - IBusConfigurator SerFactory() + IBusConfigurator SetFactory() where TFactory : class, IBusFactory; /// @@ -26,7 +26,7 @@ IBusConfigurator SerFactory() /// /// /// - IBusConfigurator SerFactory(TFactory factory) + IBusConfigurator SetFactory(TFactory factory) where TFactory : class, IBusFactory; /// @@ -35,7 +35,7 @@ IBusConfigurator SerFactory(TFactory factory) /// /// /// - IBusConfigurator SerFactory(Func factory) + IBusConfigurator SetFactory(Func factory) where TFactory : class, IBusFactory; /// diff --git a/Source/Euonia.Bus.Abstract/RoutedMessage.cs b/Source/Euonia.Bus.Abstract/RoutedMessage.cs index 626a67e..5bb1646 100644 --- a/Source/Euonia.Bus.Abstract/RoutedMessage.cs +++ b/Source/Euonia.Bus.Abstract/RoutedMessage.cs @@ -36,13 +36,13 @@ protected RoutedMessage() /// Gets or sets the conversation identifier. /// [DataMember] - public virtual string ConversationId { get; } + public virtual string ConversationId { get; set; } = Guid.NewGuid().ToString(); /// /// Gets or sets the request trace identifier. /// [DataMember] - public virtual string RequestTraceId { get; } + public virtual string RequestTraceId { get; set; } /// /// Gets or sets the channel that the message send to. diff --git a/Source/Euonia.Bus.InMemory/BusConfiguratorExtensions.cs b/Source/Euonia.Bus.InMemory/BusConfiguratorExtensions.cs index 18b3443..98c151f 100644 --- a/Source/Euonia.Bus.InMemory/BusConfiguratorExtensions.cs +++ b/Source/Euonia.Bus.InMemory/BusConfiguratorExtensions.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Options; using Nerosoft.Euonia.Bus.InMemory; namespace Nerosoft.Euonia.Bus; @@ -15,29 +14,13 @@ public static class BusConfiguratorExtensions /// /// /// - /// - /// public static void UseInMemory(this IBusConfigurator configurator, Action configuration) { configurator.Service.Configure(configuration); - configurator.Service.TryAddSingleton(provider => - { - var options = provider.GetService>()?.Value; - if (options == null) - { - throw new InvalidOperationException("The in-memory message dispatcher options is not configured."); - } - - IMessenger messenger = options.MessengerReference switch - { - MessengerReferenceType.StrongReference => StrongReferenceMessenger.Default, - MessengerReferenceType.WeakReference => WeakReferenceMessenger.Default, - _ => throw new ArgumentOutOfRangeException(nameof(options.MessengerReference), options.MessengerReference, null) - }; - return messenger; - }); + configurator.Service.TryAddTransient(); + configurator.Service.TryAddTransient(); configurator.Service.TryAddSingleton(); configurator.Service.AddTransient(); - configurator.SerFactory(); + configurator.SetFactory(); } } \ No newline at end of file diff --git a/Source/Euonia.Bus.InMemory/InMemoryBusOptions.cs b/Source/Euonia.Bus.InMemory/InMemoryBusOptions.cs index 109b819..69f2732 100644 --- a/Source/Euonia.Bus.InMemory/InMemoryBusOptions.cs +++ b/Source/Euonia.Bus.InMemory/InMemoryBusOptions.cs @@ -22,9 +22,4 @@ public class InMemoryBusOptions /// true if the subscriber should create for each message channel; otherwise, false. default is false. /// public bool MultipleSubscriberInstance { get; set; } - - /// - /// Gets or sets the messenger reference type. - /// - public MessengerReferenceType MessengerReference { get; set; } = MessengerReferenceType.StrongReference; } \ No newline at end of file diff --git a/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs b/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs index 4b3c7df..b58fc64 100644 --- a/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs +++ b/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs @@ -8,17 +8,6 @@ public class InMemoryDispatcher : DisposableObject, IDispatcher /// public event EventHandler Delivered; - private readonly IMessenger _messenger; - - /// - /// Initializes a new instance of the class. - /// - /// - public InMemoryDispatcher(IMessenger messenger) - { - _messenger = messenger; - } - /// public async Task PublishAsync(RoutedMessage message, CancellationToken cancellationToken = default) where TMessage : class @@ -28,7 +17,7 @@ public async Task PublishAsync(RoutedMessage message, Cancel { Aborted = cancellationToken }; - _messenger.Send(pack, message.Channel); + WeakReferenceMessenger.Default.Send(pack, message.Channel); Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, context)); await Task.CompletedTask; } @@ -55,7 +44,8 @@ public async Task SendAsync(RoutedMessage message, Cancellat taskCompletion.SetResult(); }; - _messenger.Send(pack, message.Channel); + StrongReferenceMessenger.Default.UnsafeSend(pack, message.Channel); + Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, context)); await taskCompletion.Task; @@ -87,7 +77,7 @@ public async Task SendAsync(RoutedMessage SendAsync(RoutedMessage protected override void Dispose(bool disposing) { - _messenger.Cleanup(); + StrongReferenceMessenger.Default.Reset(); + WeakReferenceMessenger.Default.Reset(); } } \ No newline at end of file diff --git a/Source/Euonia.Bus.InMemory/InMemoryRecipientRegistrar.cs b/Source/Euonia.Bus.InMemory/InMemoryRecipientRegistrar.cs index 2d9e758..ec1dace 100644 --- a/Source/Euonia.Bus.InMemory/InMemoryRecipientRegistrar.cs +++ b/Source/Euonia.Bus.InMemory/InMemoryRecipientRegistrar.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; namespace Nerosoft.Euonia.Bus.InMemory; @@ -11,64 +12,54 @@ public sealed class InMemoryRecipientRegistrar : IRecipientRegistrar private readonly InMemoryBusOptions _options; private readonly IMessageConvention _convention; private readonly IServiceProvider _provider; - private readonly IMessenger _messenger; /// /// Initializes a new instance of the . /// - /// /// /// /// - public InMemoryRecipientRegistrar(IMessenger messenger, IMessageConvention convention, IServiceProvider provider, IOptions options) + /// + public InMemoryRecipientRegistrar(IMessageConvention convention, IServiceProvider provider, IOptions options) { _options = options.Value; _convention = convention; _provider = provider; - _messenger = messenger; } /// public async Task RegisterAsync(IEnumerable registrations, CancellationToken cancellationToken = default) { - if (_options.MultipleSubscriberInstance) + var recipients = new ConcurrentDictionary(); + + foreach (var registration in registrations) { - foreach (var registration in registrations) + if (_convention.IsQueueType(registration.MessageType)) + { + var recipient = GetRecipient(); + StrongReferenceMessenger.Default.Register(recipient, registration.Channel); + } + else if (_convention.IsTopicType(registration.MessageType)) + { + var recipient = GetRecipient(); + WeakReferenceMessenger.Default.Register(recipient, registration.Channel); + } + else { - InMemoryRecipient recipient; - if (_convention.IsQueueType(registration.MessageType)) - { - recipient = ActivatorUtilities.GetServiceOrCreateInstance(_provider); - } - else if (_convention.IsTopicType(registration.MessageType)) - { - recipient = ActivatorUtilities.GetServiceOrCreateInstance(_provider); - } - else - { - throw new InvalidOperationException(); - } - _messenger.Register(recipient, registration.Channel); + throw new InvalidOperationException(); } } - else + + TRecipient GetRecipient() + where TRecipient : InMemoryRecipient, IRecipient { - foreach (var registration in registrations) + if (_options.MultipleSubscriberInstance) + { + return _provider.GetService(); + } + else { - InMemoryRecipient recipient; - if (_convention.IsQueueType(registration.MessageType)) - { - recipient = Singleton.Get(() => ActivatorUtilities.GetServiceOrCreateInstance(_provider)); - } - else if (_convention.IsTopicType(registration.MessageType)) - { - recipient = Singleton.Get(() => ActivatorUtilities.GetServiceOrCreateInstance(_provider)); - } - else - { - throw new InvalidOperationException(); - } - _messenger.Register(recipient, registration.Channel); + return (TRecipient)recipients.GetOrAdd(typeof(TRecipient), type => _provider.GetService()); } } diff --git a/Source/Euonia.Bus.InMemory/Messenger/StrongReferenceMessenger.cs b/Source/Euonia.Bus.InMemory/Messenger/StrongReferenceMessenger.cs index 3a3c30d..c3e7ddd 100644 --- a/Source/Euonia.Bus.InMemory/Messenger/StrongReferenceMessenger.cs +++ b/Source/Euonia.Bus.InMemory/Messenger/StrongReferenceMessenger.cs @@ -233,7 +233,7 @@ public void UnregisterAll(object recipient) foreach (var mapping in set) { if (mapping.TryRemove(key) && - mapping.Count == 0) + mapping.Count == 0) { // Maps here are really of type Mapping<,> and with unknown type arguments. // If after removing the current recipient a given map becomes empty, it means @@ -332,7 +332,7 @@ public void UnregisterAll(object recipient, TToken token) // Try to remove the registered handler for the input token, // for the current message type (unknown from here). if (holder.TryRemove(token) && - holder.Count == 0) + holder.Count == 0) { // If the map is empty, remove the recipient entirely from its container _ = handlersMap.TryRemove(key); @@ -450,7 +450,7 @@ public void Unregister(object recipient, TToken token) // Remove the target handler if (dictionary.TryRemove(token) && - dictionary.Count == 0) + dictionary.Count == 0) { // If the map is empty, it means that the current recipient has no remaining // registered handlers for the current combination, regardless, @@ -593,7 +593,136 @@ public TMessage Send(TMessage message, TToken token) ArrayPool.Shared.Return(rentedArray); } - End: + End: + return message; + } + + /// + /// Sends a message of the specified type to all registered recipients. + /// + /// The type of message to send. + /// The type of token to identify what channel to use to send the message. + /// The message to send. + /// The token indicating what channel to use. + /// The message that was sent (ie. ). + /// Thrown if or are . + public TMessage UnsafeSend(TMessage message, TToken token) + where TMessage : class + where TToken : IEquatable + { + ArgumentAssert.ThrowIfNull(message); + ArgumentAssert.For.ThrowIfNull(token); + + object[] rentedArray; + Span pairs; + var i = 0; + + lock (_recipientsMap) + { + if (typeof(TToken) == typeof(Unit)) + { + // Check whether there are any registered recipients + if (!TryGetMapping(out var mapping)) + { + throw new InvalidOperationException("No recipients registered for the input message type."); + } + + // Check the number of remaining handlers, see below + var totalHandlersCount = mapping.Count; + + if (totalHandlersCount == 0) + { + throw new InvalidOperationException("No recipients registered for the input message type."); + } + + pairs = rentedArray = ArrayPool.Shared.Rent(2 * totalHandlersCount); + + // Same logic as below, except here we're only traversing one handler per recipient + var mappingEnumerator = mapping.GetEnumerator(); + + while (mappingEnumerator.MoveNext()) + { + pairs[2 * i] = mappingEnumerator.GetValue(); + pairs[(2 * i) + 1] = mappingEnumerator.GetKey().Target; + i++; + } + } + else + { + // Check whether there are any registered recipients + if (!TryGetMapping(out var mapping)) + { + throw new InvalidOperationException("No recipients registered for the input message type and token."); + } + + // We need to make a local copy of the currently registered handlers, since users might + // try to unregister (or register) new handlers from inside one of the currently existing + // handlers. We can use memory pooling to reuse arrays, to minimize the average memory + // usage. In practice, we usually just need to pay the small overhead of copying the items. + // The current mapping contains all the currently registered recipients and handlers for + // the combination in use. In the worst case scenario, all recipients + // will have a registered handler with a token matching the input one, meaning that we could + // have at worst a number of pending handlers to invoke equal to the total number of recipient + // in the mapping. This relies on the fact that tokens are unique, and that there is only + // one handler associated with a given token. We can use this upper bound as the requested + // size for each array rented from the pool, which guarantees that we'll have enough space. + var totalHandlersCount = mapping.Count; + + if (totalHandlersCount == 0) + { + throw new InvalidOperationException("No recipients registered for the input message type."); + } + + // Rent the array and also assign it to a span, which will be used to access values. + // We're doing this to avoid the array covariance checks slowdown in the loops below. + pairs = rentedArray = ArrayPool.Shared.Rent(2 * totalHandlersCount); + + // Copy the handlers to the local collection. + // The array is oversized at this point, since it also includes + // handlers for different tokens. We can reuse the same variable + // to count the number of matching handlers to invoke later on. + // This will be the array slice with valid handler in the rented buffer. + var mappingEnumerator = mapping.GetEnumerator(); + + // Explicit enumerator usage here as we're using a custom one + // that doesn't expose the single standard Current property. + while (mappingEnumerator.MoveNext()) + { + // Pick the target handler, if the token is a match for the recipient + if (mappingEnumerator.GetValue().TryGetValue(token, out var handler)) + { + // This span access should always guaranteed to be valid due to the size of the + // array being set according to the current total number of registered handlers, + // which will always be greater or equal than the ones matching the previous test. + // We're still using a checked span accesses here though to make sure an out of + // bounds write can never happen even if an error was present in the logic above. + pairs[2 * i] = handler; + pairs[(2 * i) + 1] = mappingEnumerator.GetKey().Target; + i++; + } + } + + if (i == 0) + { + throw new InvalidOperationException("No recipients registered for the input message type and token."); + } + } + } + + try + { + // The core broadcasting logic is the same as the weak reference messenger one + WeakReferenceMessenger.SendAll(pairs, i, message); + } + finally + { + // As before, we also need to clear it first to avoid having potentially long + // lasting memory leaks due to leftover references being stored in the pool. + Array.Clear(rentedArray, 0, 2 * i); + + ArrayPool.Shared.Return(rentedArray); + } + return message; } diff --git a/Source/Euonia.Bus.RabbitMq/BusConfiguratorExtensions.cs b/Source/Euonia.Bus.RabbitMq/BusConfiguratorExtensions.cs index c823e9b..8feaac4 100644 --- a/Source/Euonia.Bus.RabbitMq/BusConfiguratorExtensions.cs +++ b/Source/Euonia.Bus.RabbitMq/BusConfiguratorExtensions.cs @@ -37,6 +37,6 @@ public static void UseRabbitMq(this IBusConfigurator configurator, Action(); configurator.Service.AddTransient(); - configurator.SerFactory(); + configurator.SetFactory(); } } \ No newline at end of file diff --git a/Source/Euonia.Bus/BusConfigurator.cs b/Source/Euonia.Bus/BusConfigurator.cs index 4bd7b7f..a05ed2a 100644 --- a/Source/Euonia.Bus/BusConfigurator.cs +++ b/Source/Euonia.Bus/BusConfigurator.cs @@ -33,11 +33,11 @@ public BusConfigurator(IServiceCollection service) public IServiceCollection Service { get; } /// - /// + /// Sets the bus factory. /// /// /// - public IBusConfigurator SerFactory() + public IBusConfigurator SetFactory() where TFactory : class, IBusFactory { Service.TryAddSingleton(); @@ -45,12 +45,12 @@ public IBusConfigurator SerFactory() } /// - /// + /// Sets the bus factory. /// /// /// /// - public IBusConfigurator SerFactory(TFactory factory) + public IBusConfigurator SetFactory(TFactory factory) where TFactory : class, IBusFactory { Service.TryAddSingleton(factory); @@ -58,12 +58,12 @@ public IBusConfigurator SerFactory(TFactory factory) } /// - /// + /// Sets the message serializer. /// /// /// /// - public IBusConfigurator SerFactory(Func factory) + public IBusConfigurator SetFactory(Func factory) where TFactory : class, IBusFactory { Service.TryAddSingleton(factory); @@ -71,7 +71,7 @@ public IBusConfigurator SerFactory(Func fa } /// - /// Set the message serializer. + /// Sets the message serializer. /// /// /// @@ -83,7 +83,7 @@ public BusConfigurator SetSerializer() } /// - /// Set the message serializer. + /// Sets the message serializer. /// /// /// @@ -96,7 +96,7 @@ public BusConfigurator SetSerializer(TSerializer serializer) } /// - /// Set the message store provider. + /// Sets the message store provider. /// /// /// @@ -108,7 +108,7 @@ public IBusConfigurator SetMessageStore() } /// - /// Set the message store provider. + /// Sets the message store provider. /// /// /// @@ -159,7 +159,7 @@ public BusConfigurator RegisterHandlers(IEnumerable types) } /// - /// Set the message convention. + /// Sets the message convention. /// /// /// @@ -169,4 +169,15 @@ public BusConfigurator SetConventions(Action configure Service.TryAddSingleton(ConventionBuilder.Convention); return this; } + + /// + /// Sets the request context accessor. + /// + /// + /// + public BusConfigurator SetRequestContextAccessor(RequestContextAccessor accessor) + { + Service.TryAddScoped(provider => accessor); + return this; + } } \ No newline at end of file diff --git a/Source/Euonia.Bus/Core/ExtendableOptions.cs b/Source/Euonia.Bus/Core/ExtendableOptions.cs index 619cf73..39412a8 100644 --- a/Source/Euonia.Bus/Core/ExtendableOptions.cs +++ b/Source/Euonia.Bus/Core/ExtendableOptions.cs @@ -31,4 +31,9 @@ public abstract class ExtendableOptions /// Gets or sets the queue priority. /// public virtual int Priority { get; set; } + + /// + /// Gets or sets the request trace id. + /// + public virtual string RequestTraceId { get; set; } } \ No newline at end of file diff --git a/Source/Euonia.Bus/Core/IBus.cs b/Source/Euonia.Bus/Core/IBus.cs index be8d303..4a52a22 100644 --- a/Source/Euonia.Bus/Core/IBus.cs +++ b/Source/Euonia.Bus/Core/IBus.cs @@ -13,17 +13,18 @@ public interface IBus /// /// Task PublishAsync(TMessage message, CancellationToken cancellationToken = default) - where TMessage : class => PublishAsync(message, new PublishOptions(), cancellationToken); + where TMessage : class => PublishAsync(message, new PublishOptions(), null, cancellationToken); /// /// Publishes the specified message. /// /// /// + /// /// /// /// - Task PublishAsync(TMessage message, PublishOptions options, CancellationToken cancellationToken = default) + Task PublishAsync(TMessage message, PublishOptions options, Action metadataSetter = null, CancellationToken cancellationToken = default) where TMessage : class; /// @@ -35,7 +36,19 @@ Task PublishAsync(TMessage message, PublishOptions options, Cancellati /// /// Task PublishAsync(string channel, TMessage message, CancellationToken cancellationToken = default) - where TMessage : class => PublishAsync(message, new PublishOptions { Channel = channel }, cancellationToken); + where TMessage : class => PublishAsync(channel, message, null, cancellationToken); + + /// + /// Publishes the specified message. + /// + /// + /// + /// + /// + /// + /// + Task PublishAsync(string channel, TMessage message, Action metadataSetter = null, CancellationToken cancellationToken = default) + where TMessage : class => PublishAsync(message, new PublishOptions { Channel = channel }, metadataSetter, cancellationToken); /// /// Sends the specified message. @@ -45,17 +58,18 @@ Task PublishAsync(string channel, TMessage message, CancellationToken /// /// Task SendAsync(TMessage message, CancellationToken cancellationToken = default) - where TMessage : class => SendAsync(message, new SendOptions(), cancellationToken); + where TMessage : class => SendAsync(message, new SendOptions(), null, cancellationToken); /// /// Sends the specified message. /// + /// /// /// + /// /// - /// /// - Task SendAsync(TMessage message, SendOptions options, CancellationToken cancellationToken = default) + Task SendAsync(TMessage message, SendOptions options, Action metadataSetter = null, CancellationToken cancellationToken = default) where TMessage : class; /// @@ -67,18 +81,20 @@ Task SendAsync(TMessage message, SendOptions options, CancellationToke /// /// Task SendAsync(TMessage message, CancellationToken cancellationToken = default) - where TMessage : class => SendAsync(message, new SendOptions(), cancellationToken); + where TMessage : class => SendAsync(message, new SendOptions(), null, null, cancellationToken); /// /// Sends the specified message. /// + /// + /// /// /// + /// + /// /// - /// - /// /// - Task SendAsync(TMessage message, SendOptions options, CancellationToken cancellationToken = default) + Task SendAsync(TMessage message, SendOptions options, Action metadataSetter = null, Action callback = null, CancellationToken cancellationToken = default) where TMessage : class; /// @@ -88,7 +104,7 @@ Task SendAsync(TMessage message, SendOptions options /// /// /// - Task SendAsync(IQueue message, CancellationToken cancellationToken = default) => SendAsync(message, new SendOptions(), cancellationToken); + Task SendAsync(IQueue message, CancellationToken cancellationToken = default) => SendAsync(message, new SendOptions(), null, null, cancellationToken); /// /// Sends the specified message. @@ -96,7 +112,9 @@ Task SendAsync(TMessage message, SendOptions options /// /// /// + /// + /// /// /// - Task SendAsync(IQueue message, SendOptions options, CancellationToken cancellationToken = default); + Task SendAsync(IQueue message, SendOptions options, Action metadataSetter = null, Action callback = null, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/Source/Euonia.Bus/Core/IHandler.cs b/Source/Euonia.Bus/Core/IHandler.cs index 02387f4..5692159 100644 --- a/Source/Euonia.Bus/Core/IHandler.cs +++ b/Source/Euonia.Bus/Core/IHandler.cs @@ -5,12 +5,14 @@ /// public interface IHandler { + /* /// /// Determines whether the current message handler can handle the message with the specified message type. /// /// Type of the message to be checked. /// true if the current message handler can handle the message with the specified message type; otherwise, false. bool CanHandle(Type messageType); + */ } /// @@ -26,6 +28,6 @@ public interface IHandler : IHandler /// The message. /// The message context. /// The cancellation token. - /// Task<System.Boolean>. + /// Task HandleAsync(TMessage message, MessageContext messageContext, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/Source/Euonia.Bus/Core/RequestContext.cs b/Source/Euonia.Bus/Core/RequestContext.cs new file mode 100644 index 0000000..5cd2c87 --- /dev/null +++ b/Source/Euonia.Bus/Core/RequestContext.cs @@ -0,0 +1,58 @@ +using System.Security.Claims; +using Microsoft.Extensions.Primitives; + +namespace Nerosoft.Euonia.Bus; + +/// +/// Contains information about the current request. +/// +public sealed class RequestContext +{ + /// + /// Gets or sets a unique identifier to represent the connection. + /// + public string ConnectionId { get; } + + /// + /// Gets or sets the user for this request. + /// + public ClaimsPrincipal User { get; set; } + + /// + /// Gets the request headers. + /// + /// The request headers. + public IDictionary Headers { get; set; } + + /// + /// Gets the Authorization HTTP header. + /// + public string Authorization + { + get + { + if (Headers == null) + { + return null; + } + + return Headers.TryGetValue(nameof(Authorization), out var value) ? value : string.Empty; + } + } + + /// + /// Gets or sets the that provides access to the request's service container. + /// + public IServiceProvider RequestServices { get; set; } + + /// + /// Notifies when the connection underlying this request is aborted and thus request operations should be + /// cancelled. + /// + public CancellationToken RequestAborted { get; set; } + + /// + /// Gets or sets a unique identifier to represent this request in trace logs. + /// + public string TraceIdentifier { get; set; } +} diff --git a/Source/Euonia.Bus/Core/RequestContextAccessor.cs b/Source/Euonia.Bus/Core/RequestContextAccessor.cs new file mode 100644 index 0000000..71cf5e2 --- /dev/null +++ b/Source/Euonia.Bus/Core/RequestContextAccessor.cs @@ -0,0 +1,7 @@ +namespace Nerosoft.Euonia.Bus; + +/// +/// The delegate to get the instance of current request. +/// +/// +public delegate RequestContext RequestContextAccessor(IServiceProvider provider); \ No newline at end of file diff --git a/Source/Euonia.Bus/Core/ServiceBus.cs b/Source/Euonia.Bus/Core/ServiceBus.cs index 60cb56b..5d39b4c 100644 --- a/Source/Euonia.Bus/Core/ServiceBus.cs +++ b/Source/Euonia.Bus/Core/ServiceBus.cs @@ -1,4 +1,6 @@ -namespace Nerosoft.Euonia.Bus; +using Microsoft.Extensions.DependencyInjection; + +namespace Nerosoft.Euonia.Bus; /// /// The implementation of interface. @@ -7,6 +9,8 @@ public sealed class ServiceBus : IBus { private readonly IDispatcher _dispatcher; private readonly IMessageConvention _convention; + private readonly IServiceAccessor _serviceAccessor; + private readonly RequestContextAccessor _contextAccessor; /// /// Initializes a new instance of the class. @@ -19,8 +23,33 @@ public ServiceBus(IBusFactory factory, IMessageConvention convention) _convention = convention; } + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + public ServiceBus(IBusFactory factory, IMessageConvention convention, IServiceAccessor serviceAccessor) + : this(factory, convention) + { + _serviceAccessor = serviceAccessor; + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// + public ServiceBus(IBusFactory factory, IMessageConvention convention, IServiceAccessor serviceAccessor, RequestContextAccessor contextAccessor) + : this(factory, convention, serviceAccessor) + { + _contextAccessor = contextAccessor; + } + /// - public async Task PublishAsync(TMessage message, PublishOptions options, CancellationToken cancellationToken = default) + public Task PublishAsync(TMessage message, PublishOptions options, Action metadataSetter = null, CancellationToken cancellationToken = default) where TMessage : class { if (!_convention.IsTopicType(message.GetType())) @@ -28,16 +57,20 @@ public async Task PublishAsync(TMessage message, PublishOptions option throw new InvalidOperationException("The message type is not an event type."); } + var context = GetRequestContext(); + var channelName = options?.Channel ?? MessageCache.Default.GetOrAddChannel(); var pack = new RoutedMessage(message, channelName) { - MessageId = options?.MessageId ?? Guid.NewGuid().ToString() + MessageId = options?.MessageId ?? Guid.NewGuid().ToString(), + RequestTraceId = context?.TraceIdentifier ?? options.RequestTraceId ?? Guid.NewGuid().ToString("N"), }; - await _dispatcher.PublishAsync(pack, cancellationToken); + metadataSetter?.Invoke(pack.Metadata); + return _dispatcher.PublishAsync(pack, cancellationToken); } /// - public async Task SendAsync(TMessage message, SendOptions options, CancellationToken cancellationToken = default) + public Task SendAsync(TMessage message, SendOptions options, Action metadataSetter = null, CancellationToken cancellationToken = default) where TMessage : class { if (!_convention.IsQueueType(message.GetType())) @@ -45,17 +78,24 @@ public async Task SendAsync(TMessage message, SendOptions options, Can throw new InvalidOperationException("The message type is not a queue type."); } + var context = GetRequestContext(); + var channelName = options?.Channel ?? MessageCache.Default.GetOrAddChannel(); var pack = new RoutedMessage(message, channelName) { MessageId = options?.MessageId ?? Guid.NewGuid().ToString(), - CorrelationId = options?.CorrelationId ?? Guid.NewGuid().ToString() + CorrelationId = options?.CorrelationId ?? Guid.NewGuid().ToString(), + RequestTraceId = context?.TraceIdentifier ?? options.RequestTraceId ?? Guid.NewGuid().ToString("N"), + Authorization = context?.Authorization, }; - await _dispatcher.SendAsync(pack, cancellationToken); + + metadataSetter?.Invoke(pack.Metadata); + + return _dispatcher.SendAsync(pack, cancellationToken).ContinueWith(task => task.WaitAndUnwrapException()); } /// - public async Task SendAsync(TMessage message, SendOptions options, CancellationToken cancellationToken = default) + public Task SendAsync(TMessage message, SendOptions options, Action metadataSetter = null, Action callback = null, CancellationToken cancellationToken = default) where TMessage : class { if (!_convention.IsQueueType(message.GetType())) @@ -63,24 +103,63 @@ public async Task SendAsync(TMessage message, SendOp throw new InvalidOperationException("The message type is not a queue type."); } + var context = GetRequestContext(); + var channelName = options?.Channel ?? MessageCache.Default.GetOrAddChannel(); var pack = new RoutedMessage(message, channelName) { MessageId = options?.MessageId ?? Guid.NewGuid().ToString(), - CorrelationId = options?.CorrelationId ?? Guid.NewGuid().ToString() + CorrelationId = options?.CorrelationId ?? Guid.NewGuid().ToString(), + RequestTraceId = context?.TraceIdentifier ?? options.RequestTraceId ?? Guid.NewGuid().ToString("N"), + Authorization = context?.Authorization, }; - return await _dispatcher.SendAsync(pack, cancellationToken); + + metadataSetter?.Invoke(pack.Metadata); + + return _dispatcher.SendAsync(pack, cancellationToken) + .ContinueWith(task => + { + task.WaitAndUnwrapException(); + var result = task.Result; + callback?.Invoke(result); + return result; + }); } /// - public async Task SendAsync(IQueue message, SendOptions options, CancellationToken cancellationToken = default) + public Task SendAsync(IQueue message, SendOptions options, Action metadataSetter = null, Action callback = null, CancellationToken cancellationToken = default) { + var context = GetRequestContext(); + var channelName = options?.Channel ?? MessageCache.Default.GetOrAddChannel(message.GetType()); var pack = new RoutedMessage, TResult>(message, channelName) { MessageId = options?.MessageId ?? Guid.NewGuid().ToString(), - CorrelationId = options?.CorrelationId ?? Guid.NewGuid().ToString() + CorrelationId = options?.CorrelationId ?? Guid.NewGuid().ToString(), + RequestTraceId = context?.TraceIdentifier ?? options.RequestTraceId ?? Guid.NewGuid().ToString("N"), + Authorization = context?.Authorization, }; - return await _dispatcher.SendAsync(pack, cancellationToken); + + metadataSetter?.Invoke(pack.Metadata); + + return _dispatcher.SendAsync(pack, cancellationToken) + .ContinueWith(task => + { + task.WaitAndUnwrapException(); + var result = task.Result; + callback?.Invoke(result); + return result; + }); + } + + private RequestContext GetRequestContext() + { + if (_contextAccessor == null) + { + return null; + } + + var context = _contextAccessor(_serviceAccessor.ServiceProvider); + return context; } } \ No newline at end of file diff --git a/Tests/Euonia.Bus.InMemory.Tests/Startup.cs b/Tests/Euonia.Bus.InMemory.Tests/Startup.cs index f2e32e1..800eb4c 100644 --- a/Tests/Euonia.Bus.InMemory.Tests/Startup.cs +++ b/Tests/Euonia.Bus.InMemory.Tests/Startup.cs @@ -40,7 +40,6 @@ public void ConfigureServices(IServiceCollection services, HostBuilderContext ho }); config.UseInMemory(options => { - options.MessengerReference = MessengerReferenceType.StrongReference; options.MultipleSubscriberInstance = false; }); }); diff --git a/Tests/Euonia.Bus.Tests.Shared/ServiceBusTests.cs b/Tests/Euonia.Bus.Tests.Shared/ServiceBusTests.cs index fac500a..fb6bb90 100644 --- a/Tests/Euonia.Bus.Tests.Shared/ServiceBusTests.cs +++ b/Tests/Euonia.Bus.Tests.Shared/ServiceBusTests.cs @@ -52,7 +52,7 @@ public async Task TestSendCommand_HasReponse_UseSubscribeAttribute() } else { - var result = await _provider.GetService().SendAsync(new FooCreateCommand(), new SendOptions { Channel = "foo.create" }); + var result = await _provider.GetService().SendAsync(new FooCreateCommand(), new SendOptions { Channel = "foo.create" }, null, (int i) => Console.Write(i)); Assert.Equal(1, result); } } @@ -70,4 +70,20 @@ public async Task TestSendCommand_HasReponse_MessageHasResultInherites() Assert.Equal(1, result); } } + + [Fact] + public async Task TestSendCommand_HasReponse_MessageHasResultInherites_NoRecipient() + { + if (_preventRunTests) + { + Assert.True(true); + } + else + { + await Assert.ThrowsAnyAsync(async () => + { + var result = await _provider.GetService().SendAsync(new FooCreateCommand()); + }); + } + } } \ No newline at end of file diff --git a/project.props b/project.props index 70e465e..d2382f5 100644 --- a/project.props +++ b/project.props @@ -1,6 +1,6 @@ - 8.0.1 + 8.0.2 damon Nerosoft Ltd. Euonia