From 6cf29d7f60913599643da87993e7757114451424 Mon Sep 17 00:00:00 2001 From: ScarbWin Date: Thu, 9 Jan 2025 01:26:59 +0800 Subject: [PATCH] add producer stats manager --- .../rocketmq/client/impl/MQClientAPIImpl.java | 18 ++++- .../client/impl/factory/MQClientInstance.java | 7 ++ .../impl/producer/DefaultMQProducerImpl.java | 5 ++ .../client/stat/ProducerStatsManager.java | 71 +++++++++++++++++++ 4 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c462dd1241c..8fff2e0b1c8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -629,6 +629,8 @@ public SendResult sendMessage( switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), System.currentTimeMillis() - beginStartTime); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); @@ -644,7 +646,15 @@ public SendResult sendMessage( if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } - return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); + try { + SendResult result = this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), System.currentTimeMillis() - beginStartTime); + return result; + } catch (Exception e) { + instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1); + throw e; + } default: assert false; break; @@ -709,6 +719,8 @@ public void operationSucceed(RemotingCommand response) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true); + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), cost); return; } @@ -726,6 +738,8 @@ public void operationSucceed(RemotingCommand response) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true); + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), cost); } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, @@ -737,6 +751,7 @@ public void operationSucceed(RemotingCommand response) { public void operationFail(Throwable throwable) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true); long cost = System.currentTimeMillis() - beginStartTime; + instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1); if (throwable instanceof RemotingSendRequestException) { MQClientException ex = new MQClientException("send request failed", throwable); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, @@ -756,6 +771,7 @@ public void operationFail(Throwable throwable) { } catch (Exception ex) { long cost = System.currentTimeMillis() - beginStartTime; producer.updateFaultItem(brokerName, cost, true, false); + instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index eba654c22d0..beb7943b05e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -57,6 +57,7 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.stat.ConsumerStatsManager; +import org.apache.rocketmq.client.stat.ProducerStatsManager; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; @@ -137,6 +138,7 @@ public Thread newThread(Runnable r) { private final PullMessageService pullMessageService; private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; + private final ProducerStatsManager producerStatsManager; private final ConsumerStatsManager consumerStatsManager; private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0); private ServiceState serviceState = ServiceState.CREATE_JUST; @@ -214,6 +216,7 @@ public void onChannelActive(String remoteAddr, Channel channel) { this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); this.defaultMQProducer.resetClientConfig(clientConfig); + this.producerStatsManager = new ProducerStatsManager(this.scheduledExecutorService); this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}", @@ -1387,6 +1390,10 @@ public ConsumerStatsManager getConsumerStatsManager() { return consumerStatsManager; } + public ProducerStatsManager getProducerStatsManager() { + return producerStatsManager; + } + public NettyClientConfig getNettyClientConfig() { return nettyClientConfig; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 15264f0e503..f7a437fd7c5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -67,6 +67,7 @@ import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; +import org.apache.rocketmq.client.stat.ProducerStatsManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -1061,6 +1062,10 @@ private SendResult sendKernelImpl(final Message msg, requestHeader, timeout - costTimeSync, communicationMode, + null, + null, + this.mQClientFactory, + 0, context, this); break; diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java new file mode 100644 index 00000000000..20cbaa4c4c1 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.stat; + +import java.util.concurrent.ScheduledExecutorService; +import org.apache.rocketmq.common.stats.StatsItemSet; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +public class ProducerStatsManager { + private static final Logger log = LoggerFactory.getLogger(ProducerStatsManager.class); + + private static final String TOPIC_SEND_OK_TPS = "SEND_OK_TPS"; + private static final String TOPIC_SEND_FAILED_TPS = "SEND_FAILED_TPS"; + private static final String TOPIC_SEND_RT = "SEND_RT"; + + private final StatsItemSet topicSendOKTPS; + private final StatsItemSet topicSendFailedTPS; + private final StatsItemSet topicSendRT; + + public ProducerStatsManager(final ScheduledExecutorService scheduledExecutorService) { + this.topicSendOKTPS = new StatsItemSet(TOPIC_SEND_OK_TPS, scheduledExecutorService, log); + this.topicSendFailedTPS = new StatsItemSet(TOPIC_SEND_FAILED_TPS, scheduledExecutorService, log); + this.topicSendRT = new StatsItemSet(TOPIC_SEND_RT, scheduledExecutorService, log); + } + + public void start() { + } + + public void shutdown() { + } + + public void incSendTimes(final String topic, final int times) { + this.topicSendOKTPS.addValue(topic, times, 1); + } + + public void incSendFailedTimes(final String topic, final int times) { + this.topicSendFailedTPS.addValue(topic, times, 1); + } + + public void incSendRT(final String topic, final long rt) { + this.topicSendRT.addValue(topic, (int)rt, 1); + } + + public StatsItemSet getTopicSendOKTPS() { + return topicSendOKTPS; + } + + public StatsItemSet getTopicSendFailedTPS() { + return topicSendFailedTPS; + } + + public StatsItemSet getTopicSendRT() { + return topicSendRT; + } +}