Skip to content

Commit

Permalink
update metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Feb 4, 2025
1 parent b2e8e69 commit 4027925
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 27 deletions.
17 changes: 6 additions & 11 deletions src/Epam.Kafka/Metrics/ConsumerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ namespace Epam.Kafka.Metrics;

internal sealed class ConsumerMetrics : CommonMetrics
{
private const string CgStateTagName = "CgState";
private const string CgJoinStateTagName = "CgJoinState";
private const string ReasonTagName = "Reason";
private const string GroupStateTagName = "GroupState";
private const string GroupJoinStateTagName = "GroupJoinState";
private const string TopicTagName = "Topic";
private const string PartitionTagName = "Partition";
private const string ConsumerGroupTagName = "Group";
Expand Down Expand Up @@ -53,8 +52,8 @@ private void ConfigureCgMeter(Meter cgMeter)
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.StateAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(CgStateTagName, v.ConsumerGroup.State),
new KeyValuePair<string, object?>(CgJoinStateTagName, v.ConsumerGroup.JoinState)
new KeyValuePair<string, object?>(GroupStateTagName, v.ConsumerGroup.State),
new KeyValuePair<string, object?>(GroupJoinStateTagName, v.ConsumerGroup.JoinState)
}), 1);
}

Expand All @@ -65,13 +64,9 @@ private void ConfigureCgMeter(Meter cgMeter)
{
Statistics? v = this.Value;

if (v != null)
if (v is { ConsumerGroup.RebalanceAgeMilliseconds: > 0 })
{
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.RebalanceAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(ReasonTagName, v.ConsumerGroup.RebalanceReason)
}), 1);
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.RebalanceAgeMilliseconds / 1000), 1);
}

return Empty;
Expand Down
4 changes: 2 additions & 2 deletions src/Epam.Kafka/Metrics/ProducerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Epam.Kafka.Metrics;
internal sealed class ProducerMetrics : CommonMetrics
{
private const string TransactionTagName = "Transaction";
private const string TrStateTagName = "TrState";
private const string TransactionStateTagName = "TransactionState";
private const string IdempStateTagName = "IdempState";

private readonly ProducerConfig _config;
Expand Down Expand Up @@ -37,7 +37,7 @@ protected override void Initialize(Func<string, IEnumerable<KeyValuePair<string,
return Enumerable.Repeat(new Measurement<long>(v.ProducerTransaction.TransactionAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(TrStateTagName, v.ProducerTransaction.TransactionState),
new KeyValuePair<string, object?>(TransactionStateTagName, v.ProducerTransaction.TransactionState),
new KeyValuePair<string, object?>(TransactionTagName, this._config.TransactionalId)
}), 1);
}
Expand Down
16 changes: 8 additions & 8 deletions tests/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
<PackageReference Include="Shouldly" Version="4.2.1" />
<PackageReference Include="JunitXml.TestLogger" Version="3.1.12" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="xunit" Version="2.8.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.1" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="PublicApiGenerator" Version="11.4.1" />
<PackageReference Include="Shouldly" Version="4.3.0" />
<PackageReference Include="JunitXml.TestLogger" Version="5.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageReference Include="coverlet.collector" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
6 changes: 3 additions & 3 deletions tests/Epam.Kafka.Tests/MetricsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public async Task ProducerTransaction()
await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(2);
ml.Results.Keys.First().ShouldContain("Type:producer-TrState:Init-Transaction:qwe");
ml.Results.Keys.First().ShouldContain("Type:producer-TransactionState:Init-Transaction:qwe");
ml.Results.Keys.Last().ShouldContain("Type:producer-IdempState:Init");

// One 1 of 4 assigned
Expand All @@ -120,15 +120,15 @@ public async Task ProducerTransaction()
await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(2);
ml.Results.Keys.First().ShouldContain("Type:producer-TrState:Ready-Transaction:qwe");
ml.Results.Keys.First().ShouldContain("Type:producer-TransactionState:Ready-Transaction:qwe");
ml.Results.Keys.Last().ShouldContain("Type:producer-IdempState:Assigned");

producer.BeginTransaction();

await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(2);
ml.Results.Keys.First().ShouldContain("Type:producer-TrState:InTransaction-Transaction:qwe");
ml.Results.Keys.First().ShouldContain("Type:producer-TransactionState:InTransaction-Transaction:qwe");
ml.Results.Keys.Last().ShouldContain("Type:producer-IdempState:Assigned");

await producer.ProduceAsync("test", new Message<int, int> { Key = 1, Value = 2 });
Expand Down
6 changes: 3 additions & 3 deletions tests/Epam.Kafka.Tests/StatisticsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void TransactionMetricsTests()
ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(2);
ml.Results["epam_kafka_stats_eos_txn_age_Handler:n1-Name:c1-Type:c-TrState:test-Transaction:qwe"].ShouldBe(120);
ml.Results["epam_kafka_stats_eos_txn_age_Handler:n1-Name:c1-Type:c-TransactionState:test-Transaction:qwe"].ShouldBe(120);
ml.Results["epam_kafka_stats_eos_idemp_age_Handler:n1-Name:c1-Type:c-IdempState:ids"].ShouldBe(130);

cm.OnCompleted();
Expand All @@ -209,8 +209,8 @@ public void ConsumerGroupMetricsTests()
ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);
ml.Results["epam_kafka_stats_cg_state_age_Group:qwe-Handler:n1-Name:c1-Type:c-CgState:test1-CgJoinState:test2"].ShouldBe(120);
ml.Results["epam_kafka_stats_cg_rebalance_age_Group:qwe-Handler:n1-Name:c1-Type:c-Reason:test3"].ShouldBe(130);
ml.Results["epam_kafka_stats_cg_state_age_Group:qwe-Handler:n1-Name:c1-Type:c-GroupState:test1-GroupJoinState:test2"].ShouldBe(120);
ml.Results["epam_kafka_stats_cg_rebalance_age_Group:qwe-Handler:n1-Name:c1-Type:c"].ShouldBe(130);
ml.Results["epam_kafka_stats_cg_rebalance_count_Group:qwe-Handler:n1-Name:c1-Type:c"].ShouldBe(22);
ml.Results["epam_kafka_stats_cg_assignment_count_Group:qwe-Handler:n1-Name:c1-Type:c"].ShouldBe(4);

Expand Down

0 comments on commit 4027925

Please sign in to comment.