From 57245bc90be0eb027e83f042c7092e7bb5e3a702 Mon Sep 17 00:00:00 2001
From: Doug Gish
Date: Thu, 25 Apr 2019 10:40:57 -0600
Subject: [PATCH] Initial port of Java library

---
 IO.Eventuate.Tram.IntegrationTests/Dockerfile |   4 +
 .../Dockerfile-dbsetup                        |   5 +
 .../IO.Eventuate.Tram.IntegrationTests.csproj |  39 ++
 .../TestDatabase/entrypoint.sh                |   9 +
 .../TestDatabase/initialize-database.sql      |  24 ++
 .../TestFixtures/BadSchemaIntegrationTests.cs |  40 ++
 .../TestFixtures/IntegrationTestsBase.cs      | 195 ++++++++++
 .../TestFixtures/PerformanceTests.cs          |  62 +++
 .../ProducerConsumerIntegrationTests.cs       | 355 ++++++++++++++++++
 .../TestHelpers/TestEventConsumer.cs          | 103 +++++
 .../TestHelpers/TestHostBuilder.cs            |  87 +++++
 .../TestHelpers/TestMessageInterceptor.cs     |  69 ++++
 .../TestHelpers/TestMessageType1.cs           |  30 ++
 .../TestHelpers/TestMessageType2.cs           |  29 ++
 .../TestHelpers/TestMessageType3.cs           |  14 +
 .../TestSettings.cs                           |  26 ++
 .../docker-compose.yml                        |  97 +++++
 .../nlog.config                               |  32 ++
 .../testsettings.json                         |   6 +
 .../Subscriber/DomainEventDispatcherTests.cs  |  76 ++++
 .../IO.Eventuate.Tram.UnitTests.csproj        |  20 +
 .../Producer/AbstractMessageProducerTests.cs  |  54 +++
 .../Producer/HttpDateHeaderFormatUtilTests.cs |  17 +
 IO.Eventuate.Tram.sln                         |  40 ++
 IO.Eventuate.Tram/Database/EventuateSchema.cs |  19 +
 .../Database/EventuateTramDbContext.cs        |  83 ++++
 .../AttributeEventTypeNamingStrategy.cs       |  22 ++
 .../Events/Common/EventMessageHeaders.cs      |   9 +
 .../Events/Common/EventTypeAttribute.cs       |  25 ++
 .../Events/Common/IDomainEvent.cs             |  10 +
 .../Events/Common/IEventTypeNamingStrategy.cs |  17 +
 .../Events/Publisher/DomainEventPublisher.cs  |  63 ++++
 .../Events/Publisher/IDomainEventPublisher.cs |  12 +
 .../Subscriber/DomainEventDispatcher.cs       |  65 ++++
 .../DomainEventDispatcherInitializer.cs       |  36 ++
 .../Events/Subscriber/DomainEventEnvelope.cs  |  27 ++
 .../Events/Subscriber/DomainEventHandler.cs   |  35 ++
 .../Events/Subscriber/DomainEventHandlers.cs  |  28 ++
 .../Subscriber/DomainEventHandlersBuilder.cs  |  46 +++
 .../Events/Subscriber/IDomainEventEnvelope.cs |  15 +
 .../Events/Subscriber/IDomainEventHandler.cs  |   9 +
 ...ventuateTramServiceCollectionExtensions.cs |  87 +++++
 IO.Eventuate.Tram/IIdGenerator.cs             |   7 +
 IO.Eventuate.Tram/IO.Eventuate.Tram.csproj    |  17 +
 IO.Eventuate.Tram/IdGenerator.cs              | 136 +++++++
 IO.Eventuate.Tram/Int128.cs                   |  89 +++++
 IO.Eventuate.Tram/JsonMapper.cs               |  35 ++
 .../Messaging/Common/IMessage.cs              |  19 +
 .../Messaging/Common/IMessageInterceptor.cs   |  19 +
 IO.Eventuate.Tram/Messaging/Common/Message.cs |  76 ++++
 .../Messaging/Common/MessageHeaders.cs        |  10 +
 .../Messaging/Consumer/IMessageConsumer.cs    |  19 +
 .../Kafka/ConsumerPropertiesFactory.cs        |  23 ++
 .../Consumer/Kafka/EventuateKafkaConsumer.cs  | 173 +++++++++
 ...ateKafkaConsumerConfigurationProperties.cs |  14 +
 .../Kafka/IDuplicateMessageDetector.cs        |   7 +
 .../Consumer/Kafka/KafkaMessageConsumer.cs    | 193 ++++++++++
 .../Consumer/Kafka/KafkaMessageProcessor.cs   |  92 +++++
 .../Messaging/Consumer/Kafka/OffsetTracker.cs | 119 ++++++
 .../Consumer/Kafka/ReceivedMessage.cs         |   8 +
 .../SqlTableBasedDuplicateMessageDetector.cs  |  49 +++
 .../Consumer/Kafka/SwimlaneBasedDispatcher.cs |  48 +++
 .../Consumer/Kafka/SwimlaneDispatcher.cs      |  98 +++++
 .../Consumer/Kafka/TopicPartitionOffsets.cs   | 113 ++++++
 .../Messaging/Consumer/MessageHandler.cs      |   7 +
 .../Producer/AbstractMessageProducer.cs       |  78 ++++
 .../Producer/HttpDateHeaderFormatUtil.cs      |  13 +
 .../Messaging/Producer/IMessageProducer.cs    |  17 +
 .../Messaging/Producer/IMessageSender.cs      |   9 +
 .../Messaging/Producer/MessageBuilder.cs      |  53 +++
 .../Messaging/Producer/Outbox/Message.cs      |  26 ++
 .../Producer/Outbox/OutboxMessageProducer.cs  |  63 ++++
 LICENSE.md                                    |  11 +
 README.md                                     | 238 +++++++++++-
 74 files changed, 3918 insertions(+), 2 deletions(-)
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/Dockerfile
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/Dockerfile-dbsetup
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestDatabase/entrypoint.sh
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestDatabase/initialize-database.sql
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/TestSettings.cs
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/docker-compose.yml
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/nlog.config
 create mode 100644 IO.Eventuate.Tram.IntegrationTests/testsettings.json
 create mode 100644 IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs
 create mode 100644 IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj
 create mode 100644 IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs
 create mode 100644 IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs
 create mode 100644 IO.Eventuate.Tram.sln
 create mode 100644 IO.Eventuate.Tram/Database/EventuateSchema.cs
 create mode 100644 IO.Eventuate.Tram/Database/EventuateTramDbContext.cs
 create mode 100644 IO.Eventuate.Tram/Events/Common/AttributeEventTypeNamingStrategy.cs
 create mode 100644 IO.Eventuate.Tram/Events/Common/EventMessageHeaders.cs
 create mode 100644 IO.Eventuate.Tram/Events/Common/EventTypeAttribute.cs
 create mode 100644 IO.Eventuate.Tram/Events/Common/IDomainEvent.cs
 create mode 100644 IO.Eventuate.Tram/Events/Common/IEventTypeNamingStrategy.cs
 create mode 100644 IO.Eventuate.Tram/Events/Publisher/DomainEventPublisher.cs
 create mode 100644 IO.Eventuate.Tram/Events/Publisher/IDomainEventPublisher.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcher.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcherInitializer.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/DomainEventEnvelope.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/DomainEventHandler.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlers.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlersBuilder.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/IDomainEventEnvelope.cs
 create mode 100644 IO.Eventuate.Tram/Events/Subscriber/IDomainEventHandler.cs
 create mode 100644 IO.Eventuate.Tram/EventuateTramServiceCollectionExtensions.cs
 create mode 100644 IO.Eventuate.Tram/IIdGenerator.cs
 create mode 100644 IO.Eventuate.Tram/IO.Eventuate.Tram.csproj
 create mode 100644 IO.Eventuate.Tram/IdGenerator.cs
 create mode 100644 IO.Eventuate.Tram/Int128.cs
 create mode 100644 IO.Eventuate.Tram/JsonMapper.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Common/IMessage.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Common/IMessageInterceptor.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Common/Message.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Common/MessageHeaders.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/IMessageConsumer.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/ConsumerPropertiesFactory.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumer.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumerConfigurationProperties.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/IDuplicateMessageDetector.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageConsumer.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageProcessor.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/OffsetTracker.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/ReceivedMessage.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/SqlTableBasedDuplicateMessageDetector.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneBasedDispatcher.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneDispatcher.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/Kafka/TopicPartitionOffsets.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Consumer/MessageHandler.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/AbstractMessageProducer.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/HttpDateHeaderFormatUtil.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/IMessageProducer.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/IMessageSender.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/MessageBuilder.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/Outbox/Message.cs
 create mode 100644 IO.Eventuate.Tram/Messaging/Producer/Outbox/OutboxMessageProducer.cs
 create mode 100644 LICENSE.md

diff --git a/IO.Eventuate.Tram.IntegrationTests/Dockerfile b/IO.Eventuate.Tram.IntegrationTests/Dockerfile
new file mode 100644
index 0000000..e993dcd
--- /dev/null
+++ b/IO.Eventuate.Tram.IntegrationTests/Dockerfile
@@ -0,0 +1,4 @@
+FROM microsoft/dotnet:2.1-sdk
+WORKDIR /app
+
+ENTRYPOINT [ "dotnet", "vstest", "IO.Eventuate.Tram.IntegrationTests.dll", "/logger:trx" ]
diff --git a/IO.Eventuate.Tram.IntegrationTests/Dockerfile-dbsetup b/IO.Eventuate.Tram.IntegrationTests/Dockerfile-dbsetup
new file mode 100644
index 0000000..d2942b0
--- /dev/null
+++ b/IO.Eventuate.Tram.IntegrationTests/Dockerfile-dbsetup
@@ -0,0 +1,5 @@
+FROM microsoft/mssql-tools:latest
+WORKDIR /scripts
+COPY ./TestDatabase/* ./
+RUN chmod +x ./entrypoint.sh
+CMD ./entrypoint.sh
\ No newline at end of file
diff --git a/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj b/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj
new file mode 100644
index 0000000..c5ce5bb
--- /dev/null
+++ b/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj
@@ -0,0 +1,39 @@ + + + + netcoreapp2.1 +
+ false + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + + + + + + + + + + + + + + + + + + diff --git a/IO.Eventuate.Tram.IntegrationTests/TestDatabase/entrypoint.sh b/IO.Eventuate.Tram.IntegrationTests/TestDatabase/entrypoint.sh new file mode 100644 index 0000000..4394f2e --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestDatabase/entrypoint.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +#build the Tram test database +/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -Q "CREATE DATABASE $TRAM_DB" || exit 1 +/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -Q "ALTER DATABASE $TRAM_DB SET RECOVERY SIMPLE" || exit 1 +export TRAM_SCHEMA=$TRAM_SCHEMA +/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -d $TRAM_DB -I -i initialize-database.sql || exit 1 +export TRAM_SCHEMA=$TRAM_SCHEMA2 +/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -d $TRAM_DB -I -i initialize-database.sql || exit 1 \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestDatabase/initialize-database.sql b/IO.Eventuate.Tram.IntegrationTests/TestDatabase/initialize-database.sql new file mode 100644 index 0000000..80e0ef0 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestDatabase/initialize-database.sql @@ -0,0 +1,24 @@ +IF (NOT EXISTS( + SELECT name + FROM sys.schemas + WHERE name = '$(TRAM_SCHEMA)')) +BEGIN + EXEC('CREATE SCHEMA [$(TRAM_SCHEMA)]') +END + +DROP TABLE IF EXISTS $(TRAM_SCHEMA).message; +DROP TABLE IF EXISTS $(TRAM_SCHEMA).received_messages; + +CREATE TABLE $(TRAM_SCHEMA).message ( + id NVARCHAR(200) PRIMARY KEY, + destination NVARCHAR(1000) NOT NULL, + headers NVARCHAR(1000) NOT NULL, + payload NVARCHAR(MAX) NOT NULL, + published SMALLINT DEFAULT 0 +); + +CREATE TABLE $(TRAM_SCHEMA).received_messages ( + consumer_id NVARCHAR(200), + message_id NVARCHAR(200), + PRIMARY KEY(consumer_id, message_id) +); diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs new file mode 100644 index 0000000..eee4b9b --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs @@ -0,0 +1,40 @@ +using System.Collections.Generic; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.IntegrationTests.TestHelpers; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using NUnit.Framework; +using Microsoft.EntityFrameworkCore; + +namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures +{ + [TestFixture] + public class BadSchemaIntegrationTests : IntegrationTestsBase + { + [SetUp] + public void Setup() + { + TestSetup("badschema", false, EventuateKafkaConsumerConfigurationProperties.Empty()); + } + + [TearDown] + public void TearDown() + { + DisposeTestHost(); + } + + [Test] + public void Publish_DatabaseSchemaNotCreated_ThrowsException() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + Assert.Throws(delegate () + { + GetDbContext().SaveChanges(); + }); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs new file mode 100644 index 0000000..b1178e6 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs @@ 
-0,0 +1,195 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Confluent.Kafka; +using IO.Eventuate.Tram.Database; +using IO.Eventuate.Tram.Events.Publisher; +using IO.Eventuate.Tram.IntegrationTests.TestHelpers; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures +{ + public class IntegrationTestsBase + { + private const string TestSettingsFile = "testsettings.json"; + private string _subscriberId = "xx"; + protected const string AggregateType = "TestMessagesTopic"; + protected string EventuateDatabaseSchemaName = "eventuate"; + + protected TestSettings TestSettings; + + // There can only be one of these between all of the tests because there's no good way to tear it down and start again + private static IHost _host = null; + private static EventuateTramDbContext _dbContext = null; + private static IDomainEventPublisher _domainEventPublisher = null; + private static TestEventConsumer _testEventConsumer = null; + private static TestMessageInterceptor _interceptor = null; + + public IntegrationTestsBase() + { + IConfigurationRoot configuration = null; + try + { + IConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); + ConfigureFromEnvironmentAndSettingsFile(configurationBuilder); + configuration = configurationBuilder.Build(); + } + catch + { + IConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); + ConfigureFromEnvironment(configurationBuilder); + configuration = configurationBuilder.Build(); + } + + TestSettings = configuration.Get(); + } + + protected void TestSetup(string schema, bool withInterceptor, EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) + { + EventuateDatabaseSchemaName = schema; + _subscriberId = Guid.NewGuid().ToString(); + + if (_host == null) + { + _host = SetupTestHost(withInterceptor, consumerConfigProperties); + _dbContext = _host.Services.GetService(); + _domainEventPublisher = _host.Services.GetService(); + _testEventConsumer = _host.Services.GetService(); + _interceptor = (TestMessageInterceptor)_host.Services.GetService(); + } + } + + protected void CleanupTest() + { + ClearDb(GetDbContext(), EventuateDatabaseSchemaName); + GetTestConsumer().Reset(); + GetTestMessageInterceptor()?.Reset(); + } + + protected void CleanupKafka() + { + var config = new AdminClientConfig(); + config.BootstrapServers = TestSettings.KafkaBootstrapServers; + using (var admin = new AdminClientBuilder(config).Build()) + { + admin.DeleteTopicsAsync(new string[] { AggregateType }).Wait(); + } + } + + protected void ShowTestResults() + { + TestContext.WriteLine("Test Config"); + TestContext.WriteLine(" Connection String: {0}", TestSettings.ConnectionStrings.EventuateTramDbConnection); + TestContext.WriteLine(" Kafka server: {0}", TestSettings.KafkaBootstrapServers); + TestContext.WriteLine(" Schema: {0}", EventuateDatabaseSchemaName); + TestContext.WriteLine(" Dispatcher Id: {0}", _subscriberId); + TestContext.WriteLine(" Aggregate Type: {0}", AggregateType); + + TestContext.WriteLine("Test Results"); + TestContext.WriteLine(" N Messages in DB: {0}", _dbContext.Messages.Count()); + TestContext.WriteLine(" Unpublished Count: {0}", _dbContext.Messages.Count(msg => msg.Published == 0)); + TestContext.WriteLine(" N 
Received in DB: {0}", _dbContext.ReceivedMessages.Count(msg => msg.MessageId != null)); + TestContext.WriteLine(" Received Type 1: {0}", _testEventConsumer.Type1MessageCount); + TestContext.WriteLine(" Received Type 2: {0}", _testEventConsumer.Type2MessageCount); + TestContext.WriteLine(" Exception Count: {0}", _testEventConsumer.ExceptionCount); + + if (_interceptor != null) + { + TestContext.WriteLine("Message Interceptor Counts"); + TestContext.WriteLine(" Pre Send: {0}", _interceptor.PreSendCount); + TestContext.WriteLine(" Post Send: {0}", _interceptor.PostSendCount); + TestContext.WriteLine(" Pre Receive: {0}", _interceptor.PreReceiveCount); + TestContext.WriteLine(" Post Receive: {0}", _interceptor.PostReceiveCount); + TestContext.WriteLine(" Pre Handle: {0}", _interceptor.PreHandleCount); + TestContext.WriteLine(" Post Handle: {0}", _interceptor.PostHandleCount); + } + } + + /// + /// Set up the configuration for the HostBuilder + /// + protected void ConfigureFromEnvironmentAndSettingsFile(IConfigurationBuilder config, + Dictionary overrides = null) + { + config + .AddJsonFile(TestSettingsFile, false) + .AddEnvironmentVariables() + .AddInMemoryCollection(overrides); + } + + /// + /// Set up the configuration for the HostBuilder + /// + protected void ConfigureFromEnvironment(IConfigurationBuilder config, + Dictionary overrides = null) + { + config + .AddEnvironmentVariables() + .AddInMemoryCollection(overrides); + } + + protected IHost SetupTestHost(bool withInterceptor, EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) + { + var host = new TestHostBuilder() + .SetConnectionString(TestSettings.ConnectionStrings.EventuateTramDbConnection) + .SetEventuateDatabaseSchemaName(EventuateDatabaseSchemaName) + .SetKafkaBootstrapServers(TestSettings.KafkaBootstrapServers) + .SetSubscriberId(_subscriberId) + .SetDomainEventHandlersFactory( + provider => + { + var consumer = provider.GetRequiredService(); + return consumer.DomainEventHandlers(AggregateType); + }) + .SetConsumerConfigProperties(consumerConfigProperties) + .Build(withInterceptor); + host.StartAsync().Wait(); + return host; + } + + protected void DisposeTestHost() + { + if (_host == null) + return; + + _host.StopAsync().Wait(); + _host.Dispose(); + _host = null; + _dbContext = null; + _domainEventPublisher = null; + _testEventConsumer = null; + } + + protected TestEventConsumer GetTestConsumer() + { + return _testEventConsumer; + } + + protected TestMessageInterceptor GetTestMessageInterceptor() + { + return _interceptor; + } + + protected IDomainEventPublisher GetTestPublisher() + { + return _domainEventPublisher; + } + + protected EventuateTramDbContext GetDbContext() + { + return _dbContext; + } + + protected void ClearDb(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName) + { + dbContext.Database.ExecuteSqlCommand(String.Format("Delete from [{0}].[message]", eventuateDatabaseSchemaName)); + dbContext.Database.ExecuteSqlCommand(String.Format("Delete from [{0}].[received_messages]", eventuateDatabaseSchemaName)); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs new file mode 100644 index 0000000..6bb125c --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs @@ -0,0 +1,62 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using IO.Eventuate.Tram.Events.Common; +using 
IO.Eventuate.Tram.IntegrationTests.TestHelpers; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures +{ + [TestFixture] + public class PerformanceTests : IntegrationTestsBase + { + [SetUp] + public void Setup() + { + CleanupKafka(); + TestSetup("eventuate", false, EventuateKafkaConsumerConfigurationProperties.Empty()); + CleanupTest(); + } + + [TearDown] + public void TearDown() + { + DisposeTestHost(); + } + + [Test] + public void Send1000Message_Within1Minute() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + for (int x = 0; x < 1000; x++) + { + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + GetDbContext().SaveChanges(); + } + + // Allow time for messages to process + int count = 300; + while (consumer.Type1MessageCount < 1000 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.AreEqual(1000, GetDbContext().Messages.Count(), "Expect 1000 messages produced"); + Assert.AreEqual(1000, consumer.Type1MessageCount, "Received by consumer count must be 1000"); + Assert.AreEqual(0, GetDbContext().Messages.Count(msg => msg.Published == 0), "No unpublished messages"); + Assert.AreEqual(1000, GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), "Expect 1000 messages received"); + Assert.Less(consumer.Type1Duration().TotalSeconds, 60.0, "Time must be less than 60 seconds"); + + TestContext.WriteLine("Performance Test completed in {0} seconds", consumer.Type1Duration().TotalSeconds); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs new file mode 100644 index 0000000..bf3a9a5 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs @@ -0,0 +1,355 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.IntegrationTests.TestHelpers; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures +{ + [TestFixture("eventuate")] + [TestFixture("schema1")] + public class ProducerConsumerIntegrationTests : IntegrationTestsBase + { + private readonly string _schema; + + public ProducerConsumerIntegrationTests(string schema) + { + _schema = schema; + } + + [SetUp] + public void Setup() + { + CleanupKafka(); + TestSetup(_schema, true, EventuateKafkaConsumerConfigurationProperties.Empty()); + CleanupTest(); + } + + [TearDown] + public void TearDown() + { + DisposeTestHost(); + } + + [Test] + public void Publish_SingleSubscribedMessageType1_MessageReceived() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type1MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of 
unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(consumer.Type1MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(consumer.ReceivedType1Messages.Count, + Is.EqualTo(1), "Number of received type 1 messages"); + msg1.AssertGoodMessageReceived(consumer.ReceivedType1Messages[0]); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public void Publish_SingleSubscribedMessageType2_MessageReceived() + { + // Arrange + TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg2 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type2MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(consumer.Type2MessageCount, + Is.EqualTo(1), "Number of Type 2 messages received by consumer"); + Assert.That(consumer.ReceivedType2Messages.Count, + Is.EqualTo(1), "Number of received type 2 messages"); + + msg2.AssertGoodMessageReceived(consumer.ReceivedType2Messages[0]); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public void Publish_MultipleSubscribedMessageTypes_AllMessagesReceived() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + GetDbContext().SaveChanges(); + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg2 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.TotalMessageCount() < 2 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(2), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(2), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(2), "Total number of messages received by consumer"); + Assert.That(consumer.Type1MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(consumer.Type2MessageCount, + Is.EqualTo(1), "Number of Type 2 messages received by consumer"); + Assert.That(consumer.ReceivedType1Messages.Count, + Is.EqualTo(1), "Number of received type 1 messages"); + Assert.That(consumer.ReceivedType2Messages.Count, + Is.EqualTo(1), "Number of received type 2 messages"); + msg1.AssertGoodMessageReceived(consumer.ReceivedType1Messages[0]); + msg2.AssertGoodMessageReceived(consumer.ReceivedType2Messages[0]); + + GetTestMessageInterceptor()?.AssertCounts(2, 2, 2, 2, 2, 2); + } + + 
[Test] + public void Publish_SubscribedMessageTypeAndUnsubscribedMessageType_ReceivedOnlySubscribedMessageType() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestMessageType3 msg3 = new TestMessageType3("Msg3"); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg3 }); + GetDbContext().SaveChanges(); + // Send a following message to identify when we're done + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type1MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(2), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(2), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(1), "Total number of messages received by consumer"); + + GetTestMessageInterceptor()?.AssertCounts(2, 2, 2, 2, 2, 2); + } + + [Test] + public void Publish_SubscribedTopicAndUnsubscribedTopic_ReceivesOnlySubscribedMessage() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish("BadTopic", "BadTopic", new List { msg1 }); + GetDbContext().SaveChanges(); + // Send a following message to identify when we're done + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type1MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(2), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(1), "Total number of messages received by consumer"); + + GetTestMessageInterceptor()?.AssertCounts(2, 2, 1, 1, 1, 1); + } + + [Test] + public void Publish_SubscriberThrowsExceptionOn1OfMultipleMessages_AllMessagesHandled() + { + // Arrange + TestMessageType1 badmsg1 = new TestMessageType1("ThrowException", 1, 1.2); + TestMessageType2 msg2a = new TestMessageType2("Msg2a", 1); + TestMessageType2 msg2b = new TestMessageType2("Msg2b", 2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { badmsg1 }); + GetDbContext().SaveChanges(); + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg2a }); + GetDbContext().SaveChanges(); + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg2b }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type2MessageCount < 2 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(3), "Number of messages 
produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(3), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(3), "Total number of messages received by consumer"); + Assert.That(consumer.Type1MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(consumer.Type2MessageCount, + Is.EqualTo(2), "Number of Type 2 messages received by consumer"); + } + + [Test] + public void Publish_DefaultEventTypeName_CorrectEventTypeHeader() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 2, 3.3); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg1 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type1MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(consumer.Type1MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(consumer.ReceivedType1Messages.Count, + Is.EqualTo(1), "Number of received type 1 messages"); + + msg1.AssertGoodMessageReceived(consumer.ReceivedType1Messages[0]); + Assert.That(consumer.ReceivedType1Messages[0].Message.GetHeader(EventMessageHeaders.EventType), + Is.EqualTo(typeof(TestMessageType1).FullName), "Event type header"); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public void Publish_CustomEventTypeName_CorrectEventTypeHeader() + { + // Arrange + TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType, AggregateType, new List { msg2 }); + GetDbContext().SaveChanges(); + + // Allow time for messages to process + int count = 10; + while (consumer.Type2MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(consumer.Type2MessageCount, + Is.EqualTo(1), "Number of Type 2 messages received by consumer"); + Assert.That(consumer.ReceivedType2Messages.Count, + Is.EqualTo(1), "Number of received type 2 messages"); + + msg2.AssertGoodMessageReceived(consumer.ReceivedType2Messages[0]); + Assert.That(consumer.ReceivedType2Messages[0].Message.GetHeader(EventMessageHeaders.EventType), + Is.EqualTo(TestMessageType2.EventTypeName), "Event type header"); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs 
b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs new file mode 100644 index 0000000..5027fac --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs @@ -0,0 +1,103 @@ +using System; +using System.Collections.Generic; +using IO.Eventuate.Tram.Events.Subscriber; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers +{ + public class TestEventConsumer + { + private readonly ILogger _logger; + + public int Type1MessageCount { get; set; } + public int Type2MessageCount { get; set; } + public DateTime Type1FirstMessage { get; set; } + public DateTime Type2FirstMessage { get; set; } + public DateTime Type1LastMessage { get; set; } + public DateTime Type2LastMessage { get; set; } + public List> ReceivedType1Messages { get; set; } + public List> ReceivedType2Messages { get; set; } + public int ExceptionCount = 0; + + public TestEventConsumer(ILogger logger) + { + _logger = logger; + } + + public void Reset() + { + Type1MessageCount = 0; + Type2MessageCount = 0; + Type1FirstMessage = DateTime.MaxValue; + Type2FirstMessage = DateTime.MaxValue; + Type1LastMessage = DateTime.MinValue; + Type2LastMessage = DateTime.MinValue; + ReceivedType1Messages = new List>(); + ReceivedType2Messages = new List>(); + ExceptionCount = 0; + } + + public void DontSaveMessages() + { + ReceivedType1Messages = null; + ReceivedType2Messages = null; + } + + public int TotalMessageCount() + { + return Type1MessageCount + Type2MessageCount; + } + public TimeSpan Type1Duration() + { + return Type1LastMessage > Type1FirstMessage ? Type1LastMessage - Type1FirstMessage : TimeSpan.Zero; + } + public TimeSpan Type2Duration() + { + return Type2LastMessage > Type2FirstMessage ? Type2LastMessage - Type2FirstMessage : TimeSpan.Zero; + } + public TimeSpan TotalDuration() + { + return Type1Duration() + Type2Duration(); + } + + public DomainEventHandlers DomainEventHandlers(String aggregateType) + { + return DomainEventHandlersBuilder.ForAggregateType(aggregateType) + .OnEvent(HandleMessageType1Event) + .OnEvent(HandleMessageType2Event) + .Build(); + } + + private void HandleMessageType1Event(IDomainEventEnvelope @event) + { + _logger.LogDebug("Got MessageType1Event with id={} and value={}", @event.EventId, + @event.Event.ToString()); + DateTime receivedTime = DateTime.Now; + if (receivedTime < Type1FirstMessage) + Type1FirstMessage = receivedTime; + if (receivedTime > Type1LastMessage) + Type1LastMessage = receivedTime; + Type1MessageCount++; + ReceivedType1Messages?.Add(@event); + + if (@event.Event.Name.Equals("ThrowException") && ExceptionCount < 5) + { + ExceptionCount++; + throw (new Exception()); + } + } + + private void HandleMessageType2Event(IDomainEventEnvelope @event) + { + _logger.LogDebug("Got message MessageType2Event with id={} and value={}", @event.EventId, + @event.Event.ToString()); + DateTime receivedTime = DateTime.Now; + if (receivedTime < Type2FirstMessage) + Type2FirstMessage = receivedTime; + if (receivedTime > Type2LastMessage) + Type2LastMessage = receivedTime; + Type2MessageCount++; + ReceivedType2Messages?.Add(@event); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs new file mode 100644 index 0000000..3903fe9 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs @@ -0,0 +1,87 @@ +using System; +using IO.Eventuate.Tram.Database; +using IO.Eventuate.Tram.Events.Subscriber; +using 
IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers +{ + public class TestHostBuilder + { + private String _sqlConnectionString = null; + private String _eventuateDatabaseSchemaName = null; + private String _kafkaBootstrapServers = null; + private String _subscriberId = null; + private Func _domainEventHandlersFactory = null; + private EventuateKafkaConsumerConfigurationProperties _consumerConfigProperties = EventuateKafkaConsumerConfigurationProperties.Empty(); + + private IHost _host; + + public TestHostBuilder SetConnectionString(String sqlConnectionString) + { + this._sqlConnectionString = sqlConnectionString; + return this; + } + + public TestHostBuilder SetEventuateDatabaseSchemaName(String eventuateDatabaseSchemaName) + { + this._eventuateDatabaseSchemaName = eventuateDatabaseSchemaName; + return this; + } + + public TestHostBuilder SetKafkaBootstrapServers(String kafkaBootstrapServers) + { + this._kafkaBootstrapServers = kafkaBootstrapServers; + return this; + } + + public TestHostBuilder SetSubscriberId(String subscriberId) + { + this._subscriberId = subscriberId; + return this; + } + + public TestHostBuilder SetDomainEventHandlersFactory( + Func domainEventHandlersFactory) + { + this._domainEventHandlersFactory = domainEventHandlersFactory; + return this; + } + + public TestHostBuilder SetConsumerConfigProperties(EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) + { + this._consumerConfigProperties = consumerConfigProperties; + return this; + } + + + public IHost Build(bool withInterceptor) where TConsumerType : class + { + _host = new HostBuilder() + .ConfigureServices((hostContext, services) => + { + services.AddDbContext((provider, o) => + { + o.UseSqlServer(_sqlConnectionString); + }); + services.AddEventuateTramSqlKafkaTransport(_eventuateDatabaseSchemaName, _kafkaBootstrapServers, EventuateKafkaConsumerConfigurationProperties.Empty()); + if (withInterceptor) + { + services.AddSingleton(new TestMessageInterceptor()); + } + + // Publisher Setup + services.AddEventuateTramEventsPublisher(); + + // Consumer Setup + services.AddSingleton(); + services.AddEventuateTramDomainEventDispatcher(_subscriberId, _domainEventHandlersFactory); + }) + .Build(); + return _host; + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs new file mode 100644 index 0000000..41b4c17 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Generic; +using System.Text; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers +{ + public class TestMessageInterceptor : IMessageInterceptor + { + public int PreSendCount; + public int PostSendCount; + public int PreReceiveCount; + public int PreHandleCount; + public int PostHandleCount; + public int PostReceiveCount; + + public void Reset() + { + PreSendCount = 0; + PostSendCount = 0; + PreReceiveCount = 0; + PostReceiveCount = 0; + PreHandleCount = 0; + PostHandleCount = 0; + } + + public void AssertCounts(int preSend, int postSend, int preReceive, int postReceive, int preHandle, int 
postHandle) + { + Assert.AreEqual(preSend, PreSendCount, $"Message Interceptor PreSendCount value should be {preSend}"); + Assert.AreEqual(postSend, PostSendCount, $"Message Interceptor PostSendCount value should be {postSend}"); + Assert.AreEqual(preReceive, PreReceiveCount, $"Message Interceptor PreReceiveCount value should be {preReceive}"); + Assert.AreEqual(postReceive, PostReceiveCount, $"Message Interceptor PostReceiveCount value should be {postReceive}"); + Assert.AreEqual(preHandle, PreHandleCount, $"Message Interceptor PreHandleCount value should be {preHandle}"); + Assert.AreEqual(postHandle, PostHandleCount, $"Message Interceptor PostHandleCount value should be {postHandle}"); + } + + public void PreSend(IMessage message) + { + PreSendCount++; + } + + public void PostSend(IMessage message, Exception e) + { + PostSendCount++; + } + + public void PreReceive(IMessage message) + { + PreReceiveCount++; + } + + public void PreHandle(string subscriberId, IMessage message) + { + PreHandleCount++; + } + + public void PostHandle(string subscriberId, IMessage message, Exception e) + { + PostHandleCount++; + } + + public void PostReceive(IMessage message) + { + PostReceiveCount++; + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs new file mode 100644 index 0000000..9984612 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs @@ -0,0 +1,30 @@ +using System; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Events.Subscriber; +using IO.Eventuate.Tram.Messaging.Common; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers +{ + public class TestMessageType1 : IDomainEvent + { + public String Name { get; set; } + public int Value { get; set; } + public double Number { get; set; } + + public TestMessageType1(String name, int value, double number) + { + this.Name = name; + this.Value = value; + this.Number = number; + } + + public void AssertGoodMessageReceived(IDomainEventEnvelope receivedMessage) + { + Assert.True(receivedMessage.Message.HasHeader(MessageHeaders.Id), "Message ID is in the header"); + Assert.AreEqual(Name, receivedMessage.Event.Name, "Message Name is the same"); + Assert.AreEqual(Value, receivedMessage.Event.Value, "Message Value is the same"); + Assert.AreEqual(Number, receivedMessage.Event.Number, "Message Number is the same"); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs new file mode 100644 index 0000000..d3fb6dc --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs @@ -0,0 +1,29 @@ +using System; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Events.Subscriber; +using IO.Eventuate.Tram.Messaging.Common; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers +{ + [EventType(EventTypeName)] + public class TestMessageType2 : IDomainEvent + { + public const string EventTypeName = "testing.TestMessageType2"; + + public String Name { get; set; } + public int Value { get; set; } + public TestMessageType2(String name, int value) + { + this.Name = name; + this.Value = value; + } + + public void AssertGoodMessageReceived(IDomainEventEnvelope receivedMessage) + { + Assert.True(receivedMessage.Message.HasHeader(MessageHeaders.Id), "Message ID is in the header"); + Assert.AreEqual(Name, 
receivedMessage.Event.Name, "Message Name is the same"); + Assert.AreEqual(Value, receivedMessage.Event.Value, "Message Value is the same"); + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs new file mode 100644 index 0000000..f1bf079 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs @@ -0,0 +1,14 @@ +using System; +using IO.Eventuate.Tram.Events.Common; + +namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers +{ + public class TestMessageType3 : IDomainEvent + { + public String Name { get; set; } + public TestMessageType3(String name) + { + this.Name = name; + } + } +} diff --git a/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs b/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs new file mode 100644 index 0000000..cf9840a --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs @@ -0,0 +1,26 @@ +namespace IO.Eventuate.Tram.IntegrationTests +{ + /// + /// Test configuration settings + /// + public class TestSettings + { + public string KafkaBootstrapServers { get; set; } = "kafka:9092"; + /// + /// Database connection strings + /// + public ConnectionStrings ConnectionStrings { get; set; } = new ConnectionStrings(); + } + + /// + /// Set of database connections + /// + public class ConnectionStrings + { + /// + /// Eventuate Tram database connection string + /// + public string EventuateTramDbConnection { get; set; } = "Server=mssql,1433;Database=TramDb;User Id=sa;Password=TestPa$$word"; + + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml b/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml new file mode 100644 index 0000000..559919a --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml @@ -0,0 +1,97 @@ +version: '3.3' +services: + dbsetup: + depends_on: + - mssql + build: + context: . + dockerfile: Dockerfile-dbsetup + environment: + TRAM_DB_SERVER: "mssql" + TRAM_SA_PASSWORD: "TestPa$$word" + TRAM_DB: "TramDb" + TRAM_SCHEMA: "eventuate" + TRAM_SCHEMA2: "schema1" + eventuatetramtests: + depends_on: + - zookeeper + - kafka + - mssql + - cdcservice1 + - cdcservice2 + build: . 
+ environment: + KafkaBootstrapServers: "kafka:29092" + ConnectionStrings__EventuateTramDbConnection: "Server=mssql;Database=TramDb;User Id=sa;Password=TestPa$$word" + volumes: + - ./bin/Release/netcoreapp2.1/publish:/app + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + cdcservice1: + image: ${CDC_SERVICE_DOCKER_IMAGE}:${CDC_SERVICE_DOCKER_VERSION} + depends_on: + - kafka + - mssql + environment: + SPRING_DATASOURCE_URL: jdbc:sqlserver://mssql;databaseName=TramDb + SPRING_DATASOURCE_USERNAME: sa + SPRING_DATASOURCE_PASSWORD: TestPa$$word + SPRING_DATASOURCE_TEST_ON_BORROW: "true" + SPRING_DATASOURCE_VALIDATION_QUERY: SELECT 1 + SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.microsoft.sqlserver.jdbc.SQLServerDriver + + SPRING_PROFILES_ACTIVE: EventuatePolling + + EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092 + EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 + + EVENTUATE_DATABASE_SCHEMA: eventuate + + EVENTUATELOCAL_CDC_POLLING_INTERVAL_IN_MILLISECONDS: 500 + EVENTUATELOCAL_CDC_MAX_EVENTS_PER_POLLING: 1000 + EVENTUATELOCAL_CDC_MAX_ATTEMPTS_FOR_POLLING: 100 + EVENTUATELOCAL_CDC_POLLING_RETRY_INTERVAL_IN_MILLISECONDS: 500 + cdcservice2: + image: ${CDC_SERVICE_DOCKER_IMAGE}:${CDC_SERVICE_DOCKER_VERSION} + depends_on: + - kafka + - mssql + environment: + SPRING_DATASOURCE_URL: jdbc:sqlserver://mssql;databaseName=TramDb + SPRING_DATASOURCE_USERNAME: sa + SPRING_DATASOURCE_PASSWORD: TestPa$$word + SPRING_DATASOURCE_TEST_ON_BORROW: "true" + SPRING_DATASOURCE_VALIDATION_QUERY: SELECT 1 + SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.microsoft.sqlserver.jdbc.SQLServerDriver + + SPRING_PROFILES_ACTIVE: EventuatePolling + + EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092 + EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 + + EVENTUATE_DATABASE_SCHEMA: schema1 + + EVENTUATELOCAL_CDC_POLLING_INTERVAL_IN_MILLISECONDS: 500 + EVENTUATELOCAL_CDC_MAX_EVENTS_PER_POLLING: 1000 + EVENTUATELOCAL_CDC_MAX_ATTEMPTS_FOR_POLLING: 100 + EVENTUATELOCAL_CDC_POLLING_RETRY_INTERVAL_IN_MILLISECONDS: 500 + mssql: + image: microsoft/mssql-server-linux:2017-latest + environment: + SA_PASSWORD: "TestPa$$word" + ACCEPT_EULA: "Y" + MSSQL_MEMORY_LIMIT_MB: "500" diff --git a/IO.Eventuate.Tram.IntegrationTests/nlog.config b/IO.Eventuate.Tram.IntegrationTests/nlog.config new file mode 100644 index 0000000..f68af0f --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/nlog.config @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/testsettings.json b/IO.Eventuate.Tram.IntegrationTests/testsettings.json new file mode 100644 index 0000000..f7d625f --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/testsettings.json @@ -0,0 +1,6 @@ +{ + "KafkaBootstrapServers": "kafka:9092", + "ConnectionStrings": { + "EventuateTramDbConnection": "Server=mssql,1433;Database=TramDb;User Id=sa;Password=TestPa$$word" + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs 
b/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs new file mode 100644 index 0000000..88d4f1c --- /dev/null +++ b/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Events.Publisher; +using IO.Eventuate.Tram.Events.Subscriber; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer; +using Microsoft.Extensions.Logging; +using NSubstitute; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.UnitTests.Events.Subscriber +{ + public class DomainEventDispatcherTests + { + private String _subscriberId; + private static String aggregateType = "AggregateType"; + + private String aggregateId = "xyz"; + private String messageId = "message-" + DateTime.Now; + + public class MyTarget + { + public ConcurrentQueue> Queue = new ConcurrentQueue>(); + + public DomainEventHandlers domainEventHandlers() + { + return DomainEventHandlersBuilder + .ForAggregateType(aggregateType) + .OnEvent(HandleAccountDebited) + .Build(); + } + + internal void HandleAccountDebited(IDomainEventEnvelope message) + { + Queue.Enqueue(message); + } + + } + + public class MyDomainEvent : IDomainEvent + { + } + + [Test] + public void MessageHandler_ValidMessage_RegisteredHandlerCalled() + { + // Arrange + MyTarget target = new MyTarget(); + + var messageConsumer = Substitute.For(); + var serviceProvider = Substitute.For(); + var logger = Substitute.For>(); + var eventTypeNamingStrategy = Substitute.For(); + eventTypeNamingStrategy.GetEventTypeName(typeof(MyDomainEvent)).Returns(typeof(MyDomainEvent).FullName); + + DomainEventDispatcher dispatcher = new DomainEventDispatcher( + _subscriberId, target.domainEventHandlers(), messageConsumer, eventTypeNamingStrategy, logger); + + dispatcher.Initialize(); + + // Act + dispatcher.MessageHandler(DomainEventPublisher.MakeMessageForDomainEvent(aggregateType, + aggregateId, new Dictionary() {{ MessageHeaders.Id, messageId } }, + new MyDomainEvent(), eventTypeNamingStrategy), serviceProvider); + + // Assert + Assert.True(target.Queue.TryPeek(out var dee)); + Assert.NotNull(dee); + Assert.AreEqual(aggregateId, dee.AggregateId); + Assert.AreEqual(aggregateType, dee.AggregateType); + Assert.AreEqual(messageId, dee.EventId); + } + } +} diff --git a/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj b/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj new file mode 100644 index 0000000..acfd4d7 --- /dev/null +++ b/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj @@ -0,0 +1,20 @@ + + + + netcoreapp2.1 + + false + + + + + + + + + + + + + + diff --git a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs new file mode 100644 index 0000000..11addcf --- /dev/null +++ b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Text; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Producer; +using Microsoft.Extensions.Logging; +using NSubstitute; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.UnitTests.Messaging.Producer +{ + public class AbstractMessageProducerTests + { + + public class MyMessageProducer : AbstractMessageProducer + { + public 
MyMessageProducer(IEnumerable messageInterceptors, + ILogger logger) + : base(messageInterceptors, logger) + { + + } + + public void Send(string destination, IMessage message, IMessageSender ms) + { + SendMessage("id", destination, message, ms); + } + + + } + + [Test] + public void Send_SimpleMessage_MessageHeadersAreApplied() + { + // Arrange + Message sendMessage = null; + + var ms = Substitute.For(); + ms.Send(Arg.Do(arg => sendMessage = arg)); + + // Act + MyMessageProducer mp = new MyMessageProducer(new List(), + Substitute.For()); + mp.Send("Destination", MessageBuilder.WithPayload("x").Build(), ms); + + // Assert + Assert.NotNull(sendMessage); + Assert.NotNull(sendMessage.GetRequiredHeader(MessageHeaders.Id)); + Assert.NotNull(sendMessage.GetRequiredHeader(MessageHeaders.Destination)); + Assert.NotNull(sendMessage.GetRequiredHeader(MessageHeaders.Date)); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs new file mode 100644 index 0000000..6f0e3d2 --- /dev/null +++ b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using IO.Eventuate.Tram.Messaging.Producer; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.UnitTests.Messaging.Producer +{ + public class HttpDateHeaderFormatUtilTests + { + [Test] + public void NowAsHttpDateString_GetResult_NotNull() + { + Assert.NotNull((HttpDateHeaderFormatUtil.NowAsHttpDateString())); + } + } +} diff --git a/IO.Eventuate.Tram.sln b/IO.Eventuate.Tram.sln new file mode 100644 index 0000000..e99828f --- /dev/null +++ b/IO.Eventuate.Tram.sln @@ -0,0 +1,40 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 2012 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "IO.Eventuate.Tram", "IO.Eventuate.Tram\IO.Eventuate.Tram.csproj", "{96F3A919-06F2-4064-B8B7-882E2694E989}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{2ABE527D-03E2-4126-8392-6ABAEC94EEF8}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "IO.Eventuate.Tram.UnitTests", "IO.Eventuate.Tram.UnitTests\IO.Eventuate.Tram.UnitTests.csproj", "{E64ED890-5DE8-49B1-9585-A48EC1AB8337}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "IO.Eventuate.Tram.IntegrationTests", "IO.Eventuate.Tram.IntegrationTests\IO.Eventuate.Tram.IntegrationTests.csproj", "{2E184839-4872-45CD-8542-C0BD91932C9A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {96F3A919-06F2-4064-B8B7-882E2694E989}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {96F3A919-06F2-4064-B8B7-882E2694E989}.Debug|Any CPU.Build.0 = Debug|Any CPU + {96F3A919-06F2-4064-B8B7-882E2694E989}.Release|Any CPU.ActiveCfg = Release|Any CPU + {96F3A919-06F2-4064-B8B7-882E2694E989}.Release|Any CPU.Build.0 = Release|Any CPU + {E64ED890-5DE8-49B1-9585-A48EC1AB8337}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E64ED890-5DE8-49B1-9585-A48EC1AB8337}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E64ED890-5DE8-49B1-9585-A48EC1AB8337}.Release|Any CPU.ActiveCfg = Release|Any CPU + 
{E64ED890-5DE8-49B1-9585-A48EC1AB8337}.Release|Any CPU.Build.0 = Release|Any CPU + {2E184839-4872-45CD-8542-C0BD91932C9A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2E184839-4872-45CD-8542-C0BD91932C9A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2E184839-4872-45CD-8542-C0BD91932C9A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2E184839-4872-45CD-8542-C0BD91932C9A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {5EAB0342-E46E-4FC6-84F3-BBF24D40DC61} + EndGlobalSection +EndGlobal diff --git a/IO.Eventuate.Tram/Database/EventuateSchema.cs b/IO.Eventuate.Tram/Database/EventuateSchema.cs new file mode 100644 index 0000000..5c4b7cc --- /dev/null +++ b/IO.Eventuate.Tram/Database/EventuateSchema.cs @@ -0,0 +1,19 @@ +using System; + +namespace IO.Eventuate.Tram.Database +{ + public class EventuateSchema + { + public const string DefaultSchema = "eventuate"; + + public EventuateSchema() { + EventuateDatabaseSchema = DefaultSchema; + } + + public EventuateSchema(string eventuateDatabaseSchema) { + EventuateDatabaseSchema = String.IsNullOrWhiteSpace(eventuateDatabaseSchema) ? DefaultSchema : eventuateDatabaseSchema; + } + + public string EventuateDatabaseSchema { get; } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Database/EventuateTramDbContext.cs b/IO.Eventuate.Tram/Database/EventuateTramDbContext.cs new file mode 100644 index 0000000..9060551 --- /dev/null +++ b/IO.Eventuate.Tram/Database/EventuateTramDbContext.cs @@ -0,0 +1,83 @@ +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using IO.Eventuate.Tram.Messaging.Producer.Outbox; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace IO.Eventuate.Tram.Database +{ + /// + /// The database context for the eventuate tracking tables. + /// + public class EventuateTramDbContext : DbContext + { + private readonly EventuateSchema _eventuateSchema; + + /// + /// Default constructor + /// + public EventuateTramDbContext() + { + + } + + /// + /// Create the context and specify the schema for the eventuate tables + /// + /// Database context options for the base DbContext + /// Name for the schema to add the eventuate tables to + public EventuateTramDbContext(DbContextOptions options, EventuateSchema eventuateSchema) : base(options) + { + _eventuateSchema = eventuateSchema; + } + + /// + /// Table to hold published messages to get sent to the messaging system by CDC + /// + public DbSet Messages { get; set; } + + /// + /// Table to track which messages have been processed for subscribers + /// + public DbSet ReceivedMessages { get; set; } + + /// + /// Override to get the tables created. 
+ /// + /// DbContext build object + protected override void OnModelCreating(ModelBuilder builder) + { + builder.HasDefaultSchema(_eventuateSchema.EventuateDatabaseSchema); + builder.Entity<Message>(ConfigureMessage); + builder.Entity<ReceivedMessage>(ConfigureReceivedMessage); + } + + private void ConfigureMessage(EntityTypeBuilder<Message> builder) + { + builder.ToTable("message"); + + builder.HasKey(m => m.Id); + + builder.Property(m => m.Id); + + builder.Property(m => m.Destination).IsRequired(); + + builder.Property(m => m.Headers).IsRequired(); + + builder.Property(m => m.Payload).IsRequired(); + + builder.Property(m => m.Published) + .HasDefaultValue(0); + } + + private void ConfigureReceivedMessage(EntityTypeBuilder<ReceivedMessage> builder) + { + builder.ToTable("received_messages"); + + builder.HasKey(rm => new {rm.ConsumerId, rm.MessageId}); + + builder.Property(rm => rm.ConsumerId).HasColumnName("consumer_id").IsRequired(); + + builder.Property(rm => rm.MessageId).HasColumnName("message_id").IsRequired(); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Common/AttributeEventTypeNamingStrategy.cs b/IO.Eventuate.Tram/Events/Common/AttributeEventTypeNamingStrategy.cs new file mode 100644 index 0000000..37fcdfc --- /dev/null +++ b/IO.Eventuate.Tram/Events/Common/AttributeEventTypeNamingStrategy.cs @@ -0,0 +1,22 @@ +using System; +using System.Reflection; + +namespace IO.Eventuate.Tram.Events.Common +{ + /// + /// Uses the EventTypeAttribute if present to determine the event type name to use in the message header + /// for a particular type of domain event. Otherwise, falls back to using the full name of the type. + /// + public class AttributeEventTypeNamingStrategy : IEventTypeNamingStrategy + { + /// + public string GetEventTypeName(Type eventType) + { + // Get event type attribute + var eventTypeAttribute = eventType.GetCustomAttribute<EventTypeAttribute>(); + + // Use event type from attribute, if it exists; otherwise, use type full name + return eventTypeAttribute != null ?
eventTypeAttribute.EventType : eventType.FullName; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Common/EventMessageHeaders.cs b/IO.Eventuate.Tram/Events/Common/EventMessageHeaders.cs new file mode 100644 index 0000000..d923f2d --- /dev/null +++ b/IO.Eventuate.Tram/Events/Common/EventMessageHeaders.cs @@ -0,0 +1,9 @@ +namespace IO.Eventuate.Tram.Events.Common +{ + public static class EventMessageHeaders + { + public const string EventType = "event-type"; + public const string AggregateType = "event-aggregate-type"; + public const string AggregateId = "event-aggregate-id"; + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Common/EventTypeAttribute.cs b/IO.Eventuate.Tram/Events/Common/EventTypeAttribute.cs new file mode 100644 index 0000000..3cd2702 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Common/EventTypeAttribute.cs @@ -0,0 +1,25 @@ +using System; + +namespace IO.Eventuate.Tram.Events.Common +{ + /// + /// Allows overriding the event type used in the header of published domain event messages + /// + [AttributeUsage(AttributeTargets.Class, Inherited = false)] + public class EventTypeAttribute : Attribute + { + /// + /// Event type name to use + /// + public string EventType { get; } + + /// + /// Overrides the event type name used in the header of published domain event messages + /// + /// Event type name to use + public EventTypeAttribute(string eventType) + { + EventType = eventType; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Common/IDomainEvent.cs b/IO.Eventuate.Tram/Events/Common/IDomainEvent.cs new file mode 100644 index 0000000..a12d05a --- /dev/null +++ b/IO.Eventuate.Tram/Events/Common/IDomainEvent.cs @@ -0,0 +1,10 @@ +namespace IO.Eventuate.Tram.Events.Common +{ + /// + /// Identifies a domain event type. + /// + public interface IDomainEvent + { + + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Common/IEventTypeNamingStrategy.cs b/IO.Eventuate.Tram/Events/Common/IEventTypeNamingStrategy.cs new file mode 100644 index 0000000..8622c3b --- /dev/null +++ b/IO.Eventuate.Tram/Events/Common/IEventTypeNamingStrategy.cs @@ -0,0 +1,17 @@ +using System; + +namespace IO.Eventuate.Tram.Events.Common +{ + /// + /// Strategy for determining the event type name to use in the message header for a particular type of domain event + /// + public interface IEventTypeNamingStrategy + { + /// + /// Determine the event type to use in the message header for a particular type of domain event. + /// + /// Type of domain event. + /// Event type name for specified domain event type. 
+ string GetEventTypeName(Type eventType); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Publisher/DomainEventPublisher.cs b/IO.Eventuate.Tram/Events/Publisher/DomainEventPublisher.cs new file mode 100644 index 0000000..4ef0755 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Publisher/DomainEventPublisher.cs @@ -0,0 +1,63 @@ +using System.Collections.Generic; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Producer; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Events.Publisher +{ + public class DomainEventPublisher : IDomainEventPublisher + { + private readonly ILogger _logger; + private readonly IMessageProducer _messageProducer; + private readonly IEventTypeNamingStrategy _eventTypeNamingStrategy; + + public DomainEventPublisher(IMessageProducer messageProducer, IEventTypeNamingStrategy eventTypeNamingStrategy, + ILogger logger) + { + _messageProducer = messageProducer; + _eventTypeNamingStrategy = eventTypeNamingStrategy; + _logger = logger; + } + + public void Publish(string aggregateType, object aggregateId, IList domainEvents) + { + Publish(aggregateType, aggregateId, new Dictionary(), domainEvents); + } + + public void Publish(string aggregateType, object aggregateId, IDictionary headers, + IList domainEvents) + { + var logContext = $"{nameof(Publish)}, aggregateType='{aggregateType}', aggregateId='{aggregateId}' " + + $"with {headers.Count} headers and {domainEvents.Count} events"; + _logger.LogDebug($"+{logContext}"); + foreach (IDomainEvent domainEvent in domainEvents) + { + _messageProducer.Send(aggregateType, + MakeMessageForDomainEvent(aggregateType, aggregateId, headers, domainEvent, + _eventTypeNamingStrategy)); + } + _logger.LogDebug($"-{logContext}"); + } + + public void Publish(object aggregateId, IList domainEvents) + { + Publish(typeof(TAggregate).FullName, aggregateId, domainEvents); + } + + public static IMessage MakeMessageForDomainEvent(string aggregateType, object aggregateId, + IDictionary headers, IDomainEvent @event, IEventTypeNamingStrategy eventTypeNamingStrategy) + { + string aggregateIdAsString = aggregateId.ToString(); + string eventType = eventTypeNamingStrategy.GetEventTypeName(@event.GetType()); + return MessageBuilder + .WithPayload(JsonMapper.ToJson(@event)) + .WithExtraHeaders("", headers) + .WithHeader(MessageHeaders.PartitionId, aggregateIdAsString) + .WithHeader(EventMessageHeaders.AggregateId, aggregateIdAsString) + .WithHeader(EventMessageHeaders.AggregateType, aggregateType) + .WithHeader(EventMessageHeaders.EventType, eventType) + .Build(); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Publisher/IDomainEventPublisher.cs b/IO.Eventuate.Tram/Events/Publisher/IDomainEventPublisher.cs new file mode 100644 index 0000000..3620de1 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Publisher/IDomainEventPublisher.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using IO.Eventuate.Tram.Events.Common; + +namespace IO.Eventuate.Tram.Events.Publisher +{ + public interface IDomainEventPublisher + { + void Publish(string aggregateType, object aggregateId, IList domainEvents); + void Publish(string aggregateType, object aggregateId, IDictionary headers, IList domainEvents); + void Publish(object aggregateId, IList domainEvents); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcher.cs b/IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcher.cs new file mode 
100644 index 0000000..7c25f6f --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcher.cs @@ -0,0 +1,65 @@ +using System; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public class DomainEventDispatcher + { + private readonly ILogger _logger; + private readonly string _dispatcherContext; + private readonly string _subscriberId; + private readonly DomainEventHandlers _domainEventHandlers; + private readonly IMessageConsumer _messageConsumer; + private readonly IEventTypeNamingStrategy _eventTypeNamingStrategy; + + public DomainEventDispatcher(string subscriberId, DomainEventHandlers domainEventHandlers, + IMessageConsumer messageConsumer, IEventTypeNamingStrategy eventTypeNamingStrategy, + ILogger logger) + { + _subscriberId = subscriberId; + _domainEventHandlers = domainEventHandlers; + _messageConsumer = messageConsumer; + _eventTypeNamingStrategy = eventTypeNamingStrategy; + _logger = logger; + _dispatcherContext = $"SubscriberId='{subscriberId}', " + + $"DomainEventHandlers for'{String.Join(",", domainEventHandlers.GetAggregateTypes())}'"; + } + + public void Initialize() + { + _messageConsumer.Subscribe(_subscriberId, _domainEventHandlers.GetAggregateTypes(), + MessageHandler); + } + + public void MessageHandler(IMessage message, IServiceProvider serviceProvider) + { + var logContext = $"{nameof(MessageHandler)} on {_dispatcherContext}, MessageId={message.Id}"; + _logger.LogDebug($"+{logContext}"); + string aggregateType = message.GetRequiredHeader(EventMessageHeaders.AggregateType); + + DomainEventHandler handler = _domainEventHandlers.FindTargetMethod(message, _eventTypeNamingStrategy); + + if (handler == null) + { + _logger.LogDebug($"{logContext}: No handler found for type='{aggregateType}'"); + return; + } + + var param = (IDomainEvent)JsonMapper.FromJson(message.Payload, handler.EventType); + + Type envelopeType = typeof(DomainEventEnvelope<>).MakeGenericType(handler.EventType); + var envelope = (IDomainEventEnvelope) Activator.CreateInstance(envelopeType, + message, + aggregateType, + message.GetRequiredHeader(EventMessageHeaders.AggregateId), + message.GetRequiredHeader(MessageHeaders.Id), + param); + + handler.Invoke(envelope, serviceProvider); + _logger.LogDebug($"-{logContext}: Processed message of type='{aggregateType}'"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcherInitializer.cs b/IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcherInitializer.cs new file mode 100644 index 0000000..982ee98 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/DomainEventDispatcherInitializer.cs @@ -0,0 +1,36 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + /// + /// Responsible for initializing all the DomainEventDispatcher instances on service startup. 
+ /// In Java this is done with a @PostConstruct annotation on the initialize() method of DomainEventDispatcher + /// + public class DomainEventDispatcherInitializer : IHostedService + { + private readonly IEnumerable _domainEventDispatchers; + + public DomainEventDispatcherInitializer(IEnumerable domainEventDispatchers) + { + _domainEventDispatchers = domainEventDispatchers; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + foreach (DomainEventDispatcher domainEventDispatcher in _domainEventDispatchers) + { + domainEventDispatcher.Initialize(); + } + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/DomainEventEnvelope.cs b/IO.Eventuate.Tram/Events/Subscriber/DomainEventEnvelope.cs new file mode 100644 index 0000000..b4eaaab --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/DomainEventEnvelope.cs @@ -0,0 +1,27 @@ +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public class DomainEventEnvelope : IDomainEventEnvelope where T : IDomainEvent + { + public DomainEventEnvelope(IMessage message, string aggregateType, string aggregateId, string eventId, T @event) + { + Message = message; + AggregateType = aggregateType; + AggregateId = aggregateId; + EventId = eventId; + Event = @event; + } + + public string AggregateId { get; } + + public IMessage Message { get; } + + public T Event { get; } + + public string AggregateType { get; } + + public string EventId { get; } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandler.cs b/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandler.cs new file mode 100644 index 0000000..27ddc4d --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandler.cs @@ -0,0 +1,35 @@ +using System; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public class DomainEventHandler + { + private readonly Action, IServiceProvider> _handler; + + public DomainEventHandler(string aggregateType, Type eventType, + Action, IServiceProvider> handler) + { + AggregateType = aggregateType; + EventType = eventType; + _handler = handler; + } + + public bool Handles(IMessage message, IEventTypeNamingStrategy eventTypeNamingStrategy) + { + string eventTypeName = eventTypeNamingStrategy.GetEventTypeName(EventType); + return AggregateType.Equals(message.GetRequiredHeader(EventMessageHeaders.AggregateType)) + && String.Equals(eventTypeName, message.GetRequiredHeader(EventMessageHeaders.EventType)); + } + + public void Invoke(IDomainEventEnvelope domainEventEnvelope, IServiceProvider serviceProvider) + { + _handler(domainEventEnvelope, serviceProvider); + } + + public Type EventType { get; } + + public string AggregateType { get; } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlers.cs b/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlers.cs new file mode 100644 index 0000000..b5436d1 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlers.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public class 
DomainEventHandlers + { + private readonly IList _handlers; + + public DomainEventHandlers(IList handlers) + { + _handlers = handlers; + } + + public ISet GetAggregateTypes() + { + return _handlers.Select(h => h.AggregateType).ToImmutableHashSet(); + } + + public DomainEventHandler FindTargetMethod(IMessage message, IEventTypeNamingStrategy eventTypeNamingStrategy) + { + return _handlers.FirstOrDefault(h => h.Handles(message, eventTypeNamingStrategy)); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlersBuilder.cs b/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlersBuilder.cs new file mode 100644 index 0000000..8216df6 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/DomainEventHandlersBuilder.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using IO.Eventuate.Tram.Events.Common; +using Microsoft.Extensions.DependencyInjection; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public class DomainEventHandlersBuilder + { + private readonly string _aggregateType; + private readonly IList _handlers = new List(); + + public DomainEventHandlersBuilder(string aggregateType) + { + _aggregateType = aggregateType; + } + + public static DomainEventHandlersBuilder ForAggregateType(string aggregateType) + { + return new DomainEventHandlersBuilder(aggregateType); + } + + public DomainEventHandlersBuilder OnEvent(Action> handler) where TEvent : IDomainEvent + { + _handlers.Add(new DomainEventHandler(_aggregateType, typeof(TEvent), (e, p) => handler((IDomainEventEnvelope) e))); + return this; + } + + public DomainEventHandlersBuilder OnEvent() + where TEvent : IDomainEvent + where TEventHandler : IDomainEventHandler + { + _handlers.Add(new DomainEventHandler(_aggregateType, typeof(TEvent), (e, p) => + { + var eventHandler = p.GetRequiredService(); + eventHandler.Handle((IDomainEventEnvelope) e); + })); + return this; + } + + public DomainEventHandlers Build() + { + return new DomainEventHandlers(_handlers); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/IDomainEventEnvelope.cs b/IO.Eventuate.Tram/Events/Subscriber/IDomainEventEnvelope.cs new file mode 100644 index 0000000..9841189 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/IDomainEventEnvelope.cs @@ -0,0 +1,15 @@ +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public interface IDomainEventEnvelope where T : IDomainEvent + { + string AggregateId { get; } + IMessage Message { get; } + string AggregateType { get; } + string EventId { get; } + + T Event { get; } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Events/Subscriber/IDomainEventHandler.cs b/IO.Eventuate.Tram/Events/Subscriber/IDomainEventHandler.cs new file mode 100644 index 0000000..b0ad672 --- /dev/null +++ b/IO.Eventuate.Tram/Events/Subscriber/IDomainEventHandler.cs @@ -0,0 +1,9 @@ +using IO.Eventuate.Tram.Events.Common; + +namespace IO.Eventuate.Tram.Events.Subscriber +{ + public interface IDomainEventHandler where T : IDomainEvent + { + void Handle(IDomainEventEnvelope @event); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/EventuateTramServiceCollectionExtensions.cs b/IO.Eventuate.Tram/EventuateTramServiceCollectionExtensions.cs new file mode 100644 index 0000000..f70220b --- /dev/null +++ b/IO.Eventuate.Tram/EventuateTramServiceCollectionExtensions.cs @@ -0,0 +1,87 @@ +using System; +using System.Collections.Generic; 
+using IO.Eventuate.Tram.Database; +using IO.Eventuate.Tram.Events.Common; +using IO.Eventuate.Tram.Events.Publisher; +using IO.Eventuate.Tram.Events.Subscriber; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer; +using IO.Eventuate.Tram.Messaging.Consumer.Kafka; +using IO.Eventuate.Tram.Messaging.Producer; +using IO.Eventuate.Tram.Messaging.Producer.Outbox; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram +{ + public static class EventuateTramServiceCollectionExtensions + { + public static void AddEventuateTramSqlKafkaTransport(this IServiceCollection serviceCollection, + string eventuateDatabaseSchema, string bootstrapServers, EventuateKafkaConsumerConfigurationProperties consumerConfigurationProperties) + { + AddEventuateTramSqlProducer(serviceCollection, eventuateDatabaseSchema); + AddEventuateTramKafkaConsumer(serviceCollection, eventuateDatabaseSchema, bootstrapServers, + consumerConfigurationProperties); + } + + public static void AddEventuateTramEventsPublisher(this IServiceCollection serviceCollection) + { + serviceCollection.TryAddSingleton(); + serviceCollection.TryAddScoped(); + } + + public static void AddEventuateTramDomainEventDispatcher( + this IServiceCollection serviceCollection, string subscriberId, + Func domainEventHandlersFactory) + { + serviceCollection.TryAddSingleton(); + serviceCollection.AddSingleton(provider => + { + var messageConsumer = provider.GetRequiredService(); + var logger = provider.GetRequiredService>(); + var eventTypeNamingStrategy = provider.GetRequiredService(); + + var dispatcher = new DomainEventDispatcher(subscriberId, domainEventHandlersFactory(provider), + messageConsumer, eventTypeNamingStrategy, logger); + + return dispatcher; + }); + } + + public static void AddEventuateTramSqlProducer(this IServiceCollection serviceCollection, + string eventuateDatabaseSchema) + { + AddEventuateTramCommonSqlMessagingServices(serviceCollection, eventuateDatabaseSchema); + serviceCollection.TryAddSingleton(); + serviceCollection.TryAddScoped(); + } + + private static void AddEventuateTramCommonSqlMessagingServices( + this IServiceCollection serviceCollection, string eventuateDatabaseSchema) + { + serviceCollection.TryAddSingleton(provider => new EventuateSchema(eventuateDatabaseSchema)); + } + + public static void AddEventuateTramKafkaConsumer(this IServiceCollection serviceCollection, + string eventuateDatabaseSchema, string bootstrapServers, + EventuateKafkaConsumerConfigurationProperties consumerConfigurationProperties) + { + AddEventuateTramCommonSqlMessagingServices(serviceCollection, eventuateDatabaseSchema); + serviceCollection.TryAddScoped(); + serviceCollection.TryAddSingleton(provider => + { + var loggerFactory = provider.GetRequiredService(); + var serviceScopeFactory = provider.GetRequiredService(); + IEnumerable messageInterceptors = provider.GetServices(); + + IMessageConsumer messageConsumer = new KafkaMessageConsumer(bootstrapServers, + consumerConfigurationProperties, messageInterceptors, + loggerFactory, serviceScopeFactory); + + return messageConsumer; + }); + serviceCollection.AddHostedService(); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/IIdGenerator.cs b/IO.Eventuate.Tram/IIdGenerator.cs new file mode 100644 index 0000000..884bf40 --- /dev/null +++ b/IO.Eventuate.Tram/IIdGenerator.cs @@ -0,0 +1,7 @@ +namespace IO.Eventuate.Tram +{ + public 
interface IIdGenerator + { + Int128 GenId(); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj b/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj new file mode 100644 index 0000000..af5ac76 --- /dev/null +++ b/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.0 + 0.1.5 + local + + + + + + + + + + + diff --git a/IO.Eventuate.Tram/IdGenerator.cs b/IO.Eventuate.Tram/IdGenerator.cs new file mode 100644 index 0000000..04b8774 --- /dev/null +++ b/IO.Eventuate.Tram/IdGenerator.cs @@ -0,0 +1,136 @@ +using System; +using System.Linq; +using System.Net.NetworkInformation; +using System.Threading; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram +{ + /// + /// Generates unique IDs for messages based on the MAC address, + /// Unix 1 ms ticks, and an index. + /// Note that multiple applications on the same hardware (same MAC) + /// have a small opportunity for collision if IDs are generated in + /// the same 1 ms tick. + /// + public class IdGenerator : IIdGenerator + { + private readonly ILogger _logger; + private const long MaxCounter = 1 << 16; + + private readonly object _lockObject = new object(); + private readonly long _macAddress; + private long _currentPeriod = TimeNow(); + private long _counter; + + /// + /// Construct the ID generator. Reads the MAC address of the first NIC. + /// + /// Logger + public IdGenerator(ILogger logger) + { + _logger = logger; + var logContext = $"{nameof(IdGenerator)} constructor"; + _logger.LogDebug($"+{logContext}"); + NetworkInterface[] interfaces = NetworkInterface.GetAllNetworkInterfaces(); + + long macAddress = interfaces.Select(i => + { + PhysicalAddress address = i.GetPhysicalAddress(); + byte[] macAddressBytes = address.GetAddressBytes(); + + // If the address doesn't have the expected length of 6, return 0 to skip to the next interface + if (macAddressBytes.Length != 6) + { + _logger.LogTrace($"{logContext}: skipping MAC address {address}."); + return 0L; + } + + return ToLong(macAddressBytes); + }) + .FirstOrDefault(addressAsLong => addressAsLong != 0L); + + if (macAddress == default(long)) + { + _logger.LogError($"{logContext}: cannot find MAC address"); + throw new InvalidOperationException("Cannot find mac address"); + } + + _macAddress = macAddress; + _logger.LogDebug($"{logContext}: Mac address {_macAddress}"); + } + + /// + /// Use the same tick counter as the Java implementation + /// since CDC sorts by ID. + /// + private static long TimeNow() + { + return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); ; + } + + /// + /// Creates a long value from a byte array by left shifting the array byte by byte into the long variable. + /// For byte arrays with length less than 8 (e.g. 6), the least significant bytes of the long value will be populated, + /// leaving the most significant bytes as 0. + /// + /// Input byte array. + /// The long value. + /// Thrown if the input byte array exceeds the maximum length for a long value. 
+ private static long ToLong(byte[] bytes) + { + if (bytes.Length > 8) + { + throw new ArgumentException("Input byte array exceeds maximum length for a long value.", nameof(bytes)); + } + + long result = 0L; + foreach (byte b in bytes) + { + result = (result << 8) + b; + } + + return result; + } + + private Int128 MakeId() + { + return new Int128(_currentPeriod, (_macAddress << 16) + _counter); + } + + private Int128 GenIdInternal() + { + var logContext = $"{nameof(GenIdInternal)}"; + _logger.LogDebug($"+{logContext}"); + long now = TimeNow(); + if (_currentPeriod != now || _counter == MaxCounter) + { + _logger.LogInformation($"{logContext}: Need to delay to reset the counter"); + long oldPeriod = _currentPeriod; + while ((_currentPeriod = TimeNow()) <= oldPeriod) + { + // Just do nothing + Thread.Sleep(1); + } + _counter = 0; + } + + Int128 id = MakeId(); + _counter = _counter + 1; + _logger.LogDebug($"-{logContext}: returning id={id}, _counter={_counter}"); + return id; + } + + /// + /// Generate a 128 bit unique identifier. + /// + /// + public Int128 GenId() + { + lock (_lockObject) + { + return GenIdInternal(); + } + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Int128.cs b/IO.Eventuate.Tram/Int128.cs new file mode 100644 index 0000000..e1f5507 --- /dev/null +++ b/IO.Eventuate.Tram/Int128.cs @@ -0,0 +1,89 @@ +using System; +using System.Globalization; + +namespace IO.Eventuate.Tram +{ + public class Int128 : IComparable, IComparable + { + private readonly long _hi; + private readonly long _lo; + + public Int128(long hi, long lo) { + this._hi = hi; + this._lo = lo; + } + + public string AsString() { + return $"{_hi:x16}-{_lo:x16}"; + } + + public override string ToString() { + return "Int128{" + AsString() + '}'; + } + + protected bool Equals(Int128 other) + { + return _hi == other._hi && _lo == other._lo; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((Int128) obj); + } + + public override int GetHashCode() + { + unchecked + { + return (_hi.GetHashCode() * 397) ^ _lo.GetHashCode(); + } + } + + public static Int128 FromString(string str) { + string[] s = str.Split('-'); + if (s.Length != 2) + { + throw new ArgumentException("Should have length of 2: " + str); + } + + // NumberStyles.HexNumber does not allow leading sign so it is equivalent + // to the Java version uses parseUnsignedLong. + return new Int128(Int64.Parse(s[0], NumberStyles.HexNumber), Int64.Parse(s[1], NumberStyles.HexNumber)); + } + + public long Hi => _hi; + + public long Lo => _lo; + + public int CompareTo(Int128 other) + { + if (ReferenceEquals(this, other)) return 0; + if (ReferenceEquals(null, other)) return 1; + int hiComparison = _hi.CompareTo(other._hi); + if (hiComparison != 0) return hiComparison; + return _lo.CompareTo(other._lo); + } + + public int CompareTo(object obj) + { + if (ReferenceEquals(null, obj)) return 1; + if (ReferenceEquals(this, obj)) return 0; + return obj is Int128 other ? 
CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(Int128)}"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/JsonMapper.cs b/IO.Eventuate.Tram/JsonMapper.cs new file mode 100644 index 0000000..5f871ae --- /dev/null +++ b/IO.Eventuate.Tram/JsonMapper.cs @@ -0,0 +1,35 @@ +using System; +using Newtonsoft.Json; +using Newtonsoft.Json.Serialization; + +namespace IO.Eventuate.Tram +{ + public static class JsonMapper + { + public static readonly JsonSerializerSettings JsonSerializerSettings = new JsonSerializerSettings + { + ContractResolver = new CamelCasePropertyNamesContractResolver + { + // Prevent Header dictionary keys from being camel cased + NamingStrategy = {ProcessDictionaryKeys = false} + }, + MissingMemberHandling = MissingMemberHandling.Ignore, + NullValueHandling = NullValueHandling.Ignore + }; + + public static string ToJson(object o) + { + return JsonConvert.SerializeObject(o, JsonSerializerSettings); + } + + public static T FromJson<T>(string json) + { + return JsonConvert.DeserializeObject<T>(json, JsonSerializerSettings); + } + + public static object FromJson(string json, Type type) + { + return JsonConvert.DeserializeObject(json, type, JsonSerializerSettings); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Common/IMessage.cs b/IO.Eventuate.Tram/Messaging/Common/IMessage.cs new file mode 100644 index 0000000..b7937d8 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Common/IMessage.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; + +namespace IO.Eventuate.Tram.Messaging.Common +{ + public interface IMessage + { + string Id { get; } + IDictionary<string, string> Headers { get; set; } + string Payload { get; set; } + + string GetHeader(string name); + string GetRequiredHeader(string name); + + bool HasHeader(string name); + + void SetHeader(string name, string value); + void RemoveHeader(string key); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Common/IMessageInterceptor.cs b/IO.Eventuate.Tram/Messaging/Common/IMessageInterceptor.cs new file mode 100644 index 0000000..e1fd58e --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Common/IMessageInterceptor.cs @@ -0,0 +1,19 @@ +using System; + +namespace IO.Eventuate.Tram.Messaging.Common +{ + public interface IMessageInterceptor + { + void PreSend(IMessage message); + + void PostSend(IMessage message, Exception e); + + void PreReceive(IMessage message); + + void PreHandle(string subscriberId, IMessage message); + + void PostHandle(string subscriberId, IMessage message, Exception e); + + void PostReceive(IMessage message); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Common/Message.cs b/IO.Eventuate.Tram/Messaging/Common/Message.cs new file mode 100644 index 0000000..c791289 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Common/Message.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; + +namespace IO.Eventuate.Tram.Messaging.Common +{ + public class Message : IMessage + { + public Message() + { + + } + + public Message(string payload, IDictionary<string, string> headers) + { + Payload = payload; + Headers = headers; + } + + /// + /// Id header. + /// Return null instead of an exception if not present, otherwise logging + /// and other users have issues.
+ /// + public string Id => GetHeader(MessageHeaders.Id); + + public IDictionary Headers { get; set; } + + public string Payload { get; set; } + + public string GetHeader(string name) + { + if (Headers == null) + { + return null; + } + + return Headers.TryGetValue(name, out string value) ? value : null; + } + + public string GetRequiredHeader(string name) + { + string value = GetHeader(name); + if (value == null) + { + throw new ArgumentException($"No such header: {name} in this message {this}", nameof(name)); + } + + return value; + } + + public bool HasHeader(string name) + { + return Headers != null && Headers.ContainsKey(name); + } + + public void SetHeader(string name, string value) + { + if (Headers == null) + { + Headers = new Dictionary(); + } + + Headers[name] = value; + } + + public void RemoveHeader(string key) + { + Headers?.Remove(key); + } + + public override string ToString() + { + return $"{nameof(Id)}: {Id}, {nameof(Headers)}: {Headers}, {nameof(Payload)}: {Payload}"; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Common/MessageHeaders.cs b/IO.Eventuate.Tram/Messaging/Common/MessageHeaders.cs new file mode 100644 index 0000000..524804a --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Common/MessageHeaders.cs @@ -0,0 +1,10 @@ +namespace IO.Eventuate.Tram.Messaging.Common +{ + public static class MessageHeaders + { + public const string Id = "ID"; + public const string PartitionId = "PARTITION_ID"; + public const string Destination = "DESTINATION"; + public const string Date = "DATE"; + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/IMessageConsumer.cs b/IO.Eventuate.Tram/Messaging/Consumer/IMessageConsumer.cs new file mode 100644 index 0000000..930d277 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/IMessageConsumer.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; + +namespace IO.Eventuate.Tram.Messaging.Consumer +{ + /// + /// Supports basic message consumption + /// + public interface IMessageConsumer + { + /// + /// Subscribe to and register a message handler for messages published to the specified set of channels. + /// + /// The subscriber ID to use for this subscription. Multiple subscriptions + /// using the same subscriber ID will result in a particular message being sent to only one of the subscribers. + /// The set of channels to subscribe to. + /// A message handler method to call when a message is received. 
+ void Subscribe(string subscriberId, ISet channels, MessageHandler handler); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/ConsumerPropertiesFactory.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/ConsumerPropertiesFactory.cs new file mode 100644 index 0000000..a3a312f --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/ConsumerPropertiesFactory.cs @@ -0,0 +1,23 @@ +using System.Collections.Generic; +using Confluent.Kafka; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public static class ConsumerPropertiesFactory + { + public static ConsumerConfig MakeDefaultConsumerProperties(string bootstrapServers, + string subscriberId) + { + var consumerProperties = new ConsumerConfig + { + BootstrapServers = bootstrapServers, + GroupId = subscriberId, + EnableAutoCommit = false, + SessionTimeoutMs = 30000, + AutoOffsetReset = AutoOffsetReset.Earliest + }; + + return consumerProperties; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumer.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumer.cs new file mode 100644 index 0000000..d0afead --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumer.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + /// + /// Kafka consumer listens for a set of topics and triggers a callback when + /// an event is received. + /// Disposing of the the consumer shuts down the subscription. + /// + public class EventuateKafkaConsumer : IDisposable + { + private const int ConsumePollMilliseconds = 100; + private const int AdminClientTimeoutMilliseconds = 10; + + private readonly string _subscriberId; + private readonly Action, Action> _handler; + private readonly IList _topics; + private readonly ILoggerFactory _loggerFactory; + + private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + + private readonly IDictionary _consumerProperties; + private readonly ILogger _logger; + + public EventuateKafkaConsumer(string subscriberId, + Action, Action> handler, + IList topics, + string bootstrapServers, + EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties, + ILoggerFactory loggerFactory) + { + _subscriberId = subscriberId; + _handler = handler; + _topics = topics; + _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); + + _consumerProperties = + ConsumerPropertiesFactory.MakeDefaultConsumerProperties(bootstrapServers, subscriberId) + .ToDictionary(p => p.Key, p => p.Value); + + foreach (KeyValuePair pair in eventuateKafkaConsumerConfigurationProperties.Properties) + { + _consumerProperties[pair.Key] = pair.Value; + } + } + + private void VerifyTopicExistsBeforeSubscribing(IAdminClient adminClient, string topic) + { + var logContext = $"{nameof(VerifyTopicExistsBeforeSubscribing)} " + + $"for subscriberId='{_subscriberId}', topic='{topic}'"; + try + { + _logger.LogDebug($"+{logContext}"); + Metadata metadata = adminClient.GetMetadata(topic, TimeSpan.FromSeconds(AdminClientTimeoutMilliseconds)); + + List partitions = metadata.Topics[0].Partitions; + _logger.LogDebug($"-{logContext}: found partitions='{String.Join(",", partitions.Select(p => p.PartitionId))}'"); + } + catch (Exception e) + { 
+ _logger.LogError(e, $"{logContext}: Got exception: {e}"); + throw; + } + } + + private void MaybeCommitOffsets(Consumer consumer, KafkaMessageProcessor processor) + { + var logContext = $"{nameof(MaybeCommitOffsets)} for SubscriberId='{_subscriberId}'"; + List offsetsToCommit = processor.OffsetsToCommit().ToList(); + if (offsetsToCommit.Any()) + { + _logger.LogDebug($"{logContext}: Committing offsets='{String.Join(",", offsetsToCommit)}'"); + consumer.Commit(offsetsToCommit); + processor.NoteOffsetsCommitted(offsetsToCommit); + _logger.LogDebug($"-{logContext}"); + } + } + + public void Start() + { + var logContext = $"{nameof(Start)} for SubscriberId={_subscriberId}"; + try + { + Consumer consumer = new ConsumerBuilder(_consumerProperties).Build(); + var processor = new KafkaMessageProcessor(_subscriberId, _handler, + _loggerFactory.CreateLogger()); + + using (var adminClient = new AdminClient(consumer.Handle)) + { + foreach (string topic in _topics) + { + VerifyTopicExistsBeforeSubscribing(adminClient, topic); + } + } + + List topicsList = new List(_topics); + _logger.LogDebug($"{logContext}: Subscribing to topics='{String.Join(",", topicsList)}'"); + + consumer.Subscribe(topicsList); + + Task.Run(() => + { + try + { + while (!_cancellationTokenSource.IsCancellationRequested) + { + try + { + ConsumeResult record = consumer.Consume(TimeSpan.FromMilliseconds(ConsumePollMilliseconds)); + + if (record != null) + { + _logger.LogDebug($"{logContext}: process record at offset='{record.Offset}', key='{record.Key}', value='{record.Value}'"); + + processor.Process(record); + } + + MaybeCommitOffsets(consumer, processor); + } + catch (ConsumeException e) + { + _logger.LogError($"{logContext}: ConsumeException - {e.Error}. Continuing."); + } + } + } + catch (TaskCanceledException) + { + _logger.LogInformation($"{logContext}: Shutdown by cancel"); + } + catch (Exception e) + { + _logger.LogError($"{logContext}: Exception - {e}"); + } + finally + { + // Try to put the last of the offsets away. Note that the + // callbacks are done asynchronously so there is no guarantee + // that all the offsets are ready. Worst case is that there + // are messages processed more than once. 
+ MaybeCommitOffsets(consumer, processor); + consumer.Close(); + consumer.Dispose(); + } + }, _cancellationTokenSource.Token); + } + catch (Exception e) + { + _logger.LogError(e, $"{logContext}: Error subscribing"); + throw; + } + } + + public void Dispose() + { + var logContext = $"{nameof(Dispose)} for SubscriberId={_subscriberId}"; + _logger.LogDebug($"+{logContext}"); + if (!_cancellationTokenSource.IsCancellationRequested) + { + _logger.LogDebug($"+{logContext}: Sending cancel to consumer thread."); + _cancellationTokenSource.Cancel(); + } + _cancellationTokenSource.Dispose(); + _logger.LogDebug($"-{logContext}"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumerConfigurationProperties.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumerConfigurationProperties.cs new file mode 100644 index 0000000..6f33b91 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/EventuateKafkaConsumerConfigurationProperties.cs @@ -0,0 +1,14 @@ +using System.Collections.Generic; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public class EventuateKafkaConsumerConfigurationProperties + { + public IDictionary Properties { get; set; } = new Dictionary(); + + public static EventuateKafkaConsumerConfigurationProperties Empty() + { + return new EventuateKafkaConsumerConfigurationProperties(); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/IDuplicateMessageDetector.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/IDuplicateMessageDetector.cs new file mode 100644 index 0000000..2e8a838 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/IDuplicateMessageDetector.cs @@ -0,0 +1,7 @@ +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public interface IDuplicateMessageDetector + { + bool IsDuplicate(string consumerId, string messageId); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageConsumer.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageConsumer.cs new file mode 100644 index 0000000..6e12e19 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageConsumer.cs @@ -0,0 +1,193 @@ +using System; +using System.Collections.Generic; +using System.Transactions; +using Confluent.Kafka; +using IO.Eventuate.Tram.Database; +using IO.Eventuate.Tram.Messaging.Common; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public class KafkaMessageConsumer : IMessageConsumer, IDisposable + { + private readonly ILogger _logger; + private readonly EventuateKafkaConsumerConfigurationProperties _eventuateKafkaConsumerConfigurationProperties; + private readonly IEnumerable _messageInterceptors; + private readonly ILoggerFactory _loggerFactory; + private readonly IServiceScopeFactory _serviceScopeFactory; + + private readonly string _bootstrapServers; + private readonly List _consumers = new List(); + + public KafkaMessageConsumer(string bootstrapServers, + EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties, + IEnumerable messageInterceptors, ILoggerFactory loggerFactory, + IServiceScopeFactory serviceScopeFactory) + { + _bootstrapServers = bootstrapServers; + _eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties; + 
_messageInterceptors = messageInterceptors; + _loggerFactory = loggerFactory; + _serviceScopeFactory = serviceScopeFactory; + _logger = _loggerFactory.CreateLogger(); + } + + public void Subscribe(string subscriberId, ISet channels, MessageHandler handler) + { + var logContext = $"{nameof(Subscribe)} for subscriberId='{subscriberId}', " + + $"channels='{String.Join(",", channels)}', " + + $"handler='{handler.Method.Name}'"; + _logger.LogDebug($"+{logContext}"); + + var swimLaneBasedDispatcher = new SwimlaneBasedDispatcher(subscriberId, _loggerFactory); + + Action, Action> kcHandler = + (record, completionCallback) => + HandleKafkaConsumeResult(subscriberId, handler, swimLaneBasedDispatcher, logContext, record, completionCallback); + + var kc = new EventuateKafkaConsumer(subscriberId, + kcHandler, + new List(channels), + _bootstrapServers, + _eventuateKafkaConsumerConfigurationProperties, + _loggerFactory); + + _consumers.Add(kc); + + kc.Start(); + _logger.LogDebug($"-{logContext}"); + } + + /// + /// Handles a Kafka ConsumeResult by converting it to an IMessage and dispatching it using the specified dispatcher + /// + /// The subscriber ID (Kafka consumer group) + /// External message handler + /// Swim lane dispatcher that routes messages by partition ID + /// Log context + /// Kafka consume result + /// Called after the message has been handled with the exception if applicable + private void HandleKafkaConsumeResult(string subscriberId, MessageHandler handler, + SwimlaneBasedDispatcher swimLaneBasedDispatcher, string logContext, ConsumeResult record, Action completionCallback) + { + swimLaneBasedDispatcher.Dispatch(ToMessage(record), record.Partition, + message => { HandleDispatchedMessage(subscriberId, handler, logContext, message, completionCallback); } + ); + } + + /// + /// Handle a dispatched message. Calls message interceptor hooks and the external message handler. 
+ /// + /// The subscriber ID (Kafka consumer group) + /// External message handler + /// Log context + /// Dispatched message + /// Called after the message has been handled with the exception if applicable + private void HandleDispatchedMessage(string subscriberId, MessageHandler handler, string logContext, IMessage message, + Action completionCallback) + { + _logger.LogDebug($"{logContext}: Dispatcher called for messageId='{message.Id}'"); + using (IServiceScope serviceScope = _serviceScopeFactory.CreateScope()) + { + IServiceProvider scopedServiceProvider = serviceScope.ServiceProvider; + PreReceive(message); + try + { + var dbContext = scopedServiceProvider.GetRequiredService(); + IExecutionStrategy strategy = dbContext.Database.CreateExecutionStrategy(); + strategy.Execute(() => + { + using (var transactionScope = new TransactionScope()) + { + var duplicateMessageDetector = scopedServiceProvider + .GetRequiredService(); + if (duplicateMessageDetector.IsDuplicate(subscriberId, message.Id)) + { + _logger.LogDebug($"{logContext}: messageId='{message.Id}' is a duplicate"); + completionCallback(null); + transactionScope.Complete(); + return; + } + + try + { + _logger.LogDebug($"{logContext}: Invoking handlers for messageId='{message.Id}'"); + PreHandle(subscriberId, message); + handler(message, scopedServiceProvider); + PostHandle(subscriberId, message, null); + } + catch (Exception e) + { + PostHandle(subscriberId, message, e); + _logger.LogError(e, $"{logContext}: Exception processing messageId='{message.Id}'"); + completionCallback(e); + transactionScope.Complete(); + return; + } + + completionCallback(null); + transactionScope.Complete(); + _logger.LogDebug($"{logContext}: Processed messageId='{message.Id}'"); + } + }); + } + finally + { + PostReceive(message); + } + } + + _logger.LogDebug($"{logContext}: Dispatcher done with messageId='{message.Id}'"); + } + + private void PreReceive(IMessage message) + { + foreach (IMessageInterceptor interceptor in _messageInterceptors) + { + interceptor.PreReceive(message); + } + } + + private void PreHandle(string subscriberId, IMessage message) + { + foreach (IMessageInterceptor interceptor in _messageInterceptors) + { + interceptor.PreHandle(subscriberId, message); + } + } + + private void PostHandle(string subscriberId, IMessage message, Exception e) + { + foreach (IMessageInterceptor interceptor in _messageInterceptors) + { + interceptor.PostHandle(subscriberId, message, e); + } + } + + private void PostReceive(IMessage message) + { + foreach (IMessageInterceptor interceptor in _messageInterceptors) + { + interceptor.PostReceive(message); + } + } + + private IMessage ToMessage(ConsumeResult record) + { + return JsonMapper.FromJson(record.Value); + } + + public void Dispose() + { + _logger.LogDebug($"+{nameof(Dispose)}"); + foreach (EventuateKafkaConsumer consumer in _consumers) + { + consumer.Dispose(); + } + _logger.LogDebug($"-{nameof(Dispose)}"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageProcessor.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageProcessor.cs new file mode 100644 index 0000000..9a20270 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/KafkaMessageProcessor.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + /// + /// Processes a Kafka message and tracks the 
message offsets that have been successfully processed and can be committed + /// + public class KafkaMessageProcessor + { + private readonly ILogger _logger; + private readonly string _loggingObjectContext; + + private readonly Action, Action> _handler; + private readonly OffsetTracker _offsetTracker; + + // The java solution used a blocking queue but none of the methods that use + // the blocking feature. ConcurrentQueue is simpler. + private readonly ConcurrentQueue> _processedRecords = + new ConcurrentQueue>(); + + public KafkaMessageProcessor(string subscriberId, + Action, Action> handler, + ILogger logger) + { + _handler = handler; + _logger = logger; + _loggingObjectContext = $"SubscriberId='{subscriberId}', handler='{handler.Method.Name}'"; + _offsetTracker = new OffsetTracker(_logger); + } + + public void Process(ConsumeResult record) + { + var logContext = $"{nameof(Process)} for {_loggingObjectContext}, " + + $"record.Key='{record.Key}', record.Topic='{record.Topic}'"; + _logger.LogDebug($"+{logContext}"); + _offsetTracker.NoteUnprocessed(new TopicPartition(record.Topic, record.Partition), record.Offset); + _handler(record, e => + { + if (e != null) + { + _logger.LogError(e, $"{logContext}: Exception processing record"); + } + else + { + _logger.LogDebug($"{logContext}: Adding process record to queue"); + _processedRecords.Enqueue(record); + } + }); + _logger.LogDebug($"-{logContext}"); + } + + public IEnumerable OffsetsToCommit() + { + var logContext = $"{nameof(OffsetsToCommit)} for {_loggingObjectContext}"; + _logger.LogDebug($"+{logContext}"); + int count = 0; + while (true) + { + if (!_processedRecords.TryDequeue(out ConsumeResult record)) + { + break; + } + + count++; + _offsetTracker.NoteProcessed(new TopicPartition(record.Topic, record.Partition), record.Offset); + } + + List offsetsToCommit = + new List(_offsetTracker.OffsetsToCommit()); + _logger.LogDebug($"-{logContext}: Marked {count} records as processed, returning {offsetsToCommit.Count} offsets to commit"); + return offsetsToCommit; + } + + public void NoteOffsetsCommitted(IEnumerable offsetsToCommit) + { + var logContext = $"{nameof(NoteOffsetsCommitted)} for {_loggingObjectContext}"; + _logger.LogDebug($"{logContext}"); + _offsetTracker.NoteOffsetsCommitted(offsetsToCommit); + } + + public OffsetTracker GetPending() + { + var logContext = $"{nameof(GetPending)} for {_loggingObjectContext}"; + _logger.LogDebug($"{logContext}"); + return _offsetTracker; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/OffsetTracker.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/OffsetTracker.cs new file mode 100644 index 0000000..81eaef6 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/OffsetTracker.cs @@ -0,0 +1,119 @@ +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + /// + /// Keeps track of message offsets that are (a) being processed and (b) have been processed and can be committed + /// + public class OffsetTracker + { + private readonly ILogger _logger; + private readonly IDictionary _state = + new Dictionary(); + + public OffsetTracker(ILogger logger) + { + _logger = logger; + } + + public override string ToString() + { + // Output the collection of topics and offsets + StringBuilder stringBuilder = new StringBuilder($"{nameof(_state)}: "); + foreach (KeyValuePair topicOffsetPair in _state) + { + 
stringBuilder.Append($"{topicOffsetPair.Key}={topicOffsetPair.Value},"); + } + + if (_state.Count > 0) + { + // Remove the last comma + stringBuilder.Remove(stringBuilder.Length - 1, 1); + } + + return stringBuilder.ToString(); + } + + TopicPartitionOffsets Fetch(TopicPartition topicPartition) + { + var logContext = $"OffsetTracker.Fetch for topicPartition='{topicPartition}'"; + _logger.LogDebug($"+{logContext}"); + _state.TryGetValue(topicPartition, out TopicPartitionOffsets tpo); + if (tpo == null) + { + _logger.LogDebug($"{logContext}: Creating new topic partition offset"); + tpo = new TopicPartitionOffsets(_logger); + _state[topicPartition] = tpo; + } + + _logger.LogDebug($"-{logContext}"); + return tpo; + } + + public void NoteUnprocessed(TopicPartition topicPartition, long offset) + { + var logContext = $"OffsetTracker.NoteUnprocessed for topicPartition='{topicPartition}', offset={offset}"; + _logger.LogDebug($"{logContext}"); + Fetch(topicPartition).NoteUnprocessed(offset); + } + + public void NoteProcessed(TopicPartition topicPartition, long offset) + { + var logContext = $"OffsetTracker.NoteProcessed for topicPartition='{topicPartition}', offset={offset}"; + _logger.LogDebug($"{logContext}"); + Fetch(topicPartition).NoteProcessed(offset); + } + + public IEnumerable OffsetsToCommit() + { + var logContext = "OffsetTracker.OffsetsToCommit"; + _logger.LogDebug($"+{logContext}"); + var result = new List(); + foreach (KeyValuePair pair in _state) + { + long? offset = pair.Value.OffsetToCommit(); + if (offset.HasValue) + { + result.Add(new TopicPartitionOffset(pair.Key, new Offset(offset.Value + 1))); + } + } + + _logger.LogDebug($"-{logContext}: collected {result.Count} offsets to commit"); + return result; + } + + public void NoteOffsetsCommitted(IEnumerable offsetsToCommit) + { + List offsets = new List(offsetsToCommit); + var logContext = $"OffsetTracker.NoteOffsetsCommitted with {offsets.Count} offsets"; + _logger.LogDebug($"+{logContext}"); + foreach (TopicPartitionOffset topicPartitionOffset in offsets) + { + Fetch(topicPartitionOffset.TopicPartition).NoteOffsetCommitted(topicPartitionOffset.Offset); + } + _logger.LogDebug($"-{logContext}"); + } + + public IDictionary> GetPending() + { + var logContext = "OffsetTracker.GetPending"; + _logger.LogDebug($"+{logContext}"); + IDictionary> result = new Dictionary>(); + foreach (KeyValuePair pair in _state) + { + ISet pending = pair.Value.GetPending(); + if (pending.Any()) + { + result[pair.Key] = pending; + } + } + + _logger.LogDebug($"-{logContext}: found {result.Count} pending offsets"); + return result; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/ReceivedMessage.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/ReceivedMessage.cs new file mode 100644 index 0000000..d2acab1 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/ReceivedMessage.cs @@ -0,0 +1,8 @@ +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public class ReceivedMessage + { + public string ConsumerId { get; set; } + public string MessageId { get; set; } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SqlTableBasedDuplicateMessageDetector.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SqlTableBasedDuplicateMessageDetector.cs new file mode 100644 index 0000000..3133f0e --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SqlTableBasedDuplicateMessageDetector.cs @@ -0,0 +1,49 @@ +using System; +using System.Data.SqlClient; +using 
IO.Eventuate.Tram.Database; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public class SqlTableBasedDuplicateMessageDetector : IDuplicateMessageDetector + { + private readonly EventuateTramDbContext _dbContext; + private readonly ILogger _logger; + + public SqlTableBasedDuplicateMessageDetector(EventuateTramDbContext dbContext, + ILogger logger) + { + _dbContext = dbContext; + _logger = logger; + } + + public bool IsDuplicate(string consumerId, string messageId) + { + string logContext = $"{nameof(IsDuplicate)} " + + $"for {nameof(consumerId)}='{consumerId}', {nameof(messageId)}='{messageId}'"; + try + { + _logger.LogDebug($"+{logContext}"); + _dbContext.ReceivedMessages.Add(new ReceivedMessage {ConsumerId = consumerId, MessageId = messageId}); + _dbContext.SaveChanges(); + _logger.LogDebug($"-{logContext}"); + + return false; + } + catch (DbUpdateException e) + { + const int duplicateKeyError = 2627; + + if (e.InnerException is SqlException sqlException && sqlException.Number == duplicateKeyError) + { + _logger.LogInformation($"{logContext}: Detected duplicate."); + return true; + } + + _logger.LogError(e, $"{logContext}: Got exception {e}"); + throw; + } + } + } +} \ No newline at end of file
diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneBasedDispatcher.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneBasedDispatcher.cs new file mode 100644 index 0000000..1033b0b --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneBasedDispatcher.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Concurrent; +using IO.Eventuate.Tram.Messaging.Common; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public class SwimlaneBasedDispatcher + { + private readonly ConcurrentDictionary<int, SwimlaneDispatcher> _map = new ConcurrentDictionary<int, SwimlaneDispatcher>(); + private readonly string _subscriberId; + private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; + private readonly string _dispatcherContext; + + public SwimlaneBasedDispatcher(string subscriberId, ILoggerFactory loggerFactory) + { + _subscriberId = subscriberId; + _loggerFactory = loggerFactory; + _logger = _loggerFactory.CreateLogger<SwimlaneBasedDispatcher>(); + _dispatcherContext = $"SubscriberId='{subscriberId}'"; + } + + public void Dispatch(IMessage message, int swimlane, Action<IMessage> target) + { + var logContext = $"{nameof(Dispatch)} for {_dispatcherContext}, swimlane={swimlane}, MessageId={message.Id}"; + _logger.LogDebug($"+{logContext}"); + if (!_map.TryGetValue(swimlane, out SwimlaneDispatcher swimlaneDispatcher)) + { + _logger.LogDebug($"{logContext}: No dispatcher found, attempting to create"); + swimlaneDispatcher = new SwimlaneDispatcher(_subscriberId, swimlane, _loggerFactory.CreateLogger<SwimlaneDispatcher>()); + SwimlaneDispatcher r = _map.GetOrAdd(swimlane, swimlaneDispatcher); + if (r != swimlaneDispatcher) + { + _logger.LogDebug($"{logContext}: Using concurrently created SwimlaneDispatcher"); + swimlaneDispatcher = r; + } + else + { + _logger.LogDebug($"{logContext}: Using newly created SwimlaneDispatcher"); + } + } + + swimlaneDispatcher.Dispatch(message, target); + _logger.LogDebug($"-{logContext}"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneDispatcher.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneDispatcher.cs new file mode 100644 index 0000000..4123815 --- /dev/null +++
b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/SwimlaneDispatcher.cs @@ -0,0 +1,98 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using IO.Eventuate.Tram.Messaging.Common; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + public class SwimlaneDispatcher + { + private readonly object _lockObject = new object(); + + private readonly ILogger _logger; + private readonly string _dispatcherContext; + + // The Java solution used a blocking queue but did not use any of its blocking + // features. ConcurrentQueue is simpler. + private readonly ConcurrentQueue<QueuedMessage> _queue = + new ConcurrentQueue<QueuedMessage>(); + + // The running flag must only be accessed within a lock(_lockObject) + private bool _running; + + public SwimlaneDispatcher(string subscriberId, int swimlane, ILogger logger) + { + _logger = logger; + _dispatcherContext = $"SubscriberId='{subscriberId}', SwimLane='{swimlane}'"; + } + + public void Dispatch(IMessage message, Action<IMessage> messageConsumer) + { + var logContext = $"{nameof(Dispatch)} for {_dispatcherContext}, MessageId={message.Id}"; + _logger.LogDebug($"+{logContext}"); + lock (_lockObject) + { + var queuedMessage = new QueuedMessage(message, messageConsumer); + _queue.Enqueue(queuedMessage); + // Start a message processor if one is not running + if (!_running) + { + _logger.LogDebug($"{logContext}: Added message and starting message processor"); + _running = true; + StartMessageProcessor(); + } + else + { + _logger.LogDebug($"{logContext}: Added message for already running message processor"); + } + } + _logger.LogDebug($"-{logContext}"); + } + + private void StartMessageProcessor() + { + // The Java implementation uses a ThreadPoolExecutor as the executor. + // In .NET, Task.Run queues the work to run on the ThreadPool.
+ Task.Run(() => ProcessQueuedMessage()); + } + + private void ProcessQueuedMessage() + { + var logContext = $"{nameof(ProcessQueuedMessage)} for {_dispatcherContext}"; + _logger.LogDebug($"+{logContext}"); + while (true) + { + if (!_queue.TryDequeue(out QueuedMessage queuedMessage)) + { + // Queue was empty, check one more time with the lock on to + // avoid a race and stop the processor if really empty + lock (_lockObject) + { + if (!_queue.TryDequeue(out queuedMessage)) + { + _logger.LogDebug($"{logContext}: No more messages, stopping message processor"); + _running = false; + return; + } + } + } + _logger.LogDebug($"{logContext}: Invoking handler for MessageId='{queuedMessage.Message.Id}'"); + queuedMessage.MessageConsumer(queuedMessage.Message); + } + } + + private class QueuedMessage + { + public QueuedMessage(IMessage message, Action<IMessage> messageConsumer) + { + Message = message; + MessageConsumer = messageConsumer; + } + + public IMessage Message { get; } + + public Action<IMessage> MessageConsumer { get; } + } + } +} \ No newline at end of file
diff --git a/IO.Eventuate.Tram/Messaging/Consumer/Kafka/TopicPartitionOffsets.cs b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/TopicPartitionOffsets.cs new file mode 100644 index 0000000..5b95aeb --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/Kafka/TopicPartitionOffsets.cs @@ -0,0 +1,113 @@ +using System.Collections.Generic; +using System.Linq; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Consumer.Kafka +{ + /// <summary> + /// Tracks the offsets for a TopicPartition that are being processed or have been processed + /// </summary> + public class TopicPartitionOffsets + { + private readonly ILogger _logger; + + /// <summary> + /// offsets that are being processed + /// </summary> + private SortedSet<long> _unprocessed = new SortedSet<long>(); + + /// <summary> + /// offsets that have been processed + /// </summary> + private ISet<long> _processed = new HashSet<long>(); + + public TopicPartitionOffsets(ILogger logger) + { + _logger = logger; + } + + public override string ToString() + { + return $"Unprocessed: {_unprocessed}, Processed: {_processed}"; + } + + /// <summary> + /// Mark the offset for an event as being processed + /// </summary> + /// <param name="offset">Offset of an event being processed</param> + public void NoteUnprocessed(long offset) + { + _logger.LogDebug($"TopicPartitionOffset.NoteUnprocessed for offset={offset}"); + _unprocessed.Add(offset); + } + + /// <summary> + /// Mark the offset of an event as processed (completed processing) + /// </summary> + /// <param name="offset">Offset of a processed event</param> + public void NoteProcessed(long offset) + { + _logger.LogDebug($"TopicPartitionOffset.NoteProcessed for offset={offset}"); + _processed.Add(offset); + } + + /// <summary> + /// Returns the highest offset that has been processed. It is assumed that events are + /// processed in order so that all lower offsets are already processed. + /// </summary> + /// <returns>Largest of all offsets that have been processed and can be committed</returns> + public long? OffsetToCommit() + { + var logContext = $"TopicPartitionOffset.OffsetToCommit"; + _logger.LogDebug($"+{logContext}"); + long? result = null; + foreach (long x in _unprocessed) + { + if (_processed.Contains(x)) + { + result = x; + } + else + { + break; + } + } + + _logger.LogDebug($"-{logContext}: returning offset={result}"); + return result; + } + + /// <summary> + /// Mark an offset as committed so that all lower offsets can be + /// removed from processing tracking.
+ /// Note the off-by-one convention in the usage: the offset passed in + /// is actually the lowest offset that has not been committed. + /// </summary> + /// <param name="offset">Offset to mark as committed implying that + /// all lower offsets are also committed</param> + public void NoteOffsetCommitted(long offset) + { + var logContext = $"TopicPartitionOffset.NoteOffsetCommitted"; + _logger.LogDebug($"+{logContext}"); + _unprocessed = new SortedSet<long>(_unprocessed.Where(x => x >= offset)); + _processed = new HashSet<long>(_processed.Where(x => x >= offset)); + _logger.LogDebug($"-{logContext}: unprocessed count={_unprocessed.Count}, " + + $"processed count={_processed.Count} "); + } + + /// <summary> + /// Pending means processing is started but not yet marked as + /// completed. + /// </summary> + /// <returns>Set of offsets that are being processed</returns> + public ISet<long> GetPending() + { + var logContext = "TopicPartitionOffset.GetPending"; + _logger.LogDebug($"+{logContext}"); + ISet<long> result = new HashSet<long>(_unprocessed); + result.ExceptWith(_processed); + _logger.LogDebug($"-{logContext}: returning {result.Count} offsets"); + return result; + } + } +} \ No newline at end of file
diff --git a/IO.Eventuate.Tram/Messaging/Consumer/MessageHandler.cs b/IO.Eventuate.Tram/Messaging/Consumer/MessageHandler.cs new file mode 100644 index 0000000..f96b758 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Consumer/MessageHandler.cs @@ -0,0 +1,7 @@ +using System; +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Messaging.Consumer +{ + public delegate void MessageHandler(IMessage message, IServiceProvider serviceProvider); +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/AbstractMessageProducer.cs b/IO.Eventuate.Tram/Messaging/Producer/AbstractMessageProducer.cs new file mode 100644 index 0000000..b8ffb17 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/AbstractMessageProducer.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using IO.Eventuate.Tram.Messaging.Common; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Producer +{ + public abstract class AbstractMessageProducer + { + protected readonly ILogger Logger; + protected readonly IMessageInterceptor[] MessageInterceptors; + + protected AbstractMessageProducer(IEnumerable<IMessageInterceptor> messageInterceptors, + ILogger logger) + { + MessageInterceptors = messageInterceptors.ToArray(); + Logger = logger; + } + + protected void PreSend(IMessage message) + { + var logContext = $"{nameof(PreSend)} message.Id={message.Id}"; + Logger.LogDebug($"+{logContext}"); + foreach (IMessageInterceptor messageInterceptor in MessageInterceptors) + { + messageInterceptor.PreSend(message); + } + Logger.LogDebug($"-{logContext}: sent to {MessageInterceptors.Length} message interceptors"); + } + + protected void PostSend(IMessage message, Exception e) + { + var logContext = $"{nameof(PostSend)} message.Id={message.Id}"; + Logger.LogDebug($"+{logContext}"); + foreach (IMessageInterceptor messageInterceptor in MessageInterceptors) + { + messageInterceptor.PostSend(message, e); + } + Logger.LogDebug($"-{logContext}: sent to {MessageInterceptors.Length} message interceptors"); + } + + protected void SendMessage(string id, string destination, IMessage message, IMessageSender messageSender) + { + var logContext = $"{nameof(SendMessage)} id='{id}', destination='{destination}', message.Id='{message.Id}'"; + Logger.LogDebug($"+{logContext}"); + if (id == null) + { + if (message.GetHeader(MessageHeaders.Id) == null) + { + Logger.LogError($"{logContext}: 
Message missing Id header"); + throw new ArgumentNullException(nameof(id), "message needs an id"); + } + } + else + { + message.SetHeader(MessageHeaders.Id, id); + } + + message.SetHeader(MessageHeaders.Destination, destination); + + message.SetHeader(MessageHeaders.Date, HttpDateHeaderFormatUtil.NowAsHttpDateString()); + + PreSend(message); + try + { + messageSender.Send(message); + PostSend(message, null); + } + catch (Exception e) + { + Logger.LogError(e, $"{logContext}: Exception sending message"); + PostSend(message, e); + throw; + } + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/HttpDateHeaderFormatUtil.cs b/IO.Eventuate.Tram/Messaging/Producer/HttpDateHeaderFormatUtil.cs new file mode 100644 index 0000000..1c10d62 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/HttpDateHeaderFormatUtil.cs @@ -0,0 +1,13 @@ +using System; + +namespace IO.Eventuate.Tram.Messaging.Producer +{ + public static class HttpDateHeaderFormatUtil + { + public static string NowAsHttpDateString() + { + // Use RFC 1123 format + return DateTimeOffset.UtcNow.ToString("R"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/IMessageProducer.cs b/IO.Eventuate.Tram/Messaging/Producer/IMessageProducer.cs new file mode 100644 index 0000000..697a1fb --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/IMessageProducer.cs @@ -0,0 +1,17 @@ +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Messaging.Producer +{ + /// + /// Supports sending basic messages + /// + public interface IMessageProducer + { + /// + /// Send a message + /// + /// The destination channel + /// The message to send + void Send(string destination, IMessage message); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/IMessageSender.cs b/IO.Eventuate.Tram/Messaging/Producer/IMessageSender.cs new file mode 100644 index 0000000..0ee89e1 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/IMessageSender.cs @@ -0,0 +1,9 @@ +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Messaging.Producer +{ + public interface IMessageSender + { + void Send(IMessage message); + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/MessageBuilder.cs b/IO.Eventuate.Tram/Messaging/Producer/MessageBuilder.cs new file mode 100644 index 0000000..021c0f5 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/MessageBuilder.cs @@ -0,0 +1,53 @@ +using System.Collections.Generic; +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Messaging.Producer +{ + public class MessageBuilder + { + protected readonly string Body; + protected readonly IDictionary Headers = new Dictionary(); + + protected MessageBuilder() + { + + } + + public MessageBuilder(string body) + { + Body = body; + } + + public MessageBuilder(IMessage message) : this(message.Payload) + { + Headers = message.Headers; + } + + public static MessageBuilder WithPayload(string payload) { + return new MessageBuilder(payload); + } + + public MessageBuilder WithHeader(string name, string value) { + Headers[name] = value; + return this; + } + + public MessageBuilder WithExtraHeaders(string prefix, IDictionary headers) { + + foreach (KeyValuePair pair in headers) + { + Headers[prefix + pair.Key] = pair.Value; + } + + return this; + } + + public Message Build() { + return new Message(Body, Headers); + } + + public static MessageBuilder WithMessage(Message message) { + return new MessageBuilder(message); + 
} + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/Outbox/Message.cs b/IO.Eventuate.Tram/Messaging/Producer/Outbox/Message.cs new file mode 100644 index 0000000..1e6b0aa --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/Outbox/Message.cs @@ -0,0 +1,26 @@ +using IO.Eventuate.Tram.Messaging.Common; + +namespace IO.Eventuate.Tram.Messaging.Producer.Outbox +{ + public class Message + { + public Message() + { + + } + + public Message(IMessage message) + { + Id = message.Id; + Destination = message.GetRequiredHeader(MessageHeaders.Destination); + Headers = JsonMapper.ToJson(message.Headers); + Payload = message.Payload; + } + + public string Id { get; set; } + public string Destination { get; set; } + public string Headers { get; set; } + public string Payload { get; set; } + public short Published { get; set; } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/Messaging/Producer/Outbox/OutboxMessageProducer.cs b/IO.Eventuate.Tram/Messaging/Producer/Outbox/OutboxMessageProducer.cs new file mode 100644 index 0000000..ad11803 --- /dev/null +++ b/IO.Eventuate.Tram/Messaging/Producer/Outbox/OutboxMessageProducer.cs @@ -0,0 +1,63 @@ +using System.Collections.Generic; +using IO.Eventuate.Tram.Database; +using IO.Eventuate.Tram.Messaging.Common; +using Microsoft.Extensions.Logging; + +namespace IO.Eventuate.Tram.Messaging.Producer.Outbox +{ + /// + /// Implements the AbstractMessageProducer using a database context as the + /// outbox for published messages to be sent to the message queue. + /// Eventuate-tram CDC takes messages from the database and puts them + /// into the message queue. + /// + public class OutboxMessageProducer : AbstractMessageProducer, IMessageProducer, IMessageSender + { + private readonly IIdGenerator _idGenerator; + private readonly EventuateTramDbContext _eventuateTramDbContext; + + /// + /// Construct an OutboxMessageProducer + /// + /// Collection of intercepts applied before and + /// after sending the message to outbox + /// Function to use for generating keys + /// Database context that provides + /// persistence for the outbox + /// Logger for diagnostic messages + public OutboxMessageProducer(IEnumerable messageInterceptors, + IIdGenerator idGenerator, EventuateTramDbContext eventuateTramDbContext, + ILogger logger) : base(messageInterceptors, logger) + { + _idGenerator = idGenerator; + _eventuateTramDbContext = eventuateTramDbContext; + } + + /// + /// Send a message to a specified destination (aka topic in Kafka speak). + /// + /// Destination channel (topic) to publish to + /// Message to publish + public void Send(string destination, IMessage message) + { + var logContext = $"{nameof(Send)} destination='{destination}', message.Id={message.Id}"; + Logger.LogDebug($"+{logContext}"); + string id = _idGenerator.GenId().AsString(); + SendMessage(id, destination, message, this); + Logger.LogDebug($"-{logContext}: sent message id={id}"); + } + + /// + /// Send message puts the message in the database to be processed + /// by the CDC. + /// + /// Message to publish + void IMessageSender.Send(IMessage message) + { + var messageEntity = new Message(message); + _eventuateTramDbContext.Messages.Add(messageEntity); + // Don't save changes on _eventuateTramDbContext here so that the application can have more + // control over when the save happens (within a transaction with other DbContext changes, etc.) 
+ } + } +} \ No newline at end of file diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..d955a86 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,11 @@ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md index 4652aa2..b54c8c6 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,236 @@ -# eventuate-tram-core-dotnet -.NET port of the Eventuate Tram client libraries +# Eventuate Tram (Transactional Messaging) framework +A .NET port of https://github.com/eventuate-tram/eventuate-tram-core + +## Usage + +First you need to register the EventuateTramDbContext in Startup.cs. You probably want to use the same +DbConnection from another DbContext (e.g. AppDbContext) to allow for events to be published +and consumed within the same transaction as other state changes are saved. This can be done as +follows: +```c# +services.AddDbContext<EventuateTramDbContext>((provider, o) => +{ + // Share the same connection as AppDbContext + var appDbContext = provider.GetRequiredService<AppDbContext>(); + o.UseSqlServer(appDbContext.Database.GetDbConnection()); +}); +``` + +Next you need to register the core Eventuate Tram services in Startup.cs: +```c# +services.AddEventuateTramSqlKafkaTransport(dbSchemaName, bootstrapServers, + eventuateKafkaConsumerConfigurationProperties); +``` +To use the default eventuateKafkaConsumerConfigurationProperties, just pass in EventuateKafkaConsumerConfigurationProperties.Empty() + +Next you need to run the database initialization script (https://github.build.ge.com/plantapps/eventuate-tram-core/blob/ms-sql-support/mssql/initialize-database.sql), +modifying the database schema to match what you configured in the previous step. + +### Publishing Events +To enable domain event publishing register the DomainEventPublisher service in Startup.cs: +```c# +services.AddEventuateTramEventsPublisher(); +``` + +To publish a domain event, inject an IDomainEventPublisher into your service and then call IDomainEventPublisher.Publish: +```c# +var @event = new ExampleEvent +{ + ... +}; + +_domainEventPublisher.Publish(aggregateType, aggregateId, new List<IDomainEvent> {@event}); +``` +The aggregateType will be used as the Kafka topic, and the aggregateId as the partition ID. +Your event type needs to implement IDomainEvent. +You also need to save the changes to the EventuateTramDbContext (probably within the same +transaction as other changes in another DbContext, e.g. AppDbContext): +```c# +using (IDbContextTransaction transaction = await _AppDbContext.Database.BeginTransactionAsync(stoppingToken)) +{ + await _AppDbContext.SaveChangesAsync(stoppingToken); + _eventuateTramDbContext.Database.UseTransaction(transaction.GetDbTransaction()); + await _eventuateTramDbContext.SaveChangesAsync(stoppingToken); + transaction.Commit(); +} +``` +Alternatively, you could use a TransactionScope instead of the explicit BeginTransactionAsync, as sketched below.
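+A minimal sketch of the TransactionScope variant follows. It assumes both DbContexts share the same connection (as configured above) and that your EF Core provider supports ambient System.Transactions transactions:
+```c#
+using (var scope = new TransactionScope(TransactionScopeOption.Required,
+    new TransactionOptions {IsolationLevel = IsolationLevel.ReadCommitted},
+    TransactionScopeAsyncFlowOption.Enabled))
+{
+    // Both contexts enlist in the ambient transaction, so the outbox message
+    // and the application's state changes commit (or roll back) together
+    await _AppDbContext.SaveChangesAsync(stoppingToken);
+    await _eventuateTramDbContext.SaveChangesAsync(stoppingToken);
+    scope.Complete();
+}
+```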
+ +If EnableRetryOnFailure is enabled on your DbContext, you will need to wrap the transaction in +an execution strategy's ExecuteAsync(): +```c# +IExecutionStrategy strategy = _AppDbContext.Database.CreateExecutionStrategy(); +rowCount = await strategy.ExecuteAsync(async (stoppingToken) => +{ + using (IDbContextTransaction transaction = await _AppDbContext.Database.BeginTransactionAsync(stoppingToken)) + { + int count = await _AppDbContext.SaveChangesAsync(stoppingToken); + _eventuateTramDbContext.Database.UseTransaction(transaction.GetDbTransaction()); + await _eventuateTramDbContext.SaveChangesAsync(stoppingToken); + transaction.Commit(); + return count; + } +}, cancellationToken); + +``` + +Finally, the Eventuate Tram CDC service will need to be running and monitoring your database +for new events to publish. This is easiest to do by running it in a docker container. + +### Consuming Events (Option 1) + +Create one or more event handler services that implement IDomainEventHandler<TEvent>. You could have +one handler service per event type, or handle multiple event types in the same handler service: +```c# +public class ExampleEventHandler : IDomainEventHandler<Example1Event>, IDomainEventHandler<Example2Event> +{ + private readonly ExampleDbContext _dbContext; + private readonly ILogger<ExampleEventHandler> _logger; + + public ExampleEventHandler(ExampleDbContext dbContext, ILogger<ExampleEventHandler> logger) + { + _dbContext = dbContext; + _logger = logger; + } + + public void Handle(IDomainEventEnvelope<Example1Event> @event) + { + _logger.LogDebug("Got message Example1Event with id={EventId} and value={Event}", @event.EventId, + @event.Event.ToString()); + _dbContext.DoSomething(...); + _dbContext.SaveChanges(); + } + + public void Handle(IDomainEventEnvelope<Example2Event> @event) + { + _logger.LogDebug("Got message Example2Event with id={EventId} and value={Event}", @event.EventId, + @event.Event.ToString()); + _dbContext.DoSomething(...); + _dbContext.SaveChanges(); + } +} +``` + +Register your event handler service(s) as well as a DomainEventDispatcher in Startup.cs: +```c# +services.AddScoped<ExampleEventHandler>(); +services.AddEventuateTramDomainEventDispatcher(subscriberId, + provider => DomainEventHandlersBuilder.ForAggregateType("ExampleAggregate") + .OnEvent<Example1Event, ExampleEventHandler>() + .OnEvent<Example2Event, ExampleEventHandler>() + .Build()); +``` +The aggregate type will be used as the Kafka topic and the subscriberId will be used as the Kafka +consumer group. + +Note that the Handle method is executed within a transaction scope and uses the execution strategy configured +on the EventuateTramDbContext. If EnableRetryOnFailure is configured on a DbContext used by your handler, you should +also set EnableRetryOnFailure on EventuateTramDbContext. Otherwise, you will get the exception: +```c# +System.InvalidOperationException: The configured execution strategy 'SqlServerRetryingExecutionStrategy' does not +support user initiated transactions. Use the execution strategy returned by 'DbContext.Database.CreateExecutionStrategy()' +to execute all the operations in the transaction as a retriable unit. +``` +Also, note that if EnableRetryOnFailure is configured, your handler may get called multiple times if there is a +transient failure that causes a retry to occur, so handlers should be safe to re-run (see the sketch below).
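+One way to tolerate these repeat deliveries is to make the handler idempotent. The sketch below assumes a hypothetical ProcessedEvents table (entity type ProcessedEvent) in your application DbContext that records the EventId of each event already handled:
+```c#
+public void Handle(IDomainEventEnvelope<Example1Event> @event)
+{
+    // ProcessedEvents is a hypothetical application-owned table keyed by event id
+    if (_dbContext.ProcessedEvents.Any(p => p.EventId == @event.EventId))
+    {
+        return; // Already handled on an earlier (retried) delivery
+    }
+
+    _dbContext.DoSomething(...);
+    _dbContext.ProcessedEvents.Add(new ProcessedEvent {EventId = @event.EventId});
+    _dbContext.SaveChanges();
+}
+```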
+ +### Consuming Events (Option 2) + +Create an event consumer service and implement handlers for the different event types: +```c# +public class ExampleEventConsumer +{ + private readonly ILogger<ExampleEventConsumer> _logger; + + public ExampleEventConsumer(ILogger<ExampleEventConsumer> logger) + { + _logger = logger; + } + + public DomainEventHandlers DomainEventHandlers() + { + return DomainEventHandlersBuilder.ForAggregateType("ExampleAggregate") + .OnEvent<Example1Event>(HandleExample1Event) + .OnEvent<Example2Event>(HandleExample2Event) + .Build(); + } + + private void HandleExample1Event(IDomainEventEnvelope<Example1Event> @event) + { + _logger.LogDebug("Got Example1Event with id={EventId} and value={Event}", @event.EventId, + @event.Event.ToString()); + } + + private void HandleExample2Event(IDomainEventEnvelope<Example2Event> @event) + { + _logger.LogDebug("Got message Example2Event with id={EventId} and value={Event}", @event.EventId, + @event.Event.ToString()); + } +} +``` +The aggregate type will be used as the Kafka topic. + +Register your event consumer service as well as a DomainEventDispatcher in Startup.cs: +```c# +services.AddSingleton<ExampleEventConsumer>(); +services.AddEventuateTramDomainEventDispatcher(subscriberId, + provider => + { + var consumer = provider.GetRequiredService<ExampleEventConsumer>(); + return consumer.DomainEventHandlers(); + }); +``` +The subscriberId will be used as the Kafka consumer group. + +Note that the handler methods are executed within a transaction scope and use the execution strategy configured +on the EventuateTramDbContext. If EnableRetryOnFailure is configured on a DbContext used by your handler, you should +also set EnableRetryOnFailure on EventuateTramDbContext. Otherwise, you will get the exception: +```c# +System.InvalidOperationException: The configured execution strategy 'SqlServerRetryingExecutionStrategy' does not +support user initiated transactions. Use the execution strategy returned by 'DbContext.Database.CreateExecutionStrategy()' +to execute all the operations in the transaction as a retriable unit. +``` +Also, note that if EnableRetryOnFailure is configured, your handler may get called multiple times if there is a +transient failure that causes a retry to occur. + +### Troubleshooting + +If you are using this library on OSX, you may get an exception similar to "System.DllNotFoundException: Failed to +load the librdkafka native library". See issue https://github.com/confluentinc/confluent-kafka-dotnet/issues/778 +for more information and a work-around. This should be resolved in the final version of Confluent.Kafka 1.0.0 +(currently we are using 1.0.0-beta3). + +## Running Integration Tests + +### Services Required + +The integration tests require a set of support services to be running and accessible. +The IntegrationTests folder contains a docker-compose.yml that can be used to start the +support services and run the tests. + +You can use the following steps to run the tests: + +``` +$ cd IO.Eventuate.Tram.IntegrationTests +$ dotnet publish -c Release +$ export CDC_SERVICE_DOCKER_IMAGE= +$ export CDC_SERVICE_DOCKER_VERSION= +$ docker-compose down +$ docker-compose build +$ docker-compose up -d mssql +$ docker-compose up --exit-code-from dbsetup dbsetup +$ docker-compose up -d zookeeper +$ docker-compose up -d kafka +$ docker-compose up -d cdcservice1 +$ docker-compose up -d cdcservice2 +$ docker-compose up eventuatetramtests +``` + +Test results will be written to ./bin/Release/netcoreapp2.1/publish/TestResults.
+ +### Environment Variable Configuration + +If running the tests from an IDE, set the following environment variables: + +> `ConnectionStrings__EventuateTramDbConnection` identifies where to find the database for the outbox messages. +> `KafkaBootstrapServers` identifies where the Kafka server is running. \ No newline at end of file
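+For example (the values below are only illustrative; substitute ones that match your environment):
+```
+ConnectionStrings__EventuateTramDbConnection=Server=localhost,1433;Database=EventuateTramTests;User Id=sa;Password=<your-password>
+KafkaBootstrapServers=localhost:9092
+```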