Skip to content

Commit

Permalink
update metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Feb 6, 2025
1 parent 25cbd59 commit 2e4ecab
Show file tree
Hide file tree
Showing 18 changed files with 311 additions and 100 deletions.
14 changes: 13 additions & 1 deletion src/Epam.Kafka/Metrics/ConsumerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

using Confluent.Kafka;

using Epam.Kafka.Stats;
using Epam.Kafka.Stats.Topic;

namespace Epam.Kafka.Metrics;

Expand Down Expand Up @@ -60,6 +60,18 @@ private void ConfigureCgMeter(Meter cgMeter)

private void ConfigureTopParMeter(Meter topParMeter)
{
topParMeter.CreateObservableGauge("epam_kafka_stats_tp_fetch_state", () => this.Value!.Topics
.SelectMany(p =>
p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition)
.Select(x => new KeyValuePair<TopicStatistics, PartitionStatistics>(p.Value, x.Value)))
.Select(m => new Measurement<long>((long)m.Value.FetchState, new[]
{
new KeyValuePair<string, object?>(DesiredTagName, m.Value.Desired),
new KeyValuePair<string, object?>(TopicTagName, m.Key.Name),
new KeyValuePair<string, object?>(PartitionTagName, m.Value.Id),
})),
null, "Consumer lag");

topParMeter.CreateObservableGauge("epam_kafka_stats_tp_lag", () => this.Value!.Topics
.SelectMany(p =>
p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition)
Expand Down
4 changes: 4 additions & 0 deletions src/Epam.Kafka/Statistics.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// Copyright © 2024 EPAM Systems

using Epam.Kafka.Stats;
using Epam.Kafka.Stats.Broker;
using Epam.Kafka.Stats.Eos;
using Epam.Kafka.Stats.Topic;

using System.Diagnostics.Metrics;
using System.Text.Json;
using System.Text.Json.Serialization;
using Epam.Kafka.Stats.Group;

namespace Epam.Kafka;

Expand Down
42 changes: 42 additions & 0 deletions src/Epam.Kafka/Stats/Broker/BrokerSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright © 2024 EPAM Systems

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats.Broker;

/// <summary>
/// Enum representing the source state of a Broker in librdkafka.
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter<BrokerSource>))]
public enum BrokerSource
{
/// <summary>
/// Not available
/// </summary>
[JsonIgnore]
None,

/// <summary>
/// State indicating the broker has been learned through broker metadata.
/// </summary>
[JsonStringEnumMemberName("learned")]
Learned,

/// <summary>
/// State indicating the broker has been configured by the user.
/// </summary>
[JsonStringEnumMemberName("configured")]
Configured,

/// <summary>
/// State indicating the broker is managed internally by the system.
/// </summary>
[JsonStringEnumMemberName("internal")]
Internal,

/// <summary>
/// State indicating the broker acts as a logical broker within the Kafka cluster.
/// </summary>
[JsonStringEnumMemberName("logical")]
Logical
}
91 changes: 91 additions & 0 deletions src/Epam.Kafka/Stats/Broker/BrokerState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright © 2024 EPAM Systems

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats.Broker;

/// <summary>
/// Enum representing the state of a Broker in librdkafka.
/// See https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_broker.h
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter<BrokerState>))]
public enum BrokerState
{
/// <summary>
/// State not available
/// </summary>
[JsonIgnore]
None,

/// <summary>
/// Initial state before any connection attempts.
/// </summary>
[JsonStringEnumMemberName("INIT")]
Init,

/// <summary>
/// Broker is not connected.
/// </summary>
[JsonStringEnumMemberName("DOWN")]
Down,

/// <summary>
/// Broker is trying to connect.
/// </summary>
[JsonStringEnumMemberName("TRY_CONNECT")]
TryConnect,

/// <summary>
/// Broker is connecting.
/// </summary>
[JsonStringEnumMemberName("CONNECT")]
Connect,

/// <summary>
/// SSL handshake is underway.
/// </summary>
[JsonStringEnumMemberName("SSL_HANDSHAKE")]
SslHandshake,

/// <summary>
/// Broker is using legacy authentication.
/// </summary>
[JsonStringEnumMemberName("AUTH_LEGACY")]
AuthLegacy,

/// <summary>
/// Broker is operational for Kafka protocol operations.
/// </summary>
[JsonStringEnumMemberName("UP")]
Up,

/// <summary>
/// Broker state is being updated.
/// </summary>
[JsonStringEnumMemberName("UPDATE")]
Update,

/// <summary>
/// Broker is querying for supported API versions.
/// </summary>
[JsonStringEnumMemberName("APIVERSION_QUERY")]
ApiVersionQuery,

/// <summary>
/// Authentication handshake is in progress.
/// </summary>
[JsonStringEnumMemberName("AUTH_HANDSHAKE")]
AuthHandshake,

/// <summary>
/// Authentication request is being processed.
/// </summary>
[JsonStringEnumMemberName("AUTH_REQ")]
AuthReq,

/// <summary>
/// Broker is re-authenticating.
/// </summary>
[JsonStringEnumMemberName("REAUTH")]
Reauth,
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Broker;

/// <summary>
/// Per broker statistics. See https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md for details.
Expand Down Expand Up @@ -33,13 +32,13 @@ public class BrokerStatistics
/// Broker source (learned, configured, internal, logical)
/// </summary>
[JsonPropertyName("source")]
public string Source { get; set; } = string.Empty;
public BrokerSource Source { get; set; } = BrokerSource.None;

/// <summary>
/// Broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE)
/// Broker state
/// </summary>
[JsonPropertyName("state")]
public string State { get; set; } = string.Empty;
public BrokerState State { get; set; } = BrokerState.None;

/// <summary>
/// Time since last broker state change (microseconds)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © 2024 EPAM Systems

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Eos;

/// <summary>
/// Enum representing the states of an idempotent producer's ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Eos;

/// <summary>
/// EOS / Idempotent producer state and metrics
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © 2024 EPAM Systems

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Eos;

/// <summary>
/// Enum representing the current states of a transactional producer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Group;

/// <summary>
/// Enum representing the local consumer group handler's join states in librdkafka.
/// </summary>
public enum ConsumerGroupJoinState
[JsonConverter(typeof(JsonStringEnumConverter<GroupJoinState>))]
public enum GroupJoinState
{
/// <summary>
/// State not available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Group;

/// <summary>
/// Enum representing the local consumer group handler's overall states in librdkafka.
/// </summary>
public enum ConsumerGroupState
[JsonConverter(typeof(JsonStringEnumConverter<GroupState>))]
public enum GroupState
{
/// <summary>
/// State not available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Group;

/// <summary>
/// Consumer group metrics.
Expand All @@ -13,7 +13,7 @@ public class GroupStatistics
/// Local consumer group handler's state.
/// </summary>
[JsonPropertyName("state")]
public ConsumerGroupState State { get; set; } = ConsumerGroupState.None;
public GroupState State { get; set; } = GroupState.None;

/// <summary>
/// Time elapsed since last state change (milliseconds).
Expand All @@ -31,7 +31,7 @@ public class GroupStatistics
/// Local consumer group handler's join state.
/// </summary>
[JsonPropertyName("join_state")]
public ConsumerGroupJoinState JoinState { get; set; } = ConsumerGroupJoinState.None;
public GroupJoinState JoinState { get; set; } = GroupJoinState.None;

/// <summary>
/// Time elapsed since last re-balance (assign or revoke) (milliseconds).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Topic;

/// <summary>
/// Enum representing the states of consumer fetch for a partition.
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter<PartitionFetchState>))]
public enum PartitionFetchState
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Topic;

/// <summary>
/// Partition statistics. See https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md for details.
Expand Down Expand Up @@ -45,7 +45,7 @@ public class PartitionStatistics
public long FetchCount { get; set; }

/// <summary>
/// Consumer fetch state for this partition (none, stopping, stopped, offset-query, offset-wait, active)
/// Consumer fetch state for this partition
/// </summary>
[JsonPropertyName("fetch_state")]
public PartitionFetchState FetchState { get; set; } = PartitionFetchState.None;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using System.Text.Json.Serialization;

namespace Epam.Kafka.Stats;
namespace Epam.Kafka.Stats.Topic;

/// <summary>
/// Topic statistics. See https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md for details.
Expand Down
2 changes: 1 addition & 1 deletion tests/Epam.Kafka.Tests/Data/ConsumerStat.json
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@
"nodeid": 534,
"nodename": "kafka-4.sandbox.contoso.com:9095",
"source": "learned",
"state": "UP",
"state": "APIVERSION_QUERY",
"stateage": 23884830,
"outbuf_cnt": 0,
"outbuf_msg_cnt": 0,
Expand Down
Loading

0 comments on commit 2e4ecab

Please sign in to comment.