Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[broker] Enhance retention-related restful API authority
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Feb 28, 2024
1 parent 825e997 commit 921fb76
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2435,7 +2435,8 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetRetention(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
Expand All @@ -2462,7 +2463,8 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetRetention(retention, isGlobal))
.thenRun(() -> {
try {
Expand Down Expand Up @@ -2498,7 +2500,8 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveRetention(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove retention: namespace={}, topic={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.security.tls;
package org.apache.pulsar.security;

import static org.apache.pulsar.utils.ResourceUtils.getAbsolutePath;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import javax.crypto.SecretKey;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;



public abstract class MockedPulsarStandalone implements AutoCloseable {
Expand All @@ -60,6 +71,50 @@ public abstract class MockedPulsarStandalone implements AutoCloseable {
serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
}


protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
.claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
protected static final String SUPER_USER_SUBJECT = "super-user";
protected static final String SUPER_USER_TOKEN = Jwts.builder()
.claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
protected static final String NOBODY_SUBJECT = "nobody";
protected static final String NOBODY_TOKEN = Jwts.builder()
.claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();


@SneakyThrows
protected void loadTokenAuthentication() {
serviceConfiguration.setAuthenticationEnabled(true);
serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
// internal client
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
final Map<String, String> brokerClientAuthParams = new HashMap<>();
brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams);
serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);

Properties properties = serviceConfiguration.getProperties();
if (properties == null) {
properties = new Properties();
serviceConfiguration.setProperties(properties);
}
properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));

}



protected void loadDefaultAuthorization() {
serviceConfiguration.setAuthorizationEnabled(true);
serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
}



@SneakyThrows
protected void loadECTlsCertificateWithFile() {
serviceConfiguration.setTlsEnabled(true);
Expand Down Expand Up @@ -176,4 +231,7 @@ public void close() throws Exception {
protected static final String TLS_EC_KS_TRUSTED_STORE =
getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks");
protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw";


private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.apache.pulsar.security.authz;

import io.jsonwebtoken.Jwts;
import java.util.Set;
import java.util.UUID;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public final class DefaultAuthZWithPublicAPITest extends MockedPulsarStandalone {

private static final String USER1_SUBJECT = "user1";
private static final String USER1_TOKEN = Jwts.builder()
.claim("sub", USER1_SUBJECT).signWith(SECRET_KEY).compact();

private PulsarAdmin user1Admin;

private PulsarAdmin superUserAdmin;
@SneakyThrows
@BeforeClass
public void before() {
loadTokenAuthentication();
loadDefaultAuthorization();
start();
this.user1Admin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(USER1_TOKEN))
.build();
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.build();
}


@SneakyThrows
@AfterClass
public void after() {
close();
}



@SneakyThrows
@Test
public void testConsumeWithTopicPolicyRetention() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;

// grant consume permission to user 1, it can lookup and consume messages
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
USER1_SUBJECT, Set.of(AuthAction.consume));
superUserAdmin.topics().createNonPartitionedTopic(topic);

// the user 1 shouldn't touch retention policy
try {
user1Admin.topicPolicies().getRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

try {
final RetentionPolicies policies = new RetentionPolicies(1, 1);
user1Admin.topicPolicies().setRetention(topic, policies);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}

try {
user1Admin.topicPolicies().removeRetention(topic);
Assert.fail("unexpected behaviour");
} catch (PulsarAdminException ex) {
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.security.tls.MockedPulsarStandalone;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.security.tls.MockedPulsarStandalone;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -35,10 +39,6 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


@Test
Expand Down

0 comments on commit 921fb76

Please sign in to comment.