From 181e827574c57fb9af3c67d089786c118e546d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A9rald=20Barr=C3=A9?= Date: Mon, 11 Mar 2024 17:49:56 -0400 Subject: [PATCH] [IDP-929] Add support for the pull delivery model (#42) * Add support for the pull model * Add release method * Add new methods * Added test validaiton for reject & release cloud events * Use type alias * Initial refactor and create pull handlers * Refactor according to feedback * Readme changes * Readme changes and ci changes * Upgrade project to .NET 8 * Upgrade docker .net version to 8 * Remove redundant adduser command from dockerfile * Refactor according to feedback * Refactor readme for more clarity regarding topic support * Undo appsettings changes * Readme update --------- Co-authored-by: He Qian Wang --- .github/workflows/ci.yml | 6 +- README.md | 83 +++++++- global.json | 2 +- src/.editorconfig | 1 + src/Directory.Build.props | 4 +- .../CustomWebApplicationFactory.cs | 47 ----- .../EmulatorValidationTests.cs | 14 +- .../FactoryClientBuilder.cs | 2 +- .../PullModelEventGridClientTests.cs | 125 ++++++++++++ ...ApplicationLifetimeLoggingHostedService.cs | 2 +- .../Configuration/TopicOptions.cs | 41 +++- src/EventGridEmulator/Dockerfile | 6 +- .../EventGridEmulator.csproj | 2 +- .../BaseEventHttpContextHandler.cs | 24 ++- .../CloudEventHttpContextHandler.cs | 5 +- .../EventGridEventHttpContextHandler.cs | 5 +- ...tHandler.cs => EventGridPublishHandler.cs} | 19 +- .../PullQueueHttpContextHandler.cs | 182 ++++++++++++++++++ .../EventHandling/TopicSubscribers.cs | 90 +++++++++ .../SubscriberPolicyHttpMessageHandler.cs | 3 +- src/EventGridEmulator/Program.cs | 15 +- src/Samples/Publisher/Publisher.csproj | 2 +- src/Samples/Subscriber/Subscriber.csproj | 2 +- src/global.json | 2 +- 24 files changed, 583 insertions(+), 101 deletions(-) delete mode 100644 src/EventGridEmulator.Tests/CustomWebApplicationFactory.cs create mode 100644 src/EventGridEmulator.Tests/PullModelEventGridClientTests.cs rename src/EventGridEmulator/EventHandling/{CompositeEventHttpContextHandler.cs => EventGridPublishHandler.cs} (57%) create mode 100644 src/EventGridEmulator/EventHandling/PullQueueHttpContextHandler.cs create mode 100644 src/EventGridEmulator/EventHandling/TopicSubscribers.cs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3d1979..e50cb1e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: CI on: pull_request: - branches: ["main", "master"] + branches: ["main"] paths-ignore: ["*.md"] jobs: @@ -13,12 +13,14 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 + - uses: actions/setup-dotnet@v4 + - name: Docker metadata id: meta uses: docker/metadata-action@8e5442c4ef9f78752691e2d8f8d19755c6f78e81 # v5 with: images: workleap/eventgridemulator - + - name: Docker build uses: docker/build-push-action@4a13e500e55cf31b7a5d59a38ab2040ab0f42f56 # v5 with: diff --git a/README.md b/README.md index 74a3ef1..a649f41 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,16 @@ # Workleap Azure Event Grid Emulator -This is an open source emulator for [Azure Event Grid](https://learn.microsoft.com/en-us/azure/event-grid/overview) that supports only the [push delivery](https://learn.microsoft.com/en-us/azure/event-grid/push-delivery-overview) model. Based on ASP.NET Core, this emulator provides a cross-platform experience for developers wanting to try Azure Event Grid easily in a local environment without having to deploy to Azure. +This is an open source emulator for [Azure Event Grid](https://learn.microsoft.com/en-us/azure/event-grid/overview) that supports both the [push delivery](https://learn.microsoft.com/en-us/azure/event-grid/push-delivery-overview) and the [pull delivery](https://learn.microsoft.com/en-us/azure/event-grid/pull-delivery-overview) models. Based on ASP.NET Core, this emulator provides a cross-platform experience for developers wanting to try Azure Event Grid easily in a local environment without having to deploy to Azure. + +The emulator supports two Events delivery formats: Push and Pull. This project is not affiliated, associated, authorized, endorsed by, or in any way officially connected with Microsoft. ## Features - -- Support for multiple Event Grid topics by sending events to `http://127.0.0.1:6500//api/events`. +- Support publishing events to Custom Topics (EventGridEvents/CloudEvents) and Namespace Topics (only CloudEvents) +- Support Push & Pull Delivery Models - Push delivery to configured webhooks defined in the emulator configuration file (more details below). +- Pull delivery API client commands supported in the emulator (more details below). - Simple but durable message delivery and retry based on the [Azure Event Grid documentation](https://learn.microsoft.com/en-us/azure/event-grid/delivery-and-retry). - Ability to add and remove topics and webhooks at runtime without having to restart the emulator. - As the emulator is built on top of ASP.NET Core, you can follow this [Microsoft documentation](https://learn.microsoft.com/en-us/aspnet/core/security/docker-compose-https) to run on HTTPS. @@ -21,7 +24,7 @@ You must have [Docker](https://www.docker.com/get-started/) installed. This Even The first step is to **create a configuration file** for the emulator to know the topics, and for each topic, the webhooks to call when an event is published. Create a configuration file named `appsettings.json` somewhere on your computer, for instance: `C:\eventgridemulator\appsettings.json`. -It should look like this: +### Push Delivery configuration ```json { @@ -31,13 +34,26 @@ It should look like this: "http://host.docker.internal:7221/eventgrid" ], "topic2": [ - "https://mydockercontainer:5122/eventgrid/domainevents", + "https://mydockercontainer:5122/eventgrid/domainevents" + ], + } +} +``` +In the example for push delivery, we have two topics, `topic1` and `topic2`. If an event is sent to the emulator on this URL `http://127.0.0.1:6500/topic1/api/events`, the emulator would forward the events to `https://host.docker.internal:5122/my-webhook` and `http://host.docker.internal:7221/eventgrid` on your host machine. As the emulator runs on Docker, you must use the `host.docker.internal` (emulator must make an http ) host whenever you want to call a webhook on your host machine. + +### Pull delivery configuration +```json +{ + "Topics": { + "topicfoobar": [ + "pull://foo-subscription", + "pull://bar-subscription" ] } } ``` -In this example, we have two topics, `topic1` and `topic2`. If an event is sent to the emulator on this URL `http://127.0.0.1:6500/topic1/api/events`, the emulator would forward the events to `https://host.docker.internal:5122/my-webhook` and `http://host.docker.internal:7221/eventgrid` on your host machine. As the emulator runs on Docker, you must use the `host.docker.internal` host whenever you want to call a webhook on your host machine. +In the example for pull delivery, we have a topics, `topicfoobar`. If an event is sent to the emulator on this URL `http://127.0.0.1:6500/topics/topicfoobar:publish`, the emulator would make the events available to pull at `pull://foo-subscription` and `pull://bar-subscription` on your host machine. **Run the Event Grid emulator with docker run** @@ -67,9 +83,9 @@ services: From the directory in which the file resides, run the `docker compose up` command. -**Sending and receiving events** +## Publish and Receive Events using Push Delivery for Custom Topic -Now that the emulator is running, you can send events to it and receive them in your webhooks. If you're using C#, follow [these steps from the Microsoft documentation](https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventgrid-readme?view=azure-dotnet): +Push Delivery: Now that the emulator is running, you can send both EventGridEvents and CloudEvents to the endpoint and receive them in your webhooks. If you're using C#, follow [these steps from the Microsoft documentation](https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventgrid-readme?view=azure-dotnet): ```csharp // Change "my-topic" to the name of your topic. @@ -78,6 +94,57 @@ Now that the emulator is running, you can send events to it and receive them in var client = new EventGridPublisherClient( new Uri("http://127.0.0.1:6500/my-topic/api/events"), new AzureKeyCredential("fakeAccessKey")); + +// Create and send a CloudEvent to EventGrid +var cloudEvent = new new CloudEvent("", "", data); +await client.SendEventAsync(cloudEvent); + +// Create and send an EventGridEvent to EventGrid +var eventGridEvent = new EventGridEvent( + subject: "", + eventType: "", + dataVersion: "", + data: data); +await client.SendEventAsync(eventGridEvent); + +// An url with the correct url would need to be exposed to process the push delivery events +[HttpPost("")] +public IActionResult Post([FromBody]EventGridEvent[] value) +{ +... +} + +## Publish and Receive Events with Pull Delivery Model for Namespace Topic + +``` +Pull Delivery: Once the emulator is running, we can send CloudEvents to the endpoint and pull/acknowledge events with api calls. + +We support the following Queue delivery APIs: + +- `PublishCloudEventsAsync`: publishes an event from the queue. +- `ReceiveCloudEventsAsync`: receives an event from the queue. +- `AcknowledgeCloudEventsAsync`: acknowledges that the received event is processed successfully and delete from the queue. +- `ReleaseCloudEventsAsync`: releases the received event and requeues the event. +- `RejectCloudEventsAsync`: rejects the received event and delete from the queue. + + +```csharp +// The authentication mechanism is actually ignored by the emulator. +// If you must provide a TokenCredential instead of an access key, the emulator must be running on HTTPS. +var client = new EventGridClient( + new Uri("http://127.0.0.1:6500"), + new AzureKeyCredential("fakeAccessKey")); + +// Example of how to can publish an event with EventGridClient +client.PublishCloudEventsAsync(topicName, new CloudEvent("", "", data)); + +// Example of how to receive an event with EventGridClient +var events = await client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName); + +// Example of how to can acknowledge an event from queue +// Reject/Release would be the same format except with RejectCloudEventsAsync/RejectOptions and ReleaseCloudEventsAsync/ReleaseOptions +await client.AcknowledgeCloudEventsAsync(topicName, eventSubscriptionName, new AcknowledgeOptions([])); + ``` ## Additional information diff --git a/global.json b/global.json index a1e3b8e..e7bb0ec 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "7.0.406", + "version": "8.0.201", "rollForward": "latestMinor", "allowPrerelease": false } diff --git a/src/.editorconfig b/src/.editorconfig index 1ba6aa5..7bb72b3 100644 --- a/src/.editorconfig +++ b/src/.editorconfig @@ -157,6 +157,7 @@ dotnet_diagnostic.CA1835.severity = warning dotnet_diagnostic.CA1836.severity = warning dotnet_diagnostic.CA1837.severity = warning dotnet_diagnostic.CA1838.severity = none +dotnet_diagnostic.CA1848.severity = none dotnet_diagnostic.CA1900.severity = warning dotnet_diagnostic.CA1901.severity = warning dotnet_diagnostic.CA1903.severity = warning diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 43aac11..50bfebb 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,13 +1,13 @@ - net7.0 + net8.0 Copyright © Workleap. $([System.DateTime]::UtcNow.ToString(yyyy)) Workleap Workleap https://github.com/gsoft-inc/wl-eventgrid-emulator Apache-2.0 true - 11 + latest enable enable latest-All diff --git a/src/EventGridEmulator.Tests/CustomWebApplicationFactory.cs b/src/EventGridEmulator.Tests/CustomWebApplicationFactory.cs deleted file mode 100644 index 1727a74..0000000 --- a/src/EventGridEmulator.Tests/CustomWebApplicationFactory.cs +++ /dev/null @@ -1,47 +0,0 @@ -using EventGridEmulator.Configuration; -using EventGridEmulator.Network; -using Microsoft.AspNetCore.Mvc.Testing; -using Microsoft.AspNetCore.TestHost; - -namespace EventGridEmulator.Tests; - -public class CustomWebApplicationFactory : WebApplicationFactory -{ - private readonly Action _configureServices; - - public CustomWebApplicationFactory(Action configureServices) - { - this._configureServices = configureServices; - } - - protected override void ConfigureWebHost(IWebHostBuilder builder) - { - builder.ConfigureTestServices(services => - { - this._configureServices(services); - - OptionsServiceCollectionExtensions.Configure(services, options => - { - options.Topics = new Dictionary(StringComparer.OrdinalIgnoreCase) - { - ["orders"] = new[] - { - null!, - string.Empty, - "invalid_url", - "https://localhost/orders-webhook1", - "https://localhost/orders-webhook2", - }, - ["customers"] = new[] - { - "https://localhost/customers-webhook1", - "https://localhost/customers-webhook1", - }, - }; - }); - - var handler = new TestHandler(_ => _ = _); - HttpClientFactoryServiceCollectionExtensions.AddHttpClient(services, SubscriberConstants.HttpClientName).ConfigurePrimaryHttpMessageHandler(() => handler); - }); - } -} \ No newline at end of file diff --git a/src/EventGridEmulator.Tests/EmulatorValidationTests.cs b/src/EventGridEmulator.Tests/EmulatorValidationTests.cs index 3e96a0c..ea065ed 100644 --- a/src/EventGridEmulator.Tests/EmulatorValidationTests.cs +++ b/src/EventGridEmulator.Tests/EmulatorValidationTests.cs @@ -36,12 +36,12 @@ public async Task ValidatePublishAndSubscribeRoundTripForEventGridEvent() // Create and send an event to EventGrid var eventGridEvent = new EventGridEvent( - subject: "foo", - eventType: "bar", - dataVersion: "1.0", + subject: "foo", + eventType: "bar", + dataVersion: "1.0", data: new DataModel(some: "data")); var response = await publisher.SendEventAsync(eventGridEvent); - + // Assert that the message was successfully sent Assert.Equal(200, response.Status); @@ -52,7 +52,7 @@ public async Task ValidatePublishAndSubscribeRoundTripForEventGridEvent() Assert.Equal("data", result?.Some); Assert.Equal($"{SubscriberConstants.DefaultTopicValue}{this.ExpectedTopic}", receivedTopic); } - + [Fact] public async Task ValidatePublishAndSubscribeRoundTripForCloudEvent() { @@ -73,7 +73,7 @@ public async Task ValidatePublishAndSubscribeRoundTripForCloudEvent() // Create and send an event to EventGrid var cloudEvent = new CloudEvent("foo", "bar", new DataModel(some: "data")); var response = await publisher.SendEventAsync(cloudEvent); - + // Assert that the message was successfully sent Assert.Equal(200, response.Status); @@ -85,7 +85,7 @@ public async Task ValidatePublishAndSubscribeRoundTripForCloudEvent() Assert.Equal($"{SubscriberConstants.DefaultTopicValue}{this.ExpectedTopic}", receivedTopic); } - private class DataModel + private sealed class DataModel { public DataModel(string some) { diff --git a/src/EventGridEmulator.Tests/FactoryClientBuilder.cs b/src/EventGridEmulator.Tests/FactoryClientBuilder.cs index 87a6306..f8fc6e3 100644 --- a/src/EventGridEmulator.Tests/FactoryClientBuilder.cs +++ b/src/EventGridEmulator.Tests/FactoryClientBuilder.cs @@ -29,7 +29,7 @@ public HttpClient Build() return factory.CreateClient(); } - private class Topic + private sealed class Topic { public Topic(string name, string url) { diff --git a/src/EventGridEmulator.Tests/PullModelEventGridClientTests.cs b/src/EventGridEmulator.Tests/PullModelEventGridClientTests.cs new file mode 100644 index 0000000..c3e5a5b --- /dev/null +++ b/src/EventGridEmulator.Tests/PullModelEventGridClientTests.cs @@ -0,0 +1,125 @@ +using Azure.Core.Pipeline; +using Azure; +using EventGridEmulator.Configuration; +using Microsoft.AspNetCore.Mvc.Testing; +using Microsoft.AspNetCore.TestHost; +using Azure.Messaging; +using Azure.Messaging.EventGrid.Namespaces; + +namespace EventGridEmulator.Tests; + +public sealed class PullModelEventGridClientTests +{ + [Fact] + public async Task CanSendReceiveEvents() + { + var topicName = "customers"; + var eventSubscriptionName = "CustomSubscription"; + + var client = await CreateTestEventGridClient(topicName, eventSubscriptionName); + + var data = new EventData("CustomId"); + _ = await client.PublishCloudEventsAsync(topicName, [new CloudEvent("source", "type", data)]); + + var events = await client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName); + var ev = Assert.Single(events.Value.Value); + Assert.Equal(("source", "type"), (ev.Event.Source, ev.Event.Type)); + var deserializedData = ev.Event.Data!.ToObjectFromJson(); + Assert.Equal(data, deserializedData); + } + + [Fact] + public async Task CanSendReceiveAcknowledgeEvents() + { + var topicName = "customers"; + var eventSubscriptionName = "CustomSubscription"; + + var client = await CreateTestEventGridClient(topicName, eventSubscriptionName); + + var data = new EventData("CustomId"); + _ = await client.PublishCloudEventsAsync(topicName, [new CloudEvent("source", "type", data)]); + + var events = await client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName); + var ev = Assert.Single(events.Value.Value); + + var acknowledgeResult = await client.AcknowledgeCloudEventsAsync(topicName, eventSubscriptionName, new AcknowledgeOptions([ev.BrokerProperties.LockToken])); + Assert.Single(acknowledgeResult.Value.SucceededLockTokens); + + // Assert queue is emtpy + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync(() => client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName, cancellationToken: cts.Token)); + } + + [Fact] + public async Task CanSendReceiveReleaseEventsWithInvalidLockTokens() + { + var topicName = "customers"; + var eventSubscriptionName = "CustomSubscription"; + + var client = await CreateTestEventGridClient(topicName, eventSubscriptionName); + + var data = new EventData("CustomId"); + _ = await client.PublishCloudEventsAsync(topicName, [new CloudEvent("source", "type", data)]); + + var events = await client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName); + var ev = Assert.Single(events.Value.Value); + + var releaseResult = await client.ReleaseCloudEventsAsync(topicName, eventSubscriptionName, new ReleaseOptions([ev.BrokerProperties.LockToken, "abcd", "efgh"])); + Assert.Single(releaseResult.Value.SucceededLockTokens); + Assert.Equal(2, releaseResult.Value.FailedLockTokens.Count); + + events = await client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName); + Assert.Single(events.Value.Value); + } + + [Fact] + public async Task CanSendReceiveRejectvents() + { + var topicName = "customers"; + var eventSubscriptionName = "CustomSubscription"; + + var client = await CreateTestEventGridClient(topicName, eventSubscriptionName); + + var data = new EventData("CustomId"); + _ = await client.PublishCloudEventsAsync(topicName, [new CloudEvent("source", "type", data)]); + + var events = await client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName); + var ev = Assert.Single(events.Value.Value); + + var rejectResult = await client.RejectCloudEventsAsync(topicName, eventSubscriptionName, new RejectOptions([ev.BrokerProperties.LockToken])); + Assert.Single(rejectResult.Value.SucceededLockTokens); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync(() => client.ReceiveCloudEventsAsync(topicName, eventSubscriptionName, cancellationToken: cts.Token)); + } + + private sealed record EventData(string Id); + + private sealed class CustomWebApplicationFactory(Action configureServices) : WebApplicationFactory + { + protected override void ConfigureWebHost(IWebHostBuilder builder) + => builder.ConfigureTestServices(configureServices); + } + + private static async Task CreateTestEventGridClient(string topicName, string eventSubscriptionName) + { + var factory = new CustomWebApplicationFactory(options => + { + options.Configure(topicOptions => + { + topicOptions.Topics = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + [topicName] = [$"pull://{eventSubscriptionName}"], + }; + }); + }); + + var httpClientHandler = factory.Server.CreateHandler(); + + var client = new EventGridClient(new Uri("https://localhost"), new AzureKeyCredential("noop"), new EventGridClientOptions + { + Transport = new HttpClientTransport(httpClientHandler), + }); + return client; + } +} diff --git a/src/EventGridEmulator/Configuration/ApplicationLifetimeLoggingHostedService.cs b/src/EventGridEmulator/Configuration/ApplicationLifetimeLoggingHostedService.cs index 3158f2b..bd38fdb 100644 --- a/src/EventGridEmulator/Configuration/ApplicationLifetimeLoggingHostedService.cs +++ b/src/EventGridEmulator/Configuration/ApplicationLifetimeLoggingHostedService.cs @@ -45,7 +45,7 @@ private static void OnApplicationStarted(object? state) var addressesFeature = self._server.Features.Get(); if (addressesFeature != null) { - var addresses = string.Join(", ", addressesFeature.Addresses.Select(x => x + CompositeEventHttpContextHandler.Route)); + var addresses = string.Join(", ", addressesFeature.Addresses.Select(x => x + EventGridPublishHandler.CustomTopicRoute)); self._logger.LogInformation("Now listening for events on: {Addresses}", addresses); } diff --git a/src/EventGridEmulator/Configuration/TopicOptions.cs b/src/EventGridEmulator/Configuration/TopicOptions.cs index 6ada40f..536ae0a 100644 --- a/src/EventGridEmulator/Configuration/TopicOptions.cs +++ b/src/EventGridEmulator/Configuration/TopicOptions.cs @@ -1,4 +1,4 @@ -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.CodeAnalysis; namespace EventGridEmulator.Configuration; @@ -23,6 +23,39 @@ public TopicOptions(TopicOptions original) public Dictionary Topics { get; set; } = new Dictionary(StringComparer.OrdinalIgnoreCase); + public IEnumerable GetPushSubscribers(string topic) + { + if (!this.Topics.TryGetValue(topic, out var subscribers)) + { + yield break; + } + + foreach (var value in subscribers) + { + if (Uri.TryCreate(value, UriKind.Absolute, out var url) && (url.Scheme == Uri.UriSchemeHttp || url.Scheme == Uri.UriSchemeHttps)) + { + yield return new PushSubscriber(url.OriginalString); + } + } + } + + public IEnumerable GetPullSubscribers(string topic) + { + if (!this.Topics.TryGetValue(topic, out var subscribers)) + { + yield break; + } + + foreach (var value in subscribers) + { + // pull://subscriptionName + if (Uri.TryCreate(value, UriKind.Absolute, out var url) && url.Scheme == "pull") + { + yield return new PullSubscriber(url.Host); + } + } + } + public override bool Equals(object? obj) { if (ReferenceEquals(this, obj)) @@ -80,4 +113,8 @@ public override int GetHashCode() return hash; } } -} \ No newline at end of file +} + +internal sealed record PushSubscriber(string Uri); + +internal sealed record PullSubscriber(string SubscriptionName); \ No newline at end of file diff --git a/src/EventGridEmulator/Dockerfile b/src/EventGridEmulator/Dockerfile index 5df7a20..b0a45c1 100644 --- a/src/EventGridEmulator/Dockerfile +++ b/src/EventGridEmulator/Dockerfile @@ -1,18 +1,18 @@ -FROM mcr.microsoft.com/dotnet/aspnet:7.0-alpine AS base +FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine AS base WORKDIR /app EXPOSE 6500 ENV DOTNET_CLI_TELEMETRY_OPTOUT=true ENV DOTNET_ENVIRONMENT=Production ENV ASPNETCORE_URLS=http://+:6500 -FROM mcr.microsoft.com/dotnet/sdk:7.0-alpine AS publish +FROM mcr.microsoft.com/dotnet/sdk:8.0-alpine AS publish WORKDIR /src COPY . . WORKDIR "/src/EventGridEmulator" RUN dotnet publish "EventGridEmulator.csproj" -c Release -r linux-musl-x64 --no-self-contained --nologo -p:RunAnalyzers=false -o /app/publish FROM base AS final -RUN adduser --disabled-password --home /app --gecos '' app && chown -R app /app +RUN chown -R app /app USER app WORKDIR /app COPY --from=publish /app/publish . diff --git a/src/EventGridEmulator/EventGridEmulator.csproj b/src/EventGridEmulator/EventGridEmulator.csproj index decb866..e2f5371 100644 --- a/src/EventGridEmulator/EventGridEmulator.csproj +++ b/src/EventGridEmulator/EventGridEmulator.csproj @@ -4,7 +4,7 @@ - + diff --git a/src/EventGridEmulator/EventHandling/BaseEventHttpContextHandler.cs b/src/EventGridEmulator/EventHandling/BaseEventHttpContextHandler.cs index b491561..25e578b 100644 --- a/src/EventGridEmulator/EventHandling/BaseEventHttpContextHandler.cs +++ b/src/EventGridEmulator/EventHandling/BaseEventHttpContextHandler.cs @@ -1,4 +1,4 @@ -using EventGridEmulator.Configuration; +using EventGridEmulator.Configuration; using EventGridEmulator.Network; using Microsoft.Extensions.Options; @@ -9,17 +9,20 @@ internal abstract class BaseEventHttpContextHandler private readonly HttpClient _httpClient; private readonly ISubscriberCancellationTokenRegistry _cancellationTokenRegistry; private readonly IOptionsMonitor _options; + private readonly TopicSubscribers _eventQueue; private readonly ILogger _logger; protected BaseEventHttpContextHandler( IHttpClientFactory httpClientFactory, ISubscriberCancellationTokenRegistry cancellationTokenRegistry, IOptionsMonitor options, + TopicSubscribers eventQueue, ILogger logger) { this._httpClient = httpClientFactory.CreateClient(SubscriberConstants.HttpClientName); this._cancellationTokenRegistry = cancellationTokenRegistry; this._options = options; + this._eventQueue = eventQueue; this._logger = logger; } @@ -31,22 +34,23 @@ public async Task HandleAsync(HttpContext context, string topic) private async Task HandleInternalAsync(HttpContext context, string topic) { - if (!this._options.CurrentValue.Topics.TryGetValue(topic, out var subscribers)) - { - return Results.Ok(); - } - var events = await EventsSerializer.DeserializeEventsAsync(context); if (events == null) { return Results.BadRequest(); } - foreach (var subscriber in subscribers) + var pushSubscriber = this._options.CurrentValue.GetPushSubscribers(topic); + foreach (var subscriber in pushSubscriber) { - var cancellationToken = this._cancellationTokenRegistry.Get(topic, subscriber); + var cancellationToken = this._cancellationTokenRegistry.Get(topic, subscriber.Uri); this.EnhanceEventData(events, topic); - _ = this.SendEventsToSubscriberFireAndForget(topic, subscriber, events, cancellationToken); + _ = this.SendEventsToSubscriberFireAndForget(topic, subscriber.Uri, events, cancellationToken); + } + + foreach (var subscriber in this._options.CurrentValue.GetPullSubscribers(topic)) + { + this._eventQueue.AddEvent(topic, subscriber.SubscriptionName, events); } return Results.Ok(); @@ -78,7 +82,7 @@ private async Task SendEventsToSubscriberFireAndForget(string topic, string subs info.LogRequestFailed(this._logger, ex); } } - + protected virtual void EnhanceEventData(IEnumerable events, string topicName) { } diff --git a/src/EventGridEmulator/EventHandling/CloudEventHttpContextHandler.cs b/src/EventGridEmulator/EventHandling/CloudEventHttpContextHandler.cs index bb1c2c9..44bef91 100644 --- a/src/EventGridEmulator/EventHandling/CloudEventHttpContextHandler.cs +++ b/src/EventGridEmulator/EventHandling/CloudEventHttpContextHandler.cs @@ -1,4 +1,4 @@ -using Azure.Messaging; +using Azure.Messaging; using EventGridEmulator.Configuration; using EventGridEmulator.Network; using Microsoft.Extensions.Options; @@ -11,8 +11,9 @@ public CloudEventHttpContextHandler( IHttpClientFactory httpClientFactory, ISubscriberCancellationTokenRegistry cancellationTokenRegistry, IOptionsMonitor options, + TopicSubscribers queue, ILogger logger) - : base(httpClientFactory, cancellationTokenRegistry, options, logger) + : base(httpClientFactory, cancellationTokenRegistry, options, queue, logger) { } diff --git a/src/EventGridEmulator/EventHandling/EventGridEventHttpContextHandler.cs b/src/EventGridEmulator/EventHandling/EventGridEventHttpContextHandler.cs index 9521540..70e59ee 100644 --- a/src/EventGridEmulator/EventHandling/EventGridEventHttpContextHandler.cs +++ b/src/EventGridEmulator/EventHandling/EventGridEventHttpContextHandler.cs @@ -1,4 +1,4 @@ -using Azure.Messaging.EventGrid; +using Azure.Messaging.EventGrid; using EventGridEmulator.Configuration; using EventGridEmulator.Network; using Microsoft.Extensions.Options; @@ -11,8 +11,9 @@ public EventGridEventHttpContextHandler( IHttpClientFactory httpClientFactory, ISubscriberCancellationTokenRegistry cancellationTokenRegistry, IOptionsMonitor options, + TopicSubscribers queue, ILogger logger) - : base(httpClientFactory, cancellationTokenRegistry, options, logger) + : base(httpClientFactory, cancellationTokenRegistry, options, queue, logger) { } diff --git a/src/EventGridEmulator/EventHandling/CompositeEventHttpContextHandler.cs b/src/EventGridEmulator/EventHandling/EventGridPublishHandler.cs similarity index 57% rename from src/EventGridEmulator/EventHandling/CompositeEventHttpContextHandler.cs rename to src/EventGridEmulator/EventHandling/EventGridPublishHandler.cs index 816febe..39f7b5b 100644 --- a/src/EventGridEmulator/EventHandling/CompositeEventHttpContextHandler.cs +++ b/src/EventGridEmulator/EventHandling/EventGridPublishHandler.cs @@ -3,10 +3,13 @@ namespace EventGridEmulator.EventHandling; -internal sealed class CompositeEventHttpContextHandler +internal sealed class EventGridPublishHandler { [StringSyntax("Route")] - public const string Route = "/{topic}/api/events"; + public const string CustomTopicRoute = "/{topic}/api/events"; + + [StringSyntax("Route")] + public const string NamespaceTopicRoute = "/topics/{topic}:publish"; private const string CloudEventContentType = "application/cloudevents-batch+json; charset=utf-8"; private const string EventGridEventContentType = "application/json"; @@ -14,17 +17,23 @@ internal sealed class CompositeEventHttpContextHandler private readonly IEventGridEventHttpContextHandler _eventGridEventHttpContextHandler; private readonly ICloudEventHttpContextHandler _cloudEventHttpContextHandler; - public CompositeEventHttpContextHandler(IEventGridEventHttpContextHandler eventGridEventHttpContextHandler, ICloudEventHttpContextHandler cloudEventHttpContextHandler) + public EventGridPublishHandler(IEventGridEventHttpContextHandler eventGridEventHttpContextHandler, ICloudEventHttpContextHandler cloudEventHttpContextHandler) { this._eventGridEventHttpContextHandler = eventGridEventHttpContextHandler; this._cloudEventHttpContextHandler = cloudEventHttpContextHandler; } - public static async Task HandleAsync(HttpContext context, [FromRoute] string topic, [FromServices] CompositeEventHttpContextHandler handler) + public static async Task HandleCustomTopicEventAsync(HttpContext context, [FromRoute] string topic, [FromServices] EventGridPublishHandler handler) { await handler.HandleAsync(context, topic); } - + + public static async Task HandleNamespaceTopicEventAsync(HttpContext context, [FromRoute] string topic, [FromServices] EventGridPublishHandler handler) + { + await handler.HandleAsync(context, topic); + return Results.Ok(new object()); + } + private Task HandleAsync(HttpContext context, string topic) => context.Request.ContentType switch { EventGridEventContentType => this._eventGridEventHttpContextHandler.HandleAsync(context, topic), diff --git a/src/EventGridEmulator/EventHandling/PullQueueHttpContextHandler.cs b/src/EventGridEmulator/EventHandling/PullQueueHttpContextHandler.cs new file mode 100644 index 0000000..8ede581 --- /dev/null +++ b/src/EventGridEmulator/EventHandling/PullQueueHttpContextHandler.cs @@ -0,0 +1,182 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; +using Azure.Messaging; +using Microsoft.AspNetCore.Mvc; + +namespace EventGridEmulator.EventHandling; + +internal sealed class PullQueueHttpContextHandler +{ + [StringSyntax("Route")] + public const string ReceiveRoute = "topics/{topic}/eventsubscriptions/{subscription}:receive"; + + [StringSyntax("Route")] + public const string AcknowledgeRoute = "topics/{topic}/eventsubscriptions/{subscription}:acknowledge"; + + [StringSyntax("Route")] + public const string ReleaseRoute = "topics/{topic}/eventsubscriptions/{subscription}:release"; + + [StringSyntax("Route")] + public const string RejectRoute = "topics/{topic}/eventsubscriptions/{subscription}:reject"; + + public static async Task HandleReceiveAsync([FromRoute] string topic, [FromRoute] string subscription, [FromServices] TopicSubscribers topicSubscribers, CancellationToken cancellationToken) + { + var result = await topicSubscribers.GetEventAsync(topic, subscription, cancellationToken); + var receiveResults = new ReceiveResults + { + value = new[] + { + new EventObject + { + BrokerProperties = new BrokerProperties + { + DeliveryCount = 1, // currently only support receiving one event at a time + LockToken = result.LockToken, + }, + Event = result.Item, + }, + }, + }; + return Results.Ok(receiveResults); + } + + public static async Task HandleAcknowledgeAsync([FromRoute] string topic, [FromRoute] string subscription, [FromBody] LockTokensRequestData requestData, [FromServices] TopicSubscribers topicSubscribers) + { + var succeededLockTokens = new List(); + var failedLockTokens = new List(); + if (requestData?.LockTokens is not null) + { + foreach (var token in requestData.LockTokens) + { + if (token is null) + { + continue; + } + + if (topicSubscribers.TryDeleteEvent(topic, subscription, token)) + { + succeededLockTokens.Add(token); + } + else + { + failedLockTokens.Add(new() { LockToken = token, Error = new() { Message = "invalid token" } }); + } + } + } + + return Results.Ok(new LockTokensResultsData + { + FailedLockTokens = failedLockTokens, + SucceededLockTokens = succeededLockTokens, + }); + } + + public static async Task HandleReleaseAsync([FromRoute] string topic, [FromRoute] string subscription, [FromBody] LockTokensRequestData requestData, [FromServices] TopicSubscribers topicSubscribers) + { + var succeededLockTokens = new List(); + var failedLockTokens = new List(); + if (requestData?.LockTokens is not null) + { + foreach (var token in requestData.LockTokens) + { + if (token is null) + { + continue; + } + + if (topicSubscribers.TryReleaseEvent(topic, subscription, token)) + { + succeededLockTokens.Add(token); + } + else + { + failedLockTokens.Add(new() { LockToken = token, Error = new() { Message = "invalid token" } }); + } + } + } + + return Results.Ok(new LockTokensResultsData + { + FailedLockTokens = failedLockTokens, + SucceededLockTokens = succeededLockTokens, + }); + } + + public static async Task HandleRejectAsync([FromRoute] string topic, [FromRoute] string subscription, [FromBody] LockTokensRequestData requestData, [FromServices] TopicSubscribers topicSubscribers) + { + var succeededLockTokens = new List(); + var failedLockTokens = new List(); + if (requestData?.LockTokens is not null) + { + foreach (var token in requestData.LockTokens) + { + if (token is null) + { + continue; + } + + if (topicSubscribers.TryDeleteEvent(topic, subscription, token)) + { + succeededLockTokens.Add(token); + } + else + { + failedLockTokens.Add(new() { LockToken = token, Error = new() { Message = "invalid token" } }); + } + } + } + + return Results.Ok(new LockTokensResultsData + { + FailedLockTokens = failedLockTokens, + SucceededLockTokens = succeededLockTokens, + }); + } +} + +internal sealed class ReceiveResults +{ + public EventObject[]? value { get; set; } +} + +internal sealed class EventObject +{ + public BrokerProperties? BrokerProperties { get; set; } + + public CloudEvent? Event { get; set; } +} + +internal sealed class BrokerProperties +{ + public int DeliveryCount { get; set; } + + public string? LockToken { get; set; } +} + +internal sealed class LockTokensResultsData +{ + public List? FailedLockTokens { get; set; } + + public List? SucceededLockTokens { get; set; } +} + +internal sealed class LockTokensRequestData +{ + public string?[]? LockTokens { get; set; } +} + +internal sealed class FailedLockToken +{ + public string? LockToken { get; set; } + + public ResponseError? Error { get; set; } +} + +internal sealed class ResponseError +{ + [JsonPropertyName("code")] + public string? Code { get; set; } + + [JsonPropertyName("message")] + public string? Message { get; set; } +} \ No newline at end of file diff --git a/src/EventGridEmulator/EventHandling/TopicSubscribers.cs b/src/EventGridEmulator/EventHandling/TopicSubscribers.cs new file mode 100644 index 0000000..5fa6e48 --- /dev/null +++ b/src/EventGridEmulator/EventHandling/TopicSubscribers.cs @@ -0,0 +1,90 @@ +#pragma warning disable SA1121 // Use built-in type alias, doesn't work well with type aliases +using System.Collections.Concurrent; +using System.Globalization; +using System.Threading.Channels; +using LockToken = string; +using SubscriberName = string; +using TopicName = string; + +namespace EventGridEmulator.EventHandling; + +internal sealed class TopicSubscribers +{ + // For each topics, we create a list of subscribers. + // In subscription data, they contain the items in queue. Waiting for acknowledge, release, reject + private readonly ConcurrentDictionary> _subscriptions = new(StringComparer.OrdinalIgnoreCase); + + public void AddEvent(TopicName topicName, SubscriberName subscriptionName, T[] events) + { + var subscription = this.GetSubscriptionInfo(topicName, subscriptionName); + foreach (var item in events) + { + subscription.AddItem(item); + } + } + + public ValueTask<(T Item, LockToken LockToken)> GetEventAsync(TopicName topicName, SubscriberName subscriptionName, CancellationToken cancellationToken) + { + var subscription = this.GetSubscriptionInfo(topicName, subscriptionName); + return subscription.GetItemAsync(cancellationToken); + } + + public bool TryDeleteEvent(TopicName topicName, SubscriberName subscriptionName, LockToken lockToken) + { + var subscription = this.GetSubscriptionInfo(topicName, subscriptionName); + return subscription.RemoveItem(lockToken); + } + + public bool TryReleaseEvent(TopicName topicName, SubscriberName subscriptionName, LockToken lockToken) + { + var subscription = this.GetSubscriptionInfo(topicName, subscriptionName); + return subscription.ReleaseItem(lockToken); + } + + private SubscriptionData GetSubscriptionInfo(TopicName topicName, SubscriberName subscriptionName) + { + var subscriptions = this._subscriptions.GetOrAdd(topicName, _ => new(StringComparer.OrdinalIgnoreCase)); + return subscriptions.GetOrAdd(subscriptionName, _ => new()); + } + + private sealed class SubscriptionData + { + private readonly Channel _queue; + private readonly ConcurrentDictionary _inFlightItems; + + private long _lockToken; + + public SubscriptionData() + { + this._queue = Channel.CreateUnbounded(); + this._inFlightItems = new ConcurrentDictionary(); + } + + public void AddItem(T item) => this._queue.Writer.TryWrite(item); // TryWrite always succeeds with Unbounded channels + + public async ValueTask<(T Item, LockToken LockToken)> GetItemAsync(CancellationToken cancellationToken) + { + var item = await this._queue.Reader.ReadAsync(cancellationToken); + var token = "token-" + Interlocked.Increment(ref this._lockToken).ToString(CultureInfo.InvariantCulture); + this._inFlightItems.TryAdd(token, item); + return (item, token); + } + + public bool RemoveItem(LockToken lockToken) + { + return this._inFlightItems.TryRemove(lockToken, out _); + } + + // Azure EventGrid does not guarantee the ordering of events, the behavior of our emulator will be to enqueue the items back. + public bool ReleaseItem(LockToken lockToken) + { + if (this._inFlightItems.TryRemove(lockToken, out var item)) + { + this.AddItem(item!); + return true; + } + + return false; + } + } +} diff --git a/src/EventGridEmulator/Network/SubscriberPolicyHttpMessageHandler.cs b/src/EventGridEmulator/Network/SubscriberPolicyHttpMessageHandler.cs index c22d39b..2874e0f 100644 --- a/src/EventGridEmulator/Network/SubscriberPolicyHttpMessageHandler.cs +++ b/src/EventGridEmulator/Network/SubscriberPolicyHttpMessageHandler.cs @@ -5,6 +5,7 @@ using System.Net.Sockets; using Microsoft.Extensions.Http; using Polly; +using Polly.Retry; namespace EventGridEmulator.Network; @@ -21,7 +22,7 @@ public SubscriberPolicyHttpMessageHandler(ILogger GetRetryPolicy() => Policy + private static AsyncRetryPolicy GetRetryPolicy() => Policy .Handle() .OrResult(IsRetriableHttpResponseMessage) .WaitAndRetryAsync(JitteredEventGridRetrySchedule()); diff --git a/src/EventGridEmulator/Program.cs b/src/EventGridEmulator/Program.cs index 6598eb1..438bc0c 100644 --- a/src/EventGridEmulator/Program.cs +++ b/src/EventGridEmulator/Program.cs @@ -1,6 +1,8 @@ +using Azure.Messaging; using EventGridEmulator.Configuration; using EventGridEmulator.EventHandling; using EventGridEmulator.Network; +using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using Serilog; @@ -34,17 +36,24 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); -builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(typeof(TopicSubscribers<>)); builder.Services.AddHostedService(); builder.Services.AddHostedService(); var app = builder.Build(); -app.MapPost(CompositeEventHttpContextHandler.Route, CompositeEventHttpContextHandler.HandleAsync); +app.MapPost(EventGridPublishHandler.CustomTopicRoute, EventGridPublishHandler.HandleCustomTopicEventAsync); +app.MapPost(EventGridPublishHandler.NamespaceTopicRoute, EventGridPublishHandler.HandleNamespaceTopicEventAsync); +app.MapPost(PullQueueHttpContextHandler.ReceiveRoute, PullQueueHttpContextHandler.HandleReceiveAsync); +app.MapPost(PullQueueHttpContextHandler.AcknowledgeRoute, PullQueueHttpContextHandler.HandleAcknowledgeAsync); +app.MapPost(PullQueueHttpContextHandler.ReleaseRoute, PullQueueHttpContextHandler.HandleReleaseAsync); +app.MapPost(PullQueueHttpContextHandler.RejectRoute, PullQueueHttpContextHandler.HandleRejectAsync); app.Run(); // For integration testing purposes only in order to use WebApplicationFactory public abstract partial class Program { -} \ No newline at end of file +} diff --git a/src/Samples/Publisher/Publisher.csproj b/src/Samples/Publisher/Publisher.csproj index dafaae0..d81a090 100644 --- a/src/Samples/Publisher/Publisher.csproj +++ b/src/Samples/Publisher/Publisher.csproj @@ -1,7 +1,7 @@ Exe - net7.0 + net8.0 enable enable diff --git a/src/Samples/Subscriber/Subscriber.csproj b/src/Samples/Subscriber/Subscriber.csproj index 3ac59c3..edbdea4 100644 --- a/src/Samples/Subscriber/Subscriber.csproj +++ b/src/Samples/Subscriber/Subscriber.csproj @@ -1,6 +1,6 @@ - net7.0 + net8.0 enable enable diff --git a/src/global.json b/src/global.json index 924a0e0..8ab359e 100644 --- a/src/global.json +++ b/src/global.json @@ -1,6 +1,6 @@ { "sdk": { "rollForward": "latestMinor", - "version": "7.0.406" + "version": "8.0.201" } } \ No newline at end of file