Skip to content

Commit

Permalink
[ISSUES #969] Support AdminBrokerProcessor get_all_consumer_offset (#970
Browse files Browse the repository at this point in the history
)
  • Loading branch information
847850277 committed Sep 10, 2024
1 parent 27debb7 commit df3c5ca
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
5 changes: 5 additions & 0 deletions rocketmq-broker/src/processor/admin_broker_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ impl AdminBrokerProcessor {
.get_consume_stats(channel, ctx, request_code, request)
.await
}
RequestCode::GetAllConsumerOffset => {
self.consumer_request_handler
.get_all_consumer_offset(channel, ctx, request_code, request)
.await
}
RequestCode::GetTopicConfig => {
self.topic_request_handler
.get_topic_config(channel, ctx, request_code, request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::collections::HashSet;

use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
Expand Down Expand Up @@ -216,4 +217,25 @@ impl ConsumerRequestHandler {
response.set_body_mut_ref(Some(body));
Some(response)
}

pub async fn get_all_consumer_offset(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request_code: RequestCode,
_request: RemotingCommand,
) -> Option<RemotingCommand> {
let mut response = RemotingCommand::create_response_command();
let content = self.inner.consumer_offset_manager.encode();
if !content.is_empty() {
response.set_body_mut_ref(Some(content));
Some(response)
} else {
Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some("No consumer offset in this broker".to_string())),
)
}
}
}

0 comments on commit df3c5ca

Please sign in to comment.