Skip to content

Commit

Permalink
[ISSUE #419]✨Implement message persistence to disk during transmissio…
Browse files Browse the repository at this point in the history
…n-2✨ (#420)

* fix some bug

* fix DefaultFlushManager new

* fix bug

* add

* add

* [ISSUE #419]Implement message persistence to disk during transmission-2
  • Loading branch information
mxsm authored Jun 3, 2024
1 parent 425114b commit dc31c0b
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 51 deletions.
4 changes: 2 additions & 2 deletions rocketmq-store/src/base/store_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ impl StoreCheckpoint {
let min = self
.physic_msg_timestamp
.load(Ordering::Relaxed)
.min(self.logics_msg_timestamp.load(Ordering::Relaxed));
.min(self.logics_msg_timestamp.load(Ordering::Relaxed)) as i64;
let min = min - 1000 * 3;
min.max(0)
min.max(0) as u64
}
pub fn get_min_timestamp_index(&self) -> u64 {
self.get_min_timestamp()
Expand Down
26 changes: 13 additions & 13 deletions rocketmq-store/src/config/message_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ pub struct MessageStoreConfig {
pub mapped_file_size_consume_queue_ext: usize,
pub mapper_file_size_batch_consume_queue: usize,
pub bit_map_length_consume_queue_ext: usize,
pub flush_interval_commit_log: usize,
pub commit_interval_commit_log: usize,
pub flush_interval_commit_log: i32,
pub commit_interval_commit_log: u64,
pub max_recovery_commit_log_files: usize,
pub disk_space_warning_level_ratio: usize,
pub disk_space_clean_forcibly_ratio: usize,
Expand All @@ -92,12 +92,12 @@ pub struct MessageStoreConfig {
pub put_msg_index_hight_water: usize,
pub max_message_size: i32,
pub check_crc_on_recover: bool,
pub flush_commit_log_least_pages: usize,
pub commit_commit_log_least_pages: usize,
pub flush_commit_log_least_pages: i32,
pub commit_commit_log_least_pages: i32,
pub flush_least_pages_when_warm_mapped_file: usize,
pub flush_consume_queue_least_pages: usize,
pub flush_commit_log_thorough_interval: usize,
pub commit_commit_log_thorough_interval: usize,
pub flush_commit_log_thorough_interval: i32,
pub commit_commit_log_thorough_interval: u64,
pub flush_consume_queue_thorough_interval: usize,
pub max_transfer_bytes_on_message_in_memory: usize,
pub max_transfer_count_on_message_in_memory: usize,
Expand Down Expand Up @@ -255,13 +255,13 @@ impl Default for MessageStoreConfig {
mapped_file_size_consume_queue_ext: 48 * 1024 * 1024,
mapper_file_size_batch_consume_queue: 300000 * 46,
bit_map_length_consume_queue_ext: 64,
flush_interval_commit_log: 0,
commit_interval_commit_log: 0,
flush_interval_commit_log: 500,
commit_interval_commit_log: 200,
max_recovery_commit_log_files: 0,
disk_space_warning_level_ratio: 0,
disk_space_clean_forcibly_ratio: 0,
use_reentrant_lock_when_put_message: false,
flush_commit_log_timed: false,
flush_commit_log_timed: true,
flush_interval_consume_queue: 1000,
clean_resource_interval: 10000,
delete_commit_log_files_interval: 100,
Expand All @@ -276,11 +276,11 @@ impl Default for MessageStoreConfig {
max_message_size: 1024 * 1024 * 4,
check_crc_on_recover: false,
flush_commit_log_least_pages: 0,
commit_commit_log_least_pages: 0,
commit_commit_log_least_pages: 4,
flush_least_pages_when_warm_mapped_file: 0,
flush_consume_queue_least_pages: 0,
flush_commit_log_thorough_interval: 0,
commit_commit_log_thorough_interval: 0,
flush_commit_log_thorough_interval: 1000 * 10,
commit_commit_log_thorough_interval: 200,
flush_consume_queue_thorough_interval: 0,
max_transfer_bytes_on_message_in_memory: 0,
max_transfer_count_on_message_in_memory: 0,
Expand All @@ -299,7 +299,7 @@ impl Default for MessageStoreConfig {
ha_master_address: None,
ha_max_gap_not_in_sync: 0,
broker_role: Default::default(),
flush_disk_type: Default::default(),
flush_disk_type: FlushDiskType::SyncFlush,
sync_flush_timeout: 1000 * 5,
put_message_timeout: 0,
slave_timeout: 0,
Expand Down
18 changes: 18 additions & 0 deletions rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ impl MappedFileQueue {
true
}

pub fn commit(&self, commit_least_pages: i32) -> bool {
let mut result = true;
let committed_where = self.get_committed_where();
if let Some(mapped_file) =
self.find_mapped_file_by_offset(committed_where, committed_where == 0)
{
let offset = mapped_file.commit(commit_least_pages);
let whered = mapped_file.get_file_from_offset() + offset as u64;
result = whered == self.get_committed_where() as u64;
self.set_committed_where(whered as i64);
}
result
}

pub fn get_committed_where(&self) -> i64 {
self.committed_where.load(Ordering::Acquire) as i64
}

pub fn check_self(&self) {
println!("mapped_file_queue check self unimplemented")
}
Expand Down
16 changes: 9 additions & 7 deletions rocketmq-store/src/log_file/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

use std::{cell::Cell, collections::HashMap, mem, sync::Arc, thread};
use std::{cell::Cell, collections::HashMap, mem, sync::Arc};

use bytes::{Buf, Bytes, BytesMut};
use rocketmq_common::{
Expand All @@ -38,7 +38,7 @@ use rocketmq_common::{
},
UtilAll::time_millis_to_human_string,
};
use tokio::{runtime::Handle, time::Instant};
use tokio::time::Instant;
use tracing::{error, info, warn};

use crate::{
Expand Down Expand Up @@ -222,12 +222,14 @@ impl CommitLog {
}

pub fn start(&mut self) {
let handle = Handle::current();
let flush_manager = self.flush_manager.clone();
thread::spawn(move || {
handle.block_on(async move {
flush_manager.lock().await.start();
});
tokio::spawn(async move {
let flush_manager_weak = Arc::downgrade(&flush_manager);
let mut guard = flush_manager.lock().await;
if let Some(service) = guard.commit_real_time_service_mut() {
service.set_flush_manager(Some(flush_manager_weak))
}
guard.start();
});
}

Expand Down
Loading

0 comments on commit dc31c0b

Please sign in to comment.