Skip to content

Commit

Permalink
[ISSUE #966]🚀Support client consumer message-3🚀 (#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Sep 8, 2024
1 parent c54bd64 commit 27debb7
Show file tree
Hide file tree
Showing 19 changed files with 414 additions and 187 deletions.
26 changes: 20 additions & 6 deletions rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
* limitations under the License.
*/
use rocketmq_client::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
use rocketmq_client::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
use rocketmq_client::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
use rocketmq_client::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
use rocketmq_client::consumer::mq_push_consumer::MQPushConsumer;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
Expand All @@ -37,14 +41,24 @@ pub async fn main() -> Result<()> {
.consumer_group(CONSUMER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();
consumer.subscribe(TOPIC, "*")?;
consumer.register_message_listener_concurrently(MyMessageListener);
consumer.start().await?;

Ok(())
}

/*consumer.register_message_listener_concurrently(|msgs, _context| {
pub struct MyMessageListener;

impl MessageListenerConcurrently for MyMessageListener {
fn consume_message(
&self,
msgs: Vec<MessageExt>,
_context: ConsumeConcurrentlyContext,
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
println!("Receive message: {:?}", msg);
}
Ok(())
});*/
consumer.start().await?;

Ok(())
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
}
}
38 changes: 16 additions & 22 deletions rocketmq-client/src/base/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_remoting::code::response_code::ResponseCode;

use crate::error::MQClientError::MQClientException;
use crate::error::MQClientError::MQClientErr;
use crate::producer::default_mq_producer::ProducerConfig;
use crate::Result;

Expand All @@ -35,21 +35,18 @@ impl Validators {

pub fn check_group(group: &str) -> Result<()> {
if group.trim().is_empty() {
return Err(MQClientException(
-1,
"the specified group is blank".to_string(),
));
return Err(MQClientErr(-1, "the specified group is blank".to_string()));
}

if group.len() > Self::CHARACTER_MAX_LENGTH {
return Err(MQClientException(
return Err(MQClientErr(
-1,
"the specified group is longer than group max length 255.".to_string(),
));
}

if TopicValidator::is_topic_or_group_illegal(group) {
return Err(MQClientException(
return Err(MQClientErr(
-1,
format!(
"the specified group[{}] contains illegal characters, allowing only \
Expand All @@ -67,7 +64,7 @@ impl Validators {
M: MessageTrait,
{
if msg.is_none() {
return Err(MQClientException(
return Err(MQClientErr(
ResponseCode::MessageIllegal as i32,
"the message is null".to_string(),
));
Expand All @@ -77,22 +74,22 @@ impl Validators {
Self::is_not_allowed_send_topic(msg.get_topic())?;

if msg.get_body().is_none() {
return Err(MQClientException(
return Err(MQClientErr(
ResponseCode::MessageIllegal as i32,
"the message body is null".to_string(),
));
}

let length = msg.get_body().unwrap().len();
if length == 0 {
return Err(MQClientException(
return Err(MQClientErr(
ResponseCode::MessageIllegal as i32,
"the message body length is zero".to_string(),
));
}

if length > producer_config.max_message_size() as usize {
return Err(MQClientException(
return Err(MQClientErr(
ResponseCode::MessageIllegal as i32,
format!(
"the message body size over max value, MAX: {}",
Expand All @@ -104,7 +101,7 @@ impl Validators {
let lmq_path = msg.get_user_property(MessageConst::PROPERTY_INNER_MULTI_DISPATCH);
if let Some(value) = lmq_path {
if value.contains(std::path::MAIN_SEPARATOR) {
return Err(MQClientException(
return Err(MQClientErr(
ResponseCode::MessageIllegal as i32,
format!(
"INNER_MULTI_DISPATCH {} can not contains {} character",
Expand All @@ -120,14 +117,11 @@ impl Validators {

pub fn check_topic(topic: &str) -> Result<()> {
if topic.trim().is_empty() {
return Err(MQClientException(
-1,
"The specified topic is blank".to_string(),
));
return Err(MQClientErr(-1, "The specified topic is blank".to_string()));
}

if topic.len() > Self::TOPIC_MAX_LENGTH {
return Err(MQClientException(
return Err(MQClientErr(
-1,
format!(
"The specified topic is longer than topic max length {}.",
Expand All @@ -137,7 +131,7 @@ impl Validators {
}

if TopicValidator::is_topic_or_group_illegal(topic) {
return Err(MQClientException(
return Err(MQClientErr(
-1,
format!(
"The specified topic[{}] contains illegal characters, allowing only \
Expand All @@ -152,7 +146,7 @@ impl Validators {

pub fn is_system_topic(topic: &str) -> Result<()> {
if TopicValidator::is_system_topic(topic) {
return Err(MQClientException(
return Err(MQClientErr(
-1,
format!("The topic[{}] is conflict with system topic.", topic),
));
Expand All @@ -162,7 +156,7 @@ impl Validators {

pub fn is_not_allowed_send_topic(topic: &str) -> Result<()> {
if TopicValidator::is_not_allowed_send_topic(topic) {
return Err(MQClientException(
return Err(MQClientErr(
-1,
format!("Sending message to topic[{}] is forbidden.", topic),
));
Expand All @@ -173,7 +167,7 @@ impl Validators {

pub fn check_topic_config(topic_config: &TopicConfig) -> Result<()> {
if !PermName::is_valid(topic_config.perm) {
return Err(MQClientException(
return Err(MQClientErr(
ResponseCode::NoPermission as i32,
format!("topicPermission value: {} is invalid.", topic_config.perm),
));
Expand All @@ -185,7 +179,7 @@ impl Validators {
pub fn check_broker_config(broker_config: &HashMap<String, String>) -> Result<()> {
if let Some(broker_permission) = broker_config.get("brokerPermission") {
if !PermName::is_valid(broker_permission.parse().unwrap()) {
return Err(MQClientException(
return Err(MQClientErr(
-1,
format!("brokerPermission value: {} is invalid.", broker_permission),
));
Expand Down
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ pub mod mq_push_consumer;
mod pull_callback;
mod pull_result;
mod pull_status;
pub mod rebalance_strategy;
mod store;
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ pub trait AllocateMessageQueueStrategy: Send + Sync {
cid_all: &[String],
) -> Result<Vec<MessageQueue>>;

fn get_name(&self) -> &str;
fn get_name(&self) -> &'static str;
}
Loading

0 comments on commit 27debb7

Please sign in to comment.