Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9152] Broker getConsumeStats supports inputting multiple topics #9153

Merged
merged 5 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1947,37 +1947,21 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
List<String> topicListProvided = requestHeader.fetchTopicList();
String topicProvided = requestHeader.getTopic();
String group = requestHeader.getConsumerGroup();

Set<String> topics = new HashSet<>();
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
} else {
topics.add(requestHeader.getTopic());
}
ConsumeStats consumeStats = new ConsumeStats();
Set<String> topicsForCollecting = getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group);

for (String topic : topics) {
for (String topic : topicsForCollecting) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic);
continue;
}

TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);

{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);

if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, "
+ "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup());
continue;
}
}

for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
Expand Down Expand Up @@ -2038,6 +2022,38 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
return response;
}

private Set<String> getTopicsForCollectingConsumeStats(List<String> topicListProvided, String topicProvided,
String group) {
Set<String> topicsForCollecting = new HashSet<>();
if (!topicListProvided.isEmpty()) {
// if topic list is provided, only collect the topics in the list
// and ignore subscription check
topicsForCollecting.addAll(topicListProvided);
} else {
// In order to be compatible with the old logic,
// even if the topic has been provided here, the subscription will be checked.
if (UtilAll.isBlank(topicProvided)) {
topicsForCollecting.addAll(
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
} else {
topicsForCollecting.add(topicProvided);
}
int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
Iterator<String> iterator = topicsForCollecting.iterator();
while (iterator.hasNext()) {
String topic = iterator.next();
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
if (findSubscriptionData == null && subscriptionCount > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}",
topic, group);
iterator.remove();
}
}
}
qianye1001 marked this conversation as resolved.
Show resolved Hide resolved
return topicsForCollecting;
}

private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList,
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final long timeoutMillis)
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final List<String> topicList, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
requestHeader.updateTopicList(topicList);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;

@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET)
@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET)
public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader {
private static final String CONFIG_TYPE_SEPARATOR = ";";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,62 @@
package org.apache.rocketmq.remoting.protocol.header;

import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.common.resource.RocketMQResource;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;

@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
private static final String TOPIC_NAME_SEPARATOR = ";";

@CFNotNull
@RocketMQResource(ResourceType.GROUP)
private String consumerGroup;

@RocketMQResource(ResourceType.TOPIC)
private String topic;

// if topicList is provided, topic will be ignored
@RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR)
private String topicList;

@Override
public void checkFields() throws RemotingCommandException {
}

public List<String> fetchTopicList() {
if (StringUtils.isBlank(topicList)) {
return Collections.emptyList();
}
return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR));
}

public void updateTopicList(List<String> topicList) {
if (topicList == null || topicList.isEmpty()) {
return;
}
StringBuilder sb = new StringBuilder();
topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR));
this.setTopicList(sb.toString());
}

public String getTopicList() {
return topicList;
}

public void setTopicList(String topicList) {
this.topicList = topicList;
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol.header;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class GetConsumeStatsRequestHeaderTest {

private GetConsumeStatsRequestHeader header;

@Before
public void setUp() {
header = new GetConsumeStatsRequestHeader();
}

@Test
public void updateTopicList_NullTopicList_DoesNotUpdate() {
header.updateTopicList(null);
assertNull(header.getTopicList());
}

@Test
public void updateTopicList_EmptyTopicList_SetsEmptyString() {
header.updateTopicList(Collections.emptyList());
assertNull(header.getTopicList());
}

@Test
public void updateTopicList_SingleTopic_SetsSingleTopicString() {
List<String> topicList = Collections.singletonList("TopicA");
header.updateTopicList(topicList);
assertEquals("TopicA;", header.getTopicList());
}

@Test
public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() {
List<String> topicList = Arrays.asList("TopicA", "TopicB", "TopicC");
header.updateTopicList(topicList);
assertEquals("TopicA;TopicB;TopicC;", header.getTopicList());
}

@Test
public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() {
List<String> topicList = Arrays.asList("TopicA", "TopicA", "TopicB");
header.updateTopicList(topicList);
assertEquals("TopicA;TopicA;TopicB;", header.getTopicList());
}

@Test
public void fetchTopicList_NullTopicList_ReturnsEmptyList() {
header.setTopicList(null);
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);

header.updateTopicList(new ArrayList<>());
topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);
}

@Test
public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() {
header.setTopicList("");
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);
}

@Test
public void fetchTopicList_BlankTopicList_ReturnsEmptyList() {
header.setTopicList(" ");
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);
}

@Test
public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() {
header.setTopicList("TopicA");
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.singletonList("TopicA"), topicList);
}

@Test
public void fetchTopicList_MultipleTopics_ReturnsTopicList() {
header.setTopicList("TopicA;TopicB;TopicC");
List<String> topicList = header.fetchTopicList();
assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList);
}

@Test
public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() {
header.setTopicList("TopicA;TopicB;");
List<String> topicList = header.fetchTopicList();
assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
}

@Test
public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList() {
header.setTopicList(";TopicA;TopicB");
List<String> topicList = header.fetchTopicList();
assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void testMessageTrackDetail() throws InterruptedException, RemotingExcept
connection.setConnectionSet(connections);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection);
ConsumeStats consumeStats = new ConsumeStats();
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats);
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats);
List<MessageTrack> broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt);
assertThat(broadcastMessageTracks.size()).isEqualTo(2);
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);
Expand Down
Loading