Skip to content

Commit

Permalink
[ISSUE #8352] Fix CLIENT_REGISTER in registerConsumer (#8353)
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma authored Jul 3, 2024
1 parent 3aa5d19 commit 933ffc0
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
long start = System.currentTimeMillis();
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo,
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
Expand All @@ -188,6 +186,10 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
if (r1) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo,
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
}
boolean r2 = false;
if (updateSubscription) {
r2 = consumerGroupInfo.updateSubscription(subList);
Expand Down

0 comments on commit 933ffc0

Please sign in to comment.