Skip to content

Commit

Permalink
Merge pull request #31 from mizrael/kafka
Browse files Browse the repository at this point in the history
added Kafka transport library
  • Loading branch information
mizrael authored Mar 24, 2021
2 parents 8a9599a + 093e635 commit 8d12110
Show file tree
Hide file tree
Showing 58 changed files with 1,733 additions and 50 deletions.
22 changes: 22 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ jobs:
environment:
SA_PASSWORD: "Sup3r_p4ssword123"
ACCEPT_EULA: "Y"
- image: 'bitnami/zookeeper:3'
environment:
ALLOW_ANONYMOUS_LOGIN: yes
- image: 'bitnami/kafka:2'
environment:
KAFKA_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
ALLOW_PLAINTEXT_LISTENER: yes

steps:
- checkout
Expand All @@ -38,6 +49,17 @@ jobs:
environment:
SA_PASSWORD: "Sup3r_p4ssword123"
ACCEPT_EULA: "Y"
- image: 'bitnami/zookeeper:3'
environment:
ALLOW_ANONYMOUS_LOGIN: yes
- image: 'bitnami/kafka:2'
environment:
KAFKA_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
ALLOW_PLAINTEXT_LISTENER: yes

steps:
- checkout
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ obj/
*.DotSettings.user
.docker/
packages/
TestResults/
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## [2021-03-23](https://github.com/mizrael/OpenSleigh/pull/31)
### Added
- added Kafka transport library

## [2021-02-24](https://github.com/mizrael/OpenSleigh/pull/27)
### Added
- added support for compensating transactions
Expand Down
14 changes: 14 additions & 0 deletions OpenSleigh.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Cosm
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Cosmos.SQL.Tests", "tests\OpenSleigh.Persistence.Cosmos.SQL.Tests\OpenSleigh.Persistence.Cosmos.SQL.Tests.csproj", "{08E996DB-3D0C-4A63-8166-BF61732D3B21}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.Kafka", "src\OpenSleigh.Transport.Kafka\OpenSleigh.Transport.Kafka.csproj", "{887358C5-9EFF-4498-A04B-E12B199EC259}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.Kafka.Tests", "tests\OpenSleigh.Transport.Kafka.Tests\OpenSleigh.Transport.Kafka.Tests.csproj", "{5B363808-664B-42F4-8C38-EEFB9F05C500}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -109,6 +113,14 @@ Global
{08E996DB-3D0C-4A63-8166-BF61732D3B21}.Debug|Any CPU.Build.0 = Debug|Any CPU
{08E996DB-3D0C-4A63-8166-BF61732D3B21}.Release|Any CPU.ActiveCfg = Release|Any CPU
{08E996DB-3D0C-4A63-8166-BF61732D3B21}.Release|Any CPU.Build.0 = Release|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Debug|Any CPU.Build.0 = Debug|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Release|Any CPU.ActiveCfg = Release|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Release|Any CPU.Build.0 = Release|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -128,6 +140,8 @@ Global
{13805E00-0578-4E71-AB8B-0AAFCCFD3551} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{E367B2EF-2111-4BBB-B37F-0263559E5FD2} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{08E996DB-3D0C-4A63-8166-BF61732D3B21} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{887358C5-9EFF-4498-A04B-E12B199EC259} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
{5B363808-664B-42F4-8C38-EEFB9F05C500} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core.BackgroundServices
{
Expand All @@ -21,8 +22,15 @@ public SubscribersBackgroundService(IEnumerable<ISubscriber> subscribers, System
public override async Task StartAsync(CancellationToken cancellationToken)
{
if (!_systemInfo.PublishOnly)
await Task.WhenAll(_subscribers.Select(s => s.StartAsync(cancellationToken)));

{
var tasks = _subscribers.Select(s => s.StartAsync(cancellationToken));

await Task.Factory.StartNew(() => Task.WhenAll(tasks),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
}

await base.StartAsync(cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
namespace OpenSleigh.Core.Messaging
{
public interface ISubscriber
{
Expand Down
2 changes: 1 addition & 1 deletion src/OpenSleigh.Core/OpenSleigh.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.1.1</PackageVersion>
<PackageVersion>1.1.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh</Product>
Expand Down
29 changes: 13 additions & 16 deletions src/OpenSleigh.Persistence.Cosmos.SQL/Entities/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
public record OutboxMessage(Guid Id, byte[] Data, string Type)
public class OutboxMessage
{
private OutboxMessage()
{
}

public Guid Id { get; init; }
public byte[] Data { get; init; }
public string Type { get; init; }
public string PartitionKey { get; init; }

public MessageStatuses Status { get; private set; }
public Guid? LockId { get; private set; }
public DateTime? LockTime { get; private set; }
Expand All @@ -26,8 +32,11 @@ public void Release()
this.Status = OutboxMessage.MessageStatuses.Processed;
}

public static OutboxMessage New(Guid id, byte[] data, string type, Guid correlationId) => new OutboxMessage(id, data, type)
public static OutboxMessage New(Guid id, byte[] data, string type, Guid correlationId) => new OutboxMessage
{
Id = id,
Data = data,
Type = type,
Status = MessageStatuses.Pending,
PartitionKey = correlationId.ToString()
};
Expand All @@ -38,16 +47,4 @@ public enum MessageStatuses
Processed
}
}

internal class OutboxMessageStateEntityTypeConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToContainer("OutboxMessages")
.HasNoDiscriminator();

builder.HasKey(e => e.Id);
builder.HasPartitionKey(e => e.PartitionKey);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
[ExcludeFromCodeCoverage]
internal class OutboxMessageStateEntityTypeConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToContainer("OutboxMessages")
.HasNoDiscriminator();

builder.HasKey(e => e.Id);
builder.HasPartitionKey(e => e.PartitionKey);
}
}
}
14 changes: 0 additions & 14 deletions src/OpenSleigh.Persistence.Cosmos.SQL/Entities/SagaState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
Expand All @@ -11,16 +9,4 @@ public record SagaState(string PartitionKey, Guid CorrelationId, string Type)
public DateTime? LockTime { get; set; } = null;
}

internal class SagaStateEntityTypeConfiguration : IEntityTypeConfiguration<SagaState>
{
public void Configure(EntityTypeBuilder<SagaState> builder)
{
builder.ToContainer("SagaStates")
.HasNoDiscriminator();

builder.HasKey(e => new {e.CorrelationId, e.Type});
builder.HasPartitionKey(e => e.PartitionKey);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
[ExcludeFromCodeCoverage]
internal class SagaStateEntityTypeConfiguration : IEntityTypeConfiguration<SagaState>
{
public void Configure(EntityTypeBuilder<SagaState> builder)
{
builder.ToContainer("SagaStates")
.HasNoDiscriminator();

builder.HasKey(e => new {e.CorrelationId, e.Type});
builder.HasPartitionKey(e => e.PartitionKey);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>0.0.2</PackageVersion>
<PackageVersion>0.0.3</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Persistence.Cosmos.SQL</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Messaging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.0.1</PackageVersion>
<PackageVersion>1.0.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Persistence.InMemory</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.0.1</PackageVersion>
<PackageVersion>1.0.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Persistence.SQL</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.0.1</PackageVersion>
<PackageVersion>1.0.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Transport.AzureServiceBus</Product>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;
using System;
using System.Threading;
Expand Down
15 changes: 15 additions & 0 deletions src/OpenSleigh.Transport.Kafka/GuidDeserializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Runtime.CompilerServices;
using Confluent.Kafka;

[assembly: InternalsVisibleTo("OpenSleigh.Transport.Kafka.Tests")]
namespace OpenSleigh.Transport.Kafka
{
internal class GuidDeserializer : IDeserializer<Guid>
{
public Guid Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
return new Guid(data);
}
}
}
8 changes: 8 additions & 0 deletions src/OpenSleigh.Transport.Kafka/HeaderNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace OpenSleigh.Transport.Kafka
{
public static class HeaderNames
{
public const string MessageType = "message-type";
public const string Error = "error";
}
}
33 changes: 33 additions & 0 deletions src/OpenSleigh.Transport.Kafka/IKafkaBusConfigurationBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Microsoft.Extensions.DependencyInjection;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;
using System;
using System.Diagnostics.CodeAnalysis;

namespace OpenSleigh.Transport.Kafka
{
public interface IKafkaBusConfigurationBuilder
{
void UseMessageNamingPolicy<TM>(QueueReferencesPolicy<TM> policy) where TM : IMessage;
}


[ExcludeFromCodeCoverage]
internal class DefaultKafkaBusConfigurationBuilder : IKafkaBusConfigurationBuilder
{
private readonly IBusConfigurator _busConfigurator;

public DefaultKafkaBusConfigurationBuilder(IBusConfigurator busConfigurator)
{
_busConfigurator = busConfigurator;
}

public void UseMessageNamingPolicy<TM>(QueueReferencesPolicy<TM> policy) where TM : IMessage
{
if (policy == null)
throw new ArgumentNullException(nameof(policy));

_busConfigurator.Services.AddSingleton(policy);
}
}
}
12 changes: 12 additions & 0 deletions src/OpenSleigh.Transport.Kafka/IKafkaMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace OpenSleigh.Transport.Kafka
{
public interface IKafkaMessageHandler
{
Task HandleAsync(ConsumeResult<Guid, byte[]> result, QueueReferences queueReferences, CancellationToken cancellationToken = default);
}
}
17 changes: 17 additions & 0 deletions src/OpenSleigh.Transport.Kafka/IKafkaPublisherExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.Kafka
{
public interface IKafkaPublisherExecutor
{
Task<DeliveryResult<Guid, byte[]>> PublishAsync(IMessage message,
string topic,
IEnumerable<Header> additionalHeaders = null,
CancellationToken cancellationToken = default);
}
}
Loading

0 comments on commit 8d12110

Please sign in to comment.