From df3c5cacee84e3eb2c9f20040c0cbab4ce715967 Mon Sep 17 00:00:00 2001 From: zhengpeng <847850277@qq.com> Date: Tue, 10 Sep 2024 09:07:40 +0800 Subject: [PATCH] [ISSUES #969] Support AdminBrokerProcessor get_all_consumer_offset (#970) --- .../src/processor/admin_broker_processor.rs | 5 +++++ .../consumer_request_handler.rs | 22 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/rocketmq-broker/src/processor/admin_broker_processor.rs b/rocketmq-broker/src/processor/admin_broker_processor.rs index 0b7f009f..f24d3c48 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor.rs @@ -156,6 +156,11 @@ impl AdminBrokerProcessor { .get_consume_stats(channel, ctx, request_code, request) .await } + RequestCode::GetAllConsumerOffset => { + self.consumer_request_handler + .get_all_consumer_offset(channel, ctx, request_code, request) + .await + } RequestCode::GetTopicConfig => { self.topic_request_handler .get_topic_config(channel, ctx, request_code, request) diff --git a/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs b/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs index 1bc44947..f990c367 100644 --- a/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs +++ b/rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; +use rocketmq_common::common::config_manager::ConfigManager; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; @@ -216,4 +217,25 @@ impl ConsumerRequestHandler { response.set_body_mut_ref(Some(body)); Some(response) } + + pub async fn get_all_consumer_offset( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + _request_code: RequestCode, + _request: RemotingCommand, + ) -> Option { + let mut response = RemotingCommand::create_response_command(); + let content = self.inner.consumer_offset_manager.encode(); + if !content.is_empty() { + response.set_body_mut_ref(Some(content)); + Some(response) + } else { + Some( + response + .set_code(ResponseCode::SystemError) + .set_remark(Some("No consumer offset in this broker".to_string())), + ) + } + } }