Skip to content

Commit

Permalink
refactor metrics and add samples
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Jan 30, 2025
1 parent bd80611 commit e7a6b85
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 89 deletions.
2 changes: 1 addition & 1 deletion sample/ProduceAndConsume/ProduceAndConsume.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.11.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.1" />
Expand Down
25 changes: 22 additions & 3 deletions sample/ProduceAndConsumeMetrics/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private static void Main(string[] args)
hostBuilder.ConfigureServices(services =>
{
services.AddOpenTelemetry().WithMetrics(
mb => mb.AddMeter(Statistics.MeterName)
mb => mb.AddMeter(Statistics.TopLevelMeterName, Statistics.TopicPartitionMeterName)
.AddReader(new PeriodicExportingMetricReader(new CustomConsoleExporter(), 1000)));

KafkaBuilder kafkaBuilder = services.AddKafka();
Expand Down Expand Up @@ -121,16 +121,35 @@ public override ExportResult Export(in Batch<Metric> batch)
{
foreach (MetricPoint point in m.GetMetricPoints())
{
var pt = new List<string>(point.Tags.Count);
var pt = new List<string>();

if (m.MeterTags is not null)
{
foreach (KeyValuePair<string, object?> tag in m.MeterTags)
{
pt.Add(tag.ToString());
}
}

foreach (KeyValuePair<string, object?> tag in point.Tags)
{
pt.Add(tag.ToString());
}

string tags = string.Join(",", pt);

Console.WriteLine($"{m.Name}[{tags}]= {point.GetSumLong()}");
string value = "???";

if (m.MetricType == MetricType.LongGauge)
{
value = point.GetGaugeLastValueLong().ToString("D");
}
else if (m.MetricType == MetricType.LongSum)
{
value = point.GetSumLong().ToString("D");
}

Console.WriteLine($"{m.Name}[{tags}]= {value}");
}
}

Expand Down
2 changes: 1 addition & 1 deletion sample/PublishEfCore/PublishEfCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.12" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.12" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion sample/Subscribe/Subscribe.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka.PubSub" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka.PubSub" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion sample/SubscribeEfCore/SubscribeEfCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.12" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka.PubSub.EntityFrameworkCore" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.12" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Epam.Kafka.PubSub" Version="2.5.202-rc" />
<PackageReference Include="Epam.Kafka.PubSub" Version="2.5.209-rc" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>

Expand Down
7 changes: 2 additions & 5 deletions src/Epam.Kafka.PubSub/Common/Metrics/MetricsWithName.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@ internal abstract class MetricsWithName : IDisposable

private readonly Meter _meter;

private readonly KeyValuePair<string, object?>[] _monitorName;

protected MetricsWithName(string name, PipelineMonitor monitor)
{
if (name == null) throw new ArgumentNullException(nameof(name));

this._meter = new Meter(name);
this._monitorName = new[] { new KeyValuePair<string, object?>(NameTag, monitor.FullName) };
this._meter = new Meter(name,null, new[] { new KeyValuePair<string, object?>(NameTag, monitor.FullName) });
}

public void Dispose()
Expand All @@ -29,6 +26,6 @@ public void Dispose()

protected void CreateObservableGauge<T>(string name, Func<T> observeValue, string? description) where T : struct
{
this._meter.CreateObservableGauge(name, () => new Measurement<T>(observeValue(), this._monitorName), null, description);
this._meter.CreateObservableGauge(name, () => new Measurement<T>(observeValue()), null, description);
}
}
2 changes: 1 addition & 1 deletion src/Epam.Kafka/Internals/KafkaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi

try
{
consumer = new ObservableConsumer<TKey, TValue>(builder);
consumer = new ObservableConsumer<TKey, TValue>(builder, config);

logger.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal class ObservableConsumer<TKey, TValue> : ObservableClient, IConsumer<TK
{
private readonly IConsumer<TKey, TValue> _inner;

public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder)
public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder, ConsumerConfig config)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));

Expand All @@ -29,7 +29,7 @@ public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder)
builder.SetStatisticsHandler((_, json) => this.StatisticsHandler(json));
this.StatObservers = new List<IObserver<string>>();
#pragma warning disable CA2000 // unsubscribe not needed
this.Subscribe(new ConsumerMetrics());
this.Subscribe(new ConsumerMetrics(config));
#pragma warning restore CA2000
}
catch (InvalidOperationException)
Expand Down
27 changes: 18 additions & 9 deletions src/Epam.Kafka/Metrics/CommonMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@

namespace Epam.Kafka.Metrics;

internal class CommonMetrics : StatisticsMetrics
internal abstract class CommonMetrics : StatisticsMetrics
{
protected CommonMetrics() : base()
{
}

protected override void Initialize(Meter meter, Meter topParMeter)
{
this.CreateCounter(meter, "epam_kafka_stats_age", v => v.AgeMicroseconds, "microseconds",
"Time since this client instance was created (microseconds).");
this.CreateCounter(meter, "epam_kafka_stats_trx_msgs", this.GetTxRxMsg,
description: "Number of messages consumed or produced.");

this.CreateCounter(meter, "epam_kafka_stats_trx", this.GetTxRx,
description: "Number of requests transmitted or received.");

//this.CreateTopLevelCounter(meter, "epam_kafka_stats_replyq", v => v.OpsQueueCountGauge, description:
// "Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll().");
this.CreateCounter(meter, "epam_kafka_stats_trx_bytes", this.GetTxRxBytes,
description: "Number of bytes transmitted or received.");

this.CreateGauge(meter, "epam_kafka_stats_age", v => v.AgeMicroseconds / 1000000, "seconds",
"Time since this client instance was created (seconds).");

this.CreateGauge(meter, "epam_kafka_stats_replyq", v => v.OpsQueueCountGauge, description:
"Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll().");
}

protected abstract long GetTxRxMsg(Statistics value);
protected abstract long GetTxRx(Statistics value);
protected abstract long GetTxRxBytes(Statistics value);
}
65 changes: 59 additions & 6 deletions src/Epam.Kafka/Metrics/ConsumerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,74 @@

using System.Diagnostics.Metrics;

using Confluent.Kafka;
using Epam.Kafka.Stats;

namespace Epam.Kafka.Metrics;

internal sealed class ConsumerMetrics : CommonMetrics
{
private const string TopicTagName = "Topic";
private const string PartitionTagName = "Partition";
private const string ConsumerGroupTagName = "Group";

private readonly ConsumerConfig _config;

public ConsumerMetrics(ConsumerConfig config)
{
this._config = config ?? throw new ArgumentNullException(nameof(config));
}

protected override void Initialize(Meter meter, Meter topParMeter)
{
base.Initialize(meter, topParMeter);

this.CreateCounter(meter, "epam_kafka_stats_rxmsgs", v => v.ConsumedMessagesTotal,
description: "Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.");

//this.CreateTopLevelCounter(meter, "epam_kafka_stats_rx", v => v.ConsumedRequestsTotal,
// description: "Total number of responses received from Kafka brokers.");

this.CreateTpGauge(topParMeter, "epam_kafka_stats_tp_lag",
m => m.Value.ConsumerLag, null, "Consumer lag");
}

protected override long GetTxRxMsg(Statistics value)
{
return value.ConsumedMessagesTotal;
}

protected override long GetTxRx(Statistics value)
{
return value.ConsumedRequestsTotal;
}

protected override long GetTxRxBytes(Statistics value)
{
return value.ConsumedBytesTotal;
}

private void CreateTpGauge(Meter meter, string name,
Func<KeyValuePair<TopicStatistics, PartitionStatistics>, long> factory,
string? unit = null,
string? description = null)
{
if (meter == null) throw new ArgumentNullException(nameof(meter));
if (name == null) throw new ArgumentNullException(nameof(name));

meter.CreateObservableGauge(name, () =>
{
Statistics? v = this.Value;

if (v != null)
{
return v.Topics
.SelectMany(p =>
p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition)
.Select(x => new KeyValuePair<TopicStatistics, PartitionStatistics>(p.Value, x.Value)))
.Select(m => new Measurement<long>(factory(m), new[]
{
new KeyValuePair<string, object?>(TopicTagName, m.Key.Name),
new KeyValuePair<string, object?>(PartitionTagName, m.Value.Id),
new KeyValuePair<string, object?>(ConsumerGroupTagName, this._config.GroupId)
}));
}

return Empty;
}, unit, description);
}
}
19 changes: 11 additions & 8 deletions src/Epam.Kafka/Metrics/ProducerMetrics.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
// Copyright © 2024 EPAM Systems

using System.Diagnostics.Metrics;

namespace Epam.Kafka.Metrics;

internal sealed class ProducerMetrics : CommonMetrics
{
protected override void Initialize(Meter meter, Meter topParMeter)

protected override long GetTxRxMsg(Statistics value)
{
base.Initialize(meter, topParMeter);
return value.TransmittedMessagesTotal;
}

this.CreateCounter(meter, "epam_kafka_stats_txmsgs", v => v.TransmittedMessagesTotal,
description: "Total number of messages transmitted (produced) to Kafka brokers");
protected override long GetTxRx(Statistics value)
{
return value.TransmittedRequestsTotal;
}

//this.CreateTopLevelCounter(meter, "epam_kafka_stats_tx", v => v.TransmittedRequestsTotal,
// description: "Total number of requests sent to Kafka brokers.");
protected override long GetTxRxBytes(Statistics value)
{
return value.TransmittedBytesTotal;
}
}
36 changes: 3 additions & 33 deletions src/Epam.Kafka/Metrics/StatisticsMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@

using System.Diagnostics.Metrics;
using System.Text.RegularExpressions;
using Epam.Kafka.Stats;

namespace Epam.Kafka.Metrics;

#pragma warning disable CA1001 // dispose only on completed

internal abstract class StatisticsMetrics : IObserver<Statistics>
{
private static readonly Regex HandlerRegex = new ("^(.*)#(consumer|producer)-(\\d{1,7})$",
private static readonly Regex HandlerRegex = new("^(.*)#(consumer|producer)-(\\d{1,7})$",
RegexOptions.Compiled | RegexOptions.IgnoreCase);

private const string NameTag = "Name";
private const string HandlerTag = "Handler";
private const string TypeTag = "Type";
private const string TopicTagName = "Topic";
private const string PartitionTagName = "Partition";

private readonly object _syncObj = new();
private bool _initialized;
Expand Down Expand Up @@ -72,7 +69,7 @@ public void OnCompleted()
this._topParMeter?.Dispose();
}

protected void CreateGauge(Meter meter, string name, Func<Statistics, long> factory)
protected void CreateGauge(Meter meter, string name, Func<Statistics, long> factory, string? unit = null, string? description = null)
{
if (meter == null) throw new ArgumentNullException(nameof(meter));
if (name == null) throw new ArgumentNullException(nameof(name));
Expand All @@ -88,7 +85,7 @@ protected void CreateGauge(Meter meter, string name, Func<Statistics, long> fact
}

return Empty;
});
}, unit, description);
}

protected void CreateCounter(Meter meter, string name, Func<Statistics, long> factory, string? unit = null, string? description = null)
Expand All @@ -109,33 +106,6 @@ protected void CreateCounter(Meter meter, string name, Func<Statistics, long> fa
return Empty;
}, unit, description);
}

protected void CreateTpGauge(Meter meter, string name, Func<KeyValuePair<TopicStatistics,PartitionStatistics>,long> factory, string? unit = null,
string? description = null)
{
if (meter == null) throw new ArgumentNullException(nameof(meter));
if (name == null) throw new ArgumentNullException(nameof(name));

meter.CreateObservableGauge(name, () =>
{
Statistics? v = this.Value;

if (v != null)
{
return v.Topics
.SelectMany(p =>
p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition)
.Select(x => new KeyValuePair<TopicStatistics, PartitionStatistics>(p.Value, x.Value)))
.Select(m => new Measurement<long>(factory(m), new[]
{
new KeyValuePair<string, object?>(TopicTagName, m.Key.Name),
new KeyValuePair<string, object?>(PartitionTagName, m.Value.Id)
}));
}

return Empty;
}, unit, description);
}
}

#pragma warning restore CA1001
Loading

0 comments on commit e7a6b85

Please sign in to comment.