Skip to content

Commit

Permalink
[ISSUE #802]🔥Optimze CommitLog#put_message method, add TopicQueueLock…
Browse files Browse the repository at this point in the history
…⚡️ (#803)
  • Loading branch information
mxsm authored Jul 17, 2024
1 parent a2560ea commit 6543686
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
2 changes: 1 addition & 1 deletion rocketmq-store/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod store_checkpoint;
pub mod store_enum;
pub mod store_stats_service;
pub mod swappable;
mod topic_queue_lock;
pub mod topic_queue_lock;
pub mod transient_store_pool;

pub struct ByteBuffer<'a> {
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/config/message_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl Default for MessageStoreConfig {
mem_table_flush_interval_ms: 0,
real_time_persist_rocksdb_config: false,
enable_rocksdb_log: false,
topic_queue_lock_num: 0,
topic_queue_lock_num: 32,
max_filter_message_size: 16000,
}
}
Expand Down
19 changes: 18 additions & 1 deletion rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::base::put_message_context::PutMessageContext;
use crate::base::select_result::SelectMappedBufferResult;
use crate::base::store_checkpoint::StoreCheckpoint;
use crate::base::swappable::Swappable;
use crate::base::topic_queue_lock::TopicQueueLock;
use crate::config::broker_role::BrokerRole;
use crate::config::message_store_config::MessageStoreConfig;
use crate::consume_queue::mapped_file_queue::MappedFileQueue;
Expand Down Expand Up @@ -192,6 +193,7 @@ pub struct CommitLog {
store_checkpoint: Arc<StoreCheckpoint>,
append_message_callback: Arc<DefaultAppendMessageCallback>,
put_message_lock: Arc<tokio::sync::Mutex<()>>,
topic_queue_lock: Arc<TopicQueueLock>,
topic_config_table: Arc<parking_lot::Mutex<HashMap<String, TopicConfig>>>,
consume_queue_store: ConsumeQueueStore,
flush_manager: Arc<tokio::sync::Mutex<DefaultFlushManager>>,
Expand Down Expand Up @@ -227,6 +229,9 @@ impl CommitLog {
topic_config_table.clone(),
)),
put_message_lock: Arc::new(Default::default()),
topic_queue_lock: Arc::new(TopicQueueLock::new(
message_store_config.topic_queue_lock_num,
)),
topic_config_table,
consume_queue_store,
flush_manager: Arc::new(tokio::sync::Mutex::new(DefaultFlushManager::new(
Expand Down Expand Up @@ -351,8 +356,13 @@ impl CommitLog {
);

let topic_queue_key = generate_key(&msg_batch.message_ext_broker_inner);
put_message_context.set_topic_queue_table_key(topic_queue_key);
put_message_context.set_topic_queue_table_key(topic_queue_key.clone());
msg_batch.encoded_buff = encoded_buff;
let topic_queue_lock = self
.topic_queue_lock
.lock(topic_queue_key.as_str())
.lock()
.await;
self.assign_offset(&mut msg_batch.message_ext_broker_inner);

let lock = self.put_message_lock.lock().await;
Expand Down Expand Up @@ -461,6 +471,7 @@ impl CommitLog {
&msg_batch.message_ext_broker_inner,
put_message_context.get_batch_size() as i16,
);
drop(topic_queue_lock);
self.handle_disk_flush_and_ha(
put_message_result,
msg_batch.message_ext_broker_inner,
Expand Down Expand Up @@ -531,6 +542,11 @@ impl CommitLog {
let need_assign_offset = !(self.message_store_config.duplication_enable
&& self.message_store_config.broker_role != BrokerRole::Slave);

let topic_queue_lock = self
.topic_queue_lock
.lock(topic_queue_key.as_str())
.lock()
.await;
if need_assign_offset {
self.assign_offset(&mut msg);
}
Expand Down Expand Up @@ -643,6 +659,7 @@ impl CommitLog {
if put_message_result.put_message_status() == PutMessageStatus::PutOk {
let message_num = get_message_num(&self.topic_config_table, &msg);
self.increase_offset(&msg, message_num);
drop(topic_queue_lock);
self.handle_disk_flush_and_ha(put_message_result, msg, need_ack_nums, need_handle_ha)
.await
} else {
Expand Down

0 comments on commit 6543686

Please sign in to comment.