-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1012]🚀Support client Broadcasting consume-local file store⚡️ #1013
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -16,53 +16,235 @@ | |||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||
use std::collections::HashMap; | ||||||||||||||||||||||||||||||
use std::collections::HashSet; | ||||||||||||||||||||||||||||||
use std::path::PathBuf; | ||||||||||||||||||||||||||||||
use std::sync::atomic::AtomicI64; | ||||||||||||||||||||||||||||||
use std::sync::atomic::Ordering; | ||||||||||||||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
use once_cell::sync::Lazy; | ||||||||||||||||||||||||||||||
use rocketmq_common::common::message::message_queue::MessageQueue; | ||||||||||||||||||||||||||||||
use rocketmq_common::utils::file_utils; | ||||||||||||||||||||||||||||||
use rocketmq_common::ArcRefCellWrapper; | ||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::RemotingDeserializable; | ||||||||||||||||||||||||||||||
use rocketmq_remoting::protocol::RemotingSerializable; | ||||||||||||||||||||||||||||||
use tokio::sync::Mutex; | ||||||||||||||||||||||||||||||
use tracing::error; | ||||||||||||||||||||||||||||||
use tracing::info; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
use crate::consumer::store::controllable_offset::ControllableOffset; | ||||||||||||||||||||||||||||||
use crate::consumer::store::offset_serialize_wrapper::OffsetSerializeWrapper; | ||||||||||||||||||||||||||||||
use crate::consumer::store::offset_store::OffsetStoreTrait; | ||||||||||||||||||||||||||||||
use crate::consumer::store::read_offset_type::ReadOffsetType; | ||||||||||||||||||||||||||||||
use crate::error::MQClientError; | ||||||||||||||||||||||||||||||
use crate::factory::mq_client_instance::MQClientInstance; | ||||||||||||||||||||||||||||||
use crate::Result; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
pub struct LocalFileOffsetStore; | ||||||||||||||||||||||||||||||
static LOCAL_OFFSET_STORE_DIR: Lazy<PathBuf> = Lazy::new(|| { | ||||||||||||||||||||||||||||||
#[cfg(target_os = "windows")] | ||||||||||||||||||||||||||||||
let home = std::env::var("USERPROFILE") | ||||||||||||||||||||||||||||||
.map_or(PathBuf::from("C:\\tmp\\.rocketmq_offsets"), |home| { | ||||||||||||||||||||||||||||||
PathBuf::from(home).join(".rocketmq_offsets") | ||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
#[cfg(not(target_os = "windows"))] | ||||||||||||||||||||||||||||||
let home = std::env::var("HOME").map_or(PathBuf::from("/tmp/.rocketmq_offsets"), |home| { | ||||||||||||||||||||||||||||||
PathBuf::from(home).join(".rocketmq_offsets") | ||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
std::env::var("rocketmq.client.localOffsetStoreDir").map_or(home, PathBuf::from) | ||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
pub struct LocalFileOffsetStore { | ||||||||||||||||||||||||||||||
client_instance: ArcRefCellWrapper<MQClientInstance>, | ||||||||||||||||||||||||||||||
group_name: String, | ||||||||||||||||||||||||||||||
store_path: String, | ||||||||||||||||||||||||||||||
offset_table: Arc<Mutex<HashMap<MessageQueue, ControllableOffset>>>, | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
impl LocalFileOffsetStore { | ||||||||||||||||||||||||||||||
pub fn new(mq_client_factory: ArcRefCellWrapper<MQClientInstance>, group_name: String) -> Self { | ||||||||||||||||||||||||||||||
Self | ||||||||||||||||||||||||||||||
pub fn new(client_instance: ArcRefCellWrapper<MQClientInstance>, group_name: String) -> Self { | ||||||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||||||
client_instance, | ||||||||||||||||||||||||||||||
group_name, | ||||||||||||||||||||||||||||||
store_path: LOCAL_OFFSET_STORE_DIR | ||||||||||||||||||||||||||||||
.clone() | ||||||||||||||||||||||||||||||
.join("offsets.json") | ||||||||||||||||||||||||||||||
.to_string_lossy() | ||||||||||||||||||||||||||||||
.to_string(), | ||||||||||||||||||||||||||||||
offset_table: Arc::new(Mutex::new(HashMap::new())), | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { | ||||||||||||||||||||||||||||||
let content = | ||||||||||||||||||||||||||||||
file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content); | ||||||||||||||||||||||||||||||
if content.is_empty() { | ||||||||||||||||||||||||||||||
self.read_local_offset_bak() | ||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||
match OffsetSerializeWrapper::decode(content.as_bytes()) { | ||||||||||||||||||||||||||||||
Ok(value) => Ok(Some(value)), | ||||||||||||||||||||||||||||||
Err(e) => Err(MQClientError::MQClientErr( | ||||||||||||||||||||||||||||||
-1, | ||||||||||||||||||||||||||||||
format!("Failed to deserialize local offset: {}", e), | ||||||||||||||||||||||||||||||
)), | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> { | ||||||||||||||||||||||||||||||
let content = file_utils::file_to_string(&format!("{}{}", self.store_path, ".bak")) | ||||||||||||||||||||||||||||||
.map_or("".to_string(), |content| content); | ||||||||||||||||||||||||||||||
if content.is_empty() { | ||||||||||||||||||||||||||||||
Ok(None) | ||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||
match OffsetSerializeWrapper::decode(content.as_bytes()) { | ||||||||||||||||||||||||||||||
Ok(value) => Ok(Some(value)), | ||||||||||||||||||||||||||||||
Err(_) => Err(MQClientError::MQClientErr( | ||||||||||||||||||||||||||||||
-1, | ||||||||||||||||||||||||||||||
format!("read local offset bak failed, content: {}", content), | ||||||||||||||||||||||||||||||
)), | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
impl OffsetStoreTrait for LocalFileOffsetStore { | ||||||||||||||||||||||||||||||
async fn load(&self) -> crate::Result<()> { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
let offset_serialize_wrapper = self.read_local_offset()?; | ||||||||||||||||||||||||||||||
if let Some(offset_serialize_wrapper) = offset_serialize_wrapper { | ||||||||||||||||||||||||||||||
let offset_table = offset_serialize_wrapper.offset_table; | ||||||||||||||||||||||||||||||
let mut offset_table_inner = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
for (mq, offset) in offset_table { | ||||||||||||||||||||||||||||||
let offset = offset.load(Ordering::Relaxed); | ||||||||||||||||||||||||||||||
info!( | ||||||||||||||||||||||||||||||
"load consumer's offset, {} {} {}", | ||||||||||||||||||||||||||||||
self.group_name, mq, offset | ||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||
offset_table_inner.insert(mq, ControllableOffset::new(offset)); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool) { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
let mut offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
let offset_old = offset_table | ||||||||||||||||||||||||||||||
.entry(mq.clone()) | ||||||||||||||||||||||||||||||
.or_insert_with(|| ControllableOffset::new(offset)); | ||||||||||||||||||||||||||||||
if increase_only { | ||||||||||||||||||||||||||||||
offset_old.update(offset, true); | ||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||
offset_old.update_unconditionally(offset); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64) { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
let mut offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
offset_table | ||||||||||||||||||||||||||||||
.entry(mq.clone()) | ||||||||||||||||||||||||||||||
.or_insert_with(|| ControllableOffset::new(offset)) | ||||||||||||||||||||||||||||||
.update_and_freeze(offset); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
match type_ { | ||||||||||||||||||||||||||||||
ReadOffsetType::ReadFromMemory | ReadOffsetType::MemoryFirstThenStore => { | ||||||||||||||||||||||||||||||
let offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
if let Some(offset) = offset_table.get(mq) { | ||||||||||||||||||||||||||||||
offset.get_offset() | ||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||
-1 | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
ReadOffsetType::ReadFromStore => match self.read_local_offset() { | ||||||||||||||||||||||||||||||
Ok(offset_serialize_wrapper) => { | ||||||||||||||||||||||||||||||
if let Some(offset_serialize_wrapper) = offset_serialize_wrapper { | ||||||||||||||||||||||||||||||
if let Some(offset) = offset_serialize_wrapper.offset_table.get(mq) { | ||||||||||||||||||||||||||||||
offset.load(Ordering::Relaxed) | ||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||
-1 | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||
-1 | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
Err(_) => -1, | ||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>) { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
if mqs.is_empty() { | ||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
let mut offset_serialize_wrapper = match self.read_local_offset() { | ||||||||||||||||||||||||||||||
Ok(value) => value.unwrap_or_default(), | ||||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||||
error!("read local offset failed: {}", e); | ||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||
Comment on lines
+179
to
+185
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider handling read errors in If Apply this diff to handle read errors gracefully: let mut offset_serialize_wrapper = match self.read_local_offset() {
Ok(value) => value.unwrap_or_default(),
Err(e) => {
error!("read local offset failed: {}", e);
- return;
+ OffsetSerializeWrapper::default()
}
}; Similarly, consider updating the 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||
let offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
for (mq, offset) in offset_table.iter() { | ||||||||||||||||||||||||||||||
if mqs.contains(mq) { | ||||||||||||||||||||||||||||||
offset_serialize_wrapper | ||||||||||||||||||||||||||||||
.offset_table | ||||||||||||||||||||||||||||||
.insert(mq.clone(), AtomicI64::new(offset.get_offset())); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
let content = offset_serialize_wrapper.to_json_pretty(); | ||||||||||||||||||||||||||||||
if !content.is_empty() { | ||||||||||||||||||||||||||||||
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) { | ||||||||||||||||||||||||||||||
error!( | ||||||||||||||||||||||||||||||
"persistAll consumer offset Exception, {},{}", | ||||||||||||||||||||||||||||||
self.store_path, e | ||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn persist(&mut self, mq: &MessageQueue) { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
let offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
if let Some(offset) = offset_table.get(mq) { | ||||||||||||||||||||||||||||||
let mut offset_serialize_wrapper = match self.read_local_offset() { | ||||||||||||||||||||||||||||||
Ok(value) => value.unwrap_or_default(), | ||||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||||
error!("read local offset failed: {}", e); | ||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||
offset_serialize_wrapper | ||||||||||||||||||||||||||||||
.offset_table | ||||||||||||||||||||||||||||||
.insert(mq.clone(), AtomicI64::new(offset.get_offset())); | ||||||||||||||||||||||||||||||
let content = offset_serialize_wrapper.to_json_pretty(); | ||||||||||||||||||||||||||||||
if !content.is_empty() { | ||||||||||||||||||||||||||||||
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) { | ||||||||||||||||||||||||||||||
error!( | ||||||||||||||||||||||||||||||
"persist consumer offset Exception, {},{}", | ||||||||||||||||||||||||||||||
self.store_path, e | ||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn remove_offset(&self, mq: &MessageQueue) { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
let mut offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
offset_table.remove(mq); | ||||||||||||||||||||||||||||||
info!( | ||||||||||||||||||||||||||||||
"remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", | ||||||||||||||||||||||||||||||
mq, | ||||||||||||||||||||||||||||||
self.group_name, | ||||||||||||||||||||||||||||||
offset_table.len() | ||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn clone_offset_table(&self, topic: &str) -> HashMap<MessageQueue, i64> { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
let offset_table = self.offset_table.lock().await; | ||||||||||||||||||||||||||||||
offset_table | ||||||||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||||||||
.filter(|(mq, _)| topic.is_empty() || mq.get_topic() == topic) | ||||||||||||||||||||||||||||||
.map(|(mq, offset)| (mq.clone(), offset.get_offset())) | ||||||||||||||||||||||||||||||
.collect() | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
async fn update_consume_offset_to_broker( | ||||||||||||||||||||||||||||||
|
@@ -71,6 +253,6 @@ impl OffsetStoreTrait for LocalFileOffsetStore { | |||||||||||||||||||||||||||||
offset: i64, | ||||||||||||||||||||||||||||||
is_oneway: bool, | ||||||||||||||||||||||||||||||
) -> crate::Result<()> { | ||||||||||||||||||||||||||||||
todo!() | ||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
use std::collections::HashMap; | ||
use std::sync::atomic::AtomicI64; | ||
|
||
use rocketmq_common::common::message::message_queue::MessageQueue; | ||
use serde::Deserialize; | ||
use serde::Serialize; | ||
|
||
#[derive(Serialize, Deserialize, Debug, Default)] | ||
#[serde(rename_all = "camelCase")] | ||
pub struct OffsetSerializeWrapper { | ||
pub offset_table: HashMap<MessageQueue, AtomicI64>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
🛠️ Refactor suggestion
Error Handling and Security Concerns Remain in
read_local_offset
Methods.The current implementation still defaults to an empty string when file reading fails in
read_local_offset
, and error messages inread_local_offset_bak
include the full file content, which poses security risks and can lead to large log entries.Please address the following:
read_local_offset
(Lines 78-92): Handle file reading errors explicitly instead of defaulting to an empty string.read_local_offset_bak
(Lines 93-107): Avoid including the full file content in error messages to mitigate security risks.🔗 Analysis chain
Improve error handling and file reading logic.
While the error handling has been improved, there are still some areas that could be enhanced:
read_local_offset
, consider handling the file reading error explicitly instead of defaulting to an empty string.read_local_offset_bak
, the error message still includes the full file content, which could be a security risk and cause large log entries.Consider refactoring these methods as follows:
This refactoring improves error handling, provides more informative error messages, and avoids potential security risks.
To ensure these changes don't introduce regressions, please run the following test script:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 1320
Script:
Length of output: 282
Script:
Length of output: 3552