[ISSUE #1012]🚀Support client Broadcasting consume-local file store⚡️ #1013

Merged
merged 2 commits into from
Sep 28, 2024

Conversation

mxsm
Owner

@mxsm mxsm commented Sep 28, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1012

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Enhanced LocalFileOffsetStore with comprehensive methods for managing offsets, including reading, updating, and persisting.
    • Introduced a new static variable to determine the local offset store directory based on the operating system.
    • Improved thread safety with the use of mutex-protected data structures.
  • Bug Fixes

    • Improved error handling in offset reading methods.
  • Documentation

    • Updated documentation to reflect new methods and features.

Contributor

coderabbitai bot commented Sep 28, 2024

Walkthrough

The changes enhance the LocalFileOffsetStore in the RocketMQ client by introducing a static variable for the local offset store directory, updating the struct to include new fields for offset management, and implementing multiple methods for reading, updating, and persisting offsets. These modifications ensure thread safety and improve overall functionality in managing offsets.
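As a rough illustration of the "static variable for the local offset store directory" described above, here is a minimal sketch using only the standard library. It swaps in `std::sync::OnceLock` instead of the `Lazy` type the PR imports. The property name `rocketmq.client.localOffsetStoreDir` and the `.rocketmq_offsets` folder are taken from the review suggestion later in this thread; the `HOME`/`USERPROFILE` lookup and the helper names are assumptions, not the PR's actual code.

```rust
use std::env;
use std::path::PathBuf;
use std::sync::OnceLock;

/// Pure helper (hypothetical): choose the offset directory from an optional
/// override and an optional home directory, so the choice is easy to test.
fn resolve_offset_dir(override_dir: Option<String>, home: Option<String>) -> PathBuf {
    match override_dir {
        // An explicit, non-empty override always wins.
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        // Otherwise fall back to <home>/.rocketmq_offsets, or the temp dir
        // when no home directory is known.
        _ => {
            let base = home.map(PathBuf::from).unwrap_or_else(env::temp_dir);
            base.join(".rocketmq_offsets")
        }
    }
}

/// Process-wide directory, computed once on first access.
fn local_offset_store_dir() -> &'static PathBuf {
    static DIR: OnceLock<PathBuf> = OnceLock::new();
    DIR.get_or_init(|| {
        // Assumption: the home directory is exposed as USERPROFILE on
        // Windows and as HOME elsewhere.
        let home_var = if cfg!(windows) { "USERPROFILE" } else { "HOME" };
        resolve_offset_dir(
            env::var("rocketmq.client.localOffsetStoreDir").ok(),
            env::var(home_var).ok(),
        )
    })
}
```

Keeping the decision logic in a pure function means the OS-specific branch is the only part that depends on the environment.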

Changes

File: rocketmq-client/src/consumer/store/local_file_offset_store.rs
Change summary: Major enhancements to LocalFileOffsetStore, including the addition of new fields for client instance, group name, store path, and offset table, as well as new methods for reading and persisting offsets. Method signatures updated for improved functionality.

Assessment against linked issues

Objective Addressed Explanation
Support client Broadcasting consume-local file store (#1012)

Possibly related PRs

Poem

🐰 In the meadow, offsets bloom,
With new wrappers, there's more room.
Atomic values hold so tight,
Local stores now shine so bright.
RocketMQ hops, a joyful cheer,
For every change, we hold so dear! 🌼


@mxsm
Owner Author

mxsm commented Sep 28, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (4)
rocketmq-client/src/consumer/store.rs (1)

20-20: LGTM! Consider adding a brief comment for the new module.

The addition of the offset_serialize_wrapper module aligns with the PR objective of supporting client broadcasting for consuming local file store. This change looks good and doesn't impact existing functionality.

Consider adding a brief comment above the new module declaration to explain its purpose. This would improve code documentation and make it easier for other developers to understand the module's role. For example:

// Module for handling serialization of offsets
mod offset_serialize_wrapper;
rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs (1)

24-28: LGTM: Well-structured OffsetSerializeWrapper, with a minor consideration.

The OffsetSerializeWrapper struct is well-designed for serializing and deserializing offset data. The use of AtomicI64 for offset values suggests thread-safe operations, which is a good practice. The camelCase renaming is appropriate for maintaining consistency with other systems or languages that use camel case naming conventions.

However, consider if the Default derive is necessary or appropriate for this struct. An empty HashMap as the default might not be the most meaningful default state for this wrapper. If a specific initial state is required, it might be better to implement Default manually or provide a custom initialization method.

rocketmq-client/src/consumer/store/controllable_offset.rs (1)

35-40: LGTM! Consider adding documentation.

The new new_atomic method is a valuable addition, providing more flexibility in initializing ControllableOffset. The implementation is consistent with the existing new method and maintains the expected behavior.

Consider adding a doc comment to explain the purpose and use case of this new method. For example:

/// Creates a new `ControllableOffset` with a provided `AtomicI64`.
///
/// This method allows for more flexible initialization when an `AtomicI64` is already available.
///
/// # Arguments
///
/// * `value` - An `AtomicI64` to be used as the initial offset value.
pub fn new_atomic(value: AtomicI64) -> Self {
    // ... (existing implementation)
}
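To make the relationship between `new` and `new_atomic` concrete, here is a minimal sketch of what a type like `ControllableOffset` could look like. The PR's real implementation is elided above, so the field names, the `update`/`update_and_freeze`/`get_offset` methods and the memory orderings are all assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};

/// Hypothetical stand-in for the PR's `ControllableOffset`.
pub struct ControllableOffset {
    value: AtomicI64,
    allow_to_update: AtomicBool,
}

impl ControllableOffset {
    /// Builds the offset from a plain integer.
    pub fn new(value: i64) -> Self {
        Self::new_atomic(AtomicI64::new(value))
    }

    /// Builds the offset from an `AtomicI64` the caller already owns,
    /// for example one taken out of a deserialized offset table.
    pub fn new_atomic(value: AtomicI64) -> Self {
        Self {
            value,
            allow_to_update: AtomicBool::new(true),
        }
    }

    /// Updates the offset unless it has been frozen. With `increase_only`
    /// the stored value never moves backwards.
    pub fn update(&self, target: i64, increase_only: bool) {
        if !self.allow_to_update.load(Ordering::Acquire) {
            return;
        }
        if increase_only {
            self.value.fetch_max(target, Ordering::AcqRel);
        } else {
            self.value.store(target, Ordering::Release);
        }
    }

    /// Sets the offset and rejects later `update` calls.
    pub fn update_and_freeze(&self, target: i64) {
        self.allow_to_update.store(false, Ordering::Release);
        self.value.store(target, Ordering::Release);
    }

    pub fn get_offset(&self) -> i64 {
        self.value.load(Ordering::Acquire)
    }
}
```

Note that this sketch checks the freeze flag and writes the value in two separate steps, so an `update` racing with `update_and_freeze` could still land afterwards; a production version would need to close that window.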
rocketmq-client/src/consumer/store/local_file_offset_store.rs (1)

256-258: Include a comment explaining the empty implementation of update_consume_offset_to_broker

The update_consume_offset_to_broker method is empty in this implementation. Adding a comment to indicate that it's intentionally left empty can improve code readability and maintainability.

You can add a comment like:

// LocalFileOffsetStore does not update consume offset to broker.
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 5cefbc9 and 8c74161.

📒 Files selected for processing (4)
  • rocketmq-client/src/consumer/store.rs (1 hunks)
  • rocketmq-client/src/consumer/store/controllable_offset.rs (1 hunks)
  • rocketmq-client/src/consumer/store/local_file_offset_store.rs (2 hunks)
  • rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs (1 hunks)
🔇 Additional comments (16)
rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs (2)

1-22: LGTM: License and imports are appropriate.

The file includes the correct Apache 2.0 license, which is suitable for the Apache RocketMQ project. The imports are relevant to the functionality being implemented, including necessary standard library components and project-specific modules.


1-28: Consider future enhancements and provide usage context.

The OffsetSerializeWrapper struct is a good foundation for managing offsets in a serializable format, which aligns with the PR objectives. However, to ensure its effectiveness and ease of use, consider the following suggestions:

  1. Implement methods for adding, updating, and retrieving offsets from the offset_table.
  2. Add documentation comments to explain the purpose and usage of this struct.
  3. Consider implementing the Clone trait if deep copying of the wrapper might be needed.

Additionally, it would be helpful to provide more context on how this struct will be used within the broader system. This information could guide further optimizations or additional functionality that might be necessary.
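The three suggestions above can be sketched as follows. This is a standard-library-only illustration: the real struct derives serde traits and is keyed by `MessageQueue`, so `QueueKey` and the `set_offset`/`offset` method names are hypothetical. One detail worth knowing is that `AtomicI64` does not implement `Clone`, so `#[derive(Clone)]` is not available and the copy has to be written by hand.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical key type standing in for the PR's `MessageQueue`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueueKey {
    pub topic: String,
    pub broker_name: String,
    pub queue_id: i32,
}

/// Offsets keyed by queue, mirroring the shape described in the review.
#[derive(Debug, Default)]
pub struct OffsetSerializeWrapper {
    pub offset_table: HashMap<QueueKey, AtomicI64>,
}

impl OffsetSerializeWrapper {
    /// Inserts or overwrites the offset for a queue.
    pub fn set_offset(&mut self, key: QueueKey, offset: i64) {
        self.offset_table.insert(key, AtomicI64::new(offset));
    }

    /// Returns the stored offset, if the queue is known.
    pub fn offset(&self, key: &QueueKey) -> Option<i64> {
        self.offset_table.get(key).map(|v| v.load(Ordering::Relaxed))
    }
}

/// Manual deep copy: each atomic is read once and re-wrapped.
impl Clone for OffsetSerializeWrapper {
    fn clone(&self) -> Self {
        let offset_table = self
            .offset_table
            .iter()
            .map(|(k, v)| (k.clone(), AtomicI64::new(v.load(Ordering::Relaxed))))
            .collect();
        Self { offset_table }
    }
}
```

The clone is a snapshot: later writes to the original do not show up in the copy.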

To better understand the usage and potential impact of this new struct, let's check for any references to OffsetSerializeWrapper in the codebase:

✅ Verification successful

Review Comment Clarification

Upon reviewing the git diff, the offset_serialize_wrapper.rs file is newly added with lines 1-28 entirely introducing new code. There are no removed or replaced lines in this section. Therefore, the initial observation about the absence of removed or replaced code is accurate.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for usage of OffsetSerializeWrapper
rg --type rust "OffsetSerializeWrapper"

Length of output: 1026


Script:

#!/bin/bash
# Get git diff for lines 1-28 in offset_serialize_wrapper.rs
git diff HEAD~1 HEAD -- rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs | sed -n '1,28p'

Length of output: 1391

rocketmq-client/src/consumer/store/controllable_offset.rs (1)

35-40: Verify the usage of the new new_atomic method in the codebase.

The addition of the new_atomic method enhances the flexibility of ControllableOffset initialization, which aligns well with the PR objectives. It maintains backward compatibility while providing a new option for scenarios where an AtomicI64 is already available.

To ensure proper integration, please run the following script to check for any usage of the new method:

This will help identify if the new method is being utilized effectively and if there are any other places in the codebase where it could potentially be beneficial.

✅ Verification successful

Usage of the new new_atomic method is correctly confined to controllable_offset.rs and does not interfere with existing codebase functionalities.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for usage of the new_atomic method in the codebase

# Test: Search for new_atomic method usage
rg --type rust 'ControllableOffset::new_atomic'

# Test: Search for potential places where new_atomic could be used (AtomicI64 creation)
rg --type rust 'AtomicI64::new'

Length of output: 6164

rocketmq-client/src/consumer/store/local_file_offset_store.rs (13)

19-22: LGTM!

The addition of necessary imports for PathBuf, AtomicI64, Ordering, and Arc enhances the functionality and thread safety of the code.


24-35: LGTM!

The imports of Lazy, ControllableOffset, and OffsetSerializeWrapper are appropriate for implementing lazy initialization and handling offset serialization.


38-40: LGTM!

Importing MQClientError and Result is appropriate for error handling within the module.


57-62: Definition of LocalFileOffsetStore struct looks good

The struct fields are appropriately defined to support local offset storage functionality.


65-76: Constructor new properly initializes LocalFileOffsetStore

The new method correctly initializes the store_path and offset_table.


94-106: LGTM!

The read_local_offset_bak method appropriately attempts to read the backup offset file and handles errors.


112-125: load method correctly loads offsets from local storage

The load function reads the local offset and populates the offset_table as expected.


129-137: update_offset method properly updates offsets

The method correctly handles both increase_only scenarios and updates the offsets.


141-145: update_and_freeze_offset method correctly updates and freezes offset

The method updates the offset and freezes it as intended.


149-172: read_offset method correctly retrieves offsets

The function handles different ReadOffsetType variants and retrieves offsets from memory or store appropriately.
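For readers unfamiliar with the read modes, the dispatch can be sketched like this. The variant names and the `-1` "not found" sentinel are assumptions modelled on the usual RocketMQ client conventions; the PR's actual `read_offset` works on `MessageQueue` keys and a mutex-protected table rather than the plain maps used here.

```rust
use std::collections::HashMap;

/// Hypothetical variant names; the PR's `ReadOffsetType` may differ.
#[derive(Debug, Clone, Copy)]
pub enum ReadOffsetType {
    ReadFromMemory,
    ReadFromStore,
    MemoryFirstThenStore,
}

/// Looks up an offset according to the requested read mode.
/// Returns -1 when no offset is known (assumed sentinel).
pub fn read_offset(
    memory: &HashMap<String, i64>,
    store: &HashMap<String, i64>,
    queue: &str,
    read_type: ReadOffsetType,
) -> i64 {
    let found = match read_type {
        // Only the in-memory table is consulted.
        ReadOffsetType::ReadFromMemory => memory.get(queue),
        // Only the persisted table (the local file) is consulted.
        ReadOffsetType::ReadFromStore => store.get(queue),
        // Memory wins; the store is a fallback.
        ReadOffsetType::MemoryFirstThenStore => memory.get(queue).or_else(|| store.get(queue)),
    };
    found.copied().unwrap_or(-1)
}
```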


206-227: persist method writes the offset of a specific message queue

The method correctly updates and writes the offset data for the specified MessageQueue.


231-238: remove_offset method correctly removes offset from the table

The method removes the offset for the given MessageQueue and logs the action.


242-247: clone_offset_table method correctly clones offsets

The method filters offset entries by topic and returns a cloned HashMap.

Comment on lines +179 to +185
let mut offset_serialize_wrapper = match self.read_local_offset() {
    Ok(value) => value.unwrap_or_default(),
    Err(e) => {
        error!("read local offset failed: {}", e);
        return;
    }
};
Contributor

⚠️ Potential issue

Consider handling read errors in persist_all without early return

If read_local_offset fails, persist_all logs an error and returns early, which could prevent offsets from being persisted. Consider proceeding with an empty OffsetSerializeWrapper to ensure that the offsets are persisted, even if the existing local offsets cannot be read.

Apply this diff to handle read errors gracefully:

 let mut offset_serialize_wrapper = match self.read_local_offset() {
     Ok(value) => value.unwrap_or_default(),
     Err(e) => {
         error!("read local offset failed: {}", e);
-        return;
+        OffsetSerializeWrapper::default()
     }
 };

Similarly, consider updating the persist method to handle errors without early return.
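The trade-off behind this suggestion is easier to see in isolation. The sketch below uses plain `HashMap<String, i64>` tables and `String` errors as stand-ins for `OffsetSerializeWrapper` and `MQClientError`; both helper names are hypothetical.

```rust
use std::collections::HashMap;

type OffsetTable = HashMap<String, i64>;

/// `Ok(None)` means "no file yet"; `Err` means the file was unreadable or corrupt.
/// Either way the caller gets a usable table instead of an early return.
fn snapshot_or_default(read: Result<Option<OffsetTable>, String>) -> OffsetTable {
    match read {
        Ok(value) => value.unwrap_or_default(),
        Err(e) => {
            eprintln!("read local offset failed: {e}");
            OffsetTable::default()
        }
    }
}

/// Overlays the in-memory offsets on whatever could be read from disk.
fn merge_for_persist(
    read: Result<Option<OffsetTable>, String>,
    in_memory: &OffsetTable,
) -> OffsetTable {
    let mut table = snapshot_or_default(read);
    for (queue, offset) in in_memory {
        table.insert(queue.clone(), *offset);
    }
    table
}
```

With the fallback, in-memory offsets are always written, but any queue that existed only in the unreadable file is dropped from the next snapshot. Whether that is preferable to skipping the write is a judgment call for the maintainers.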

Comment on lines 78 to 92
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
    let content =
        file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content);
    if content.is_empty() {
        self.read_local_offset_bak()
    } else {
        match OffsetSerializeWrapper::decode(content.as_bytes()) {
            Ok(value) => Ok(Some(value)),
            Err(_) => Err(MQClientError::MQClientErr(
                -1,
                format!("read local offset failed, content: {}", content),
            )),
        }
    }
}
Contributor

⚠️ Potential issue

Handle file reading errors explicitly instead of defaulting to empty content

The read_local_offset function maps errors from reading the offset file to an empty string, which may inadvertently mask I/O errors such as permission denied or disk errors. This could lead to unexpected behavior, as the function then attempts to read the backup file or proceeds with empty content, potentially overwriting valid data. It's better to handle errors explicitly and propagate them appropriately.

Apply this diff to handle errors explicitly:

     fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
-        let content =
-            file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content);
-        if content.is_empty() {
-            self.read_local_offset_bak()
-        } else {
-            match OffsetSerializeWrapper::decode(content.as_bytes()) {
-                Ok(value) => Ok(Some(value)),
-                Err(_) => Err(MQClientError::MQClientErr(
-                    -1,
-                    format!("read local offset failed, content: {}", content),
-                )),
-            }
-        }
+        match file_utils::file_to_string(&self.store_path) {
+            Ok(content) if !content.is_empty() => {
+                match OffsetSerializeWrapper::decode(content.as_bytes()) {
+                    Ok(value) => Ok(Some(value)),
+                    Err(e) => Err(MQClientError::MQClientErr(
+                        -1,
+                        format!("Failed to deserialize local offset: {}", e),
+                    )),
+                }
+            }
+            Ok(_) => self.read_local_offset_bak(),
+            Err(e) => {
+                error!("Failed to read offset file {}: {}", self.store_path, e);
+                self.read_local_offset_bak()
+            }
+        }
     }

Similarly, update read_local_offset_bak to handle errors appropriately.
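A stricter variant than the diff above is to treat only "file missing" and "file empty" as reasons to try the backup, and to propagate every other I/O error. The following sketch shows that alternative with `std::fs` directly; it does not use the project's `file_utils` helpers, and both function names are hypothetical.

```rust
use std::fs;
use std::io::{self, ErrorKind};
use std::path::Path;

/// Reads a file, mapping "missing" or "blank" to `None` while surfacing
/// other I/O errors (permission denied, disk failure, invalid UTF-8).
fn read_non_empty(path: &Path) -> io::Result<Option<String>> {
    match fs::read_to_string(path) {
        Ok(content) if content.trim().is_empty() => Ok(None),
        Ok(content) => Ok(Some(content)),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}

/// Tries the primary file first, then `<path>.bak`.
fn read_with_backup(path: &Path) -> io::Result<Option<String>> {
    if let Some(content) = read_non_empty(path)? {
        return Ok(Some(content));
    }
    // Append ".bak" to the full file name rather than replacing the extension.
    let mut bak = path.as_os_str().to_owned();
    bak.push(".bak");
    read_non_empty(Path::new(&bak))
}
```

The difference from the suggested diff is deliberate: there, a failed read is logged and the backup is tried anyway; here, an unexpected error stops the read so that a later persist cannot silently overwrite a file that merely could not be opened.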


codecov bot commented Sep 28, 2024

Codecov Report

Attention: Patch coverage is 0% with 68 lines in your changes missing coverage. Please review.

Project coverage is 19.88%. Comparing base (5cefbc9) to head (8349762).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...ient/src/consumer/store/local_file_offset_store.rs 0.00% 60 Missing ⚠️
...q-client/src/consumer/store/controllable_offset.rs 0.00% 6 Missing ⚠️
...ent/src/consumer/store/offset_serialize_wrapper.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1013      +/-   ##
==========================================
- Coverage   19.92%   19.88%   -0.05%     
==========================================
  Files         418      419       +1     
  Lines       34502    34567      +65     
==========================================
- Hits         6875     6872       -3     
- Misses      27627    27695      +68     


@mxsm
Owner Author

mxsm commented Sep 28, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

@mxsm mxsm merged commit 98fe4d7 into main Sep 28, 2024
14 checks passed
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
rocketmq-client/src/consumer/store/local_file_offset_store.rs (2)

19-55: LGTM! Consider using dirs crate for cross-platform home directory detection.

The new imports and the LOCAL_OFFSET_STORE_DIR static variable are well-implemented, providing good configurability and cross-platform support.

Consider using the dirs crate for a more robust cross-platform solution to detect the home directory. This can simplify the code and potentially handle edge cases better. Here's an example of how you could refactor this:

use dirs;

static LOCAL_OFFSET_STORE_DIR: Lazy<PathBuf> = Lazy::new(|| {
    let default_dir = dirs::home_dir()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join(".rocketmq_offsets");
    
    std::env::var("rocketmq.client.localOffsetStoreDir")
        .map(PathBuf::from)
        .unwrap_or(default_dir)
});

This approach reduces the need for platform-specific code and handles cases where the home directory might not be available.


111-125: Consider improving error handling in the load method.

The load method implementation is generally good, but the error handling could be enhanced to provide more context.

Consider updating the error handling as follows:

async fn load(&self) -> crate::Result<()> {
    let offset_serialize_wrapper = self.read_local_offset()
        .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to load local offset: {}", e)))?;
    
    if let Some(offset_serialize_wrapper) = offset_serialize_wrapper {
        let mut offset_table_inner = self.offset_table.lock().await;
        for (mq, offset) in offset_serialize_wrapper.offset_table {
            let offset = offset.load(Ordering::Relaxed);
            info!(
                "load consumer's offset, {} {} {}",
                self.group_name, mq, offset
            );
            offset_table_inner.insert(mq, ControllableOffset::new(offset));
        }
    }
    Ok(())
}

This change provides more context in case of an error during the loading process.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 8c74161 and 8349762.

📒 Files selected for processing (1)
  • rocketmq-client/src/consumer/store/local_file_offset_store.rs (2 hunks)
🔇 Additional comments (5)
rocketmq-client/src/consumer/store/local_file_offset_store.rs (5)

57-62: LGTM! Well-structured for thread-safe offset management.

The LocalFileOffsetStore struct is well-designed with appropriate fields for managing offsets. The use of Arc<Mutex<...>> for offset_table ensures thread-safe access in concurrent scenarios.


65-76: LGTM! Proper initialization of LocalFileOffsetStore.

The new method correctly initializes the LocalFileOffsetStore struct. It appropriately uses the LOCAL_OFFSET_STORE_DIR for setting the store_path and initializes an empty offset_table.


128-172: LGTM! Well-implemented offset management methods.

The update_offset, update_and_freeze_offset, and read_offset methods are well-implemented:

  • They correctly use mutex locks to ensure thread-safety.
  • The logic for different read types in read_offset is properly handled.
  • The use of entry API in update_offset is efficient.
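The combination of a shared mutex and the entry API mentioned above can be sketched as follows. The PR locks with `.lock().await`, which implies an async mutex; this illustration swaps in `std::sync::Mutex` and a `String` key so that it runs without extra crates. The `OffsetTable` type and its methods are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Cheaply cloneable handle to a shared offset table.
#[derive(Clone, Default)]
pub struct OffsetTable {
    inner: Arc<Mutex<HashMap<String, i64>>>,
}

impl OffsetTable {
    /// Inserts the offset for an unknown queue, or updates an existing one.
    /// With `increase_only`, an existing offset never moves backwards.
    pub fn update_offset(&self, queue: &str, offset: i64, increase_only: bool) {
        let mut table = self.inner.lock().unwrap();
        table
            .entry(queue.to_string())
            // One lookup covers both the "present" and "absent" cases.
            .and_modify(|current| {
                if !increase_only || offset > *current {
                    *current = offset;
                }
            })
            .or_insert(offset);
    }

    pub fn get(&self, queue: &str) -> Option<i64> {
        self.inner.lock().unwrap().get(queue).copied()
    }
}
```

Because every clone of the handle shares the same `Arc`, several threads can call `update_offset` concurrently and the largest offset wins when `increase_only` is set.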

175-227: 🛠️ Refactor suggestion

Enhance error handling and reduce code duplication in persistence methods.

The persist_all and persist methods are functionally correct, but there are opportunities for improvement:

  1. Error handling can be more robust.
  2. There's some code duplication between these methods.

Consider the following improvements:

  1. Create a helper method for reading and updating the offset wrapper:
fn read_and_update_offset_wrapper(&self, mq: Option<&MessageQueue>, offset: Option<i64>) -> Result<OffsetSerializeWrapper> {
    let mut wrapper = self.read_local_offset()
        .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to read local offset: {}", e)))?
        .unwrap_or_default();
    
    if let (Some(mq), Some(offset)) = (mq, offset) {
        wrapper.offset_table.insert(mq.clone(), AtomicI64::new(offset));
    }
    
    Ok(wrapper)
}
  2. Use this helper method in both persist_all and persist:
async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>) {
    if mqs.is_empty() {
        return;
    }
    
    let offset_table = self.offset_table.lock().await;
    let mut wrapper = match self.read_and_update_offset_wrapper(None, None) {
        Ok(w) => w,
        Err(e) => {
            error!("Failed to read and update offset wrapper: {}", e);
            return;
        }
    };
    
    for (mq, offset) in offset_table.iter() {
        if mqs.contains(mq) {
            wrapper.offset_table.insert(mq.clone(), AtomicI64::new(offset.get_offset()));
        }
    }
    
    self.write_offset_to_file(wrapper);
}

async fn persist(&mut self, mq: &MessageQueue) {
    let offset_table = self.offset_table.lock().await;
    if let Some(offset) = offset_table.get(mq) {
        match self.read_and_update_offset_wrapper(Some(mq), Some(offset.get_offset())) {
            Ok(wrapper) => self.write_offset_to_file(wrapper),
            Err(e) => error!("Failed to read and update offset wrapper: {}", e),
        }
    }
}

fn write_offset_to_file(&self, wrapper: OffsetSerializeWrapper) {
    let content = wrapper.to_json_pretty();
    if !content.is_empty() {
        if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
            error!("Failed to persist consumer offset: {}, {}", self.store_path, e);
        }
    }
}

These changes improve error handling and reduce code duplication, making the methods more maintainable.
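The write side is what makes the `.bak` fallback in `read_local_offset_bak` useful. The internals of `file_utils::string_to_file` are not shown in this thread, so the following is only an assumption about how such a helper might keep a backup: write to a temporary file, copy the previous version to `.bak`, then rename the temporary file into place. Function names are hypothetical.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Appends a suffix to the full file name (`offsets.json` -> `offsets.json.bak`).
fn with_suffix(path: &Path, suffix: &str) -> PathBuf {
    let mut s = path.as_os_str().to_owned();
    s.push(suffix);
    PathBuf::from(s)
}

/// Writes `content` to `path`, keeping the previous version as `<path>.bak`.
pub fn write_with_backup(content: &str, path: &Path) -> io::Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    // Write the new content next to the target first, so a crash mid-write
    // never leaves a half-written primary file.
    let tmp = with_suffix(path, ".tmp");
    fs::write(&tmp, content)?;
    // Preserve the last good version before replacing it.
    if path.exists() {
        fs::copy(path, with_suffix(path, ".bak"))?;
    }
    fs::rename(&tmp, path)
}
```

With this layout a reader that finds the primary file missing or empty can still recover the previous snapshot from the backup.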

To ensure these changes don't introduce regressions, please run the following test script:

#!/bin/bash
# Verify that the persistence methods are called correctly
rg --type rust -e "persist_all" -e "persist.*MessageQueue" rocketmq-client/src/consumer/store/local_file_offset_store.rs

Line range hint 230-257: LGTM for remove_offset and clone_offset_table. Implement update_consume_offset_to_broker.

The remove_offset and clone_offset_table methods are well-implemented and thread-safe. However, update_consume_offset_to_broker is currently a no-op and needs implementation.

The update_consume_offset_to_broker method should be implemented to update the consumer offset on the broker. Would you like me to provide a skeleton implementation or open a GitHub issue to track this task?

To ensure the update_consume_offset_to_broker method is called correctly in the codebase, run the following script:

#!/bin/bash
# Search for calls to update_consume_offset_to_broker
rg --type rust "update_consume_offset_to_broker" rocketmq-client/src

This will help identify where the method is expected to be used and guide the implementation.

Comment on lines +78 to +106
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
    let content =
        file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content);
    if content.is_empty() {
        self.read_local_offset_bak()
    } else {
        match OffsetSerializeWrapper::decode(content.as_bytes()) {
            Ok(value) => Ok(Some(value)),
            Err(e) => Err(MQClientError::MQClientErr(
                -1,
                format!("Failed to deserialize local offset: {}", e),
            )),
        }
    }
}
fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> {
    let content = file_utils::file_to_string(&format!("{}{}", self.store_path, ".bak"))
        .map_or("".to_string(), |content| content);
    if content.is_empty() {
        Ok(None)
    } else {
        match OffsetSerializeWrapper::decode(content.as_bytes()) {
            Ok(value) => Ok(Some(value)),
            Err(_) => Err(MQClientError::MQClientErr(
                -1,
                format!("read local offset bak failed, content: {}", content),
            )),
        }
    }
Contributor

💡 Codebase verification

🛠️ Refactor suggestion

Error Handling and Security Concerns Remain in read_local_offset Methods.

The current implementation still defaults to an empty string when file reading fails in read_local_offset, and error messages in read_local_offset_bak include the full file content, which poses security risks and can lead to large log entries.

Please address the following:

  • read_local_offset (Lines 78-92): Handle file reading errors explicitly instead of defaulting to an empty string.
  • read_local_offset_bak (Lines 93-107): Avoid including the full file content in error messages to mitigate security risks.
🔗 Analysis chain

Improve error handling and file reading logic.

While the error handling has been improved, there are still some areas that could be enhanced:

  1. In read_local_offset, consider handling the file reading error explicitly instead of defaulting to an empty string.
  2. In read_local_offset_bak, the error message still includes the full file content, which could be a security risk and cause large log entries.

Consider refactoring these methods as follows:

fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
    match file_utils::file_to_string(&self.store_path) {
        Ok(content) if !content.is_empty() => {
            OffsetSerializeWrapper::decode(content.as_bytes())
                .map(Some)
                .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to deserialize local offset: {}", e)))
        }
        Ok(_) => self.read_local_offset_bak(),
        Err(e) => {
            error!("Failed to read offset file {}: {}", self.store_path, e);
            self.read_local_offset_bak()
        }
    }
}

fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> {
    let bak_path = format!("{}.bak", self.store_path);
    match file_utils::file_to_string(&bak_path) {
        Ok(content) if !content.is_empty() => {
            OffsetSerializeWrapper::decode(content.as_bytes())
                .map(Some)
                .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to deserialize backup local offset: {}", e)))
        }
        Ok(_) => Ok(None),
        Err(e) => {
            error!("Failed to read backup offset file {}: {}", bak_path, e);
            Ok(None)
        }
    }
}

This refactoring improves error handling, provides more informative error messages, and avoids potential security risks.
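One way to keep file contents out of logs altogether is to give the failure its own error type that records only the path, the content length and the decoder's message. The `DecodeError` type below is hypothetical and is not part of the PR or of `MQClientError`.

```rust
use std::fmt;

/// Describes a failed decode without embedding the file's content.
#[derive(Debug)]
pub struct DecodeError {
    path: String,
    content_len: usize,
    cause: String,
}

impl DecodeError {
    /// Only the length of `content` is kept; the text itself is dropped.
    pub fn new(path: &str, content: &str, cause: impl fmt::Display) -> Self {
        Self {
            path: path.to_string(),
            content_len: content.len(),
            cause: cause.to_string(),
        }
    }
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "failed to decode offset file {} ({} bytes): {}",
            self.path, self.content_len, self.cause
        )
    }
}

impl std::error::Error for DecodeError {}
```

The message stays a bounded size regardless of how large the offset file is, and nothing from the file body reaches the log.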

To ensure these changes don't introduce regressions, please run the following test script:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that the offset reading methods are called and handle errors correctly
rg --type rust -e "read_local_offset" -e "read_local_offset_bak" rocketmq-client/src/consumer/store/local_file_offset_store.rs

Length of output: 1320


Script:

#!/bin/bash
# Verify that the read_local_offset and read_local_offset_bak methods are present and correctly implemented
rg --type rust "fn read_local_offset|fn read_local_offset_bak" rocketmq-client/src/consumer/store/local_file_offset_store.rs

Length of output: 282


Script:

#!/bin/bash
# Extract the implementation of the `read_local_offset` method
ast-grep --lang rust --pattern 'fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { $$$ }' rocketmq-client/src/consumer/store/local_file_offset_store.rs

# Extract the implementation of the `read_local_offset_bak` method
ast-grep --lang rust --pattern 'fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> { $$$ }' rocketmq-client/src/consumer/store/local_file_offset_store.rs

Length of output: 3552

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Support client Broadcasting consume-local file store
2 participants