Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #972]🔥Supports client clusting consume🚀 #980

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub const MESSAGE_COUNT: usize = 1;
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";
pub const TAG: &str = "*";

#[rocketmq::main]
pub async fn main() -> Result<()> {
Expand All @@ -53,8 +53,8 @@ pub struct MyMessageListener;
impl MessageListenerConcurrently for MyMessageListener {
fn consume_message(
&self,
msgs: Vec<MessageExt>,
_context: ConsumeConcurrentlyContext,
msgs: &[&MessageExt],
_context: &ConsumeConcurrentlyContext,
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
println!("Receive message: {:?}", msg);
Expand Down
7 changes: 5 additions & 2 deletions rocketmq-client/src/base/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,18 @@
)
}

pub fn queue_with_namespace(&mut self, queue: &mut MessageQueue) {
pub fn queue_with_namespace(&mut self, queue: MessageQueue) -> MessageQueue {

Check warning on line 145 in rocketmq-client/src/base/client_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/client_config.rs#L145

Added line #L145 was not covered by tests
if let Some(namespace) = self.get_namespace() {
if !namespace.is_empty() {
queue.set_topic(NamespaceUtil::wrap_namespace(
let mut message_queue = queue.clone();
message_queue.set_topic(NamespaceUtil::wrap_namespace(

Check warning on line 149 in rocketmq-client/src/base/client_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/client_config.rs#L148-L149

Added lines #L148 - L149 were not covered by tests
namespace.as_str(),
queue.get_topic(),
));
return message_queue;

Check warning on line 153 in rocketmq-client/src/base/client_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/client_config.rs#L153

Added line #L153 was not covered by tests
}
}
queue

Check warning on line 156 in rocketmq-client/src/base/client_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/base/client_config.rs#L156

Added line #L156 was not covered by tests
}

pub fn get_namespace(&mut self) -> Option<String> {
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-client/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub mod message_selector;
pub mod mq_consumer;
pub(crate) mod mq_consumer_inner;
pub mod mq_push_consumer;
mod pull_callback;
mod pull_result;
mod pull_status;
pub(crate) mod pull_callback;
pub(crate) mod pull_result;
pub(crate) mod pull_status;
pub mod rebalance_strategy;
mod store;
pub(crate) mod store;
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer/consumer_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub(crate) mod process_queue;
pub(crate) mod pull_api_wrapper;
pub(crate) mod pull_message_service;
pub(crate) mod pull_request;
pub(crate) mod pull_request_ext;
pub(crate) mod re_balance;

pub(crate) static PULL_MAX_IDLE_TIME: Lazy<u64> = Lazy::new(|| {
Expand Down
Loading
Loading