From 5beb23a79c171563d40ad393ed8230a70df27f3c Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Wed, 27 Nov 2024 19:18:41 +0800 Subject: [PATCH] Make the proxy adapter worker thread configrable (#1549) --- .../pulsar/handlers/mqtt/proxy/MQTTProxyConfiguration.java | 7 +++++++ .../handlers/mqtt/proxy/channel/MQTTProxyAdapter.java | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyConfiguration.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyConfiguration.java index ac09e4c9..2e367361 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyConfiguration.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyConfiguration.java @@ -69,6 +69,13 @@ public class MQTTProxyConfiguration extends MQTTCommonConfiguration { ) private int maxNoOfChannels = 1; + @FieldContext( + category = CATEGORY_MQTT, + doc = "Number of threads to use for Netty IO for proxy to broker. " + + "Default is set to `Runtime.getRuntime().availableProcessors()`" + ) + private int proxyAdapterNumIOThreads = Runtime.getRuntime().availableProcessors(); + @FieldContext( category = CATEGORY_MQTT, doc = "Proxy connect to broker timeout (ms)" diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java index bc82661a..83bb64dc 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java @@ -71,14 +71,14 @@ public class MQTTProxyAdapter { private final AtomicLong counter = new AtomicLong(0); @Getter private final ConcurrentMap>> pool; - private final int workerThread = Runtime.getRuntime().availableProcessors(); private final int maxNoOfChannels; public MQTTProxyAdapter(MQTTProxyService proxyService) { this.proxyService = proxyService; this.pool = new ConcurrentHashMap<>(); this.bootstrap = new Bootstrap(); - this.eventLoopGroup = EventLoopUtil.newEventLoopGroup(workerThread, false, threadFactory); + this.eventLoopGroup = EventLoopUtil.newEventLoopGroup( + proxyService.getProxyConfig().getProxyAdapterNumIOThreads(), false, threadFactory); this.maxNoOfChannels = proxyService.getProxyConfig().getMaxNoOfChannels(); this.bootstrap.group(eventLoopGroup) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, proxyService.getProxyConfig().getConnectTimeoutMs())