Skip to content

Commit

Permalink
Initial port of Java library
Browse files Browse the repository at this point in the history
  • Loading branch information
douggish committed Apr 25, 2019
1 parent 31f819d commit 57245bc
Show file tree
Hide file tree
Showing 74 changed files with 3,918 additions and 2 deletions.
4 changes: 4 additions & 0 deletions IO.Eventuate.Tram.IntegrationTests/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM microsoft/dotnet:2.1-sdk
WORKDIR /app

ENTRYPOINT [ "dotnet", "vstest", "IO.Eventuate.Tram.IntegrationTests.dll", "/logger:trx" ]
5 changes: 5 additions & 0 deletions IO.Eventuate.Tram.IntegrationTests/Dockerfile-dbsetup
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM microsoft/mssql-tools:latest
WORKDIR /scripts
COPY ./TestDatabase/* ./
RUN chmod +x ./entrypoint.sh
CMD ./entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>

<IsPackable>false</IsPackable>

<StartupObject></StartupObject>
</PropertyGroup>

<ItemGroup>
<None Remove="nlog.config" />
<None Remove="testsettings.json" />
</ItemGroup>

<ItemGroup>
<Content Include="nlog.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Content Include="testsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="2.1.4" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
<PackageReference Include="NUnit" Version="3.11.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.12.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\IO.Eventuate.Tram\IO.Eventuate.Tram.csproj" />
</ItemGroup>

</Project>
9 changes: 9 additions & 0 deletions IO.Eventuate.Tram.IntegrationTests/TestDatabase/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

#build the Tram test database
/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -Q "CREATE DATABASE $TRAM_DB" || exit 1
/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -Q "ALTER DATABASE $TRAM_DB SET RECOVERY SIMPLE" || exit 1
export TRAM_SCHEMA=$TRAM_SCHEMA
/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -d $TRAM_DB -I -i initialize-database.sql || exit 1
export TRAM_SCHEMA=$TRAM_SCHEMA2
/opt/mssql-tools/bin/sqlcmd -S $TRAM_DB_SERVER -U sa -P $TRAM_SA_PASSWORD -b -d $TRAM_DB -I -i initialize-database.sql || exit 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
IF (NOT EXISTS(
SELECT name
FROM sys.schemas
WHERE name = '$(TRAM_SCHEMA)'))
BEGIN
EXEC('CREATE SCHEMA [$(TRAM_SCHEMA)]')
END

DROP TABLE IF EXISTS $(TRAM_SCHEMA).message;
DROP TABLE IF EXISTS $(TRAM_SCHEMA).received_messages;

CREATE TABLE $(TRAM_SCHEMA).message (
id NVARCHAR(200) PRIMARY KEY,
destination NVARCHAR(1000) NOT NULL,
headers NVARCHAR(1000) NOT NULL,
payload NVARCHAR(MAX) NOT NULL,
published SMALLINT DEFAULT 0
);

CREATE TABLE $(TRAM_SCHEMA).received_messages (
consumer_id NVARCHAR(200),
message_id NVARCHAR(200),
PRIMARY KEY(consumer_id, message_id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Collections.Generic;
using IO.Eventuate.Tram.Events.Common;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Messaging.Consumer.Kafka;
using NUnit.Framework;
using Microsoft.EntityFrameworkCore;

namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures
{
[TestFixture]
public class BadSchemaIntegrationTests : IntegrationTestsBase
{
[SetUp]
public void Setup()
{
TestSetup("badschema", false, EventuateKafkaConsumerConfigurationProperties.Empty());
}

[TearDown]
public void TearDown()
{
DisposeTestHost();
}

[Test]
public void Publish_DatabaseSchemaNotCreated_ThrowsException()
{
// Arrange
TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2);
TestEventConsumer consumer = GetTestConsumer();

// Act
GetTestPublisher().Publish(AggregateType, AggregateType, new List<IDomainEvent> { msg1 });
Assert.Throws<DbUpdateException>(delegate ()
{
GetDbContext().SaveChanges();
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using IO.Eventuate.Tram.Database;
using IO.Eventuate.Tram.Events.Publisher;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Messaging.Common;
using IO.Eventuate.Tram.Messaging.Consumer.Kafka;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NUnit.Framework;

namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures
{
public class IntegrationTestsBase
{
private const string TestSettingsFile = "testsettings.json";
private string _subscriberId = "xx";
protected const string AggregateType = "TestMessagesTopic";
protected string EventuateDatabaseSchemaName = "eventuate";

protected TestSettings TestSettings;

// There can only be one of these between all of the tests because there's no good way to tear it down and start again
private static IHost _host = null;
private static EventuateTramDbContext _dbContext = null;
private static IDomainEventPublisher _domainEventPublisher = null;
private static TestEventConsumer _testEventConsumer = null;
private static TestMessageInterceptor _interceptor = null;

public IntegrationTestsBase()
{
IConfigurationRoot configuration = null;
try
{
IConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
ConfigureFromEnvironmentAndSettingsFile(configurationBuilder);
configuration = configurationBuilder.Build();
}
catch
{
IConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
ConfigureFromEnvironment(configurationBuilder);
configuration = configurationBuilder.Build();
}

TestSettings = configuration.Get<TestSettings>();
}

protected void TestSetup(string schema, bool withInterceptor, EventuateKafkaConsumerConfigurationProperties consumerConfigProperties)
{
EventuateDatabaseSchemaName = schema;
_subscriberId = Guid.NewGuid().ToString();

if (_host == null)
{
_host = SetupTestHost(withInterceptor, consumerConfigProperties);
_dbContext = _host.Services.GetService<EventuateTramDbContext>();
_domainEventPublisher = _host.Services.GetService<IDomainEventPublisher>();
_testEventConsumer = _host.Services.GetService<TestEventConsumer>();
_interceptor = (TestMessageInterceptor)_host.Services.GetService<IMessageInterceptor>();
}
}

protected void CleanupTest()
{
ClearDb(GetDbContext(), EventuateDatabaseSchemaName);
GetTestConsumer().Reset();
GetTestMessageInterceptor()?.Reset();
}

protected void CleanupKafka()
{
var config = new AdminClientConfig();
config.BootstrapServers = TestSettings.KafkaBootstrapServers;
using (var admin = new AdminClientBuilder(config).Build())
{
admin.DeleteTopicsAsync(new string[] { AggregateType }).Wait();
}
}

protected void ShowTestResults()
{
TestContext.WriteLine("Test Config");
TestContext.WriteLine(" Connection String: {0}", TestSettings.ConnectionStrings.EventuateTramDbConnection);
TestContext.WriteLine(" Kafka server: {0}", TestSettings.KafkaBootstrapServers);
TestContext.WriteLine(" Schema: {0}", EventuateDatabaseSchemaName);
TestContext.WriteLine(" Dispatcher Id: {0}", _subscriberId);
TestContext.WriteLine(" Aggregate Type: {0}", AggregateType);

TestContext.WriteLine("Test Results");
TestContext.WriteLine(" N Messages in DB: {0}", _dbContext.Messages.Count());
TestContext.WriteLine(" Unpublished Count: {0}", _dbContext.Messages.Count(msg => msg.Published == 0));
TestContext.WriteLine(" N Received in DB: {0}", _dbContext.ReceivedMessages.Count(msg => msg.MessageId != null));
TestContext.WriteLine(" Received Type 1: {0}", _testEventConsumer.Type1MessageCount);
TestContext.WriteLine(" Received Type 2: {0}", _testEventConsumer.Type2MessageCount);
TestContext.WriteLine(" Exception Count: {0}", _testEventConsumer.ExceptionCount);

if (_interceptor != null)
{
TestContext.WriteLine("Message Interceptor Counts");
TestContext.WriteLine(" Pre Send: {0}", _interceptor.PreSendCount);
TestContext.WriteLine(" Post Send: {0}", _interceptor.PostSendCount);
TestContext.WriteLine(" Pre Receive: {0}", _interceptor.PreReceiveCount);
TestContext.WriteLine(" Post Receive: {0}", _interceptor.PostReceiveCount);
TestContext.WriteLine(" Pre Handle: {0}", _interceptor.PreHandleCount);
TestContext.WriteLine(" Post Handle: {0}", _interceptor.PostHandleCount);
}
}

/// <summary>
/// Set up the configuration for the HostBuilder
/// </summary>
protected void ConfigureFromEnvironmentAndSettingsFile(IConfigurationBuilder config,
Dictionary<string, string> overrides = null)
{
config
.AddJsonFile(TestSettingsFile, false)
.AddEnvironmentVariables()
.AddInMemoryCollection(overrides);
}

/// <summary>
/// Set up the configuration for the HostBuilder
/// </summary>
protected void ConfigureFromEnvironment(IConfigurationBuilder config,
Dictionary<string, string> overrides = null)
{
config
.AddEnvironmentVariables()
.AddInMemoryCollection(overrides);
}

protected IHost SetupTestHost(bool withInterceptor, EventuateKafkaConsumerConfigurationProperties consumerConfigProperties)
{
var host = new TestHostBuilder()
.SetConnectionString(TestSettings.ConnectionStrings.EventuateTramDbConnection)
.SetEventuateDatabaseSchemaName(EventuateDatabaseSchemaName)
.SetKafkaBootstrapServers(TestSettings.KafkaBootstrapServers)
.SetSubscriberId(_subscriberId)
.SetDomainEventHandlersFactory(
provider =>
{
var consumer = provider.GetRequiredService<TestEventConsumer>();
return consumer.DomainEventHandlers(AggregateType);
})
.SetConsumerConfigProperties(consumerConfigProperties)
.Build<TestEventConsumer>(withInterceptor);
host.StartAsync().Wait();
return host;
}

protected void DisposeTestHost()
{
if (_host == null)
return;

_host.StopAsync().Wait();
_host.Dispose();
_host = null;
_dbContext = null;
_domainEventPublisher = null;
_testEventConsumer = null;
}

protected TestEventConsumer GetTestConsumer()
{
return _testEventConsumer;
}

protected TestMessageInterceptor GetTestMessageInterceptor()
{
return _interceptor;
}

protected IDomainEventPublisher GetTestPublisher()
{
return _domainEventPublisher;
}

protected EventuateTramDbContext GetDbContext()
{
return _dbContext;
}

protected void ClearDb(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName)
{
dbContext.Database.ExecuteSqlCommand(String.Format("Delete from [{0}].[message]", eventuateDatabaseSchemaName));
dbContext.Database.ExecuteSqlCommand(String.Format("Delete from [{0}].[received_messages]", eventuateDatabaseSchemaName));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using IO.Eventuate.Tram.Events.Common;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Messaging.Consumer.Kafka;
using NUnit.Framework;

namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures
{
[TestFixture]
public class PerformanceTests : IntegrationTestsBase
{
[SetUp]
public void Setup()
{
CleanupKafka();
TestSetup("eventuate", false, EventuateKafkaConsumerConfigurationProperties.Empty());
CleanupTest();
}

[TearDown]
public void TearDown()
{
DisposeTestHost();
}

[Test]
public void Send1000Message_Within1Minute()
{
// Arrange
TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2);
TestEventConsumer consumer = GetTestConsumer();

// Act
for (int x = 0; x < 1000; x++)
{
GetTestPublisher().Publish(AggregateType, AggregateType, new List<IDomainEvent> { msg1 });
GetDbContext().SaveChanges();
}

// Allow time for messages to process
int count = 300;
while (consumer.Type1MessageCount < 1000 && count > 0)
{
Thread.Sleep(1000);
count--;
}

ShowTestResults();

// Assert
Assert.AreEqual(1000, GetDbContext().Messages.Count(), "Expect 1000 messages produced");
Assert.AreEqual(1000, consumer.Type1MessageCount, "Received by consumer count must be 1000");
Assert.AreEqual(0, GetDbContext().Messages.Count(msg => msg.Published == 0), "No unpublished messages");
Assert.AreEqual(1000, GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), "Expect 1000 messages received");
Assert.Less(consumer.Type1Duration().TotalSeconds, 60.0, "Time must be less than 60 seconds");

TestContext.WriteLine("Performance Test completed in {0} seconds", consumer.Type1Duration().TotalSeconds);
}
}
}
Loading

0 comments on commit 57245bc

Please sign in to comment.