Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Optimize performance of EncodeResult.updateProducerStats and DecodeRe…
Browse files Browse the repository at this point in the history
…sult.updateConsumerStats (#1642)

- reduce unnecessary garbage created by adding some caching

This came up in some profiling

![image](https://user-images.githubusercontent.com/66864/209136588-027071f1-2c96-4dd9-9182-986a540cbd86.png)

![image](https://user-images.githubusercontent.com/66864/209136672-5398e5a2-1b38-4492-bae1-304e3d0cb297.png)

(cherry picked from commit b35669b)
  • Loading branch information
lhotari authored and Demogorgon314 committed Jan 11, 2023
1 parent 76843d2 commit f1f1450
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import static io.streamnative.pulsar.handlers.kop.KopServerStats.BATCH_COUNT_PER_MEMORYRECORDS;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.CATEGORY_SERVER;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.FETCH_DECODE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.GROUP_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_PUBLISH;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_QUEUED_LATENCY;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_READ;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_IN;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_OUT;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PARTITION_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PENDING_TOPIC_LATENCY;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PREPARE_METADATA;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_ENCODE;
Expand All @@ -31,6 +33,7 @@
import static io.streamnative.pulsar.handlers.kop.KopServerStats.RESPONSE_BLOCKED_LATENCY;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.RESPONSE_BLOCKED_TIMES;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.SERVER_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.TOPIC_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.WAITING_FETCHES_TRIGGERED;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -47,6 +50,8 @@
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;

/**
Expand Down Expand Up @@ -152,6 +157,12 @@ public class RequestStats {

private final Map<ApiKeys, StatsLogger> apiKeysToStatsLogger = new ConcurrentHashMap<>();

private final Map<TopicPartition, StatsLogger> cachedLoggersForTopicPartitions = new ConcurrentHashMap<>();
private final Map<Pair<TopicPartition, String>, StatsLogger> cachedLoggersForTopicPartitionsAndGroups =
new ConcurrentHashMap<>();

private final Map<String, RequestStats> cachedRequestStatsForTenants = new ConcurrentHashMap<>();

public RequestStats(StatsLogger statsLogger) {
this.statsLogger = statsLogger;

Expand Down Expand Up @@ -221,6 +232,20 @@ public Number getSample() {
});
}

public StatsLogger getStatsLoggerForTopicPartition(TopicPartition topicPartition) {
return cachedLoggersForTopicPartitions.computeIfAbsent(topicPartition,
__ -> statsLogger
.scopeLabel(TOPIC_SCOPE, topicPartition.topic())
.scopeLabel(PARTITION_SCOPE, String.valueOf(topicPartition.partition())));
}

public StatsLogger getStatsLoggerForTopicPartitionAndGroup(TopicPartition topicPartition, String groupId) {
return cachedLoggersForTopicPartitionsAndGroups.computeIfAbsent(
Pair.of(topicPartition, groupId),
__ -> getStatsLoggerForTopicPartition(topicPartition)
.scopeLabel(GROUP_SCOPE, groupId));
}

/**
* Get the stats logger for Kafka requests.
*
Expand All @@ -240,6 +265,7 @@ public Set<ApiKeys> getApiKeysSet() {
}

public RequestStats forTenant(String tenant) {
return new RequestStats(statsLogger.scopeLabel("tenant", tenant));
return cachedRequestStatsForTenants.computeIfAbsent(tenant,
__ -> new RequestStats(statsLogger.scopeLabel("tenant", tenant)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
import static io.streamnative.pulsar.handlers.kop.KopServerStats.CONSUME_MESSAGE_CONVERSIONS;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.ENTRIES_OUT;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.GROUP_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_OUT;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PARTITION_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.TOPIC_SCOPE;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -100,16 +97,14 @@ public void updateConsumerStats(final TopicPartition topicPartition,
RequestStats statsLogger) {
final int numMessages = EntryFormatter.parseNumMessages(records);

final StatsLogger statsLoggerForThisPartition = statsLogger.getStatsLogger()
.scopeLabel(TOPIC_SCOPE, topicPartition.topic())
.scopeLabel(PARTITION_SCOPE, String.valueOf(topicPartition.partition()));
final StatsLogger statsLoggerForThisPartition = statsLogger.getStatsLoggerForTopicPartition(topicPartition);

statsLoggerForThisPartition.getCounter(CONSUME_MESSAGE_CONVERSIONS).add(conversionCount);
statsLoggerForThisPartition.getOpStatsLogger(CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS)
.registerSuccessfulEvent(conversionTimeNanos, TimeUnit.NANOSECONDS);
final StatsLogger statsLoggerForThisGroup;
if (groupId != null) {
statsLoggerForThisGroup = statsLoggerForThisPartition.scopeLabel(GROUP_SCOPE, groupId);
statsLoggerForThisGroup = statsLogger.getStatsLoggerForTopicPartitionAndGroup(topicPartition, groupId);
} else {
statsLoggerForThisGroup = statsLoggerForThisPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

import static io.streamnative.pulsar.handlers.kop.KopServerStats.BYTES_IN;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_IN;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PARTITION_SCOPE;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_MESSAGE_CONVERSIONS;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS;
import static io.streamnative.pulsar.handlers.kop.KopServerStats.TOPIC_SCOPE;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -89,9 +87,7 @@ public void updateProducerStats(final TopicPartition topicPartition,
producer.updateRates(numMessages, numBytes);
producer.getTopic().incrementPublishCount(numMessages, numBytes);

final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLogger()
.scopeLabel(TOPIC_SCOPE, topicPartition.topic())
.scopeLabel(PARTITION_SCOPE, String.valueOf(topicPartition.partition()));
final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLoggerForTopicPartition(topicPartition);

statsLoggerForThisPartition.getCounter(BYTES_IN).add(numBytes);
statsLoggerForThisPartition.getCounter(MESSAGE_IN).add(numMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Joiner;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
Expand All @@ -29,6 +30,11 @@ public class PrometheusStatsLogger implements StatsLogger {
private final String scope;
private final Map<String, String> labels;

private final Map<String, String> completeNameCache = new ConcurrentHashMap<>();
private final Map<String, ScopeContext> scopeContextCache = new ConcurrentHashMap<>();



PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope, Map<String, String> labels) {
this.provider = provider;
this.scope = scope;
Expand Down Expand Up @@ -73,11 +79,12 @@ public StatsLogger scopeLabel(String labelName, String labelValue) {
}

private ScopeContext scopeContext(String name) {
return new ScopeContext(completeName(name), labels);
return scopeContextCache.computeIfAbsent(name, __ -> new ScopeContext(completeName(name), labels));
}

private String completeName(String name) {
return sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name));
return completeNameCache.computeIfAbsent(name,
__ -> sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name)));
}

private static String sanitizeMetricName(String metricName) {
Expand Down

0 comments on commit f1f1450

Please sign in to comment.