Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #790]🚀Support Query message(request code :12,33)🔥 #791

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler;
use crate::processor::pull_message_processor::PullMessageProcessor;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::processor::query_message_processor::QueryMessageProcessor;
use crate::processor::send_message_processor::SendMessageProcessor;
use crate::processor::BrokerRequestProcessor;
use crate::schedule::schedule_message_service::ScheduleMessageService;
Expand Down Expand Up @@ -410,7 +411,7 @@
self.message_store.clone().unwrap(),
);
self.pull_request_hold_service = Some(PullRequestHoldService::new(
message_store,
message_store.clone(),

Check warning on line 414 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L414

Added line #L414 was not covered by tests
Arc::new(pull_message_processor.clone()),
self.broker_config.clone(),
));
Expand All @@ -430,6 +431,8 @@
.set_message_arriving_listener(Some(Arc::new(Box::new(
NotifyMessageArrivingListener::new(self.pull_request_hold_service.clone().unwrap()),
))));
let query_message_processor =
QueryMessageProcessor::new(self.message_store_config.clone(), message_store.clone());

Check warning on line 435 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L435

Added line #L435 was not covered by tests

let admin_broker_processor = AdminBrokerProcessor::new(
self.broker_config.clone(),
Expand Down Expand Up @@ -459,7 +462,7 @@
),
consumer_manage_processor,
query_assignment_processor: Default::default(),
query_message_processor: Default::default(),
query_message_processor,

Check warning on line 465 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L465

Added line #L465 was not covered by tests
end_transaction_processor: Default::default(),
}
}
Expand Down
9 changes: 8 additions & 1 deletion rocketmq-broker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
pub(crate) notification_processor: NotificationProcessor,
pub(crate) polling_info_processor: PollingInfoProcessor,
pub(crate) reply_message_processor: ReplyMessageProcessor,
pub(crate) query_message_processor: QueryMessageProcessor,
pub(crate) query_message_processor: QueryMessageProcessor<MS>,
pub(crate) client_manage_processor: ClientManageProcessor<MS>,
pub(crate) consumer_manage_processor: ConsumerManageProcessor<MS>,
pub(crate) query_assignment_processor: QueryAssignmentProcessor,
Expand Down Expand Up @@ -161,6 +161,13 @@
.process_request(channel, ctx, request_code, request)
.await
}

RequestCode::QueryMessage | RequestCode::ViewMessageById => {
self.query_message_processor
.process_request(channel, ctx, request_code, request)
.await

Check warning on line 168 in rocketmq-broker/src/processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor.rs#L166-L168

Added lines #L166 - L168 were not covered by tests
}

_ => {
self.admin_broker_processor
.process_request(channel, ctx, request_code, request)
Expand Down
122 changes: 115 additions & 7 deletions rocketmq-broker/src/processor/query_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,127 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;

use rocketmq_common::common::mix_all::UNIQUE_MSG_QUERY_FLAG;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::query_message_request_header::QueryMessageRequestHeader;
use rocketmq_remoting::protocol::header::query_message_response_header::QueryMessageResponseHeader;
use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use rocketmq_store::log_file::MessageStore;

#[derive(Default, Clone)]
pub struct QueryMessageProcessor {}
pub struct QueryMessageProcessor<MS> {
message_store_config: Arc<MessageStoreConfig>,
message_store: Arc<MS>,

Check warning on line 34 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L33-L34

Added lines #L33 - L34 were not covered by tests
}

impl<MS> QueryMessageProcessor<MS> {
pub fn new(message_store_config: Arc<MessageStoreConfig>, message_store: Arc<MS>) -> Self {

Check warning on line 38 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L38

Added line #L38 was not covered by tests
Self {
message_store_config,
message_store,
}
}

Check warning on line 43 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L43

Added line #L43 was not covered by tests
}

impl<MS> QueryMessageProcessor<MS>
where
MS: MessageStore + Send + Sync + 'static,
{
pub async fn process_request(
&mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request_code: RequestCode,
request: RemotingCommand,
) -> Option<RemotingCommand> {
match request_code {
RequestCode::QueryMessage => self.query_message(channel, ctx, request).await,
RequestCode::ViewMessageById => self.view_message_by_id(channel, ctx, request).await,
_ => None,

Check warning on line 60 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L50-L60

Added lines #L50 - L60 were not covered by tests
}
}

Check warning on line 62 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L62

Added line #L62 was not covered by tests

async fn query_message(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
request: RemotingCommand,
) -> Option<RemotingCommand> {
let mut response = RemotingCommand::create_response_command_with_header(
QueryMessageResponseHeader::default(),
);
let mut request_header = request

Check warning on line 73 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L64-L73

Added lines #L64 - L73 were not covered by tests
.decode_command_custom_header::<QueryMessageRequestHeader>()
.unwrap();
response.set_opaque_mut(request.opaque());
let is_unique_key = request.ext_fields().unwrap().get(UNIQUE_MSG_QUERY_FLAG);
if is_unique_key.is_some() && is_unique_key.unwrap() == "true" {
request_header.max_num = self.message_store_config.default_query_max_num as i32;

Check warning on line 79 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L76-L79

Added lines #L76 - L79 were not covered by tests
}
let query_message_result = self

Check warning on line 81 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L81

Added line #L81 was not covered by tests
.message_store
.query_message(
request_header.topic.as_str(),
request_header.key.as_str(),
request_header.max_num,
request_header.begin_timestamp,
request_header.end_timestamp,

Check warning on line 88 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L84-L88

Added lines #L84 - L88 were not covered by tests
)
.await?;

Check warning on line 90 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L90

Added line #L90 was not covered by tests

let response_header = response

Check warning on line 92 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L92

Added line #L92 was not covered by tests
.read_custom_header_mut::<QueryMessageResponseHeader>()
.unwrap();
response_header.index_last_update_phyoffset =
query_message_result.index_last_update_phyoffset;
response_header.index_last_update_timestamp =
query_message_result.index_last_update_timestamp;

Check warning on line 98 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L95-L98

Added lines #L95 - L98 were not covered by tests

if query_message_result.buffer_total_size > 0 {
let message_data = query_message_result.get_message_data();
return Some(response.set_body(message_data));

Check warning on line 102 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L100-L102

Added lines #L100 - L102 were not covered by tests
}
Some(
response
.set_code(ResponseCode::QueryNotFound)
.set_remark(Some(
"can not find message, maybe time range not correct".to_string(),
)),

Check warning on line 109 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L104-L109

Added lines #L104 - L109 were not covered by tests
)
}

Check warning on line 111 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L111

Added line #L111 was not covered by tests

impl QueryMessageProcessor {
fn process_request(
&self,
async fn view_message_by_id(
&mut self,
_channel: Channel,

Check warning on line 115 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L113-L115

Added lines #L113 - L115 were not covered by tests
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
request: RemotingCommand,
) -> Option<RemotingCommand> {
let response = RemotingCommand::create_response_command();
let request_header = request

Check warning on line 120 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L117-L120

Added lines #L117 - L120 were not covered by tests
.decode_command_custom_header::<ViewMessageRequestHeader>()
.unwrap();
let select_mapped_buffer_result = self

Check warning on line 123 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L123

Added line #L123 was not covered by tests
.message_store
.select_one_message_by_offset(request_header.offset)
.await;
if let Some(result) = select_mapped_buffer_result {
let message_data = result.get_bytes();
return Some(response.set_body(message_data));
}
Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!(

Check warning on line 134 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L125-L134

Added lines #L125 - L134 were not covered by tests
"can not find message by offset: {}",
request_header.offset
))),

Check warning on line 137 in rocketmq-broker/src/processor/query_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_message_processor.rs#L137

Added line #L137 was not covered by tests
)
}
}
4 changes: 4 additions & 0 deletions rocketmq-remoting/src/protocol/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub mod pull_message_request_header;
pub mod pull_message_response_header;
pub mod query_consumer_offset_request_header;
pub mod query_consumer_offset_response_header;
pub mod query_message_request_header;
pub mod query_message_response_header;
pub mod search_offset_response_header;
pub mod unregister_client_request_header;
pub mod update_consumer_offset_header;
pub mod view_message_request_header;
pub mod view_message_response_header;
184 changes: 184 additions & 0 deletions rocketmq-remoting/src/protocol/header/query_message_request_header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use serde::Deserialize;
use serde::Serialize;

use crate::protocol::command_custom_header::CommandCustomHeader;
use crate::protocol::command_custom_header::FromMap;
use crate::rpc::topic_request_header::TopicRequestHeader;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]

Check warning on line 24 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L24

Added line #L24 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct QueryMessageRequestHeader {
pub topic: String,

Check warning on line 27 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L27

Added line #L27 was not covered by tests

pub key: String,

Check warning on line 29 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L29

Added line #L29 was not covered by tests

pub max_num: i32,

Check warning on line 31 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L31

Added line #L31 was not covered by tests

pub begin_timestamp: i64,

Check warning on line 33 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L33

Added line #L33 was not covered by tests

pub end_timestamp: i64,

Check warning on line 35 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L35

Added line #L35 was not covered by tests

#[serde(flatten)]
pub topic_request_header: Option<TopicRequestHeader>,

Check warning on line 38 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L38

Added line #L38 was not covered by tests
}

impl QueryMessageRequestHeader {
pub const TOPIC: &'static str = "topic";
pub const KEY: &'static str = "key";
pub const MAX_NUM: &'static str = "maxNum";
pub const BEGIN_TIMESTAMP: &'static str = "beginTimestamp";
pub const END_TIMESTAMP: &'static str = "endTimestamp";
}

/*impl From<&QueryMessageRequestHeader> for String {
fn from(header: &QueryMessageRequestHeader) -> Self {
SerdeJsonUtils::to_json(header).unwrap()
}
}*/

impl CommandCustomHeader for QueryMessageRequestHeader {
fn to_map(&self) -> Option<std::collections::HashMap<String, String>> {
let mut map = std::collections::HashMap::new();
map.insert(Self::TOPIC.to_string(), self.topic.clone());
map.insert(Self::KEY.to_string(), self.key.clone());
map.insert(Self::MAX_NUM.to_string(), self.max_num.to_string());
map.insert(
Self::BEGIN_TIMESTAMP.to_string(),
self.begin_timestamp.to_string(),
);
map.insert(
Self::END_TIMESTAMP.to_string(),
self.end_timestamp.to_string(),
);
if let Some(value) = self.topic_request_header.as_ref() {
if let Some(val) = value.to_map() {
map.extend(val);
}
}
Some(map)
}
}

impl FromMap for QueryMessageRequestHeader {
type Target = Self;

fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
Some(QueryMessageRequestHeader {
topic: map.get(Self::TOPIC)?.clone(),
key: map.get(Self::KEY)?.clone(),
max_num: map.get(Self::MAX_NUM)?.parse().ok()?,
begin_timestamp: map.get(Self::BEGIN_TIMESTAMP)?.parse().ok()?,
end_timestamp: map.get(Self::END_TIMESTAMP)?.parse().ok()?,
topic_request_header: <TopicRequestHeader as FromMap>::from(map),
})
}
}

#[cfg(test)]
mod query_message_request_header_tests {
use std::collections::HashMap;

use super::*;

#[test]
fn creating_from_map_with_all_fields_populates_struct_correctly() {
let mut map = HashMap::new();
map.insert("topic".to_string(), "test_topic".to_string());
map.insert("key".to_string(), "test_key".to_string());
map.insert("maxNum".to_string(), "10".to_string());
map.insert("beginTimestamp".to_string(), "1000".to_string());
map.insert("endTimestamp".to_string(), "2000".to_string());

let header = <QueryMessageRequestHeader as FromMap>::from(&map).unwrap();

assert_eq!(header.topic, "test_topic");
assert_eq!(header.key, "test_key");
assert_eq!(header.max_num, 10);
assert_eq!(header.begin_timestamp, 1000);
assert_eq!(header.end_timestamp, 2000);
}

#[test]
fn creating_from_map_missing_optional_fields_still_succeeds() {
let mut map = HashMap::new();
map.insert("topic".to_string(), "test_topic".to_string());
map.insert("key".to_string(), "test_key".to_string());
map.insert("maxNum".to_string(), "10".to_string());
map.insert("beginTimestamp".to_string(), "1000".to_string());
map.insert("endTimestamp".to_string(), "2000".to_string());

let header = <QueryMessageRequestHeader as FromMap>::from(&map).unwrap();

assert_eq!(header.topic, "test_topic");
assert_eq!(header.key, "test_key");
assert_eq!(header.max_num, 10);
}

#[test]
fn creating_from_map_with_invalid_number_fields_returns_none() {
let mut map = HashMap::new();
map.insert("topic".to_string(), "test_topic".to_string());
map.insert("key".to_string(), "test_key".to_string());
map.insert("maxNum".to_string(), "invalid".to_string());

let header = <QueryMessageRequestHeader as FromMap>::from(&map);

assert!(header.is_none());
}

#[test]
fn to_map_includes_all_fields() {
let header = QueryMessageRequestHeader {
topic: "test_topic".to_string(),
key: "test_key".to_string(),
max_num: 10,
begin_timestamp: 1000,
end_timestamp: 2000,
topic_request_header: None,
};

Check warning on line 154 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L154

Added line #L154 was not covered by tests

let map = header.to_map().unwrap();

assert_eq!(map.get("topic").unwrap(), "test_topic");
assert_eq!(map.get("key").unwrap(), "test_key");
assert_eq!(map.get("maxNum").unwrap(), "10");
assert_eq!(map.get("beginTimestamp").unwrap(), "1000");
assert_eq!(map.get("endTimestamp").unwrap(), "2000");
}

#[test]
fn to_map_with_topic_request_header_includes_nested_fields() {
let topic_request_header = TopicRequestHeader::default();
let header = QueryMessageRequestHeader {
topic: "test_topic".to_string(),
key: "test_key".to_string(),
max_num: 10,
begin_timestamp: 1000,
end_timestamp: 2000,
topic_request_header: Some(topic_request_header),
};

Check warning on line 175 in rocketmq-remoting/src/protocol/header/query_message_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/query_message_request_header.rs#L175

Added line #L175 was not covered by tests

let map = header.to_map().unwrap();

// Verify that nested fields are included, assuming specific fields in TopicRequestHeader
// This is a placeholder assertion; actual fields and checks depend on TopicRequestHeader's
// structure
assert!(!map.contains_key("nestedField"));
}
}
Loading
Loading