Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #790]🚀Support Query message(request code :12,33)🔥 #791

Merged
merged 1 commit into from
Jul 15, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Jul 15, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #790

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced QueryMessageProcessor to enhance message querying and viewing functionalities.
    • Added async methods for querying and selecting messages by offset.
    • Expanded headers for query and view message requests and responses.
  • Enhancements

    • Improved flexibility in QueryMessageProcessor with generic types.
    • Modified QueryOffsetResult to include mutable access to physical offsets.
  • Bug Fixes

    • Adjusted parameter types in query_offset to improve query efficiency.
  • Code Improvements

    • Refactored message store methods to handle additional message querying and selection scenarios.

Copy link
Contributor

coderabbitai bot commented Jul 15, 2024

Walkthrough

The update introduces QueryMessageProcessor to enhance RocketMQ's message querying and viewing capabilities. This includes asynchronous methods for querying messages by attributes and viewing messages by ID. Changes span across rocketmq-broker, rocketmq-remoting, and rocketmq-store modules, adding new headers, structs, and methods to support the new functionality.

Changes

Files/Paths Change Summary
rocketmq-broker/src/broker_runtime.rs Added QueryMessageProcessor initialization, cloned message_store, and updated BrokerRuntime struct.
rocketmq-broker/src/processor.rs Updated QueryMessageProcessor to be generic, handled new RequestCode variants.
rocketmq-broker/src/processor/query_message_processor.rs Made QueryMessageProcessor generic, added async methods for request processing.
rocketmq-remoting/src/protocol/header.rs Added new modules for query and view message request/response headers.
rocketmq-remoting/src/protocol/header/query_message_request_header.rs Added QueryMessageRequestHeader with serialization/deserialization methods and unit tests.
rocketmq-remoting/src/protocol/header/query_message_response_header.rs Added QueryMessageResponseHeader struct with traits for serialization/deserialization.
rocketmq-remoting/src/protocol/header/view_message_request_header.rs Added ViewMessageRequestHeader struct with public fields and serialization/deserialization traits.
rocketmq-remoting/src/protocol/header/view_message_response_header.rs Added ViewMessageResponseHeader struct with traits for serialization/deserialization.
rocketmq-remoting/src/rpc/topic_request_header.rs Modified to_map method to handle rpc_request_header differently.
rocketmq-store/src/base.rs Added query_message_result module for handling query results.
rocketmq-store/src/base/query_message_result.rs Introduced QueryMessageResult struct with methods for message data management.
rocketmq-store/src/base/select_result.rs Added methods get_buffer_slice_mut and get_bytes to SelectMappedBufferResult.
rocketmq-store/src/index/index_service.rs Modified query_offset parameters to use &str instead of String.
rocketmq-store/src/index/query_offset_result.rs Added get_phy_offsets_mut method to QueryOffsetResult struct.
rocketmq-store/src/log_file.rs Introduced async methods for querying/selecting messages, adjusted signatures to use new types.
rocketmq-store/src/message_store/default_message_store.rs Added imports and new async methods for querying/selecting messages by various criteria.

Assessment against linked issues

Objective ($issue numbers) Addressed Explanation
Support Query message(request code :12,33) (#790)

In the realms of RocketMQ's might,
Query messages come to light.
With processors and headers so bright,
They enhance our broker's flight.
Messages now query, view, and delight,
RocketMQ soars to a new height. 🚀


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Owner Author

mxsm commented Jul 15, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link

codecov bot commented Jul 15, 2024

Codecov Report

Attention: Patch coverage is 30.46595% with 194 lines in your changes missing coverage. Please review.

Project coverage is 27.73%. Comparing base (4f59378) to head (66cd887).

Files Patch % Lines
...mq-broker/src/processor/query_message_processor.rs 0.00% 71 Missing ⚠️
...q-store/src/message_store/default_message_store.rs 0.00% 64 Missing ⚠️
rocketmq-store/src/base/query_message_result.rs 0.00% 19 Missing ⚠️
...rc/protocol/header/query_message_request_header.rs 90.32% 9 Missing ⚠️
rocketmq-store/src/base/select_result.rs 0.00% 9 Missing ⚠️
rocketmq-store/src/index/index_service.rs 0.00% 5 Missing ⚠️
rocketmq-broker/src/broker_runtime.rs 0.00% 3 Missing ⚠️
rocketmq-broker/src/processor.rs 0.00% 3 Missing ⚠️
...c/protocol/header/query_message_response_header.rs 0.00% 3 Missing ⚠️
...src/protocol/header/view_message_request_header.rs 0.00% 3 Missing ⚠️
... and 3 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #791      +/-   ##
==========================================
+ Coverage   27.65%   27.73%   +0.07%     
==========================================
  Files         289      294       +5     
  Lines       23128    23400     +272     
==========================================
+ Hits         6397     6490      +93     
- Misses      16731    16910     +179     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Outside diff range, codebase verification and nitpick comments (7)
rocketmq-remoting/src/protocol/header.rs (2)

33-34: Add documentation for the new modules.

Consider adding documentation comments for the new modules query_message_request_header and query_message_response_header to improve code readability and maintainability.


38-39: Add documentation for the new modules.

Consider adding documentation comments for the new modules view_message_request_header and view_message_response_header to improve code readability and maintainability.

rocketmq-store/src/base.rs (1)

31-31: Add documentation for the new module.

Consider adding a documentation comment for the new module query_message_result to improve code readability and maintainability.

rocketmq-store/src/base/query_message_result.rs (3)

22-28: Add documentation for the struct and its fields.

Consider adding documentation comments for the QueryMessageResult struct and its fields to improve code readability and maintainability.


30-43: Add documentation for the method.

Consider adding a documentation comment for the get_message_data method to improve code readability and maintainability.


45-48: Add documentation for the method.

Consider adding a documentation comment for the add_message method to improve code readability and maintainability.

rocketmq-store/src/index/index_service.rs (1)

Line range hint 134-167:
Consider adding comments for readability.

Adding comments to explain the logic within the query_offset function would improve readability and maintainability.

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 4f59378 and 66cd887.

Files selected for processing (16)
  • rocketmq-broker/src/broker_runtime.rs (4 hunks)
  • rocketmq-broker/src/processor.rs (2 hunks)
  • rocketmq-broker/src/processor/query_message_processor.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/query_message_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/query_message_response_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/view_message_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/view_message_response_header.rs (1 hunks)
  • rocketmq-remoting/src/rpc/topic_request_header.rs (1 hunks)
  • rocketmq-store/src/base.rs (1 hunks)
  • rocketmq-store/src/base/query_message_result.rs (1 hunks)
  • rocketmq-store/src/base/select_result.rs (2 hunks)
  • rocketmq-store/src/index/index_service.rs (2 hunks)
  • rocketmq-store/src/index/query_offset_result.rs (1 hunks)
  • rocketmq-store/src/log_file.rs (2 hunks)
  • rocketmq-store/src/message_store/default_message_store.rs (4 hunks)
Additional comments not posted (31)
rocketmq-remoting/src/protocol/header/view_message_response_header.rs (2)

1-16: Licensing Information Approved.

The licensing information is standard and complies with ASF requirements.


17-22: Struct Definition Approved.

The ViewMessageResponseHeader struct is correctly defined and uses appropriate traits.

rocketmq-remoting/src/protocol/header/view_message_request_header.rs (2)

1-16: Licensing Information Approved.

The licensing information is standard and complies with ASF requirements.


17-25: Struct Definition Approved.

The ViewMessageRequestHeader struct is correctly defined and uses appropriate traits.

rocketmq-remoting/src/protocol/header/query_message_response_header.rs (2)

1-16: Licensing Information Approved.

The licensing information is standard and complies with ASF requirements.


17-26: Struct Definition Approved.

The QueryMessageResponseHeader struct is correctly defined and uses appropriate traits.

rocketmq-store/src/index/query_offset_result.rs (2)

Line range hint 1-10: Licensing Information Approved.

The licensing information is standard and complies with ASF requirements.


Line range hint 12-43: Struct Definition and Method Approved.

The QueryOffsetResult struct is correctly defined and the new method get_phy_offsets_mut is appropriately implemented.

rocketmq-store/src/base/select_result.rs (2)

46-50: Approve the get_buffer_slice_mut method.

The method correctly retrieves a mutable slice of the buffer from the mapped file. Ensure that the mapped file is not null and the buffer access is within bounds.


52-57: Approve the get_bytes method.

The method correctly converts the buffer to a Bytes object if the size is greater than 0 and the mapped file is not null. Ensure that the buffer conversion is handled properly.

rocketmq-store/src/log_file.rs (5)

124-131: Approve the query_message method.

The method correctly handles querying messages based on the provided topic, key, max number, and timestamp range. Ensure that the query parameters are validated and the query logic is correct.


133-136: Approve the select_one_message_by_offset method.

The method correctly handles selecting a single message by its offset. Ensure that the offset is validated and the selection logic is correct.


138-142: Approve the select_one_message_by_offset_with_size method.

The method correctly handles selecting a single message by its offset and size. Ensure that the offset and size are validated and the selection logic is correct.


144-144: Approve the look_message_by_offset method.

The method correctly handles looking up a message by its offset. Ensure that the offset is validated and the lookup logic is correct.


146-150: Approve the look_message_by_offset_with_size method.

The method correctly handles looking up a message by its offset and size. Ensure that the offset and size are validated and the lookup logic is correct.

rocketmq-broker/src/processor/query_message_processor.rs (5)

32-35: Approve the QueryMessageProcessor struct.

The struct is correctly defined and the fields are properly initialized. Ensure that the generic type MS is correctly constrained.


38-43: Approve the new method.

The method correctly initializes a new instance of QueryMessageProcessor with the provided message_store_config and message_store.


50-62: Approve the process_request method.

The method correctly handles processing requests based on the request code. Ensure that the request code is validated and the request processing logic is correct.


64-110: Approve the query_message method.

The method correctly handles querying messages based on the request parameters. Ensure that the query parameters are validated and the query logic is correct.


113-138: Approve the view_message_by_id method.

The method correctly handles viewing a message by its ID based on the request parameters. Ensure that the request parameters are validated and the view logic is correct.

rocketmq-remoting/src/protocol/header/query_message_request_header.rs (6)

24-38: Approve the QueryMessageRequestHeader struct.

The struct is correctly defined and the fields are properly initialized. Ensure that the nested TopicRequestHeader is correctly handled.


55-75: Approve the to_map method.

The method correctly handles converting the struct to a map. Ensure that all fields are included in the map and the conversion logic is correct.


78-91: Approve the from method.

The method correctly handles creating an instance of the struct from a map. Ensure that all required fields are present in the map and the creation logic is correct.


100-115: Approve the creating_from_map_with_all_fields_populates_struct_correctly test.

The test correctly verifies creating an instance of the struct from a map with all fields populated. Ensure that all field values are correctly checked.


118-131: Approve the creating_from_map_missing_optional_fields_still_succeeds test.

The test correctly verifies creating an instance of the struct from a map with missing optional fields. Ensure that all required fields are correctly checked.


133-143: Approve the creating_from_map_with_invalid_number_fields_returns_none test.

The test correctly verifies creating an instance of the struct from a map with invalid number fields. Ensure that all invalid field values are correctly checked.

rocketmq-store/src/index/index_service.rs (1)

134-137: Good use of &str for parameters.

Changing topic and key to &str improves performance by avoiding unnecessary allocations.

rocketmq-broker/src/processor.rs (1)

165-169: Good addition for handling new request codes.

The handling of RequestCode::QueryMessage and RequestCode::ViewMessageById is correctly added.

rocketmq-broker/src/broker_runtime.rs (1)

434-435: Correct initialization of QueryMessageProcessor.

The QueryMessageProcessor is correctly initialized with the necessary configurations.

rocketmq-store/src/message_store/default_message_store.rs (2)

1158-1164: LGTM!

The method is correctly implemented.


1175-1190: LGTM!

The method is correctly implemented.

Comment on lines +54 to +57
if let Some(value) = self.rpc_request_header.as_ref() {
if let Some(rpc_map) = value.to_map() {
map.extend(rpc_map);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling in to_map method.

Consider adding error handling for potential issues when converting rpc_request_header to a map. This can help prevent unexpected behavior if the conversion fails.

if let Some(value) = self.rpc_request_header.as_ref() {
    if let Some(rpc_map) = value.to_map() {
        map.extend(rpc_map);
    } else {
        // Handle the error case where rpc_map is None
        // e.g., log an error or return an error result
    }
}

Comment on lines +1095 to +1144
async fn query_message(
&self,
topic: &str,
key: &str,
max_num: i32,
begin_timestamp: i64,
end_timestamp: i64,
) -> Option<QueryMessageResult> {
let mut query_message_result = QueryMessageResult::default();
let mut last_query_msg_time = end_timestamp;
for i in 1..3 {
let mut query_offset_result = self.index_service.query_offset(
topic,
key,
max_num,
begin_timestamp,
end_timestamp,
);
if query_offset_result.get_phy_offsets().is_empty() {
break;
}

query_offset_result.get_phy_offsets_mut().sort();

query_message_result.index_last_update_timestamp =
query_offset_result.get_index_last_update_timestamp();
query_message_result.index_last_update_phyoffset =
query_offset_result.get_index_last_update_phyoffset();
let phy_offsets = query_offset_result.get_phy_offsets();
for m in 0..phy_offsets.len() {
let offset = *phy_offsets.get(m).unwrap();
let msg = self.look_message_by_offset(offset);
if m == 0 {
last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
}
let result = self.commit_log.get_data_with_option(offset, false);
if let Some(sbr) = result {
query_message_result.add_message(sbr);
}
}
if query_message_result.buffer_total_size > 0 {
break;
}
if last_query_msg_time < begin_timestamp {
break;
}
}

Some(query_message_result)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling and justify loop limit.

  1. Error handling for look_message_by_offset and get_data_with_option should be improved to handle cases where these methods return None.
  2. The loop limit of 3 iterations seems arbitrary. Consider making this limit configurable or providing a justification for this choice.
for i in 1..3 {
    let mut query_offset_result = self.index_service.query_offset(
        topic,
        key,
        max_num,
        begin_timestamp,
        end_timestamp,
    );
    if query_offset_result.get_phy_offsets().is_empty() {
        break;
    }

    query_offset_result.get_phy_offsets_mut().sort();

    query_message_result.index_last_update_timestamp =
        query_offset_result.get_index_last_update_timestamp();
    query_message_result.index_last_update_phyoffset =
        query_offset_result.get_index_last_update_phyoffset();
    let phy_offsets = query_offset_result.get_phy_offsets();
    for m in 0..phy_offsets.len() {
        let offset = *phy_offsets.get(m).unwrap();
        let msg = self.look_message_by_offset(offset);
        if msg.is_none() {
            continue; // Handle None case
        }
        if m == 0 {
            last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
        }
        let result = self.commit_log.get_data_with_option(offset, false);
        if let Some(sbr) = result {
            query_message_result.add_message(sbr);
        }
    }
    if query_message_result.buffer_total_size > 0 {
        break;
    }
    if last_query_msg_time < begin_timestamp {
        break;
    }
}

Improve error handling and justify loop limit.

  1. Error handling for look_message_by_offset and get_data_with_option should be improved to handle cases where these methods return None.
  2. The loop limit of 3 iterations seems arbitrary. Consider making this limit configurable or providing a justification for this choice.
for i in 1..3 {
    let mut query_offset_result = self.index_service.query_offset(
        topic,
        key,
        max_num,
        begin_timestamp,
        end_timestamp,
    );
    if query_offset_result.get_phy_offsets().is_empty() {
        break;
    }

    query_offset_result.get_phy_offsets_mut().sort();

    query_message_result.index_last_update_timestamp =
        query_offset_result.get_index_last_update_timestamp();
    query_message_result.index_last_update_phyoffset =
        query_offset_result.get_index_last_update_phyoffset();
    let phy_offsets = query_offset_result.get_phy_offsets();
    for m in 0..phy_offsets.len() {
        let offset = *phy_offsets.get(m).unwrap();
        let msg = self.look_message_by_offset(offset);
        if msg.is_none() {
            continue; // Handle None case
        }
        if m == 0 {
            last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
        }
        let result = self.commit_log.get_data_with_option(offset, false);
        if let Some(sbr) = result {
            query_message_result.add_message(sbr);
        }
    }
    if query_message_result.buffer_total_size > 0 {
        break;
    }
    if last_query_msg_time < begin_timestamp {
        break;
    }
}

Comment on lines +1146 to +1157
async fn select_one_message_by_offset(
&self,
commit_log_offset: i64,
) -> Option<SelectMappedBufferResult> {
let sbr = self.commit_log.get_message(commit_log_offset, 4);
if let Some(sbr) = sbr {
let size = sbr.get_buffer().get_i32();
self.commit_log.get_message(commit_log_offset, size)
} else {
None
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize message retrieval.

Fetching the message size separately and then fetching the complete message can be optimized by combining these operations.

async fn select_one_message_by_offset(
    &self,
    commit_log_offset: i64,
) -> Option<SelectMappedBufferResult> {
    if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
        let size = sbr.get_buffer().get_i32();
        self.commit_log.get_message(commit_log_offset, size)
    } else {
        None
    }
}

Comment on lines +1166 to +1173
fn look_message_by_offset(&self, commit_log_offset: i64) -> Option<MessageExt> {
if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
let size = sbr.get_buffer().get_i32();
self.look_message_by_offset_with_size(commit_log_offset, size)
} else {
None
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize message retrieval.

Fetching the message size separately and then fetching the complete message can be optimized by combining these operations.

fn look_message_by_offset(&self, commit_log_offset: i64) -> Option<MessageExt> {
    if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
        let size = sbr.get_buffer().get_i32();
        self.look_message_by_offset_with_size(commit_log_offset, size)
    } else {
        None
    }
}

@mxsm mxsm merged commit b6ecd9d into main Jul 15, 2024
19 checks passed
@mxsm mxsm deleted the feature-790 branch July 15, 2024 09:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Support Query message(request code :12,33)
2 participants