diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1048ed4bbb497..6d84e22f162a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -654,24 +654,14 @@ private void doAuthentication(AuthData clientData, String newAuthRole = authState.getAuthRole(); - // Refresh the auth data. - this.authenticationData = authState.getAuthDataSource(); - if (log.isDebugEnabled()) { - log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole); - } - - if (!useOriginalAuthState) { - this.authRole = newAuthRole; - } - - if (log.isDebugEnabled()) { - log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", - remoteAddress, authMethod, this.authRole, originalPrincipal); - } - + AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource(); if (state != State.Connected) { // First time authentication is done if (service.isAuthenticationEnabled()) { + if (!useOriginalAuthState) { + this.authRole = newAuthRole; + this.authenticationData = newAuthDataSource; + } if (service.isAuthorizationEnabled()) { if (!service.getAuthorizationService() .isValidOriginalPrincipal(this.authRole, originalPrincipal, remoteAddress, false)) { @@ -686,7 +676,16 @@ private void doAuthentication(AuthData clientData, maybeScheduleAuthenticationCredentialsRefresh(); } completeConnect(clientProtocolVersion, clientVersion); + if (log.isDebugEnabled()) { + log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", + remoteAddress, authMethod, this.authRole, originalPrincipal); + } } else { + if (!useOriginalAuthState) { + this.authenticationData = newAuthDataSource; + } else { + this.originalAuthData = newAuthDataSource; + } // If the connection was already ready, it means we're doing a refresh if (!StringUtils.isEmpty(authRole)) { if (!authRole.equals(newAuthRole)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java index 3fc7e321b9d08..08d291b849a5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java @@ -18,56 +18,21 @@ */ package org.apache.pulsar.broker.auth; -import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; -import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authentication.AuthenticationState; -import org.apache.pulsar.common.api.AuthData; - -import javax.naming.AuthenticationException; - -import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; /** * Class to use when verifying the behavior around expired authentication data because it will always return * true when isExpired is called. */ -public class MockAlwaysExpiredAuthenticationState implements AuthenticationState { - final MockAlwaysExpiredAuthenticationProvider provider; - AuthenticationDataSource authenticationDataSource; - volatile String authRole; - - MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) { - this.provider = provider; - } - - - @Override - public String getAuthRole() throws AuthenticationException { - if (authRole == null) { - throw new AuthenticationException("Must authenticate first."); - } - return authRole; - } +public class MockAlwaysExpiredAuthenticationState extends MockMutableAuthenticationState { - @Override - public AuthData authenticate(AuthData authData) throws AuthenticationException { - authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8)); - authRole = provider.authenticate(authenticationDataSource); - return null; - } - - @Override - public AuthenticationDataSource getAuthDataSource() { - return authenticationDataSource; - } - - @Override - public boolean isComplete() { - return authRole != null; + MockAlwaysExpiredAuthenticationState(AuthenticationProvider provider) { + super(provider); } @Override public boolean isExpired() { return true; } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java new file mode 100644 index 0000000000000..01d56f891e27e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; +import javax.net.ssl.SSLSession; +import java.net.SocketAddress; + +public class MockMutableAuthenticationProvider extends MockAuthenticationProvider { + public AuthenticationState newAuthState(AuthData authData, + SocketAddress remoteAddress, + SSLSession sslSession) { + return new MockMutableAuthenticationState(this); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java new file mode 100644 index 0000000000000..cb97d26ca2b26 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import static java.nio.charset.StandardCharsets.UTF_8; +import javax.naming.AuthenticationException; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; + +// MockMutableAuthenticationState always update the authentication data source and auth role. +public class MockMutableAuthenticationState implements AuthenticationState { + + final AuthenticationProvider provider; + AuthenticationDataSource authenticationDataSource; + volatile String authRole; + + MockMutableAuthenticationState(AuthenticationProvider provider) { + this.provider = provider; + } + + @Override + public String getAuthRole() throws AuthenticationException { + if (authRole == null) { + throw new AuthenticationException("Must authenticate first."); + } + return authRole; + } + + @Override + public AuthData authenticate(AuthData authData) throws AuthenticationException { + authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8)); + authRole = provider.authenticate(authenticationDataSource); + return null; + } + + @Override + public AuthenticationDataSource getAuthDataSource() { + return authenticationDataSource; + } + + @Override + public boolean isComplete() { + return authRole != null; + } + + @Override + public boolean isExpired() { + return false; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 1aa3c2c66046b..824af6b71e317 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -248,13 +248,8 @@ protected void setup() throws Exception { .brokerServiceUrl(pulsar4.getBrokerServiceUrl()) .brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()) .brokerClientTlsEnabled(true) - .brokerClientCertificateFilePath(clientCertFilePath) - .brokerClientKeyFilePath(clientKeyFilePath) .brokerClientTrustCertsFilePath(caCertFilePath) .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore) - .brokerClientTlsKeyStore(clientKeyStorePath) - .brokerClientTlsKeyStorePassword(keyStorePassword) - .brokerClientTlsKeyStoreType(keyStoreType) .brokerClientTlsTrustStore(clientTrustStorePath) .brokerClientTlsTrustStorePassword(keyStorePassword) .brokerClientTlsTrustStoreType(keyStoreType) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 09428db2b1cb2..4a4628bece44f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -87,6 +87,7 @@ import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider; import org.apache.pulsar.broker.auth.MockAuthorizationProvider; +import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.auth.MockAuthenticationProvider; import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; @@ -123,7 +124,6 @@ import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; -import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandSuccess; @@ -1368,6 +1368,54 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc })); } + @Test + public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockMutableAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + String proxyRole = "pass.proxy"; + String clientRole = "pass.client"; + ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", + clientRole, clientRole, authMethodName); + channel.writeInbound(connect); + Object connectResponse = getResponse(); + assertTrue(connectResponse instanceof CommandConnected); + assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole); + assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole); + assertEquals(serverCnx.getOriginalPrincipal(), clientRole); + assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); + assertEquals(serverCnx.getAuthRole(), proxyRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); + + // Request refreshing the original auth. + // Expected: + // 1. Original role and original data equals to "pass.RefreshOriginAuthData". + // 2. The broker disconnects the client, because the new role doesn't equal the old role. + String newClientRole = "pass.RefreshOriginAuthData"; + ByteBuf refreshAuth = Commands.newAuthResponse(authMethodName, + AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test"); + channel.writeInbound(refreshAuth); + + assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole); + assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole); + assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); + assertEquals(serverCnx.getAuthRole(), proxyRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); + + assertFalse(channel.isOpen()); + assertFalse(channel.isActive()); + } + @Test(timeOut = 30000) public void testProducerCommand() throws Exception { resetChannel();