Skip to content

Commit

Permalink
[ISSUE #248]💥Implement ConfigManager decode method (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Mar 8, 2024
1 parent e091b4d commit 9c3a0dd
Show file tree
Hide file tree
Showing 23 changed files with 973 additions and 363 deletions.
8 changes: 5 additions & 3 deletions rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ local_file_store = ["rocketmq-store/local_file_store"]
rocketmq-rust = { version = "0.2.0", path = "../rocketmq" }
rocketmq-common = { version = "0.2.0", path = "../rocketmq-common" }
rocketmq-remoting = { version = "0.2.0", path = "../rocketmq-remoting" }
rocketmq-store = { version = "0.2.0", path = "../rocketmq-store", default-features = true}
rocketmq-store = { version = "0.2.0", path = "../rocketmq-store", default-features = true }
rocketmq-filter = { version = "0.2.0", path = "../rocketmq-filter" }
rocketmq-macros = { version = "0.2.0", path = "../rocketmq-macros" }

anyhow.workspace = true
env_logger.workspace = true
Expand All @@ -38,8 +40,8 @@ serde_json.workspace = true

futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-io = { version = "0.3.0"}
futures-util = { version = "0.3.0"}
futures-io = { version = "0.3.0" }
futures-util = { version = "0.3.0" }
futures = "0.3.29"
bytes = "1.5.0"
config.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion rocketmq-broker/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
* limitations under the License.
*/

pub(crate) mod manager;
mod consumer_filter_data;
pub(crate) mod manager;
84 changes: 84 additions & 0 deletions rocketmq-broker/src/filter/consumer_filter_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_filter::utils::bloom_filter_data::BloomFilterData;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ConsumerFilterData {
consumer_group: String,
topic: String,
expression: String,
expression_type: String,
//compiled_expression: Expression,
born_time: u64,
dead_time: u64,
bloom_filter_data: BloomFilterData,
client_version: u64,
}

impl ConsumerFilterData {
pub fn consumer_group(&self) -> &str {
&self.consumer_group
}
pub fn topic(&self) -> &str {
&self.topic
}
pub fn expression(&self) -> &str {
&self.expression
}
pub fn expression_type(&self) -> &str {
&self.expression_type
}
pub fn born_time(&self) -> u64 {
self.born_time
}
pub fn dead_time(&self) -> u64 {
self.dead_time
}
pub fn bloom_filter_data(&self) -> &BloomFilterData {
&self.bloom_filter_data
}
pub fn client_version(&self) -> u64 {
self.client_version
}
pub fn set_consumer_group(&mut self, consumer_group: String) {
self.consumer_group = consumer_group;
}
pub fn set_topic(&mut self, topic: String) {
self.topic = topic;
}
pub fn set_expression(&mut self, expression: String) {
self.expression = expression;
}
pub fn set_expression_type(&mut self, expression_type: String) {
self.expression_type = expression_type;
}
pub fn set_born_time(&mut self, born_time: u64) {
self.born_time = born_time;
}
pub fn set_dead_time(&mut self, dead_time: u64) {
self.dead_time = dead_time;
}
pub fn set_bloom_filter_data(&mut self, bloom_filter_data: BloomFilterData) {
self.bloom_filter_data = bloom_filter_data;
}
pub fn set_client_version(&mut self, client_version: u64) {
self.client_version = client_version;
}
}

4 changes: 3 additions & 1 deletion rocketmq-broker/src/filter/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
* limitations under the License.
*/

pub(crate) mod consumer_filter_manager;
pub(crate) mod consumer_filter_manager;
mod consumer_filter_wrapper;

36 changes: 36 additions & 0 deletions rocketmq-broker/src/filter/manager/consumer_filter_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use rocketmq_remoting::protocol::DataVersion;
use serde::{Deserialize, Serialize};

use crate::filter::consumer_filter_data::ConsumerFilterData;

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ConsumerFilterWrapper {
filter_data_by_topic: HashMap<String /* Topic */, FilterDataMapByTopic>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct FilterDataMapByTopic {
filter_data_map: HashMap<String /* consumer group */, ConsumerFilterData>,
topic: DataVersion,
}

8 changes: 5 additions & 3 deletions rocketmq-broker/src/offset/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* limitations under the License.
*/

pub(crate) mod broadcast_offset_manager;
pub(crate) mod consumer_offset_manager;
pub(crate) mod consumer_order_info_manager;
pub(crate) mod broadcast_offset_manager;
pub(crate) mod consumer_offset_manager;
mod consumer_order_info_lock_manager;
pub(crate) mod consumer_order_info_manager;

97 changes: 60 additions & 37 deletions rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,64 @@
* limitations under the License.
*/

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use rocketmq_common::common::config_manager::ConfigManager;

use crate::{broker_config::BrokerConfig, broker_path_config_helper::get_consumer_offset_path};

#[derive(Default)]
pub(crate) struct ConsumerOffsetManager {
pub(crate) broker_config: Arc<BrokerConfig>,
}

//Fully implemented will be removed
#[allow(unused_variables)]
impl ConfigManager for ConsumerOffsetManager {
fn decode0(&mut self, key: &[u8], body: &[u8]) {
todo!()
}

fn stop(&mut self) -> bool {
todo!()
}

fn config_file_path(&mut self) -> String {
get_consumer_offset_path(self.broker_config.store_path_root_dir.as_str())
}

fn encode(&mut self) -> String {
todo!()
}

fn encode_pretty(&mut self, pretty_format: bool) -> String {
todo!()
}

fn decode(&mut self, json_string: &str) {
todo!()
}
}
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_remoting::protocol::DataVersion;
use serde::{Deserialize, Serialize};

use crate::{broker_config::BrokerConfig, broker_path_config_helper::get_consumer_offset_path};

#[derive(Default)]
pub(crate) struct ConsumerOffsetManager {
pub(crate) broker_config: Arc<BrokerConfig>,
consumer_offset_wrapper: ConsumerOffsetWrapper,
}

//Fully implemented will be removed
#[allow(unused_variables)]
impl ConfigManager for ConsumerOffsetManager {
fn decode0(&mut self, key: &[u8], body: &[u8]) {
todo!()
}

fn stop(&mut self) -> bool {
todo!()
}

fn config_file_path(&mut self) -> String {
get_consumer_offset_path(self.broker_config.store_path_root_dir.as_str())
}

fn encode(&mut self) -> String {
todo!()
}

fn encode_pretty(&mut self, pretty_format: bool) -> String {
todo!()
}

fn decode(&mut self, json_string: &str) {
if json_string.is_empty() {
return;
}
let wrapper =
serde_json::from_str::<ConsumerOffsetWrapper>(json_string).unwrap_or_default();
if !wrapper.offset_table.is_empty() {
self.consumer_offset_wrapper
.offset_table
.clone_from(&wrapper.offset_table);
self.consumer_offset_wrapper.data_version = wrapper.data_version.clone();
}
}
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
struct ConsumerOffsetWrapper {
data_version: DataVersion,
offset_table: HashMap<String /* topic@group */, HashMap<i32, i64>>,
reset_offset_table: HashMap<String, HashMap<i32, i64>>,
pull_offset_table: HashMap<String /* topic@group */, HashMap<i32, i64>>,
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoWrapper;

pub struct ConsumerOrderInfoLockManager {}

impl ConsumerOrderInfoLockManager {
pub fn recover(&mut self, _consumer_order_info_wrapper: &ConsumerOrderInfoWrapper) {}
}

Loading

0 comments on commit 9c3a0dd

Please sign in to comment.