Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #660] Add namespace in java client #661

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ public class ClientConfiguration {
private final SessionCredentialsProvider sessionCredentialsProvider;
private final Duration requestTimeout;
private final boolean sslEnabled;
private final String namespace;

/**
* The caller is supposed to have validated the arguments and handled throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
Duration requestTimeout, boolean sslEnabled) {
Duration requestTimeout, boolean sslEnabled, String namespace) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
this.sslEnabled = sslEnabled;
this.namespace = namespace;
}

public static ClientConfigurationBuilder newBuilder() {
Expand All @@ -60,4 +62,8 @@ public Duration getRequestTimeout() {
public boolean isSslEnabled() {
return sslEnabled;
}

public String getNamespace() {
return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ClientConfigurationBuilder {
private SessionCredentialsProvider sessionCredentialsProvider = null;
private Duration requestTimeout = Duration.ofSeconds(3);
private boolean sslEnabled = true;
private String namespace = "";

/**
* Configure the access point with which the SDK should communicate.
Expand Down Expand Up @@ -82,6 +83,16 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
return this;
}

/**
* Configure namespace for client
* @param namespace namespace
* @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
*/
public ClientConfigurationBuilder setNamespace(String namespace) {
this.namespace = checkNotNull(namespace, "namespace should not be null");
return this;
}

/**
* Finalize the build of {@link ClientConfiguration}.
*
Expand All @@ -90,6 +101,6 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled);
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,10 @@ public void onFailure(Throwable t) {
}

protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
Resource topicResource = Resource.newBuilder().setName(topic).build();
Resource topicResource = Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(topic)
.build();
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).build();
final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,26 @@
import org.apache.rocketmq.client.java.route.Endpoints;

public abstract class Settings {
protected final String namespace;
protected final ClientId clientId;
protected final ClientType clientType;
protected final Endpoints accessPoint;
protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;

public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy,
Duration requestTimeout) {
public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
RetryPolicy retryPolicy, Duration requestTimeout) {
this.namespace = namespace;
this.clientId = clientId;
this.clientType = clientType;
this.accessPoint = accessPoint;
this.retryPolicy = retryPolicy;
this.requestTimeout = requestTimeout;
}

public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, Duration requestTimeout) {
this(clientId, clientType, accessPoint, null, requestTimeout);
public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
Duration requestTimeout) {
this(namespace, clientId, clientType, accessPoint, null, requestTimeout);
}

public abstract apache.rocketmq.v2.Settings toProtobuf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRe
}

private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
final Resource topicResource = Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
aaron-ai marked this conversation as resolved.
Show resolved Hide resolved
.setName(messageView.getTopic())
.build();
final AckMessageEntry entry = AckMessageEntry.newBuilder()
.setMessageId(messageView.getMessageId().toString())
.setReceiptHandle(messageView.getReceiptHandle())
Expand All @@ -134,7 +137,9 @@ private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {

private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
Duration invisibleDuration) {
final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
final Resource topicResource = Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic()).build();
return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
Expand Down Expand Up @@ -219,7 +224,10 @@ public void onFailure(Throwable t) {
}

protected Resource getProtobufGroup() {
return Resource.newBuilder().setName(consumerGroup).build();
return Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(consumerGroup)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
Resource groupResource = new Resource(consumerGroup);
this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, endpoints, groupResource,
clientConfiguration.getRequestTimeout(), subscriptionExpressions);
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
this.pushSubscriptionSettings = new PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
endpoints, groupResource, clientConfiguration.getRequestTimeout(), subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
this.cacheAssignments = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -261,7 +261,10 @@ private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic
}

private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(topic)
.build();
return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
}
Expand Down Expand Up @@ -500,7 +503,10 @@ public void onFailure(Throwable t) {
private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(
MessageViewImpl messageView) {
final apache.rocketmq.v2.Resource topicResource =
apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic())
.build();
return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setMessageId(messageView.getMessageId().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public class PushSubscriptionSettings extends Settings {
private volatile int receiveBatchSize = 32;
private volatile Duration longPollingTimeout = Duration.ofSeconds(30);

public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
public PushSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
Duration requestTimeout, Map<String, FilterExpression> subscriptionExpression) {
super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
}
Expand All @@ -75,7 +75,10 @@ public apache.rocketmq.v2.Settings toProtobuf() {
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic =
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(namespace)
.setName(entry.getKey())
.build();
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
Resource groupResource = new Resource(consumerGroup);
this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, endpoints,
groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
endpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class SimpleSubscriptionSettings extends Settings {
private final Duration longPollingTimeout;
private final Map<String, FilterExpression> subscriptionExpressions;

public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
public SimpleSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
Duration requestTimeout, Duration longPollingTimeout, Map<String, FilterExpression> subscriptionExpression) {
super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
this.longPollingTimeout = longPollingTimeout;
Expand All @@ -59,7 +59,9 @@ public apache.rocketmq.v2.Settings toProtobuf() {
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder()
.setName(entry.getKey()).build();
.setResourceNamespace(namespace)
.setName(entry.getKey())
.build();
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class ProducerImpl extends ClientImpl implements Producer {
TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
this.publishingSettings = new PublishingSettings(clientId, endpoints, retryPolicy,
clientConfiguration.getRequestTimeout(), topics);
this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
retryPolicy, clientConfiguration.getRequestTimeout(), topics);
this.checker = checker;
this.publishingRouteDataCache = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -259,7 +259,10 @@ public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, M
String transactionId, final TransactionResolution resolution) throws ClientException {
final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(generalMessage.getTopic())
.build());
switch (resolution) {
case COMMIT:
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
Expand Down Expand Up @@ -415,7 +418,8 @@ private ListenableFuture<List<SendReceiptImpl>> send(List<Message> messages, boo
*/
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
.map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
.map(publishingMessage -> publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
.collect(Collectors.toList());
return SendMessageRequest.newBuilder().addAllMessages(messages).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class PublishingSettings extends Settings {
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
private volatile boolean validateMessageType = true;

public PublishingSettings(ClientId clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
Duration requestTimeout, Set<String> topics) {
super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
public PublishingSettings(String namespace, ClientId clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set<String> topics) {
super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
this.topics = topics;
}

Expand All @@ -62,8 +62,13 @@ public boolean isValidateMessageType() {
@Override
public apache.rocketmq.v2.Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder()
.addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
.collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
.addAllTopics(topics.stream().map(name -> Resource.newBuilder()
.setResourceNamespace(namespace)
.setName(name)
.build())
.collect(Collectors.toList()))
.setValidateMessageType(validateMessageType)
.build();
final apache.rocketmq.v2.Settings.Builder builder = apache.rocketmq.v2.Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public MessageType getMessageType() {
* <p>This method should be invoked before each message sending, because the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
public apache.rocketmq.v2.Message toProtobuf(String namespace, MessageQueueImpl mq) {
final apache.rocketmq.v2.SystemProperties.Builder systemPropertiesBuilder =
apache.rocketmq.v2.SystemProperties.newBuilder()
// Message keys
Expand All @@ -112,7 +112,7 @@ public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
// Message group
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
final SystemProperties systemProperties = systemPropertiesBuilder.build();
Resource topicResource = Resource.newBuilder().setName(getTopic()).build();
Resource topicResource = Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
// Topic
.setTopic(topicResource)
Expand Down
Loading
Loading