Skip to content

Commit

Permalink
[ISSUE #1004]🚀optimize and improve consume logic⚡️ (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Sep 27, 2024
1 parent 190a50f commit 62fd26c
Show file tree
Hide file tree
Showing 17 changed files with 682 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,7 @@ cfg-if = "1.0.0"
sysinfo = "0.31.4"
uuid = { version = "1.10.0", features = ["v4", # Lets you generate random UUIDs
"fast-rng", # Use a faster (but still sufficiently random) RNG
"macro-diagnostics", ] }
"macro-diagnostics", ] }


futures = "0.3"
3 changes: 3 additions & 0 deletions rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ regex = { version = "1.10.6", features = [] }
parking_lot = { workspace = true }
once_cell = { workspace = true }
bytes = { workspace = true }


futures = { workspace = true }
[[example]]
name = "simple-producer"
path = "examples/producer/simple_producer.rs"
Expand Down
14 changes: 12 additions & 2 deletions rocketmq-client/src/consumer/consumer_impl/process_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo;
use rocketmq_rust::RocketMQTokioRwLock;
use tokio::sync::RwLock;

use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
Expand All @@ -56,7 +57,7 @@ pub(crate) struct ProcessQueue {
Arc<RwLock<std::collections::BTreeMap<i64, ArcRefCellWrapper<MessageClientExt>>>>,
pub(crate) msg_count: Arc<AtomicU64>,
pub(crate) msg_size: Arc<AtomicU64>,
pub(crate) consume_lock: Arc<RwLock<()>>,
pub(crate) consume_lock: Arc<RocketMQTokioRwLock<()>>,
pub(crate) consuming_msg_orderly_tree_map:
Arc<RwLock<std::collections::BTreeMap<i64, ArcRefCellWrapper<MessageClientExt>>>>,
pub(crate) try_unlock_times: Arc<AtomicI64>,
Expand All @@ -77,7 +78,7 @@ impl ProcessQueue {
msg_tree_map: Arc::new(RwLock::new(std::collections::BTreeMap::new())),
msg_count: Arc::new(AtomicU64::new(0)),
msg_size: Arc::new(AtomicU64::new(0)),
consume_lock: Arc::new(RwLock::new(())),
consume_lock: Arc::new(RocketMQTokioRwLock::new(())),
consuming_msg_orderly_tree_map: Arc::new(
RwLock::new(std::collections::BTreeMap::new()),
),
Expand Down Expand Up @@ -124,6 +125,10 @@ impl ProcessQueue {
> *REBALANCE_LOCK_MAX_LIVE_TIME
}

pub(crate) fn inc_try_unlock_times(&self) {
self.try_unlock_times.fetch_add(1, Ordering::AcqRel);
}

pub(crate) async fn clean_expired_msg(
&self,
push_consumer: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
Expand Down Expand Up @@ -308,6 +313,11 @@ impl ProcessQueue {
.store(last_pull_timestamp, std::sync::atomic::Ordering::Release);
}

pub(crate) fn set_last_lock_timestamp(&self, last_lock_timestamp: u64) {
self.last_lock_timestamp
.store(last_lock_timestamp, std::sync::atomic::Ordering::Release);
}

pub fn msg_count(&self) -> u64 {
self.msg_count.load(std::sync::atomic::Ordering::Acquire)
}
Expand Down
17 changes: 15 additions & 2 deletions rocketmq-client/src/consumer/consumer_impl/re_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub trait RebalanceLocal {
) -> bool;

fn remove_unnecessary_pop_message_queue(&self, mq: MessageQueue, pq: ProcessQueue) -> bool;

fn remove_unnecessary_pop_message_queue_pop(
&self,
_mq: MessageQueue,
Expand All @@ -53,22 +54,34 @@ pub trait RebalanceLocal {
}

fn consume_type(&self) -> ConsumeType;
async fn remove_dirty_offset(&self, mq: &MessageQueue);

async fn remove_dirty_offset(&mut self, mq: &MessageQueue);

async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result<i64>;

async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64;

fn get_consume_init_mode(&self) -> i32;

async fn dispatch_pull_request(&self, pull_request_list: Vec<PullRequest>, delay: u64);

fn dispatch_pop_pull_request(&self, pull_request_list: Vec<PopRequest>, delay: u64);

fn create_process_queue(&self) -> ProcessQueue;

fn create_pop_process_queue(&self) -> PopProcessQueue;

async fn remove_process_queue(&mut self, mq: &MessageQueue);
fn unlock(&self, mq: MessageQueue, oneway: bool);

async fn unlock(&mut self, mq: &MessageQueue, oneway: bool);

fn lock_all(&self);

fn unlock_all(&self, oneway: bool);

async fn do_rebalance(&mut self, is_order: bool) -> bool;

fn client_rebalance(&mut self, topic: &str) -> bool;

fn destroy(&mut self);
}
Loading

0 comments on commit 62fd26c

Please sign in to comment.