Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9044]Supplemental checks that are missing from the UpdateSubGroup&UpdateSubGroupList  operation #9058

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,9 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c

SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
if (config != null) {
if (!updateSubGroupPreCheck(config, response)) {
return response;
}
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
}

Expand All @@ -1638,15 +1641,18 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
final List<SubscriptionGroupConfig> groupConfigList = subscriptionGroupList.getGroupConfigList();

final StringBuilder builder = new StringBuilder();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
for (SubscriptionGroupConfig config : groupConfigList) {
if (!updateSubGroupPreCheck(config, response)) {
return response;
}
builder.append(config.getGroupName()).append(";");
}
final String groupNames = builder.toString();
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroupList: groupNames: {}, called by {}",
groupNames,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList);
response.setCode(ResponseCode.SUCCESS);
Expand All @@ -1665,6 +1671,15 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
return response;
}

private boolean updateSubGroupPreCheck(SubscriptionGroupConfig config,RemotingCommand resp) {
if (StringUtils.isBlank(config.getGroupName())) {
resp.setCode(ResponseCode.ILLEGAL_ARGUMENT);
resp.setRemark("The subscription group name cannot be empty");
return false;
}
return true;
}

private void initConsumerOffset(String clientHost, String groupName, int mode, TopicConfig topicConfig)
throws ConsumeQueueException {
String topic = topicConfig.getTopicName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
Expand Down Expand Up @@ -633,6 +634,30 @@ public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandExcepti
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testEmptyNameWhenUpdateSubGroup() throws RemotingCommandException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName("");
request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_ARGUMENT);
}

@Test
public void testEmptyNameWhenUpdateSubGroupList() throws RemotingCommandException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST, null);
SubscriptionGroupList groupList = new SubscriptionGroupList();
List<SubscriptionGroupConfig> list = new ArrayList<>();
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName("");
list.add(subscriptionGroupConfig);
groupList.setGroupConfigList(list);
request.setBody(JSON.toJSON(groupList).toString().getBytes());
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_ARGUMENT);
}

@Test
public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
initRocksdbSubscriptionManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class ResponseCode extends RemotingSysResponseCode {

public static final int ILLEGAL_OPERATION = 604;

public static final int ILLEGAL_ARGUMENT = 605;

public static final int RPC_UNKNOWN = -1000;
public static final int RPC_ADDR_IS_NULL = -1002;
public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
Expand Down
Loading