Skip to content

Commit

Permalink
release v2.2 (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush authored Jun 19, 2024
1 parent d731b18 commit c06f4a2
Show file tree
Hide file tree
Showing 60 changed files with 1,212 additions and 159 deletions.
6 changes: 1 addition & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,4 @@ __pycache__/
*.btp.cs
*.btm.cs
*.odx.cs
*.xsd.cs

# custom
/nupkg
/reports
*.xsd.cs
2 changes: 1 addition & 1 deletion .runsettings
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</RunConfiguration>
<xUnit>
<LongRunningTestSeconds>40</LongRunningTestSeconds>
<ParallelizeTestCollections>false</ParallelizeTestCollections>
<ParallelizeTestCollections>true</ParallelizeTestCollections>
</xUnit>
<DataCollectionRunSettings>
<DataCollectors>
Expand Down
16 changes: 10 additions & 6 deletions Epam.Kafka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{A87B
.github\ISSUE_TEMPLATE\enhancement.md = .github\ISSUE_TEMPLATE\enhancement.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{222BCFF6-0433-4BE2-B19F-800790CC9261}"
ProjectSection(SolutionItems) = preProject
tests\docker-compose.yml = tests\docker-compose.yml
tests\kafka-start.cmd = tests\kafka-start.cmd
tests\kafka-stop.cmd = tests\kafka-stop.cmd
EndProjectSection
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Integration", "Integration", "{222BCFF6-0433-4BE2-B19F-800790CC9261}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{CC45261E-D51D-43F4-829D-34BDC3D68EEE}"
ProjectSection(SolutionItems) = preProject
Expand All @@ -67,6 +62,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{CC45261E-D
README.md = README.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{FEEEFBBD-B549-459A-A5F2-3F608EC1BF39}"
ProjectSection(SolutionItems) = preProject
tests\docker-compose.yml = tests\docker-compose.yml
tests\kafka-start.cmd = tests\kafka-start.cmd
tests\kafka-stop.cmd = tests\kafka-stop.cmd
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -125,11 +127,13 @@ Global
{1C278B5D-4685-4677-8895-90244028BEB3} = {837928B6-19AD-471C-A875-912C7F1F97C7}
{B89145BF-B42C-4674-BDD5-61A9BFEB4AA3} = {837928B6-19AD-471C-A875-912C7F1F97C7}
{46C55727-E63E-4571-BA2B-5C5A5A5D6EC9} = {07C5B87A-BBE0-4E3E-83CB-8A95B2D6855E}
{2E0E5146-9022-41A4-A8F6-7460F69639B1} = {222BCFF6-0433-4BE2-B19F-800790CC9261}
{191052C8-A238-40F2-9322-A8B98CECA156} = {E0525F24-5B2C-4D26-A93F-6817B1540C87}
{9BF7230E-5647-44F5-A7E5-CAA5C80A5A94} = {837928B6-19AD-471C-A875-912C7F1F97C7}
{A87B39B7-FF58-4305-83E5-5AE99D3ECC1A} = {A5013F6E-5A4E-4FC5-8A20-96AEB83BA9C7}
{222BCFF6-0433-4BE2-B19F-800790CC9261} = {837928B6-19AD-471C-A875-912C7F1F97C7}
{CC45261E-D51D-43F4-829D-34BDC3D68EEE} = {A5013F6E-5A4E-4FC5-8A20-96AEB83BA9C7}
{FEEEFBBD-B549-459A-A5F2-3F608EC1BF39} = {222BCFF6-0433-4BE2-B19F-800790CC9261}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {12AADABB-DDF0-4214-9300-49F6F55819DA}
Expand Down
7 changes: 4 additions & 3 deletions sample/Epam.Kafka.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static void Main(string[] args)
{
IHostBuilder hostBuilder = Host.CreateDefaultBuilder(args);

hostBuilder.ConfigureServices(services =>
hostBuilder.ConfigureServices((context,services) =>
{
// view health check results in console for demo purposes only.
services.AddSingleton<IHealthCheckPublisher, ConsoleHealthCheckPublisher>();
Expand All @@ -34,8 +34,9 @@ private static void Main(string[] args)
services.AddOpenTelemetry().WithMetrics(mb =>
mb.AddMeter(PipelineMonitor.StatusMeterName, PipelineMonitor.HealthMeterName).AddConsoleExporter());

KafkaBuilder kafkaBuilder = services.AddKafka();

KafkaBuilder kafkaBuilder = services.AddKafka()
.WithConfigPlaceholders("<EnvironmentName>", context.HostingEnvironment.EnvironmentName);

kafkaBuilder.WithPubSubSummaryHealthCheck();

kafkaBuilder.WithClusterConfig("Sandbox").Configure(options =>
Expand Down
29 changes: 21 additions & 8 deletions sample/Epam.Kafka.Sample/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
"BatchSize": 100,
"BatchEmptyTimeout": "00:00:10",
"ExternalStateCommitToKafka": true,
"PipelineRetryTimeout": "00:00:30"
"PipelineRetryTimeout": "00:00:30",
"Consumer": "DefaultSub"
},
"Publication": {
"Producer": "DefaultPub"
}
},
"Clusters": {
"Sandbox": {
"Sandbox": {
"bootstrap.servers": "localhost:9092",
"allow.auto.create.topics": true
},
Expand All @@ -29,17 +33,26 @@
}
},
"Producers": {
"Default": {},
"Transactional": {
"transactional.id": "producer.epam-kafka-sample"
"Default": {
"client.id": "<DomainName>@<MachineName>",
"dotnet.logger.category": "Custom"
},
"DefaultPub": {
"client.id": "<DomainName>@<MachineName>:<Name>"
},
"TransactionalPub": {
"transactional.id": "producer.epam-kafka-sample.<name>.<EnvironmentName>",
"client.id": "<DomainName>@<MachineName>:<Name>"
}
},
"Consumers": {
"Default": {
"group.id": "consumer.epam-kafka-sample"
"group.id": "consumer.epam-kafka-sample.<EnvironmentName>",
"client.id": "<DomainName>@<MachineName>"
},
"Republish": {
"group.id": "consumer.epam-kafka-sample.republish"
"DefaultSub": {
"group.id": "consumer.epam-kafka-sample.<EnvironmentName>",
"client.id": "<DomainName>@<MachineName>:<Name>"
}
},
"Subscriptions": {
Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<NeutralLanguage>en</NeutralLanguage>

<VersionPrefix>0</VersionPrefix>
<Version>2.1.$(VersionPrefix)</Version>
<Version>2.2.$(VersionPrefix)</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
11 changes: 10 additions & 1 deletion src/Epam.Kafka.PubSub/Common/Pipeline/PipelineMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,19 @@ public abstract class PipelineMonitor
/// </summary>
public const string StatusResultGaugeName = "epam_kafka_pubsub_status_result";

internal PipelineMonitor(string name)
internal PipelineMonitor(PubSubContext context, string name)
{
this.Context = context ?? throw new ArgumentNullException(nameof(context));
this.FullName = name ?? throw new ArgumentNullException(nameof(name));
this.Name = this.FullName.Split('.').Last();
this.NamePlaceholder = new Dictionary<string, string> { { "<name>", this.Name } };
}

/// <summary>
/// The <see cref="PubSubContext"/>
/// </summary>
internal PubSubContext Context { get; }

internal string FullName { get; }

/// <summary>
Expand All @@ -64,4 +71,6 @@ internal PipelineMonitor(string name)
/// Number of sequential pipeline errors without at least one successful batch.
/// </summary>
public int PipelineRetryIteration { get; internal set; }

internal IReadOnlyDictionary<string, string> NamePlaceholder { get; }
}
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Common/Pipeline/PubSubMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public abstract class PubSubMonitor<TBatchResult> : PipelineMonitor
{

/// <inheritdoc />
internal PubSubMonitor(string name) : base(name)
internal PubSubMonitor(PubSubContext context, string name) : base(context, name)
{
}

Expand Down
16 changes: 15 additions & 1 deletion src/Epam.Kafka.PubSub/Common/PubSubBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal PubSubBuilder(KafkaBuilder builder, string name, Type keyType, Type val
{
this.Builder = builder ?? throw new ArgumentNullException(nameof(builder));
this.Key = name ?? throw new ArgumentNullException(nameof(name));

this._options = builder.Services.AddOptions<TOptions>(this.Key)
.Configure(x =>
{
Expand Down Expand Up @@ -87,4 +87,18 @@ public TBuilder WithOptions(Action<TOptions> configure)
this._options.Configure(configure);
return (TBuilder)this;
}

/// <inheritdoc cref="WithOptions"/>
/// <typeparam name="TDep"><inheritdoc cref="OptionsBuilder{TOptions}.Configure{TDep}" path="/typeparam[@name='TDep']"/> </typeparam>
public TBuilder WithOptions<TDep>(Action<TOptions, TDep> configure) where TDep : class
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

this._options.Configure(configure);

return (TBuilder)this;
}
}
16 changes: 11 additions & 5 deletions src/Epam.Kafka.PubSub/Common/PubSubContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ public sealed class PubSubContext
public const int MaxPublicationsCount = 100;

private readonly ConcurrentDictionary<int, ISyncPolicy> _bulkheads = new();
private readonly Dictionary<string, PublicationMonitor> _publications = new(StringComparer.OrdinalIgnoreCase);

private readonly Dictionary<string, PublicationMonitor> _publications = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, SubscriptionMonitor> _subscriptions = new(StringComparer.OrdinalIgnoreCase);

internal ConcurrentDictionary<string, PublicationMonitor> TransactionIds { get; } = new();

internal PubSubContext()
{
}
Expand All @@ -48,7 +50,7 @@ internal PubSubContext()
/// </summary>
public IReadOnlyDictionary<string, PublicationMonitor> Publications => this._publications;

internal void AddSubscription(string name)
internal SubscriptionMonitor AddSubscription(string name)
{
if (this._subscriptions.ContainsKey(name))
{
Expand All @@ -63,7 +65,7 @@ internal void AddSubscription(string name)

if (this._subscriptions.Count < MaxSubscriptionsCount)
{
this._subscriptions.Add(name, new SubscriptionMonitor(name));
this._subscriptions.Add(name, new SubscriptionMonitor(this, name));
}
else
{
Expand All @@ -74,9 +76,11 @@ internal void AddSubscription(string name)
{
throw new InvalidOperationException();
}

return this._subscriptions[name];
}

internal void AddPublication(string name)
internal PublicationMonitor AddPublication(string name)
{
if (this._publications.ContainsKey(name))
{
Expand All @@ -91,7 +95,7 @@ internal void AddPublication(string name)

if (this._publications.Count < MaxPublicationsCount)
{
this._publications.Add(name, new PublicationMonitor(name));
this._publications.Add(name, new PublicationMonitor(this, name));
}
else
{
Expand All @@ -102,6 +106,8 @@ internal void AddPublication(string name)
{
throw new InvalidOperationException();
}

return this._publications[name];
}

internal ISyncPolicy GetHandlerPolicy(PubSubOptions options)
Expand Down
6 changes: 5 additions & 1 deletion src/Epam.Kafka.PubSub/Epam.Kafka.PubSub.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
<PropertyGroup>
<TargetFrameworks>net8.0;net6.0;netstandard2.0;net462</TargetFrameworks>
<Description>Framework for building pub/sub batch processing applications</Description>
<PackageReleaseNotes>Implementation for EntityFramework6 added in Epam.Kafka.PubSub.EntityFramework6 package. Separate summary health checks for subscriptions and publications added. Polly updated to 8.4.0.</PackageReleaseNotes>
<PackageReleaseNotes>
- [Breaking Change] Subscription background service don't set 'client.id' config value automatically. Previously it was `$"{AppDomain.CurrentDomain.FriendlyName}@{Environment.MachineName}:{this.Monitor.Name}"`.
- [Breaking Change] Publication background service don't append it's name to 'transactional.id' config value automatically. Previously it was `config.TransactionalId += $"-{this.Monitor.Name}"`. Added validation to prevent different publication background services use same 'transactional.id' config value.
- Subscription and Publication background services can resolve special placeholder &lt;name&gt; (that reference name of subscription or publication) when it used in ConsumerConfig or ProducerConfig.
</PackageReleaseNotes>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Epam.Kafka.PubSub.Publication.Options;
public sealed class PublicationOptions : PubSubOptions, IOptions<PublicationOptions>
{
internal readonly ProducerPartitioner Partitioner = new();

internal Func<Lazy<ISchemaRegistryClient>, object>? KeySerializer;
internal Func<Lazy<ISchemaRegistryClient>, object>? ValueSerializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public ValidateOptionsResult Validate(string? name, PublicationOptions options)
throw new ArgumentNullException(nameof(options));
}

if (options.Enabled == false)
{
return ValidateOptionsResult.Success;
}

string? result = PubSubOptionsValidate.GetFirstFailure(options);

result ??= options.ValidateString(x => x.DefaultTopic, regex: RegexHelper.TopicNameRegex);
Expand All @@ -24,7 +29,7 @@ public ValidateOptionsResult Validate(string? name, PublicationOptions options)

if (result != null)
{
return ValidateOptionsResult.Fail(result);
return ValidateOptionsResult.Fail($"Publication '{name}' configuration not valid: {result}");
}

return ValidateOptionsResult.Success;
Expand Down
24 changes: 23 additions & 1 deletion src/Epam.Kafka.PubSub/Publication/Pipeline/PublicationMonitor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
// Copyright © 2024 EPAM Systems

using System.Collections.Concurrent;
using Confluent.Kafka;

using Epam.Kafka.PubSub.Common;
using Epam.Kafka.PubSub.Common.Pipeline;

namespace Epam.Kafka.PubSub.Publication.Pipeline;
Expand All @@ -14,7 +18,7 @@ public class PublicationMonitor : PubSubMonitor<PublicationBatchResult>
/// </summary>
public const string Prefix = "Epam.Kafka.Publication";

internal PublicationMonitor(string name) : base(BuildFullName(name))
internal PublicationMonitor(PubSubContext context, string name) : base(context, BuildFullName(name))
{
}

Expand All @@ -31,4 +35,22 @@ internal override void HandleResult(PublicationBatchResult batchResult)
this.PipelineRetryIteration = 0;
}
}

internal bool TryRegisterTransactionId(ProducerConfig config, out string? existingName)
{
if (config == null) throw new ArgumentNullException(nameof(config));

existingName = null;
ConcurrentDictionary<string, PublicationMonitor> ids = this.Context.TransactionIds;
string id = config.TransactionalId!;

bool result = ids.TryAdd(id, this) || ids.TryUpdate(id, this, this);

if (!result)
{
existingName = ids[id]?.Name;
}

return result;
}
}
Loading

0 comments on commit c06f4a2

Please sign in to comment.