Skip to content

Commit

Permalink
refactor: Align Kafka tests
Browse files Browse the repository at this point in the history
  • Loading branch information
HofmeisterAn committed Jan 13, 2025
1 parent 3080e79 commit 840e3a9
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 191 deletions.
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
<PackageVersion Include="Azure.Storage.Queues" Version="12.15.0"/>
<PackageVersion Include="ClickHouse.Client" Version="7.9.1"/>
<PackageVersion Include="Confluent.Kafka" Version="2.8.0"/>
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.8.0" />
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.8.0" />
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.8.0"/>
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.8.0"/>
<PackageVersion Include="Consul" Version="1.6.10.9"/>
<PackageVersion Include="CouchbaseNetClient" Version="3.6.4"/>
<PackageVersion Include="DotPulsar" Version="3.3.2"/>
Expand Down
17 changes: 9 additions & 8 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,21 @@ public KafkaBuilder WithListener(string kafka)
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";

var listeners = new[] { listener };
var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };

var host = kafka.Split(':')[0];

var currentListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
.Split([','], StringSplitOptions.RemoveEmptyEntries)
.Concat([listener]);
var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
.Split(',')
.Concat(listeners);

var currentListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
.Split([','], StringSplitOptions.RemoveEmptyEntries)
.Concat([listenerSecurityProtocolMap]);
var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
.Split(',')
.Concat(listenersSecurityProtocolMap);

return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", currentListeners))
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", currentListenersSecurityProtocolMap))
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
.WithNetworkAliases(host);
}

Expand Down
89 changes: 47 additions & 42 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
Original file line number Diff line number Diff line change
@@ -1,63 +1,68 @@
using System.Collections.Generic;
using System.Text;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;

namespace Testcontainers.Kafka;

public sealed class KafkaContainerNetworkTest : IAsyncLifetime
{
private INetwork _network;
private KafkaContainer _kafkaContainer;
private const string Message = "Message produced by kafkacat";

private IContainer _kCatContainer;
public async Task InitializeAsync()
private const string Listener = "kafka:19092";

private const string DataFilePath = "/data/msgs.txt";

private readonly INetwork _network;

private readonly IContainer _kafkaContainer;

private readonly IContainer _kCatContainer;

public KafkaContainerNetworkTest()
{
_network = new NetworkBuilder().Build();
_network = new NetworkBuilder()
.Build();

_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka")
.WithImage("confluentinc/cp-kafka:6.1.9")
.WithNetwork(_network)
.WithListener("kafka:19092")
.WithListener(Listener)
.Build();

_kCatContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-kcat")
.WithImage("confluentinc/cp-kafkacat:6.1.9")
.WithNetwork(_network)
.WithCommand("-c", "tail -f /dev/null")
.WithEntrypoint("sh")
.WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt")
.WithEntrypoint(CommonCommands.SleepInfinity)
.WithResourceMapping(Encoding.Default.GetBytes(Message), DataFilePath)
.Build();

await _kCatContainer.StartAsync();
await _kafkaContainer.StartAsync();
}

public Task DisposeAsync()
public async Task InitializeAsync()
{
return Task.WhenAll(
_kafkaContainer.DisposeAsync().AsTask(),
_kCatContainer.DisposeAsync().AsTask()
);
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _kCatContainer.StartAsync()
.ConfigureAwait(false);
}


public async Task DisposeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _kCatContainer.StartAsync()
.ConfigureAwait(false);

await _network.DisposeAsync()
.ConfigureAwait(false);
}

[Fact]
public async Task TestUsageWithListener()
public async Task ConsumesProducedKafkaMessage()
{
// kcat producer
await _kCatContainer.ExecAsync(new List<string>()
{
"kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"
});


// kcat consumer
var kCatResult = await _kCatContainer.ExecAsync(new List<string>()
{
"kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1"
});

Assert.Contains("Message produced by kcat", kCatResult.Stdout);
_ = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-t", "msgs", "-P", "-l", DataFilePath })
.ConfigureAwait(true);

var execResult = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-C", "-t", "msgs", "-c", "1" })
.ConfigureAwait(true);

Assert.Equal(Message, execResult.Stdout.Trim());
}

}
129 changes: 129 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
namespace Testcontainers.Kafka;

public sealed class KafkaContainerRegistryTest : IAsyncLifetime
{
private const string Schema = @"
{
""$schema"": ""http://json-schema.org/draft-04/schema#"",
""title"": ""User"",
""type"": ""object"",
""additionalProperties"": false,
""properties"": {
""FirstName"": {
""type"": [""null"", ""string""]
},
""LastName"": {
""type"": [""null"", ""string""]
}
}
}";

private const ushort RestPort = 8085;

private const string SchemaRegistryNetworkAlias = "schema-registry";

private const string Listener = "kafka:19092";

private readonly INetwork _network;

private readonly KafkaContainer _kafkaContainer;

private readonly IContainer _schemaRegistryContainer;

public KafkaContainerRegistryTest()
{
_network = new NetworkBuilder()
.Build();

_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka:6.1.9")
.WithNetwork(_network)
.WithListener(Listener)
.Build();

_schemaRegistryContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-schema-registry:6.1.9")
.WithPortBinding(RestPort, true)
.WithNetwork(_network)
.WithNetworkAliases(SchemaRegistryNetworkAlias)
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + RestPort)
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + Listener)
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", SchemaRegistryNetworkAlias)
.WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request =>
request.ForPort(RestPort).ForPath("/subjects")))
.Build();
}

public async Task InitializeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _schemaRegistryContainer.StartAsync()
.ConfigureAwait(false);
}

public async Task DisposeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _schemaRegistryContainer.StartAsync()
.ConfigureAwait(false);

await _network.DisposeAsync()
.ConfigureAwait(false);
}

[Fact]
public async Task ConsumerReturnsProducerMessage()
{
// Given
const string topic = "user";

var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topic);

var bootstrapServer = _kafkaContainer.GetBootstrapAddress();

var producerConfig = new ProducerConfig();
producerConfig.BootstrapServers = bootstrapServer;

var consumerConfig = new ConsumerConfig();
consumerConfig.BootstrapServers = bootstrapServer;
consumerConfig.GroupId = "sample-consumer";
consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;

var message = new Message<string, User>();
message.Value = new User("John", "Doe");

var schemaRegistryConfig = new SchemaRegistryConfig();
schemaRegistryConfig.Url = new UriBuilder(Uri.UriSchemeHttp, _schemaRegistryContainer.Hostname, _schemaRegistryContainer.GetMappedPublicPort(RestPort)).ToString();

// When
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
_ = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(Schema, SchemaType.Json))
.ConfigureAwait(true);

using var producer = new ProducerBuilder<string, User>(producerConfig)
.SetValueSerializer(new JsonSerializer<User>(schemaRegistry))
.Build();

_ = await producer.ProduceAsync(topic, message)
.ConfigureAwait(true);

using var consumer = new ConsumerBuilder<string, User>(consumerConfig)
.SetValueDeserializer(new JsonDeserializer<User>().AsSyncOverAsync())
.Build();

consumer.Subscribe(topic);

var result = consumer.Consume(TimeSpan.FromSeconds(15));

// Then
Assert.NotNull(result);
Assert.Equal(message.Value, result.Message.Value);
}

private record User(string FirstName, string LastName);
}
Loading

0 comments on commit 840e3a9

Please sign in to comment.