From eba4cd96b65bd0672721f38cf328825c63d5a810 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 3 Jan 2025 10:33:56 +0800 Subject: [PATCH] [fix][client][branch-3.0] Fix compatibility between kerberos and tls (#23801) Signed-off-by: Zixuan Liu (cherry picked from commit 4b69b30a705a060b1d370c1551da34cee9e61d93) --- .../client/api/TlsProducerConsumerTest.java | 69 +++++++++ .../internal/http/AsyncHttpConnector.java | 3 +- .../pulsar/client/api/Authentication.java | 1 + .../apache/pulsar/client/cli/CmdConsume.java | 2 +- .../apache/pulsar/client/cli/CmdProduce.java | 2 +- .../org/apache/pulsar/client/cli/CmdRead.java | 2 +- .../apache/pulsar/client/impl/HttpClient.java | 3 +- .../client/impl/PulsarChannelInitializer.java | 146 ++++++++++-------- .../proxy/server/AdminProxyHandler.java | 32 ++-- .../proxy/server/DirectProxyHandler.java | 103 ++++++------ .../server/ProxyServiceTlsStarterTest.java | 2 + .../socket/client/PerformanceClient.java | 2 +- 12 files changed, 227 insertions(+), 140 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java index 879289eb65dc8..85e6dd6292f95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java @@ -18,18 +18,24 @@ */ package org.apache.pulsar.client.api; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import lombok.Cleanup; import org.apache.commons.compress.utils.IOUtils; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -293,4 +299,67 @@ public void testTlsTransport(Supplier url, Authentication auth) throws E @Cleanup Producer ignored = client.newProducer().topic(topicName).create(); } + + @Test + public void testTlsWithFakeAuthentication() throws Exception { + Authentication authentication = spy(new Authentication() { + @Override + public String getAuthMethodName() { + return "fake"; + } + + @Override + public void configure(Map authParams) { + + } + + @Override + public void start() { + + } + + @Override + public void close() { + + } + + @Override + public AuthenticationDataProvider getAuthData(String brokerHostName) { + return mock(AuthenticationDataProvider.class); + } + }); + + @Cleanup + PulsarAdmin pulsarAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsar().getWebServiceAddressTls()) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH) + .allowTlsInsecureConnection(false) + .enableTlsHostnameVerification(false) + .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8")) + .tlsCertificateFilePath(getTlsFileForClient("admin.cert")) + .authentication(authentication) + .build(); + pulsarAdmin.tenants().getTenants(); + verify(authentication, never()).getAuthData(); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls()) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH) + .allowTlsInsecureConnection(false) + .enableTlsHostnameVerification(false) + .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8")) + .tlsCertificateFilePath(getTlsFileForClient("admin.cert")) + .authentication(authentication).build(); + verify(authentication, never()).getAuthData(); + + final String topicName = "persistent://my-property/my-ns/my-topic-1"; + internalSetUpForNamespace(); + @Cleanup + Consumer ignoredConsumer = + pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe(); + verify(authentication, never()).getAuthData(); + @Cleanup + Producer ignoredProducer = pulsarClient.newProducer().topic(topicName).create(); + verify(authentication, never()).getAuthData(); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index 60e1b43442428..edcde2f27cece 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -186,7 +186,8 @@ private void configureAsyncHttpClientSslEngineFactory(ClientConfigurationData co DefaultAsyncHttpClientConfig.Builder confBuilder) throws GeneralSecurityException, IOException { // Set client key and certificate if available - AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); + AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(serviceNameResolver + .resolveHostUri().getHost()); SslEngineFactory sslEngineFactory = null; if (conf.isUseKeyStoreTls()) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java index 9bf1b24cbdb32..48d9e3e230701 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java @@ -48,6 +48,7 @@ public interface Authentication extends Closeable, Serializable { * @throws PulsarClientException * any other error */ + @Deprecated default AuthenticationDataProvider getAuthData() throws PulsarClientException { throw new UnsupportedAuthenticationException("Method not implemented!"); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 0c65604cbe6b8..1375d61f10460 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -254,7 +254,7 @@ private int consumeFromWebSocket(String topic) { try { if (authentication != null) { authentication.start(); - AuthenticationDataProvider authData = authentication.getAuthData(); + AuthenticationDataProvider authData = authentication.getAuthData(consumerUri.getHost()); if (authData.hasDataForHttp()) { for (Map.Entry kv : authData.getHttpHeaders()) { consumeRequest.setHeader(kv.getKey(), kv.getValue()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java index 77a1612f30e67..19b3b6603e1e7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java @@ -469,7 +469,7 @@ private int publishToWebSocket(String topic) { try { if (authentication != null) { authentication.start(); - AuthenticationDataProvider authData = authentication.getAuthData(); + AuthenticationDataProvider authData = authentication.getAuthData(produceUri.getHost()); if (authData.hasDataForHttp()) { for (Map.Entry kv : authData.getHttpHeaders()) { produceRequest.setHeader(kv.getKey(), kv.getValue()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java index 4ad8a5293f6e1..53d849aa27209 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java @@ -243,7 +243,7 @@ private int readFromWebSocket(String topic) { try { if (authentication != null) { authentication.start(); - AuthenticationDataProvider authData = authentication.getAuthData(); + AuthenticationDataProvider authData = authentication.getAuthData(readerUri.getHost()); if (authData.hasDataForHttp()) { for (Map.Entry kv : authData.getHttpHeaders()) { readRequest.setHeader(kv.getKey(), kv.getValue()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index ea45fe8981e34..d732a6a36d98d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -93,7 +93,8 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, if ("https".equals(serviceNameResolver.getServiceUri().getServiceName())) { try { // Set client key and certificate if available - AuthenticationDataProvider authData = authentication.getAuthData(); + AuthenticationDataProvider authData = + authentication.getAuthData(serviceNameResolver.resolveHostUri().getHost()); if (conf.isUseKeyStoreTls()) { SSLContext sslCtx = null; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index dff423d19fbef..6385ab78ccda8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -29,8 +29,10 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import java.net.InetSocketAddress; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.Getter; @@ -59,9 +61,9 @@ public class PulsarChannelInitializer extends ChannelInitializer private final InetSocketAddress socks5ProxyAddress; private final String socks5ProxyUsername; private final String socks5ProxyPassword; - - private final Supplier sslContextSupplier; - private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; + private final ClientConfigurationData conf; + private Map> sslContextSupplierMap; + private Map nettySSLContextAutoRefreshBuilderMap; private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1); @@ -76,15 +78,34 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier(); + this.nettySSLContextAutoRefreshBuilderMap = new ConcurrentHashMap<>(); + } - if (tlsEnabled) { - if (tlsEnabledWithKeyStore) { - AuthenticationDataProvider authData1 = conf.getAuthentication().getAuthData(); - if (StringUtils.isBlank(conf.getTlsTrustStorePath())) { - throw new PulsarClientException("Failed to create TLS context, the tlsTrustStorePath" - + " need to be configured if useKeyStoreTls enabled"); - } - nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder( + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); + + // Setup channel except for the SsHandler for TLS enabled connections + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled)); + + ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( + Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); + ChannelHandler clientCnx = clientCnxSupplier.get(); + ch.pipeline().addLast("handler", clientCnx); + } + + private NettySSLContextAutoRefreshBuilder getNettySSLContextAutoRefreshBuilder(String host) + throws PulsarClientException { + if (tlsEnabledWithKeyStore) { + AuthenticationDataProvider authData1 = conf.getAuthentication().getAuthData(host); + if (StringUtils.isBlank(conf.getTlsTrustStorePath())) { + throw new PulsarClientException("Failed to create TLS context, the tlsTrustStorePath" + + " need to be configured if useKeyStoreTls enabled"); + } + return nettySSLContextAutoRefreshBuilderMap.computeIfAbsent(host, + key -> new NettySSLContextAutoRefreshBuilder( conf.getSslProvider(), conf.isTlsAllowInsecureConnection(), conf.getTlsTrustStoreType(), @@ -96,64 +117,52 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier(() -> { - try { - SslProvider sslProvider = null; - if (conf.getSslProvider() != null) { - sslProvider = SslProvider.valueOf(conf.getSslProvider()); - } - - // Set client certificate if available - AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); - if (authData.hasDataForTls()) { - return authData.getTlsTrustStoreStream() == null - ? SecurityUtility.createNettySslContextForClient( - sslProvider, - conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), - authData.getTlsCertificates(), - authData.getTlsPrivateKey(), - conf.getTlsCiphers(), - conf.getTlsProtocols()) - : SecurityUtility.createNettySslContextForClient(sslProvider, - conf.isTlsAllowInsecureConnection(), - authData.getTlsTrustStoreStream(), - authData.getTlsCertificates(), authData.getTlsPrivateKey(), - conf.getTlsCiphers(), - conf.getTlsProtocols()); - } else { - return SecurityUtility.createNettySslContextForClient( - sslProvider, - conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), - conf.getTlsCertificateFilePath(), - conf.getTlsKeyFilePath(), - conf.getTlsCiphers(), - conf.getTlsProtocols()); - } - } catch (Exception e) { - throw new RuntimeException("Failed to create TLS context", e); - } - }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS); - } else { - sslContextSupplier = null; + authData1)); } + throw new PulsarClientException( + "Failed to create TLS context, the tlsEnabledWithKeyStore need to be true"); } - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); - - // Setup channel except for the SsHandler for TLS enabled connections - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled)); + private Supplier getSslContextSupplier(String host) { + return sslContextSupplierMap.computeIfAbsent(host, key -> new ObjectCache<>(() -> { + try { + SslProvider sslProvider = null; + if (conf.getSslProvider() != null) { + sslProvider = SslProvider.valueOf(conf.getSslProvider()); + } - ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( - Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); - ChannelHandler clientCnx = clientCnxSupplier.get(); - ch.pipeline().addLast("handler", clientCnx); + // Set client certificate if available + AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(host); + if (authData.hasDataForTls()) { + return authData.getTlsTrustStoreStream() == null + ? SecurityUtility.createNettySslContextForClient( + sslProvider, + conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), + authData.getTlsCertificates(), + authData.getTlsPrivateKey(), + conf.getTlsCiphers(), + conf.getTlsProtocols()) + : SecurityUtility.createNettySslContextForClient(sslProvider, + conf.isTlsAllowInsecureConnection(), + authData.getTlsTrustStoreStream(), + authData.getTlsCertificates(), authData.getTlsPrivateKey(), + conf.getTlsCiphers(), + conf.getTlsProtocols()); + } else { + return SecurityUtility.createNettySslContextForClient( + sslProvider, + conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), + conf.getTlsCertificateFilePath(), + conf.getTlsKeyFilePath(), + conf.getTlsCiphers(), + conf.getTlsProtocols()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create TLS context", e); + } + }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS)); } /** @@ -175,9 +184,10 @@ CompletableFuture initTls(Channel ch, InetSocketAddress sniHost) { ch.eventLoop().execute(() -> { try { SslHandler handler = tlsEnabledWithKeyStore - ? new SslHandler(nettySSLContextAutoRefreshBuilder.get() - .createSSLEngine(sniHost.getHostString(), sniHost.getPort())) - : sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostString(), sniHost.getPort()); + ? new SslHandler(getNettySSLContextAutoRefreshBuilder(sniHost.getHostName()).get() + .createSSLEngine(sniHost.getHostString(), sniHost.getPort())) + : getSslContextSupplier(sniHost.getHostName()).get() + .newHandler(ch.alloc(), sniHost.getHostString(), sniHost.getPort()); if (tlsHostnameVerificationEnabled) { SecurityUtility.configureSSLHandler(handler); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 0108b770249a0..eaed42a532afb 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -36,6 +36,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -263,7 +264,8 @@ protected HttpClient newHttpClient() { .loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath()); SSLContext sslCtx; - AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData(); + AuthenticationDataProvider authData = + proxyClientAuthentication.getAuthData(URI.create(getWebServiceUrl()).getHost()); if (config.isBrokerClientTlsEnabledWithKeyStore()) { KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null; sslCtx = KeyStoreSSLContext.createClientSslContext( @@ -314,6 +316,19 @@ protected HttpClient newHttpClient() { return new JettyHttpClient(); } + private String getWebServiceUrl() throws PulsarServerException { + if (isBlank(brokerWebServiceUrl)) { + ServiceLookupData availableBroker = discoveryProvider.nextBroker(); + if (config.isTlsEnabledWithBroker()) { + return availableBroker.getWebServiceUrlTls(); + } else { + return availableBroker.getWebServiceUrl(); + } + } else { + return brokerWebServiceUrl; + } + } + @Override protected String rewriteTarget(HttpServletRequest request) { StringBuilder url = new StringBuilder(); @@ -329,17 +344,10 @@ protected String rewriteTarget(HttpServletRequest request) { if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) { url.append(functionWorkerWebServiceUrl); - } else if (isBlank(brokerWebServiceUrl)) { + } else { try { - ServiceLookupData availableBroker = discoveryProvider.nextBroker(); - - if (config.isTlsEnabledWithBroker()) { - url.append(availableBroker.getWebServiceUrlTls()); - } else { - url.append(availableBroker.getWebServiceUrl()); - } - - if (LOG.isDebugEnabled()) { + url.append(getWebServiceUrl()); + if (LOG.isDebugEnabled() && isBlank(brokerWebServiceUrl)) { LOG.debug("[{}:{}] Selected active broker is {}", request.getRemoteAddr(), request.getRemotePort(), url); } @@ -348,8 +356,6 @@ protected String rewriteTarget(HttpServletRequest request) { request.getRemotePort(), e.getMessage(), e); return null; } - } else { - url.append(brokerWebServiceUrl); } if (url.lastIndexOf("/") == url.length() - 1) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 4678db82c6e55..ac3acf2d6475d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -47,6 +47,8 @@ import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.Getter; import org.apache.pulsar.PulsarVersion; @@ -90,8 +92,8 @@ public class DirectProxyHandler { private final boolean tlsHostnameVerificationEnabled; private final boolean tlsEnabledWithKeyStore; final boolean tlsEnabledWithBroker; - private final SslContextAutoRefreshBuilder clientSslCtxRefresher; - private final NettySSLContextAutoRefreshBuilder clientSSLContextAutoRefreshBuilder; + private final Map> clientSslCtxRefresherMap; + private final Map clientSSLContextAutoRefreshBuilderMap; public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) { this.service = service; @@ -106,54 +108,8 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) this.tlsHostnameVerificationEnabled = service.getConfiguration().isTlsHostnameVerificationEnabled(); this.tlsEnabledWithKeyStore = service.getConfiguration().isTlsEnabledWithKeyStore(); this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask; - ProxyConfiguration config = service.getConfiguration(); - - if (tlsEnabledWithBroker) { - AuthenticationDataProvider authData = null; - - if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) { - try { - authData = authentication.getAuthData(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - - if (tlsEnabledWithKeyStore) { - clientSSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder( - config.getBrokerClientSslProvider(), - config.isTlsAllowInsecureConnection(), - config.getBrokerClientTlsTrustStoreType(), - config.getBrokerClientTlsTrustStore(), - config.getBrokerClientTlsTrustStorePassword(), - config.getBrokerClientTlsKeyStoreType(), - config.getBrokerClientTlsKeyStore(), - config.getBrokerClientTlsKeyStorePassword(), - config.getBrokerClientTlsCiphers(), - config.getBrokerClientTlsProtocols(), - config.getTlsCertRefreshCheckDurationSec(), - authData); - clientSslCtxRefresher = null; - } else { - SslProvider sslProvider = null; - if (config.getBrokerClientSslProvider() != null) { - sslProvider = SslProvider.valueOf(config.getBrokerClientSslProvider()); - } - clientSslCtxRefresher = new NettyClientSslContextRefresher( - sslProvider, - config.isTlsAllowInsecureConnection(), - config.getBrokerClientTrustCertsFilePath(), - authData, - config.getBrokerClientTlsCiphers(), - config.getBrokerClientTlsProtocols(), - config.getTlsCertRefreshCheckDurationSec() - ); - clientSSLContextAutoRefreshBuilder = null; - } - } else { - clientSSLContextAutoRefreshBuilder = null; - clientSslCtxRefresher = null; - } + this.clientSslCtxRefresherMap = new ConcurrentHashMap<>(); + this.clientSSLContextAutoRefreshBuilderMap = new ConcurrentHashMap<>(); } public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress, int protocolVersion) { @@ -191,11 +147,52 @@ protected void initChannel(SocketChannel ch) { ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); if (tlsEnabledWithBroker) { + AuthenticationDataProvider authData; + if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) { + try { + authData = authentication.getAuthData(remoteHost); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } else { + authData = null; + } String host = targetBrokerAddress.getHostString(); int port = targetBrokerAddress.getPort(); - SslHandler handler = tlsEnabledWithKeyStore - ? new SslHandler(clientSSLContextAutoRefreshBuilder.get().createSSLEngine(host, port)) - : clientSslCtxRefresher.get().newHandler(ch.alloc(), host, port); + SslHandler handler; + if (tlsEnabledWithKeyStore) { + handler = new SslHandler(clientSSLContextAutoRefreshBuilderMap.computeIfAbsent(remoteHost, + key -> new NettySSLContextAutoRefreshBuilder( + config.getBrokerClientSslProvider(), + config.isTlsAllowInsecureConnection(), + config.getBrokerClientTlsTrustStoreType(), + config.getBrokerClientTlsTrustStore(), + config.getBrokerClientTlsTrustStorePassword(), + config.getBrokerClientTlsKeyStoreType(), + config.getBrokerClientTlsKeyStore(), + config.getBrokerClientTlsKeyStorePassword(), + config.getBrokerClientTlsCiphers(), + config.getBrokerClientTlsProtocols(), + config.getTlsCertRefreshCheckDurationSec(), + authData)).get().createSSLEngine(host, port)); + } else { + SslProvider sslProvider; + if (config.getBrokerClientSslProvider() != null) { + sslProvider = SslProvider.valueOf(config.getBrokerClientSslProvider()); + } else { + sslProvider = null; + } + handler = clientSslCtxRefresherMap.computeIfAbsent(remoteHost, + key -> new NettyClientSslContextRefresher( + sslProvider, + config.isTlsAllowInsecureConnection(), + config.getBrokerClientTrustCertsFilePath(), + authData, + config.getBrokerClientTlsCiphers(), + config.getBrokerClientTlsProtocols(), + config.getTlsCertRefreshCheckDurationSec() + )).get().newHandler(ch.alloc(), host, port); + } if (tlsHostnameVerificationEnabled) { SecurityUtility.configureSSLHandler(handler); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java index a4ebe25b428a3..8246e974f6f06 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java @@ -59,6 +59,7 @@ protected void setup() throws Exception { serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl()); serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); + serviceStarter.getConfig().setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls()); serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); serviceStarter.getConfig().setServicePort(Optional.empty()); serviceStarter.getConfig().setServicePortTls(Optional.of(0)); @@ -76,6 +77,7 @@ protected void setup() throws Exception { protected void doInitConf() throws Exception { super.doInitConf(); this.conf.setBrokerServicePortTls(Optional.of(0)); + this.conf.setWebServicePortTls(Optional.of(0)); this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH); this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH); } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java index 596eb8d2c2807..0b5db8d78fbdf 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java @@ -263,7 +263,7 @@ public void runPerformanceTest(Arguments arguments) throws InterruptedException, Authentication auth = AuthenticationFactory.create(arguments.authPluginClassName, arguments.authParams); auth.start(); - AuthenticationDataProvider authData = auth.getAuthData(); + AuthenticationDataProvider authData = auth.getAuthData(produceUri.getHost()); if (authData.hasDataForHttp()) { for (Map.Entry kv : authData.getHttpHeaders()) { produceRequest.setHeader(kv.getKey(), kv.getValue());