[ISSUE #1012]🚀Support client Broadcasting consume-local file store⚡️ #1013

Merged: 2 commits, Sep 28, 2024
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer/store.rs
@@ -17,6 +17,7 @@

mod controllable_offset;
pub(crate) mod local_file_offset_store;
mod offset_serialize_wrapper;
pub(crate) mod offset_store;
pub(crate) mod read_offset_type;
pub(crate) mod remote_broker_offset_store;
7 changes: 7 additions & 0 deletions rocketmq-client/src/consumer/store/controllable_offset.rs
@@ -32,6 +32,13 @@ impl ControllableOffset {
}
}

pub fn new_atomic(value: AtomicI64) -> Self {
Self {
value: Arc::new(value),
allow_to_update: Arc::new(AtomicBool::new(true)),
}
}

pub fn update(&self, target: i64, increase_only: bool) {
if self.allow_to_update.load(Ordering::SeqCst) {
self.value
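The diff view cuts off the body of `update` above. As a rough illustration of how an increase-only, freezable offset can be built on atomics, here is a minimal std-only sketch. `SketchOffset` and every method body in it are assumptions for illustration, not the crate's actual `ControllableOffset` code; only the field layout and the `new_atomic` constructor follow the hunk shown here.

```rust
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `ControllableOffset`; method bodies are assumptions.
#[derive(Clone)]
pub struct SketchOffset {
    value: Arc<AtomicI64>,
    allow_to_update: Arc<AtomicBool>,
}

impl SketchOffset {
    pub fn new(value: i64) -> Self {
        Self::new_atomic(AtomicI64::new(value))
    }

    /// Same shape as the `new_atomic` constructor added in this PR.
    pub fn new_atomic(value: AtomicI64) -> Self {
        Self {
            value: Arc::new(value),
            allow_to_update: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Ignored once frozen. With `increase_only`, the stored value never moves backwards.
    /// The flag check and the write are two separate atomic steps, which is
    /// acceptable for a sketch but is not one indivisible operation.
    pub fn update(&self, target: i64, increase_only: bool) {
        if !self.allow_to_update.load(Ordering::SeqCst) {
            return;
        }
        if increase_only {
            self.value.fetch_max(target, Ordering::SeqCst);
        } else {
            self.value.store(target, Ordering::SeqCst);
        }
    }

    /// Sets the value once more and then blocks all later `update` calls.
    pub fn update_and_freeze(&self, target: i64) {
        self.allow_to_update.store(false, Ordering::SeqCst);
        self.value.store(target, Ordering::SeqCst);
    }

    pub fn get_offset(&self) -> i64 {
        self.value.load(Ordering::SeqCst)
    }
}
```

In this sketch, `update(5, true)` on an offset of 10 leaves it at 10, while `update(5, false)` overwrites it. After `update_and_freeze`, both variants become no-ops.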
206 changes: 194 additions & 12 deletions rocketmq-client/src/consumer/store/local_file_offset_store.rs
@@ -16,53 +16,235 @@
*/
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use once_cell::sync::Lazy;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::utils::file_utils;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::protocol::RemotingSerializable;
use tokio::sync::Mutex;
use tracing::error;
use tracing::info;

use crate::consumer::store::controllable_offset::ControllableOffset;
use crate::consumer::store::offset_serialize_wrapper::OffsetSerializeWrapper;
use crate::consumer::store::offset_store::OffsetStoreTrait;
use crate::consumer::store::read_offset_type::ReadOffsetType;
use crate::error::MQClientError;
use crate::factory::mq_client_instance::MQClientInstance;
use crate::Result;

-pub struct LocalFileOffsetStore;
static LOCAL_OFFSET_STORE_DIR: Lazy<PathBuf> = Lazy::new(|| {
#[cfg(target_os = "windows")]
let home = std::env::var("USERPROFILE")
.map_or(PathBuf::from("C:\\tmp\\.rocketmq_offsets"), |home| {
PathBuf::from(home).join(".rocketmq_offsets")
});

#[cfg(not(target_os = "windows"))]
let home = std::env::var("HOME").map_or(PathBuf::from("/tmp/.rocketmq_offsets"), |home| {
PathBuf::from(home).join(".rocketmq_offsets")
});

std::env::var("rocketmq.client.localOffsetStoreDir").map_or(home, PathBuf::from)
});

pub struct LocalFileOffsetStore {
client_instance: ArcRefCellWrapper<MQClientInstance>,
group_name: String,
store_path: String,
offset_table: Arc<Mutex<HashMap<MessageQueue, ControllableOffset>>>,
}

impl LocalFileOffsetStore {
-pub fn new(mq_client_factory: ArcRefCellWrapper<MQClientInstance>, group_name: String) -> Self {
-Self
pub fn new(client_instance: ArcRefCellWrapper<MQClientInstance>, group_name: String) -> Self {
Self {
client_instance,
group_name,
store_path: LOCAL_OFFSET_STORE_DIR
.clone()
.join("offsets.json")
.to_string_lossy()
.to_string(),
offset_table: Arc::new(Mutex::new(HashMap::new())),
}
}

fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
let content =
file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content);
if content.is_empty() {
self.read_local_offset_bak()
} else {
match OffsetSerializeWrapper::decode(content.as_bytes()) {
Ok(value) => Ok(Some(value)),
Err(e) => Err(MQClientError::MQClientErr(
-1,
format!("Failed to deserialize local offset: {}", e),
)),
}
}
}
fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> {
let content = file_utils::file_to_string(&format!("{}{}", self.store_path, ".bak"))
.map_or("".to_string(), |content| content);
if content.is_empty() {
Ok(None)
} else {
match OffsetSerializeWrapper::decode(content.as_bytes()) {
Ok(value) => Ok(Some(value)),
Err(_) => Err(MQClientError::MQClientErr(
-1,
format!("read local offset bak failed, content: {}", content),
)),
}
}
Comment on lines +78 to +106

💡 Codebase verification

🛠️ Refactor suggestion

Error Handling and Security Concerns Remain in read_local_offset Methods.

The current implementation still defaults to an empty string when file reading fails in read_local_offset, and error messages in read_local_offset_bak include the full file content, which poses security risks and can lead to large log entries.

Please address the following:

  • read_local_offset (Lines 78-92): Handle file reading errors explicitly instead of defaulting to an empty string.
  • read_local_offset_bak (Lines 93-107): Avoid including the full file content in error messages to mitigate security risks.
🔗 Analysis chain

Improve error handling and file reading logic.

While the error handling has been improved, there are still some areas that could be enhanced:

  1. In read_local_offset, consider handling the file reading error explicitly instead of defaulting to an empty string.
  2. In read_local_offset_bak, the error message still includes the full file content, which could be a security risk and cause large log entries.

Consider refactoring these methods as follows:

fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
    match file_utils::file_to_string(&self.store_path) {
        Ok(content) if !content.is_empty() => {
            OffsetSerializeWrapper::decode(content.as_bytes())
                .map(Some)
                .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to deserialize local offset: {}", e)))
        }
        Ok(_) => self.read_local_offset_bak(),
        Err(e) => {
            error!("Failed to read offset file {}: {}", self.store_path, e);
            self.read_local_offset_bak()
        }
    }
}

fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> {
    let bak_path = format!("{}.bak", self.store_path);
    match file_utils::file_to_string(&bak_path) {
        Ok(content) if !content.is_empty() => {
            OffsetSerializeWrapper::decode(content.as_bytes())
                .map(Some)
                .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to deserialize backup local offset: {}", e)))
        }
        Ok(_) => Ok(None),
        Err(e) => {
            error!("Failed to read backup offset file {}: {}", bak_path, e);
            Ok(None)
        }
    }
}

This refactoring improves error handling, provides more informative error messages, and avoids potential security risks.
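The fallback order in the suggestion above (primary file, then the `.bak` copy, then nothing) can be tried outside the crate with only the standard library. The sketch below is an assumption-laden model: `read_non_empty` and `read_with_backup` are hypothetical helpers, `std::fs::read_to_string` stands in for `file_utils::file_to_string`, and `eprintln!` stands in for `tracing::error!`. Decoding is left out so the block stays dependency-free.

```rust
use std::fs;
use std::path::{Path, PathBuf};

/// Returns the file's content, or `None` if the file is missing, unreadable, or empty.
fn read_non_empty(path: &Path) -> Option<String> {
    match fs::read_to_string(path) {
        Ok(content) if !content.is_empty() => Some(content),
        Ok(_) => None,
        Err(e) => {
            // Report the path and the I/O error only, never the file content.
            eprintln!("failed to read {}: {}", path.display(), e);
            None
        }
    }
}

/// Hypothetical helper: tries `path` first, then `path` with `.bak` appended.
pub fn read_with_backup(path: &Path) -> Option<String> {
    read_non_empty(path).or_else(|| {
        let mut bak = path.as_os_str().to_owned();
        bak.push(".bak");
        read_non_empty(&PathBuf::from(bak))
    })
}
```

A missing or empty primary file falls through to the backup, and a read failure is reported with the path rather than silently treated as an empty string.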

To ensure these changes don't introduce regressions, please run the following test script:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that the offset reading methods are called and handle errors correctly
rg --type rust -e "read_local_offset" -e "read_local_offset_bak" rocketmq-client/src/consumer/store/local_file_offset_store.rs

Length of output: 1320


Script:

#!/bin/bash
# Verify that the read_local_offset and read_local_offset_bak methods are present and correctly implemented
rg --type rust "fn read_local_offset|fn read_local_offset_bak" rocketmq-client/src/consumer/store/local_file_offset_store.rs

Length of output: 282


Script:

#!/bin/bash
# Extract the implementation of the `read_local_offset` method
ast-grep --lang rust --pattern 'fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { $$$ }' rocketmq-client/src/consumer/store/local_file_offset_store.rs

# Extract the implementation of the `read_local_offset_bak` method
ast-grep --lang rust --pattern 'fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> { $$$ }' rocketmq-client/src/consumer/store/local_file_offset_store.rs

Length of output: 3552

}
}

impl OffsetStoreTrait for LocalFileOffsetStore {
async fn load(&self) -> crate::Result<()> {
-todo!()
let offset_serialize_wrapper = self.read_local_offset()?;
if let Some(offset_serialize_wrapper) = offset_serialize_wrapper {
let offset_table = offset_serialize_wrapper.offset_table;
let mut offset_table_inner = self.offset_table.lock().await;
for (mq, offset) in offset_table {
let offset = offset.load(Ordering::Relaxed);
info!(
"load consumer's offset, {} {} {}",
self.group_name, mq, offset
);
offset_table_inner.insert(mq, ControllableOffset::new(offset));
}
}
Ok(())
}

async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool) {
-todo!()
let mut offset_table = self.offset_table.lock().await;
let offset_old = offset_table
.entry(mq.clone())
.or_insert_with(|| ControllableOffset::new(offset));
if increase_only {
offset_old.update(offset, true);
} else {
offset_old.update_unconditionally(offset);
}
}

async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64) {
-todo!()
let mut offset_table = self.offset_table.lock().await;
offset_table
.entry(mq.clone())
.or_insert_with(|| ControllableOffset::new(offset))
.update_and_freeze(offset);
}

async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 {
-todo!()
match type_ {
ReadOffsetType::ReadFromMemory | ReadOffsetType::MemoryFirstThenStore => {
let offset_table = self.offset_table.lock().await;
if let Some(offset) = offset_table.get(mq) {
offset.get_offset()
} else {
-1
}
}
ReadOffsetType::ReadFromStore => match self.read_local_offset() {
Ok(offset_serialize_wrapper) => {
if let Some(offset_serialize_wrapper) = offset_serialize_wrapper {
if let Some(offset) = offset_serialize_wrapper.offset_table.get(mq) {
offset.load(Ordering::Relaxed)
} else {
-1
}
} else {
-1
}
}
Err(_) => -1,
},
}
}

async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>) {
-todo!()
if mqs.is_empty() {
return;
}
let mut offset_serialize_wrapper = match self.read_local_offset() {
Ok(value) => value.unwrap_or_default(),
Err(e) => {
error!("read local offset failed: {}", e);
return;
}
};
Comment on lines +179 to +185

⚠️ Potential issue

Consider handling read errors in persist_all without early return

If read_local_offset fails, persist_all logs an error and returns early, which could prevent offsets from being persisted. Consider proceeding with an empty OffsetSerializeWrapper to ensure that the offsets are persisted, even if the existing local offsets cannot be read.

Apply this diff to handle read errors gracefully:

 let mut offset_serialize_wrapper = match self.read_local_offset() {
     Ok(value) => value.unwrap_or_default(),
     Err(e) => {
         error!("read local offset failed: {}", e);
-        return;
+        OffsetSerializeWrapper::default()
     }
 };

Similarly, consider updating the persist method to handle errors without early return.
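The behaviour proposed here, starting from an empty table when the stored one cannot be read and then overlaying the in-memory offsets, can be modelled without the crate's types. In this sketch `SketchWrapper` and `merge_for_persist` are hypothetical names, queues are keyed by plain strings instead of `MessageQueue`, and the read result is passed in as a `Result` so that the error path is easy to exercise.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `OffsetSerializeWrapper`, keyed by queue name.
#[derive(Debug, Default, PartialEq)]
pub struct SketchWrapper {
    pub offset_table: HashMap<String, i64>,
}

/// Starts from the stored table when it is readable, otherwise from an empty one,
/// then overlays the in-memory offsets so they are never dropped.
pub fn merge_for_persist(
    stored: Result<Option<SketchWrapper>, String>,
    in_memory: &HashMap<String, i64>,
) -> SketchWrapper {
    let mut wrapper = match stored {
        Ok(value) => value.unwrap_or_default(),
        Err(e) => {
            // Log and continue instead of returning early.
            eprintln!("read local offset failed: {}", e);
            SketchWrapper::default()
        }
    };
    for (mq, offset) in in_memory {
        wrapper.offset_table.insert(mq.clone(), *offset);
    }
    wrapper
}
```

One trade-off to weigh: continuing with an empty table means offsets for queues that exist only in the unreadable file are not carried into the rewritten one.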


let offset_table = self.offset_table.lock().await;
for (mq, offset) in offset_table.iter() {
if mqs.contains(mq) {
offset_serialize_wrapper
.offset_table
.insert(mq.clone(), AtomicI64::new(offset.get_offset()));
}
}
let content = offset_serialize_wrapper.to_json_pretty();
if !content.is_empty() {
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
error!(
"persistAll consumer offset Exception, {},{}",
self.store_path, e
);
}
}
}

async fn persist(&mut self, mq: &MessageQueue) {
-todo!()
let offset_table = self.offset_table.lock().await;
if let Some(offset) = offset_table.get(mq) {
let mut offset_serialize_wrapper = match self.read_local_offset() {
Ok(value) => value.unwrap_or_default(),
Err(e) => {
error!("read local offset failed: {}", e);
return;
}
};
offset_serialize_wrapper
.offset_table
.insert(mq.clone(), AtomicI64::new(offset.get_offset()));
let content = offset_serialize_wrapper.to_json_pretty();
if !content.is_empty() {
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
error!(
"persist consumer offset Exception, {},{}",
self.store_path, e
);
}
}
}
}

async fn remove_offset(&self, mq: &MessageQueue) {
-todo!()
let mut offset_table = self.offset_table.lock().await;
offset_table.remove(mq);
info!(
"remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}",
self.group_name,
mq,
offset_table.len()
);
}

async fn clone_offset_table(&self, topic: &str) -> HashMap<MessageQueue, i64> {
-todo!()
let offset_table = self.offset_table.lock().await;
offset_table
.iter()
.filter(|(mq, _)| topic.is_empty() || mq.get_topic() == topic)
.map(|(mq, offset)| (mq.clone(), offset.get_offset()))
.collect()
}

async fn update_consume_offset_to_broker(
@@ -71,6 +253,6 @@ impl OffsetStoreTrait for LocalFileOffsetStore {
offset: i64,
is_oneway: bool,
) -> crate::Result<()> {
-todo!()
Ok(())
}
}
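The `LOCAL_OFFSET_STORE_DIR` initializer in this file picks the directory in three steps: an explicit `rocketmq.client.localOffsetStoreDir` override, else `.rocketmq_offsets` under the user's home directory, else a fixed temporary path. A minimal sketch of that precedence as a pure function follows; `resolve_store_dir` and `store_dir_from_env` are hypothetical names, and the lookup is split out only so the order can be checked without touching the process environment.

```rust
use std::path::PathBuf;

/// Hypothetical pure version of the lookup: explicit override first,
/// then `<home>/.rocketmq_offsets`, then the fixed fallback directory.
pub fn resolve_store_dir(
    override_dir: Option<&str>,
    home: Option<&str>,
    fallback: &str,
) -> PathBuf {
    match (override_dir, home) {
        (Some(dir), _) => PathBuf::from(dir),
        (None, Some(home)) => PathBuf::from(home).join(".rocketmq_offsets"),
        (None, None) => PathBuf::from(fallback),
    }
}

/// Reads the same environment variables as the PR and applies the order above.
pub fn store_dir_from_env() -> PathBuf {
    let (home_var, fallback) = if cfg!(target_os = "windows") {
        ("USERPROFILE", "C:\\tmp\\.rocketmq_offsets")
    } else {
        ("HOME", "/tmp/.rocketmq_offsets")
    };
    let override_dir = std::env::var("rocketmq.client.localOffsetStoreDir").ok();
    let home = std::env::var(home_var).ok();
    resolve_store_dir(override_dir.as_deref(), home.as_deref(), fallback)
}
```

The store path used by `LocalFileOffsetStore::new` is then this directory joined with `offsets.json`.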
28 changes: 28 additions & 0 deletions rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;

use rocketmq_common::common::message::message_queue::MessageQueue;
use serde::Deserialize;
use serde::Serialize;

#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct OffsetSerializeWrapper {
pub offset_table: HashMap<MessageQueue, AtomicI64>,
}