Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocketmq message grayscale support 5.1.x #1726

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.declarer.client51;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor;

/**
* 5.1.x client lite pull consumer set gray consumer group declarer
*
* @author chengyouling
* @since 2024-09-07
**/
public class RocketMqV51LitePullConsumerConstructorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultLitePullConsumer";

private static final String[] METHOD_PARAM_TYPES = {
"java.lang.String",
"org.apache.rocketmq.remoting.RPCHook"
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
new RocketMqV51ConsumerConstructorInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.declarer.client51;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor;

/**
* 5.1.x client pull consumer set gray consumer group declarer
*
* @author chengyouling
* @since 2024-09-07
**/
public class RocketMqV51PullConsumerConstructorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPullConsumer";

private static final String[] METHOD_PARAM_TYPES = {
"java.lang.String",
"org.apache.rocketmq.remoting.RPCHook"
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
new RocketMqV51ConsumerConstructorInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.declarer.client51;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor;

/**
* 5.1.x client push consumer set gray consumer group declarer
*
* @author chengyouling
* @since 2024-09-07
**/
public class RocketMqV51PushConsumerConstructorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";

private static final String PARAMETER_STRING = "java.lang.String";

private static final String PARAMETER_HOOK = "org.apache.rocketmq.remoting.RPCHook";

private static final String PARAMETER_STRATEGY = "org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy";

private static final String[] METHOD_PARAM_TYPES = {
PARAMETER_STRING,
PARAMETER_HOOK,
PARAMETER_STRATEGY,
"boolean",
PARAMETER_STRING
};

private static final String[] METHOD_PARAM_THREE_TYPES = {
PARAMETER_STRING,
PARAMETER_HOOK,
PARAMETER_STRATEGY
};

private static final String[] METHOD_PARAM_TWO_TYPES = {
PARAMETER_STRING,
PARAMETER_HOOK
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)
.or(MethodMatcher.paramTypesEqual(METHOD_PARAM_THREE_TYPES))
.or(MethodMatcher.paramTypesEqual(METHOD_PARAM_TWO_TYPES))),
new RocketMqV51ConsumerConstructorInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;

import java.util.Optional;
import java.util.logging.Level;
Expand All @@ -44,8 +44,9 @@ public class RocketMqPullConsumerSubscriptionUpdateInterceptor extends RocketMqA

@Override
public ExecuteContext doAfter(ExecuteContext context) throws Exception {
SubscriptionData subscriptionData = (SubscriptionData) context.getResult();
if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) {
Object subscriptionData = context.getResult();
if (RocketMqSubscriptionDataUtils
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
return context;
}
Optional<Object> fieldValue = ReflectUtils.getFieldValue(context.getObject(), "mQClientFactory");
Expand All @@ -58,19 +59,20 @@ public ExecuteContext doAfter(ExecuteContext context) throws Exception {
return context;
}

private void buildSql92SubscriptionData(ExecuteContext context, SubscriptionData subscriptionData,
private void buildSql92SubscriptionData(ExecuteContext context, Object subscriptionData,
MQClientInstance instance) {
DefaultMQPullConsumer pullConsumer
= ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer();
String consumerGroup = pullConsumer.getConsumerGroup();
if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) {
RocketMqConsumerGroupAutoCheck.setMqClientInstance(subscriptionData.getTopic(), consumerGroup, instance);
RocketMqConsumerGroupAutoCheck.setMqClientInstance(RocketMqReflectUtils.getTopic(subscriptionData),
consumerGroup, instance);
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags();
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask();
}
String namesrvAddr = instance.getClientConfig().getNamesrvAddr();
String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(subscriptionData.getTopic(),
consumerGroup, namesrvAddr);
String subscribeScope = RocketMqSubscriptionDataUtils
.buildSubscribeScope(RocketMqReflectUtils.getTopic(subscriptionData), consumerGroup, namesrvAddr);
RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;

import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;

import java.util.concurrent.ConcurrentMap;

Expand All @@ -39,23 +39,24 @@ public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbs

@Override
public ExecuteContext doAfter(ExecuteContext context) throws Exception {
ConcurrentMap<String, SubscriptionData> map = (ConcurrentMap<String, SubscriptionData>) context.getResult();
ConcurrentMap<String, Object> map = (ConcurrentMap<String, Object>) context.getResult();
RebalanceImpl balance = (RebalanceImpl) context.getObject();
if (balance.getConsumerGroup() == null) {
return context;
}
for (SubscriptionData subscriptionData : map.values()) {
if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) {
for (Object subscriptionData : map.values()) {
if (RocketMqSubscriptionDataUtils
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
continue;
}
buildSql92SubscriptionData(subscriptionData, balance);
}
return context;
}

private void buildSql92SubscriptionData(SubscriptionData subscriptionData, RebalanceImpl balance) {
private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance) {
synchronized (lock) {
String topic = subscriptionData.getTopic();
String topic = RocketMqReflectUtils.getTopic(subscriptionData);
if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) {
return;
}
Expand All @@ -74,7 +75,7 @@ private void buildSql92SubscriptionData(SubscriptionData subscriptionData, Rebal
}
}

private void resetsSql92SubscriptionData(String topic, String consumerGroup, SubscriptionData subscriptionData,
private void resetsSql92SubscriptionData(String topic, String consumerGroup, Object subscriptionData,
String namesrvAddr) {
String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup,
namesrvAddr);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.interceptor.client51;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.config.MqGrayConfigCache;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;

/**
* upper 5.1.x DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor
* gray scene reset consumerGroup with grayGroupTag
*
* @author chengyouling
* @since 2024-05-27
**/
public class RocketMqV51ConsumerConstructorInterceptor extends AbstractInterceptor {
@Override
public ExecuteContext before(ExecuteContext context) throws Exception {
if (!MqGrayConfigCache.getCacheConfig().isEnabled()) {
return context;
}
String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag();
if (StringUtils.isEmpty(grayGroupTag)) {
return context;
}
String originGroup = (String) context.getArguments()[0];
context.getArguments()[0]
= originGroup.contains("_" + grayGroupTag) ? originGroup : originGroup + "_" + grayGroupTag;
return context;
}

@Override
public ExecuteContext after(ExecuteContext context) throws Exception {
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@
import io.sermant.mq.grayscale.config.MqGrayConfigCache;
import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
Expand Down Expand Up @@ -158,22 +156,22 @@ private static Set<String> findGrayConsumerGroupAndGetTags(RocketMqConsumerClien
try {
MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl();
String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi);
GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(),
ROCKET_MQ_READ_TIMEOUT);
Object groupList = RocketMqReflectUtils.queryTopicConsumeByWho(mqClientApi, brokerAddress,
clientConfig.getTopic(), ROCKET_MQ_READ_TIMEOUT);
return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi,
clientConfig.getConsumerGroup());
} catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException
| RemotingConnectException | MQBrokerException e) {
| RemotingConnectException e) {
LOGGER.log(Level.FINE, String.format(Locale.ENGLISH, "[auto-check] error, message: %s",
e.getMessage()), e);
}
return new HashSet<>();
}

private static Set<String> getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress,
private static Set<String> getGrayTagsByConsumerGroup(Object groupList, String brokerAddress,
MQClientAPIImpl mqClientApi, String consumerGroup) {
Set<String> grayTags = new HashSet<>();
for (String group : groupList.getGroupList()) {
for (String group : RocketMqReflectUtils.getGroupList(groupList)) {
try {
List<String> consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group,
ROCKET_MQ_READ_TIMEOUT);
Expand All @@ -196,11 +194,11 @@ private static Set<String> getGrayTagsByConsumerGroup(GroupList groupList, Strin
private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi)
throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException,
InterruptedException, MQClientException {
TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT,
false);
Object topicRouteData = RocketMqReflectUtils.getTopicRouteInfoFromNameServer(mqClientApi, topic,
ROCKET_MQ_READ_TIMEOUT, false);
List<String> brokerList = new ArrayList<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
brokerList.addAll(brokerData.getBrokerAddrs().values());
for (Object brokerData : RocketMqReflectUtils.getBrokerDatas(topicRouteData)) {
brokerList.addAll(RocketMqReflectUtils.getBrokerAddrs(brokerData).values());
}

// cluster mode has multiple addresses, just select one
Expand Down
Loading
Loading