-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #790]🚀Support Query message(request code :12,33)🔥 #791
Conversation
WalkthroughThe update introduces Changes
Assessment against linked issues
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Outside diff range, codebase verification and nitpick comments (7)
rocketmq-remoting/src/protocol/header.rs (2)
33-34
: Add documentation for the new modules.Consider adding documentation comments for the new modules
query_message_request_header
andquery_message_response_header
to improve code readability and maintainability.
38-39
: Add documentation for the new modules.Consider adding documentation comments for the new modules
view_message_request_header
andview_message_response_header
to improve code readability and maintainability.rocketmq-store/src/base.rs (1)
31-31
: Add documentation for the new module.Consider adding a documentation comment for the new module
query_message_result
to improve code readability and maintainability.rocketmq-store/src/base/query_message_result.rs (3)
22-28
: Add documentation for the struct and its fields.Consider adding documentation comments for the
QueryMessageResult
struct and its fields to improve code readability and maintainability.
30-43
: Add documentation for the method.Consider adding a documentation comment for the
get_message_data
method to improve code readability and maintainability.
45-48
: Add documentation for the method.Consider adding a documentation comment for the
add_message
method to improve code readability and maintainability.rocketmq-store/src/index/index_service.rs (1)
Line range hint
134-167
:
Consider adding comments for readability.Adding comments to explain the logic within the
query_offset
function would improve readability and maintainability.
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (16)
- rocketmq-broker/src/broker_runtime.rs (4 hunks)
- rocketmq-broker/src/processor.rs (2 hunks)
- rocketmq-broker/src/processor/query_message_processor.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/query_message_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/query_message_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/view_message_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/view_message_response_header.rs (1 hunks)
- rocketmq-remoting/src/rpc/topic_request_header.rs (1 hunks)
- rocketmq-store/src/base.rs (1 hunks)
- rocketmq-store/src/base/query_message_result.rs (1 hunks)
- rocketmq-store/src/base/select_result.rs (2 hunks)
- rocketmq-store/src/index/index_service.rs (2 hunks)
- rocketmq-store/src/index/query_offset_result.rs (1 hunks)
- rocketmq-store/src/log_file.rs (2 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (4 hunks)
Additional comments not posted (31)
rocketmq-remoting/src/protocol/header/view_message_response_header.rs (2)
1-16
: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
17-22
: Struct Definition Approved.The
ViewMessageResponseHeader
struct is correctly defined and uses appropriate traits.rocketmq-remoting/src/protocol/header/view_message_request_header.rs (2)
1-16
: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
17-25
: Struct Definition Approved.The
ViewMessageRequestHeader
struct is correctly defined and uses appropriate traits.rocketmq-remoting/src/protocol/header/query_message_response_header.rs (2)
1-16
: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
17-26
: Struct Definition Approved.The
QueryMessageResponseHeader
struct is correctly defined and uses appropriate traits.rocketmq-store/src/index/query_offset_result.rs (2)
Line range hint
1-10
: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
Line range hint
12-43
: Struct Definition and Method Approved.The
QueryOffsetResult
struct is correctly defined and the new methodget_phy_offsets_mut
is appropriately implemented.rocketmq-store/src/base/select_result.rs (2)
46-50
: Approve theget_buffer_slice_mut
method.The method correctly retrieves a mutable slice of the buffer from the mapped file. Ensure that the mapped file is not null and the buffer access is within bounds.
52-57
: Approve theget_bytes
method.The method correctly converts the buffer to a
Bytes
object if the size is greater than 0 and the mapped file is not null. Ensure that the buffer conversion is handled properly.rocketmq-store/src/log_file.rs (5)
124-131
: Approve thequery_message
method.The method correctly handles querying messages based on the provided topic, key, max number, and timestamp range. Ensure that the query parameters are validated and the query logic is correct.
133-136
: Approve theselect_one_message_by_offset
method.The method correctly handles selecting a single message by its offset. Ensure that the offset is validated and the selection logic is correct.
138-142
: Approve theselect_one_message_by_offset_with_size
method.The method correctly handles selecting a single message by its offset and size. Ensure that the offset and size are validated and the selection logic is correct.
144-144
: Approve thelook_message_by_offset
method.The method correctly handles looking up a message by its offset. Ensure that the offset is validated and the lookup logic is correct.
146-150
: Approve thelook_message_by_offset_with_size
method.The method correctly handles looking up a message by its offset and size. Ensure that the offset and size are validated and the lookup logic is correct.
rocketmq-broker/src/processor/query_message_processor.rs (5)
32-35
: Approve theQueryMessageProcessor
struct.The struct is correctly defined and the fields are properly initialized. Ensure that the generic type
MS
is correctly constrained.
38-43
: Approve thenew
method.The method correctly initializes a new instance of
QueryMessageProcessor
with the providedmessage_store_config
andmessage_store
.
50-62
: Approve theprocess_request
method.The method correctly handles processing requests based on the request code. Ensure that the request code is validated and the request processing logic is correct.
64-110
: Approve thequery_message
method.The method correctly handles querying messages based on the request parameters. Ensure that the query parameters are validated and the query logic is correct.
113-138
: Approve theview_message_by_id
method.The method correctly handles viewing a message by its ID based on the request parameters. Ensure that the request parameters are validated and the view logic is correct.
rocketmq-remoting/src/protocol/header/query_message_request_header.rs (6)
24-38
: Approve theQueryMessageRequestHeader
struct.The struct is correctly defined and the fields are properly initialized. Ensure that the nested
TopicRequestHeader
is correctly handled.
55-75
: Approve theto_map
method.The method correctly handles converting the struct to a map. Ensure that all fields are included in the map and the conversion logic is correct.
78-91
: Approve thefrom
method.The method correctly handles creating an instance of the struct from a map. Ensure that all required fields are present in the map and the creation logic is correct.
100-115
: Approve thecreating_from_map_with_all_fields_populates_struct_correctly
test.The test correctly verifies creating an instance of the struct from a map with all fields populated. Ensure that all field values are correctly checked.
118-131
: Approve thecreating_from_map_missing_optional_fields_still_succeeds
test.The test correctly verifies creating an instance of the struct from a map with missing optional fields. Ensure that all required fields are correctly checked.
133-143
: Approve thecreating_from_map_with_invalid_number_fields_returns_none
test.The test correctly verifies creating an instance of the struct from a map with invalid number fields. Ensure that all invalid field values are correctly checked.
rocketmq-store/src/index/index_service.rs (1)
134-137
: Good use of&str
for parameters.Changing
topic
andkey
to&str
improves performance by avoiding unnecessary allocations.rocketmq-broker/src/processor.rs (1)
165-169
: Good addition for handling new request codes.The handling of
RequestCode::QueryMessage
andRequestCode::ViewMessageById
is correctly added.rocketmq-broker/src/broker_runtime.rs (1)
434-435
: Correct initialization ofQueryMessageProcessor
.The
QueryMessageProcessor
is correctly initialized with the necessary configurations.rocketmq-store/src/message_store/default_message_store.rs (2)
1158-1164
: LGTM!The method is correctly implemented.
1175-1190
: LGTM!The method is correctly implemented.
if let Some(value) = self.rpc_request_header.as_ref() { | ||
if let Some(rpc_map) = value.to_map() { | ||
map.extend(rpc_map); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling in to_map
method.
Consider adding error handling for potential issues when converting rpc_request_header
to a map. This can help prevent unexpected behavior if the conversion fails.
if let Some(value) = self.rpc_request_header.as_ref() {
if let Some(rpc_map) = value.to_map() {
map.extend(rpc_map);
} else {
// Handle the error case where rpc_map is None
// e.g., log an error or return an error result
}
}
async fn query_message( | ||
&self, | ||
topic: &str, | ||
key: &str, | ||
max_num: i32, | ||
begin_timestamp: i64, | ||
end_timestamp: i64, | ||
) -> Option<QueryMessageResult> { | ||
let mut query_message_result = QueryMessageResult::default(); | ||
let mut last_query_msg_time = end_timestamp; | ||
for i in 1..3 { | ||
let mut query_offset_result = self.index_service.query_offset( | ||
topic, | ||
key, | ||
max_num, | ||
begin_timestamp, | ||
end_timestamp, | ||
); | ||
if query_offset_result.get_phy_offsets().is_empty() { | ||
break; | ||
} | ||
|
||
query_offset_result.get_phy_offsets_mut().sort(); | ||
|
||
query_message_result.index_last_update_timestamp = | ||
query_offset_result.get_index_last_update_timestamp(); | ||
query_message_result.index_last_update_phyoffset = | ||
query_offset_result.get_index_last_update_phyoffset(); | ||
let phy_offsets = query_offset_result.get_phy_offsets(); | ||
for m in 0..phy_offsets.len() { | ||
let offset = *phy_offsets.get(m).unwrap(); | ||
let msg = self.look_message_by_offset(offset); | ||
if m == 0 { | ||
last_query_msg_time = msg.as_ref().unwrap().store_timestamp; | ||
} | ||
let result = self.commit_log.get_data_with_option(offset, false); | ||
if let Some(sbr) = result { | ||
query_message_result.add_message(sbr); | ||
} | ||
} | ||
if query_message_result.buffer_total_size > 0 { | ||
break; | ||
} | ||
if last_query_msg_time < begin_timestamp { | ||
break; | ||
} | ||
} | ||
|
||
Some(query_message_result) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling and justify loop limit.
- Error handling for
look_message_by_offset
andget_data_with_option
should be improved to handle cases where these methods returnNone
. - The loop limit of 3 iterations seems arbitrary. Consider making this limit configurable or providing a justification for this choice.
for i in 1..3 {
let mut query_offset_result = self.index_service.query_offset(
topic,
key,
max_num,
begin_timestamp,
end_timestamp,
);
if query_offset_result.get_phy_offsets().is_empty() {
break;
}
query_offset_result.get_phy_offsets_mut().sort();
query_message_result.index_last_update_timestamp =
query_offset_result.get_index_last_update_timestamp();
query_message_result.index_last_update_phyoffset =
query_offset_result.get_index_last_update_phyoffset();
let phy_offsets = query_offset_result.get_phy_offsets();
for m in 0..phy_offsets.len() {
let offset = *phy_offsets.get(m).unwrap();
let msg = self.look_message_by_offset(offset);
if msg.is_none() {
continue; // Handle None case
}
if m == 0 {
last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
}
let result = self.commit_log.get_data_with_option(offset, false);
if let Some(sbr) = result {
query_message_result.add_message(sbr);
}
}
if query_message_result.buffer_total_size > 0 {
break;
}
if last_query_msg_time < begin_timestamp {
break;
}
}
Improve error handling and justify loop limit.
- Error handling for
look_message_by_offset
andget_data_with_option
should be improved to handle cases where these methods returnNone
. - The loop limit of 3 iterations seems arbitrary. Consider making this limit configurable or providing a justification for this choice.
for i in 1..3 {
let mut query_offset_result = self.index_service.query_offset(
topic,
key,
max_num,
begin_timestamp,
end_timestamp,
);
if query_offset_result.get_phy_offsets().is_empty() {
break;
}
query_offset_result.get_phy_offsets_mut().sort();
query_message_result.index_last_update_timestamp =
query_offset_result.get_index_last_update_timestamp();
query_message_result.index_last_update_phyoffset =
query_offset_result.get_index_last_update_phyoffset();
let phy_offsets = query_offset_result.get_phy_offsets();
for m in 0..phy_offsets.len() {
let offset = *phy_offsets.get(m).unwrap();
let msg = self.look_message_by_offset(offset);
if msg.is_none() {
continue; // Handle None case
}
if m == 0 {
last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
}
let result = self.commit_log.get_data_with_option(offset, false);
if let Some(sbr) = result {
query_message_result.add_message(sbr);
}
}
if query_message_result.buffer_total_size > 0 {
break;
}
if last_query_msg_time < begin_timestamp {
break;
}
}
async fn select_one_message_by_offset( | ||
&self, | ||
commit_log_offset: i64, | ||
) -> Option<SelectMappedBufferResult> { | ||
let sbr = self.commit_log.get_message(commit_log_offset, 4); | ||
if let Some(sbr) = sbr { | ||
let size = sbr.get_buffer().get_i32(); | ||
self.commit_log.get_message(commit_log_offset, size) | ||
} else { | ||
None | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimize message retrieval.
Fetching the message size separately and then fetching the complete message can be optimized by combining these operations.
async fn select_one_message_by_offset(
&self,
commit_log_offset: i64,
) -> Option<SelectMappedBufferResult> {
if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
let size = sbr.get_buffer().get_i32();
self.commit_log.get_message(commit_log_offset, size)
} else {
None
}
}
fn look_message_by_offset(&self, commit_log_offset: i64) -> Option<MessageExt> { | ||
if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) { | ||
let size = sbr.get_buffer().get_i32(); | ||
self.look_message_by_offset_with_size(commit_log_offset, size) | ||
} else { | ||
None | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimize message retrieval.
Fetching the message size separately and then fetching the complete message can be optimized by combining these operations.
fn look_message_by_offset(&self, commit_log_offset: i64) -> Option<MessageExt> {
if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
let size = sbr.get_buffer().get_i32();
self.look_message_by_offset_with_size(commit_log_offset, size)
} else {
None
}
}
Which Issue(s) This PR Fixes(Closes)
Fixes #790
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
QueryMessageProcessor
to enhance message querying and viewing functionalities.Enhancements
QueryMessageProcessor
with generic types.QueryOffsetResult
to include mutable access to physical offsets.Bug Fixes
query_offset
to improve query efficiency.Code Improvements