Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #244]🚧Implement TopicConfigManager load method #245

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ parking_lot.workspace = true
clap = { version = "4.2.7", features = ["derive"] }
rand = "0.8.5"

#tools
dirs.workspace = true

[[bin]]
name = "rocketmq-broker-rust"
path = "src/bin/broker_bootstrap_server.rs"
6 changes: 6 additions & 0 deletions rocketmq-broker/src/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
pub trace_on: bool,
pub broker_permission: i8,
pub async_send_enable: bool,
pub store_path_root_dir: String,

Check warning on line 51 in rocketmq-broker/src/broker_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_config.rs#L51

Added line #L51 was not covered by tests
}

impl Default for BrokerConfig {
Expand All @@ -74,6 +75,11 @@
trace_on: true,
broker_permission: PermName::PERM_WRITE | PermName::PERM_READ,
async_send_enable: false,
store_path_root_dir: dirs::home_dir()

Check warning on line 78 in rocketmq-broker/src/broker_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_config.rs#L78

Added line #L78 was not covered by tests
.unwrap()
.join("store")
.to_string_lossy()
.into_owned(),
}
}
}
Expand Down
276 changes: 276 additions & 0 deletions rocketmq-broker/src/broker_path_config_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::path::PathBuf;

// Default broker config path
pub fn get_broker_config_path() -> String {
let mut path = dirs::home_dir().unwrap();
path.push("store");
path.push("config");
path.push("broker.properties");
path.to_string_lossy().into_owned()
}

// Topic config path
pub fn get_topic_config_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("topics.json")
.to_string_lossy()
.into_owned()
}

// Topic-queue mapping path
pub fn get_topic_queue_mapping_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("topicQueueMapping.json")
.to_string_lossy()
.into_owned()
}

// Consumer offset path
pub fn get_consumer_offset_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("consumerOffset.json")
.to_string_lossy()
.into_owned()
}

// Lmq consumer offset path
pub fn get_lmq_consumer_offset_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("lmqConsumerOffset.json")
.to_string_lossy()
.into_owned()
}

// Consumer order info path
pub fn get_consumer_order_info_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("consumerOrderInfo.json")
.to_string_lossy()
.into_owned()
}

// Subscription group path
pub fn get_subscription_group_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("subscriptionGroup.json")
.to_string_lossy()
.into_owned()
}

// Timer check path
pub fn get_timer_check_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("timercheck")
.to_string_lossy()
.into_owned()
}

// Timer metrics path
pub fn get_timer_metrics_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("timermetrics")
.to_string_lossy()
.into_owned()
}

// Transaction metrics path
pub fn get_transaction_metrics_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("transactionMetrics")
.to_string_lossy()
.into_owned()
}

// Consumer filter path
pub fn get_consumer_filter_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("consumerFilter.json")
.to_string_lossy()
.into_owned()
}

// Message request mode path
pub fn get_message_request_mode_path(root_dir: &str) -> String {
PathBuf::from(root_dir)
.join("config")
.join("messageRequestMode.json")
.to_string_lossy()
.into_owned()
}

#[cfg(test)]
mod test {
use std::path::PathBuf;

use super::*;

#[test]
fn test_get_broker_config_path() {
let path = get_broker_config_path();
let home_dir = dirs::home_dir().unwrap();
let mut path_ = PathBuf::from(home_dir);
path_.push("store");
path_.push("config");
path_.push("broker.properties");
assert_eq!(path, path_.to_string_lossy().into_owned());
}

#[test]
fn test_get_topic_config_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_topic_config_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("topics.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_topic_queue_mapping_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_topic_queue_mapping_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("topicQueueMapping.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_consumer_offset_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_consumer_offset_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("consumerOffset.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_lmq_consumer_offset_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_lmq_consumer_offset_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("lmqConsumerOffset.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_consumer_order_info_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_consumer_order_info_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("consumerOrderInfo.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_subscription_group_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_subscription_group_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("subscriptionGroup.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_timer_check_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_timer_check_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("timercheck");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_timer_metrics_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_timer_metrics_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("timermetrics");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_transaction_metrics_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_transaction_metrics_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("transactionMetrics");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_consumer_filter_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_consumer_filter_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("consumerFilter.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}

#[test]
fn test_get_message_request_mode_path() {
let root_dir = PathBuf::from("/path/to/root")
.to_string_lossy()
.into_owned();
let path = get_message_request_mode_path(root_dir.as_str());
let expected_path = PathBuf::from(root_dir.clone())
.join("config")
.join("messageRequestMode.json");
assert_eq!(path, expected_path.to_string_lossy().into_owned());
}
}
4 changes: 2 additions & 2 deletions rocketmq-broker/src/filter/manager/consumer_filter_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
todo!()
}

fn config_file_path(&mut self) -> &str {
""
fn config_file_path(&mut self) -> String {
"".to_string()

Check warning on line 35 in rocketmq-broker/src/filter/manager/consumer_filter_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/filter/manager/consumer_filter_manager.rs#L34-L35

Added lines #L34 - L35 were not covered by tests
}

fn encode(&mut self) -> String {
Expand Down
1 change: 1 addition & 0 deletions rocketmq-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod command;

pub mod broker_config;
mod broker_outer_api;
mod broker_path_config_helper;
mod client;
mod coldctr;
mod controller;
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
todo!()
}

fn config_file_path(&mut self) -> &str {
""
fn config_file_path(&mut self) -> String {
"".to_string()

Check warning on line 35 in rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs#L34-L35

Added lines #L34 - L35 were not covered by tests
}

fn encode(&mut self) -> String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
todo!()
}

fn config_file_path(&mut self) -> &str {
""
fn config_file_path(&mut self) -> String {
"".to_string()

Check warning on line 35 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L34-L35

Added lines #L34 - L35 were not covered by tests
}

fn encode(&mut self) -> String {
Expand Down
6 changes: 2 additions & 4 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@
message_store: Arc<parking_lot::RwLock<MS>>,
}

impl<MS> Default for SendMessageProcessor<MS> {
impl<MS: Default> Default for SendMessageProcessor<MS> {
fn default() -> Self {
/* #[cfg(feature = "local_file_store")]
Self {
inner: SendMessageProcessorInner::default(),
topic_queue_mapping_manager: Arc::new(parking_lot::RwLock::new(
Expand All @@ -69,8 +68,7 @@
topic_config_manager: Arc::new(parking_lot::RwLock::new(TopicConfigManager::default())),
broker_config: Arc::new(parking_lot::RwLock::new(BrokerConfig::default())),
message_store: Arc::new(parking_lot::RwLock::new(Default::default())),
}*/
unimplemented!()
}

Check warning on line 71 in rocketmq-broker/src/processor/send_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/send_message_processor.rs#L71

Added line #L71 was not covered by tests
}
}

Expand Down
4 changes: 2 additions & 2 deletions rocketmq-broker/src/schedule/schedule_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
todo!()
}

fn config_file_path(&mut self) -> &str {
""
fn config_file_path(&mut self) -> String {
"".to_string()
}

Check warning on line 35 in rocketmq-broker/src/schedule/schedule_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/schedule/schedule_message_service.rs#L32-L35

Added lines #L32 - L35 were not covered by tests
fn encode(&mut self) -> String {
todo!()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl ConfigManager for SubscriptionGroupManager {
todo!()
}

fn config_file_path(&mut self) -> &str {
""
fn config_file_path(&mut self) -> String {
"".to_string()
}

fn encode(&mut self) -> String {
Expand Down
Loading
Loading