From 4f28c24650391e3713195f9427d6463be7ecf0bf Mon Sep 17 00:00:00 2001 From: sahma19 Date: Wed, 29 Jan 2025 15:37:08 +0100 Subject: [PATCH 1/2] add filter --- .../RandomDelayConfigurationExtensions.cs | 65 ++++++++++++++ src/ProjectOrigin.Vault/Startup.cs | 1 + ...RandomDelayConfigurationExtensionsTests.cs | 84 +++++++++++++++++++ .../RandomDelaySpecificationTests.cs | 47 +++++++++++ 4 files changed, 197 insertions(+) create mode 100644 src/ProjectOrigin.Vault/Extensions/RandomDelayConfigurationExtensions.cs create mode 100644 test/ProjectOrigin.Vault.Tests/Extensions/RandomDelayConfigurationExtensionsTests.cs create mode 100644 test/ProjectOrigin.Vault.Tests/Extensions/RandomDelaySpecificationTests.cs diff --git a/src/ProjectOrigin.Vault/Extensions/RandomDelayConfigurationExtensions.cs b/src/ProjectOrigin.Vault/Extensions/RandomDelayConfigurationExtensions.cs new file mode 100644 index 00000000..389d4b1c --- /dev/null +++ b/src/ProjectOrigin.Vault/Extensions/RandomDelayConfigurationExtensions.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using MassTransit; +using MassTransit.Configuration; + +namespace ProjectOrigin.Vault.Extensions; + +public static class RandomDelayConfigurationExtensions +{ + public static void UseRandomDelay(this IConsumerConfigurator configurator, int minDelayInMilliseconds = 1, + int maxDelayInMilliseconds = 1500) + where TConsumer : class, IConsumer + where TMessage : class + { + configurator.AddPipeSpecification(new RandomDelaySpecification(minDelayInMilliseconds, maxDelayInMilliseconds)); + } +} + +public class RandomDelaySpecification(int minDelayInMilliseconds, int maxDelayInMilliseconds) + : IPipeSpecification> + where TConsumer : class, IConsumer + where TMessage : class +{ + public void Apply(IPipeBuilder> builder) + { + builder.AddFilter(new RandomDelayFilter(minDelayInMilliseconds, maxDelayInMilliseconds)); + } + + public IEnumerable Validate() + { + if (minDelayInMilliseconds < 0) + yield return this.Failure("RandomDelayFilter", "minDelayInMilliseconds cannot be negative"); + + if (maxDelayInMilliseconds <= minDelayInMilliseconds) + yield return this.Failure("RandomDelayFilter", + "maxDelayInMilliseconds must be greater than minDelayInMilliseconds"); + } +} + +public class RandomDelayFilter(int minDelayInMilliseconds, int maxDelayInMilliseconds) : IFilter> + where TConsumer : class +{ + public async Task Send(ConsumerConsumeContext context, + IPipe> next) + { + var delay = Random.Shared.Next(minDelayInMilliseconds, maxDelayInMilliseconds + 1); + + context.GetOrAddPayload(() => new RandomDelayPayload { Delay = delay }); + + await Task.Delay(delay); + await next.Send(context); + } + + public void Probe(ProbeContext context) + { + var scope = context.CreateFilterScope("random-delay"); + scope.Add("description", $"Adds a random delay between {minDelayInMilliseconds}-{maxDelayInMilliseconds} ms to each message."); + } +} + +public class RandomDelayPayload +{ + public int Delay { get; set; } +} diff --git a/src/ProjectOrigin.Vault/Startup.cs b/src/ProjectOrigin.Vault/Startup.cs index 2bd3ed98..a8ff9459 100644 --- a/src/ProjectOrigin.Vault/Startup.cs +++ b/src/ProjectOrigin.Vault/Startup.cs @@ -113,6 +113,7 @@ public void ConfigureServices(IServiceCollection services) { cfg.UseMessageRetry(r => r.Interval(20, TimeSpan.FromSeconds(5)) .Handle()); + cfg.UseRandomDelay(1, 1500); }); o.AddConsumer(); diff --git a/test/ProjectOrigin.Vault.Tests/Extensions/RandomDelayConfigurationExtensionsTests.cs b/test/ProjectOrigin.Vault.Tests/Extensions/RandomDelayConfigurationExtensionsTests.cs new file mode 100644 index 00000000..e18e4d9c --- /dev/null +++ b/test/ProjectOrigin.Vault.Tests/Extensions/RandomDelayConfigurationExtensionsTests.cs @@ -0,0 +1,84 @@ +using System.Linq; +using System.Threading.Tasks; +using MassTransit; +using MassTransit.Internals; +using MassTransit.Testing; +using Microsoft.Extensions.DependencyInjection; +using ProjectOrigin.Vault.Extensions; +using Xunit; + +namespace ProjectOrigin.Vault.Tests.Extensions; + +public class RandomDelayConfigurationExtensionsTests +{ + [Fact] + public async Task Should_Apply_RandomDelay_To_Each_Message() + { + const int minDelayInMilliseconds = 5; + const int maxDelayInMilliseconds = 1000; + await using var provider = new ServiceCollection() + .AddMassTransitTestHarness(cfg => + { + cfg.AddConsumer(consumerCfg => + { + consumerCfg.UseRandomDelay(minDelayInMilliseconds, maxDelayInMilliseconds); + }); + + cfg.UsingInMemory((context, inMemoryQueue) => { inMemoryQueue.ConfigureEndpoints(context); }); + }) + .BuildServiceProvider(true); + + var harness = provider.GetRequiredService(); + await harness.Start(); + + try + { + for (var i = 0; i < 5; i++) + { + await harness.Bus.Publish(new TestMessage()); + } + + Assert.True(await harness.Consumed.Any(), + "Expected the test harness to consume TestMessage, but none was found."); + + var consumerHarness = harness.GetConsumerHarness(); + + var consumedContexts = await consumerHarness + .Consumed + .SelectAsync() + .Take(5) + .ToListAsync(); + + Assert.Equal(5, consumedContexts.Count); + + var delays = consumedContexts + .Select(ctx => + ctx.Context.TryGetPayload(out var payload) + ? payload.Delay + : 0) + .ToList(); + + Assert.Equal(5, delays.Count); + + Assert.All(delays, d => Assert.InRange(d, minDelayInMilliseconds, maxDelayInMilliseconds)); + + Assert.Equal(5, delays.Distinct().Count()); + } + finally + { + await harness.Stop(); + } + } +} + +public record TestMessage; + +public class TestConsumer : IConsumer +{ + public Task Consume(ConsumeContext context) + { + return Task.CompletedTask; + } +} + + diff --git a/test/ProjectOrigin.Vault.Tests/Extensions/RandomDelaySpecificationTests.cs b/test/ProjectOrigin.Vault.Tests/Extensions/RandomDelaySpecificationTests.cs new file mode 100644 index 00000000..13319e1c --- /dev/null +++ b/test/ProjectOrigin.Vault.Tests/Extensions/RandomDelaySpecificationTests.cs @@ -0,0 +1,47 @@ +using System.Threading.Tasks; +using MassTransit; +using ProjectOrigin.Vault.Extensions; +using Xunit; + +namespace ProjectOrigin.Vault.Tests.Extensions +{ + public class RandomDelaySpecificationTests + { + [Fact] + public void Validate_ShouldReturnErrorIfMinDelayIsNegative() + { + var specification = new RandomDelaySpecification(-1, 100); + + var error = Assert.Single(specification.Validate()); + + Assert.Equal("RandomDelayFilter", error.Key); + Assert.Equal("minDelayInMilliseconds cannot be negative", error.Message); + } + + [Fact] + public void Validate_ShouldReturnErrorIfMaxDelayIsLessThanOrEqualToMinDelay() + { + var specification = new RandomDelaySpecification(5, 5); + + var error = Assert.Single(specification.Validate()); + + Assert.Equal("RandomDelayFilter", error.Key); + Assert.Equal("maxDelayInMilliseconds must be greater than minDelayInMilliseconds", error.Message); + } + + [Fact] + public void Validate_ShouldReturnNoErrorsForValidRange() + { + var specification = new RandomDelaySpecification(0, 10); + + Assert.Empty(specification.Validate()); + } + + private class TestMessage; + + private class TestConsumer : IConsumer + { + public Task Consume(ConsumeContext context) => Task.CompletedTask; + } + } +} From a2455f5a9dbe2feac89853ba2a3b59332b9f7c75 Mon Sep 17 00:00:00 2001 From: sahma19 Date: Wed, 29 Jan 2025 18:45:37 +0100 Subject: [PATCH 2/2] remove default value --- src/ProjectOrigin.Vault/Startup.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ProjectOrigin.Vault/Startup.cs b/src/ProjectOrigin.Vault/Startup.cs index a8ff9459..6f8f9023 100644 --- a/src/ProjectOrigin.Vault/Startup.cs +++ b/src/ProjectOrigin.Vault/Startup.cs @@ -113,7 +113,7 @@ public void ConfigureServices(IServiceCollection services) { cfg.UseMessageRetry(r => r.Interval(20, TimeSpan.FromSeconds(5)) .Handle()); - cfg.UseRandomDelay(1, 1500); + cfg.UseRandomDelay(); }); o.AddConsumer();