Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[feat][misc] PIP-264: Implement topic lookup metrics using OpenTeleme…
Browse files Browse the repository at this point in the history
…try (apache#22058)
  • Loading branch information
dragosvictor authored Mar 6, 2024
1 parent 68c1092 commit 4ff8600
Show file tree
Hide file tree
Showing 18 changed files with 505 additions and 86 deletions.
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -249,7 +250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;
private PulsarBrokerOpenTelemetry openTelemetry;
private final PulsarBrokerOpenTelemetry openTelemetry;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -305,13 +306,23 @@ public PulsarService(ServiceConfiguration config,
WorkerConfig workerConfig,
Optional<WorkerService> functionWorkerService,
Consumer<Integer> processTerminator) {
this(config, workerConfig, functionWorkerService, processTerminator, null);
}

public PulsarService(ServiceConfiguration config,
WorkerConfig workerConfig,
Optional<WorkerService> functionWorkerService,
Consumer<Integer> processTerminator,
Consumer<AutoConfiguredOpenTelemetrySdkBuilder> openTelemetrySdkBuilderCustomizer) {
state = State.Init;

// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
TransactionBatchedWriteValidator.validate(config);
this.config = config;

this.openTelemetry = new PulsarBrokerOpenTelemetry(config, openTelemetrySdkBuilderCustomizer);

// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);

Expand Down Expand Up @@ -902,7 +913,6 @@ public void start() throws PulsarServerException {
}

this.metricsGenerator = new MetricsGenerator(this);
this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -97,10 +100,12 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -146,18 +151,37 @@ public class NamespaceService implements AutoCloseable {

private final RedirectManager redirectManager;


public static final String LOOKUP_REQUEST_DURATION_METRIC_NAME = "pulsar.broker.request.topic.lookup.duration";

private static final AttributeKey<String> PULSAR_LOOKUP_RESPONSE_ATTRIBUTE =
AttributeKey.stringKey("pulsar.lookup.response");
public static final Attributes PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "broker")
.build();
public static final Attributes PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "redirect")
.build();
public static final Attributes PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "failure")
.build();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();

private final DoubleHistogram lookupLatencyHistogram;

/**
* Default constructor.
Expand All @@ -175,6 +199,12 @@ public NamespaceService(PulsarService pulsar) {
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);

this.lookupLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
.histogramBuilder(LOOKUP_REQUEST_DURATION_METRIC_NAME)
.setDescription("The duration of topic lookup requests (either binary or HTTP)")
.setUnit("s")
.build();
}

public void initialize() {
Expand Down Expand Up @@ -204,18 +234,28 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
});
});

future.thenAccept(optResult -> {
lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (optResult.isPresent()) {
if (optResult.get().isRedirect()) {
lookupRedirects.inc();
future.whenComplete((lookupResult, throwable) -> {
var latencyNs = System.nanoTime() - startTime;
lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
Attributes attributes;
if (throwable == null) {
if (lookupResult.isPresent()) {
if (lookupResult.get().isRedirect()) {
lookupRedirects.inc();
attributes = PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES;
} else {
lookupAnswers.inc();
attributes = PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES;
}
} else {
lookupAnswers.inc();
// No lookup result, default to reporting as failure.
attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
} else {
lookupFailures.inc();
attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
}).exceptionally(ex -> {
lookupFailures.inc();
return null;
lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS), attributes);
});

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Histogram;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -179,6 +180,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.slf4j.Logger;
Expand Down Expand Up @@ -241,8 +243,19 @@ public class BrokerService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;

public static final String TOPIC_LOOKUP_USAGE_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.usage";
public static final String TOPIC_LOOKUP_LIMIT_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.limit";
@PulsarDeprecatedMetric(newMetricName = TOPIC_LOOKUP_USAGE_METRIC_NAME)
private final ObserverGauge pendingLookupRequests;
private final ObservableLongUpDownCounter pendingLookupOperationsCounter;
private final ObservableLongUpDownCounter pendingLookupOperationsLimitCounter;

public static final String TOPIC_LOAD_USAGE_METRIC_NAME = "pulsar.broker.topic.load.concurrent.usage";
public static final String TOPIC_LOAD_LIMIT_METRIC_NAME = "pulsar.broker.topic.load.concurrent.limit";
@PulsarDeprecatedMetric(newMetricName = TOPIC_LOAD_USAGE_METRIC_NAME)
private final ObserverGauge pendingTopicLoadRequests;
private final ObservableLongUpDownCounter pendingTopicLoadOperationsCounter;
private final ObservableLongUpDownCounter pendingTopicLoadOperationsLimitCounter;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
Expand Down Expand Up @@ -346,7 +359,6 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);


this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
.name("pulsar-inactivity-monitor")
.numThreads(1)
Expand Down Expand Up @@ -374,9 +386,9 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.topicFactory = createPersistentTopicFactory();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
this.lookupRequestSemaphore = new AtomicReference<>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false));
this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>(
this.topicLoadRequestSemaphore = new AtomicReference<>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0
&& pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
Expand All @@ -403,15 +415,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();

this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- lookupRequestSemaphore.get().availablePermits())
.supplier(this::getPendingLookupRequest)
.register();
this.pendingLookupOperationsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOOKUP_USAGE_METRIC_NAME)
.setDescription("The number of pending lookup operations in the broker. "
+ "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
this.pendingLookupOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOOKUP_LIMIT_METRIC_NAME)
.setDescription("The maximum number of pending lookup operations in the broker. "
+ "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
.setUnit("{operation}")
.buildWithCallback(
measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));

this.pendingTopicLoadRequests = ObserverGauge.build(
"pulsar_broker_topic_load_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits())
"pulsar_broker_topic_load_pending_requests", "-")
.supplier(this::getPendingTopicLoadRequests)
.register();
this.pendingTopicLoadOperationsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOAD_USAGE_METRIC_NAME)
.setDescription("The number of pending topic load operations in the broker. "
+ "When it reaches threshold \"maxConcurrentTopicLoadRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getPendingTopicLoadRequests()));
this.pendingTopicLoadOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOAD_LIMIT_METRIC_NAME)
.setDescription("The maximum number of pending topic load operations in the broker. "
+ "Equal to \"maxConcurrentTopicLoadRequest\" defined in broker.conf.")
.setUnit("{operation}")
.buildWithCallback(
measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentTopicLoadRequest()));

this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
Expand All @@ -423,6 +461,15 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.bundlesQuotas = new BundlesQuotas(pulsar);
}

private int getPendingLookupRequest() {
return pulsar.getConfig().getMaxConcurrentLookupRequest() - lookupRequestSemaphore.get().availablePermits();
}

private int getPendingTopicLoadRequests() {
return pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits();
}

public void addTopicEventListener(TopicEventsListener... listeners) {
topicEventsDispatcher.addTopicEventListener(listeners);
getTopics().keys().forEach(topic ->
Expand Down Expand Up @@ -780,6 +827,8 @@ public CompletableFuture<Void> closeAsync() {
log.warn("Error in closing authenticationService", e);
}
pulsarStats.close();
pendingTopicLoadOperationsCounter.close();
pendingLookupOperationsCounter.close();
try {
delayedDeliveryTrackerFactory.close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.Closeable;
import java.util.function.Consumer;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -33,11 +36,13 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
@Getter
private final Meter meter;

public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
@VisibleForTesting Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.builderCustomizer(builderCustomizer)
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,19 +448,27 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
return builder;
}

protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createAdditionalPulsarTestContext(conf, null);
}
/**
* This method can be used in test classes for creating additional PulsarTestContext instances
* that share the same mock ZooKeeper and BookKeeper instances as the main PulsarTestContext instance.
*
* @param conf the ServiceConfiguration instance to use
* @param builderCustomizer a consumer that can be used to customize the builder configuration
* @return the PulsarTestContext instance
* @throws Exception if an error occurs
*/
protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createPulsarTestContextBuilder(conf)
protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf,
Consumer<PulsarTestContext.Builder> builderCustomizer) throws Exception {
var builder = createPulsarTestContextBuilder(conf)
.reuseMockBookkeeperAndMetadataStores(pulsarTestContext)
.reuseSpyConfig(pulsarTestContext)
.build();
.reuseSpyConfig(pulsarTestContext);
if (builderCustomizer != null) {
builderCustomizer.accept(builder);
}
return builder.build();
}

protected void waitForZooKeeperWatchers() {
Expand Down
Loading

0 comments on commit 4ff8600

Please sign in to comment.