Skip to content

Commit

Permalink
[ISSUE #967]🚀 Support client consumer message-4 🚀 (#971)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Sep 11, 2024
1 parent df3c5ca commit 487a339
Show file tree
Hide file tree
Showing 30 changed files with 1,779 additions and 185 deletions.
2 changes: 1 addition & 1 deletion rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn main() -> Result<()> {
consumer.subscribe(TOPIC, "*")?;
consumer.register_message_listener_concurrently(MyMessageListener);
consumer.start().await?;

let _ = tokio::signal::ctrl_c().await;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,68 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::sync::Arc;
use std::time::Duration;

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;

use crate::base::client_config::ClientConfig;
use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait;
use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue;
use crate::consumer::consumer_impl::process_queue::ProcessQueue;
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently;

#[derive(Clone)]
pub struct ConsumeMessageConcurrentlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerConcurrently,
}

impl ConsumeMessageConcurrentlyService {
pub fn new(
client_config: ArcRefCellWrapper<ClientConfig>,
consumer_config: ArcRefCellWrapper<ConsumerConfig>,
consumer_group: String,
message_listener: ArcBoxMessageListenerConcurrently,
) -> Self {
Self {
default_mqpush_consumer_impl: None,
client_config,
consumer_config,
consumer_group: Arc::new(consumer_group),
message_listener,
}
}
}

pub struct ConsumeMessageConcurrentlyService;
impl ConsumeMessageConcurrentlyService {
async fn clean_expire_msg(&mut self) {
println!("===========================")
}
}

impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
fn start(&mut self) {
todo!()
let mut this = self.clone();
tokio::spawn(async move {
let timeout = this.consumer_config.consume_timeout;
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60));
interval.tick().await;
loop {
interval.tick().await;
this.clean_expire_msg().await;
}
});
}

fn shutdown(&self, await_terminate_millis: u64) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;

use crate::base::client_config::ClientConfig;
use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait;
use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue;
use crate::consumer::consumer_impl::process_queue::ProcessQueue;
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently;

pub struct ConsumeMessagePopConcurrentlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerConcurrently,
}

pub struct ConsumeMessagePopConcurrentlyService;
impl ConsumeMessagePopConcurrentlyService {
pub fn new(
client_config: ArcRefCellWrapper<ClientConfig>,
consumer_config: ArcRefCellWrapper<ConsumerConfig>,
consumer_group: String,
message_listener: ArcBoxMessageListenerConcurrently,
) -> Self {
Self {
default_mqpush_consumer_impl: None,
client_config,
consumer_config,
consumer_group: Arc::new(consumer_group),
message_listener,
}
}
}

impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
fn start(&mut self) {
todo!()
// nothing to do
}

fn shutdown(&self, await_terminate_millis: u64) {
Expand Down
Loading

0 comments on commit 487a339

Please sign in to comment.