From 54234bcfa084ebec3fa00630ecd11c63dc3de5ed Mon Sep 17 00:00:00 2001 From: yx9o Date: Fri, 10 May 2024 22:38:20 +0800 Subject: [PATCH] feat: add metrics for messageType dimension --- .../apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java | 9 +++++++++ .../mqtt/ds/upstream/UpstreamProcessorManager.java | 9 +++++++++ .../mqtt/ds/upstream/processor/PublishProcessor.java | 5 +++-- .../mqtt/exporter/collector/MqttMetricsCollector.java | 8 ++++++++ .../mqtt/exporter/collector/MqttMetricsInfo.java | 6 +++++- 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java index f2c43883d..bdba0cbfe 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.mqtt.common.model.Remark; import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util; import org.apache.rocketmq.mqtt.ds.config.ServiceConf; +import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector; import javax.annotation.Resource; import java.util.Objects; @@ -89,10 +90,18 @@ public HookResult doAuth(MqttMessage message) { logger.error("", e); } if (!Objects.equals(username, serviceConf.getUsername()) || !validateSign) { + collectAuthFailedTps(); return new HookResult(HookResult.FAIL, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.byteValue(), Remark.AUTH_FAILED, null); } } return new HookResult(HookResult.SUCCESS, null, null); } + private void collectAuthFailedTps() { + try { + MqttMetricsCollector.collectProcessRequestTps(1, Remark.AUTH_FAILED); + } catch (Throwable e) { + logger.error("Collect prometheus error", e); + } + } } diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/UpstreamProcessorManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/UpstreamProcessorManager.java index 276cbad53..fadf0fd7c 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/UpstreamProcessorManager.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/UpstreamProcessorManager.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor; import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor; import org.apache.rocketmq.mqtt.ds.upstream.processor.UnSubscribeProcessor; +import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -66,6 +67,7 @@ public void register() { @Override public CompletableFuture processMqttMessage(MqttMessageUpContext context, MqttMessage message) { + collectProcessRequestTps(message.fixedHeader().messageType().name()); switch (message.fixedHeader().messageType()) { case CONNECT: return connectProcessor.process(context, message); @@ -84,4 +86,11 @@ public CompletableFuture processMqttMessage(MqttMessageUpContext con return hookResult; } + private void collectProcessRequestTps(String name) { + try { + MqttMetricsCollector.collectProcessRequestTps(1, name); + } catch (Throwable e) { + logger.error("Collect prometheus error", e); + } + } } diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java index ed0057679..01b4511c6 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java @@ -106,7 +106,7 @@ public CompletableFuture put(MqttMessageUpContext context, MqttMess message.setMsgId(msgId); message.setBornTimestamp(bornTime); message.setEmpty(isEmpty); - collectWriteBytes(message.getFirstTopic(), message.getPayload().length); + collectWriteBytesAndTps(message.getFirstTopic(), message.getPayload().length); return lmqQueueStore.putMessage(queueNames, message); } @@ -118,9 +118,10 @@ public CompletableFuture sendWillMsg(String clientId, MqttPublishMe return put(ctx, message); } - private void collectWriteBytes(String topic, int length) { + private void collectWriteBytesAndTps(String topic, int length) { try { MqttMetricsCollector.collectReadWriteMatchActionBytes(length, topic, "put"); + MqttMetricsCollector.collectPutRequestTps(1, topic); } catch (Throwable e) { logger.error("Collect prometheus error", e); } diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java index 2580dd44d..52875bee7 100644 --- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java +++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java @@ -173,6 +173,14 @@ public static void collectReadWriteMatchActionBytes(long val, String... labels) collect(MqttMetricsInfo.READ_WRITE_MATCH_ACTION_BYTES, val, labels); } + public static void collectProcessRequestTps(long val, String... labels) throws PrometheusException { + collect(MqttMetricsInfo.PROCESS_REQUEST_TPS, val, labels); + } + + public static void collectPutRequestTps(long val, String... labels) throws PrometheusException { + collect(MqttMetricsInfo.PUT_REQUEST_TPS, val, labels); + } + private static String labels2String(String... labels) { StringBuilder sb = new StringBuilder(128); for (String label : labels) { diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java index 743f9d974..c9d66386c 100644 --- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java +++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java @@ -39,7 +39,11 @@ public enum MqttMetricsInfo { CONNECTIONS_SIZE(Type.GAUGE, SubSystem.CS, "connections_size", "server connections size.", null, "hostName", "hostIp"), READ_WRITE_MATCH_ACTION_BYTES(Type.COUNTER, SubSystem.DS, "read_write_match_action_bytes", "lmq read write match action bytes.", null, - "hostName", "hostIp", "topic", "action"); + "hostName", "hostIp", "topic", "action"), + PROCESS_REQUEST_TPS(Type.COUNTER, SubSystem.DS, "process_request_tps", "ds process request tps.", null, + "hostName", "hostIp", "messageType"), + PUT_REQUEST_TPS(Type.COUNTER, SubSystem.DS, "put_request_tps", "ds topic put request tps.", null, + "hostName", "hostIp", "topic"); private final Type type;