diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index decd8ac0..14848622 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -66,6 +66,7 @@ use crate::processor::consumer_manage_processor::ConsumerManageProcessor; use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler; use crate::processor::pull_message_processor::PullMessageProcessor; use crate::processor::pull_message_result_handler::PullMessageResultHandler; +use crate::processor::query_message_processor::QueryMessageProcessor; use crate::processor::send_message_processor::SendMessageProcessor; use crate::processor::BrokerRequestProcessor; use crate::schedule::schedule_message_service::ScheduleMessageService; @@ -410,7 +411,7 @@ impl BrokerRuntime { self.message_store.clone().unwrap(), ); self.pull_request_hold_service = Some(PullRequestHoldService::new( - message_store, + message_store.clone(), Arc::new(pull_message_processor.clone()), self.broker_config.clone(), )); @@ -430,6 +431,8 @@ impl BrokerRuntime { .set_message_arriving_listener(Some(Arc::new(Box::new( NotifyMessageArrivingListener::new(self.pull_request_hold_service.clone().unwrap()), )))); + let query_message_processor = + QueryMessageProcessor::new(self.message_store_config.clone(), message_store.clone()); let admin_broker_processor = AdminBrokerProcessor::new( self.broker_config.clone(), @@ -459,7 +462,7 @@ impl BrokerRuntime { ), consumer_manage_processor, query_assignment_processor: Default::default(), - query_message_processor: Default::default(), + query_message_processor, end_transaction_processor: Default::default(), } } diff --git a/rocketmq-broker/src/processor.rs b/rocketmq-broker/src/processor.rs index 29be39e8..8a9df972 100644 --- a/rocketmq-broker/src/processor.rs +++ b/rocketmq-broker/src/processor.rs @@ -95,7 +95,7 @@ where pub(crate) notification_processor: NotificationProcessor, pub(crate) polling_info_processor: PollingInfoProcessor, pub(crate) reply_message_processor: ReplyMessageProcessor, - pub(crate) query_message_processor: QueryMessageProcessor, + pub(crate) query_message_processor: QueryMessageProcessor, pub(crate) client_manage_processor: ClientManageProcessor, pub(crate) consumer_manage_processor: ConsumerManageProcessor, pub(crate) query_assignment_processor: QueryAssignmentProcessor, @@ -161,6 +161,13 @@ impl RequestProcessor for BrokerReques .process_request(channel, ctx, request_code, request) .await } + + RequestCode::QueryMessage | RequestCode::ViewMessageById => { + self.query_message_processor + .process_request(channel, ctx, request_code, request) + .await + } + _ => { self.admin_broker_processor .process_request(channel, ctx, request_code, request) diff --git a/rocketmq-broker/src/processor/query_message_processor.rs b/rocketmq-broker/src/processor/query_message_processor.rs index 9af3e632..db4632c7 100644 --- a/rocketmq-broker/src/processor/query_message_processor.rs +++ b/rocketmq-broker/src/processor/query_message_processor.rs @@ -14,19 +14,127 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; +use rocketmq_common::common::mix_all::UNIQUE_MSG_QUERY_FLAG; +use rocketmq_remoting::code::request_code::RequestCode; +use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::header::query_message_request_header::QueryMessageRequestHeader; +use rocketmq_remoting::protocol::header::query_message_response_header::QueryMessageResponseHeader; +use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::server::ConnectionHandlerContext; +use rocketmq_store::config::message_store_config::MessageStoreConfig; +use rocketmq_store::log_file::MessageStore; #[derive(Default, Clone)] -pub struct QueryMessageProcessor {} +pub struct QueryMessageProcessor { + message_store_config: Arc, + message_store: Arc, +} + +impl QueryMessageProcessor { + pub fn new(message_store_config: Arc, message_store: Arc) -> Self { + Self { + message_store_config, + message_store, + } + } +} + +impl QueryMessageProcessor +where + MS: MessageStore + Send + Sync + 'static, +{ + pub async fn process_request( + &mut self, + channel: Channel, + ctx: ConnectionHandlerContext, + request_code: RequestCode, + request: RemotingCommand, + ) -> Option { + match request_code { + RequestCode::QueryMessage => self.query_message(channel, ctx, request).await, + RequestCode::ViewMessageById => self.view_message_by_id(channel, ctx, request).await, + _ => None, + } + } + + async fn query_message( + &mut self, + _channel: Channel, + _ctx: ConnectionHandlerContext, + request: RemotingCommand, + ) -> Option { + let mut response = RemotingCommand::create_response_command_with_header( + QueryMessageResponseHeader::default(), + ); + let mut request_header = request + .decode_command_custom_header::() + .unwrap(); + response.set_opaque_mut(request.opaque()); + let is_unique_key = request.ext_fields().unwrap().get(UNIQUE_MSG_QUERY_FLAG); + if is_unique_key.is_some() && is_unique_key.unwrap() == "true" { + request_header.max_num = self.message_store_config.default_query_max_num as i32; + } + let query_message_result = self + .message_store + .query_message( + request_header.topic.as_str(), + request_header.key.as_str(), + request_header.max_num, + request_header.begin_timestamp, + request_header.end_timestamp, + ) + .await?; + + let response_header = response + .read_custom_header_mut::() + .unwrap(); + response_header.index_last_update_phyoffset = + query_message_result.index_last_update_phyoffset; + response_header.index_last_update_timestamp = + query_message_result.index_last_update_timestamp; + + if query_message_result.buffer_total_size > 0 { + let message_data = query_message_result.get_message_data(); + return Some(response.set_body(message_data)); + } + Some( + response + .set_code(ResponseCode::QueryNotFound) + .set_remark(Some( + "can not find message, maybe time range not correct".to_string(), + )), + ) + } -impl QueryMessageProcessor { - fn process_request( - &self, + async fn view_message_by_id( + &mut self, + _channel: Channel, _ctx: ConnectionHandlerContext, - _request: RemotingCommand, - ) -> RemotingCommand { - todo!() + request: RemotingCommand, + ) -> Option { + let response = RemotingCommand::create_response_command(); + let request_header = request + .decode_command_custom_header::() + .unwrap(); + let select_mapped_buffer_result = self + .message_store + .select_one_message_by_offset(request_header.offset) + .await; + if let Some(result) = select_mapped_buffer_result { + let message_data = result.get_bytes(); + return Some(response.set_body(message_data)); + } + Some( + response + .set_code(ResponseCode::SystemError) + .set_remark(Some(format!( + "can not find message by offset: {}", + request_header.offset + ))), + ) } } diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index 5e5fa58c..5c40de0f 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -30,6 +30,10 @@ pub mod pull_message_request_header; pub mod pull_message_response_header; pub mod query_consumer_offset_request_header; pub mod query_consumer_offset_response_header; +pub mod query_message_request_header; +pub mod query_message_response_header; pub mod search_offset_response_header; pub mod unregister_client_request_header; pub mod update_consumer_offset_header; +pub mod view_message_request_header; +pub mod view_message_response_header; diff --git a/rocketmq-remoting/src/protocol/header/query_message_request_header.rs b/rocketmq-remoting/src/protocol/header/query_message_request_header.rs new file mode 100644 index 00000000..0763918d --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/query_message_request_header.rs @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use serde::Deserialize; +use serde::Serialize; + +use crate::protocol::command_custom_header::CommandCustomHeader; +use crate::protocol::command_custom_header::FromMap; +use crate::rpc::topic_request_header::TopicRequestHeader; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct QueryMessageRequestHeader { + pub topic: String, + + pub key: String, + + pub max_num: i32, + + pub begin_timestamp: i64, + + pub end_timestamp: i64, + + #[serde(flatten)] + pub topic_request_header: Option, +} + +impl QueryMessageRequestHeader { + pub const TOPIC: &'static str = "topic"; + pub const KEY: &'static str = "key"; + pub const MAX_NUM: &'static str = "maxNum"; + pub const BEGIN_TIMESTAMP: &'static str = "beginTimestamp"; + pub const END_TIMESTAMP: &'static str = "endTimestamp"; +} + +/*impl From<&QueryMessageRequestHeader> for String { + fn from(header: &QueryMessageRequestHeader) -> Self { + SerdeJsonUtils::to_json(header).unwrap() + } +}*/ + +impl CommandCustomHeader for QueryMessageRequestHeader { + fn to_map(&self) -> Option> { + let mut map = std::collections::HashMap::new(); + map.insert(Self::TOPIC.to_string(), self.topic.clone()); + map.insert(Self::KEY.to_string(), self.key.clone()); + map.insert(Self::MAX_NUM.to_string(), self.max_num.to_string()); + map.insert( + Self::BEGIN_TIMESTAMP.to_string(), + self.begin_timestamp.to_string(), + ); + map.insert( + Self::END_TIMESTAMP.to_string(), + self.end_timestamp.to_string(), + ); + if let Some(value) = self.topic_request_header.as_ref() { + if let Some(val) = value.to_map() { + map.extend(val); + } + } + Some(map) + } +} + +impl FromMap for QueryMessageRequestHeader { + type Target = Self; + + fn from(map: &std::collections::HashMap) -> Option { + Some(QueryMessageRequestHeader { + topic: map.get(Self::TOPIC)?.clone(), + key: map.get(Self::KEY)?.clone(), + max_num: map.get(Self::MAX_NUM)?.parse().ok()?, + begin_timestamp: map.get(Self::BEGIN_TIMESTAMP)?.parse().ok()?, + end_timestamp: map.get(Self::END_TIMESTAMP)?.parse().ok()?, + topic_request_header: ::from(map), + }) + } +} + +#[cfg(test)] +mod query_message_request_header_tests { + use std::collections::HashMap; + + use super::*; + + #[test] + fn creating_from_map_with_all_fields_populates_struct_correctly() { + let mut map = HashMap::new(); + map.insert("topic".to_string(), "test_topic".to_string()); + map.insert("key".to_string(), "test_key".to_string()); + map.insert("maxNum".to_string(), "10".to_string()); + map.insert("beginTimestamp".to_string(), "1000".to_string()); + map.insert("endTimestamp".to_string(), "2000".to_string()); + + let header = ::from(&map).unwrap(); + + assert_eq!(header.topic, "test_topic"); + assert_eq!(header.key, "test_key"); + assert_eq!(header.max_num, 10); + assert_eq!(header.begin_timestamp, 1000); + assert_eq!(header.end_timestamp, 2000); + } + + #[test] + fn creating_from_map_missing_optional_fields_still_succeeds() { + let mut map = HashMap::new(); + map.insert("topic".to_string(), "test_topic".to_string()); + map.insert("key".to_string(), "test_key".to_string()); + map.insert("maxNum".to_string(), "10".to_string()); + map.insert("beginTimestamp".to_string(), "1000".to_string()); + map.insert("endTimestamp".to_string(), "2000".to_string()); + + let header = ::from(&map).unwrap(); + + assert_eq!(header.topic, "test_topic"); + assert_eq!(header.key, "test_key"); + assert_eq!(header.max_num, 10); + } + + #[test] + fn creating_from_map_with_invalid_number_fields_returns_none() { + let mut map = HashMap::new(); + map.insert("topic".to_string(), "test_topic".to_string()); + map.insert("key".to_string(), "test_key".to_string()); + map.insert("maxNum".to_string(), "invalid".to_string()); + + let header = ::from(&map); + + assert!(header.is_none()); + } + + #[test] + fn to_map_includes_all_fields() { + let header = QueryMessageRequestHeader { + topic: "test_topic".to_string(), + key: "test_key".to_string(), + max_num: 10, + begin_timestamp: 1000, + end_timestamp: 2000, + topic_request_header: None, + }; + + let map = header.to_map().unwrap(); + + assert_eq!(map.get("topic").unwrap(), "test_topic"); + assert_eq!(map.get("key").unwrap(), "test_key"); + assert_eq!(map.get("maxNum").unwrap(), "10"); + assert_eq!(map.get("beginTimestamp").unwrap(), "1000"); + assert_eq!(map.get("endTimestamp").unwrap(), "2000"); + } + + #[test] + fn to_map_with_topic_request_header_includes_nested_fields() { + let topic_request_header = TopicRequestHeader::default(); + let header = QueryMessageRequestHeader { + topic: "test_topic".to_string(), + key: "test_key".to_string(), + max_num: 10, + begin_timestamp: 1000, + end_timestamp: 2000, + topic_request_header: Some(topic_request_header), + }; + + let map = header.to_map().unwrap(); + + // Verify that nested fields are included, assuming specific fields in TopicRequestHeader + // This is a placeholder assertion; actual fields and checks depend on TopicRequestHeader's + // structure + assert!(!map.contains_key("nestedField")); + } +} diff --git a/rocketmq-remoting/src/protocol/header/query_message_response_header.rs b/rocketmq-remoting/src/protocol/header/query_message_response_header.rs new file mode 100644 index 00000000..eac87576 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/query_message_response_header.rs @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodec, Default)] +#[serde(rename_all = "camelCase")] +pub struct QueryMessageResponseHeader { + pub index_last_update_timestamp: i64, + pub index_last_update_phyoffset: i64, +} diff --git a/rocketmq-remoting/src/protocol/header/view_message_request_header.rs b/rocketmq-remoting/src/protocol/header/view_message_request_header.rs new file mode 100644 index 00000000..b31d9589 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/view_message_request_header.rs @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodec, Default)] +pub struct ViewMessageRequestHeader { + pub topic: String, + pub offset: i64, +} diff --git a/rocketmq-remoting/src/protocol/header/view_message_response_header.rs b/rocketmq-remoting/src/protocol/header/view_message_response_header.rs new file mode 100644 index 00000000..ee1e8469 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/view_message_response_header.rs @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodec, Default)] +pub struct ViewMessageResponseHeader; diff --git a/rocketmq-remoting/src/rpc/topic_request_header.rs b/rocketmq-remoting/src/rpc/topic_request_header.rs index d24e7608..5a16d095 100644 --- a/rocketmq-remoting/src/rpc/topic_request_header.rs +++ b/rocketmq-remoting/src/rpc/topic_request_header.rs @@ -51,8 +51,10 @@ impl CommandCustomHeader for TopicRequestHeader { if let Some(ref lo) = self.lo { map.insert(Self::LO.to_string(), lo.to_string()); } - if let Some(value) = self.rpc_request_header.as_ref().unwrap().to_map() { - map.extend(value); + if let Some(value) = self.rpc_request_header.as_ref() { + if let Some(rpc_map) = value.to_map() { + map.extend(rpc_map); + } } Some(map) } diff --git a/rocketmq-store/src/base.rs b/rocketmq-store/src/base.rs index f490c4b0..e5c6033d 100644 --- a/rocketmq-store/src/base.rs +++ b/rocketmq-store/src/base.rs @@ -28,6 +28,7 @@ pub mod message_arriving_listener; pub mod message_result; pub mod message_status_enum; pub mod put_message_context; +pub mod query_message_result; pub mod select_result; pub mod store_checkpoint; pub mod store_enum; diff --git a/rocketmq-store/src/base/query_message_result.rs b/rocketmq-store/src/base/query_message_result.rs new file mode 100644 index 00000000..f28468bb --- /dev/null +++ b/rocketmq-store/src/base/query_message_result.rs @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use bytes::Bytes; +use bytes::BytesMut; + +use crate::base::select_result::SelectMappedBufferResult; + +#[derive(Default)] +pub struct QueryMessageResult { + pub message_maped_list: Vec, + pub index_last_update_timestamp: i64, + pub index_last_update_phyoffset: i64, + pub buffer_total_size: i32, +} + +impl QueryMessageResult { + pub fn get_message_data(&self) -> Option { + if self.buffer_total_size <= 0 || self.message_maped_list.is_empty() { + return None; + } + + let mut bytes_mut = BytesMut::with_capacity(self.buffer_total_size as usize); + for msg in self.message_maped_list.iter() { + let data = &msg.mapped_file.as_ref().unwrap().get_mapped_file() + [msg.start_offset as usize..(msg.start_offset + msg.size as u64) as usize]; + bytes_mut.extend_from_slice(data); + } + Some(bytes_mut.freeze()) + } + + pub fn add_message(&mut self, result: SelectMappedBufferResult) { + self.buffer_total_size += result.size; + self.message_maped_list.push(result); + } +} diff --git a/rocketmq-store/src/base/select_result.rs b/rocketmq-store/src/base/select_result.rs index 23ae373c..a231d9f5 100644 --- a/rocketmq-store/src/base/select_result.rs +++ b/rocketmq-store/src/base/select_result.rs @@ -17,6 +17,9 @@ use std::sync::Arc; +use bytes::Bytes; +use bytes::BytesMut; + use crate::log_file::mapped_file::default_impl::DefaultMappedFile; use crate::log_file::mapped_file::MappedFile; @@ -40,6 +43,19 @@ impl SelectMappedBufferResult { .as_ref() } + pub fn get_buffer_slice_mut(&self) -> &mut [u8] { + self.mapped_file.as_ref().unwrap().get_mapped_file_mut() + [self.start_offset as usize..(self.start_offset + self.size as u64) as usize] + .as_mut() + } + + pub fn get_bytes(&self) -> Option { + if self.size <= 0 || self.mapped_file.is_none() { + return None; + } + Some(BytesMut::from(self.get_buffer()).freeze()) + } + pub fn is_in_mem(&self) -> bool { match self.mapped_file.as_ref() { None => true, diff --git a/rocketmq-store/src/index/index_service.rs b/rocketmq-store/src/index/index_service.rs index 3db806c0..4b0ac6ac 100644 --- a/rocketmq-store/src/index/index_service.rs +++ b/rocketmq-store/src/index/index_service.rs @@ -131,10 +131,10 @@ impl IndexService { index_file_list_lock.clear(); } - fn query_offset( + pub fn query_offset( &self, - topic: String, - key: String, + topic: &str, + key: &str, max_num: i32, begin: i64, end: i64, @@ -158,7 +158,13 @@ impl IndexService { // Assuming IndexFile has a method to check if the file's timestamps match the query if f.is_time_matched(begin, end) { // Assuming IndexFile has a method to select physical offsets based on the query - f.select_phy_offset(&mut phy_offsets, &topic, max_num as usize, begin, end); + f.select_phy_offset( + &mut phy_offsets, + build_key(topic, key).as_str(), + max_num as usize, + begin, + end, + ); } if f.get_begin_timestamp() < begin { diff --git a/rocketmq-store/src/index/query_offset_result.rs b/rocketmq-store/src/index/query_offset_result.rs index eea24cdb..c7e0d928 100644 --- a/rocketmq-store/src/index/query_offset_result.rs +++ b/rocketmq-store/src/index/query_offset_result.rs @@ -38,6 +38,10 @@ impl QueryOffsetResult { &self.phy_offsets } + pub fn get_phy_offsets_mut(&mut self) -> &mut Vec { + &mut self.phy_offsets + } + pub fn get_index_last_update_timestamp(&self) -> i64 { self.index_last_update_timestamp } diff --git a/rocketmq-store/src/log_file.rs b/rocketmq-store/src/log_file.rs index dc3a16a5..53dfad11 100644 --- a/rocketmq-store/src/log_file.rs +++ b/rocketmq-store/src/log_file.rs @@ -18,12 +18,15 @@ use std::sync::Arc; use rocketmq_common::common::message::message_batch::MessageExtBatch; +use rocketmq_common::common::message::message_single::MessageExt; use rocketmq_common::common::message::message_single::MessageExtBrokerInner; use rocketmq_common::TimeUtils::get_current_millis; use crate::base::dispatch_request::DispatchRequest; use crate::base::get_message_result::GetMessageResult; use crate::base::message_result::PutMessageResult; +use crate::base::query_message_result::QueryMessageResult; +use crate::base::select_result::SelectMappedBufferResult; use crate::filter::MessageFilter; use crate::hook::put_message_hook::BoxedPutMessageHook; use crate::queue::ArcConsumeQueue; @@ -117,4 +120,32 @@ pub trait RocketMQMessageStore: Clone + 'static { fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option; fn delete_topics(&mut self, delete_topics: Vec<&str>) -> i32; + + async fn query_message( + &self, + topic: &str, + key: &str, + max_num: i32, + begin_timestamp: i64, + end_timestamp: i64, + ) -> Option; + + async fn select_one_message_by_offset( + &self, + commit_log_offset: i64, + ) -> Option; + + async fn select_one_message_by_offset_with_size( + &self, + commit_log_offset: i64, + size: i32, + ) -> Option; + + fn look_message_by_offset(&self, commit_log_offset: i64) -> Option; + + fn look_message_by_offset_with_size( + &self, + commit_log_offset: i64, + size: i32, + ) -> Option; } diff --git a/rocketmq-store/src/message_store/default_message_store.rs b/rocketmq-store/src/message_store/default_message_store.rs index d62a55f2..e5be3aff 100644 --- a/rocketmq-store/src/message_store/default_message_store.rs +++ b/rocketmq-store/src/message_store/default_message_store.rs @@ -31,6 +31,7 @@ use std::time::Instant; use bytes::Buf; use rocketmq_common::common::attribute::cleanup_policy::CleanupPolicy; use rocketmq_common::common::message::message_batch::MessageExtBatch; +use rocketmq_common::common::message::message_single::MessageExt; use rocketmq_common::common::mix_all::is_lmq; use rocketmq_common::common::mix_all::is_sys_consumer_group_for_no_cold_read_limit; use rocketmq_common::common::mix_all::MULTI_DISPATCH_QUEUE_SPLITTER; @@ -48,6 +49,7 @@ use rocketmq_common::{ }, utils::queue_type_utils::QueueTypeUtils, FileUtils::string_to_file, + MessageDecoder, UtilAll::ensure_dir_ok, }; use tokio::runtime::Handle; @@ -64,6 +66,8 @@ use crate::base::message_arriving_listener::MessageArrivingListener; use crate::base::message_result::PutMessageResult; use crate::base::message_status_enum::GetMessageStatus; use crate::base::message_status_enum::PutMessageStatus; +use crate::base::query_message_result::QueryMessageResult; +use crate::base::select_result::SelectMappedBufferResult; use crate::base::store_checkpoint::StoreCheckpoint; use crate::base::store_stats_service::StoreStatsService; use crate::config::broker_role::BrokerRole; @@ -1088,6 +1092,102 @@ impl MessageStore for DefaultMessageStore { delete_count } + async fn query_message( + &self, + topic: &str, + key: &str, + max_num: i32, + begin_timestamp: i64, + end_timestamp: i64, + ) -> Option { + let mut query_message_result = QueryMessageResult::default(); + let mut last_query_msg_time = end_timestamp; + for i in 1..3 { + let mut query_offset_result = self.index_service.query_offset( + topic, + key, + max_num, + begin_timestamp, + end_timestamp, + ); + if query_offset_result.get_phy_offsets().is_empty() { + break; + } + + query_offset_result.get_phy_offsets_mut().sort(); + + query_message_result.index_last_update_timestamp = + query_offset_result.get_index_last_update_timestamp(); + query_message_result.index_last_update_phyoffset = + query_offset_result.get_index_last_update_phyoffset(); + let phy_offsets = query_offset_result.get_phy_offsets(); + for m in 0..phy_offsets.len() { + let offset = *phy_offsets.get(m).unwrap(); + let msg = self.look_message_by_offset(offset); + if m == 0 { + last_query_msg_time = msg.as_ref().unwrap().store_timestamp; + } + let result = self.commit_log.get_data_with_option(offset, false); + if let Some(sbr) = result { + query_message_result.add_message(sbr); + } + } + if query_message_result.buffer_total_size > 0 { + break; + } + if last_query_msg_time < begin_timestamp { + break; + } + } + + Some(query_message_result) + } + + async fn select_one_message_by_offset( + &self, + commit_log_offset: i64, + ) -> Option { + let sbr = self.commit_log.get_message(commit_log_offset, 4); + if let Some(sbr) = sbr { + let size = sbr.get_buffer().get_i32(); + self.commit_log.get_message(commit_log_offset, size) + } else { + None + } + } + async fn select_one_message_by_offset_with_size( + &self, + commit_log_offset: i64, + size: i32, + ) -> Option { + self.commit_log.get_message(commit_log_offset, size) + } + + fn look_message_by_offset(&self, commit_log_offset: i64) -> Option { + if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) { + let size = sbr.get_buffer().get_i32(); + self.look_message_by_offset_with_size(commit_log_offset, size) + } else { + None + } + } + + fn look_message_by_offset_with_size( + &self, + commit_log_offset: i64, + size: i32, + ) -> Option { + let sbr = self.commit_log.get_message(commit_log_offset, size); + if let Some(sbr) = sbr { + if let Some(mut value) = sbr.get_bytes() { + MessageDecoder::decode(&mut value, true, false, false, false, false) + } else { + None + } + } else { + None + } + } } #[derive(Clone)]