Skip to content

Commit

Permalink
[fix][authentication] Store the original authentication data
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Mar 13, 2023
1 parent 240c22c commit bca557a
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -643,20 +643,12 @@ private void doAuthentication(AuthData clientData,
// 2. an authentication refresh, in which case we need to refresh authenticationData

String newAuthRole = authState.getAuthRole();
AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();

// Refresh the auth data.
this.authenticationData = authState.getAuthDataSource();
if (log.isDebugEnabled()) {
log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole);
}

// Set the auth data and auth role
if (!useOriginalAuthState) {
this.authRole = newAuthRole;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
this.authenticationData = newAuthDataSource;
}

if (state != State.Connected) {
Expand All @@ -676,7 +668,18 @@ private void doAuthentication(AuthData clientData,
maybeScheduleAuthenticationCredentialsRefresh();
}
completeConnect(clientProtocolVersion, clientVersion);
if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}
} else {
// Refresh the auth data
if (!useOriginalAuthState) {
this.authenticationData = newAuthDataSource;
} else {
this.originalAuthData = newAuthDataSource;
}

// If the connection was already ready, it means we're doing a refresh
if (!StringUtils.isEmpty(authRole)) {
if (!authRole.equals(newAuthRole)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,15 @@
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

import javax.naming.AuthenticationException;

import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;

/**
* Class to use when verifying the behavior around expired authentication data because it will always return
* true when isExpired is called.
*/
public class MockAlwaysExpiredAuthenticationState implements AuthenticationState {
final MockAlwaysExpiredAuthenticationProvider provider;
AuthenticationDataSource authenticationDataSource;
volatile String authRole;

MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) {
this.provider = provider;
}


@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first.");
}
return authRole;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
authRole = provider.authenticate(authenticationDataSource);
return null;
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return authRole != null;
public class MockAlwaysExpiredAuthenticationState extends MockMutableAuthenticationState {
MockAlwaysExpiredAuthenticationState(AuthenticationProvider provider) {
super(provider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;
import javax.net.ssl.SSLSession;
import java.net.SocketAddress;

public class MockMutableAuthenticationProvider extends MockAuthenticationProvider {
public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) {
return new MockMutableAuthenticationState(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import static java.nio.charset.StandardCharsets.UTF_8;
import javax.naming.AuthenticationException;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

// MockMutableAuthenticationState always update the authentication data source and auth role.
public class MockMutableAuthenticationState implements AuthenticationState {
final AuthenticationProvider provider;
AuthenticationDataSource authenticationDataSource;
volatile String authRole;

MockMutableAuthenticationState(AuthenticationProvider provider) {
this.provider = provider;
}

@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first.");
}
return authRole;
}

/**
* This authentication is always single stage, so it returns immediately
*/
@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
authRole = provider.authenticate(authenticationDataSource);
return null;
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return true;
}

@Override
public boolean isExpired() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
Expand Down Expand Up @@ -1040,6 +1041,54 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc
}));
}

@Test
public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockMutableAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(true);
svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

String proxyRole = "pass.proxy";
String clientRole = "pass.client";
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
clientRole, clientRole, authMethodName);
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole);
assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

// Request refreshing the original auth.
// Expected:
// 1. Original role and original data equals to "pass.RefreshOriginAuthData".
// 2. The broker disconnects the client, because the new role doesn't equal the old role.
String newClientRole = "pass.RefreshOriginAuthData";
ByteBuf refreshAuth = Commands.newAuthResponse(authMethodName,
AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test");
channel.writeInbound(refreshAuth);

assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

assertFalse(channel.isOpen());
assertFalse(channel.isActive());
}

@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
Expand Down

0 comments on commit bca557a

Please sign in to comment.