From f1f14500e351c85b21f7f0c893346306f32b8ac4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 11 Jan 2023 11:06:28 +0800 Subject: [PATCH] Optimize performance of EncodeResult.updateProducerStats and DecodeResult.updateConsumerStats (#1642) - reduce unnecessary garbage created by adding some caching This came up in some profiling ![image](https://user-images.githubusercontent.com/66864/209136588-027071f1-2c96-4dd9-9182-986a540cbd86.png) ![image](https://user-images.githubusercontent.com/66864/209136672-5398e5a2-1b38-4492-bae1-304e3d0cb297.png) (cherry picked from commit b35669b1a1c1e306ff690bec36dd0b3c65ed4db7) --- .../pulsar/handlers/kop/RequestStats.java | 28 ++++++++++++++++++- .../handlers/kop/format/DecodeResult.java | 9 ++---- .../handlers/kop/format/EncodeResult.java | 6 +--- .../kop/stats/PrometheusStatsLogger.java | 11 ++++++-- 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java index e25d7f5942..cca2df7de6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java @@ -18,11 +18,13 @@ import static io.streamnative.pulsar.handlers.kop.KopServerStats.BATCH_COUNT_PER_MEMORYRECORDS; import static io.streamnative.pulsar.handlers.kop.KopServerStats.CATEGORY_SERVER; import static io.streamnative.pulsar.handlers.kop.KopServerStats.FETCH_DECODE; +import static io.streamnative.pulsar.handlers.kop.KopServerStats.GROUP_SCOPE; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_PUBLISH; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_QUEUED_LATENCY; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_READ; import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_IN; import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_OUT; +import static io.streamnative.pulsar.handlers.kop.KopServerStats.PARTITION_SCOPE; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PENDING_TOPIC_LATENCY; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PREPARE_METADATA; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_ENCODE; @@ -31,6 +33,7 @@ import static io.streamnative.pulsar.handlers.kop.KopServerStats.RESPONSE_BLOCKED_LATENCY; import static io.streamnative.pulsar.handlers.kop.KopServerStats.RESPONSE_BLOCKED_TIMES; import static io.streamnative.pulsar.handlers.kop.KopServerStats.SERVER_SCOPE; +import static io.streamnative.pulsar.handlers.kop.KopServerStats.TOPIC_SCOPE; import static io.streamnative.pulsar.handlers.kop.KopServerStats.WAITING_FETCHES_TRIGGERED; import com.google.common.annotations.VisibleForTesting; @@ -47,6 +50,8 @@ import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.annotations.StatsDoc; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; /** @@ -152,6 +157,12 @@ public class RequestStats { private final Map apiKeysToStatsLogger = new ConcurrentHashMap<>(); + private final Map cachedLoggersForTopicPartitions = new ConcurrentHashMap<>(); + private final Map, StatsLogger> cachedLoggersForTopicPartitionsAndGroups = + new ConcurrentHashMap<>(); + + private final Map cachedRequestStatsForTenants = new ConcurrentHashMap<>(); + public RequestStats(StatsLogger statsLogger) { this.statsLogger = statsLogger; @@ -221,6 +232,20 @@ public Number getSample() { }); } + public StatsLogger getStatsLoggerForTopicPartition(TopicPartition topicPartition) { + return cachedLoggersForTopicPartitions.computeIfAbsent(topicPartition, + __ -> statsLogger + .scopeLabel(TOPIC_SCOPE, topicPartition.topic()) + .scopeLabel(PARTITION_SCOPE, String.valueOf(topicPartition.partition()))); + } + + public StatsLogger getStatsLoggerForTopicPartitionAndGroup(TopicPartition topicPartition, String groupId) { + return cachedLoggersForTopicPartitionsAndGroups.computeIfAbsent( + Pair.of(topicPartition, groupId), + __ -> getStatsLoggerForTopicPartition(topicPartition) + .scopeLabel(GROUP_SCOPE, groupId)); + } + /** * Get the stats logger for Kafka requests. * @@ -240,6 +265,7 @@ public Set getApiKeysSet() { } public RequestStats forTenant(String tenant) { - return new RequestStats(statsLogger.scopeLabel("tenant", tenant)); + return cachedRequestStatsForTenants.computeIfAbsent(tenant, + __ -> new RequestStats(statsLogger.scopeLabel("tenant", tenant))); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java index b980ccb96e..5cedc7c4a8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java @@ -17,10 +17,7 @@ import static io.streamnative.pulsar.handlers.kop.KopServerStats.CONSUME_MESSAGE_CONVERSIONS; import static io.streamnative.pulsar.handlers.kop.KopServerStats.CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS; import static io.streamnative.pulsar.handlers.kop.KopServerStats.ENTRIES_OUT; -import static io.streamnative.pulsar.handlers.kop.KopServerStats.GROUP_SCOPE; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_OUT; -import static io.streamnative.pulsar.handlers.kop.KopServerStats.PARTITION_SCOPE; -import static io.streamnative.pulsar.handlers.kop.KopServerStats.TOPIC_SCOPE; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -100,16 +97,14 @@ public void updateConsumerStats(final TopicPartition topicPartition, RequestStats statsLogger) { final int numMessages = EntryFormatter.parseNumMessages(records); - final StatsLogger statsLoggerForThisPartition = statsLogger.getStatsLogger() - .scopeLabel(TOPIC_SCOPE, topicPartition.topic()) - .scopeLabel(PARTITION_SCOPE, String.valueOf(topicPartition.partition())); + final StatsLogger statsLoggerForThisPartition = statsLogger.getStatsLoggerForTopicPartition(topicPartition); statsLoggerForThisPartition.getCounter(CONSUME_MESSAGE_CONVERSIONS).add(conversionCount); statsLoggerForThisPartition.getOpStatsLogger(CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS) .registerSuccessfulEvent(conversionTimeNanos, TimeUnit.NANOSECONDS); final StatsLogger statsLoggerForThisGroup; if (groupId != null) { - statsLoggerForThisGroup = statsLoggerForThisPartition.scopeLabel(GROUP_SCOPE, groupId); + statsLoggerForThisGroup = statsLogger.getStatsLoggerForTopicPartitionAndGroup(topicPartition, groupId); } else { statsLoggerForThisGroup = statsLoggerForThisPartition; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java index 9ea789a9a6..465c4a0dc9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java @@ -15,10 +15,8 @@ import static io.streamnative.pulsar.handlers.kop.KopServerStats.BYTES_IN; import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_IN; -import static io.streamnative.pulsar.handlers.kop.KopServerStats.PARTITION_SCOPE; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_MESSAGE_CONVERSIONS; import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS; -import static io.streamnative.pulsar.handlers.kop.KopServerStats.TOPIC_SCOPE; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; @@ -89,9 +87,7 @@ public void updateProducerStats(final TopicPartition topicPartition, producer.updateRates(numMessages, numBytes); producer.getTopic().incrementPublishCount(numMessages, numBytes); - final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLogger() - .scopeLabel(TOPIC_SCOPE, topicPartition.topic()) - .scopeLabel(PARTITION_SCOPE, String.valueOf(topicPartition.partition())); + final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLoggerForTopicPartition(topicPartition); statsLoggerForThisPartition.getCounter(BYTES_IN).add(numBytes); statsLoggerForThisPartition.getCounter(MESSAGE_IN).add(numMessages); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/PrometheusStatsLogger.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/PrometheusStatsLogger.java index 1092e2fad9..2a4c70e26f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/PrometheusStatsLogger.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/PrometheusStatsLogger.java @@ -16,6 +16,7 @@ import com.google.common.base.Joiner; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -29,6 +30,11 @@ public class PrometheusStatsLogger implements StatsLogger { private final String scope; private final Map labels; + private final Map completeNameCache = new ConcurrentHashMap<>(); + private final Map scopeContextCache = new ConcurrentHashMap<>(); + + + PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope, Map labels) { this.provider = provider; this.scope = scope; @@ -73,11 +79,12 @@ public StatsLogger scopeLabel(String labelName, String labelValue) { } private ScopeContext scopeContext(String name) { - return new ScopeContext(completeName(name), labels); + return scopeContextCache.computeIfAbsent(name, __ -> new ScopeContext(completeName(name), labels)); } private String completeName(String name) { - return sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name)); + return completeNameCache.computeIfAbsent(name, + __ -> sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name))); } private static String sanitizeMetricName(String metricName) {