Skip to content

Commit

Permalink
feat(Client): Added a new Kafka binding handler
Browse files Browse the repository at this point in the history
Signed-off-by: Charles d'Avernas <[email protected]>
cdavernas committed Jan 6, 2025

Verified

This commit was signed with the committer’s verified signature.
cdavernas Charles d'Avernas
1 parent 55c38f2 commit ccacdb3
Showing 22 changed files with 1,008 additions and 14 deletions.
7 changes: 7 additions & 0 deletions Neuroglia.AsyncApi.sln
Original file line number Diff line number Diff line change
@@ -65,6 +65,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.B
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Nats", "src\Neuroglia.AsyncApi.Client.Bindings.Nats\Neuroglia.AsyncApi.Client.Bindings.Nats.csproj", "{DCADC636-53BA-408A-A870-CC07FD63A3F3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.AsyncApi.Client.Bindings.Kafka", "src\Neuroglia.AsyncApi.Client.Bindings.Kafka\Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj", "{84AAE014-BD45-4B6F-96CE-01E364B802EB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -139,6 +141,10 @@ Global
{DCADC636-53BA-408A-A870-CC07FD63A3F3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCADC636-53BA-408A-A870-CC07FD63A3F3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCADC636-53BA-408A-A870-CC07FD63A3F3}.Release|Any CPU.Build.0 = Release|Any CPU
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{84AAE014-BD45-4B6F-96CE-01E364B802EB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -163,6 +169,7 @@ Global
{8FD99A95-410A-4E38-AC7C-938894053E87} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
{D7665DAF-42CD-47E7-8A77-52C82B696D9B} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
{DCADC636-53BA-408A-A870-CC07FD63A3F3} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
{84AAE014-BD45-4B6F-96CE-01E364B802EB} = {4B933DF9-CD24-44B1-AF64-0D5E75B9AB45}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DC433DEB-01E5-4328-B0BB-6FFFE8C7363F}
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ public static class IAsyncApiClientOptionsBuilderExtensions
public static IAsyncApiClientOptionsBuilder AddAllBindingHandlers(this IAsyncApiClientOptionsBuilder builder)
{
builder.AddHttpBindingHandler();
builder.AddKafkaBindingHandler();
builder.AddMqttBindingHandler();
builder.AddNatsBindingHandler();
builder.AddRedisBindingHandler();
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@

<ItemGroup>
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Http\Neuroglia.AsyncApi.Client.Bindings.Http.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Kafka\Neuroglia.AsyncApi.Client.Bindings.Kafka.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Mqtt\Neuroglia.AsyncApi.Client.Bindings.Mqtt.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Nats\Neuroglia.AsyncApi.Client.Bindings.Nats.csproj" />
<ProjectReference Include="..\Neuroglia.AsyncApi.Client.Bindings.Redis\Neuroglia.AsyncApi.Client.Bindings.Redis.csproj" />
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Client.Bindings.Kafka.Configuration;

/// <summary>
/// Represents the options used to configure a <see cref="KafkaBindingHandler"/>
/// </summary>
public class KafkaBindingHandlerOptions
: BindingHandlerOptions
{



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Client.Bindings;

/// <summary>
/// Defines extensions for <see cref="IAsyncApiClientOptionsBuilder"/>s
/// </summary>
public static class IAsyncApiConfigurationBuilderKafkaExtensions
{

/// <summary>
/// Adds and configures an <see cref="IBindingHandler"/> used to handle Kafka operations
/// </summary>
/// <param name="builder">The extended <see cref="IAsyncApiClientOptionsBuilder"/></param>
/// <param name="setup">An <see cref="Action{T}"/>, if any, used to setup the <see cref="IBindingHandler"/>'s options</param>
/// <returns>The configured <see cref="IAsyncApiClientOptionsBuilder"/></returns>
public static IAsyncApiClientOptionsBuilder AddKafkaBindingHandler(this IAsyncApiClientOptionsBuilder builder, Action<KafkaBindingHandlerOptions>? setup = null)
{
setup ??= _ => { };
builder.AddBindingHandler<KafkaBindingHandler, KafkaBindingHandlerOptions>(setup);
return builder;
}

}
136 changes: 136 additions & 0 deletions src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaBindingHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Confluent.Kafka;
using Json.Schema;
using Microsoft.Extensions.DependencyInjection;
using System.Text;

namespace Neuroglia.AsyncApi.Client.Bindings.Kafka;

/// <summary>
/// Represents the default NATS implementation of the <see cref="IBindingHandler"/> interface
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
/// <param name="options">The service used to access the current <see cref="KafkaBindingHandlerOptions"/></param>
/// <param name="serializerProvider">The service used to provide <see cref="ISerializer"/>s</param>
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
public class KafkaBindingHandler(IServiceProvider serviceProvider, ILogger<KafkaBindingHandler> logger, IOptions<KafkaBindingHandlerOptions> options, ISerializerProvider serializerProvider, IJsonSerializer jsonSerializer)
: IBindingHandler<KafkaBindingHandlerOptions>
{

/// <summary>
/// Gets the current <see cref="IServiceProvider"/>
/// </summary>
protected IServiceProvider ServiceProvider { get; } = serviceProvider;

/// <summary>
/// Gets the service used to perform logging
/// </summary>
protected ILogger Logger { get; } = logger;

/// <summary>
/// Gets the current <see cref="KafkaBindingHandlerOptions"/>
/// </summary>
protected KafkaBindingHandlerOptions Options { get; } = options.Value;

/// <summary>
/// Gets the service used to provide <see cref="ISerializer"/>s
/// </summary>
protected ISerializerProvider SerializerProvider { get; } = serializerProvider;

/// <summary>
/// Gets the service used to serialize/deserialize data to/from JSON
/// </summary>
protected IJsonSerializer JsonSerializer { get; } = jsonSerializer;

/// <inheritdoc/>
public virtual bool Supports(string protocol, string? protocolVersion) => protocol.Equals(AsyncApiProtocol.Kafka, StringComparison.OrdinalIgnoreCase);

/// <inheritdoc/>
public virtual async Task<IAsyncApiPublishOperationResult> PublishAsync(AsyncApiPublishOperationContext context, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(context);
try
{
var serverBinding = context.ServerBinding as KafkaServerBindingDefinition;
var channelBinding = context.ChannelBinding as KafkaChannelBindingDefinition;
var operationBinding = context.OperationBinding as KafkaOperationBindingDefinition;
var messageBinding = context.MessageBinding as KafkaMessageBindingDefinition;
var producerConfig = new ProducerConfig
{
BootstrapServers = $"{context.Host}{context.Path}",
AllowAutoCreateTopics = true
};
using var producer = new ProducerBuilder<Null, byte[]>(producerConfig).Build();
using var stream = new MemoryStream();
var serializer = SerializerProvider.GetSerializersFor(context.ContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{context.ContentType}'");
serializer.Serialize(context.Payload ?? new { }, stream);
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
stream.Position = 0;
var payload = stream.ToArray();
var headers = context.Headers == null ? null : new Headers();
if (headers != null && context.Headers != null) foreach (var header in context.Headers.ToDictionary()!) headers.Add(header.Key, serializer.SerializeToByteArray(header.Value));
var message = new Message<Null, byte[]>()
{
Value = payload,
Headers = headers,
};
var topic = channelBinding?.Topic ?? context.Channel!;
var result = await producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);
return new KafkaPublishOperationResult()
{
PersistenceStatus = result.Status,
Partition = result.Partition,
Offset = result.Offset,
TopicPartition = result.TopicPartition,
TopicPartitionOffset = result.TopicPartitionOffset
};
}
catch (Exception ex)
{
return new KafkaPublishOperationResult() { Exception = ex };
}
}

/// <inheritdoc/>
public virtual Task<IAsyncApiSubscribeOperationResult> SubscribeAsync(AsyncApiSubscribeOperationContext context, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(context);
try
{
var serverBinding = context.ServerBinding as KafkaServerBindingDefinition;
var channelBinding = context.ChannelBinding as KafkaChannelBindingDefinition;
var operationBinding = context.OperationBinding as KafkaOperationBindingDefinition;
var groupId = operationBinding?.GroupId?.GetKeyword<DefaultKeyword>()?.Value ?? operationBinding?.GroupId?.GetKeyword<EnumKeyword>()?.Values?[0];
var clientId = operationBinding?.ClientId?.GetKeyword<DefaultKeyword>()?.Value ?? operationBinding?.ClientId?.GetKeyword<EnumKeyword>()?.Values?[0];
var topic = channelBinding?.Topic ?? context.Channel!;
var consumerConfig = new ConsumerConfig
{
BootstrapServers = $"{context.Host}{context.Path}",
GroupId = groupId == null ? null : JsonSerializer.Deserialize<string>(groupId),
ClientId = clientId == null ? null : JsonSerializer.Deserialize<string>(clientId)
};
var consumer = new ConsumerBuilder<Ignore, byte[]>(consumerConfig).Build();
consumer.Subscribe(topic);
var subscription = ActivatorUtilities.CreateInstance<KafkaSubscription>(ServiceProvider, context.Document, context.Messages, context.DefaultContentType, consumer);
return Task.FromResult<IAsyncApiSubscribeOperationResult>(new KafkaSubscribeOperationResult(subscription));
}
catch(Exception ex)
{
return Task.FromResult<IAsyncApiSubscribeOperationResult>(new KafkaSubscribeOperationResult() { Exception = ex });
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Confluent.Kafka;

namespace Neuroglia.AsyncApi.Client.Bindings.Kafka;

/// <summary>
/// Represents an object used to describe the result of a Kafka publish operation
/// </summary>
public class KafkaPublishOperationResult
: AsyncApiPublishOperationResult
{

/// <summary>
/// Gets the persistence status of the published message
/// </summary>
public virtual PersistenceStatus? PersistenceStatus { get; init; }

/// <summary>
/// Gets the partition associated with the published message
/// </summary>
public virtual Partition? Partition { get; init; }

/// <summary>
/// Gets the partition offset associated with the published message
/// </summary>
public virtual Offset? Offset { get; init; }

/// <summary>
/// Gets the topic partition associated with the published message
/// </summary>
public virtual TopicPartition? TopicPartition { get; init; }

/// <summary>
/// Gets the topic partition offset associated with the published message
/// </summary>
public virtual TopicPartitionOffset? TopicPartitionOffset { get; init; }

/// <summary>
/// Gets/sets the <see cref="System.Exception"/>, if any, that occurred during publishing
/// </summary>
public virtual Exception? Exception { get; init; }

/// <inheritdoc/>
public override bool IsSuccessful => Exception == null;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Client.Bindings.Kafka;

/// <summary>
/// Represents an object used to describe the result of a Kafka subscribe operation
/// </summary>
/// <param name="messages">An <see cref="IObservable{T}"/>, if any, used to observe incoming <see cref="IAsyncApiMessage"/>s</param>
public class KafkaSubscribeOperationResult(IObservable<IAsyncApiMessage>? messages = null)
: AsyncApiSubscribeOperationResult
{

/// <summary>
/// Gets/sets the <see cref="System.Exception"/>, if any, that occurred during subscription
/// </summary>
public virtual Exception? Exception { get; init; }

/// <inheritdoc/>
public override IObservable<IAsyncApiMessage>? Messages { get; } = messages;

/// <inheritdoc/>
public override bool IsSuccessful => Exception == null;

}
238 changes: 238 additions & 0 deletions src/Neuroglia.AsyncApi.Client.Bindings.Kafka/KafkaSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Confluent.Kafka;
using Neuroglia.AsyncApi.v3;
using System.Reactive.Subjects;
using System.Text;
using YamlDotNet.Core.Tokens;

namespace Neuroglia.AsyncApi.Client.Bindings.Kafka;

/// <summary>
/// Represents a subscription to a Kafka channel, used to stream <see cref="IAsyncApiMessage"/>s
/// </summary>
public class KafkaSubscription
: IObservable<IAsyncApiMessage>, IDisposable, IAsyncDisposable
{

bool _disposed;

/// <summary>
/// Initializes a new <see cref="KafkaSubscription"/>
/// </summary>
/// <param name="logger">The service used to perform logging</param>
/// <param name="consumer">The service used to consume Kafka messages</param>
/// <param name="messageContentType">The content type of consumed messages</param>
/// <param name="runtimeExpressionEvaluator">The service used to evaluate runtime expressions</param>
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/>s</param>
/// <param name="serializerProvider">The service used to provide <see cref="ISerializer"/>s</param>
/// <param name="document">The <see cref="V3AsyncApiDocument"/> that defines the operation for which to consume MQTT messages</param>
/// <param name="messageDefinitions">An <see cref="IEnumerable{T}"/> containing the definitions of all messages that can potentially be consumed</param>
public KafkaSubscription(ILogger<KafkaSubscription> logger, IConsumer<Ignore, byte[]> consumer, string messageContentType, IRuntimeExpressionEvaluator runtimeExpressionEvaluator, ISchemaHandlerProvider schemaHandlerProvider, ISerializerProvider serializerProvider, V3AsyncApiDocument document, IEnumerable<V3MessageDefinition> messageDefinitions)
{
Logger = logger;
Consumer = consumer;
MessageContentType = messageContentType;
RuntimeExpressionEvaluator = runtimeExpressionEvaluator;
SchemaHandlerProvider = schemaHandlerProvider;
SerializerProvider = serializerProvider;
Document = document;
MessageDefinitions = messageDefinitions;
_ = Task.Run(ReadAsync);
}

/// <summary>
/// Gets the service used to perform logging
/// </summary>
protected ILogger Logger { get; }

/// <summary>
/// Gets the service used to consume Kafka messages
/// </summary>
protected IConsumer<Ignore, byte[]> Consumer { get; }

/// <summary>
/// Gets the content type of consumed messages
/// </summary>
protected string MessageContentType { get; }

/// <summary>
/// Gets the service used to evaluate runtime expressions
/// </summary>
protected IRuntimeExpressionEvaluator RuntimeExpressionEvaluator { get; }

/// <summary>
/// Gets the service used to provide <see cref="ISchemaHandler"/>s
/// </summary>
protected ISchemaHandlerProvider SchemaHandlerProvider { get; }

/// <summary>
/// Gets the service used to provide <see cref="ISerializer"/>s
/// </summary>
protected ISerializerProvider SerializerProvider { get; }

/// <summary>
/// Gets the <see cref="V3AsyncApiDocument"/> that defines the operation for which to consume MQTT messages
/// </summary>
protected V3AsyncApiDocument Document { get; }

/// <summary>
/// Gets an <see cref="IEnumerable{T}"/> containing the definitions of all messages that can potentially be consumed
/// </summary>
protected IEnumerable<V3MessageDefinition> MessageDefinitions { get; }

/// <summary>
/// Gets the <see cref="KafkaSubscription"/>'s <see cref="System.Threading.CancellationTokenSource"/>
/// </summary>
protected CancellationTokenSource CancellationTokenSource { get; } = new();

/// <summary>
/// Gets the <see cref="Subject{T}"/> used to observe consumed <see cref="IAsyncApiMessage"/>
/// </summary>
protected Subject<IAsyncApiMessage> Subject { get; } = new();

/// <inheritdoc/>
public virtual IDisposable Subscribe(IObserver<IAsyncApiMessage> observer) => Subject.Subscribe(observer);

/// <summary>
/// Reads <see cref="IAsyncApiMessage"/>s from the underlying <see cref="Stream"/>
/// </summary>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task ReadAsync()
{
while (!CancellationTokenSource.IsCancellationRequested)
{
try
{
var consumeResult = Consumer.Consume(CancellationTokenSource.Token);
if (consumeResult == null) continue;
var buffer = consumeResult.Message.Value;
if (buffer == null) return;
using var stream = new MemoryStream(buffer);
var serializer = SerializerProvider.GetSerializersFor(MessageContentType).FirstOrDefault() ?? throw new NullReferenceException($"Failed to find a serializer for the specified content type '{MessageContentType}'");
var payload = serializer.Deserialize<object>(stream);
var headers = consumeResult.Message.Headers?.ToDictionary(kvp => kvp.Key, kvp => serializer.Deserialize<object>(kvp.GetValueBytes()));
var messageDefinition = await MessageDefinitions.ToAsyncEnumerable().SingleOrDefaultAwaitAsync(async m => await MessageMatchesAsync(payload, headers, m, CancellationTokenSource.Token).ConfigureAwait(false), CancellationTokenSource.Token).ConfigureAwait(false) ?? throw new NullReferenceException("Failed to resolve the message definition for the specified operation. Make sure the message matches one and only one of the message definitions configured for the specified operation"); ;
var correlationId = string.Empty;
if (messageDefinition.CorrelationId != null)
{
var correlationIdDefinition = messageDefinition.CorrelationId.IsReference ? Document.DereferenceCorrelationId(messageDefinition.CorrelationId.Reference!) : messageDefinition.CorrelationId;
correlationId = await RuntimeExpressionEvaluator.EvaluateAsync(correlationIdDefinition.Location, payload, headers, CancellationTokenSource.Token).ConfigureAwait(false);
}
var message = new AsyncApiMessage(MessageContentType, payload, headers, correlationId);
Subject.OnNext(message);
}
catch (Exception ex)
{
Logger.LogError("An error occurred while consuming a Kafka message: {ex}", ex);
}
}
}

/// <summary>
/// Determines whether or not the specified payload matches the specified <see cref="V3MessageDefinition"/>
/// </summary>
/// <param name="payload">The message's payload, if any</param>
/// <param name="headers">The message's headers, if any</param>
/// <param name="message">The <see cref="V3MessageDefinition"/> to check</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A boolean indicating whether or not the specified <see cref="AsyncApiPublishOperationParameters"/> matches the specified <see cref="V3MessageDefinition"/></returns>
protected virtual async Task<bool> MessageMatchesAsync(object? payload, object? headers, V3MessageDefinition message, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(message);
if (message.Payload != null)
{
var schemaDefinition = message.Payload.IsReference ? Document.DereferenceSchema(message.Payload.Reference!) : message.Payload;
var schemaFormat = message.Payload.SchemaFormat ?? SchemaFormat.AsyncApi;
var schemaHandler = SchemaHandlerProvider.GetHandler(schemaFormat);
if (schemaHandler == null) this.Logger.LogWarning("Failed to find an handler used to validate the specified schema format '{schemaFormat}", schemaFormat);
else
{
var result = await schemaHandler.ValidateAsync(payload ?? new { }, schemaDefinition.Schema, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccess()) return false;
}
}
if (message.Headers != null)
{
var schemaDefinition = message.Headers.IsReference ? Document.DereferenceSchema(message.Headers.Reference!) : message.Headers;
var schemaFormat = message.Headers.SchemaFormat ?? SchemaFormat.AsyncApi;
var schemaHandler = SchemaHandlerProvider.GetHandler(schemaFormat);
if (schemaHandler == null) this.Logger.LogWarning("Failed to find an handler used to validate the specified schema format '{schemaFormat}", schemaFormat);
else
{
var result = await schemaHandler.ValidateAsync(headers ?? new { }, schemaDefinition.Schema, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccess()) return false;
}
}
if (message.CorrelationId != null)
{
var correlationIdDefinition = message.CorrelationId.IsReference ? Document.DereferenceCorrelationId(message.CorrelationId.Reference!) : message.CorrelationId;
var correlationId = await RuntimeExpressionEvaluator.EvaluateAsync(correlationIdDefinition.Location, payload, headers, cancellationToken).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(correlationId)) return false;
}
return true;
}

/// <summary>
/// Disposes of the <see cref="KafkaSubscription"/>
/// </summary>
/// <param name="disposing">A boolean indicating whether or not the <see cref="KafkaSubscription"/> is being disposed of</param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
CancellationTokenSource.Dispose();
Consumer.Dispose();
Subject.Dispose();
}
_disposed = true;
}
}

/// <inheritdoc/>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Disposes of the <see cref="KafkaSubscription"/>
/// </summary>
/// <param name="disposing">A boolean indicating whether or not the <see cref="KafkaSubscription"/> is being disposed of</param>
protected virtual ValueTask DisposeAsync(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
CancellationTokenSource.Dispose();
Consumer.Dispose();
Subject.Dispose();
}
_disposed = true;
}
return ValueTask.CompletedTask;
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
await DisposeAsync(disposing: true).ConfigureAwait(false);
GC.SuppressFinalize(this);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<Company>Neuroglia SRL</Company>
<Copyright>Copyright © 2023-Present Neuroglia SRL. All rights reserved.</Copyright>
<Authors>Neuroglia SRL</Authors>
<RepositoryUrl>https://github.com/neuroglia-io/asyncapi</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>neuroglia asyncapi async api client binding kafka</PackageTags>
<Version>3.0.1</Version>
<NeutralLanguage>en</NeutralLanguage>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<PackageIcon>logo_white_on_blue_256.png</PackageIcon>
<IsPackable>true</IsPackable>
<DebugType>embedded</DebugType>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\assets\img\logo_white_on_blue_256.png">
<Pack>True</Pack>
<PackagePath>\</PackagePath>
</None>
</ItemGroup>

<ItemGroup> <PackageReference Include="Confluent.Kafka" Version="2.6.1" />

<PackageReference Include="System.Reactive" Version="6.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Neuroglia.AsyncApi.Client\Neuroglia.AsyncApi.Client.csproj" />
</ItemGroup>

</Project>
21 changes: 21 additions & 0 deletions src/Neuroglia.AsyncApi.Client.Bindings.Kafka/Usings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

global using Microsoft.Extensions.Logging;
global using Microsoft.Extensions.Options;
global using Neuroglia.AsyncApi.Bindings.Kafka;
global using Neuroglia.AsyncApi.Client.Bindings.Kafka;
global using Neuroglia.AsyncApi.Client.Bindings.Kafka.Configuration;
global using Neuroglia.AsyncApi.Client.Configuration;
global using Neuroglia.AsyncApi.Client.Services;
global using Neuroglia.Serialization;
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using MQTTnet;
using NATS.Client.Core;
using Neuroglia.AsyncApi.v3;
using System.Reactive.Subjects;
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="5.0.1.1416" />
<PackageReference Include="NATS.Net" Version="2.5.5" />
<PackageReference Include="System.Reactive" Version="6.0.1" />
</ItemGroup>
3 changes: 0 additions & 3 deletions src/Neuroglia.AsyncApi.Core/Bindings/Amqp/AmqpChannelType.cs
Original file line number Diff line number Diff line change
@@ -11,9 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Serialization.Json.Converters;
using System.ComponentModel;

namespace Neuroglia.AsyncApi.Bindings.Amqp;

/// <summary>
Original file line number Diff line number Diff line change
@@ -21,10 +21,34 @@ public record KafkaChannelBindingDefinition
: KafkaBindingDefinition, IChannelBindingDefinition
{

/// <summary>
/// Gets/sets the topic name, if different from channel name.
/// </summary>
[DataMember(Order = 1, Name = "topic"), JsonPropertyOrder(1), JsonPropertyName("topic"), YamlMember(Order = 1, Alias = "topic")]
public virtual string? Topic { get; set; }

/// <summary>
/// Gets/sets the number of partitions configured on this topic (useful to know how many parallel consumers you may run).
/// </summary>
[DataMember(Order = 2, Name = "partitions"), JsonPropertyOrder(2), JsonPropertyName("partitions"), YamlMember(Order = 2, Alias = "partitions")]
public virtual uint? Partitions { get; set; }

/// <summary>
/// Gets/sets the number of replicas configured on this topic.
/// </summary>
[DataMember(Order = 3, Name = "replicas"), JsonPropertyOrder(3), JsonPropertyName("replicas"), YamlMember(Order = 3, Alias = "replicas")]
public virtual uint? Replicas { get; set; }

/// <summary>
/// Gets/sets the topic configuration properties that are relevant for the API.
/// </summary>
[DataMember(Order = 4, Name = "topicConfiguration"), JsonPropertyOrder(4), JsonPropertyName("topicConfiguration"), YamlMember(Order = 4, Alias = "topicConfiguration")]
public virtual KafkaTopicConfiguration? TopicConfiguration { get; set; }

/// <summary>
/// Gets/sets the version of this binding. Defaults to 'latest'.
/// </summary>
[DataMember(Order = 1, Name = "bindingVersion"), JsonPropertyOrder(1), JsonPropertyName("bindingVersion"), YamlMember(Order = 1, Alias = "bindingVersion")]
[DataMember(Order = 5, Name = "bindingVersion"), JsonPropertyOrder(5), JsonPropertyName("bindingVersion"), YamlMember(Order = 5, Alias = "bindingVersion")]
public virtual string BindingVersion { get; set; } = "latest";

}
Original file line number Diff line number Diff line change
@@ -11,9 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Json.Schema;
using Neuroglia.AsyncApi.Bindings;

namespace Neuroglia.AsyncApi.Bindings.Kafka;

/// <summary>
Original file line number Diff line number Diff line change
@@ -11,9 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Json.Schema;
using Neuroglia.AsyncApi.Bindings;

namespace Neuroglia.AsyncApi.Bindings.Kafka;

/// <summary>
@@ -39,7 +36,7 @@ public record KafkaOperationBindingDefinition
/// <summary>
/// Gets/sets the version of this binding. Defaults to 'latest'.
/// </summary>
[DataMember(Order = 3, Name = "bindingVersion"), JsonPropertyOrder(3), JsonPropertyName("groubindingVersionpId"), YamlMember(Order = 3, Alias = "bindingVersion")]
[DataMember(Order = 3, Name = "bindingVersion"), JsonPropertyOrder(3), JsonPropertyName("bindingVersion"), YamlMember(Order = 3, Alias = "bindingVersion")]
public virtual string BindingVersion { get; set; } = "latest";

}
Original file line number Diff line number Diff line change
@@ -21,10 +21,22 @@ public record KafkaServerBindingDefinition
: KafkaBindingDefinition, IServerBindingDefinition
{

/// <summary>
/// Gets/sets the API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used).
/// </summary>
[DataMember(Order = 1, Name = "schemaRegistryUrl"), JsonPropertyOrder(1), JsonPropertyName("schemaRegistryUrl"), YamlMember(Order = 1, Alias = "schemaRegistryUrl")]
public virtual Uri? SchemaRegistryUrl { get; set; }

/// <summary>
/// Gets/sets the vendor of Schema Registry and Kafka serdes library that should be used (e.g. apicurio, confluent, ibm, or karapace)
/// </summary>
[DataMember(Order = 2, Name = "schemaRegistryVendor"), JsonPropertyOrder(2), JsonPropertyName("schemaRegistryVendor"), YamlMember(Order = 2, Alias = "schemaRegistryVendor")]
public virtual string? SchemaRegistryVendor { get; set; }

/// <summary>
/// Gets/sets the version of this binding. Defaults to 'latest'.
/// </summary>
[DataMember(Order = 1, Name = "bindingVersion"), JsonPropertyOrder(1), JsonPropertyName("bindingVersion"), YamlMember(Order = 1, Alias = "bindingVersion")]
[DataMember(Order = 3, Name = "bindingVersion"), JsonPropertyOrder(3), JsonPropertyName("bindingVersion"), YamlMember(Order = 3, Alias = "bindingVersion")]
public virtual string BindingVersion { get; set; } = "latest";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Bindings.Kafka;

/// <summary>
/// Enumerates all supported topic cleanup policies
/// </summary>
public static class KafkaTopicCleanupPolicy
{

/// <summary>
/// Gets the name of the policy used to discard old segments when their retention time or size limit has been reached
/// </summary>
public const string Delete = "delete";

/// <summary>
/// Gets the name of the policy used to enable log compaction, which retains the latest value for each key
/// </summary>
public const string Compact = "compact";

/// <summary>
/// Gets a new <see cref="IEnumerable{T}"/> containing all supported topic cleanup policies
/// </summary>
/// <returns>A new <see cref="IEnumerable{T}"/> containing all supported topic cleanup policies</returns>
public static IEnumerable<string> AsEnumerable()
{
yield return Delete;
yield return Compact;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Neuroglia.AsyncApi.Bindings.Kafka;

/// <summary>
/// Represents an object used to configure a Kafka topic
/// </summary>
[DataContract]
public record KafkaTopicConfiguration
{

/// <summary>
/// Gets/sets the cleanup policy, if any
/// </summary>
[DataMember(Order = 1, Name = "cleanup.policy"), JsonPropertyOrder(1), JsonPropertyName("cleanup.policy"), YamlMember(Order = 1, Alias = "cleanup.policy")]
public virtual string[]? CleanupPolicy { get; set; }

/// <summary>
/// Gets/sets the retention duration in milliseconds, if any
/// </summary>
[DataMember(Order = 2, Name = "retention.ms"), JsonPropertyOrder(2), JsonPropertyName("retention.ms"), YamlMember(Order = 2, Alias = "retention.ms")]
public virtual long? RetentionMilliseconds { get; set; }

/// <summary>
/// Gets/sets the retention bytes, if any
/// </summary>
[DataMember(Order = 3, Name = "retention.bytes"), JsonPropertyOrder(3), JsonPropertyName("retention.bytes"), YamlMember(Order = 3, Alias = "retention.bytes")]
public virtual long? RetentionBytes { get; set; }

/// <summary>
/// Gets/sets the delete retention duration in milliseconds, if any
/// </summary>
[DataMember(Order = 4, Name = "delete.retention.ms"), JsonPropertyOrder(4), JsonPropertyName("delete.retention.ms"), YamlMember(Order = 4, Alias = "delete.retention.ms")]
public virtual long? DeleteRetentionMilliseconds { get; set; }

/// <summary>
/// Gets/sets the maximum length in bytes, if any, for the topic's messages
/// </summary>
[DataMember(Order = 5, Name = "max.message.bytes"), JsonPropertyOrder(5), JsonPropertyName("max.message.bytes"), YamlMember(Order = 5, Alias = "max.message.bytes")]
public virtual int? MaxMessageBytes { get; set; }

/// <summary>
/// Gets/sets a boolean indicating whether or not to validate the key schema. This configuration is specific to Confluent
/// </summary>
[DataMember(Order = 6, Name = "confluent.key.schema.validation"), JsonPropertyOrder(6), JsonPropertyName("confluent.key.schema.validation"), YamlMember(Order = 6, Alias = "confluent.key.schema.validation")]
public virtual bool ConfluentKeySchemaValidation { get; set; }

/// <summary>
/// Gets/sets the name of the schema lookup strategy for the message key. This configuration is specific to Confluent
/// </summary>
[DataMember(Order = 7, Name = "confluent.key.subject.name.strategy"), JsonPropertyOrder(7), JsonPropertyName("confluent.key.subject.name.strategy"), YamlMember(Order = 7, Alias = "confluent.key.subject.name.strategy")]
public virtual bool ConfluentKeySubjectNameStrategy { get; set; }

/// <summary>
/// Gets/sets a boolean indicating whether or not whether the schema validation for the message value is enabled. This configuration is specific to Confluent
/// </summary>
[DataMember(Order = 8, Name = "confluent.value.schema.validation"), JsonPropertyOrder(8), JsonPropertyName("confluent.value.schema.validation"), YamlMember(Order = 8, Alias = "confluent.value.schema.validation")]
public virtual bool ConfluentValueSchemaValidation { get; set; }

/// <summary>
/// Gets/sets the name of the schema lookup strategy for the message key. This configuration is specific to Confluent
/// </summary>
[DataMember(Order = 9, Name = "confluent.value.subject.name.strategy"), JsonPropertyOrder(9), JsonPropertyName("confluent.value.subject.name.strategy"), YamlMember(Order = 9, Alias = "confluent.value.subject.name.strategy")]
public virtual bool ConfluentValueSubjectNameStrategy { get; set; }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Confluent.Kafka;
using Neuroglia.AsyncApi.Bindings.Kafka;
using Neuroglia.AsyncApi.Client;
using Neuroglia.AsyncApi.Client.Bindings;
using Neuroglia.AsyncApi.UnitTests.Containers;
using System.Text;

namespace Neuroglia.AsyncApi.UnitTests.Cases.Client.Bindings;

public class KafkaBindingHandlerTests
: BindingHandlerTestsBase
{
public KafkaBindingHandlerTests()
: base(builder => builder.AddKafkaBindingHandler(), ConfigureServices)
{

}

[Fact]
public async Task Publish_Should_Work()
{
//arrange
var serverId = "kafka-server";
var channelId = "cloud-events";
var operationId = "publishCloudEvent";
var messageId = "cloudEvent";
var stringSchema = new JsonSchemaBuilder().Type(SchemaValueType.String).Build();
var objectSchema = new JsonSchemaBuilder().Type(SchemaValueType.Object).AdditionalProperties(true).Build();
var document = DocumentBuilder
.UsingAsyncApiV3()
.WithTitle("Test Kafka API")
.WithVersion("1.0.0")
.WithServer(serverId, server => server
.WithHost($"localhost:{ServiceProvider.GetRequiredKeyedService<DotNet.Testcontainers.Containers.IContainer>("kafka").GetMappedPublicPort(KafkaContainerBuilder.PublicPort)}")
.WithProtocol(AsyncApiProtocol.Kafka)
.WithBinding(new KafkaServerBindingDefinition()))
.WithChannel(channelId, channel => channel
.WithAddress("cloud-event")
.WithServer($"#/servers/{serverId}")
.WithMessage(messageId, message => message
.WithContentType(CloudEventContentType.Json)
.WithPayloadSchema(schemaDefinition => schemaDefinition
.WithJsonSchema(schema => schema
.Type(SchemaValueType.Object)
.Properties(new Dictionary<string, JsonSchema>()
{
{ CloudEventAttributes.SpecVersion, stringSchema },
{ CloudEventAttributes.Id, stringSchema },
{ CloudEventAttributes.Time, stringSchema },
{ CloudEventAttributes.Source, stringSchema },
{ CloudEventAttributes.Type, stringSchema },
{ CloudEventAttributes.Subject, stringSchema },
{ CloudEventAttributes.DataSchema, stringSchema },
{ CloudEventAttributes.DataContentType, stringSchema },
{ CloudEventAttributes.Data, objectSchema },
})
.Required(CloudEventAttributes.GetRequiredAttributes())
.AdditionalProperties(true)))
.WithBinding(new KafkaMessageBindingDefinition()))
.WithBinding(new KafkaChannelBindingDefinition()))
.WithOperation(operationId, operation => operation
.WithAction(v3.V3OperationAction.Receive)
.WithChannel($"#/channels/{channelId}")
.WithMessage($"#/channels/{channelId}/messages/{messageId}")
.WithBinding(new KafkaOperationBindingDefinition()))
.Build();
await using var client = ClientFactory.CreateFor(document);

//act
var e = new CloudEvent()
{
Id = Guid.NewGuid().ToString(),
SpecVersion = CloudEventSpecVersion.V1.Version,
Source = new("https://unit-tests.v3.asyncapi.neuroglia.io"),
Type = "io.neuroglia.asyncapi.v3.test.v1",
DataContentType = MediaTypeNames.Application.Json,
Data = new
{
Greetings = "Hello, World!"
}
};
var parameters = new AsyncApiPublishOperationParameters(operationId)
{
Payload = e
};
await using var result = await client.PublishAsync(parameters);

//assert
result.IsSuccessful.Should().BeTrue();
}

[Fact]
public async Task Subscribe_Should_Work()
{
//arrange
var serverId = "kafka-server";
var serverAddress = $"localhost:{ServiceProvider.GetRequiredKeyedService<DotNet.Testcontainers.Containers.IContainer>("kafka").GetMappedPublicPort(KafkaContainerBuilder.PublicPort)}";
var channelId = "cloud-events";
var channelAddress = "cloud-event";
var operationId = "subscribeToCloudEvents";
var messageId = "cloud-event";
var stringSchema = new JsonSchemaBuilder().Type(SchemaValueType.String).Build();
var objectSchema = new JsonSchemaBuilder().Type(SchemaValueType.Object).AdditionalProperties(true).Build();
var document = DocumentBuilder
.UsingAsyncApiV3()
.WithTitle("Test Kafka API")
.WithVersion("1.0.0")
.WithServer(serverId, server => server
.WithHost(serverAddress)
.WithProtocol(AsyncApiProtocol.Kafka)
.WithBinding(new KafkaServerBindingDefinition()))
.WithChannel(channelId, channel => channel
.WithAddress(channelAddress)
.WithServer($"#/servers/{serverId}")
.WithMessage(messageId, message => message
.WithContentType(CloudEventContentType.Json)
.WithPayloadSchema(schemaDefinition => schemaDefinition
.WithJsonSchema(schema => schema
.Type(SchemaValueType.Object)
.Properties(new Dictionary<string, JsonSchema>()
{
{ CloudEventAttributes.SpecVersion, stringSchema },
{ CloudEventAttributes.Id, stringSchema },
{ CloudEventAttributes.Time, stringSchema },
{ CloudEventAttributes.Source, stringSchema },
{ CloudEventAttributes.Type, stringSchema },
{ CloudEventAttributes.Subject, stringSchema },
{ CloudEventAttributes.DataSchema, stringSchema },
{ CloudEventAttributes.DataContentType, stringSchema },
{ CloudEventAttributes.Data, objectSchema },
})
.Required(CloudEventAttributes.GetRequiredAttributes())
.AdditionalProperties(true)))
.WithBinding(new KafkaMessageBindingDefinition()))
.WithBinding(new KafkaChannelBindingDefinition()))
.WithOperation(operationId, operation => operation
.WithAction(v3.V3OperationAction.Send)
.WithChannel($"#/channels/{channelId}")
.WithMessage($"#/channels/{channelId}/messages/{messageId}")
.WithBinding(new KafkaOperationBindingDefinition()
{
GroupId = new JsonSchemaBuilder().Type(SchemaValueType.String).Default("test-consumer-group")
}))
.Build();
await using var client = ClientFactory.CreateFor(document);

//act
var parameters = new AsyncApiSubscribeOperationParameters(operationId);
await using var result = await client.SubscribeAsync(parameters);
var messageCount = 10;
var messagesToSend = new List<Message<Null, byte[]>>();
for (var i = 0; i < messageCount; i++)
{
var e = new CloudEvent()
{
Id = Guid.NewGuid().ToString(),
SpecVersion = CloudEventSpecVersion.V1.Version,
Source = new("https://unit-tests.v3.asyncapi.neuroglia.io"),
Subject = i.ToString(),
Type = "io.neuroglia.asyncapi.v3.test.v1",
DataContentType = MediaTypeNames.Application.Json,
Data = new
{
Greetings = "Hello, World!"
}
};
var payload = JsonSerializer.Default.SerializeToByteArray(e)!;
var headers = new Headers
{
{ "Header1", JsonSerializer.Default.SerializeToByteArray("value1") },
{ "Header2", JsonSerializer.Default.SerializeToByteArray("value2") },
{ "Header3", JsonSerializer.Default.SerializeToByteArray("value3") }
};
messagesToSend.Add(new()
{
Value = payload,
Headers = headers
});
}
var messagesReceived = new List<IAsyncApiMessage>();
var subscription = result.Messages?.Subscribe(messagesReceived.Add);
var producerConfig = new ProducerConfig()
{
BootstrapServers = serverAddress
};
using var producer = new ProducerBuilder<Null, byte[]>(producerConfig).Build();
foreach (var message in messagesToSend) await producer.ProduceAsync(channelAddress, message);
await Task.Delay(3500);
subscription?.Dispose();

//assert
result.IsSuccessful.Should().BeTrue();
result.Messages.Should().NotBeNull();
messagesReceived.Should().NotBeEmpty();
}

static void ConfigureServices(IServiceCollection services)
{
services.AddKeyedSingleton("kafka", KafkaContainerBuilder.Build());
services.AddSingleton(provider => provider.GetRequiredKeyedService<DotNet.Testcontainers.Containers.IContainer>("kafka"));
services.AddHostedService<ContainerBootstrapper>();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;

namespace Neuroglia.AsyncApi.UnitTests.Containers;

public static class KafkaContainerBuilder
{

public const int PublicPort = 9092;

public static IContainer Build()
{
return new ContainerBuilder()
.WithName($"kafka-{Guid.NewGuid():N}")
.WithImage("confluentinc/confluent-local")
.WithPortBinding(PublicPort, PublicPort)
.WithWaitStrategy(Wait
.ForUnixContainer()
.UntilMessageIsLogged(".* Kafka Server started .*"))
.Build();
}

}

0 comments on commit ccacdb3

Please sign in to comment.