Skip to content

Commit

Permalink
[ISSUE #972]🔥Supports client clusting consume🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Sep 20, 2024
1 parent 887e9f6 commit 4edbba7
Show file tree
Hide file tree
Showing 38 changed files with 2,674 additions and 186 deletions.
6 changes: 3 additions & 3 deletions rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub const MESSAGE_COUNT: usize = 1;
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";
pub const TAG: &str = "*";

#[rocketmq::main]
pub async fn main() -> Result<()> {
Expand All @@ -53,8 +53,8 @@ pub struct MyMessageListener;
impl MessageListenerConcurrently for MyMessageListener {
fn consume_message(
&self,
msgs: Vec<MessageExt>,
_context: ConsumeConcurrentlyContext,
msgs: &[&MessageExt],
_context: &ConsumeConcurrentlyContext,
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
println!("Receive message: {:?}", msg);
Expand Down
7 changes: 5 additions & 2 deletions rocketmq-client/src/base/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,18 @@ impl ClientConfig {
)
}

pub fn queue_with_namespace(&mut self, queue: &mut MessageQueue) {
pub fn queue_with_namespace(&mut self, queue: MessageQueue) -> MessageQueue {
if let Some(namespace) = self.get_namespace() {
if !namespace.is_empty() {
queue.set_topic(NamespaceUtil::wrap_namespace(
let mut message_queue = queue.clone();
message_queue.set_topic(NamespaceUtil::wrap_namespace(
namespace.as_str(),
queue.get_topic(),
));
return message_queue;
}
}
queue
}

pub fn get_namespace(&mut self) -> Option<String> {
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-client/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub mod message_selector;
pub mod mq_consumer;
pub(crate) mod mq_consumer_inner;
pub mod mq_push_consumer;
mod pull_callback;
mod pull_result;
mod pull_status;
pub(crate) mod pull_callback;
pub(crate) mod pull_result;
pub(crate) mod pull_status;
pub mod rebalance_strategy;
mod store;
pub(crate) mod store;
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer/consumer_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub(crate) mod process_queue;
pub(crate) mod pull_api_wrapper;
pub(crate) mod pull_message_service;
pub(crate) mod pull_request;
pub(crate) mod pull_request_ext;
pub(crate) mod re_balance;

pub(crate) static PULL_MAX_IDLE_TIME: Lazy<u64> = Lazy::new(|| {
Expand Down
Loading

0 comments on commit 4edbba7

Please sign in to comment.