Skip to content

Commit

Permalink
[improve][broker] Add msgInReplay subscription stat and metric to imp…
Browse files Browse the repository at this point in the history
…rove Key_Shared observability (apache#23224)

(cherry picked from commit 59424a8)
(cherry picked from commit 766d2a4)
  • Loading branch information
lhotari authored and srinath-ctds committed Sep 5, 2024
1 parent efa9a54 commit 4233c81
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,13 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}

/**
* Get the number of messages registered for replay in the redelivery controller.
*
* @return number of messages
*/
public int size() {
return messagesToRedeliver.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1270,5 +1270,9 @@ public Subscription getSubscription() {
return subscription;
}

public long getNumberOfMessagesInReplay() {
return redeliveryMessages.size();
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,7 @@ public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(Boolean getPrecise
subStats.unackedMessages = d.getTotalUnackedMessages();
subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
subStats.msgDelayed = d.getNumberOfDelayedMessages();
subStats.msgInReplay = d.getNumberOfMessagesInReplay();
}
}
subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AggregatedNamespaceStats {
public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
public long msgBacklog;
public long msgDelayed;
public long msgInReplay;

public long ongoingTxnCount;
public long abortedTxnCount;
Expand Down Expand Up @@ -141,10 +142,12 @@ void updateStats(TopicStats stats) {
AggregatedSubscriptionStats subsStats =
subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats());
msgDelayed += as.msgDelayed;
msgInReplay += as.msgInReplay;
subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
subsStats.msgBacklog += as.msgBacklog;
subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
subsStats.msgDelayed += as.msgDelayed;
subsStats.msgInReplay += as.msgInReplay;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
Expand Down Expand Up @@ -200,6 +203,7 @@ public void reset() {

msgBacklog = 0;
msgDelayed = 0;
msgInReplay = 0;
ongoingTxnCount = 0;
abortedTxnCount = 0;
committedTxnCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats {

public long msgDelayed;

public long msgInReplay;

long msgOutCounter;

long bytesOutCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgInReplay = subscriptionStats.msgInReplay;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
Expand Down Expand Up @@ -412,6 +413,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat

writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);

writeMetric(stream, "pulsar_subscription_in_replay", stats.msgInReplay, cluster, namespace);

writeMetric(stream, "pulsar_delayed_message_index_size_bytes", stats.delayedMessageIndexSizeInBytes, cluster,
namespace);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public interface SubscriptionStats {
/** Number of delayed messages currently being tracked. */
long getMsgDelayed();

/** Number of messages registered for replay. */
long getMsgInReplay();

/**
* Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStats#getUnackedMessages()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** Number of delayed messages currently being tracked. */
public long msgDelayed;

/** Number of messages registered for replay. */
public long msgInReplay;

/**
* Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStatsImpl#unackedMessages}
Expand Down Expand Up @@ -167,6 +170,8 @@ public void reset() {
msgBacklog = 0;
backlogSize = 0;
msgBacklogNoDelayed = 0;
msgDelayed = 0;
msgInReplay = 0;
unackedMessages = 0;
type = null;
msgRateExpired = 0;
Expand Down Expand Up @@ -202,6 +207,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) {
this.backlogSize += stats.backlogSize;
this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
this.msgDelayed += stats.msgDelayed;
this.msgInReplay += stats.msgInReplay;
this.unackedMessages += stats.unackedMessages;
this.type = stats.type;
this.msgRateExpired += stats.msgRateExpired;
Expand Down

0 comments on commit 4233c81

Please sign in to comment.