diff --git a/rocketmq-client/examples/quickstart/consumer.rs b/rocketmq-client/examples/quickstart/consumer.rs index 2b284bd9..8f258b5f 100644 --- a/rocketmq-client/examples/quickstart/consumer.rs +++ b/rocketmq-client/examples/quickstart/consumer.rs @@ -27,7 +27,7 @@ pub const MESSAGE_COUNT: usize = 1; pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4"; pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876"; pub const TOPIC: &str = "TopicTest"; -pub const TAG: &str = "TagA"; +pub const TAG: &str = "*"; #[rocketmq::main] pub async fn main() -> Result<()> { @@ -53,8 +53,8 @@ pub struct MyMessageListener; impl MessageListenerConcurrently for MyMessageListener { fn consume_message( &self, - msgs: Vec, - _context: ConsumeConcurrentlyContext, + msgs: &[&MessageExt], + _context: &ConsumeConcurrentlyContext, ) -> Result { for msg in msgs { println!("Receive message: {:?}", msg); diff --git a/rocketmq-client/src/base/client_config.rs b/rocketmq-client/src/base/client_config.rs index d39ac1ba..667d7ca6 100644 --- a/rocketmq-client/src/base/client_config.rs +++ b/rocketmq-client/src/base/client_config.rs @@ -142,15 +142,18 @@ impl ClientConfig { ) } - pub fn queue_with_namespace(&mut self, queue: &mut MessageQueue) { + pub fn queue_with_namespace(&mut self, queue: MessageQueue) -> MessageQueue { if let Some(namespace) = self.get_namespace() { if !namespace.is_empty() { - queue.set_topic(NamespaceUtil::wrap_namespace( + let mut message_queue = queue.clone(); + message_queue.set_topic(NamespaceUtil::wrap_namespace( namespace.as_str(), queue.get_topic(), )); + return message_queue; } } + queue } pub fn get_namespace(&mut self) -> Option { diff --git a/rocketmq-client/src/consumer.rs b/rocketmq-client/src/consumer.rs index 03c78d82..aa1e64c2 100644 --- a/rocketmq-client/src/consumer.rs +++ b/rocketmq-client/src/consumer.rs @@ -24,8 +24,8 @@ pub mod message_selector; pub mod mq_consumer; pub(crate) mod mq_consumer_inner; pub mod mq_push_consumer; -mod 
pull_callback; -mod pull_result; -mod pull_status; +pub(crate) mod pull_callback; +pub(crate) mod pull_result; +pub(crate) mod pull_status; pub mod rebalance_strategy; -mod store; +pub(crate) mod store; diff --git a/rocketmq-client/src/consumer/consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl.rs index 71051bec..14b43a33 100644 --- a/rocketmq-client/src/consumer/consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl.rs @@ -29,6 +29,7 @@ pub(crate) mod process_queue; pub(crate) mod pull_api_wrapper; pub(crate) mod pull_message_service; pub(crate) mod pull_request; +pub(crate) mod pull_request_ext; pub(crate) mod re_balance; pub(crate) static PULL_MAX_IDLE_TIME: Lazy = Lazy::new(|| { diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs index f6d88b09..f541b2de 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs @@ -17,12 +17,22 @@ use std::sync::Arc; use std::time::Duration; +use std::time::Instant; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::message::MessageTrait; +use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::MessageAccessor::MessageAccessor; +use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; +use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; +use rocketmq_runtime::RocketMQRuntime; +use tracing::info; +use tracing::warn; use crate::base::client_config::ClientConfig; use 
crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; @@ -30,7 +40,11 @@ use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPush use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; use crate::consumer::consumer_impl::process_queue::ProcessQueue; use crate::consumer::default_mq_push_consumer::ConsumerConfig; +use crate::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext; +use crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus; +use crate::consumer::listener::consume_return_type::ConsumeReturnType; use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently; +use crate::hook::consume_message_context::ConsumeMessageContext; #[derive(Clone)] pub struct ConsumeMessageConcurrentlyService { @@ -39,6 +53,7 @@ pub struct ConsumeMessageConcurrentlyService { pub(crate) consumer_config: ArcRefCellWrapper, pub(crate) consumer_group: Arc, pub(crate) message_listener: ArcBoxMessageListenerConcurrently, + pub(crate) consume_runtime: Arc, } impl ConsumeMessageConcurrentlyService { @@ -47,13 +62,19 @@ impl ConsumeMessageConcurrentlyService { consumer_config: ArcRefCellWrapper, consumer_group: String, message_listener: ArcBoxMessageListenerConcurrently, + default_mqpush_consumer_impl: Option>, ) -> Self { + let consume_thread = consumer_config.consume_thread_max; Self { - default_mqpush_consumer_impl: None, + default_mqpush_consumer_impl, client_config, consumer_config, consumer_group: Arc::new(consumer_group), message_listener, + consume_runtime: Arc::new(RocketMQRuntime::new_multi( + consume_thread as usize, + "ConsumeMessageThread_", + )), } } } @@ -62,6 +83,138 @@ impl ConsumeMessageConcurrentlyService { async fn clean_expire_msg(&mut self) { println!("===========================") } + + async fn process_consume_result( + &mut self, + status: ConsumeConcurrentlyStatus, + context: &ConsumeConcurrentlyContext, + 
consume_request: &mut ConsumeRequest, + ) { + if consume_request.msgs.is_empty() { + return; + } + let mut ack_index = context.ack_index; + match status { + ConsumeConcurrentlyStatus::ConsumeSuccess => { + if ack_index >= consume_request.msgs.len() as i32 { + ack_index = consume_request.msgs.len() as i32 - 1; + } + } + ConsumeConcurrentlyStatus::ReconsumeLater => { + ack_index = -1; + } + } + + match self.consumer_config.message_model { + MessageModel::Broadcasting => { + for i in ((ack_index + 1) as usize)..consume_request.msgs.len() { + warn!("BROADCASTING, the message consume failed, drop it"); + } + } + MessageModel::Clustering => { + let len = consume_request.msgs.len(); + let mut msg_back_failed = Vec::with_capacity(len); + let mut msg_back_success = Vec::with_capacity(len); + let failed_msgs = consume_request.msgs.split_off((ack_index + 1) as usize); + for mut msg in failed_msgs { + if !consume_request + .process_queue + .contains_message(&msg.message_ext_inner) + { + /*info!("Message is not found in its process queue; skip send-back-procedure, topic={}, " + + "brokerName={}, queueId={}, queueOffset={}", msg.get_topic(), msg.get_broker_name(), + msg.getQueueId(), msg.getQueueOffset());*/ + continue; + } + + let result = self + .send_message_back(&mut msg.message_ext_inner, context) + .await; + if !result { + let reconsume_times = msg.message_ext_inner.reconsume_times() + 1; + msg.message_ext_inner.set_reconsume_times(reconsume_times); + msg_back_failed.push(msg); + } else { + msg_back_success.push(msg); + } + } + if !msg_back_failed.is_empty() { + consume_request.msgs.append(&mut msg_back_success); + /* let msg_back_failed_switched = msg_back_failed + .into_iter() + .map(|msg| MessageClientExt { + message_ext_inner: msg, + }) + .collect();*/ + self.submit_consume_request_later( + msg_back_failed, + consume_request.process_queue.clone(), + consume_request.message_queue.clone(), + ); + } + } + } + let offset = consume_request + .process_queue + 
.remove_message(&consume_request.msgs) + .await; + if offset >= 0 && !consume_request.process_queue.is_dropped() { + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + default_mqpush_consumer_impl + .offset_store + .as_mut() + .unwrap() + .update_offset(&consume_request.message_queue, offset, true) + .await; + } + } + } + + fn submit_consume_request_later( + &self, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, + ) { + let this = self.clone(); + self.consume_runtime.get_handle().spawn(async move { + tokio::time::sleep(Duration::from_secs(5)).await; + this.submit_consume_request(msgs, process_queue, message_queue, true) + .await; + }); + } + + pub async fn send_message_back( + &mut self, + msg: &mut MessageExt, + context: &ConsumeConcurrentlyContext, + ) -> bool { + let delay_level = context.delay_level_when_next_consume; + msg.set_topic(self.client_config.with_namespace(msg.get_topic()).as_str()); + match self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + None => false, + Some(mut default_mqpush_consumer_impl) => default_mqpush_consumer_impl + .send_message_back( + msg, + delay_level, + &self + .client_config + .queue_with_namespace(context.get_message_queue().clone()), + ) + .await + .is_ok(), + } + } } impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { @@ -108,12 +261,56 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { async fn submit_consume_request( &self, - msgs: Vec, - process_queue: &ProcessQueue, - message_queue: &MessageQueue, + mut msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, dispatch_to_consume: bool, ) { - todo!() + let consume_batch_size = self.consumer_config.consume_message_batch_max_size; + if msgs.len() < consume_batch_size as usize { + let mut consume_request = ConsumeRequest { + msgs: msgs.clone(), + message_listener: self.message_listener.clone(), + 
process_queue, + message_queue, + dispatch_to_consume, + consumer_group: self.consumer_group.as_ref().clone(), + default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), + }; + let consume_message_concurrently_service = self.clone(); + self.consume_runtime.get_handle().spawn(async move { + consume_request + .run(consume_message_concurrently_service) + .await + }); + } else { + loop { + let item = if msgs.len() > consume_batch_size as usize { + msgs.split_off(consume_batch_size as usize) + } else { + msgs.split_off(msgs.len()) + }; + + let mut consume_request = ConsumeRequest { + msgs: item, + message_listener: self.message_listener.clone(), + process_queue: process_queue.clone(), + message_queue: message_queue.clone(), + dispatch_to_consume, + consumer_group: self.consumer_group.as_ref().clone(), + default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), + }; + let consume_message_concurrently_service = self.clone(); + self.consume_runtime.get_handle().spawn(async move { + consume_request + .run(consume_message_concurrently_service) + .await + }); + if msgs.is_empty() { + break; + } + } + } } async fn submit_pop_consume_request( @@ -125,3 +322,144 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { todo!() } } + +struct ConsumeRequest { + msgs: Vec>, + message_listener: ArcBoxMessageListenerConcurrently, + process_queue: Arc, + message_queue: MessageQueue, + dispatch_to_consume: bool, + consumer_group: String, + default_mqpush_consumer_impl: Option>, +} + +impl ConsumeRequest { + async fn run( + &mut self, + mut consume_message_concurrently_service: ConsumeMessageConcurrentlyService, + ) { + if self.process_queue.is_dropped() { + info!( + "the message queue not be able to consume, because it's dropped. 
group={} {}", + self.consumer_group, self.message_queue, + ); + return; + } + let context = ConsumeConcurrentlyContext { + message_queue: self.message_queue.clone(), + delay_level_when_next_consume: 0, + ack_index: i32::MAX, + }; + + let default_mqpush_consumer_impl = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade(); + if default_mqpush_consumer_impl.is_none() { + return; + } + let mut default_mqpush_consumer_impl = default_mqpush_consumer_impl.unwrap(); + let consumer_group = self.consumer_group.clone(); + DefaultMQPushConsumerImpl::try_reset_pop_retry_topic( + &mut self.msgs, + consumer_group.as_str(), + ); + default_mqpush_consumer_impl + .reset_retry_and_namespace(&mut self.msgs, consumer_group.as_str()); + + let mut consume_message_context = None; + + let begin_timestamp = Instant::now(); + let mut has_exception = false; + let mut return_type = ConsumeReturnType::Success; + let mut status = None; + + if !self.msgs.is_empty() { + for msg in self.msgs.iter_mut() { + MessageAccessor::set_consume_start_time_stamp( + &mut msg.message_ext_inner, + get_current_millis().to_string().as_str(), + ); + } + if default_mqpush_consumer_impl.has_hook() { + let queue = self.message_queue.clone(); + consume_message_context = Some(ConsumeMessageContext { + consumer_group, + msg_list: &self.msgs, + mq: Some(queue), + success: false, + status: "".to_string(), + mq_trace_context: None, + props: Default::default(), + namespace: default_mqpush_consumer_impl + .client_config + .get_namespace() + .unwrap_or("".to_string()), + access_channel: Default::default(), + }); + default_mqpush_consumer_impl.execute_hook_before(&mut consume_message_context); + } + let vec = self + .msgs + .iter() + .map(|msg| &msg.message_ext_inner) + .collect::>(); + match self.message_listener.consume_message(&vec, &context) { + Ok(value) => { + status = Some(value); + } + Err(_) => { + has_exception = true; + } + } + } + + let consume_rt = begin_timestamp.elapsed().as_millis() as u64; 
+ if status.is_none() { + if has_exception { + return_type = ConsumeReturnType::Exception; + } else { + return_type = ConsumeReturnType::ReturnNull; + } + } else if consume_rt + > default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000 + { + return_type = ConsumeReturnType::TimeOut; + } else if status.unwrap() == ConsumeConcurrentlyStatus::ReconsumeLater { + return_type = ConsumeReturnType::Failed; + } else if status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess { + return_type = ConsumeReturnType::Success; + } + + if default_mqpush_consumer_impl.has_hook() { + consume_message_context.as_mut().unwrap().props.insert( + mix_all::CONSUME_CONTEXT_TYPE.to_string(), + return_type.to_string(), + ); + } + + if status.is_none() { + status = Some(ConsumeConcurrentlyStatus::ReconsumeLater); + } + + if default_mqpush_consumer_impl.has_hook() { + let cmc = consume_message_context.as_mut().unwrap(); + cmc.status = status.unwrap().to_string(); + cmc.success = status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess; + cmc.access_channel = Some(default_mqpush_consumer_impl.client_config.access_channel); + default_mqpush_consumer_impl.execute_hook_after(&mut consume_message_context); + } + + if self.process_queue.is_dropped() { + warn!( + "the message queue not be able to consume, because it's dropped. 
group={} {}", + self.consumer_group, self.message_queue, + ); + } else { + consume_message_concurrently_service + .process_consume_result(status.unwrap(), &context, self) + .await; + } + } +} diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs index ecbee4c4..feb8c9e6 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs @@ -14,8 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; + +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; @@ -59,9 +63,9 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService { async fn submit_consume_request( &self, - msgs: Vec, - process_queue: &ProcessQueue, - message_queue: &MessageQueue, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, dispatch_to_consume: bool, ) { todo!() diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs index 157be9a3..f11f10c4 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs @@ -16,6 +16,7 @@ */ use std::sync::Arc; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use 
rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::ArcRefCellWrapper; @@ -90,9 +91,9 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService { async fn submit_consume_request( &self, - msgs: Vec, - process_queue: &ProcessQueue, - message_queue: &MessageQueue, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, dispatch_to_consume: bool, ) { todo!() diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs index cbe88ef9..68a90486 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs @@ -14,8 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; + +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; @@ -59,9 +63,9 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService { async fn submit_consume_request( &self, - msgs: Vec, - process_queue: &ProcessQueue, - message_queue: &MessageQueue, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, dispatch_to_consume: bool, ) { todo!() diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs index 7fd5181b..1d124735 100644 --- 
a/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs @@ -14,8 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; + +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; @@ -59,9 +63,9 @@ pub trait ConsumeMessageServiceTrait { async fn submit_consume_request( &self, - msgs: Vec, - process_queue: &ProcessQueue, - message_queue: &MessageQueue, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, dispatch_to_consume: bool, ); diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index b412cb5c..e00ae46d 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -20,23 +20,36 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread; +use std::time::Instant; use rocketmq_common::common::base::service_state::ServiceState; use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; +use rocketmq_common::common::key_builder::KeyBuilder; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; +use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::message::message_single::Message; +use 
rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all; use rocketmq_common::common::mix_all::DEFAULT_CONSUMER_GROUP; +use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag; use rocketmq_common::common::FAQUrl; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::MessageAccessor::MessageAccessor; +use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo; use rocketmq_remoting::protocol::filter::filter_api::FilterAPI; use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; +use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; use rocketmq_remoting::runtime::RPCHook; use tokio::runtime::Handle; +use tracing::error; use tracing::info; +use tracing::warn; use crate::base::client_config::ClientConfig; use crate::base::validators::Validators; @@ -55,18 +68,23 @@ use crate::consumer::consumer_impl::re_balance::Rebalance; use crate::consumer::default_mq_push_consumer::ConsumerConfig; use crate::consumer::listener::message_listener::MessageListener; use crate::consumer::mq_consumer_inner::MQConsumerInner; +use crate::consumer::pull_callback::DefaultPullCallback; use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore; use crate::consumer::store::offset_store::OffsetStore; +use crate::consumer::store::read_offset_type::ReadOffsetType; use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore; use crate::error::MQClientError; use crate::factory::mq_client_instance::MQClientInstance; +use crate::hook::consume_message_context::ConsumeMessageContext; use crate::hook::consume_message_hook::ConsumeMessageHook; use 
crate::hook::filter_message_hook::FilterMessageHook; +use crate::implementation::communication_mode::CommunicationMode; use crate::implementation::mq_client_manager::MQClientManager; +use crate::producer::mq_producer::MQProducer; use crate::Result; const PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL: u64 = 50; -const PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL: u64 = 20; +pub(crate) const PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL: u64 = 20; const PULL_TIME_DELAY_MILLS_WHEN_SUSPEND: u64 = 1000; const BROKER_SUSPEND_MAX_TIME_MILLIS: u64 = 1000 * 15; const CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND: u64 = 1000 * 30; @@ -74,22 +92,25 @@ const MAX_POP_INVISIBLE_TIME: u64 = 300000; const MIN_POP_INVISIBLE_TIME: u64 = 5000; const ASYNC_TIMEOUT: u64 = 3000; const DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED: bool = false; +const _1MB: u64 = 1024 * 1024; #[derive(Clone)] pub struct DefaultMQPushConsumerImpl { - client_config: ArcRefCellWrapper, - consumer_config: ArcRefCellWrapper, - rebalance_impl: ArcRefCellWrapper, + pub(crate) pull_time_delay_mills_when_exception: u64, + pub(crate) client_config: ArcRefCellWrapper, + pub(crate) consumer_config: ArcRefCellWrapper, + pub(crate) rebalance_impl: ArcRefCellWrapper, filter_message_hook_list: Vec>>, + consume_message_hook_list: Vec>>, rpc_hook: Option>>, - service_state: ServiceState, + service_state: ArcRefCellWrapper, client_instance: Option>>, - pull_api_wrapper: Option>, + pub(crate) pull_api_wrapper: Option>, pause: Arc, consume_orderly: bool, message_listener: Option>, pub(crate) offset_store: Option>, - consume_message_concurrently_service: Option< + pub(crate) consume_message_concurrently_service: Option< ArcRefCellWrapper< ConsumeMessageConcurrentlyServiceGeneral< ConsumeMessageConcurrentlyService, @@ -108,6 +129,7 @@ pub struct DefaultMQPushConsumerImpl { queue_flow_control_times: u64, queue_max_span_flow_control_times: u64, pop_delay_level: Arc<[i32; 16]>, + default_mqpush_consumer_impl: Option>, } 
impl DefaultMQPushConsumerImpl { @@ -117,6 +139,7 @@ impl DefaultMQPushConsumerImpl { rpc_hook: Option>>, ) -> Self { let mut this = Self { + pull_time_delay_mills_when_exception: 3_000, client_config: ArcRefCellWrapper::new(client_config.clone()), consumer_config: consumer_config.clone(), rebalance_impl: ArcRefCellWrapper::new(RebalancePushImpl::new( @@ -124,8 +147,9 @@ impl DefaultMQPushConsumerImpl { consumer_config, )), filter_message_hook_list: vec![], + consume_message_hook_list: vec![], rpc_hook, - service_state: ServiceState::CreateJust, + service_state: ArcRefCellWrapper::new(ServiceState::CreateJust), client_instance: None, pull_api_wrapper: None, pause: Arc::new(AtomicBool::new(false)), @@ -139,6 +163,7 @@ impl DefaultMQPushConsumerImpl { pop_delay_level: Arc::new([ 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200, ]), + default_mqpush_consumer_impl: None, }; let wrapper = ArcRefCellWrapper::downgrade(&this.rebalance_impl); this.rebalance_impl.set_rebalance_impl(wrapper); @@ -151,6 +176,7 @@ impl DefaultMQPushConsumerImpl { ) { self.rebalance_impl .set_default_mqpush_consumer_impl(default_mqpush_consumer_impl.clone()); + self.default_mqpush_consumer_impl = Some(default_mqpush_consumer_impl.clone()); if let Some(ref mut consume_message_concurrently_service) = self.consume_message_concurrently_service { @@ -168,7 +194,7 @@ impl DefaultMQPushConsumerImpl { impl DefaultMQPushConsumerImpl { pub async fn start(&mut self) -> Result<()> { - match self.service_state { + match *self.service_state { ServiceState::CreateJust => { info!( "the consumer [{}] start beginning. 
message_model={}, isUnitMode={}", @@ -176,7 +202,7 @@ impl DefaultMQPushConsumerImpl { self.consumer_config.message_model, self.consumer_config.unit_mode ); - self.service_state = ServiceState::StartFailed; + *self.service_state = ServiceState::StartFailed; self.check_config()?; self.copy_subscription().await?; if self.consumer_config.message_model() == MessageModel::Clustering { @@ -249,6 +275,7 @@ impl DefaultMQPushConsumerImpl { self.consumer_config.clone(), self.consumer_config.consumer_group.clone(), listener.clone().expect("listener is None"), + self.default_mqpush_consumer_impl.clone(), ), consume_message_pop_concurrently_service: @@ -307,7 +334,7 @@ impl DefaultMQPushConsumerImpl { self.consumer_config.message_model, self.consumer_config.unit_mode ); - self.service_state = ServiceState::Running; + *self.service_state = ServiceState::Running; } ServiceState::Running => { return Err(MQClientError::MQClientErr( @@ -326,7 +353,7 @@ impl DefaultMQPushConsumerImpl { -1, format!( "The PushConsumer service state not OK, maybe started once,{:?},{}", - self.service_state, + *self.service_state, FAQUrl::suggest_todo(FAQUrl::CLIENT_SERVICE_NOT_OK) ), )); @@ -340,7 +367,7 @@ impl DefaultMQPushConsumerImpl { .send_heartbeat_to_all_broker_with_lock() .await { - client_instance.re_balance_immediately().await; + client_instance.re_balance_immediately(); } Ok(()) } @@ -690,8 +717,475 @@ impl DefaultMQPushConsumerImpl { unimplemented!("popMessage"); } - pub(crate) async fn pull_message(&mut self, pull_request: PullRequest) { - unimplemented!("pull_message"); + pub(crate) async fn pull_message(&mut self, mut pull_request: PullRequest) { + //let process_queue = pull_request.get_process_queue_mut(); + if pull_request.process_queue.is_dropped() { + info!("the pull request[{}] is dropped.", pull_request); + return; + } + pull_request + .process_queue + .set_last_pull_timestamp(get_current_millis()); + if let Err(e) = self.make_sure_state_ok() { + warn!("pullMessage exception, 
consumer state not ok {}", e); + self.execute_pull_request_later( + pull_request, + self.pull_time_delay_mills_when_exception, + ); + return; + } + if self.pause.load(Ordering::Acquire) { + warn!( + "consumer was paused, execute pull request later. instanceName={}, group={}", + self.client_config.instance_name, self.consumer_config.consumer_group + ); + self.execute_pull_request_later(pull_request, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); + return; + } + let cached_message_count = pull_request.process_queue.msg_count(); + let cached_message_size_in_mib = pull_request.process_queue.msg_size() / _1MB; + if cached_message_count > self.consumer_config.pull_threshold_for_queue as u64 { + if self.queue_flow_control_times % 1000 == 0 { + let msg_tree_map = pull_request.process_queue.msg_tree_map.read().await; + let first_key_value = msg_tree_map.first_key_value().unwrap(); + let last_key_value = msg_tree_map.last_key_value().unwrap(); + warn!( + "the cached message count exceeds the threshold {}, so do flow control, \ + minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, \ + flowControlTimes={}", + self.consumer_config.pull_threshold_for_queue, + first_key_value.0, + last_key_value.0, + cached_message_count, + cached_message_size_in_mib, + pull_request.to_string(), + self.queue_flow_control_times + ); + } + self.execute_pull_request_later( + pull_request, + PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, + ); + + self.queue_flow_control_times += 1; + return; + } + + if cached_message_size_in_mib > self.consumer_config.pull_threshold_size_for_queue as u64 { + if self.queue_flow_control_times % 1000 == 0 { + let msg_tree_map = pull_request.process_queue.msg_tree_map.read().await; + let first_key_value = msg_tree_map.first_key_value().unwrap(); + let last_key_value = msg_tree_map.last_key_value().unwrap(); + warn!( + "the cached message size exceeds the threshold {} MiB, so do flow control, \ + minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, \ + 
flowControlTimes={}", + self.consumer_config.pull_threshold_size_for_queue, + first_key_value.0, + last_key_value.0, + cached_message_count, + cached_message_size_in_mib, + pull_request.to_string(), + self.queue_flow_control_times + ); + } + self.execute_pull_request_later( + pull_request, + PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, + ); + self.queue_flow_control_times += 1; + return; + } + + if !self.consume_orderly { + let max_span = pull_request.process_queue.get_max_span().await; + if max_span > self.consumer_config.consume_concurrently_max_span as u64 { + if self.queue_max_span_flow_control_times % 1000 == 0 { + let msg_tree_map = pull_request.process_queue.msg_tree_map.read().await; + let first_key_value = msg_tree_map.first_key_value().unwrap(); + let last_key_value = msg_tree_map.last_key_value().unwrap(); + warn!( + "the queue's messages, span too long, so do flow control, minOffset={}, \ + maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", + first_key_value.0, + last_key_value.0, + max_span, + pull_request.to_string(), + self.queue_max_span_flow_control_times + ); + } + self.queue_max_span_flow_control_times += 1; + self.execute_pull_request_later( + pull_request, + PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, + ); + return; + } + } else if pull_request.get_process_queue().is_locked() { + if !pull_request.is_previously_locked() { + let offset = match self + .rebalance_impl + .compute_pull_from_where_with_exception(pull_request.get_message_queue()) + .await + { + Ok(value) => { + if value < 0 { + error!( + "Failed to compute pull offset, pullResult: {}", + pull_request + ); + self.execute_pull_request_later( + pull_request, + self.pull_time_delay_mills_when_exception, + ); + + return; + } + value + } + Err(e) => { + error!( + "Failed to compute pull offset, pullResult: {}, {}", + pull_request, e + ); + self.execute_pull_request_later( + pull_request, + self.pull_time_delay_mills_when_exception, + ); + + return; + } + }; + let 
broker_busy = offset < pull_request.get_next_offset(); + info!( + "the first time to pull message, so fix offset from broker. pullRequest: {} \ + NewOffset: {} brokerBusy: {}", + pull_request.to_string(), + offset, + broker_busy + ); + if broker_busy { + warn!( + "[NOTIFYME]the first time to pull message, but pull request offset larger \ + than broker consume offset. pullRequest: {} NewOffset: {}", + pull_request.to_string(), + offset + ) + } + pull_request.set_previously_locked(true); + pull_request.set_next_offset(offset); + } + } else { + info!( + "pull message later because not locked in broker, {}", + pull_request + ); + self.execute_pull_request_later( + pull_request, + self.pull_time_delay_mills_when_exception, + ); + return; + } + let message_queue = pull_request.get_message_queue().clone(); + let inner = self.rebalance_impl.get_subscription_inner(); + let guard = inner.read().await; + let subscription_data = guard.get(message_queue.get_topic()).cloned(); + + if subscription_data.is_none() { + error!( + "find the consumer's subscription failed, {}, {}", + message_queue, self.consumer_config.consumer_group + ); + self.execute_pull_request_later( + pull_request, + self.pull_time_delay_mills_when_exception, + ); + return; + } + let begin_timestamp = Instant::now(); + let topic = message_queue.get_topic().to_string(); + + let message_queue_inner = message_queue.clone(); + let next_offset = pull_request.next_offset; + let mut commit_offset_enable = false; + let mut commit_offset_value = 0; + if MessageModel::Clustering == self.consumer_config.message_model { + commit_offset_value = self + .offset_store + .as_ref() + .unwrap() + .read_offset(&message_queue, ReadOffsetType::ReadFromMemory) + .await; + if commit_offset_value > 0 { + commit_offset_enable = true; + } + } + let mut sub_expression = None; + let mut class_filter = false; + if subscription_data.is_some() { + let subscription_data = subscription_data.as_ref().unwrap(); + if 
self.consumer_config.post_subscription_when_pull + && !subscription_data.class_filter_mode + { + sub_expression = Some(subscription_data.sub_string.clone()); + } + class_filter = subscription_data.class_filter_mode + } + let sys_flag = PullSysFlag::build_sys_flag( + commit_offset_enable, + true, + sub_expression.is_some(), + class_filter, + ); + let subscription_data = subscription_data.unwrap(); + let this = self.clone(); + let result = self + .pull_api_wrapper + .as_mut() + .unwrap() + .pull_kernel_impl( + &message_queue, + sub_expression.clone().unwrap_or("".to_string()).as_str(), + subscription_data.expression_type.clone().as_str(), + subscription_data.sub_version, + next_offset, + self.consumer_config.pull_batch_size as i32, + self.consumer_config.pull_batch_size_in_bytes as i32, + sys_flag as i32, + commit_offset_value, + BROKER_SUSPEND_MAX_TIME_MILLIS, + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, + CommunicationMode::Async, + DefaultPullCallback { + push_consumer_impl: this, + message_queue_inner: Some(message_queue_inner), + subscription_data: Some(subscription_data), + pull_request: Some(pull_request.clone()), + }, + ) + .await; + + if let Err(e) = result { + error!( + "pullKernelImpl exception, {}, cost: {}ms, {}", + pull_request, + begin_timestamp.elapsed().as_millis(), + e + ); + self.execute_pull_request_later( + pull_request, + self.pull_time_delay_mills_when_exception, + ); + } + } + + fn make_sure_state_ok(&self) -> Result<()> { + if *self.service_state != ServiceState::Running { + return Err(MQClientError::MQClientErr( + -1, + format!( + "The consumer service state not OK, {},{}", + *self.service_state, + FAQUrl::suggest_todo(FAQUrl::CLIENT_SERVICE_NOT_OK) + ), + )); + } + Ok(()) + } + + pub(crate) async fn correct_tags_offset(&mut self, pull_request: &PullRequest) { + if pull_request.process_queue.msg_count() == 0 { + self.offset_store + .as_mut() + .unwrap() + .update_offset( + pull_request.get_message_queue(), + pull_request.next_offset, + true, + ) 
+ .await; + } + } + + pub fn try_reset_pop_retry_topic( + msgs: &mut [ArcRefCellWrapper], + consumer_group: &str, + ) { + let pop_retry_prefix = format!( + "{}{}_{}", + mix_all::RETRY_GROUP_TOPIC_PREFIX, + consumer_group, + "_" + ); + for msg in msgs.iter_mut() { + if msg.get_topic().starts_with(&pop_retry_prefix) { + let normal_topic = KeyBuilder::parse_normal_topic(msg.get_topic(), consumer_group); + + if !normal_topic.is_empty() { + msg.set_topic(&normal_topic); + } + } + } + } + + pub fn reset_retry_and_namespace( + &mut self, + msgs: &mut [ArcRefCellWrapper], + consumer_group: &str, + ) { + let group_topic = mix_all::get_retry_topic(consumer_group); + let namespace = self.client_config.get_namespace().unwrap_or("".to_string()); + for msg in msgs.iter_mut() { + if let Some(retry_topic) = msg.get_property(MessageConst::PROPERTY_RETRY_TOPIC) { + if group_topic == msg.get_topic() { + msg.set_topic(retry_topic.as_str()); + } + } + + if !namespace.is_empty() { + let topic = msg.get_topic().to_string(); + msg.set_topic( + NamespaceUtil::without_namespace_with_namespace( + topic.as_str(), + namespace.as_str(), + ) + .as_str(), + ); + } + } + } + + #[inline] + pub fn has_hook(&self) -> bool { + !self.consume_message_hook_list.is_empty() + } + + pub fn execute_hook_before(&self, context: &mut Option) { + for hook in self.consume_message_hook_list.iter() { + hook.consume_message_before(context.as_mut()); + } + } + + pub fn execute_hook_after(&self, context: &mut Option) { + for hook in self.consume_message_hook_list.iter() { + hook.consume_message_after(context.as_mut()); + } + } + + pub async fn send_message_back( + &mut self, + msg: &mut MessageExt, + delay_level: i32, + mq: &MessageQueue, + ) -> Result<()> { + self.send_message_back_with_broker_name(msg, delay_level, None, Some(mq)) + .await + } + pub async fn send_message_back_with_broker_name( + &mut self, + msg: &mut MessageExt, + delay_level: i32, + broker_name: Option, + mq: Option<&MessageQueue>, + ) -> 
Result<()> { + let need_retry = true; + if broker_name.is_some() + && broker_name + .as_ref() + .unwrap() + .starts_with(mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX) + && mq.is_some() + && mq + .unwrap() + .get_broker_name() + .starts_with(mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX) + { + let _ = self.send_message_back_as_normal_message(msg).await; + } else { + let broker_addr = if let Some(ref broker_name_) = broker_name { + self.client_instance + .as_mut() + .unwrap() + .find_broker_address_in_publish(broker_name_.as_str()) + .await + .unwrap() + } else { + msg.store_host.to_string() + }; + let max_consume_retry_times = self.get_max_reconsume_times(); + let result = self + .client_instance + .as_mut() + .unwrap() + .mq_client_api_impl + .consumer_send_message_back( + broker_addr.as_str(), + broker_name.as_ref().unwrap().as_str(), + msg, + self.consumer_config.consumer_group.as_str(), + delay_level, + 5000, + max_consume_retry_times, + ) + .await; + if let Err(e) = result { + error!("send message back error: {}", e); + + self.send_message_back_as_normal_message(msg).await?; + } + } + msg.set_topic( + NamespaceUtil::without_namespace_with_namespace( + msg.get_topic(), + self.client_config + .get_namespace() + .unwrap_or("".to_string()) + .as_str(), + ) + .as_str(), + ); + Ok(()) + } + + async fn send_message_back_as_normal_message(&mut self, msg: &MessageExt) -> Result<()> { + let topic = mix_all::get_retry_topic(self.consumer_config.consumer_group()); + let body = msg.get_body().cloned(); + let mut new_msg = Message::new_body(topic.as_str(), body); + let origin_msg_id = + MessageAccessor::get_origin_message_id(&new_msg).unwrap_or(msg.msg_id.clone()); + MessageAccessor::set_origin_message_id(&mut new_msg, origin_msg_id.as_str()); + new_msg.set_flag(msg.get_flag()); + MessageAccessor::set_properties(&mut new_msg, msg.get_properties().clone()); + MessageAccessor::put_property( + &mut new_msg, + MessageConst::PROPERTY_RETRY_TOPIC, + msg.get_topic(), + ); + 
MessageAccessor::set_reconsume_time( + &mut new_msg, + (msg.reconsume_times + 1).to_string().as_str(), + ); + MessageAccessor::set_max_reconsume_times( + &mut new_msg, + self.get_max_reconsume_times().to_string().as_str(), + ); + MessageAccessor::clear_property(&mut new_msg, MessageConst::PROPERTY_TRANSACTION_PREPARED); + new_msg.set_delay_time_level(3 + msg.reconsume_times); + self.client_instance + .as_mut() + .unwrap() + .default_producer + .send(new_msg) + .await?; + Ok(()) + } + + pub fn get_max_reconsume_times(&self) -> i32 { + if self.consumer_config.max_reconsume_times == -1 { + 16 + } else { + self.consumer_config.max_reconsume_times + } } } @@ -767,6 +1261,7 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl { let sub_table = self.rebalance_impl.get_subscription_inner(); let sub_table_inner = sub_table.read().await; if sub_table_inner.contains_key(topic) { + drop(sub_table_inner); let guard = self .rebalance_impl .rebalance_impl_inner diff --git a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs index 080b1d86..0a011507 100644 --- a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs +++ b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs @@ -19,12 +19,16 @@ use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::RwLock; use once_cell::sync::Lazy; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; +use rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::message::MessageTrait; +use rocketmq_common::ArcRefCellWrapper; use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo; +use tokio::sync::RwLock; use crate::consumer::consumer_impl::PULL_MAX_IDLE_TIME; use 
crate::consumer::default_mq_push_consumer::DefaultMQPushConsumer; @@ -45,21 +49,23 @@ static REBALANCE_LOCK_INTERVAL: Lazy = Lazy::new(|| { #[derive(Clone)] pub(crate) struct ProcessQueue { - tree_map_lock: Arc>, - msg_tree_map: Arc>>, - msg_count: Arc, - msg_size: Arc, - consume_lock: Arc>, - consuming_msg_orderly_tree_map: Arc>>, - try_unlock_times: Arc, - queue_offset_max: Arc, - dropped: Arc, - last_pull_timestamp: Arc, - last_consume_timestamp: Arc, - locked: Arc, - last_lock_timestamp: Arc, - consuming: Arc, - msg_acc_cnt: Arc, + pub(crate) tree_map_lock: Arc>, + pub(crate) msg_tree_map: + Arc>>>, + pub(crate) msg_count: Arc, + pub(crate) msg_size: Arc, + pub(crate) consume_lock: Arc>, + pub(crate) consuming_msg_orderly_tree_map: + Arc>>>, + pub(crate) try_unlock_times: Arc, + pub(crate) queue_offset_max: Arc, + pub(crate) dropped: Arc, + pub(crate) last_pull_timestamp: Arc, + pub(crate) last_consume_timestamp: Arc, + pub(crate) locked: Arc, + pub(crate) last_lock_timestamp: Arc, + pub(crate) consuming: Arc, + pub(crate) msg_acc_cnt: Arc, } impl ProcessQueue { @@ -67,8 +73,8 @@ impl ProcessQueue { ProcessQueue { tree_map_lock: Arc::new(RwLock::new(())), msg_tree_map: Arc::new(RwLock::new(std::collections::BTreeMap::new())), - msg_count: Arc::new(AtomicI64::new(0)), - msg_size: Arc::new(AtomicI64::new(0)), + msg_count: Arc::new(AtomicU64::new(0)), + msg_size: Arc::new(AtomicU64::new(0)), consume_lock: Arc::new(RwLock::new(())), consuming_msg_orderly_tree_map: Arc::new( RwLock::new(std::collections::BTreeMap::new()), @@ -120,16 +126,92 @@ impl ProcessQueue { unimplemented!("clean_expired_msg") } - pub(crate) fn put_message(&self, messages: Vec) -> bool { - unimplemented!("put_message") + pub(crate) async fn put_message( + &self, + messages: Vec>, + ) -> bool { + let mut dispatch_to_consume = false; + let mut msg_tree_map = self.msg_tree_map.write().await; + let mut valid_msg_cnt = 0; + + let acc_total = if !messages.is_empty() { + let message_ext = 
messages.last().unwrap(); + if let Some(property) = message_ext.get_property(MessageConst::PROPERTY_MAX_OFFSET) { + property.parse::().unwrap() - message_ext.message_ext_inner.queue_offset + } else { + 0 + } + } else { + 0 + }; + + for message in messages { + if msg_tree_map + .insert(message.message_ext_inner.queue_offset, message.clone()) + .is_none() + { + valid_msg_cnt += 1; + self.queue_offset_max.store( + message.message_ext_inner.queue_offset as u64, + std::sync::atomic::Ordering::Release, + ); + self.msg_size.fetch_add( + message.message_ext_inner.body().as_ref().unwrap().len() as u64, + Ordering::AcqRel, + ); + } + } + self.msg_count.fetch_add(valid_msg_cnt, Ordering::AcqRel); + if !msg_tree_map.is_empty() && !self.consuming.load(Ordering::Acquire) { + dispatch_to_consume = true; + self.consuming.store(true, Ordering::Release); + } + if acc_total > 0 { + self.msg_acc_cnt + .store(acc_total, std::sync::atomic::Ordering::Release); + } + dispatch_to_consume } - pub(crate) fn get_max_span(&self) -> u64 { - unimplemented!("get_max_span") + pub(crate) async fn get_max_span(&self) -> u64 { + let msg_tree_map = self.msg_tree_map.read().await; + if msg_tree_map.is_empty() { + return 0; + } + let first = msg_tree_map.first_key_value().unwrap(); + let last = msg_tree_map.last_key_value().unwrap(); + (last.0 - first.0) as u64 } - pub(crate) fn remove_message(&self, messages: Vec) -> u64 { - unimplemented!("remove_message") + pub(crate) async fn remove_message( + &self, + messages: &[ArcRefCellWrapper], + ) -> i64 { + let mut result = -1; + let mut msg_tree_map = self.msg_tree_map.write().await; + if msg_tree_map.is_empty() { + return result; + } + result = self.queue_offset_max.load(Ordering::Acquire) as i64 + 1; + let mut removed_cnt = 0; + for message in messages { + let prev = msg_tree_map.remove(&message.message_ext_inner.queue_offset); + if let Some(prev) = prev { + removed_cnt += 1; + self.msg_size.fetch_sub( + 
message.message_ext_inner.body().as_ref().unwrap().len() as u64, + Ordering::AcqRel, + ); + } + self.msg_count.fetch_sub(removed_cnt, Ordering::AcqRel); + if self.msg_count.load(Ordering::Acquire) == 0 { + self.msg_size.store(0, Ordering::Release); + } + if !msg_tree_map.is_empty() { + result = *msg_tree_map.first_key_value().unwrap().0; + } + } + result } pub(crate) fn rollback(&self) { @@ -148,7 +230,7 @@ impl ProcessQueue { unimplemented!("take_messages") } - pub(crate) fn contains_message(&self, message_ext: MessageExt) -> bool { + pub(crate) fn contains_message(&self, message_ext: &MessageExt) -> bool { unimplemented!("contains_message") } @@ -159,4 +241,21 @@ impl ProcessQueue { pub(crate) fn fill_process_queue_info(&self, info: ProcessQueueInfo) { unimplemented!("fill_process_queue_info") } + + pub(crate) fn set_last_pull_timestamp(&self, last_pull_timestamp: u64) { + self.last_pull_timestamp + .store(last_pull_timestamp, std::sync::atomic::Ordering::Release); + } + + pub fn msg_count(&self) -> u64 { + self.msg_count.load(std::sync::atomic::Ordering::Acquire) + } + + pub(crate) fn msg_size(&self) -> u64 { + self.msg_size.load(std::sync::atomic::Ordering::Acquire) + } + + pub(crate) fn is_locked(&self) -> bool { + self.locked.load(std::sync::atomic::Ordering::Acquire) + } } diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs index b4ca4286..1f37d4f4 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs @@ -18,20 +18,39 @@ use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use rand::Rng; +use rocketmq_common::common::filter::expression_type::ExpressionType; +use rocketmq_common::common::message::message_decoder; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::message::MessageConst; +use 
rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all; +use rocketmq_common::common::mq_version::RocketMqVersion; +use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag; +use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag; use rocketmq_common::ArcRefCellWrapper; -use tokio::sync::RwLock; +use rocketmq_common::MessageAccessor::MessageAccessor; +use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::TopicRequestHeader; +use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader; +use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; +use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader; +use crate::consumer::consumer_impl::pull_request_ext::PullResultExt; +use crate::consumer::pull_callback::PullCallback; +use crate::consumer::pull_status::PullStatus; +use crate::error::MQClientError::MQClientErr; use crate::factory::mq_client_instance::MQClientInstance; +use crate::hook::filter_message_context::FilterMessageContext; use crate::hook::filter_message_hook::FilterMessageHook; +use crate::implementation::communication_mode::CommunicationMode; +use crate::Result; #[derive(Clone)] pub struct PullAPIWrapper { - mq_client_factory: ArcRefCellWrapper, + client_instance: ArcRefCellWrapper, consumer_group: String, unit_mode: bool, - pull_from_which_node_table: Arc>>, + pull_from_which_node_table: ArcRefCellWrapper>, connect_broker_by_user: bool, default_broker_id: u64, filter_message_hook_list: Vec>>, @@ -44,10 +63,10 @@ impl PullAPIWrapper { unit_mode: bool, ) -> Self { Self { - mq_client_factory, + client_instance: mq_client_factory, consumer_group, unit_mode, - pull_from_which_node_table: Arc::new(RwLock::new(HashMap::with_capacity(64))), + pull_from_which_node_table: ArcRefCellWrapper::new(HashMap::with_capacity(64)), connect_broker_by_user: false, default_broker_id: mix_all::MASTER_ID, filter_message_hook_list: 
Vec::new(), @@ -60,4 +79,306 @@ impl PullAPIWrapper { ) { self.filter_message_hook_list = filter_message_hook_list; } + + #[inline] + pub fn update_pull_from_which_node(&mut self, mq: &MessageQueue, broker_id: u64) { + let atomic_u64 = self + .pull_from_which_node_table + .entry(mq.clone()) + .or_insert_with(|| AtomicU64::new(broker_id)); + atomic_u64.store(broker_id, std::sync::atomic::Ordering::Release); + } + + pub fn has_hook(&self) -> bool { + !self.filter_message_hook_list.is_empty() + } + + pub fn process_pull_result( + &mut self, + message_queue: &MessageQueue, + pull_result_ext: &mut PullResultExt, + subscription_data: &SubscriptionData, + ) { + self.update_pull_from_which_node(message_queue, pull_result_ext.suggest_which_broker_id); + if PullStatus::Found == pull_result_ext.pull_result.pull_status { + let mut message_binary = pull_result_ext.message_binary.take().unwrap_or_default(); + let mut msg_vec = message_decoder::decodes_batch_client( + &mut message_binary, + self.client_instance.client_config.decode_read_body, + self.client_instance.client_config.decode_decompress_body, + ); + let mut need_decode_inner_message = false; + for msg in &msg_vec { + if MessageSysFlag::check( + msg.message_ext_inner.sys_flag, + MessageSysFlag::INNER_BATCH_FLAG, + ) && MessageSysFlag::check( + msg.message_ext_inner.sys_flag, + MessageSysFlag::NEED_UNWRAP_FLAG, + ) { + need_decode_inner_message = true; + break; + } + } + if need_decode_inner_message { + let mut inner_msg_vec = Vec::with_capacity(msg_vec.len()); + for msg in msg_vec { + if MessageSysFlag::check( + msg.message_ext_inner.sys_flag, + MessageSysFlag::INNER_BATCH_FLAG, + ) && MessageSysFlag::check( + msg.message_ext_inner.sys_flag, + MessageSysFlag::NEED_UNWRAP_FLAG, + ) { + message_decoder::decode_message_client( + msg.message_ext_inner, + &mut inner_msg_vec, + ); + } else { + inner_msg_vec.push(msg); + } + } + msg_vec = inner_msg_vec; + } + + let mut msg_list_filter_again = + if 
!subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode { + let mut msg_vec_again = Vec::with_capacity(msg_vec.len()); + for msg in msg_vec { + if let Some(ref tag) = msg.get_tags() { + if subscription_data.tags_set.contains(tag) { + msg_vec_again.push(msg); + } + } + } + msg_vec_again + } else { + msg_vec + }; + if self.has_hook() { + let context = FilterMessageContext { + unit_mode: self.unit_mode, + msg_list: &msg_list_filter_again, + ..Default::default() + }; + self.execute_hook(&context); + } + + for msg in &mut msg_list_filter_again { + let tra_flag = msg + .get_property(MessageConst::PROPERTY_TRANSACTION_PREPARED) + .map_or(false, |v| v.parse().unwrap_or(false)); + if tra_flag { + if let Some(transaction_id) = + msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) + { + msg.set_transaction_id(transaction_id.as_str()); + } + } + MessageAccessor::put_property( + msg, + MessageConst::PROPERTY_MIN_OFFSET, + pull_result_ext.pull_result.min_offset.to_string().as_str(), + ); + MessageAccessor::put_property( + msg, + MessageConst::PROPERTY_MAX_OFFSET, + pull_result_ext.pull_result.max_offset.to_string().as_str(), + ); + msg.message_ext_inner.broker_name = message_queue.get_broker_name().to_string(); + msg.message_ext_inner.queue_id = message_queue.get_queue_id(); + if let Some(offset_delta) = pull_result_ext.offset_delta { + msg.message_ext_inner.queue_offset += offset_delta; + } + } + pull_result_ext.pull_result.msg_found_list = msg_list_filter_again + .into_iter() + .map(ArcRefCellWrapper::new) + .collect::>(); + } + } + + pub fn execute_hook(&self, context: &FilterMessageContext) { + for hook in &self.filter_message_hook_list { + hook.filter_message(context); + } + } + + pub fn recalculate_pull_from_which_node(&self, mq: &MessageQueue) -> u64 { + if self.connect_broker_by_user { + return self.default_broker_id; + } + + if let Some(atomic_u64) = self.pull_from_which_node_table.get(mq) { + 
atomic_u64.load(std::sync::atomic::Ordering::Acquire) + } else { + mix_all::MASTER_ID + } + } + + pub async fn pull_kernel_impl( + &mut self, + mq: &MessageQueue, + sub_expression: &str, + expression_type: &str, + sub_version: i64, + offset: i64, + max_nums: i32, + max_size_in_bytes: i32, + sys_flag: i32, + commit_offset: i64, + broker_suspend_max_time_millis: u64, + timeout_millis: u64, + communication_mode: CommunicationMode, + pull_callback: PCB, + ) -> Result> + where + PCB: PullCallback, + { + let broker_name = self + .client_instance + .get_broker_name_from_message_queue(mq) + .await; + let broker_id = self.recalculate_pull_from_which_node(mq); + let mut find_broker_result = self + .client_instance + .find_broker_address_in_subscribe(broker_name.as_str(), broker_id, false) + .await; + + if find_broker_result.is_none() { + self.client_instance + .update_topic_route_info_from_name_server_topic(mq.get_topic()) + .await; + let broker_name_again = self + .client_instance + .get_broker_name_from_message_queue(mq) + .await; + let broker_id_again = self.recalculate_pull_from_which_node(mq); + find_broker_result = self + .client_instance + .find_broker_address_in_subscribe( + broker_name_again.as_str(), + broker_id_again, + false, + ) + .await; + } + + if let Some(find_broker_result) = find_broker_result { + { + if !ExpressionType::is_tag_type(Some(expression_type)) + && find_broker_result.broker_version < RocketMqVersion::V410Snapshot.into() + { + return Err(MQClientErr( + -1, + format!( + "The broker[{}],[{}] does not support consumer to filter message by \ + tag[{}]", + mq.get_broker_name(), + find_broker_result.broker_version, + expression_type + ), + )); + } + } + + let mut sys_flag_inner = sys_flag; + if find_broker_result.slave { + sys_flag_inner = + PullSysFlag::clear_commit_offset_flag(sys_flag_inner as u32) as i32; + } + + let request_header = PullMessageRequestHeader { + consumer_group: self.consumer_group.clone(), + topic: mq.get_topic().to_string(), + 
queue_id: Some(mq.get_queue_id()), + queue_offset: offset, + max_msg_nums: max_nums, + sys_flag: sys_flag_inner, + commit_offset, + suspend_timeout_millis: broker_suspend_max_time_millis, + subscription: Some(sub_expression.to_string()), + sub_version, + max_msg_bytes: Some(max_size_in_bytes), + request_source: None, + proxy_forward_client_id: None, + expression_type: Some(expression_type.to_string()), + topic_request: Some(TopicRequestHeader { + lo: None, + rpc: Some(RpcRequestHeader { + namespace: None, + namespaced: None, + broker_name: Some(mq.get_broker_name().to_string()), + oneway: None, + }), + }), + }; + + let mut broker_addr = find_broker_result.broker_addr.clone(); + if PullSysFlag::has_class_filter_flag(sys_flag_inner as u32) { + broker_addr = self + .compute_pull_from_which_filter_server(mq.get_topic(), broker_addr.as_str()) + .await?; + } + self.client_instance + .get_mq_client_api_impl() + .pull_message( + broker_addr.as_str(), + request_header, + timeout_millis, + communication_mode, + pull_callback, + ) + .await + } else { + Err(MQClientErr( + -1, + format!("The broker[{}] not exist", mq.get_broker_name(),), + )) + } + } + + async fn compute_pull_from_which_filter_server( + &mut self, + topic: &str, + broker_addr: &str, + ) -> Result { + let topic_route_table = self.client_instance.topic_route_table.read().await; + let topic_route_data = topic_route_table.get(topic); + let vec = topic_route_data + .unwrap() + .filter_server_table + .get(broker_addr); + if let Some(vec) = vec { + return vec.get(random_num() as usize % vec.len()).map_or( + Err(MQClientErr( + -1, + format!( + "Find Filter Server Failed, Broker Addr: {},topic:{}", + broker_addr, topic + ), + )), + |v| Ok(v.clone()), + ); + } + Err(MQClientErr( + -1, + format!( + "Find Filter Server Failed, Broker Addr: {},topic:{}", + broker_addr, topic + ), + )) + } +} + +pub fn random_num() -> i32 { + let mut rng = rand::thread_rng(); + let mut value = rng.gen::(); + if value < 0 { + value = 
value.abs(); + if value < 0 { + value = 0; + } + } + value } diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_request.rs b/rocketmq-client/src/consumer/consumer_impl/pull_request.rs index 925186a5..a818af03 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_request.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_request.rs @@ -16,6 +16,7 @@ */ use std::hash::Hash; use std::hash::Hasher; +use std::sync::Arc; use rocketmq_common::common::message::message_enum::MessageRequestMode; use rocketmq_common::common::message::message_queue::MessageQueue; @@ -24,19 +25,19 @@ use crate::consumer::consumer_impl::message_request::MessageRequest; use crate::consumer::consumer_impl::process_queue::ProcessQueue; #[derive(Clone)] -pub struct PullRequest { - consumer_group: String, - message_queue: MessageQueue, - process_queue: ProcessQueue, - next_offset: i64, - previously_locked: bool, +pub(crate) struct PullRequest { + pub(crate) consumer_group: String, + pub(crate) message_queue: MessageQueue, + pub(crate) process_queue: Arc, + pub(crate) next_offset: i64, + pub(crate) previously_locked: bool, } impl PullRequest { pub fn new( consumer_group: String, message_queue: MessageQueue, - process_queue: ProcessQueue, + process_queue: Arc, next_offset: i64, ) -> Self { PullRequest { @@ -80,11 +81,11 @@ impl PullRequest { self.next_offset = next_offset; } - pub fn get_process_queue(&self) -> &ProcessQueue { + pub fn get_process_queue(&self) -> &Arc { &self.process_queue } - pub fn set_process_queue(&mut self, process_queue: ProcessQueue) { + pub fn set_process_queue(&mut self, process_queue: Arc) { self.process_queue = process_queue; } } diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_request_ext.rs b/rocketmq-client/src/consumer/consumer_impl/pull_request_ext.rs new file mode 100644 index 00000000..fd11e18b --- /dev/null +++ b/rocketmq-client/src/consumer/consumer_impl/pull_request_ext.rs @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::consumer::pull_result::PullResult; + +pub(crate) struct PullResultExt { + pub(crate) pull_result: PullResult, + pub(crate) suggest_which_broker_id: u64, + pub(crate) message_binary: Option, + pub(crate) offset_delta: Option, +} diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs index 63f8abe5..93fc6751 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs @@ -32,7 +32,7 @@ pub(crate) mod rebalance_service; #[trait_variant::make(Rebalance: Send)] pub trait RebalanceLocal { async fn message_queue_changed( - &self, + &mut self, topic: &str, mq_all: &HashSet, mq_divided: &HashSet, @@ -63,7 +63,7 @@ pub trait RebalanceLocal { fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: u64); fn create_process_queue(&self) -> ProcessQueue; fn create_pop_process_queue(&self) -> PopProcessQueue; - fn remove_process_queue(&self, mq: MessageQueue); + async fn remove_process_queue(&mut self, mq: &MessageQueue); fn unlock(&self, mq: MessageQueue, oneway: bool); fn lock_all(&self); fn unlock_all(&self, oneway: bool); diff --git 
a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 03004dbd..0ef777d8 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -42,7 +42,7 @@ const TIMEOUT_CHECK_TIMES: u32 = 3; const QUERY_ASSIGNMENT_TIMEOUT: u32 = 3000; pub(crate) struct RebalanceImpl { - pub(crate) process_queue_table: Arc>>, + pub(crate) process_queue_table: Arc>>>, pub(crate) pop_process_queue_table: Arc>>, pub(crate) topic_subscribe_info_table: Arc>>>, pub(crate) subscription_inner: Arc>>, @@ -51,6 +51,8 @@ pub(crate) struct RebalanceImpl { pub(crate) allocate_message_queue_strategy: Option>, pub(crate) client_instance: Option>, pub(crate) sub_rebalance_impl: Option>, + pub(crate) topic_broker_rebalance: Arc>>, + pub(crate) topic_client_rebalance: Arc>>, } impl RebalanceImpl @@ -73,6 +75,8 @@ where allocate_message_queue_strategy, client_instance: mqclient_instance, sub_rebalance_impl: None, + topic_broker_rebalance: Arc::new(RwLock::new(HashMap::with_capacity(64))), + topic_client_rebalance: Arc::new(RwLock::new(HashMap::with_capacity(64))), } } @@ -81,12 +85,18 @@ where subscription_inner.insert(topic.to_string(), subscription_data); } + #[inline] pub async fn do_rebalance(&mut self, is_order: bool) -> bool { let mut balanced = true; - let sub_inner = self.subscription_inner.clone(); - let sub_table = sub_inner.read().await; + let sub_table = self.subscription_inner.read().await; if !sub_table.is_empty() { - for (topic, _) in sub_table.iter() { + let topics = sub_table + .keys() + .map(|item| item.to_string()) + .collect::>(); + drop(sub_table); + for topic in &topics { + //try_query_assignment unimplemented if !self.client_rebalance(topic) && self.try_query_assignment(topic).await { if !self.get_rebalance_result_from_broker(topic, is_order).await { balanced = false; @@ -113,7 
+123,41 @@ where } async fn truncate_message_queue_not_my_topic(&self) { - unimplemented!("truncate_message_queue_not_my_topic") + let sub_table = self.subscription_inner.read().await; + + let mut process_queue_table = self.process_queue_table.write().await; + process_queue_table.retain(|mq, pq| { + if !sub_table.contains_key(mq.get_topic()) { + pq.set_dropped(true); + info!( + "doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", + self.consumer_group.as_ref().unwrap(), + mq.get_topic() + ); + false + } else { + true + } + }); + let mut pop_process_queue_table = self.pop_process_queue_table.write().await; + pop_process_queue_table.retain(|mq, pq| { + if !sub_table.contains_key(mq.get_topic()) { + pq.set_dropped(true); + info!( + "doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary pop mq, {}", + self.consumer_group.as_ref().unwrap(), + mq.get_topic() + ); + false + } else { + true + } + }); + + let mut topic_client_rebalance = self.topic_client_rebalance.write().await; + topic_client_rebalance.retain(|topic, _| sub_table.contains_key(topic)); + let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; + topic_broker_rebalance.retain(|topic, _| sub_table.contains_key(topic)); } async fn get_rebalance_result_from_broker(&self, topic: &str, is_order: bool) -> bool { @@ -192,7 +236,7 @@ where } sub_rebalance_impl.remove_dirty_offset(mq).await; - let pq = ProcessQueue::new(); + let pq = Arc::new(ProcessQueue::new()); pq.set_locked(true); let next_offset = sub_rebalance_impl.compute_pull_from_where(mq).await; if next_offset >= 0 { @@ -253,7 +297,7 @@ where .find_consumer_id_list(topic, self.consumer_group.as_ref().unwrap()) .await; if ci_all.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { - if let Some(sub_rebalance_impl) = + if let Some(mut sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() { sub_rebalance_impl @@ -315,7 +359,7 @@ where .await; if changed { // info - if 
let Some(sub_rebalance_impl) = + if let Some(mut sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() { sub_rebalance_impl @@ -323,15 +367,28 @@ where .await; } } - return allocate_result_set.eq(&self.get_working_message_queue(topic)); + return allocate_result_set.eq(&self.get_working_message_queue(topic).await); } true } } } - pub fn get_working_message_queue(&self, topic: &str) -> HashSet { - unimplemented!() + pub async fn get_working_message_queue(&self, topic: &str) -> HashSet { + let mut queue_set = HashSet::new(); + let process_queue_table = self.process_queue_table.read().await; + for (mq, pq) in process_queue_table.iter() { + if mq.get_topic() == topic && !pq.is_dropped() { + queue_set.insert(mq.clone()); + } + } + let pop_process_queue_table = self.pop_process_queue_table.read().await; + for (mq, pq) in pop_process_queue_table.iter() { + if mq.get_topic() == topic && !pq.is_dropped() { + queue_set.insert(mq.clone()); + } + } + queue_set } pub fn lock(&self, mq: &MessageQueue) -> bool { diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index d3502fca..f3a3dd59 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -149,7 +149,7 @@ impl RebalancePushImpl { impl Rebalance for RebalancePushImpl { async fn message_queue_changed( - &self, + &mut self, topic: &str, mq_all: &HashSet, mq_divided: &HashSet, @@ -167,9 +167,27 @@ impl Rebalance for RebalancePushImpl { let process_queue_table = self.rebalance_impl_inner.process_queue_table.read().await; let current_queue_count = process_queue_table.len(); if current_queue_count != 0 { - unimplemented!() + let pull_threshold_for_topic = self.consumer_config.pull_threshold_for_topic; + if pull_threshold_for_topic != -1 { + let new_val = 
1.max(pull_threshold_for_topic / current_queue_count as i32); + info!( + "The pullThresholdForQueue is changed from {} to {}", + pull_threshold_for_topic, new_val + ); + self.consumer_config.pull_threshold_for_topic = new_val; + } + let pull_threshold_size_for_topic = self.consumer_config.pull_threshold_size_for_topic; + if pull_threshold_size_for_topic != -1 { + let new_val = 1.max(pull_threshold_size_for_topic / current_queue_count as i32); + info!( + "The pullThresholdSizeForQueue is changed from {} to {}", + pull_threshold_size_for_topic, new_val + ); + self.consumer_config.pull_threshold_size_for_topic = new_val; + } } - self.rebalance_impl_inner + let _ = self + .rebalance_impl_inner .client_instance .as_ref() .unwrap() @@ -394,8 +412,21 @@ impl Rebalance for RebalancePushImpl { PopProcessQueue::new() } - fn remove_process_queue(&self, mq: MessageQueue) { - todo!() + async fn remove_process_queue(&mut self, mq: &MessageQueue) { + let mut process_queue_table = self.rebalance_impl_inner.process_queue_table.write().await; + let prev = process_queue_table.remove(mq); + drop(process_queue_table); + if let Some(pq) = prev { + let droped = pq.is_dropped(); + pq.set_dropped(true); + self.remove_unnecessary_message_queue(mq, &pq).await; + info!( + "Fix Offset, {}, remove unnecessary mq, {} Droped: {}", + self.rebalance_impl_inner.consumer_group.as_ref().unwrap(), + mq, + droped + ); + } } fn unlock(&self, mq: MessageQueue, oneway: bool) { diff --git a/rocketmq-client/src/consumer/listener/message_listener_concurrently.rs b/rocketmq-client/src/consumer/listener/message_listener_concurrently.rs index 104afc60..6664f211 100644 --- a/rocketmq-client/src/consumer/listener/message_listener_concurrently.rs +++ b/rocketmq-client/src/consumer/listener/message_listener_concurrently.rs @@ -25,8 +25,8 @@ use crate::Result; pub trait MessageListenerConcurrently: Sync + Send { fn consume_message( &self, - msgs: Vec, - context: ConsumeConcurrentlyContext, + msgs: &[&MessageExt], + 
context: &ConsumeConcurrentlyContext, ) -> Result; } diff --git a/rocketmq-client/src/consumer/pull_callback.rs b/rocketmq-client/src/consumer/pull_callback.rs index 20869e62..ff567baf 100644 --- a/rocketmq-client/src/consumer/pull_callback.rs +++ b/rocketmq-client/src/consumer/pull_callback.rs @@ -14,14 +14,411 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + use std::sync::Arc; -use crate::consumer::pull_result::PullResult; +use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::mix_all; +use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; +use tracing::warn; + +use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; +use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; +use crate::consumer::consumer_impl::default_mq_push_consumer_impl::PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL; +use crate::consumer::consumer_impl::pull_request::PullRequest; +use crate::consumer::consumer_impl::pull_request_ext::PullResultExt; +use crate::consumer::consumer_impl::re_balance::Rebalance; +use crate::consumer::pull_status::PullStatus; +use crate::error::MQClientError; pub type PullCallbackFn = - Arc, Option<&dyn std::error::Error>) + Send + Sync>; + Arc, Option>) + Send + Sync>; + +#[trait_variant::make(PullCallback: Send)] +pub trait PullCallbackLocal: Sync { + async fn on_success(&mut self, pull_result: PullResultExt); + fn on_exception(&mut self, e: Box); +} + +pub(crate) struct DefaultPullCallback { + pub(crate) push_consumer_impl: DefaultMQPushConsumerImpl, + pub(crate) message_queue_inner: Option, + pub(crate) subscription_data: Option, + pub(crate) pull_request: Option, +} + +impl DefaultPullCallback { + /*fn kkk(){ + let pull_callback = + |pull_result_ext: Option, + err: Option>| { + tokio::spawn(async move { 
+ if let Some(mut pull_result_ext) = pull_result_ext { + this.pull_api_wrapper.as_mut().unwrap().process_pull_result( + &message_queue_inner, + &mut pull_result_ext, + subscription_data.as_ref().unwrap(), + ); + match pull_result_ext.pull_result.pull_status { + PullStatus::Found => { + let prev_request_offset = pull_request.next_offset; + pull_request.set_next_offset( + pull_result_ext.pull_result.next_begin_offset as i64, + ); + /*let pull_rt = get_current_millis() - begin_timestamp.elapsed().as_millis() as u64; + self.client_instance.as_mut().unwrap().*/ + let mut first_msg_offset = i64::MAX; + if pull_result_ext.pull_result.msg_found_list.is_empty() { + this.execute_pull_request_immediately(pull_request).await; + } else { + first_msg_offset = pull_result_ext + .pull_result + .msg_found_list() + .first() + .unwrap() + .message_ext_inner + .queue_offset; + // DefaultMQPushConsumerImpl.this.getConsumerStatsManager(). + // incPullTPS(pullRequest.getConsumerGroup(), + // + // pullRequest.getMessageQueue().getTopic(), + // pullResult.getMsgFoundList().size()); + let vec = pull_result_ext + .pull_result + .msg_found_list + .clone() + .into_iter() + .map(|msg| msg.message_ext_inner) + .collect(); + let dispatch_to_consume = + pull_request.process_queue.put_message(vec).await; + this.consume_message_concurrently_service + .as_mut() + .unwrap() + .consume_message_concurrently_service + .submit_consume_request( + pull_result_ext.pull_result.msg_found_list, + pull_request.get_process_queue().clone(), + pull_request.get_message_queue().clone(), + dispatch_to_consume, + ) + .await; + if this.consumer_config.pull_interval > 0 { + this.execute_pull_request_later( + pull_request, + this.consumer_config.pull_interval, + ); + } else { + this.execute_pull_request_immediately(pull_request).await; + } + } + if pull_result_ext.pull_result.next_begin_offset + < prev_request_offset as u64 + || first_msg_offset < prev_request_offset + { + warn!( + "[BUG] pull message result maybe data 
wrong, \ + nextBeginOffset: {} firstMsgOffset: {} \ + prevRequestOffset: {}", + pull_result_ext.pull_result.next_begin_offset, + prev_request_offset, + prev_request_offset + ); + } + } + PullStatus::NoNewMsg | PullStatus::NoMatchedMsg => { + pull_request.next_offset = + pull_result_ext.pull_result.next_begin_offset as i64; + this.correct_tags_offset(&pull_request).await; + this.execute_pull_request_immediately(pull_request).await; + } + + PullStatus::OffsetIllegal => { + warn!( + "the pull request offset illegal, {},{}", + pull_result_ext.pull_result, + pull_result_ext.pull_result.pull_status + ); + pull_request.next_offset = + pull_result_ext.pull_result.next_begin_offset as i64; + pull_request.process_queue.set_dropped(true); + tokio::spawn(async move { + let offset_store = this.offset_store.as_mut().unwrap(); + offset_store + .update_and_freeze_offset( + pull_request.get_message_queue(), + pull_request.next_offset, + ) + .await; + offset_store.persist(pull_request.get_message_queue()).await; + this.rebalance_impl + .remove_process_queue(pull_request.get_message_queue()) + .await; + this.rebalance_impl + .rebalance_impl_inner + .client_instance + .as_ref() + .unwrap() + .re_balance_immediately() + }); + } + }; + return; + } + + if let Some(err) = err { + if !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { + if let Some(er) = err.downcast_ref::() { + match er { + MQClientError::MQBrokerError(code, msg, addr) => { + if ResponseCode::from(*code) + == ResponseCode::SubscriptionNotLatest + { + warn!( + "the subscription is not latest, group={}", + this.consumer_config.consumer_group, + ); + } else { + warn!( + "execute the pull request exception, group={}", + this.consumer_config.consumer_group + ); + } + } + _ => { + warn!( + "execute the pull request exception, group={}", + this.consumer_config.consumer_group + ); + } + } + } else { + warn!( + "execute the pull request exception, group={}", + this.consumer_config.consumer_group + ); + } + } + if let Some(er) 
= err.downcast_ref::() { + match er { + MQClientError::MQBrokerError(code, msg, addr) => { + if ResponseCode::from(*code) == ResponseCode::FlowControl { + this.execute_pull_request_later( + pull_request, + PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL, + ); + } else { + this.execute_pull_request_later( + pull_request, + this.pull_time_delay_mills_when_exception, + ); + } + } + _ => { + this.execute_pull_request_later( + pull_request, + this.pull_time_delay_mills_when_exception, + ); + } + } + } else { + this.execute_pull_request_later( + pull_request, + this.pull_time_delay_mills_when_exception, + ); + } + } + }); + }; + }*/ +} + +impl PullCallback for DefaultPullCallback { + async fn on_success(&mut self, mut pull_result_ext: PullResultExt) { + let message_queue_inner = self.message_queue_inner.take().unwrap(); + let subscription_data = self.subscription_data.take().unwrap(); + let mut pull_request = self.pull_request.take().unwrap(); + + self.push_consumer_impl + .pull_api_wrapper + .as_mut() + .unwrap() + .process_pull_result( + &message_queue_inner, + &mut pull_result_ext, + &subscription_data, + ); + match pull_result_ext.pull_result.pull_status { + PullStatus::Found => { + let prev_request_offset = pull_request.next_offset; + pull_request.set_next_offset(pull_result_ext.pull_result.next_begin_offset as i64); + /*let pull_rt = get_current_millis() - begin_timestamp.elapsed().as_millis() as u64; + self.client_instance.as_mut().unwrap().*/ + let mut first_msg_offset = i64::MAX; + if pull_result_ext.pull_result.msg_found_list.is_empty() { + self.push_consumer_impl + .execute_pull_request_immediately(pull_request) + .await; + } else { + first_msg_offset = pull_result_ext + .pull_result + .msg_found_list() + .first() + .unwrap() + .message_ext_inner + .queue_offset; + // DefaultMQPushConsumerImpl.self.push_consumer_impl.getConsumerStatsManager(). 
+ // incPullTPS(pullRequest.getConsumerGroup(), + // + // pullRequest.getMessageQueue().getTopic(), + // pullResult.getMsgFoundList().size()); + let vec = pull_result_ext.pull_result.msg_found_list.clone(); + let dispatch_to_consume = pull_request.process_queue.put_message(vec).await; + self.push_consumer_impl + .consume_message_concurrently_service + .as_mut() + .unwrap() + .consume_message_concurrently_service + .submit_consume_request( + pull_result_ext.pull_result.msg_found_list, + pull_request.get_process_queue().clone(), + pull_request.get_message_queue().clone(), + dispatch_to_consume, + ) + .await; + if self.push_consumer_impl.consumer_config.pull_interval > 0 { + self.push_consumer_impl.execute_pull_request_later( + pull_request, + self.push_consumer_impl.consumer_config.pull_interval, + ); + } else { + self.push_consumer_impl + .execute_pull_request_immediately(pull_request) + .await; + } + } + if pull_result_ext.pull_result.next_begin_offset < prev_request_offset as u64 + || first_msg_offset < prev_request_offset + { + warn!( + "[BUG] pull message result maybe data wrong, nextBeginOffset: {} \ + firstMsgOffset: {} prevRequestOffset: {}", + pull_result_ext.pull_result.next_begin_offset, + prev_request_offset, + prev_request_offset + ); + } + } + PullStatus::NoNewMsg | PullStatus::NoMatchedMsg => { + pull_request.next_offset = pull_result_ext.pull_result.next_begin_offset as i64; + self.push_consumer_impl + .correct_tags_offset(&pull_request) + .await; + self.push_consumer_impl + .execute_pull_request_immediately(pull_request) + .await; + } + + PullStatus::OffsetIllegal => { + warn!( + "the pull request offset illegal, {},{}", + pull_result_ext.pull_result, pull_result_ext.pull_result.pull_status + ); + pull_request.next_offset = pull_result_ext.pull_result.next_begin_offset as i64; + pull_request.process_queue.set_dropped(true); + + let offset_store = self.push_consumer_impl.offset_store.as_mut().unwrap(); + offset_store + .update_and_freeze_offset( + 
pull_request.get_message_queue(), + pull_request.next_offset, + ) + .await; + offset_store.persist(pull_request.get_message_queue()).await; + self.push_consumer_impl + .rebalance_impl + .remove_process_queue(pull_request.get_message_queue()) + .await; + self.push_consumer_impl + .rebalance_impl + .rebalance_impl_inner + .client_instance + .as_ref() + .unwrap() + .re_balance_immediately() + } + }; + } + + fn on_exception(&mut self, err: Box) { + let message_queue_inner = self.message_queue_inner.take().unwrap(); + let pull_request = self.pull_request.take().unwrap(); + let topic = message_queue_inner.get_topic().to_string(); + if !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { + if let Some(er) = err.downcast_ref::() { + match er { + MQClientError::MQBrokerError(code, msg, addr) => { + if ResponseCode::from(*code) == ResponseCode::SubscriptionNotLatest { + warn!( + "the subscription is not latest, group={}", + self.push_consumer_impl.consumer_config.consumer_group, + ); + } else { + warn!( + "execute the pull request exception, group={}", + self.push_consumer_impl.consumer_config.consumer_group + ); + } + } + _ => { + warn!( + "execute the pull request exception, group={}", + self.push_consumer_impl.consumer_config.consumer_group + ); + } + } + } else { + warn!( + "execute the pull request exception, group={}", + self.push_consumer_impl.consumer_config.consumer_group + ); + } + } + let time_delay = if let Some(er) = err.downcast_ref::() { + match er { + MQClientError::MQBrokerError(code, _, _) => { + if ResponseCode::from(*code) == ResponseCode::FlowControl { + /*self.push_consumer_impl.execute_pull_request_later( + pull_request, + PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL, + );*/ + PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL + } else { + /*self.push_consumer_impl.execute_pull_request_later( + pull_request, + self.push_consumer_impl.pull_time_delay_mills_when_exception, + );*/ + self.push_consumer_impl.pull_time_delay_mills_when_exception + } + } + _ 
=> { + /*self.push_consumer_impl.execute_pull_request_later( + pull_request, + self.push_consumer_impl.pull_time_delay_mills_when_exception, + );*/ + self.push_consumer_impl.pull_time_delay_mills_when_exception + } + } + } else { + /*self.push_consumer_impl.execute_pull_request_later( + pull_request, + self.push_consumer_impl.pull_time_delay_mills_when_exception, + );*/ + self.push_consumer_impl.pull_time_delay_mills_when_exception + }; -pub trait PullCallback { - fn on_success(&self, pull_result: PullResult); - fn on_exception(&self, e: &dyn std::error::Error); + self.push_consumer_impl + .execute_pull_request_later(pull_request, time_delay); + } } diff --git a/rocketmq-client/src/consumer/pull_result.rs b/rocketmq-client/src/consumer/pull_result.rs index 6f6562fa..7d820dd0 100644 --- a/rocketmq-client/src/consumer/pull_result.rs +++ b/rocketmq-client/src/consumer/pull_result.rs @@ -14,17 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -use rocketmq_common::common::message::message_ext::MessageExt; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; +use rocketmq_common::ArcRefCellWrapper; use crate::consumer::pull_status::PullStatus; -#[derive(Debug)] pub struct PullResult { pub(crate) pull_status: PullStatus, pub(crate) next_begin_offset: u64, pub(crate) min_offset: u64, pub(crate) max_offset: u64, - pub(crate) msg_found_list: Vec, + pub(crate) msg_found_list: Vec>, } impl PullResult { @@ -33,7 +33,7 @@ impl PullResult { next_begin_offset: u64, min_offset: u64, max_offset: u64, - msg_found_list: Vec, + msg_found_list: Vec>, ) -> Self { Self { pull_status, @@ -60,11 +60,11 @@ impl PullResult { self.max_offset } - pub fn msg_found_list(&self) -> &Vec { + pub fn msg_found_list(&self) -> &Vec> { &self.msg_found_list } - pub fn set_msg_found_list(&mut self, msg_found_list: Vec) { + pub fn set_msg_found_list(&mut self, msg_found_list: Vec>) { self.msg_found_list = msg_found_list; } } diff --git a/rocketmq-client/src/consumer/store/offset_store.rs b/rocketmq-client/src/consumer/store/offset_store.rs index bf26d496..cf2eae26 100644 --- a/rocketmq-client/src/consumer/store/offset_store.rs +++ b/rocketmq-client/src/consumer/store/offset_store.rs @@ -82,11 +82,11 @@ impl OffsetStore { } } pub async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 { - if let Some(store) = &self.remote_broker_offset_store { + if let Some(ref store) = self.remote_broker_offset_store { return store.read_offset(mq, type_).await; } - if let Some(store) = &self.local_file_offset_store { + if let Some(ref store) = self.local_file_offset_store { return store.read_offset(mq, type_).await; } 0 diff --git a/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs b/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs index 1d164d25..368348d5 100644 --- a/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs +++ 
b/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs @@ -22,6 +22,7 @@ use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::TopicRequestHeader; +use rocketmq_remoting::protocol::header::query_consumer_offset_request_header::QueryConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader; use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader; use tokio::sync::Mutex; @@ -52,7 +53,62 @@ impl RemoteBrokerOffsetStore { } async fn fetch_consume_offset_from_broker(&self, mq: &MessageQueue) -> Result { - todo!() + let broker_name = self + .client_instance + .get_broker_name_from_message_queue(mq) + .await; + let mut find_broker_result = self + .client_instance + .mut_from_ref() + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true) + .await; + + if find_broker_result.is_none() { + self.client_instance + .mut_from_ref() + .update_topic_route_info_from_name_server_topic(mq.get_topic()) + .await; + let broker_name = self + .client_instance + .get_broker_name_from_message_queue(mq) + .await; + find_broker_result = self + .client_instance + .mut_from_ref() + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, false) + .await; + } + if let Some(find_broker_result) = find_broker_result { + let request_header = QueryConsumerOffsetRequestHeader { + consumer_group: self.group_name.clone(), + topic: mq.get_topic().to_string(), + queue_id: mq.get_queue_id(), + set_zero_if_not_found: None, + topic_request_header: Some(TopicRequestHeader { + lo: None, + rpc: Some(RpcRequestHeader { + namespace: None, + namespaced: None, + broker_name: Some(mq.get_broker_name().to_string()), + oneway: None, + }), + }), + }; + self.client_instance + .mut_from_ref() + .mq_client_api_impl + 
.query_consumer_offset( + find_broker_result.broker_addr.as_str(), + request_header, + 5_000, + ) + .await + } else { + Err(MQClientError::MQClientErr( + -1, + format!("broker not found, {}", mq.get_broker_name()), + )) + } } } diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 69fe4f79..b1d81772 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -68,7 +68,7 @@ pub struct MQClientInstance where C: Clone, { - client_config: Arc, + pub(crate) client_config: Arc, pub(crate) client_id: String, boot_timestamp: u64, /** @@ -97,7 +97,7 @@ where service_state: ServiceState, pub(crate) pull_message_service: ArcRefCellWrapper, rebalance_service: RebalanceService, - default_mqproducer: ArcRefCellWrapper, + pub(crate) default_producer: ArcRefCellWrapper, instance_runtime: Arc, broker_addr_table: Arc>>>, broker_version_table: @@ -153,7 +153,7 @@ where service_state: ServiceState::CreateJust, pull_message_service: ArcRefCellWrapper::new(PullMessageService::new()), rebalance_service: RebalanceService::new(), - default_mqproducer: ArcRefCellWrapper::new( + default_producer: ArcRefCellWrapper::new( DefaultMQProducer::builder() .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP) .client_config(client_config.clone()) @@ -181,7 +181,7 @@ where .send_heartbeat_to_broker(*id, broker_name, addr) .await { - instance_.re_balance_immediately().await; + instance_.re_balance_immediately(); } } } @@ -195,7 +195,7 @@ where instance } - pub async fn re_balance_immediately(&self) { + pub fn re_balance_immediately(&self) { self.rebalance_service.wakeup(); } @@ -229,7 +229,7 @@ where // Start rebalance service self.rebalance_service.start(self.clone()).await; // Start push service - self.default_mqproducer + self.default_producer .default_mqproducer_impl .as_mut() .unwrap() @@ -848,7 +848,9 @@ where for (key, value) in consumer_table.iter() { match 
value.try_rebalance().await { Ok(result) => { - balanced = result; + if !result { + balanced = false; + } } Err(e) => { error!( @@ -898,7 +900,11 @@ where found = broker_addr.is_some(); } if !found && !only_this_broker { - unimplemented!("findBrokerAddressInSubscribe") + if let Some((key, value)) = map.iter().next() { + //broker_addr = Some(value.clone()); + slave = *key != mix_all::MASTER_ID as i64; + found = !value.is_empty(); + } } } if found { diff --git a/rocketmq-client/src/hook/consume_message_context.rs b/rocketmq-client/src/hook/consume_message_context.rs index 236ec6ca..5b8de362 100644 --- a/rocketmq-client/src/hook/consume_message_context.rs +++ b/rocketmq-client/src/hook/consume_message_context.rs @@ -17,20 +17,21 @@ use std::collections::HashMap; use std::sync::Arc; -use rocketmq_common::common::message::message_ext::MessageExt; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; use crate::base::access_channel::AccessChannel; #[derive(Default)] -pub struct ConsumeMessageContext { +pub struct ConsumeMessageContext<'a> { pub consumer_group: String, - pub msg_list: Vec, + pub msg_list: &'a [ArcRefCellWrapper], pub mq: Option, pub success: bool, pub status: String, pub mq_trace_context: Option>>, pub props: HashMap, pub namespace: String, - pub access_channel: AccessChannel, + pub access_channel: Option, } diff --git a/rocketmq-client/src/hook/consume_message_hook.rs b/rocketmq-client/src/hook/consume_message_hook.rs index cdb8ad18..b8d0bd3c 100644 --- a/rocketmq-client/src/hook/consume_message_hook.rs +++ b/rocketmq-client/src/hook/consume_message_hook.rs @@ -19,7 +19,7 @@ use crate::hook::consume_message_context::ConsumeMessageContext; pub trait ConsumeMessageHook { fn hook_name(&self) -> &str; - fn consume_message_before(&self, context: Option<&ConsumeMessageContext>); + fn consume_message_before(&self, context: 
Option<&mut ConsumeMessageContext>); - fn consume_message_after(&self, context: Option<&ConsumeMessageContext>); + fn consume_message_after(&self, context: Option<&mut ConsumeMessageContext>); } diff --git a/rocketmq-client/src/hook/filter_message_context.rs b/rocketmq-client/src/hook/filter_message_context.rs index 70d086aa..b715f0d6 100644 --- a/rocketmq-client/src/hook/filter_message_context.rs +++ b/rocketmq-client/src/hook/filter_message_context.rs @@ -16,26 +16,27 @@ */ use std::fmt; -use rocketmq_common::common::message::message_ext::MessageExt; +use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_queue::MessageQueue; -pub struct FilterMessageContext { - consumer_group: String, - msg_list: Vec, - mq: MessageQueue, - arg: Option>, - unit_mode: bool, +#[derive(Default)] +pub struct FilterMessageContext<'a> { + pub(crate) consumer_group: Option, + pub(crate) msg_list: &'a [MessageClientExt], + pub(crate) mq: Option<&'a MessageQueue>, + pub(crate) arg: Option>, + pub(crate) unit_mode: bool, } -impl FilterMessageContext { +impl<'a> FilterMessageContext<'a> { pub fn new( - consumer_group: String, - msg_list: Vec, - mq: MessageQueue, + consumer_group: Option, + msg_list: &'a [MessageClientExt], + mq: Option<&'a MessageQueue>, arg: Option>, unit_mode: bool, ) -> Self { - Self { + FilterMessageContext { consumer_group, msg_list, mq, @@ -44,52 +45,52 @@ impl FilterMessageContext { } } - pub fn consumer_group(&self) -> &str { + pub fn consumer_group(&self) -> &Option { &self.consumer_group } - pub fn set_consumer_group(&mut self, consumer_group: String) { - self.consumer_group = consumer_group; + pub fn msg_list(&self) -> &'a [MessageClientExt] { + self.msg_list } - pub fn msg_list(&self) -> &Vec { - &self.msg_list + pub fn mq(&self) -> Option<&'a MessageQueue> { + self.mq } - pub fn set_msg_list(&mut self, msg_list: Vec) { - self.msg_list = msg_list; + pub fn arg(&self) -> &Option> { + &self.arg } - 
pub fn mq(&self) -> &MessageQueue { - &self.mq + pub fn unit_mode(&self) -> bool { + self.unit_mode } - pub fn set_mq(&mut self, mq: MessageQueue) { - self.mq = mq; + pub fn set_consumer_group(&mut self, consumer_group: Option) { + self.consumer_group = consumer_group; + } + + pub fn set_msg_list(&mut self, msg_list: &'a [MessageClientExt]) { + self.msg_list = msg_list; } - pub fn arg(&self) -> Option<&Box> { - self.arg.as_ref() + pub fn set_mq(&mut self, mq: Option<&'a MessageQueue>) { + self.mq = mq; } pub fn set_arg(&mut self, arg: Option>) { self.arg = arg; } - pub fn unit_mode(&self) -> bool { - self.unit_mode - } - pub fn set_unit_mode(&mut self, unit_mode: bool) { self.unit_mode = unit_mode; } } -impl fmt::Debug for FilterMessageContext { +impl fmt::Debug for FilterMessageContext<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "FilterMessageContext {{ consumer_group: {}, msg_list: {:?}, mq: {:?}, arg: {:?}, \ + "FilterMessageContext {{ consumer_group: {:?}, msg_list: {:?}, mq: {:?}, arg: {:?}, \ unit_mode: {} }}", self.consumer_group, self.msg_list, self.mq, self.arg, self.unit_mode ) diff --git a/rocketmq-client/src/hook/filter_message_hook.rs b/rocketmq-client/src/hook/filter_message_hook.rs index 7a8a1eff..fb1f2838 100644 --- a/rocketmq-client/src/hook/filter_message_hook.rs +++ b/rocketmq-client/src/hook/filter_message_hook.rs @@ -19,5 +19,5 @@ use crate::hook::filter_message_context::FilterMessageContext; pub trait FilterMessageHook { fn hook_name(&self) -> &str; - fn filter_message(&self, context: Option<&FilterMessageContext>); + fn filter_message(&self, context: &FilterMessageContext<'_>); } diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 510765de..9ab2cc50 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -22,6 +22,7 @@ use bytes::Bytes; use 
lazy_static::lazy_static; use rocketmq_common::common::message::message_batch::MessageBatch; use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter; +use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::message::MessageConst; use rocketmq_common::common::message::MessageTrait; @@ -29,6 +30,7 @@ use rocketmq_common::common::mix_all; use rocketmq_common::common::namesrv::default_top_addressing::DefaultTopAddressing; use rocketmq_common::common::namesrv::name_server_update_callback::NameServerUpdateCallback; use rocketmq_common::common::namesrv::top_addressing::TopAddressing; +use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag; use rocketmq_common::common::topic::TopicValidator; use rocketmq_common::ArcRefCellWrapper; use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent; @@ -39,11 +41,16 @@ use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody; use rocketmq_remoting::protocol::body::get_consumer_listby_group_response_body::GetConsumerListByGroupResponseBody; use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader; +use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader; use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader; use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2; use 
rocketmq_remoting::protocol::header::message_operation_header::send_message_response_header::SendMessageResponseHeader; +use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader; +use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader; +use rocketmq_remoting::protocol::header::query_consumer_offset_request_header::QueryConsumerOffsetRequestHeader; +use rocketmq_remoting::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader; use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; @@ -53,13 +60,19 @@ use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; use rocketmq_remoting::protocol::RemotingDeserializable; use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::remoting::RemotingService; +use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader; use rocketmq_remoting::runtime::config::client_config::TokioClientConfig; use rocketmq_remoting::runtime::RPCHook; use tracing::error; use tracing::warn; use crate::base::client_config::ClientConfig; +use crate::consumer::consumer_impl::pull_request_ext::PullResultExt; +use crate::consumer::pull_callback::PullCallback; +use crate::consumer::pull_result::PullResult; +use crate::consumer::pull_status::PullStatus; use crate::error::MQClientError; +use crate::error::MQClientError::MQBrokerError; use crate::factory::mq_client_instance::MQClientInstance; use crate::hook::send_message_context::SendMessageContext; use crate::implementation::client_remoting_processor::ClientRemotingProcessor; @@ -740,7 +753,11 @@ impl MQClientAPIImpl { request_header, ); self.remoting_client - .invoke_oneway(addr.to_string(), request, timeout_millis) + .invoke_oneway( + 
mix_all::broker_vip_channel(self.client_config.vip_channel_enabled, addr), + request, + timeout_millis, + ) .await; Ok(()) } @@ -769,4 +786,212 @@ impl MQClientAPIImpl { Ok(()) } } + + pub async fn query_consumer_offset( + &mut self, + addr: &str, + request_header: QueryConsumerOffsetRequestHeader, + timeout_millis: u64, + ) -> Result { + let request = RemotingCommand::create_request_command( + RequestCode::QueryConsumerOffset, + request_header, + ); + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + addr, + )), + request, + timeout_millis, + ) + .await?; + match ResponseCode::from(response.code()) { + ResponseCode::Success => { + let response_header = response + .decode_command_custom_header::() + .unwrap(); + return Ok(response_header.offset.unwrap()); + } + ResponseCode::QueryNotFound => { + return Err(MQClientError::OffsetNotFoundError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + _ => {} + } + Err(MQClientError::MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + + pub async fn pull_message( + &mut self, + addr: &str, + request_header: PullMessageRequestHeader, + timeout_millis: u64, + communication_mode: CommunicationMode, + pull_callback: PCB, + ) -> Result> + where + PCB: PullCallback, + { + let request = if PullSysFlag::has_lite_pull_flag(request_header.sys_flag as u32) { + RemotingCommand::create_request_command(RequestCode::LitePullMessage, request_header) + } else { + RemotingCommand::create_request_command(RequestCode::PullMessage, request_header) + }; + match communication_mode { + CommunicationMode::Sync => { + let result_ext = self + .pull_message_sync(addr, request, timeout_millis) + .await?; + Ok(Some(result_ext)) + } + CommunicationMode::Async => { + self.pull_message_async(addr, request, timeout_millis, pull_callback) 
+ .await?; + Ok(None) + } + CommunicationMode::Oneway => Ok(None), + } + } + + async fn pull_message_sync( + &mut self, + addr: &str, + request: RemotingCommand, + timeout_millis: u64, + ) -> Result { + let response = self + .remoting_client + .invoke_async(Some(addr.to_string()), request, timeout_millis) + .await?; + self.process_pull_response(response, addr).await + } + + async fn pull_message_async( + &mut self, + addr: &str, + request: RemotingCommand, + timeout_millis: u64, + mut pull_callback: PCB, + ) -> Result<()> + where + PCB: PullCallback, + { + match self + .remoting_client + .invoke_async(Some(addr.to_string()), request, timeout_millis) + .await + { + Ok(response) => { + let result = self.process_pull_response(response, addr).await; + match result { + Ok(pull_result) => { + pull_callback.on_success(pull_result).await; + } + Err(error) => { + pull_callback.on_exception(Box::new(error)); + } + } + } + Err(err) => { + pull_callback.on_exception(Box::new(err)); + } + } + Ok(()) + } + + async fn process_pull_response( + &mut self, + mut response: RemotingCommand, + addr: &str, + ) -> Result { + let pull_status = match ResponseCode::from(response.code()) { + ResponseCode::Success => PullStatus::Found, + ResponseCode::PullNotFound => PullStatus::NoNewMsg, + ResponseCode::PullRetryImmediately => PullStatus::NoMatchedMsg, + ResponseCode::PullOffsetMoved => PullStatus::OffsetIllegal, + _ => { + return Err(MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + }; + let response_header = response + .decode_command_custom_header::() + .unwrap(); + let pull_result = PullResultExt { + pull_result: PullResult { + pull_status, + next_begin_offset: response_header.next_begin_offset.unwrap_or(0) as u64, + min_offset: response_header.min_offset.unwrap_or(0) as u64, + max_offset: response_header.max_offset.unwrap_or(0) as u64, + msg_found_list: vec![], + }, + suggest_which_broker_id: 
response_header.suggest_which_broker_id.unwrap_or(0), + message_binary: response.take_body(), + offset_delta: response_header.offset_delta, + }; + Ok(pull_result) + } + + pub async fn consumer_send_message_back( + &mut self, + addr: &str, + broker_name: &str, + msg: &MessageExt, + consumer_group: &str, + delay_level: i32, + timeout_millis: u64, + max_consume_retry_times: i32, + ) -> Result<()> { + let header = ConsumerSendMsgBackRequestHeader { + offset: msg.commit_log_offset, + group: consumer_group.to_string(), + delay_level, + origin_msg_id: Some(msg.msg_id.clone()), + origin_topic: Some(msg.get_topic().to_string()), + unit_mode: false, + max_reconsume_times: Some(max_consume_retry_times), + rpc_request_header: Some(RpcRequestHeader { + namespace: None, + namespaced: None, + broker_name: Some(broker_name.to_string()), + oneway: None, + }), + }; + + let request_command = + RemotingCommand::create_request_command(RequestCode::ConsumerSendMsgBack, header); + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + addr, + )), + request_command, + timeout_millis, + ) + .await?; + if ResponseCode::from(response.code()) == ResponseCode::Success { + Ok(()) + } else { + Err(MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + } } diff --git a/rocketmq-client/src/producer/default_mq_producer.rs b/rocketmq-client/src/producer/default_mq_producer.rs index c60e9bd5..3eafd100 100644 --- a/rocketmq-client/src/producer/default_mq_producer.rs +++ b/rocketmq-client/src/producer/default_mq_producer.rs @@ -752,12 +752,12 @@ impl MQProducer for DefaultMQProducer { Ok(()) } - async fn send_to_queue(&mut self, mut msg: M, mut mq: MessageQueue) -> Result + async fn send_to_queue(&mut self, mut msg: M, mq: MessageQueue) -> Result where M: MessageTrait + Clone + Send + Sync, { 
msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); - self.client_config.queue_with_namespace(&mut mq); + let mq = self.client_config.queue_with_namespace(mq); let result = if self.get_auto_batch() && msg.as_any().downcast_ref::().is_none() { self.send_by_accumulator(msg, Some(mq), None).await @@ -770,14 +770,14 @@ impl MQProducer for DefaultMQProducer { async fn send_to_queue_with_timeout( &mut self, mut msg: M, - mut mq: MessageQueue, + mq: MessageQueue, timeout: u64, ) -> Result where M: MessageTrait + Clone + Send + Sync, { msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); - self.client_config.queue_with_namespace(&mut mq); + let mq = self.client_config.queue_with_namespace(mq); let result = self .default_mqproducer_impl .as_mut() @@ -790,7 +790,7 @@ impl MQProducer for DefaultMQProducer { async fn send_to_queue_with_callback( &mut self, mut msg: M, - mut mq: MessageQueue, + mq: MessageQueue, send_callback: F, ) -> Result<()> where @@ -798,7 +798,7 @@ impl MQProducer for DefaultMQProducer { F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static, { msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); - self.client_config.queue_with_namespace(&mut mq); + let mq = self.client_config.queue_with_namespace(mq); if self.get_auto_batch() && msg.as_any().downcast_ref::().is_none() { self.send_by_accumulator(msg, Some(mq), Some(Arc::new(send_callback))) @@ -835,12 +835,12 @@ impl MQProducer for DefaultMQProducer { .await } - async fn send_oneway_to_queue(&mut self, mut msg: M, mut mq: MessageQueue) -> Result<()> + async fn send_oneway_to_queue(&mut self, mut msg: M, mq: MessageQueue) -> Result<()> where M: MessageTrait + Clone + Send + Sync, { msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); - self.client_config.queue_with_namespace(&mut mq); + let mq = self.client_config.queue_with_namespace(mq); self.default_mqproducer_impl .as_mut() .unwrap() @@ -863,7 +863,7 @@ impl MQProducer for 
DefaultMQProducer { T: std::any::Any + Send + Sync, { msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); - let mut mq = self + let mq = self .default_mqproducer_impl .as_mut() .unwrap() @@ -874,7 +874,7 @@ impl MQProducer for DefaultMQProducer { self.producer_config.send_msg_timeout() as u64, ) .await?; - self.client_config.queue_with_namespace(&mut mq); + let mq = self.client_config.queue_with_namespace(mq); let result = if self.get_auto_batch() && msg.as_any().downcast_ref::().is_none() { self.send_by_accumulator(msg, Some(mq), None).await @@ -924,7 +924,7 @@ impl MQProducer for DefaultMQProducer { T: std::any::Any + Sync + Send, { msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); - let mut mq = self + let mq = self .default_mqproducer_impl .as_mut() .unwrap() @@ -935,7 +935,7 @@ impl MQProducer for DefaultMQProducer { self.producer_config.send_msg_timeout() as u64, ) .await?; - self.client_config.queue_with_namespace(&mut mq); + let mq = self.client_config.queue_with_namespace(mq); if self.auto_batch() && msg.as_any().downcast_ref::().is_none() { self.send_by_accumulator(msg, Some(mq), send_callback).await } else { diff --git a/rocketmq-client/src/trace/hook/consume_message_trace_hook_impl.rs b/rocketmq-client/src/trace/hook/consume_message_trace_hook_impl.rs index f49ca61a..445b6d27 100644 --- a/rocketmq-client/src/trace/hook/consume_message_trace_hook_impl.rs +++ b/rocketmq-client/src/trace/hook/consume_message_trace_hook_impl.rs @@ -35,11 +35,11 @@ impl ConsumeMessageHook for ConsumeMessageTraceHookImpl { todo!() } - fn consume_message_before(&self, context: Option<&ConsumeMessageContext>) { + fn consume_message_before(&self, context: Option<&mut ConsumeMessageContext>) { todo!() } - fn consume_message_after(&self, context: Option<&ConsumeMessageContext>) { + fn consume_message_after(&self, context: Option<&mut ConsumeMessageContext>) { todo!() } } diff --git a/rocketmq-common/src/common/base/service_state.rs 
b/rocketmq-common/src/common/base/service_state.rs index 92ada8f7..3dce3103 100644 --- a/rocketmq-common/src/common/base/service_state.rs +++ b/rocketmq-common/src/common/base/service_state.rs @@ -14,6 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::fmt::Display; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ServiceState { /// Service just created, not started @@ -25,3 +27,14 @@ pub enum ServiceState { /// Service start failure StartFailed, } + +impl Display for ServiceState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ServiceState::CreateJust => write!(f, "CreateJust"), + ServiceState::Running => write!(f, "Running"), + ServiceState::ShutdownAlready => write!(f, "ShutdownAlready"), + ServiceState::StartFailed => write!(f, "StartFailed"), + } + } +} diff --git a/rocketmq-common/src/common/message/message_client_ext.rs b/rocketmq-common/src/common/message/message_client_ext.rs index 8b5daba0..6c2a9861 100644 --- a/rocketmq-common/src/common/message/message_client_ext.rs +++ b/rocketmq-common/src/common/message/message_client_ext.rs @@ -14,9 +14,122 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +use std::any::Any; +use std::collections::HashMap; +use std::fmt::Display; +use std::fmt::Formatter; + +use bytes::Bytes; + +use crate::common::message::message_client_id_setter::MessageClientIDSetter; use crate::common::message::message_ext::MessageExt; +use crate::common::message::MessageTrait; #[derive(Clone, Debug, Default)] pub struct MessageClientExt { pub message_ext_inner: MessageExt, } + +impl MessageClientExt { + pub fn get_offset_msg_id(&self) -> &str { + self.message_ext_inner.msg_id() + } + + pub fn set_offset_msg_id(&mut self, offset_msg_id: impl Into) { + self.message_ext_inner.set_msg_id(offset_msg_id.into()); + } + + pub fn get_msg_id(&self) -> String { + let uniq_id = MessageClientIDSetter::get_uniq_id(&self.message_ext_inner); + if let Some(uniq_id) = uniq_id { + uniq_id + } else { + self.message_ext_inner.msg_id().to_string() + } + } +} + +impl Display for MessageClientExt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MessageClientExt {{ message_ext_inner: {:?} }}", + self.message_ext_inner + ) + } +} + +impl MessageTrait for MessageClientExt { + fn put_property(&mut self, key: &str, value: &str) { + self.message_ext_inner.put_property(key, value); + } + + fn clear_property(&mut self, name: &str) { + self.message_ext_inner.clear_property(name); + } + + fn get_property(&self, name: &str) -> Option { + self.message_ext_inner.get_property(name) + } + + fn get_topic(&self) -> &str { + self.message_ext_inner.get_topic() + } + + fn set_topic(&mut self, topic: &str) { + self.message_ext_inner.set_topic(topic); + } + + fn get_flag(&self) -> i32 { + self.message_ext_inner.get_flag() + } + + fn set_flag(&mut self, flag: i32) { + self.message_ext_inner.set_flag(flag); + } + + fn get_body(&self) -> Option<&Bytes> { + self.message_ext_inner.get_body() + } + + fn set_body(&mut self, body: Bytes) { + self.message_ext_inner.set_body(body); + } + + fn get_properties(&self) -> &HashMap { + 
self.message_ext_inner.get_properties() + } + + fn set_properties(&mut self, properties: HashMap) { + self.message_ext_inner.set_properties(properties); + } + + fn get_transaction_id(&self) -> &str { + self.message_ext_inner.get_transaction_id() + } + + fn set_transaction_id(&mut self, transaction_id: &str) { + self.message_ext_inner.set_transaction_id(transaction_id); + } + + fn get_compressed_body_mut(&mut self) -> &mut Option { + self.message_ext_inner.get_compressed_body_mut() + } + + fn get_compressed_body(&self) -> Option<&Bytes> { + self.message_ext_inner.get_compressed_body() + } + + fn set_compressed_body_mut(&mut self, compressed_body: Bytes) { + self.message_ext_inner + .set_compressed_body_mut(compressed_body); + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} diff --git a/rocketmq-common/src/common/message/message_decoder.rs b/rocketmq-common/src/common/message/message_decoder.rs index 87ca01a6..1d4c9c03 100644 --- a/rocketmq-common/src/common/message/message_decoder.rs +++ b/rocketmq-common/src/common/message/message_decoder.rs @@ -28,11 +28,14 @@ use bytes::Bytes; use bytes::BytesMut; use crate::common::compression::compression_type::CompressionType; +use crate::common::message::message_client_ext::MessageClientExt; use crate::common::message::message_ext::MessageExt; use crate::common::message::message_single::Message; +use crate::common::message::MessageTrait; use crate::common::message::MessageVersion; use crate::common::sys_flag::message_sys_flag::MessageSysFlag; use crate::CRC32Utils::crc32; +use crate::MessageAccessor::MessageAccessor; use crate::MessageUtils::build_message_id; use crate::Result; @@ -77,6 +80,32 @@ pub fn string_to_message_properties(properties: Option<&String>) -> HashMap) -> HashMap { + let mut map = HashMap::new(); + if let Some(properties) = properties { + let mut index = 0; + let len = properties.len(); + while index < len { + let new_index = properties[index..] 
+ .find(PROPERTY_SEPARATOR) + .map_or(len, |i| index + i); + if new_index - index >= 3 { + if let Some(kv_sep_index) = properties[index..new_index].find(NAME_VALUE_SEPARATOR) + { + let kv_sep_index = index + kv_sep_index; + if kv_sep_index > index && kv_sep_index < new_index - 1 { + let k = &properties[index..kv_sep_index]; + let v = &properties[kv_sep_index + 1..new_index]; + map.insert(k.to_string(), v.to_string()); + } + } + } + index = new_index + 1; + } + } + map +} + pub fn message_properties_to_string(properties: &HashMap) -> String { let mut len = 0; for (name, value) in properties.iter() { @@ -97,6 +126,41 @@ pub fn message_properties_to_string(properties: &HashMap) -> Str sb } +pub fn decode_client( + byte_buffer: &mut Bytes, + read_body: bool, + de_compress_body: bool, + is_set_properties_string: bool, + check_crc: bool, +) -> Option { + /*if let Some(msg_ext) = decode( + byte_buffer, + read_body, + de_compress_body, + false, + is_set_properties_string, + check_crc, + ) { + Some(MessageClientExt { + message_ext_inner: msg_ext, + }) + } else { + None + }*/ + decode( + byte_buffer, + read_body, + de_compress_body, + false, + is_set_properties_string, + check_crc, + ) + .map(|msg_ext| MessageClientExt { + message_ext_inner: msg_ext, + }) +} + +//this method will optimize later pub fn decode( byte_buffer: &mut Bytes, read_body: bool, @@ -321,11 +385,11 @@ pub fn encode_message(message: &Message) -> Bytes { let properties_length = properties_bytes.len(); let store_size = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCOD - + 4 // 3 BODYCRC - + 4 // 4 FLAG - + 4 + body_len // 4 BODY - + 2 + properties_length; + + 4 // 2 MAGICCOD + + 4 // 3 BODYCRC + + 4 // 4 FLAG + + 4 + body_len // 4 BODY + + 2 + properties_length; let mut bytes = BytesMut::with_capacity(store_size); @@ -352,6 +416,108 @@ pub fn encode_message(message: &Message) -> Bytes { bytes.freeze() } +pub fn decodes_batch( + byte_buffer: &mut Bytes, + read_body: bool, + decompress_body: bool, +) -> Vec { + let mut 
messages = Vec::new(); + while byte_buffer.has_remaining() { + if let Some(msg_ext) = decode(byte_buffer, read_body, decompress_body, false, false, false) + { + messages.push(msg_ext); + } else { + break; + } + } + messages +} + +pub fn decodes_batch_client( + byte_buffer: &mut Bytes, + read_body: bool, + decompress_body: bool, +) -> Vec { + let mut messages = Vec::new(); + while byte_buffer.has_remaining() { + if let Some(msg_ext) = decode_client(byte_buffer, read_body, decompress_body, false, false) + { + messages.push(msg_ext); + } else { + break; + } + } + messages +} + +pub fn decode_message_client(mut message_ext: MessageExt, vec_: &mut Vec) { + let messages = decode_messages(message_ext.message.body.as_mut().unwrap()); + for message in messages { + let mut message_client_ext = MessageClientExt { + message_ext_inner: MessageExt { + message, + ..MessageExt::default() + }, + }; + message_client_ext.set_topic(message_ext.get_topic()); + message_client_ext.message_ext_inner.queue_offset = message_ext.queue_offset; + message_client_ext.message_ext_inner.queue_id = message_ext.queue_id; + message_client_ext.set_flag(message_ext.get_flag()); + //MessageAccessor::set_properties(&mut + // message_client_ext,message.get_properties().clone()); messageClientExt. 
+ // setBody(message.getBody()) + message_client_ext.message_ext_inner.store_host = message_ext.store_host; + message_client_ext.message_ext_inner.born_host = message_ext.born_host; + message_client_ext.message_ext_inner.store_timestamp = message_ext.store_timestamp; + message_client_ext.message_ext_inner.born_timestamp = message_ext.born_timestamp; + message_client_ext.message_ext_inner.sys_flag = message_ext.sys_flag; + message_client_ext.message_ext_inner.commit_log_offset = message_ext.commit_log_offset; + message_client_ext.set_wait_store_msg_ok(message_ext.is_wait_store_msg_ok()); + vec_.push(message_client_ext); + } +} + +pub fn decode_messages(buffer: &mut Bytes) -> Vec { + let mut messages = Vec::new(); + while buffer.has_remaining() { + let message = decode_message(buffer); + messages.push(message); + } + messages +} + +pub fn decode_message(buffer: &mut Bytes) -> Message { + // 1 TOTALSIZE + let _ = buffer.get_i32(); + + // 2 MAGICCODE + let _ = buffer.get_i32(); + + // 3 BODYCRC + let _ = buffer.get_i32(); + + // 4 FLAG + let flag = buffer.get_i32(); + + // 5 BODY + let body_len = buffer.get_i32(); + let body = buffer.split_to(body_len as usize); + + // 6 properties + let properties_length = buffer.get_i16(); + let properties = buffer.split_to(properties_length as usize); + //string_to_message_properties(Some(&String::from_utf8_lossy(properties.as_ref()). 
+ // to_string())); + let message_properties = str_to_message_properties(Some(str::from_utf8(&properties).unwrap())); + + Message { + body: Some(body), + properties: message_properties, + flag, + ..Message::default() + } +} + #[cfg(test)] mod tests { use bytes::BufMut; diff --git a/rocketmq-common/src/common/message/message_queue.rs b/rocketmq-common/src/common/message/message_queue.rs index 5e4c6580..bb95c046 100644 --- a/rocketmq-common/src/common/message/message_queue.rs +++ b/rocketmq-common/src/common/message/message_queue.rs @@ -63,6 +63,7 @@ impl MessageQueue { &self.topic } + #[inline] pub fn set_topic(&mut self, topic: String) { self.topic = topic; } diff --git a/rocketmq-common/src/common/message/message_single.rs b/rocketmq-common/src/common/message/message_single.rs index 4f1bc4c4..c7f86909 100644 --- a/rocketmq-common/src/common/message/message_single.rs +++ b/rocketmq-common/src/common/message/message_single.rs @@ -54,6 +54,10 @@ impl Message { Self::with_details(topic, String::new(), String::new(), 0, body, true) } + pub fn new_body(topic: impl Into, body: Option) -> Self { + Self::with_details_body(topic, String::new(), String::new(), 0, body, true) + } + pub fn with_tags(topic: impl Into, tags: impl Into, body: &[u8]) -> Self { Self::with_details(topic, tags, String::new(), 0, body, true) } @@ -97,6 +101,36 @@ impl Message { message } + pub fn with_details_body( + topic: impl Into, + tags: impl Into, + keys: impl Into, + flag: i32, + body: Option, + wait_store_msg_ok: bool, + ) -> Self { + let topic = topic.into(); + let tags = tags.into(); + let keys = keys.into(); + let mut message = Message { + topic, + flag, + body, + ..Default::default() + }; + + if !tags.is_empty() { + message.set_tags(tags); + } + + if !keys.is_empty() { + message.set_keys(keys); + } + + message.set_wait_store_msg_ok(wait_store_msg_ok); + message + } + pub fn set_tags(&mut self, tags: String) { self.properties .insert(MessageConst::PROPERTY_TAGS.to_string(), tags); diff 
--git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index fd488071..1dc4a647 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -17,21 +17,21 @@ pub mod broker; pub mod check_transaction_state_request_header; pub mod client_request_header; +pub mod consumer_send_msg_back_request_header; pub mod create_topic_request_header; pub mod delete_topic_request_header; pub mod get_all_topic_config_response_header; +pub mod get_consume_stats_request_header; +pub mod get_consumer_connection_list_request_header; pub mod get_consumer_listby_group_request_header; pub mod get_consumer_listby_group_response_header; pub mod get_earliest_msg_storetime_response_header; -pub mod get_max_offset_response_header; -pub mod get_min_offset_response_header; -pub mod get_topic_stats_request_header; - -pub mod get_consume_stats_request_header; -pub mod get_consumer_connection_list_request_header; pub mod get_max_offset_request_header; +pub mod get_max_offset_response_header; pub mod get_min_offset_request_header; +pub mod get_min_offset_response_header; pub mod get_topic_config_request_header; +pub mod get_topic_stats_request_header; pub mod heartbeat_request_header; pub mod message_operation_header; pub mod namesrv; diff --git a/rocketmq-remoting/src/protocol/header/consumer_send_msg_back_request_header.rs b/rocketmq-remoting/src/protocol/header/consumer_send_msg_back_request_header.rs new file mode 100644 index 00000000..c2cf1fe3 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/consumer_send_msg_back_request_header.rs @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use serde::Deserialize; +use serde::Serialize; + +use crate::protocol::command_custom_header::CommandCustomHeader; +use crate::protocol::command_custom_header::FromMap; +use crate::rpc::rpc_request_header::RpcRequestHeader; + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerSendMsgBackRequestHeader { + pub offset: i64, + pub group: String, + pub delay_level: i32, + pub origin_msg_id: Option, + pub origin_topic: Option, + pub unit_mode: bool, + pub max_reconsume_times: Option, + #[serde(flatten)] + pub rpc_request_header: Option, +} + +impl ConsumerSendMsgBackRequestHeader { + pub const OFFSET: &'static str = "offset"; + pub const GROUP: &'static str = "group"; + pub const DELAY_LEVEL: &'static str = "delayLevel"; + pub const ORIGIN_MSG_ID: &'static str = "originMsgId"; + pub const ORIGIN_TOPIC: &'static str = "originTopic"; + pub const UNIT_MODE: &'static str = "unitMode"; + pub const MAX_RECONSUME_TIMES: &'static str = "maxReconsumeTimes"; +} + +impl CommandCustomHeader for ConsumerSendMsgBackRequestHeader { + fn to_map(&self) -> Option> { + let mut map = std::collections::HashMap::new(); + map.insert(Self::OFFSET.to_string(), self.offset.to_string()); + map.insert(Self::GROUP.to_string(), self.group.clone()); + map.insert(Self::DELAY_LEVEL.to_string(), self.delay_level.to_string()); + if let Some(value) = 
&self.origin_msg_id { + map.insert(Self::ORIGIN_MSG_ID.to_string(), value.clone()); + } + if let Some(value) = &self.origin_topic { + map.insert(Self::ORIGIN_TOPIC.to_string(), value.clone()); + } + map.insert(Self::UNIT_MODE.to_string(), self.unit_mode.to_string()); + if let Some(value) = self.max_reconsume_times { + map.insert(Self::MAX_RECONSUME_TIMES.to_string(), value.to_string()); + } + if let Some(ref rpc) = self.rpc_request_header { + if let Some(rpc_map) = rpc.to_map() { + map.extend(rpc_map); + } + } + Some(map) + } +} + +impl FromMap for ConsumerSendMsgBackRequestHeader { + type Target = Self; + + fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> { + Some(ConsumerSendMsgBackRequestHeader { + offset: map.get(Self::OFFSET)?.parse().ok()?, + group: map.get(Self::GROUP)?.clone(), + delay_level: map.get(Self::DELAY_LEVEL)?.parse().ok()?, + origin_msg_id: map.get(Self::ORIGIN_MSG_ID).cloned(), + origin_topic: map.get(Self::ORIGIN_TOPIC).cloned(), + unit_mode: map.get(Self::UNIT_MODE)?.parse().ok()?, + max_reconsume_times: map.get(Self::MAX_RECONSUME_TIMES)?.parse().ok(), + rpc_request_header: <RpcRequestHeader as FromMap>::from(map), + }) + } +} diff --git a/rocketmq-remoting/src/protocol/remoting_command.rs b/rocketmq-remoting/src/protocol/remoting_command.rs index a3a16ef2..e9e52cd4 100644 --- a/rocketmq-remoting/src/protocol/remoting_command.rs +++ b/rocketmq-remoting/src/protocol/remoting_command.rs @@ -547,6 +547,10 @@ impl RemotingCommand { &self.body } + pub fn take_body(&mut self) -> Option<Bytes> { + self.body.take() + } + pub fn suspended(&self) -> bool { self.suspended }