-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1012]🚀Support client Broadcasting consume-local file store⚡️ #1013
Conversation
WalkthroughThe changes enhance the Changes
Assessment against linked issues
Possibly related PRs
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (4)
rocketmq-client/src/consumer/store.rs (1)
20-20
: LGTM! Consider adding a brief comment for the new module.The addition of the
offset_serialize_wrapper
module aligns with the PR objective of supporting client broadcasting for consuming local file store. This change looks good and doesn't impact existing functionality.Consider adding a brief comment above the new module declaration to explain its purpose. This would improve code documentation and make it easier for other developers to understand the module's role. For example:
// Module for handling serialization of offsets mod offset_serialize_wrapper;rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs (1)
24-28
: LGTM: Well-structuredOffsetSerializeWrapper
, with a minor consideration.The
OffsetSerializeWrapper
struct is well-designed for serializing and deserializing offset data. The use ofAtomicI64
for offset values suggests thread-safe operations, which is a good practice. ThecamelCase
renaming is appropriate for maintaining consistency with other systems or languages that use camel case naming conventions.However, consider if the
Default
derive is necessary or appropriate for this struct. An emptyHashMap
as the default might not be the most meaningful default state for this wrapper. If a specific initial state is required, it might be better to implementDefault
manually or provide a custom initialization method.rocketmq-client/src/consumer/store/controllable_offset.rs (1)
35-40
: LGTM! Consider adding documentation.The new
new_atomic
method is a valuable addition, providing more flexibility in initializingControllableOffset
. The implementation is consistent with the existingnew
method and maintains the expected behavior.Consider adding a doc comment to explain the purpose and use case of this new method. For example:
/// Creates a new `ControllableOffset` with a provided `AtomicI64`. /// /// This method allows for more flexible initialization when an `AtomicI64` is already available. /// /// # Arguments /// /// * `value` - An `AtomicI64` to be used as the initial offset value. pub fn new_atomic(value: AtomicI64) -> Self { // ... (existing implementation) }rocketmq-client/src/consumer/store/local_file_offset_store.rs (1)
256-258
: Include a comment explaining the empty implementation ofupdate_consume_offset_to_broker
The
update_consume_offset_to_broker
method is empty in this implementation. Adding a comment to indicate that it's intentionally left empty can improve code readability and maintainability.You can add a comment like:
// LocalFileOffsetStore does not update consume offset to broker.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- rocketmq-client/src/consumer/store.rs (1 hunks)
- rocketmq-client/src/consumer/store/controllable_offset.rs (1 hunks)
- rocketmq-client/src/consumer/store/local_file_offset_store.rs (2 hunks)
- rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs (1 hunks)
🔇 Additional comments (16)
rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs (2)
1-22
: LGTM: License and imports are appropriate.The file includes the correct Apache 2.0 license, which is suitable for the Apache RocketMQ project. The imports are relevant to the functionality being implemented, including necessary standard library components and project-specific modules.
1-28
: Consider future enhancements and provide usage context.The
OffsetSerializeWrapper
struct is a good foundation for managing offsets in a serializable format, which aligns with the PR objectives. However, to ensure its effectiveness and ease of use, consider the following suggestions:
- Implement methods for adding, updating, and retrieving offsets from the
offset_table
.- Add documentation comments to explain the purpose and usage of this struct.
- Consider implementing the
Clone
trait if deep copying of the wrapper might be needed.Additionally, it would be helpful to provide more context on how this struct will be used within the broader system. This information could guide further optimizations or additional functionality that might be necessary.
To better understand the usage and potential impact of this new struct, let's check for any references to
OffsetSerializeWrapper
in the codebase:✅ Verification successful
Review Comment Clarification
Upon reviewing the git diff, the
offset_serialize_wrapper.rs
file is newly added with lines 1-28 entirely introducing new code. There are no removed or replaced lines in this section. Therefore, the initial observation about the absence of removed or replaced code is accurate.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for usage of OffsetSerializeWrapper rg --type rust "OffsetSerializeWrapper"Length of output: 1026
Script:
#!/bin/bash # Get git diff for lines 1-28 in offset_serialize_wrapper.rs git diff HEAD~1 HEAD -- rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs | sed -n '1,28p'Length of output: 1391
rocketmq-client/src/consumer/store/controllable_offset.rs (1)
35-40
: Verify the usage of the newnew_atomic
method in the codebase.The addition of the
new_atomic
method enhances the flexibility ofControllableOffset
initialization, which aligns well with the PR objectives. It maintains backward compatibility while providing a new option for scenarios where anAtomicI64
is already available.To ensure proper integration, please run the following script to check for any usage of the new method:
This will help identify if the new method is being utilized effectively and if there are any other places in the codebase where it could potentially be beneficial.
✅ Verification successful
Usage of the new
new_atomic
method is correctly confined tocontrollable_offset.rs
and does not interfere with existing codebase functionalities.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for usage of the new_atomic method in the codebase # Test: Search for new_atomic method usage rg --type rust 'ControllableOffset::new_atomic' # Test: Search for potential places where new_atomic could be used (AtomicI64 creation) rg --type rust 'AtomicI64::new'Length of output: 6164
rocketmq-client/src/consumer/store/local_file_offset_store.rs (13)
19-22
: LGTM!The addition of necessary imports for
PathBuf
,AtomicI64
,Ordering
, andArc
enhances the functionality and thread safety of the code.
24-35
: LGTM!The imports of
Lazy
,ControllableOffset
, andOffsetSerializeWrapper
are appropriate for implementing lazy initialization and handling offset serialization.
38-40
: LGTM!Importing
MQClientError
andResult
is appropriate for error handling within the module.
57-62
: Definition ofLocalFileOffsetStore
struct looks goodThe struct fields are appropriately defined to support local offset storage functionality.
65-76
: Constructornew
properly initializesLocalFileOffsetStore
The
new
method correctly initializes thestore_path
andoffset_table
.
94-106
: LGTM!The
read_local_offset_bak
method appropriately attempts to read the backup offset file and handles errors.
112-125
:load
method correctly loads offsets from local storageThe
load
function reads the local offset and populates theoffset_table
as expected.
129-137
:update_offset
method properly updates offsetsThe method correctly handles both
increase_only
scenarios and updates the offsets.
141-145
:update_and_freeze_offset
method correctly updates and freezes offsetThe method updates the offset and freezes it as intended.
149-172
:read_offset
method correctly retrieves offsetsThe function handles different
ReadOffsetType
variants and retrieves offsets from memory or store appropriately.
206-227
:persist
method writes the offset of a specific message queueThe method correctly updates and writes the offset data for the specified
MessageQueue
.
231-238
:remove_offset
method correctly removes offset from the tableThe method removes the offset for the given
MessageQueue
and logs the action.
242-247
:clone_offset_table
method correctly clones offsetsThe method filters offset entries by topic and returns a cloned
HashMap
.
let mut offset_serialize_wrapper = match self.read_local_offset() { | ||
Ok(value) => value.unwrap_or_default(), | ||
Err(e) => { | ||
error!("read local offset failed: {}", e); | ||
return; | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling read errors in persist_all
without early return
If read_local_offset
fails, persist_all
logs an error and returns early, which could prevent offsets from being persisted. Consider proceeding with an empty OffsetSerializeWrapper
to ensure that the offsets are persisted, even if the existing local offsets cannot be read.
Apply this diff to handle read errors gracefully:
let mut offset_serialize_wrapper = match self.read_local_offset() {
Ok(value) => value.unwrap_or_default(),
Err(e) => {
error!("read local offset failed: {}", e);
- return;
+ OffsetSerializeWrapper::default()
}
};
Similarly, consider updating the persist
method to handle errors without early return.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut offset_serialize_wrapper = match self.read_local_offset() { | |
Ok(value) => value.unwrap_or_default(), | |
Err(e) => { | |
error!("read local offset failed: {}", e); | |
return; | |
} | |
}; | |
let mut offset_serialize_wrapper = match self.read_local_offset() { | |
Ok(value) => value.unwrap_or_default(), | |
Err(e) => { | |
error!("read local offset failed: {}", e); | |
OffsetSerializeWrapper::default() | |
} | |
}; |
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { | ||
let content = | ||
file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content); | ||
if content.is_empty() { | ||
self.read_local_offset_bak() | ||
} else { | ||
match OffsetSerializeWrapper::decode(content.as_bytes()) { | ||
Ok(value) => Ok(Some(value)), | ||
Err(_) => Err(MQClientError::MQClientErr( | ||
-1, | ||
format!("read local offset failed, content: {}", content), | ||
)), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle file reading errors explicitly instead of defaulting to empty content
The read_local_offset
function maps errors from reading the offset file to an empty string, which may inadvertently mask I/O errors such as permission denied or disk errors. This could lead to unexpected behavior, as the function then attempts to read the backup file or proceeds with empty content, potentially overwriting valid data. It's better to handle errors explicitly and propagate them appropriately.
Apply this diff to handle errors explicitly:
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
- let content =
- file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content);
- if content.is_empty() {
- self.read_local_offset_bak()
- } else {
- match OffsetSerializeWrapper::decode(content.as_bytes()) {
- Ok(value) => Ok(Some(value)),
- Err(_) => Err(MQClientError::MQClientErr(
- -1,
- format!("read local offset failed, content: {}", content),
- )),
- }
- }
+ match file_utils::file_to_string(&self.store_path) {
+ Ok(content) if !content.is_empty() => {
+ match OffsetSerializeWrapper::decode(content.as_bytes()) {
+ Ok(value) => Ok(Some(value)),
+ Err(e) => Err(MQClientError::MQClientErr(
+ -1,
+ format!("Failed to deserialize local offset: {}", e),
+ )),
+ }
+ }
+ Ok(_) => self.read_local_offset_bak(),
+ Err(e) => {
+ error!("Failed to read offset file {}: {}", self.store_path, e);
+ self.read_local_offset_bak()
+ }
+ }
}
Similarly, update read_local_offset_bak
to handle errors appropriately.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { | |
let content = | |
file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content); | |
if content.is_empty() { | |
self.read_local_offset_bak() | |
} else { | |
match OffsetSerializeWrapper::decode(content.as_bytes()) { | |
Ok(value) => Ok(Some(value)), | |
Err(_) => Err(MQClientError::MQClientErr( | |
-1, | |
format!("read local offset failed, content: {}", content), | |
)), | |
} | |
} | |
} | |
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { | |
match file_utils::file_to_string(&self.store_path) { | |
Ok(content) if !content.is_empty() => { | |
match OffsetSerializeWrapper::decode(content.as_bytes()) { | |
Ok(value) => Ok(Some(value)), | |
Err(e) => Err(MQClientError::MQClientErr( | |
-1, | |
format!("Failed to deserialize local offset: {}", e), | |
)), | |
} | |
} | |
Ok(_) => self.read_local_offset_bak(), | |
Err(e) => { | |
error!("Failed to read offset file {}: {}", self.store_path, e); | |
self.read_local_offset_bak() | |
} | |
} | |
} |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1013 +/- ##
==========================================
- Coverage 19.92% 19.88% -0.05%
==========================================
Files 418 419 +1
Lines 34502 34567 +65
==========================================
- Hits 6875 6872 -3
- Misses 27627 27695 +68 ☔ View full report in Codecov by Sentry. |
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
rocketmq-client/src/consumer/store/local_file_offset_store.rs (2)
19-55
: LGTM! Consider usingdirs
crate for cross-platform home directory detection.The new imports and the
LOCAL_OFFSET_STORE_DIR
static variable are well-implemented, providing good configurability and cross-platform support.Consider using the
dirs
crate for a more robust cross-platform solution to detect the home directory. This can simplify the code and potentially handle edge cases better. Here's an example of how you could refactor this:use dirs; static LOCAL_OFFSET_STORE_DIR: Lazy<PathBuf> = Lazy::new(|| { let default_dir = dirs::home_dir() .unwrap_or_else(|| PathBuf::from("/tmp")) .join(".rocketmq_offsets"); std::env::var("rocketmq.client.localOffsetStoreDir") .map(PathBuf::from) .unwrap_or(default_dir) });This approach reduces the need for platform-specific code and handles cases where the home directory might not be available.
111-125
: Consider improving error handling in theload
method.The
load
method implementation is generally good, but the error handling could be enhanced to provide more context.Consider updating the error handling as follows:
async fn load(&self) -> crate::Result<()> { let offset_serialize_wrapper = self.read_local_offset() .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to load local offset: {}", e)))?; if let Some(offset_serialize_wrapper) = offset_serialize_wrapper { let mut offset_table_inner = self.offset_table.lock().await; for (mq, offset) in offset_serialize_wrapper.offset_table { let offset = offset.load(Ordering::Relaxed); info!( "load consumer's offset, {} {} {}", self.group_name, mq, offset ); offset_table_inner.insert(mq, ControllableOffset::new(offset)); } } Ok(()) }This change provides more context in case of an error during the loading process.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- rocketmq-client/src/consumer/store/local_file_offset_store.rs (2 hunks)
🔇 Additional comments (5)
rocketmq-client/src/consumer/store/local_file_offset_store.rs (5)
57-62
: LGTM! Well-structured for thread-safe offset management.The
LocalFileOffsetStore
struct is well-designed with appropriate fields for managing offsets. The use ofArc<Mutex<...>>
foroffset_table
ensures thread-safe access in concurrent scenarios.
65-76
: LGTM! Proper initialization of LocalFileOffsetStore.The
new
method correctly initializes theLocalFileOffsetStore
struct. It appropriately uses theLOCAL_OFFSET_STORE_DIR
for setting thestore_path
and initializes an emptyoffset_table
.
128-172
: LGTM! Well-implemented offset management methods.The
update_offset
,update_and_freeze_offset
, andread_offset
methods are well-implemented:
- They correctly use mutex locks to ensure thread-safety.
- The logic for different read types in
read_offset
is properly handled.- The use of
entry
API inupdate_offset
is efficient.
175-227
: 🛠️ Refactor suggestionEnhance error handling and reduce code duplication in persistence methods.
The
persist_all
andpersist
methods are functionally correct, but there are opportunities for improvement:
- Error handling can be more robust.
- There's some code duplication between these methods.
Consider the following improvements:
- Create a helper method for reading and updating the offset wrapper:
fn read_and_update_offset_wrapper(&self, mq: Option<&MessageQueue>, offset: Option<i64>) -> Result<OffsetSerializeWrapper> { let mut wrapper = self.read_local_offset() .map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to read local offset: {}", e)))? .unwrap_or_default(); if let (Some(mq), Some(offset)) = (mq, offset) { wrapper.offset_table.insert(mq.clone(), AtomicI64::new(offset)); } Ok(wrapper) }
- Use this helper method in both
persist_all
andpersist
:async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>) { if mqs.is_empty() { return; } let offset_table = self.offset_table.lock().await; let mut wrapper = match self.read_and_update_offset_wrapper(None, None) { Ok(w) => w, Err(e) => { error!("Failed to read and update offset wrapper: {}", e); return; } }; for (mq, offset) in offset_table.iter() { if mqs.contains(mq) { wrapper.offset_table.insert(mq.clone(), AtomicI64::new(offset.get_offset())); } } self.write_offset_to_file(wrapper); } async fn persist(&mut self, mq: &MessageQueue) { let offset_table = self.offset_table.lock().await; if let Some(offset) = offset_table.get(mq) { match self.read_and_update_offset_wrapper(Some(mq), Some(offset.get_offset())) { Ok(wrapper) => self.write_offset_to_file(wrapper), Err(e) => error!("Failed to read and update offset wrapper: {}", e), } } } fn write_offset_to_file(&self, wrapper: OffsetSerializeWrapper) { let content = wrapper.to_json_pretty(); if !content.is_empty() { if let Err(e) = file_utils::string_to_file(&content, &self.store_path) { error!("Failed to persist consumer offset: {}, {}", self.store_path, e); } } }These changes improve error handling and reduce code duplication, making the methods more maintainable.
To ensure these changes don't introduce regressions, please run the following test script:
#!/bin/bash # Verify that the persistence methods are called correctly rg --type rust -e "persist_all" -e "persist.*MessageQueue" rocketmq-client/src/consumer/store/local_file_offset_store.rs
Line range hint
230-257
: LGTM forremove_offset
andclone_offset_table
. Implementupdate_consume_offset_to_broker
.The
remove_offset
andclone_offset_table
methods are well-implemented and thread-safe. However,update_consume_offset_to_broker
is currently a no-op and needs implementation.The
update_consume_offset_to_broker
method should be implemented to update the consumer offset on the broker. Would you like me to provide a skeleton implementation or open a GitHub issue to track this task?To ensure the
update_consume_offset_to_broker
method is called correctly in the codebase, run the following script:#!/bin/bash # Search for calls to update_consume_offset_to_broker rg --type rust "update_consume_offset_to_broker" rocketmq-client/srcThis will help identify where the method is expected to be used and guide the implementation.
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { | ||
let content = | ||
file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content); | ||
if content.is_empty() { | ||
self.read_local_offset_bak() | ||
} else { | ||
match OffsetSerializeWrapper::decode(content.as_bytes()) { | ||
Ok(value) => Ok(Some(value)), | ||
Err(e) => Err(MQClientError::MQClientErr( | ||
-1, | ||
format!("Failed to deserialize local offset: {}", e), | ||
)), | ||
} | ||
} | ||
} | ||
fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> { | ||
let content = file_utils::file_to_string(&format!("{}{}", self.store_path, ".bak")) | ||
.map_or("".to_string(), |content| content); | ||
if content.is_empty() { | ||
Ok(None) | ||
} else { | ||
match OffsetSerializeWrapper::decode(content.as_bytes()) { | ||
Ok(value) => Ok(Some(value)), | ||
Err(_) => Err(MQClientError::MQClientErr( | ||
-1, | ||
format!("read local offset bak failed, content: {}", content), | ||
)), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
🛠️ Refactor suggestion
Error Handling and Security Concerns Remain in read_local_offset
Methods.
The current implementation still defaults to an empty string when file reading fails in read_local_offset
, and error messages in read_local_offset_bak
include the full file content, which poses security risks and can lead to large log entries.
Please address the following:
read_local_offset
(Lines 78-92): Handle file reading errors explicitly instead of defaulting to an empty string.read_local_offset_bak
(Lines 93-107): Avoid including the full file content in error messages to mitigate security risks.
🔗 Analysis chain
Improve error handling and file reading logic.
While the error handling has been improved, there are still some areas that could be enhanced:
- In
read_local_offset
, consider handling the file reading error explicitly instead of defaulting to an empty string. - In
read_local_offset_bak
, the error message still includes the full file content, which could be a security risk and cause large log entries.
Consider refactoring these methods as follows:
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
match file_utils::file_to_string(&self.store_path) {
Ok(content) if !content.is_empty() => {
OffsetSerializeWrapper::decode(content.as_bytes())
.map(Some)
.map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to deserialize local offset: {}", e)))
}
Ok(_) => self.read_local_offset_bak(),
Err(e) => {
error!("Failed to read offset file {}: {}", self.store_path, e);
self.read_local_offset_bak()
}
}
}
fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> {
let bak_path = format!("{}.bak", self.store_path);
match file_utils::file_to_string(&bak_path) {
Ok(content) if !content.is_empty() => {
OffsetSerializeWrapper::decode(content.as_bytes())
.map(Some)
.map_err(|e| MQClientError::MQClientErr(-1, format!("Failed to deserialize backup local offset: {}", e)))
}
Ok(_) => Ok(None),
Err(e) => {
error!("Failed to read backup offset file {}: {}", bak_path, e);
Ok(None)
}
}
}
This refactoring improves error handling, provides more informative error messages, and avoids potential security risks.
To ensure these changes don't introduce regressions, please run the following test script:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify that the offset reading methods are called and handle errors correctly
rg --type rust -e "read_local_offset" -e "read_local_offset_bak" rocketmq-client/src/consumer/store/local_file_offset_store.rs
Length of output: 1320
Script:
#!/bin/bash
# Verify that the read_local_offset and read_local_offset_bak methods are present and correctly implemented
rg --type rust "fn read_local_offset|fn read_local_offset_bak" rocketmq-client/src/consumer/store/local_file_offset_store.rs
Length of output: 282
Script:
#!/bin/bash
# Extract the implementation of the `read_local_offset` method
ast-grep --lang rust --pattern 'fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> { $$$ }' rocketmq-client/src/consumer/store/local_file_offset_store.rs
# Extract the implementation of the `read_local_offset_bak` method
ast-grep --lang rust --pattern 'fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> { $$$ }' rocketmq-client/src/consumer/store/local_file_offset_store.rs
Length of output: 3552
Which Issue(s) This PR Fixes(Closes)
Fixes #1012
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
LocalFileOffsetStore
with comprehensive methods for managing offsets, including reading, updating, and persisting.Bug Fixes
Documentation