Skip to content

Commit

Permalink
[ISSUE #6386] Some improvements for compactionTopic (#6387)
Browse files Browse the repository at this point in the history
* 1、Graceful shutdown needs to wait for the compactionTopic to build the index
2、Exception handling that may occur when adding pullMessageFromMaster
3、Update usage docs

* fix doc

---------

Co-authored-by: guyinyou <[email protected]>
  • Loading branch information
guyinyou and guyinyou authored Mar 22, 2023
1 parent 1e2a301 commit c17baf1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 16 deletions.
16 changes: 15 additions & 1 deletion docs/cn/Example_Compaction_Topic_cn.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
# Compaction Topic

## 使用方式

### 打开namesrv上支持顺序消息的开关
CompactionTopic依赖顺序消息来保障一致性
```shell
$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
```

### 创建compaction topic

```shell
$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
```

### 生产数据

与普通消息一样

```java
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
Expand All @@ -28,9 +39,12 @@ SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {

System.out.printf("%s%n", sendResult);
```

### 消费数据

消费offset与compaction之前保持不变,如果指定offset消费,当指定的offset不存在时,返回后面最近的一条数据
在compaction场景下,大部分消费都是从0开始消费完整的数据

```java
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
consumer.setNamesrvAddr("localhost:9876");
Expand Down
22 changes: 19 additions & 3 deletions docs/en/Example_Compaction_Topic.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
# Compaction Topic

## use example

### Turn on the opening of support for orderMessages on namesrv
CompactionTopic relies on orderMessages to ensure consistency
```shell
$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
```

### create compaction topic
```shell
$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
```
Expand All @@ -15,8 +22,17 @@ DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message(topic, "tags", "keys", "bodys"getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(msg);
String topic = "ctopic";
String tag = "tag1";
String key = "key1";
Message msg = new Message(topic, tag, key, "bodys"getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
int select = Math.abs(shardingKey.hashCode());
if (select < 0) {
select = 0;
}
return mqs.get(select % mqs.size());
}, key);

System.out.printf("%s%n", sendResult);
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void putRequest(DispatchRequest request) {
}

public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final int maxTotalMsgSize) {
final long offset, final int maxMsgNums, final int maxTotalMsgSize) {
return compactionStore.getMessage(group, topic, queueId, offset, maxMsgNums, maxTotalMsgSize);
}

Expand Down Expand Up @@ -126,6 +126,13 @@ public boolean load(boolean exitOK) {
@Override
public void shutdown() {
super.shutdown();
while (!compactionMsgQ.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {

}
}
compactionStore.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.io.IOException;
import java.util.function.BiFunction;

import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class MessageFetcher implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final RemotingClient client;

public MessageFetcher() {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(false);
Expand Down Expand Up @@ -85,12 +88,13 @@ private PullMessageRequestHeader createPullMessageRequest(String topic, int queu
private String getConsumerGroup(String topic, int queueId) {
return String.join("-", topic, String.valueOf(queueId), "pull", "group");
}

private String getClientId() {
return String.join("@", NetworkUtil.getLocalAddress(), "compactionIns", "compactionUnit");
}

private boolean prepare(String masterAddr, String topic, String groupName, long subVersion)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
HeartbeatData heartbeatData = new HeartbeatData();

heartbeatData.setClientID(getClientId());
Expand Down Expand Up @@ -121,7 +125,7 @@ private boolean prepare(String masterAddr, String topic, String groupName, long
}

private boolean pullDone(String masterAddr, String groupName)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
requestHeader.setClientID(getClientId());
requestHeader.setProducerGroup("");
Expand All @@ -140,14 +144,16 @@ private boolean stopPull(long currPullOffset, long endOffset) {
}

public void pullMessageFromMaster(String topic, int queueId, long endOffset, String masterAddr,
BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws Exception {
BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws Exception {
long currentPullOffset = 0;

try {
long subVersion = System.currentTimeMillis();
String groupName = getConsumerGroup(topic, queueId);
prepare(masterAddr, topic, groupName, subVersion);

if (!prepare(masterAddr, topic, groupName, subVersion)) {
log.error("{}:{} prepare to {} pull message failed", topic, queueId, masterAddr);
throw new RemotingCommandException(topic + ":" + queueId + " prepare to " + masterAddr + " pull message failed");
}

boolean noNewMsg = false;
boolean keepPull = true;
Expand All @@ -157,11 +163,11 @@ public void pullMessageFromMaster(String topic, int queueId, long endOffset, Str
PullMessageRequestHeader requestHeader = createPullMessageRequest(topic, queueId, currentPullOffset, subVersion);

RemotingCommand
request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
RemotingCommand response = client.invokeSync(masterAddr, request, 1000 * 30L);

PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
if (responseHeader == null) {
log.error("{}:{} pull message responseHeader is null", topic, queueId);
throw new RemotingCommandException(topic + ":" + queueId + " pull message responseHeader is null");
Expand All @@ -175,20 +181,20 @@ public void pullMessageFromMaster(String topic, int queueId, long endOffset, Str
break;
case ResponseCode.PULL_NOT_FOUND: // NO_NEW_MSG, need break loop
log.info("PULL_NOT_FOUND, topic:{}, queueId:{}, pullOffset:{},",
topic, queueId, currentPullOffset);
topic, queueId, currentPullOffset);
noNewMsg = true;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
log.info("PULL_RETRY_IMMEDIATE, topic:{}, queueId:{}, pullOffset:{},",
topic, queueId, currentPullOffset);
topic, queueId, currentPullOffset);
break;
case ResponseCode.PULL_OFFSET_MOVED:
log.info("PULL_OFFSET_MOVED, topic:{}, queueId:{}, pullOffset:{},",
topic, queueId, currentPullOffset);
topic, queueId, currentPullOffset);
break;
default:
log.warn("Pull Message error, response code: {}, remark: {}",
response.getCode(), response.getRemark());
response.getCode(), response.getRemark());
}

if (noNewMsg || !keepPull) {
Expand Down

0 comments on commit c17baf1

Please sign in to comment.