diff --git a/rocketmq-client/examples/quickstart/consumer.rs b/rocketmq-client/examples/quickstart/consumer.rs index 3c773569..2b284bd9 100644 --- a/rocketmq-client/examples/quickstart/consumer.rs +++ b/rocketmq-client/examples/quickstart/consumer.rs @@ -44,7 +44,7 @@ pub async fn main() -> Result<()> { consumer.subscribe(TOPIC, "*")?; consumer.register_message_listener_concurrently(MyMessageListener); consumer.start().await?; - + let _ = tokio::signal::ctrl_c().await; Ok(()) } diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs index 186e2fe2..f6d88b09 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs @@ -14,19 +14,68 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +use std::sync::Arc; +use std::time::Duration; + use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; +use crate::base::client_config::ClientConfig; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; +use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; use crate::consumer::consumer_impl::process_queue::ProcessQueue; +use crate::consumer::default_mq_push_consumer::ConsumerConfig; +use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently; + +#[derive(Clone)] +pub struct ConsumeMessageConcurrentlyService { + pub(crate) default_mqpush_consumer_impl: Option>, + pub(crate) client_config: ArcRefCellWrapper, + pub(crate) consumer_config: ArcRefCellWrapper, + pub(crate) consumer_group: Arc, + pub(crate) message_listener: ArcBoxMessageListenerConcurrently, +} + +impl ConsumeMessageConcurrentlyService { + pub fn new( + client_config: ArcRefCellWrapper, + consumer_config: ArcRefCellWrapper, + consumer_group: String, + message_listener: ArcBoxMessageListenerConcurrently, + ) -> Self { + Self { + default_mqpush_consumer_impl: None, + client_config, + consumer_config, + consumer_group: Arc::new(consumer_group), + message_listener, + } + } +} -pub struct ConsumeMessageConcurrentlyService; +impl ConsumeMessageConcurrentlyService { + async fn clean_expire_msg(&mut self) { + println!("===========================") + } +} impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { fn start(&mut self) { - todo!() + let mut this = self.clone(); + tokio::spawn(async move { + let timeout = this.consumer_config.consume_timeout; + let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60)); + interval.tick().await; + loop { + interval.tick().await; + this.clean_expire_msg().await; + } + }); } fn shutdown(&self, await_terminate_millis: u64) { diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs index 2a2521bb..157be9a3 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs @@ -14,19 +14,50 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; + use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; +use crate::base::client_config::ClientConfig; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; +use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; use crate::consumer::consumer_impl::process_queue::ProcessQueue; +use crate::consumer::default_mq_push_consumer::ConsumerConfig; +use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently; + +pub struct ConsumeMessagePopConcurrentlyService { + pub(crate) default_mqpush_consumer_impl: Option>, + pub(crate) client_config: ArcRefCellWrapper, + pub(crate) consumer_config: ArcRefCellWrapper, + pub(crate) consumer_group: Arc, + pub(crate) message_listener: ArcBoxMessageListenerConcurrently, +} -pub struct ConsumeMessagePopConcurrentlyService; +impl ConsumeMessagePopConcurrentlyService { + pub fn new( + client_config: ArcRefCellWrapper, + consumer_config: ArcRefCellWrapper, + consumer_group: String, + message_listener: ArcBoxMessageListenerConcurrently, + ) -> Self { + Self { + default_mqpush_consumer_impl: None, + client_config, + consumer_config, + consumer_group: Arc::new(consumer_group), + message_listener, + } + } +} impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService { fn start(&mut self) { - todo!() + // nothing to do } fn shutdown(&self, await_terminate_millis: u64) { diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index 6e47c659..b412cb5c 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::thread; use rocketmq_common::common::base::service_state::ServiceState; use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; @@ -34,6 +35,7 @@ use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use rocketmq_remoting::runtime::RPCHook; +use tokio::runtime::Handle; use tracing::info; use crate::base::client_config::ClientConfig; @@ -45,8 +47,11 @@ use crate::consumer::consumer_impl::consume_message_pop_orderly_service::Consume use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageConcurrentlyServiceGeneral; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageOrderlyServiceGeneral; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; +use crate::consumer::consumer_impl::pop_request::PopRequest; use crate::consumer::consumer_impl::pull_api_wrapper::PullAPIWrapper; +use crate::consumer::consumer_impl::pull_request::PullRequest; use crate::consumer::consumer_impl::re_balance::rebalance_push_impl::RebalancePushImpl; +use crate::consumer::consumer_impl::re_balance::Rebalance; use crate::consumer::default_mq_push_consumer::ConsumerConfig; use crate::consumer::listener::message_listener::MessageListener; use crate::consumer::mq_consumer_inner::MQConsumerInner; @@ -72,7 +77,7 @@ const DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED: bool = false #[derive(Clone)] pub struct DefaultMQPushConsumerImpl { - client_config: ClientConfig, + client_config: ArcRefCellWrapper, consumer_config: ArcRefCellWrapper, rebalance_impl: ArcRefCellWrapper, filter_message_hook_list: Vec>>, @@ -83,7 +88,7 @@ pub struct DefaultMQPushConsumerImpl { pause: Arc, consume_orderly: bool, message_listener: Option>, - offset_store: Option>, + pub(crate) offset_store: Option>, consume_message_concurrently_service: Option< ArcRefCellWrapper< ConsumeMessageConcurrentlyServiceGeneral< @@ -112,7 +117,7 @@ impl DefaultMQPushConsumerImpl { rpc_hook: Option>>, ) -> Self { let mut this = Self { - client_config: client_config.clone(), + client_config: ArcRefCellWrapper::new(client_config.clone()), consumer_config: consumer_config.clone(), rebalance_impl: ArcRefCellWrapper::new(RebalancePushImpl::new( client_config, @@ -145,7 +150,14 @@ impl DefaultMQPushConsumerImpl { default_mqpush_consumer_impl: WeakCellWrapper, ) { self.rebalance_impl - .set_default_mqpush_consumer_impl(default_mqpush_consumer_impl); + .set_default_mqpush_consumer_impl(default_mqpush_consumer_impl.clone()); + if let Some(ref mut consume_message_concurrently_service) = + self.consume_message_concurrently_service + { + consume_message_concurrently_service + .consume_message_concurrently_service + .default_mqpush_consumer_impl = Some(default_mqpush_consumer_impl); + } } #[inline] @@ -172,10 +184,11 @@ impl DefaultMQPushConsumerImpl { } let client_instance = MQClientManager::get_instance() .get_or_create_mq_client_instance( - self.client_config.clone(), + self.client_config.as_ref().clone(), self.rpc_hook.clone(), ) .await; + self.client_instance = Some(client_instance.clone()); self.rebalance_impl .set_consumer_group(self.consumer_config.consumer_group.clone()); self.rebalance_impl @@ -223,14 +236,28 @@ impl DefaultMQPushConsumerImpl { if let Some(message_listener) = self.message_listener.as_ref() { if message_listener.message_listener_concurrently.is_some() { + let (listener, _) = message_listener + .message_listener_concurrently + .clone() + .unwrap(); self.consume_orderly = false; self.consume_message_concurrently_service = Some(ArcRefCellWrapper::new( ConsumeMessageConcurrentlyServiceGeneral { consume_message_concurrently_service: - ConsumeMessageConcurrentlyService, + ConsumeMessageConcurrentlyService::new( + self.client_config.clone(), + self.consumer_config.clone(), + self.consumer_config.consumer_group.clone(), + listener.clone().expect("listener is None"), + ), consume_message_pop_concurrently_service: - ConsumeMessagePopConcurrentlyService, + ConsumeMessagePopConcurrentlyService::new( + self.client_config.clone(), + self.consumer_config.clone(), + self.consumer_config.consumer_group.clone(), + listener.expect("listener is None"), + ), }, )); } else if message_listener.message_listener_orderly.is_some() { @@ -305,7 +332,8 @@ impl DefaultMQPushConsumerImpl { )); } } - self.update_topic_subscribe_info_when_subscription_changed(); + self.update_topic_subscribe_info_when_subscription_changed() + .await; let client_instance = self.client_instance.as_mut().unwrap(); client_instance.check_client_in_broker().await?; if client_instance @@ -317,8 +345,19 @@ impl DefaultMQPushConsumerImpl { Ok(()) } - fn update_topic_subscribe_info_when_subscription_changed(&mut self) { - unimplemented!("updateTopicSubscribeInfoWhenSubscriptionChanged"); + async fn update_topic_subscribe_info_when_subscription_changed(&mut self) { + if DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED { + return; + } + let sub_table = self.rebalance_impl.get_subscription_inner(); + let sub_table_inner = sub_table.read().await; + let keys = sub_table_inner.keys().clone(); + let client = self.client_instance.as_mut().unwrap(); + for topic in keys { + client + .update_topic_route_info_from_name_server_topic(topic) + .await; + } } fn check_config(&mut self) -> Result<()> { @@ -630,27 +669,65 @@ impl DefaultMQPushConsumerImpl { } Ok(()) } + + pub async fn execute_pull_request_immediately(&mut self, pull_request: PullRequest) { + self.client_instance + .as_mut() + .unwrap() + .pull_message_service + .execute_pull_request_immediately(pull_request) + .await; + } + pub fn execute_pull_request_later(&mut self, pull_request: PullRequest, time_delay: u64) { + self.client_instance + .as_mut() + .unwrap() + .pull_message_service + .execute_pull_request_later(pull_request, time_delay); + } + + pub(crate) async fn pop_message(&mut self, pop_request: PopRequest) { + unimplemented!("popMessage"); + } + + pub(crate) async fn pull_message(&mut self, pull_request: PullRequest) { + unimplemented!("pull_message"); + } } impl MQConsumerInner for DefaultMQPushConsumerImpl { fn group_name(&self) -> &str { - todo!() + self.consumer_config.consumer_group() } fn message_model(&self) -> MessageModel { - todo!() + self.consumer_config.message_model } fn consume_type(&self) -> ConsumeType { - todo!() + ConsumeType::ConsumePassively } fn consume_from_where(&self) -> ConsumeFromWhere { - todo!() + self.consumer_config.consume_from_where } - fn subscriptions(&self) -> &HashSet { - todo!() + fn subscriptions(&self) -> HashSet { + let inner = self + .rebalance_impl + .rebalance_impl_inner + .subscription_inner + .clone(); + + let handle = Handle::current(); + thread::spawn(move || { + handle.block_on(async move { + let inner = inner.read().await; + inner.values().cloned().collect() + }) + }) + .join() + .unwrap() } fn do_rebalance(&self) { @@ -659,25 +736,50 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl { async fn try_rebalance(&self) -> Result { if !self.pause.load(Ordering::Acquire) { - //self.rebalance_impl.do + return Ok(self + .rebalance_impl + .mut_from_ref() + .do_rebalance(self.consume_orderly) + .await); } - unimplemented!() + Ok(false) } fn persist_consumer_offset(&self) { todo!() } - fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet) { - todo!() + async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet) { + let sub_table = self.rebalance_impl.get_subscription_inner(); + let sub_table_inner = sub_table.read().await; + if sub_table_inner.contains_key(topic) { + let mut guard = self + .rebalance_impl + .rebalance_impl_inner + .topic_subscribe_info_table + .write() + .await; + guard.insert(topic.to_string(), info.clone()); + } } - fn is_subscribe_topic_need_update(&self, topic: &str) -> bool { - todo!() + async fn is_subscribe_topic_need_update(&self, topic: &str) -> bool { + let sub_table = self.rebalance_impl.get_subscription_inner(); + let sub_table_inner = sub_table.read().await; + if sub_table_inner.contains_key(topic) { + let guard = self + .rebalance_impl + .rebalance_impl_inner + .topic_subscribe_info_table + .read() + .await; + return !guard.contains_key(topic); + } + false } fn is_unit_mode(&self) -> bool { - todo!() + self.consumer_config.unit_mode } fn consumer_running_info(&self) -> ConsumerRunningInfo { diff --git a/rocketmq-client/src/consumer/consumer_impl/message_request.rs b/rocketmq-client/src/consumer/consumer_impl/message_request.rs index bdf4aac4..cc66bcf3 100644 --- a/rocketmq-client/src/consumer/consumer_impl/message_request.rs +++ b/rocketmq-client/src/consumer/consumer_impl/message_request.rs @@ -14,8 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::any::Any; + use rocketmq_common::common::message::message_enum::MessageRequestMode; -pub trait MessageRequest { +pub trait MessageRequest: MessageRequestAny { fn get_message_request_mode(&self) -> MessageRequestMode; } + +pub trait MessageRequestAny: Any { + fn as_any_mut(&mut self) -> &mut dyn Any; + + fn as_any(&self) -> &dyn Any; +} + +impl MessageRequestAny for T { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs index dcc64595..080b1d86 100644 --- a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs +++ b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs @@ -14,13 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI64; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::RwLock; use once_cell::sync::Lazy; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::TimeUtils::get_current_millis; +use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo; + +use crate::consumer::consumer_impl::PULL_MAX_IDLE_TIME; +use crate::consumer::default_mq_push_consumer::DefaultMQPushConsumer; static REBALANCE_LOCK_MAX_LIVE_TIME: Lazy = Lazy::new(|| { std::env::var("rocketmq.client.rebalance.lockMaxLiveTime") @@ -45,13 +52,13 @@ pub(crate) struct ProcessQueue { consume_lock: Arc>, consuming_msg_orderly_tree_map: Arc>>, try_unlock_times: Arc, - queue_offset_max: Arc>, - dropped: Arc>, - last_pull_timestamp: Arc>, - last_consume_timestamp: Arc>, - locked: Arc>, - last_lock_timestamp: Arc>, - consuming: Arc>, + queue_offset_max: Arc, + dropped: Arc, + last_pull_timestamp: Arc, + last_consume_timestamp: Arc, + locked: Arc, + last_lock_timestamp: Arc, + consuming: Arc, msg_acc_cnt: Arc, } @@ -67,14 +74,89 @@ impl ProcessQueue { RwLock::new(std::collections::BTreeMap::new()), ), try_unlock_times: Arc::new(AtomicI64::new(0)), - queue_offset_max: Arc::new(std::sync::Mutex::new(0)), - dropped: Arc::new(std::sync::Mutex::new(false)), - last_pull_timestamp: Arc::new(std::sync::Mutex::new(get_current_millis())), - last_consume_timestamp: Arc::new(std::sync::Mutex::new(get_current_millis())), - locked: Arc::new(std::sync::Mutex::new(false)), - last_lock_timestamp: Arc::new(std::sync::Mutex::new(get_current_millis())), - consuming: Arc::new(std::sync::Mutex::new(false)), + queue_offset_max: Arc::new(AtomicU64::new(0)), + dropped: Arc::new(AtomicBool::new(false)), + last_pull_timestamp: Arc::new(AtomicU64::new(get_current_millis())), + last_consume_timestamp: Arc::new(AtomicU64::new(get_current_millis())), + locked: Arc::new(AtomicBool::new(false)), + last_lock_timestamp: Arc::new(AtomicU64::new(get_current_millis())), + consuming: Arc::new(AtomicBool::new(false)), msg_acc_cnt: Arc::new(AtomicI64::new(0)), } } } + +impl ProcessQueue { + pub(crate) fn set_dropped(&self, dropped: bool) { + self.dropped + .store(dropped, std::sync::atomic::Ordering::Release); + } + + pub(crate) fn is_dropped(&self) -> bool { + self.dropped.load(std::sync::atomic::Ordering::Acquire) + } + + pub(crate) fn get_last_lock_timestamp(&self) -> u64 { + self.last_lock_timestamp + .load(std::sync::atomic::Ordering::Acquire) + } + + pub(crate) fn set_locked(&self, locked: bool) { + self.locked + .store(locked, std::sync::atomic::Ordering::Release); + } + + pub(crate) fn is_pull_expired(&self) -> bool { + (get_current_millis() - self.last_pull_timestamp.load(Ordering::Acquire)) + > *PULL_MAX_IDLE_TIME + } + + pub(crate) fn is_lock_expired(&self) -> bool { + (get_current_millis() - self.last_lock_timestamp.load(Ordering::Acquire)) + > *REBALANCE_LOCK_MAX_LIVE_TIME + } + + pub(crate) fn clean_expired_msg(&self, push_consumer: &DefaultMQPushConsumer) { + unimplemented!("clean_expired_msg") + } + + pub(crate) fn put_message(&self, messages: Vec) -> bool { + unimplemented!("put_message") + } + + pub(crate) fn get_max_span(&self) -> u64 { + unimplemented!("get_max_span") + } + + pub(crate) fn remove_message(&self, messages: Vec) -> u64 { + unimplemented!("remove_message") + } + + pub(crate) fn rollback(&self) { + unimplemented!("rollback") + } + + pub(crate) fn commit(&self) -> u64 { + unimplemented!("commit") + } + + pub(crate) fn make_message_to_consume_again(&self, messages: Vec) { + unimplemented!("make_message_to_consume_again") + } + + pub(crate) fn take_messages(&self, batch_size: u32) -> Vec { + unimplemented!("take_messages") + } + + pub(crate) fn contains_message(&self, message_ext: MessageExt) -> bool { + unimplemented!("contains_message") + } + + pub(crate) fn clear(&self) { + unimplemented!("clear") + } + + pub(crate) fn fill_process_queue_info(&self, info: ProcessQueueInfo) { + unimplemented!("fill_process_queue_info") + } +} diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs index bccb3559..b4ca4286 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs @@ -14,15 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashMap; +use std::sync::atomic::AtomicU64; use std::sync::Arc; +use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; +use tokio::sync::RwLock; use crate::factory::mq_client_instance::MQClientInstance; use crate::hook::filter_message_hook::FilterMessageHook; #[derive(Clone)] -pub struct PullAPIWrapper; +pub struct PullAPIWrapper { + mq_client_factory: ArcRefCellWrapper, + consumer_group: String, + unit_mode: bool, + pull_from_which_node_table: Arc>>, + connect_broker_by_user: bool, + default_broker_id: u64, + filter_message_hook_list: Vec>>, +} impl PullAPIWrapper { pub fn new( @@ -30,13 +43,21 @@ impl PullAPIWrapper { consumer_group: String, unit_mode: bool, ) -> Self { - unimplemented!("PullAPIWrapper::new") + Self { + mq_client_factory, + consumer_group, + unit_mode, + pull_from_which_node_table: Arc::new(RwLock::new(HashMap::with_capacity(64))), + connect_broker_by_user: false, + default_broker_id: mix_all::MASTER_ID, + filter_message_hook_list: Vec::new(), + } } pub fn register_filter_message_hook( &mut self, filter_message_hook_list: Vec>>, ) { - unimplemented!("PullAPIWrapper::registerFilterMessageHook") + self.filter_message_hook_list = filter_message_hook_list; } } diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs b/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs index f5b7a9b1..cece2b3d 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs @@ -14,13 +14,119 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -pub struct PullMessageService {} + +use rocketmq_common::ArcRefCellWrapper; +use tracing::info; +use tracing::warn; + +use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; +use crate::consumer::consumer_impl::pop_request::PopRequest; +use crate::consumer::consumer_impl::pull_request::PullRequest; +use crate::consumer::mq_consumer_inner::MQConsumerInner; +use crate::factory::mq_client_instance::MQClientInstance; + +#[derive(Clone)] +pub struct PullMessageService { + pop_tx: Option>, + pull_tx: Option>, +} impl PullMessageService { pub fn new() -> Self { - PullMessageService {} + PullMessageService { + pop_tx: None, + pull_tx: None, + } + } + pub async fn start(&mut self, instance: MQClientInstance) + where + C: MQConsumerInner + Clone, + { + let (pop_tx, mut pop_rx) = tokio::sync::mpsc::channel(1024 * 4); + let (pull_tx, mut pull_rx) = tokio::sync::mpsc::channel(1024 * 4); + self.pop_tx = Some(pop_tx); + self.pull_tx = Some(pull_tx); + let instance_wrapper = ArcRefCellWrapper::new(instance); + let instance_wrapper_clone = instance_wrapper.clone(); + tokio::spawn(async move { + info!( + ">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PopRequest] \ + started<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + ); + while let Some(request) = pop_rx.recv().await { + if let Some(mut consumer) = instance_wrapper + .select_consumer(request.get_consumer_group()) + .await + { + consumer + .as_any_mut() + .downcast_mut::() + .unwrap() + .pop_message(request) + .await; + } else { + warn!( + "No matched consumer for the PopRequest {}, drop it", + request + ) + } + } + }); + tokio::spawn(async move { + info!( + ">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PullRequest] \ + started<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + ); + while let Some(request) = pull_rx.recv().await { + if let Some(mut consumer) = instance_wrapper_clone + .select_consumer(request.get_consumer_group()) + .await + { + consumer + .as_any_mut() + .downcast_mut::() + .unwrap() + .pull_message(request) + .await; + } else { + warn!( + "No matched consumer for the PullRequest {},drop it", + request + ) + } + } + }); + } + + pub fn execute_pull_request_later(&self, pull_request: PullRequest, time_delay: u64) { + let this = self.clone(); + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await; + if let Err(e) = this.pull_tx.as_ref().unwrap().send(pull_request).await { + warn!("Failed to send pull request to pull_tx, error: {:?}", e); + } + }); + } + + pub async fn execute_pull_request_immediately(&self, pull_request: PullRequest) { + if let Err(e) = self.pull_tx.as_ref().unwrap().send(pull_request).await { + warn!("Failed to send pull request to pull_tx, error: {:?}", e); + } } - pub async fn start(&mut self) { - println!("PullMessageService started"); + + pub fn execute_pop_pull_request_later(&self, pop_request: PopRequest, time_delay: u64) { + let this = self.clone(); + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await; + if let Err(e) = this.pop_tx.as_ref().unwrap().send(pop_request).await { + warn!("Failed to send pull request to pull_tx, error: {:?}", e); + } + }); + } + + pub async fn execute_pop_pull_request_immediately(&self, pop_request: PopRequest) { + if let Err(e) = self.pop_tx.as_ref().unwrap().send(pop_request).await { + warn!("Failed to send pull request to pull_tx, error: {:?}", e); + } } } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs index 9ff7137c..63f8abe5 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs @@ -31,17 +31,16 @@ pub(crate) mod rebalance_service; #[trait_variant::make(Rebalance: Send)] pub trait RebalanceLocal { - fn message_queue_changed( + async fn message_queue_changed( &self, topic: &str, mq_all: &HashSet, mq_divided: &HashSet, ); - fn remove_unnecessary_message_queue( - &self, - topic: &str, - mq: MessageQueue, - pq: ProcessQueue, + async fn remove_unnecessary_message_queue( + &mut self, + mq: &MessageQueue, + pq: &ProcessQueue, ) -> bool; fn remove_unnecessary_pop_message_queue(&self, mq: MessageQueue, pq: ProcessQueue) -> bool; @@ -54,13 +53,14 @@ pub trait RebalanceLocal { } fn consume_type(&self) -> ConsumeType; - fn remove_dirty_offset(&self, mq: MessageQueue); + async fn remove_dirty_offset(&self, mq: &MessageQueue); - fn compute_pull_from_where_with_exception(&self, mq: MessageQueue) -> Result; + async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result; + async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64; fn get_consume_init_mode(&self) -> i32; - fn dispatch_pull_request(&self, pull_request_list: Vec, delay: i64); - fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: i64); + async fn dispatch_pull_request(&self, pull_request_list: Vec, delay: u64); + fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: u64); fn create_process_queue(&self) -> ProcessQueue; fn create_pop_process_queue(&self) -> PopProcessQueue; fn remove_process_queue(&self, mq: MessageQueue); @@ -69,7 +69,5 @@ pub trait RebalanceLocal { fn unlock_all(&self, oneway: bool); async fn do_rebalance(&mut self, is_order: bool) -> bool; - fn client_rebalance(&mut self, topic: &str) -> bool { - true - } + fn client_rebalance(&mut self, topic: &str) -> bool; } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 900bc3d6..03004dbd 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -23,15 +23,18 @@ use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; use rocketmq_common::WeakCellWrapper; +use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use tokio::sync::RwLock; use tracing::error; +use tracing::info; use tracing::warn; use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; use crate::consumer::consumer_impl::process_queue::ProcessQueue; +use crate::consumer::consumer_impl::pull_request::PullRequest; use crate::consumer::consumer_impl::re_balance::Rebalance; use crate::factory::mq_client_instance::MQClientInstance; @@ -80,8 +83,8 @@ where pub async fn do_rebalance(&mut self, is_order: bool) -> bool { let mut balanced = true; - let arc = self.subscription_inner.clone(); - let sub_table = arc.read().await; + let sub_inner = self.subscription_inner.clone(); + let sub_table = sub_inner.read().await; if !sub_table.is_empty() { for (topic, _) in sub_table.iter() { if !self.client_rebalance(topic) && self.try_query_assignment(topic).await { @@ -97,8 +100,12 @@ where balanced } + #[inline] pub fn client_rebalance(&mut self, topic: &str) -> bool { - true + match self.sub_rebalance_impl.as_mut().unwrap().upgrade() { + None => true, + Some(mut value) => value.client_rebalance(topic), + } } async fn try_query_assignment(&self, topic: &str) -> bool { @@ -106,32 +113,142 @@ where } async fn truncate_message_queue_not_my_topic(&self) { - unimplemented!("try_query_assignment") + unimplemented!("truncate_message_queue_not_my_topic") } async fn get_rebalance_result_from_broker(&self, topic: &str, is_order: bool) -> bool { - unimplemented!("try_query_assignment") + unimplemented!("get_rebalance_result_from_broker") } async fn update_process_queue_table_in_rebalance( - &self, + &mut self, topic: &str, mq_set: &HashSet, is_order: bool, ) -> bool { - unimplemented!("try_query_assignment") + let mut changed = false; + let mut remove_queue_map = HashMap::new(); + let process_queue_table_cloned = self.process_queue_table.clone(); + let mut process_queue_table = process_queue_table_cloned.write().await; + // Drop process queues no longer belong to me + for (mq, pq) in process_queue_table.iter() { + if mq.get_topic() == topic { + if !mq_set.contains(mq) { + pq.set_dropped(true); + remove_queue_map.insert(mq.clone(), pq.clone()); + } else if pq.is_pull_expired() { + if let Some(sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() + { + if sub_rebalance.consume_type() == ConsumeType::ConsumePassively { + pq.set_dropped(true); + remove_queue_map.insert(mq.clone(), pq.clone()); + error!( + "[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, because \ + pull is pause, so try to fixed it", + self.consumer_group, + mq.get_topic() + ); + } + } + } + } + } + + // Remove message queues no longer belong to me + for (mq, pq) in remove_queue_map { + if let Some(mut sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() { + if sub_rebalance + .remove_unnecessary_message_queue(&mq, &pq) + .await + { + process_queue_table.remove(&mq); + changed = true; + info!( + "doRebalance, {:?}, remove unnecessary mq, {}", + self.consumer_group, + mq.get_topic() + ); + } + } + } + // Add new message queue + let mut all_mq_locked = true; + let mut pull_request_list = Vec::new(); + let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade(); + if sub_rebalance_impl.is_none() { + return false; + } + let mut sub_rebalance_impl = sub_rebalance_impl.unwrap(); + for mq in mq_set { + if !process_queue_table.contains_key(mq) { + if is_order && !self.lock(mq) { + warn!( + "doRebalance, {:?}, add a new mq failed, {}, because lock failed", + self.consumer_group, + mq.get_topic() + ); + all_mq_locked = false; + continue; + } + + sub_rebalance_impl.remove_dirty_offset(mq).await; + let pq = ProcessQueue::new(); + pq.set_locked(true); + let next_offset = sub_rebalance_impl.compute_pull_from_where(mq).await; + if next_offset >= 0 { + if process_queue_table.insert(mq.clone(), pq.clone()).is_none() { + info!( + "doRebalance, {:?}, add a new mq, {}", + self.consumer_group, + mq.get_topic() + ); + pull_request_list.push(PullRequest::new( + self.consumer_group.as_ref().unwrap().clone(), + mq.clone(), + pq, + next_offset, + )); + changed = true; + } else { + info!( + "doRebalance, {:?}, mq already exists, {}", + self.consumer_group, + mq.get_topic() + ); + } + } else { + warn!( + "doRebalance, {:?}, add new mq failed, {}", + self.consumer_group, + mq.get_topic() + ); + } + } + } + + if !all_mq_locked { + self.client_instance.as_mut().unwrap().rebalance_later(500); + } + + sub_rebalance_impl + .dispatch_pull_request(pull_request_list, 500) + .await; + + changed } - async fn rebalance_by_topic(&self, topic: &str, is_order: bool) -> bool { + + async fn rebalance_by_topic(&mut self, topic: &str, is_order: bool) -> bool { match self.message_model.unwrap() { MessageModel::Broadcasting => { unimplemented!("Broadcasting") } MessageModel::Clustering => { - let topic_subscribe_info_table_inner = self.topic_subscribe_info_table.read().await; + let topic_sub_cloned = self.topic_subscribe_info_table.clone(); + let topic_subscribe_info_table_inner = topic_sub_cloned.write().await; let mq_set = topic_subscribe_info_table_inner.get(topic); let ci_all = self .client_instance - .as_ref() + .as_mut() .unwrap() .find_consumer_id_list(topic, self.consumer_group.as_ref().unwrap()) .await; @@ -139,11 +256,9 @@ where if let Some(sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() { - sub_rebalance_impl.message_queue_changed( - topic, - &HashSet::new(), - &HashSet::new(), - ); + sub_rebalance_impl + .message_queue_changed(topic, &HashSet::new(), &HashSet::new()) + .await; warn!( "doRebalance, {}, but the topic[{}] not exist.", self.consumer_group.as_ref().unwrap(), @@ -203,11 +318,9 @@ where if let Some(sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() { - sub_rebalance_impl.message_queue_changed( - topic, - mq_set, - &allocate_result_set, - ); + sub_rebalance_impl + .message_queue_changed(topic, mq_set, &allocate_result_set) + .await; } } return allocate_result_set.eq(&self.get_working_message_queue(topic)); @@ -220,4 +333,8 @@ where pub fn get_working_message_queue(&self, topic: &str) -> HashSet { unimplemented!() } + + pub fn lock(&self, mq: &MessageQueue) -> bool { + unimplemented!() + } } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index 59e29f7f..d3502fca 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -14,15 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use once_cell::sync::Lazy; +use rocketmq_common::common::constant::consume_init_mode::ConsumeInitMode; +use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::mix_all; +use rocketmq_common::utils::util_all; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; +use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; +use tokio::sync::RwLock; +use tracing::info; +use tracing::warn; use crate::base::client_config::ClientConfig; use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy; @@ -34,14 +45,23 @@ use crate::consumer::consumer_impl::pull_request::PullRequest; use crate::consumer::consumer_impl::re_balance::rebalance_impl::RebalanceImpl; use crate::consumer::consumer_impl::re_balance::Rebalance; use crate::consumer::default_mq_push_consumer::ConsumerConfig; +use crate::consumer::store::read_offset_type::ReadOffsetType; +use crate::error::MQClientError; use crate::factory::mq_client_instance::MQClientInstance; use crate::Result; +static UNLOCK_DELAY_TIME_MILLS: Lazy = Lazy::new(|| { + std::env::var("rocketmq.client.unlockDelayTimeMills") + .unwrap_or_else(|_| "20000".into()) + .parse::() + .unwrap_or(20000) +}); + pub struct RebalancePushImpl { - client_config: ClientConfig, - consumer_config: ArcRefCellWrapper, - rebalance_impl: RebalanceImpl, - default_mqpush_consumer_impl: Option>, + pub(crate) client_config: ClientConfig, + pub(crate) consumer_config: ArcRefCellWrapper, + pub(crate) rebalance_impl_inner: RebalanceImpl, + pub(crate) default_mqpush_consumer_impl: Option>, } impl RebalancePushImpl { @@ -52,13 +72,17 @@ impl RebalancePushImpl { RebalancePushImpl { client_config, consumer_config, - rebalance_impl: RebalanceImpl::new(None, None, None, None), + rebalance_impl_inner: RebalanceImpl::new(None, None, None, None), default_mqpush_consumer_impl: None, } } } impl RebalancePushImpl { + pub fn get_subscription_inner(&self) -> Arc>> { + self.rebalance_impl_inner.subscription_inner.clone() + } + pub fn set_default_mqpush_consumer_impl( &mut self, default_mqpush_consumer_impl: WeakCellWrapper, @@ -67,22 +91,23 @@ impl RebalancePushImpl { } pub fn set_consumer_group(&mut self, consumer_group: String) { - self.rebalance_impl.consumer_group = Some(consumer_group); + self.rebalance_impl_inner.consumer_group = Some(consumer_group); } pub fn set_message_model(&mut self, message_model: MessageModel) { - self.rebalance_impl.message_model = Some(message_model); + self.rebalance_impl_inner.message_model = Some(message_model); } pub fn set_allocate_message_queue_strategy( &mut self, allocate_message_queue_strategy: Arc, ) { - self.rebalance_impl.allocate_message_queue_strategy = Some(allocate_message_queue_strategy); + self.rebalance_impl_inner.allocate_message_queue_strategy = + Some(allocate_message_queue_strategy); } pub fn set_mq_client_factory(&mut self, client_instance: ArcRefCellWrapper) { - self.rebalance_impl.client_instance = Some(client_instance); + self.rebalance_impl_inner.client_instance = Some(client_instance); } pub async fn put_subscription_data( @@ -90,47 +115,97 @@ impl RebalancePushImpl { topic: &str, subscription_data: SubscriptionData, ) { - let mut subscription_inner = self.rebalance_impl.subscription_inner.write().await; + let mut subscription_inner = self.rebalance_impl_inner.subscription_inner.write().await; subscription_inner.insert(topic.to_string(), subscription_data); } - pub fn client_rebalance(&self, topic: &str) -> bool { - self.consumer_config.client_rebalance - || self.rebalance_impl.message_model.unwrap() == MessageModel::Broadcasting - || if let Some(default_mqpush_consumer_impl) = self - .default_mqpush_consumer_impl - .as_ref() - .unwrap() - .upgrade() - { - default_mqpush_consumer_impl.is_consume_orderly() - } else { - false - } + pub fn set_rebalance_impl(&mut self, rebalance_impl: WeakCellWrapper) { + self.rebalance_impl_inner.sub_rebalance_impl = Some(rebalance_impl); } - pub fn set_rebalance_impl(&mut self, rebalance_impl: WeakCellWrapper) { - self.rebalance_impl.sub_rebalance_impl = Some(rebalance_impl); + async fn try_remove_order_message_queue( + &mut self, + mq: &MessageQueue, + pq: &ProcessQueue, + ) -> bool { + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + let force_unlock = pq.is_dropped() + && (get_current_millis() > pq.get_last_lock_timestamp() + *UNLOCK_DELAY_TIME_MILLS); + if force_unlock { + let offset_store = default_mqpush_consumer_impl.offset_store.as_mut().unwrap(); + offset_store.persist(mq).await; + offset_store.remove_offset(mq).await; + } + } + + false } } impl Rebalance for RebalancePushImpl { - fn message_queue_changed( + async fn message_queue_changed( &self, topic: &str, mq_all: &HashSet, mq_divided: &HashSet, ) { - todo!() + let mut subscription_inner = self.rebalance_impl_inner.subscription_inner.write().await; + let subscription_data = subscription_inner.get_mut(topic).unwrap(); + let new_version = get_current_millis() as i64; + info!( + "{} Rebalance changed, also update version: {}, {}", + topic, subscription_data.sub_version, new_version + ); + subscription_data.sub_version = new_version; + drop(subscription_inner); + + let process_queue_table = self.rebalance_impl_inner.process_queue_table.read().await; + let current_queue_count = process_queue_table.len(); + if current_queue_count != 0 { + unimplemented!() + } + self.rebalance_impl_inner + .client_instance + .as_ref() + .unwrap() + .mut_from_ref() + .send_heartbeat_to_all_broker_with_lock_v2(true) + .await; + if let Some(ref message_queue_listener) = self.consumer_config.message_queue_listener { + message_queue_listener.message_queue_changed(topic, mq_all, mq_divided); + } } - fn remove_unnecessary_message_queue( - &self, - topic: &str, - mq: MessageQueue, - pq: ProcessQueue, + async fn remove_unnecessary_message_queue( + &mut self, + mq: &MessageQueue, + pq: &ProcessQueue, ) -> bool { - todo!() + let default_mqpush_consumer_impl = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade(); + if default_mqpush_consumer_impl.is_none() { + return false; + } + let mut default_mqpush_consumer_impl = default_mqpush_consumer_impl.unwrap(); + let consume_orderly = default_mqpush_consumer_impl.is_consume_orderly(); + let offset_store = default_mqpush_consumer_impl.offset_store.as_mut().unwrap(); + + if consume_orderly && MessageModel::Clustering == self.consumer_config.message_model { + offset_store.persist(mq).await; + self.try_remove_order_message_queue(mq, pq).await + } else { + offset_store.persist(mq).await; + offset_store.remove_offset(mq).await; + true + } } fn remove_unnecessary_pop_message_queue(&self, mq: MessageQueue, pq: ProcessQueue) -> bool { @@ -138,26 +213,175 @@ impl Rebalance for RebalancePushImpl { } fn consume_type(&self) -> ConsumeType { - todo!() + ConsumeType::ConsumePassively } - fn remove_dirty_offset(&self, mq: MessageQueue) { - todo!() + async fn remove_dirty_offset(&self, mq: &MessageQueue) { + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + let offset_store = default_mqpush_consumer_impl.offset_store.as_mut().unwrap(); + offset_store.remove_offset(mq).await; + } } - fn compute_pull_from_where_with_exception(&self, mq: MessageQueue) -> Result { - todo!() + async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result { + let consume_from_where = self.consumer_config.consume_from_where; + let default_mqpush_consumer_impl = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade(); + if default_mqpush_consumer_impl.is_none() { + return Err(MQClientError::MQClientErr( + -1, + "default_mqpush_consumer_impl is none".to_string(), + )); + } + let mut default_mqpush_consumer_impl = default_mqpush_consumer_impl.unwrap(); + let offset_store = default_mqpush_consumer_impl.offset_store.as_mut().unwrap(); + + let result = match consume_from_where { + ConsumeFromWhere::ConsumeFromLastOffset + | ConsumeFromWhere::ConsumeFromLastOffsetAndFromMinWhenBootFirst + | ConsumeFromWhere::ConsumeFromMinOffset + | ConsumeFromWhere::ConsumeFromMaxOffset => { + let last_offset = offset_store + .read_offset(mq, ReadOffsetType::ReadFromStore) + .await; + if last_offset >= 0 { + last_offset + } else if -1 == last_offset { + if mq + .get_topic() + .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) + { + 0 + } else { + self.rebalance_impl_inner + .client_instance + .as_mut() + .unwrap() + .mq_admin_impl + .max_offset(mq) + .await? + } + } else { + return Err(MQClientError::MQClientErr( + ResponseCode::QueryNotFound.into(), + "Failed to query consume offset from offset store".to_string(), + )); + } + } + ConsumeFromWhere::ConsumeFromFirstOffset => { + let last_offset = offset_store + .read_offset(mq, ReadOffsetType::ReadFromStore) + .await; + if last_offset >= 0 { + last_offset + } else if -1 == last_offset { + 0 + } else { + return Err(MQClientError::MQClientErr( + ResponseCode::QueryNotFound.into(), + "Failed to query consume offset from offset store".to_string(), + )); + } + } + ConsumeFromWhere::ConsumeFromTimestamp => { + let last_offset = offset_store + .read_offset(mq, ReadOffsetType::ReadFromStore) + .await; + if last_offset >= 0 { + last_offset + } else if -1 == last_offset { + if mq + .get_topic() + .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) + { + self.rebalance_impl_inner + .client_instance + .as_mut() + .unwrap() + .mq_admin_impl + .max_offset(mq) + .await? + } else { + let timestamp = util_all::parse_date( + self.consumer_config.consume_timestamp.as_ref().unwrap(), + util_all::YYYYMMDDHHMMSS, + ) + .unwrap() + .and_utc() + .timestamp(); + self.rebalance_impl_inner + .client_instance + .as_mut() + .unwrap() + .mq_admin_impl + .search_offset(mq, timestamp as u64) + .await? + } + } else { + return Err(MQClientError::MQClientErr( + ResponseCode::QueryNotFound.into(), + "Failed to query consume offset from offset store".to_string(), + )); + } + } + }; + if result < 0 { + return Err(MQClientError::MQClientErr( + ResponseCode::SystemError.into(), + "Failed to query consume offset from offset store".to_string(), + )); + } + Ok(result) + } + + async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64 { + self.compute_pull_from_where_with_exception(mq) + .await + .unwrap_or_else(|e| { + warn!("Compute consume offset exception, mq={:?}", e); + -1 + }) } fn get_consume_init_mode(&self) -> i32 { - todo!() + let consume_from_where = self.consumer_config.consume_from_where; + if consume_from_where == ConsumeFromWhere::ConsumeFromFirstOffset { + ConsumeInitMode::MIN + } else { + ConsumeInitMode::MAX + } } - fn dispatch_pull_request(&self, pull_request_list: Vec, delay: i64) { - todo!() + async fn dispatch_pull_request(&self, pull_request_list: Vec, delay: u64) { + let mqpush_consumer_impl = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade(); + if mqpush_consumer_impl.is_none() { + return; + } + let mut mqpush_consumer_impl = mqpush_consumer_impl.unwrap(); + for pull_request in pull_request_list { + if delay == 0 { + mqpush_consumer_impl + .execute_pull_request_immediately(pull_request) + .await; + } else { + mqpush_consumer_impl.execute_pull_request_later(pull_request, delay); + } + } } - fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: i64) { + fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: u64) { todo!() } @@ -187,6 +411,21 @@ impl Rebalance for RebalancePushImpl { } async fn do_rebalance(&mut self, is_order: bool) -> bool { - todo!() + self.rebalance_impl_inner.do_rebalance(is_order).await + } + + fn client_rebalance(&mut self, topic: &str) -> bool { + self.consumer_config.client_rebalance + || self.rebalance_impl_inner.message_model.unwrap() == MessageModel::Broadcasting + || if let Some(default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + default_mqpush_consumer_impl.is_consume_orderly() + } else { + false + } } } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_service.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_service.rs index ad5e969f..d0aa3ecb 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_service.rs @@ -21,6 +21,7 @@ use once_cell::sync::Lazy; use tokio::select; use tokio::sync::Notify; use tokio::time::Instant; +use tracing::info; use crate::consumer::mq_consumer_inner::MQConsumerInner; use crate::factory::mq_client_instance::MQClientInstance; @@ -64,6 +65,7 @@ impl RebalanceService { let mut last_rebalance_timestamp = Instant::now(); let min_interval = *MIN_INTERVAL; let mut real_wait_interval = *WAIT_INTERVAL; + info!(">>>>>>>>>RebalanceService started<<<<<<<<<"); loop { select! { _ = notify.notified() => {} diff --git a/rocketmq-client/src/consumer/mq_consumer_inner.rs b/rocketmq-client/src/consumer/mq_consumer_inner.rs index 2afea3be..4e34b7c5 100644 --- a/rocketmq-client/src/consumer/mq_consumer_inner.rs +++ b/rocketmq-client/src/consumer/mq_consumer_inner.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + use std::collections::HashSet; use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; @@ -25,7 +26,7 @@ use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use crate::Result; #[trait_variant::make(MQConsumerInner: Send)] -pub trait MQConsumerInnerLocal: Sync + 'static { +pub trait MQConsumerInnerLocal: MQConsumerInnerAny + Sync + 'static { fn group_name(&self) -> &str; fn message_model(&self) -> MessageModel; @@ -34,7 +35,7 @@ pub trait MQConsumerInnerLocal: Sync + 'static { fn consume_from_where(&self) -> ConsumeFromWhere; - fn subscriptions(&self) -> &HashSet; + fn subscriptions(&self) -> HashSet; fn do_rebalance(&self); @@ -42,11 +43,27 @@ pub trait MQConsumerInnerLocal: Sync + 'static { fn persist_consumer_offset(&self); - fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet); + async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet); - fn is_subscribe_topic_need_update(&self, topic: &str) -> bool; + async fn is_subscribe_topic_need_update(&self, topic: &str) -> bool; fn is_unit_mode(&self) -> bool; fn consumer_running_info(&self) -> ConsumerRunningInfo; } + +pub trait MQConsumerInnerAny: std::any::Any { + fn as_any_mut(&mut self) -> &mut dyn std::any::Any; + + fn as_any(&self) -> &dyn std::any::Any; +} + +impl MQConsumerInnerAny for T { + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/rocketmq-client/src/consumer/store.rs b/rocketmq-client/src/consumer/store.rs index 9badbb7e..665178e3 100644 --- a/rocketmq-client/src/consumer/store.rs +++ b/rocketmq-client/src/consumer/store.rs @@ -14,6 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +mod controllable_offset; pub(crate) mod local_file_offset_store; pub(crate) mod offset_store; pub(crate) mod read_offset_type; diff --git a/rocketmq-client/src/consumer/store/controllable_offset.rs b/rocketmq-client/src/consumer/store/controllable_offset.rs new file mode 100644 index 00000000..85a8a499 --- /dev/null +++ b/rocketmq-client/src/consumer/store/controllable_offset.rs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +pub struct ControllableOffset { + value: Arc, + allow_to_update: Arc, +} + +impl ControllableOffset { + pub fn new(value: i64) -> Self { + Self { + value: Arc::new(AtomicI64::new(value)), + allow_to_update: Arc::new(AtomicBool::new(true)), + } + } + + pub fn update(&self, target: i64, increase_only: bool) { + if self.allow_to_update.load(Ordering::SeqCst) { + self.value + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |val| { + if self.allow_to_update.load(Ordering::SeqCst) { + if increase_only { + Some(std::cmp::max(target, val)) + } else { + Some(target) + } + } else { + Some(val) + } + }) + .ok(); + } + } + + pub fn update_unconditionally(&self, target: i64) { + self.update(target, false); + } + + pub fn update_and_freeze(&self, target: i64) { + self.value + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |val| { + self.allow_to_update.store(false, Ordering::SeqCst); + Some(target) + }) + .ok(); + } + + pub fn get_offset(&self) -> i64 { + self.value.load(Ordering::SeqCst) + } +} diff --git a/rocketmq-client/src/consumer/store/local_file_offset_store.rs b/rocketmq-client/src/consumer/store/local_file_offset_store.rs index a57621a7..8c3c2d4d 100644 --- a/rocketmq-client/src/consumer/store/local_file_offset_store.rs +++ b/rocketmq-client/src/consumer/store/local_file_offset_store.rs @@ -49,11 +49,11 @@ impl OffsetStoreTrait for LocalFileOffsetStore { todo!() } - async fn persist_all(&self, mqs: &HashSet) { + async fn persist_all(&mut self, mqs: &HashSet) { todo!() } - async fn persist(&self, mq: &MessageQueue) { + async fn persist(&mut self, mq: &MessageQueue) { todo!() } @@ -66,7 +66,7 @@ impl OffsetStoreTrait for LocalFileOffsetStore { } async fn update_consume_offset_to_broker( - &self, + &mut self, mq: &MessageQueue, offset: i64, is_oneway: bool, diff --git a/rocketmq-client/src/consumer/store/offset_store.rs b/rocketmq-client/src/consumer/store/offset_store.rs index 3e31e23f..bf26d496 100644 --- a/rocketmq-client/src/consumer/store/offset_store.rs +++ b/rocketmq-client/src/consumer/store/offset_store.rs @@ -92,20 +92,20 @@ impl OffsetStore { 0 } - pub async fn persist_all(&self, mqs: &HashSet) { - if let Some(store) = &self.remote_broker_offset_store { + pub async fn persist_all(&mut self, mqs: &HashSet) { + if let Some(ref mut store) = self.remote_broker_offset_store { store.persist_all(mqs).await; } - if let Some(store) = &self.local_file_offset_store { + if let Some(ref mut store) = self.local_file_offset_store { store.persist_all(mqs).await; } } - pub async fn persist(&self, mq: &MessageQueue) { - if let Some(store) = &self.remote_broker_offset_store { + pub async fn persist(&mut self, mq: &MessageQueue) { + if let Some(ref mut store) = self.remote_broker_offset_store { store.persist(mq).await; } - if let Some(store) = &self.local_file_offset_store { + if let Some(ref mut store) = self.local_file_offset_store { store.persist(mq).await; } } @@ -129,20 +129,20 @@ impl OffsetStore { } pub async fn update_consume_offset_to_broker( - &self, + &mut self, mq: &MessageQueue, offset: i64, is_oneway: bool, ) -> Result<()> { - if let Some(store) = &self.remote_broker_offset_store { - return store + if let Some(ref mut store) = self.remote_broker_offset_store { + store .update_consume_offset_to_broker(mq, offset, is_oneway) - .await; + .await?; } - if let Some(store) = &self.local_file_offset_store { - return store + if let Some(ref mut store) = self.local_file_offset_store { + store .update_consume_offset_to_broker(mq, offset, is_oneway) - .await; + .await?; } Ok(()) } @@ -156,16 +156,16 @@ pub trait OffsetStoreTrait { async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64); async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64; - async fn persist_all(&self, mqs: &HashSet); + async fn persist_all(&mut self, mqs: &HashSet); - async fn persist(&self, mq: &MessageQueue); + async fn persist(&mut self, mq: &MessageQueue); async fn remove_offset(&self, mq: &MessageQueue); async fn clone_offset_table(&self, topic: &str) -> HashMap; async fn update_consume_offset_to_broker( - &self, + &mut self, mq: &MessageQueue, offset: i64, is_oneway: bool, diff --git a/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs b/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs index 113a459b..1d164d25 100644 --- a/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs +++ b/rocketmq-client/src/consumer/store/remote_broker_offset_store.rs @@ -16,61 +16,250 @@ */ use std::collections::HashMap; use std::collections::HashSet; +use std::sync::Arc; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::TopicRequestHeader; +use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader; +use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader; +use tokio::sync::Mutex; +use tracing::error; +use tracing::info; +use tracing::warn; +use crate::consumer::store::controllable_offset::ControllableOffset; use crate::consumer::store::offset_store::OffsetStoreTrait; use crate::consumer::store::read_offset_type::ReadOffsetType; +use crate::error::MQClientError; use crate::factory::mq_client_instance::MQClientInstance; +use crate::Result; -pub struct RemoteBrokerOffsetStore {} +pub struct RemoteBrokerOffsetStore { + client_instance: ArcRefCellWrapper, + group_name: String, + offset_table: Arc>>, +} impl RemoteBrokerOffsetStore { - pub fn new(mq_client_factory: ArcRefCellWrapper, group_name: String) -> Self { - Self {} + pub fn new(client_instance: ArcRefCellWrapper, group_name: String) -> Self { + Self { + client_instance, + group_name, + offset_table: Arc::new(Mutex::new(HashMap::with_capacity(64))), + } + } + + async fn fetch_consume_offset_from_broker(&self, mq: &MessageQueue) -> Result { + todo!() } } impl OffsetStoreTrait for RemoteBrokerOffsetStore { async fn load(&self) -> crate::Result<()> { - todo!() + Ok(()) } async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool) { - todo!() + let mut offset_table = self.offset_table.lock().await; + let offset_old = offset_table + .entry(mq.clone()) + .or_insert_with(|| ControllableOffset::new(offset)); + if increase_only { + offset_old.update(offset, true); + } else { + offset_old.update_unconditionally(offset); + } } async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64) { - todo!() + let mut offset_table = self.offset_table.lock().await; + offset_table + .entry(mq.clone()) + .or_insert_with(|| ControllableOffset::new(offset)) + .update_and_freeze(offset); } async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 { - todo!() + match type_ { + ReadOffsetType::ReadFromMemory | ReadOffsetType::MemoryFirstThenStore => { + let offset_table = self.offset_table.lock().await; + if let Some(offset) = offset_table.get(mq) { + return offset.get_offset(); + } else if type_ == ReadOffsetType::ReadFromMemory { + return -1; + } + } + ReadOffsetType::ReadFromStore => { + return match self.fetch_consume_offset_from_broker(mq).await { + Ok(value) => { + self.update_offset(mq, value, false).await; + value + } + Err(e) => match e { + MQClientError::OffsetNotFoundError(_, _, _) => -1, + _ => { + warn!("fetchConsumeOffsetFromBroker exception: {:?}", mq); + -2 + } + }, + }; + } + }; + -3 } - async fn persist_all(&self, mqs: &HashSet) { - todo!() + async fn persist_all(&mut self, mqs: &HashSet) { + if mqs.is_empty() { + return; + } + let mut unused_mq = HashSet::new(); + let mut used_mq = Vec::new(); + let mut offset_table = self.offset_table.lock().await; + for (mq, offset) in offset_table.iter_mut() { + if mqs.contains(mq) { + let offset = offset.get_offset(); + used_mq.push((mq.clone(), offset)); + } else { + unused_mq.insert(mq.clone()); + } + } + for mq in unused_mq { + offset_table.remove(&mq); + info!("remove unused mq, {}, {}", mq, self.group_name); + } + drop(offset_table); + + for (mq, offset) in used_mq { + match self + .update_consume_offset_to_broker(&mq, offset, true) + .await + { + Ok(_) => { + info!( + "[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + self.group_name, self.client_instance.client_id, mq, offset + ); + } + Err(e) => { + error!("updateConsumeOffsetToBroker exception, {},{}", mq, e); + } + } + } } - async fn persist(&self, mq: &MessageQueue) { - todo!() + async fn persist(&mut self, mq: &MessageQueue) { + let offset_table = self.offset_table.lock().await; + let offset = offset_table.get(mq); + if offset.is_none() { + return; + } + let offset = offset.unwrap().get_offset(); + drop(offset_table); + match self.update_consume_offset_to_broker(mq, offset, true).await { + Ok(_) => { + info!( + "[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + self.group_name, self.client_instance.client_id, mq, offset + ); + } + Err(e) => { + error!("updateConsumeOffsetToBroker exception, {},{}", mq, e); + } + } } async fn remove_offset(&self, mq: &MessageQueue) { - todo!() + let mut offset_table = self.offset_table.lock().await; + offset_table.remove(mq); + info!( + "remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", + mq, + self.group_name, + offset_table.len() + ); } async fn clone_offset_table(&self, topic: &str) -> HashMap { - todo!() + let offset_table = self.offset_table.lock().await; + offset_table + .iter() + .filter(|(mq, _)| mq.get_topic() == topic) + .map(|(mq, offset)| (mq.clone(), offset.get_offset())) + .collect() } async fn update_consume_offset_to_broker( - &self, + &mut self, mq: &MessageQueue, offset: i64, is_oneway: bool, ) -> crate::Result<()> { - todo!() + let broker_name = self + .client_instance + .get_broker_name_from_message_queue(mq) + .await; + let mut find_broker_result = self + .client_instance + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, false) + .await; + + if find_broker_result.is_none() { + self.client_instance + .update_topic_route_info_from_name_server_topic(mq.get_topic()) + .await; + let broker_name = self + .client_instance + .get_broker_name_from_message_queue(mq) + .await; + find_broker_result = self + .client_instance + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, false) + .await; + } + + if let Some(find_broker_result) = find_broker_result { + let request_header = UpdateConsumerOffsetRequestHeader { + consumer_group: self.group_name.clone(), + topic: mq.get_topic().to_string(), + queue_id: Some(mq.get_queue_id()), + commit_offset: Some(offset), + topic_request_header: Some(TopicRequestHeader { + lo: None, + rpc: Some(RpcRequestHeader { + namespace: None, + namespaced: None, + broker_name: Some(mq.get_broker_name().to_string()), + oneway: None, + }), + }), + }; + if is_oneway { + self.client_instance + .mq_client_api_impl + .update_consumer_offset_oneway( + find_broker_result.broker_addr.as_str(), + request_header, + 5_000, + ) + .await?; + } else { + self.client_instance + .mq_client_api_impl + .update_consumer_offset( + find_broker_result.broker_addr.as_str(), + request_header, + 5_000, + ) + .await?; + }; + Ok(()) + } else { + Err(MQClientError::MQClientErr( + -1, + format!("broker not found, {}", mq.get_broker_name()), + )) + } } } diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 5452683c..69fe4f79 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -25,6 +25,7 @@ use std::time::Duration; use rand::seq::SliceRandom; use rocketmq_common::common::base::service_state::ServiceState; use rocketmq_common::common::constant::PermName; +use rocketmq_common::common::filter::expression_type::ExpressionType; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; @@ -34,7 +35,6 @@ use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData; use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData; use rocketmq_remoting::protocol::heartbeat::producer_data::ProducerData; use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; -use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::rpc::client_metadata::ClientMetadata; use rocketmq_remoting::runtime::config::client_config::TokioClientConfig; use rocketmq_remoting::runtime::RPCHook; @@ -42,6 +42,7 @@ use rocketmq_runtime::RocketMQRuntime; use tokio::runtime::Handle; use tokio::sync::Mutex; use tokio::sync::RwLock; +use tracing::error; use tracing::info; use tracing::warn; @@ -53,6 +54,7 @@ use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceServ use crate::consumer::mq_consumer_inner::MQConsumerInner; use crate::error::MQClientError::MQClientErr; use crate::implementation::client_remoting_processor::ClientRemotingProcessor; +use crate::implementation::find_broker_result::FindBrokerResult; use crate::implementation::mq_admin_impl::MQAdminImpl; use crate::implementation::mq_client_api_impl::MQClientAPIImpl; use crate::producer::default_mq_producer::DefaultMQProducer; @@ -93,7 +95,7 @@ where lock_heartbeat: Arc>, service_state: ServiceState, - pull_message_service: ArcRefCellWrapper, + pub(crate) pull_message_service: ArcRefCellWrapper, rebalance_service: RebalanceService, default_mqproducer: ArcRefCellWrapper, instance_runtime: Arc, @@ -149,7 +151,7 @@ where lock_namesrv: Default::default(), lock_heartbeat: Default::default(), service_state: ServiceState::CreateJust, - pull_message_service: ArcRefCellWrapper::new(PullMessageService {}), + pull_message_service: ArcRefCellWrapper::new(PullMessageService::new()), rebalance_service: RebalanceService::new(), default_mqproducer: ArcRefCellWrapper::new( DefaultMQProducer::builder() @@ -222,7 +224,8 @@ where // Start various schedule tasks self.start_scheduled_task(); // Start pull service - self.pull_message_service.start().await; + let instance = self.clone(); + self.pull_message_service.start(instance).await; // Start rebalance service self.rebalance_service.start(self.clone()).await; // Start push service @@ -336,7 +339,28 @@ where } pub async fn update_topic_route_info_from_name_server(&mut self) { - println!("updateTopicRouteInfoFromNameServer") + let mut topic_list = HashSet::new(); + + { + let producer_table = self.producer_table.read().await; + for (_, value) in producer_table.iter() { + topic_list.extend(value.get_publish_topic_list()); + } + } + + { + let consumer_table = self.consumer_table.read().await; + for (_, value) in consumer_table.iter() { + value.subscriptions().iter().for_each(|sub| { + topic_list.insert(sub.topic.clone()); + }); + } + } + + for topic in topic_list.iter() { + self.update_topic_route_info_from_name_server_topic(topic) + .await; + } } #[inline] @@ -345,9 +369,34 @@ where .await } - pub async fn find_consumer_id_list(&self, topic: &str, group: &str) -> Option> { - let broker_addr = self.find_broker_addr_by_topic(topic).await?; - + pub async fn find_consumer_id_list(&mut self, topic: &str, group: &str) -> Option> { + let mut broker_addr = self.find_broker_addr_by_topic(topic).await; + if broker_addr.is_none() { + self.update_topic_route_info_from_name_server_topic(topic) + .await; + broker_addr = self.find_broker_addr_by_topic(topic).await; + } + if let Some(broker_addr) = broker_addr { + match self + .mq_client_api_impl + .get_consumer_id_list_by_group( + broker_addr.as_str(), + group, + self.client_config.mq_client_api_timeout, + ) + .await + { + Ok(value) => return Some(value), + Err(e) => { + warn!( + "getConsumerIdListByGroup exception,{} {}, err:{}", + broker_addr, + group, + e.to_string() + ); + } + } + } None } @@ -456,7 +505,9 @@ where let subscribe_info = topic_route_data2topic_subscribe_info(topic, &topic_route_data); for (_, value) in consumer_table.iter_mut() { - value.update_topic_subscribe_info(topic, &subscribe_info); + value + .update_topic_subscribe_info(topic, &subscribe_info) + .await; } } } @@ -492,7 +543,7 @@ where let consumer_table = self.consumer_table.read().await; for (key, value) in consumer_table.iter() { if !result { - result = value.is_subscribe_topic_need_update(topic); + result = value.is_subscribe_topic_need_update(topic).await; break; } } @@ -702,11 +753,11 @@ where let consumer_table = self.consumer_table.read().await; for (_, value) in consumer_table.iter() { let mut consumer_data = ConsumerData { - group_name: value.group_name().to_json(), + group_name: value.group_name().to_string(), consume_type: value.consume_type(), message_model: value.message_model(), consume_from_where: value.consume_from_where(), - subscription_data_set: value.subscriptions().clone(), + subscription_data_set: value.subscriptions(), unit_mode: value.is_unit_mode(), }; if !is_without_sub { @@ -740,21 +791,144 @@ where } pub async fn check_client_in_broker(&mut self) -> Result<()> { - unimplemented!() + let consumer_table = self.consumer_table.read().await; + for (key, value) in consumer_table.iter() { + let subscription_inner = value.subscriptions(); + if subscription_inner.is_empty() { + return Ok(()); + } + for subscription_data in subscription_inner.iter() { + if ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) { + continue; + } + let addr = self + .find_broker_addr_by_topic(subscription_data.topic.as_str()) + .await; + if let Some(addr) = addr { + match self + .mq_client_api_impl + .check_client_in_broker( + addr.as_str(), + key, + self.client_id.as_str(), + subscription_data, + self.client_config.mq_client_api_timeout, + ) + .await + { + Ok(_) => {} + Err(e) => match e { + MQClientErr(code, desc) => { + return Err(MQClientErr(code, desc)); + } + _ => { + let desc = format!( + "Check client in broker error, maybe because you use {} to \ + filter message, but server has not been upgraded to \ + support!This error would not affect the launch of consumer, \ + but may has impact on message receiving if you have use the \ + new features which are not supported by server, please check \ + the log!", + subscription_data.expression_type + ); + return Err(MQClientErr(-1, desc)); + } + }, + } + } + } + } + + Ok(()) } pub async fn do_rebalance(&mut self) -> bool { let mut balanced = true; let consumer_table = self.consumer_table.read().await; - for (_, value) in consumer_table.iter() { - let result = value.try_rebalance().await; - if let Err(e) = result { - warn!("rebalance exception, {}", e); - balanced = false; + for (key, value) in consumer_table.iter() { + match value.try_rebalance().await { + Ok(result) => { + balanced = result; + } + Err(e) => { + error!( + "doRebalance for consumer group [{}] exception:{}", + key, + e.to_string() + ); + } } } balanced } + + pub fn rebalance_later(&mut self, delay_millis: u64) { + if delay_millis == 0 { + self.rebalance_service.wakeup(); + } else { + let service = self.rebalance_service.clone(); + self.instance_runtime.get_handle().spawn(async move { + tokio::time::sleep(Duration::from_millis(delay_millis)).await; + service.wakeup(); + }); + } + } + + pub async fn find_broker_address_in_subscribe( + &mut self, + broker_name: &str, + broker_id: u64, + only_this_broker: bool, + ) -> Option { + if broker_name.is_empty() { + return None; + } + let broker_addr_table = self.broker_addr_table.read().await; + let map = broker_addr_table.get(broker_name); + let mut broker_addr = None; + let mut slave = false; + let mut found = false; + + if let Some(map) = map { + broker_addr = map.get(&(broker_id as i64)); + slave = broker_id != mix_all::MASTER_ID; + found = broker_addr.is_some(); + if !found && slave { + broker_addr = map.get(&((broker_id + 1) as i64)); + found = broker_addr.is_some(); + } + if !found && !only_this_broker { + unimplemented!("findBrokerAddressInSubscribe") + } + } + if found { + let broker_addr = broker_addr.cloned()?; + let broker_version = self + .find_broker_version(broker_name, broker_addr.as_str()) + .await; + Some(FindBrokerResult { + broker_addr, + slave, + broker_version, + }) + } else { + None + } + } + async fn find_broker_version(&self, broker_name: &str, broker_addr: &str) -> i32 { + let broker_version_table = self.broker_version_table.read().await; + if let Some(map) = broker_version_table.get(broker_name) { + if let Some(version) = map.get(broker_addr) { + return *version; + } + } + 0 + } + + pub async fn select_consumer(&self, group: &str) -> Option { + let consumer_table = self.consumer_table.read().await; + consumer_table.get(group).cloned() + } } pub fn topic_route_data2topic_publish_info( @@ -841,7 +1015,27 @@ pub fn topic_route_data2topic_publish_info( pub fn topic_route_data2topic_subscribe_info( topic: &str, - topic_route_data: &TopicRouteData, + route: &TopicRouteData, ) -> HashSet { - unimplemented!("topicRouteData2TopicSubscribeInfo") + if let Some(ref topic_queue_mapping_by_broker) = route.topic_queue_mapping_by_broker { + if !topic_queue_mapping_by_broker.is_empty() { + let mq_endpoints = + ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route); + return mq_endpoints + .unwrap_or_default() + .keys() + .cloned() + .collect::>(); + } + } + let mut mq_list = HashSet::new(); + for qd in &route.queue_datas { + if PermName::is_readable(qd.perm) { + for i in 0..qd.read_queue_nums { + let mq = MessageQueue::from_parts(topic, qd.broker_name.as_str(), i as i32); + mq_list.insert(mq); + } + } + } + mq_list } diff --git a/rocketmq-client/src/implementation.rs b/rocketmq-client/src/implementation.rs index b37a7423..f53ae838 100644 --- a/rocketmq-client/src/implementation.rs +++ b/rocketmq-client/src/implementation.rs @@ -17,6 +17,7 @@ pub(crate) mod client_remoting_processor; pub(crate) mod communication_mode; +pub(crate) mod find_broker_result; pub(crate) mod mq_admin_impl; pub(crate) mod mq_client_api_impl; pub(crate) mod mq_client_manager; diff --git a/rocketmq-client/src/implementation/find_broker_result.rs b/rocketmq-client/src/implementation/find_broker_result.rs new file mode 100644 index 00000000..2eee5bb1 --- /dev/null +++ b/rocketmq-client/src/implementation/find_broker_result.rs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::fmt::Display; + +#[derive(Debug, Clone, Default)] +pub struct FindBrokerResult { + pub broker_addr: String, + pub slave: bool, + pub broker_version: i32, +} + +impl Display for FindBrokerResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "FindBrokerResult [broker_addr={}, slave={}, broker_version={}]", + self.broker_addr, self.slave, self.broker_version + ) + } +} diff --git a/rocketmq-client/src/implementation/mq_admin_impl.rs b/rocketmq-client/src/implementation/mq_admin_impl.rs index 3a1accb3..50fbad3d 100644 --- a/rocketmq-client/src/implementation/mq_admin_impl.rs +++ b/rocketmq-client/src/implementation/mq_admin_impl.rs @@ -91,4 +91,11 @@ impl MQAdminImpl { ), )) } + + pub async fn max_offset(&mut self, mq: &MessageQueue) -> Result { + unimplemented!("max_offset") + } + pub async fn search_offset(&mut self, mq: &MessageQueue, timestamp: u64) -> Result { + unimplemented!("max_offset") + } } diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 2c0bfc0c..510765de 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -36,12 +36,17 @@ use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient; use rocketmq_remoting::clients::RemotingClient; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody; +use rocketmq_remoting::protocol::body::get_consumer_listby_group_response_body::GetConsumerListByGroupResponseBody; use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader; +use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader; use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2; use rocketmq_remoting::protocol::header::message_operation_header::send_message_response_header::SendMessageResponseHeader; +use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData; +use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; @@ -635,4 +640,133 @@ impl MQClientAPIImpl { addr.to_string(), )) } + + pub async fn check_client_in_broker( + &mut self, + broker_addr: &str, + consumer_group: &str, + client_id: &str, + subscription_data: &SubscriptionData, + timeout_millis: u64, + ) -> Result<()> { + let mut request = RemotingCommand::create_remoting_command(RequestCode::CheckClientConfig); + let body = CheckClientRequestBody::new( + client_id.to_string(), + consumer_group.to_string(), + subscription_data.clone(), + ); + request.set_body_mut_ref(Some(body.encode())); + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + broker_addr, + )), + request, + timeout_millis, + ) + .await?; + if ResponseCode::from(response.code()) != ResponseCode::Success { + return Err(MQClientError::MQClientErr( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + )); + } + Ok(()) + } + + pub async fn get_consumer_id_list_by_group( + &mut self, + addr: &str, + consumer_group: &str, + timeout_millis: u64, + ) -> Result> { + let request_header = GetConsumerListByGroupRequestHeader { + consumer_group: consumer_group.to_string(), + rpc: None, + }; + let request = RemotingCommand::create_request_command( + RequestCode::GetConsumerListByGroup, + request_header, + ); + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + addr, + )), + request, + timeout_millis, + ) + .await?; + match ResponseCode::from(response.code()) { + ResponseCode::Success => { + let body = response.body(); + if let Some(body) = response.body() { + return match GetConsumerListByGroupResponseBody::decode(body) { + Ok(value) => Ok(value.consumer_id_list), + Err(e) => Err(MQClientError::MQClientErr( + -1, + response.remark().map_or("".to_string(), |s| s.to_string()), + )), + }; + } + } + _ => { + return Err(MQClientError::MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )); + } + } + Err(MQClientError::MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + + pub async fn update_consumer_offset_oneway( + &mut self, + addr: &str, + request_header: UpdateConsumerOffsetRequestHeader, + timeout_millis: u64, + ) -> Result<()> { + let request = RemotingCommand::create_request_command( + RequestCode::UpdateConsumerOffset, + request_header, + ); + self.remoting_client + .invoke_oneway(addr.to_string(), request, timeout_millis) + .await; + Ok(()) + } + + pub async fn update_consumer_offset( + &mut self, + addr: &str, + request_header: UpdateConsumerOffsetRequestHeader, + timeout_millis: u64, + ) -> Result<()> { + let request = RemotingCommand::create_request_command( + RequestCode::UpdateConsumerOffset, + request_header, + ); + let response = self + .remoting_client + .invoke_async(Some(addr.to_string()), request, timeout_millis) + .await?; + if ResponseCode::from(response.code()) != ResponseCode::Success { + Err(MQClientError::MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } else { + Ok(()) + } + } } diff --git a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs index f5e87a02..da0daa46 100644 --- a/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs +++ b/rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs @@ -1890,7 +1890,20 @@ impl DefaultMQProducerImpl { impl MQProducerInner for DefaultMQProducerImpl { fn get_publish_topic_list(&self) -> HashSet { - todo!() + let handle = Handle::current(); + let topic_publish_info_table = self.topic_publish_info_table.clone(); + thread::spawn(move || { + handle.block_on(async move { + topic_publish_info_table + .read() + .await + .iter() + .map(|(k, _)| k.clone()) + .collect() + }) + }) + .join() + .unwrap() } fn is_publish_topic_need_update(&self, topic: &str) -> bool { diff --git a/rocketmq-common/src/common/constant.rs b/rocketmq-common/src/common/constant.rs index e1fd7632..62d85c17 100644 --- a/rocketmq-common/src/common/constant.rs +++ b/rocketmq-common/src/common/constant.rs @@ -15,6 +15,8 @@ * limitations under the License. */ +pub mod consume_init_mode; + use std::ops::Deref; pub struct PermName; diff --git a/rocketmq-common/src/common/constant/consume_init_mode.rs b/rocketmq-common/src/common/constant/consume_init_mode.rs new file mode 100644 index 00000000..5b8a8bf1 --- /dev/null +++ b/rocketmq-common/src/common/constant/consume_init_mode.rs @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub struct ConsumeInitMode; + +impl ConsumeInitMode { + pub const MIN: i32 = 0; + pub const MAX: i32 = 1; +} diff --git a/rocketmq-common/src/utils/util_all.rs b/rocketmq-common/src/utils/util_all.rs index 40b0fe81..e7022ce2 100644 --- a/rocketmq-common/src/utils/util_all.rs +++ b/rocketmq-common/src/utils/util_all.rs @@ -29,6 +29,9 @@ use std::time::UNIX_EPOCH; use chrono::DateTime; use chrono::Datelike; use chrono::Local; +use chrono::NaiveDateTime; +use chrono::ParseError; +use chrono::ParseResult; use chrono::TimeZone; use chrono::Timelike; use chrono::Utc; @@ -41,6 +44,10 @@ use crate::common::mix_all::MULTI_PATH_SPLITTER; use crate::error::Error::RuntimeException; use crate::Result; +pub const YYYY_MM_DD_HH_MM_SS: &str = "%Y-%m-%d %H:%M:%S%"; +pub const YYYY_MM_DD_HH_MM_SS_SSS: &str = "%Y-%m-%d %H:%M:%S%.f"; +pub const YYYYMMDDHHMMSS: &str = "%Y%m%d%H%M%S%"; + const HEX_ARRAY: [char; 16] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', ]; @@ -292,6 +299,13 @@ pub fn get_ip() -> Result> { } } +pub fn parse_date(date: &str, pattern: &str) -> Option { + match NaiveDateTime::parse_from_str(date, pattern) { + Ok(value) => Some(value), + Err(_) => None, + } +} + #[cfg(test)] mod tests { use std::time::Instant; diff --git a/rocketmq-remoting/src/protocol/body.rs b/rocketmq-remoting/src/protocol/body.rs index dacd0bb0..49ed1db7 100644 --- a/rocketmq-remoting/src/protocol/body.rs +++ b/rocketmq-remoting/src/protocol/body.rs @@ -22,11 +22,13 @@ pub mod get_consumer_listby_group_response_body; pub mod consumer_connection; +pub mod check_client_request_body; pub mod cm_result; pub mod connection; pub mod consume_message_directly_result; pub mod group_list; pub mod kv_table; pub mod pop_process_queue_info; +pub mod process_queue_info; pub mod topic; pub mod topic_info_wrapper; diff --git a/rocketmq-remoting/src/protocol/body/check_client_request_body.rs b/rocketmq-remoting/src/protocol/body/check_client_request_body.rs new file mode 100644 index 00000000..21d613f7 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/check_client_request_body.rs @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use serde::Deserialize; +use serde::Serialize; + +use crate::protocol::heartbeat::subscription_data::SubscriptionData; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct CheckClientRequestBody { + pub client_id: String, + pub group: String, + pub subscription_data: SubscriptionData, + pub namespace: Option, +} + +impl CheckClientRequestBody { + pub fn new(client_id: String, group: String, subscription_data: SubscriptionData) -> Self { + Self { + client_id, + group, + subscription_data, + namespace: None, + } + } + + pub fn get_client_id(&self) -> &String { + &self.client_id + } + + pub fn set_client_id(&mut self, client_id: String) { + self.client_id = client_id; + } + + pub fn get_group(&self) -> &String { + &self.group + } + + pub fn set_group(&mut self, group: String) { + self.group = group; + } + + pub fn get_subscription_data(&self) -> &SubscriptionData { + &self.subscription_data + } + + pub fn set_subscription_data(&mut self, subscription_data: SubscriptionData) { + self.subscription_data = subscription_data; + } +} diff --git a/rocketmq-remoting/src/protocol/body/process_queue_info.rs b/rocketmq-remoting/src/protocol/body/process_queue_info.rs new file mode 100644 index 00000000..047a8779 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/process_queue_info.rs @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[derive(Debug, Clone, Copy)] +pub struct ProcessQueueInfo { + pub commit_offset: u64, + pub cached_msg_min_offset: u64, + pub cached_msg_max_offset: u64, + pub cached_msg_count: u32, + pub cached_msg_size_in_mib: u32, + + pub transaction_msg_min_offset: u64, + pub transaction_msg_max_offset: u64, + pub transaction_msg_count: u32, + + pub locked: bool, + pub try_unlock_times: u64, + pub last_lock_timestamp: u64, + + pub droped: bool, + pub last_pull_timestamp: u64, + pub last_consume_timestamp: u64, +} + +impl std::fmt::Display for ProcessQueueInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ProcessQueueInfo [commit_offset: {}, cached_msg_min_offset: {}, \ + cached_msg_max_offset: {}, cached_msg_count: {}, cached_msg_size_in_mib: {}, \ + transaction_msg_min_offset: {}, transaction_msg_max_offset: {}, \ + transaction_msg_count: {}, locked: {}, try_unlock_times: {}, last_lock_timestamp: \ + {}, droped: {}, last_pull_timestamp: {}, last_consume_timestamp: {}]", + self.commit_offset, + self.cached_msg_min_offset, + self.cached_msg_max_offset, + self.cached_msg_count, + self.cached_msg_size_in_mib, + self.transaction_msg_min_offset, + self.transaction_msg_max_offset, + self.transaction_msg_count, + self.locked, + self.try_unlock_times, + self.last_lock_timestamp, + self.droped, + self.last_pull_timestamp, + self.last_consume_timestamp + ) + } +}