diff --git a/rocketmq-client/src/base/client_config.rs b/rocketmq-client/src/base/client_config.rs index 92d230f7..d81dbee4 100644 --- a/rocketmq-client/src/base/client_config.rs +++ b/rocketmq-client/src/base/client_config.rs @@ -20,6 +20,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils; use rocketmq_common::utils::name_server_address_utils::NAMESRV_ENDPOINT_PATTERN; use rocketmq_common::utils::network_util::NetworkUtil; @@ -135,6 +136,17 @@ impl ClientConfig { ) } + pub fn queue_with_namespace(&mut self, queue: &mut MessageQueue) { + if let Some(namespace) = self.get_namespace() { + if !namespace.is_empty() { + queue.set_topic(NamespaceUtil::wrap_namespace( + namespace.as_str(), + queue.get_topic(), + )); + } + } + } + pub fn get_namespace(&mut self) -> Option { let namespace_initialized = self.namespace_initialized.load(Ordering::Acquire); if namespace_initialized { diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 1520839a..e97c4297 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -74,7 +74,7 @@ pub struct MQClientInstance { */ admin_ext_table: Arc>>>, mq_client_api_impl: ArcRefCellWrapper, - mq_admin_impl: Arc, + pub(crate) mq_admin_impl: ArcRefCellWrapper, topic_route_table: Arc>>, topic_end_points_table: Arc>>>, @@ -122,7 +122,7 @@ impl MQClientInstance { consumer_table: Arc::new(Default::default()), admin_ext_table: Arc::new(Default::default()), mq_client_api_impl, - mq_admin_impl: Arc::new(MQAdminImpl {}), + mq_admin_impl: ArcRefCellWrapper::new(MQAdminImpl {}), topic_route_table: Arc::new(Default::default()), topic_end_points_table: Arc::new(Default::default()), lock_namesrv: Default::default(), diff --git a/rocketmq-client/src/implementation/mq_admin_impl.rs b/rocketmq-client/src/implementation/mq_admin_impl.rs index d2ea2c6f..9ee08744 100644 --- a/rocketmq-client/src/implementation/mq_admin_impl.rs +++ b/rocketmq-client/src/implementation/mq_admin_impl.rs @@ -14,4 +14,42 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; + +use crate::base::client_config::ClientConfig; + pub struct MQAdminImpl {} + +impl MQAdminImpl { + pub fn new() -> Self { + MQAdminImpl {} + } +} + +impl MQAdminImpl { + pub fn parse_publish_message_queues( + &mut self, + message_queue_array: &[MessageQueue], + client_config: &mut ClientConfig, + ) -> Vec { + let mut message_queues = Vec::new(); + for message_queue in message_queue_array { + let user_topic = NamespaceUtil::without_namespace_with_namespace( + message_queue.get_topic(), + client_config + .get_namespace() + .unwrap_or("".to_string()) + .as_str(), + ); + + let message_queue = MessageQueue::from_parts( + user_topic, + message_queue.get_broker_name(), + message_queue.get_queue_id(), + ); + message_queues.push(message_queue); + } + message_queues + } +} diff --git a/rocketmq-client/src/producer/default_mq_producer.rs b/rocketmq-client/src/producer/default_mq_producer.rs index 5b74f660..52d67e13 100644 --- a/rocketmq-client/src/producer/default_mq_producer.rs +++ b/rocketmq-client/src/producer/default_mq_producer.rs @@ -45,7 +45,6 @@ use crate::producer::mq_producer::MQProducer; use crate::producer::produce_accumulator::ProduceAccumulator; use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl; use crate::producer::request_callback::RequestCallback; -use crate::producer::send_callback::SendCallback; use crate::producer::send_callback::SendMessageCallback; use crate::producer::send_result::SendResult; use crate::producer::transaction_send_result::TransactionSendResult; @@ -737,93 +736,261 @@ impl MQProducer for DefaultMQProducer { Ok(()) } - async fn send_oneway(&self, msg: &Message) -> Result<()> { - todo!() + async fn send_oneway(&mut self, mut msg: M) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.default_mqproducer_impl + .as_mut() + .unwrap() + .send_oneway(msg) + .await?; + Ok(()) } - async fn send_to_queue(&self, msg: &Message, mq: &MessageQueue) -> Result { - todo!() + async fn send_to_queue(&mut self, mut msg: M, mut mq: MessageQueue) -> Result + where + M: MessageTrait + Clone + Send + Sync, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.client_config.queue_with_namespace(&mut mq); + let result = + if self.get_auto_batch() && msg.as_any().downcast_ref::().is_none() { + self.send_by_accumulator(msg, Some(mq), None).await + } else { + self.send_direct(msg, Some(mq), None).await + }?; + Ok(result.expect("SendResult should not be None")) } - async fn send_to_queue_with_timeout( - &self, - msg: &Message, - mq: &MessageQueue, + async fn send_to_queue_with_timeout( + &mut self, + mut msg: M, + mut mq: MessageQueue, timeout: u64, - ) -> Result { - todo!() + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.client_config.queue_with_namespace(&mut mq); + let result = self + .default_mqproducer_impl + .as_mut() + .unwrap() + .sync_send_with_message_queue_timeout(msg, mq, timeout) + .await?; + Ok(result.expect("SendResult should not be None")) } - async fn send_to_queue_with_callback( - &self, - msg: &Message, - mq: &MessageQueue, - send_callback: impl SendCallback, - ) { - todo!() + async fn send_to_queue_with_callback( + &mut self, + mut msg: M, + mut mq: MessageQueue, + send_callback: F, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.client_config.queue_with_namespace(&mut mq); + + if self.get_auto_batch() && msg.as_any().downcast_ref::().is_none() { + self.send_by_accumulator(msg, Some(mq), Some(Arc::new(send_callback))) + .await + } else { + self.send_direct(msg, Some(mq), Some(Arc::new(send_callback))) + .await + }?; + + Ok(()) } - async fn send_to_queue_with_callback_timeout( - &self, - msg: &Message, - mq: &MessageQueue, - send_callback: impl SendCallback, + async fn send_to_queue_with_callback_timeout( + &mut self, + mut msg: M, + mq: MessageQueue, + send_callback: F, timeout: u64, - ) { - todo!() + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.default_mqproducer_impl + .as_mut() + .unwrap() + .async_send_batch_to_queue_with_callback_timeout( + msg, + mq, + Some(Arc::new(send_callback)), + timeout, + ) + .await } - async fn send_oneway_to_queue(&self, msg: &Message, mq: &MessageQueue) -> Result<()> { - todo!() + async fn send_oneway_to_queue(&mut self, mut msg: M, mut mq: MessageQueue) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.client_config.queue_with_namespace(&mut mq); + self.default_mqproducer_impl + .as_mut() + .unwrap() + .send_oneway_with_message_queue(msg, mq) + .await } - async fn send_with_selector( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - ) -> Result { - todo!() + async fn send_with_selector( + &mut self, + mut msg: M, + selector: S, + arg: T, + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Send + Sync, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + let mut mq = self + .default_mqproducer_impl + .as_mut() + .unwrap() + .invoke_message_queue_selector( + &msg, + Arc::new(selector), + &arg, + self.producer_config.send_msg_timeout() as u64, + ) + .await?; + self.client_config.queue_with_namespace(&mut mq); + let result = + if self.get_auto_batch() && msg.as_any().downcast_ref::().is_none() { + self.send_by_accumulator(msg, Some(mq), None).await + } else { + self.send_direct(msg, Some(mq), None).await + }?; + + Ok(result.expect("SendResult should not be None")) } - async fn send_with_selector_timeout( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, + async fn send_with_selector_timeout( + &mut self, + mut msg: M, + selector: S, + arg: T, timeout: u64, - ) -> Result { - todo!() + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.default_mqproducer_impl + .as_mut() + .unwrap() + .send_with_selector_timeout(msg, Arc::new(selector), arg, timeout) + .await } - async fn send_with_selector_callback( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - send_callback: impl SendCallback, - ) { - todo!() + async fn send_with_selector_callback( + &mut self, + mut msg: M, + selector: S, + arg: T, + send_callback: Option, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + let mut mq = self + .default_mqproducer_impl + .as_mut() + .unwrap() + .invoke_message_queue_selector( + &msg, + Arc::new(selector), + &arg, + self.producer_config.send_msg_timeout() as u64, + ) + .await?; + self.client_config.queue_with_namespace(&mut mq); + if self.auto_batch() && msg.as_any().downcast_ref::().is_none() { + self.send_by_accumulator(msg, Some(mq), send_callback).await + } else { + self.send_direct(msg, Some(mq), send_callback).await + }?; + Ok(()) } - async fn send_with_selector_callback_timeout( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - send_callback: impl SendCallback, + async fn send_with_selector_callback_timeout( + &mut self, + mut msg: M, + selector: S, + arg: T, + send_callback: Option, timeout: u64, - ) { - todo!() + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.default_mqproducer_impl + .as_mut() + .unwrap() + .send_with_selector_callback_timeout( + msg, + Arc::new(selector), + arg, + send_callback, + timeout, + ) + .await } - async fn send_oneway_with_selector( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - ) -> Result<()> { - todo!() + async fn send_oneway_with_selector( + &mut self, + mut msg: M, + selector: S, + arg: T, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send, + { + msg.set_topic(self.with_namespace(msg.get_topic()).as_str()); + self.default_mqproducer_impl + .as_mut() + .unwrap() + .send_oneway_with_selector(msg, Arc::new(selector), arg) + .await } async fn send_message_in_transaction( diff --git a/rocketmq-client/src/producer/message_queue_selector.rs b/rocketmq-client/src/producer/message_queue_selector.rs index 2024eaf0..9c8af06c 100644 --- a/rocketmq-client/src/producer/message_queue_selector.rs +++ b/rocketmq-client/src/producer/message_queue_selector.rs @@ -14,8 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; + use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::message::message_single::Message; +use rocketmq_common::common::message::MessageTrait; + +pub type MessageQueueSelectorFn = Arc< + dyn Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync, +>; /// A trait for selecting a message queue. /// diff --git a/rocketmq-client/src/producer/mq_producer.rs b/rocketmq-client/src/producer/mq_producer.rs index 83a451a7..e1ba0b7e 100644 --- a/rocketmq-client/src/producer/mq_producer.rs +++ b/rocketmq-client/src/producer/mq_producer.rs @@ -22,7 +22,7 @@ use rocketmq_common::common::message::MessageTrait; use crate::producer::message_queue_selector::MessageQueueSelector; use crate::producer::request_callback::RequestCallback; -use crate::producer::send_callback::SendCallback; +use crate::producer::send_callback::SendMessageCallback; use crate::producer::send_result::SendResult; use crate::producer::transaction_send_result::TransactionSendResult; use crate::Result; @@ -118,7 +118,9 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` indicating success or failure. - async fn send_oneway(&self, msg: &Message) -> Result<()>; + async fn send_oneway(&mut self, msg: M) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync; /// Sends a message to a specific queue. /// @@ -130,7 +132,9 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` containing the `SendResult`, or an error. - async fn send_to_queue(&self, msg: &Message, mq: &MessageQueue) -> Result; + async fn send_to_queue(&mut self, msg: M, mq: MessageQueue) -> Result + where + M: MessageTrait + Clone + Send + Sync; /// Sends a message to a specific queue with a timeout. /// @@ -144,12 +148,14 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` containing the `SendResult`, or an error. - async fn send_to_queue_with_timeout( - &self, - msg: &Message, - mq: &MessageQueue, + async fn send_to_queue_with_timeout( + &mut self, + msg: M, + mq: MessageQueue, timeout: u64, - ) -> Result; + ) -> Result + where + M: MessageTrait + Clone + Send + Sync; /// Sends a message to a specific queue with a callback. /// @@ -160,12 +166,15 @@ pub trait MQProducerLocal { /// * `msg` - A reference to the `Message` to be sent. /// * `mq` - A reference to the `MessageQueue` where the message should be sent. /// * `send_callback` - A callback function to be invoked with the result of the send operation. - async fn send_to_queue_with_callback( - &self, - msg: &Message, - mq: &MessageQueue, - send_callback: impl SendCallback, - ); + async fn send_to_queue_with_callback( + &mut self, + msg: M, + mq: MessageQueue, + send_callback: F, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static; /// Sends a message to a specific queue with a callback and a timeout. /// @@ -177,13 +186,16 @@ pub trait MQProducerLocal { /// * `mq` - A reference to the `MessageQueue` where the message should be sent. /// * `send_callback` - A callback function to be invoked with the result of the send operation. /// * `timeout` - The timeout duration in milliseconds. - async fn send_to_queue_with_callback_timeout( - &self, - msg: &Message, - mq: &MessageQueue, - send_callback: impl SendCallback, + async fn send_to_queue_with_callback_timeout( + &mut self, + msg: M, + mq: MessageQueue, + send_callback: F, timeout: u64, - ); + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + F: Fn(Option<&SendResult>, Option<&dyn std::error::Error>) + Send + Sync + 'static; /// Sends a message to a specific queue without waiting for a response. /// @@ -196,7 +208,9 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` indicating success or failure. - async fn send_oneway_to_queue(&self, msg: &Message, mq: &MessageQueue) -> Result<()>; + async fn send_oneway_to_queue(&mut self, msg: M, mq: MessageQueue) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync; /// Sends a message with a selector. /// @@ -209,12 +223,19 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` containing the `SendResult`, or an error. - async fn send_with_selector( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - ) -> Result; + async fn send_with_selector( + &mut self, + msg: M, + selector: S, + arg: T, + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send; /// Sends a message with a selector and a timeout. /// @@ -229,13 +250,20 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` containing the `SendResult`, or an error. - async fn send_with_selector_timeout( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, + async fn send_with_selector_timeout( + &mut self, + msg: M, + selector: S, + arg: T, timeout: u64, - ) -> Result; + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send; /// Sends a message with a selector and a callback. /// @@ -247,13 +275,20 @@ pub trait MQProducerLocal { /// * `selector` - A message queue selector to determine the target queue. /// * `arg` - An argument to be used by the selector. /// * `send_callback` - A callback function to be invoked with the result of the send operation. - async fn send_with_selector_callback( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - send_callback: impl SendCallback, - ); + async fn send_with_selector_callback( + &mut self, + msg: M, + selector: S, + arg: T, + send_callback: Option, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send; /// Sends a message with a selector, a callback, and a timeout. /// @@ -267,14 +302,21 @@ pub trait MQProducerLocal { /// * `arg` - An argument to be used by the selector. /// * `send_callback` - A callback function to be invoked with the result of the send operation. /// * `timeout` - The timeout duration in milliseconds. - async fn send_with_selector_callback_timeout( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - send_callback: impl SendCallback, + async fn send_with_selector_callback_timeout( + &mut self, + msg: M, + selector: S, + arg: T, + send_callback: Option, timeout: u64, - ); + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send; /// Sends a message with a selector without waiting for a response. /// @@ -288,12 +330,19 @@ pub trait MQProducerLocal { /// /// # Returns /// A `Result` indicating success or failure. - async fn send_oneway_with_selector( - &self, - msg: &Message, - selector: impl MessageQueueSelector, - arg: &str, - ) -> Result<()>; + async fn send_oneway_with_selector( + &mut self, + msg: M, + selector: S, + arg: T, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + S: Fn(&[MessageQueue], &dyn MessageTrait, &dyn std::any::Any) -> Option + + Send + + Sync + + 'static, + T: std::any::Any + Sync + Send; /// Sends a message in a transaction. /// diff --git a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs index baa0e541..1b6dd034 100644 --- a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs +++ b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs @@ -72,6 +72,7 @@ use crate::latency::mq_fault_strategy::MQFaultStrategy; use crate::latency::resolver::Resolver; use crate::latency::service_detector::ServiceDetector; use crate::producer::default_mq_producer::ProducerConfig; +use crate::producer::message_queue_selector::MessageQueueSelectorFn; use crate::producer::producer_impl::mq_producer_inner::MQProducerInner; use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo; use crate::producer::send_callback::SendMessageCallback; @@ -187,6 +188,40 @@ impl DefaultMQProducerImpl { .await } + #[inline] + pub async fn send_oneway(&mut self, msg: T) -> Result<()> + where + T: MessageTrait + Clone + Send + Sync, + { + self.send_default_impl( + msg, + CommunicationMode::Oneway, + None, + self.producer_config.send_msg_timeout() as u64, + ) + .await?; + Ok(()) + } + + pub async fn send_oneway_with_message_queue( + &mut self, + msg: T, + mq: MessageQueue, + ) -> Result<()> + where + T: MessageTrait + Clone + Send + Sync, + { + self.make_sure_state_ok()?; + Validators::check_message(Some(&msg), self.producer_config.as_ref())?; + self.send_default_impl( + msg, + CommunicationMode::Oneway, + None, + self.producer_config.send_msg_timeout() as u64, + ) + .await?; + Ok(()) + } #[inline] pub async fn sync_send_with_message_queue_timeout( &mut self, @@ -251,6 +286,58 @@ impl DefaultMQProducerImpl { .await } + pub async fn send_with_selector_callback_timeout( + &mut self, + msg: M, + selector: MessageQueueSelectorFn, + arg: T, + send_callback: Option, + timeout: u64, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + T: std::any::Any + Sync + Send, + { + unimplemented!("send_with_selector_callback_timeout") + } + + pub async fn send_oneway_with_selector( + &mut self, + msg: M, + selector: MessageQueueSelectorFn, + arg: T, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + T: std::any::Any + Sync + Send, + { + self.send_select_impl( + msg, + selector, + arg, + CommunicationMode::Oneway, + None, + self.producer_config.send_msg_timeout() as u64, + ) + .await + } + + pub async fn send_select_impl( + &mut self, + msg: M, + selector: MessageQueueSelectorFn, + arg: T, + communication_mode: CommunicationMode, + send_message_callback: Option, + timeout: u64, + ) -> Result<()> + where + M: MessageTrait + Clone + Send + Sync, + T: std::any::Any + Sync + Send, + { + unimplemented!() + } + #[inline] pub async fn async_send_batch_to_queue_with_callback_timeout( &mut self, @@ -1128,6 +1215,148 @@ impl DefaultMQProducerImpl { } Ok(()) } + + pub async fn invoke_message_queue_selector( + &mut self, + msg: &M, + selector: MessageQueueSelectorFn, + arg: &T, + timeout: u64, + ) -> Result + where + M: MessageTrait + Clone, + T: std::any::Any + Send, + { + let begin_start_time = Instant::now(); + self.make_sure_state_ok()?; + Validators::check_message(Some(msg), self.producer_config.as_ref())?; + let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await; + if let Some(topic_publish_info) = topic_publish_info { + if topic_publish_info.ok() { + let message_queue_list = self + .client_instance + .as_mut() + .unwrap() + .mq_admin_impl + .parse_publish_message_queues( + &topic_publish_info.message_queue_list, + &mut self.client_config, + ); + let mut user_message = msg.clone(); + let user_topic = NamespaceUtil::without_namespace_with_namespace( + user_message.get_topic(), + self.client_config + .get_namespace() + .unwrap_or("".to_string()) + .as_str(), + ); + user_message.set_topic(user_topic.as_str()); + let message_queue = selector(&message_queue_list, msg, arg); + let cost_time = begin_start_time.elapsed().as_millis() as u64; + if timeout < cost_time { + return Err(MQClientError::RemotingTooMuchRequestException( + "sendSelectImpl call timeout".to_string(), + )); + } + if let Some(message_queue) = message_queue { + return Ok(message_queue); + } + return Err(MQClientError::MQClientException( + -1, + "select message queue return None.".to_string(), + )); + } + } + self.validate_name_server_setting(); + Err(MQClientException( + -1, + "select message queue return null.".to_string(), + )) + } + + pub async fn send_with_selector_timeout( + &mut self, + msg: M, + selector: MessageQueueSelectorFn, + arg: T, + timeout: u64, + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + T: std::any::Any + Sync + Send, + { + self.send_with_selector_timeout_impl(msg, selector, arg, timeout, None) + .await + } + + async fn send_with_selector_timeout_impl( + &mut self, + mut msg: M, + selector: MessageQueueSelectorFn, + arg: T, + timeout: u64, + send_callback: Option, + ) -> Result + where + M: MessageTrait + Clone + Send + Sync, + T: std::any::Any + Sync + Send, + { + let begin_start_time = Instant::now(); + self.make_sure_state_ok()?; + Validators::check_message(Some(&msg), self.producer_config.as_ref())?; + let topic_publish_info = self.try_to_find_topic_publish_info(msg.get_topic()).await; + if let Some(topic_publish_info) = topic_publish_info { + if topic_publish_info.ok() { + let message_queue_list = self + .client_instance + .as_mut() + .unwrap() + .mq_admin_impl + .parse_publish_message_queues( + &topic_publish_info.message_queue_list, + &mut self.client_config, + ); + let mut user_message = msg.clone(); + let user_topic = NamespaceUtil::without_namespace_with_namespace( + user_message.get_topic(), + self.client_config + .get_namespace() + .unwrap_or("".to_string()) + .as_str(), + ); + user_message.set_topic(user_topic.as_str()); + let message_queue = selector(&message_queue_list, &msg, &arg); + let cost_time = begin_start_time.elapsed().as_millis() as u64; + if timeout < cost_time { + return Err(MQClientError::RemotingTooMuchRequestException( + "sendSelectImpl call timeout".to_string(), + )); + } + if message_queue.is_some() { + let result = self + .send_kernel_impl( + &mut msg, + message_queue.as_ref().unwrap(), + CommunicationMode::Sync, + send_callback, + None, + timeout - cost_time, + ) + .await?; + return Ok(result.unwrap()); + } + return Err(MQClientError::MQClientException( + -1, + "select message queue return None.".to_string(), + )); + } + } + self.validate_name_server_setting()?; + Err(MQClientError::MQClientException( + -1, + "No route info for this topic, ".to_string(), + )) + } } impl MQProducerInner for DefaultMQProducerImpl { diff --git a/rocketmq-common/src/common/message/message_queue.rs b/rocketmq-common/src/common/message/message_queue.rs index 12de8462..c8fabb02 100644 --- a/rocketmq-common/src/common/message/message_queue.rs +++ b/rocketmq-common/src/common/message/message_queue.rs @@ -44,10 +44,14 @@ impl MessageQueue { } } - pub fn from_parts(topic: &str, broker_name: &str, queue_id: i32) -> Self { + pub fn from_parts( + topic: impl Into, + broker_name: impl Into, + queue_id: i32, + ) -> Self { MessageQueue { - topic: topic.to_string(), - broker_name: broker_name.to_string(), + topic: topic.into(), + broker_name: broker_name.into(), queue_id, } } @@ -60,6 +64,7 @@ impl MessageQueue { self.topic = topic; } + #[inline] pub fn get_broker_name(&self) -> &str { &self.broker_name } @@ -68,6 +73,7 @@ impl MessageQueue { self.broker_name = broker_name; } + #[inline] pub fn get_queue_id(&self) -> i32 { self.queue_id } diff --git a/rocketmq-remoting/src/rpc/client_metadata.rs b/rocketmq-remoting/src/rpc/client_metadata.rs index 7df5db77..61b5d7f7 100644 --- a/rocketmq-remoting/src/rpc/client_metadata.rs +++ b/rocketmq-remoting/src/rpc/client_metadata.rs @@ -176,7 +176,7 @@ impl ClientMetadata { for global_id in info.curr_id_map.as_ref().unwrap().keys() { let mq = MessageQueue::from_parts( topic, - &TopicQueueMappingUtils::get_mock_broker_name( + TopicQueueMappingUtils::get_mock_broker_name( info.scope.as_ref().unwrap().as_str(), ), *global_id, @@ -194,7 +194,7 @@ impl ClientMetadata { for i in 0..max_total_nums { let mq = MessageQueue::from_parts( topic, - &TopicQueueMappingUtils::get_mock_broker_name(&scope), + TopicQueueMappingUtils::get_mock_broker_name(&scope), i, ); if !mq_endpoints.contains_key(&mq) {