Skip to content

Commit

Permalink
#810 Optimize TopicConfigAndMappingSerializeWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
847850277 committed Jul 20, 2024
1 parent c87bd24 commit 5267061
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 108 deletions.
10 changes: 8 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use rocketmq_common::ArcCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::UtilAll::compute_next_morning_time_millis;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
use rocketmq_remoting::protocol::DataVersion;
Expand Down Expand Up @@ -925,7 +926,10 @@ impl BrokerRuntimeInner {
data_version: DataVersion,
) {
let mut serialize_wrapper = TopicConfigAndMappingSerializeWrapper {
data_version: Some(data_version),
topic_config_serialize_wrapper: TopicConfigSerializeWrapper {
data_version: data_version.clone(),
topic_config_table: Default::default(),
},

Check warning on line 932 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L929-L932

Added lines #L929 - L932 were not covered by tests
..Default::default()
};

Expand All @@ -947,7 +951,9 @@ impl BrokerRuntimeInner {
register_topic_config,
);
}
serialize_wrapper.topic_config_table = Some(topic_config_table);
serialize_wrapper

Check warning on line 954 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L954

Added line #L954 was not covered by tests
.topic_config_serialize_wrapper
.topic_config_table = topic_config_table;

Check warning on line 956 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L956

Added line #L956 was not covered by tests
let mut topic_queue_mapping_info_map = HashMap::new();
for topic_config in topic_config_list {
if let Some(ref value) = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::body::create_topic_list_request_body::CreateTopicListRequestBody;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper;
use rocketmq_remoting::protocol::header::create_topic_request_header::CreateTopicRequestHeader;
use rocketmq_remoting::protocol::header::delete_topic_request_header::DeleteTopicRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
Expand Down Expand Up @@ -365,20 +366,20 @@ impl TopicRequestHandler {
.data_version
.lock()
.clone(),
topic_config_table: Some(
self.inner
.topic_config_manager
.topic_config_table()
.lock()
.clone(),
),
data_version: Some(
self.inner
topic_config_serialize_wrapper: TopicConfigSerializeWrapper {
data_version: self

Check warning on line 370 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L369-L370

Added lines #L369 - L370 were not covered by tests
.inner
.topic_config_manager
.data_version()
.as_ref()
.clone(),
),
topic_config_table: self

Check warning on line 376 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L376

Added line #L376 was not covered by tests
.inner
.topic_config_manager
.topic_config_table()
.lock()
.clone(),
},

Check warning on line 382 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L382

Added line #L382 was not covered by tests
..Default::default()
};
let content = topic_config_and_mapping_serialize_wrapper.to_json();
Expand Down
5 changes: 4 additions & 1 deletion rocketmq-broker/src/topic/manager/topic_config_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ impl TopicConfigManager {
self.data_version.mut_from_ref().next_version();
}
TopicConfigAndMappingSerializeWrapper {
topic_config_table: Some(topic_config_table),
topic_config_serialize_wrapper: rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigSerializeWrapper {
topic_config_table: topic_config_table,
data_version: self.data_version.as_ref().clone(),
},

Check warning on line 240 in rocketmq-broker/src/topic/manager/topic_config_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/topic/manager/topic_config_manager.rs#L237-L240

Added lines #L237 - L240 were not covered by tests
topic_queue_mapping_info_map,
..TopicConfigAndMappingSerializeWrapper::default()
}
Expand Down
158 changes: 78 additions & 80 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl RouteInfoManager {
}
}
}

//impl register broker
impl RouteInfoManager {
pub fn register_broker(
Expand Down Expand Up @@ -161,9 +162,8 @@ impl RouteInfoManager {
if let Some(val) = self.broker_live_table.get(&addr_info_old) {
let old_state_version = val.data_version().state_version();
let new_state_version = topic_config_serialize_wrapper
.topic_config_serialize_wrapper
.data_version()
.as_ref()
.unwrap()
.state_version();
if old_state_version > new_state_version {
warn!(
Expand All @@ -186,15 +186,16 @@ impl RouteInfoManager {
}
}
}
let size = if let Some(val) = topic_config_serialize_wrapper.topic_config_table() {
val.len()
} else {
0
};
let size = topic_config_serialize_wrapper

Check warning on line 189 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L189

Added line #L189 was not covered by tests
.topic_config_serialize_wrapper
.topic_config_table()
.len();
if !broker_data.broker_addrs().contains_key(&broker_id) && size == 1 {
warn!(
"Can't register topicConfigWrapper={:?} because broker[{}]={} has not registered.",
topic_config_serialize_wrapper.topic_config_table(),
topic_config_serialize_wrapper
.topic_config_serialize_wrapper
.topic_config_table(),
broker_id,
broker_addr
);
Expand All @@ -214,79 +215,77 @@ impl RouteInfoManager {
let broker_data = broker_data.clone();
//handle master or prime slave topic config update
if is_master || is_prime_slave {
if let Some(tc_table) = topic_config_serialize_wrapper.topic_config_table() {
let topic_queue_mapping_info_map =
topic_config_serialize_wrapper.topic_queue_mapping_info_map();

// Delete the topics that don't exist in tcTable from the current broker
// Static topic is not supported currently
if self.namesrv_config.delete_topic_with_broker_registration
&& topic_queue_mapping_info_map.is_empty()
{
let old_topic_set = self.topic_set_of_broker_name(&broker_name);
let new_topic_set = tc_table
.keys()
.map(|item| item.to_string())
.collect::<HashSet<String>>();
let to_delete_topics = new_topic_set
.difference(&old_topic_set)
.map(|item| item.to_string())
.collect::<HashSet<String>>();
for to_delete_topic in to_delete_topics {
let queue_data_map = self.topic_queue_table.get_mut(&to_delete_topic);
if let Some(queue_data) = queue_data_map {
let removed_qd = queue_data.remove(&broker_name);
if let Some(ref removed_qd_inner) = removed_qd {
info!(
"broker[{}] delete topic[{}] queue[{:?}] because of master \
change",
broker_name, to_delete_topic, removed_qd_inner
);
}
if queue_data.is_empty() {
self.topic_queue_table.remove(&to_delete_topic);
}
let tc_table = topic_config_serialize_wrapper

Check warning on line 218 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L218

Added line #L218 was not covered by tests
.topic_config_serialize_wrapper
.topic_config_table();
let topic_queue_mapping_info_map =
topic_config_serialize_wrapper.topic_queue_mapping_info_map();

Check warning on line 222 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L222

Added line #L222 was not covered by tests

// Delete the topics that don't exist in tcTable from the current broker
// Static topic is not supported currently
if self.namesrv_config.delete_topic_with_broker_registration
&& topic_queue_mapping_info_map.is_empty()

Check warning on line 227 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L226-L227

Added lines #L226 - L227 were not covered by tests
{
let old_topic_set = self.topic_set_of_broker_name(&broker_name);
let new_topic_set = tc_table

Check warning on line 230 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L229-L230

Added lines #L229 - L230 were not covered by tests
.keys()
.map(|item| item.to_string())

Check warning on line 232 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L232

Added line #L232 was not covered by tests
.collect::<HashSet<String>>();
let to_delete_topics = new_topic_set

Check warning on line 234 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L234

Added line #L234 was not covered by tests
.difference(&old_topic_set)
.map(|item| item.to_string())

Check warning on line 236 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L236

Added line #L236 was not covered by tests
.collect::<HashSet<String>>();
for to_delete_topic in to_delete_topics {
let queue_data_map = self.topic_queue_table.get_mut(&to_delete_topic);
if let Some(queue_data) = queue_data_map {
let removed_qd = queue_data.remove(&broker_name);
if let Some(ref removed_qd_inner) = removed_qd {
info!(

Check warning on line 243 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L238-L243

Added lines #L238 - L243 were not covered by tests
"broker[{}] delete topic[{}] queue[{:?}] because of master change",
broker_name, to_delete_topic, removed_qd_inner
);
}
if queue_data.is_empty() {
self.topic_queue_table.remove(&to_delete_topic);

Check warning on line 249 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L248-L249

Added lines #L248 - L249 were not covered by tests
}
}
}
let default_data_version = DataVersion::default();
let data_version = topic_config_serialize_wrapper
.data_version()
.as_ref()
.unwrap_or(&default_data_version);
for topic_config in tc_table.values() {
let mut config = topic_config.clone();
if (register_first
|| self.is_topic_config_changed(
&cluster_name,
&broker_addr,
data_version,
&broker_name,
topic_config.topic_name.as_ref().unwrap(),
))
&& is_prime_slave
&& broker_data.enable_acting_master()
{
config.perm &= !PermName::PERM_WRITE;
}
self.create_and_update_queue_data(&broker_name, config);
}
if self.is_broker_topic_config_changed(&cluster_name, &broker_addr, data_version)
|| register_first
}
let data_version = topic_config_serialize_wrapper

Check warning on line 254 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L253-L254

Added lines #L253 - L254 were not covered by tests
.topic_config_serialize_wrapper
.data_version();
for topic_config in tc_table.values() {
let mut config = topic_config.clone();
if (register_first
|| self.is_topic_config_changed(
&cluster_name,
&broker_addr,

Check warning on line 262 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L257-L262

Added lines #L257 - L262 were not covered by tests
data_version,
&broker_name,
topic_config.topic_name.as_ref().unwrap(),

Check warning on line 265 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L264-L265

Added lines #L264 - L265 were not covered by tests
))
&& is_prime_slave
&& broker_data.enable_acting_master()

Check warning on line 268 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L267-L268

Added lines #L267 - L268 were not covered by tests
{
for (topic, vtq_info) in topic_queue_mapping_info_map {
if !self.topic_queue_mapping_info_table.contains_key(topic) {
self.topic_queue_mapping_info_table
.insert(topic.to_string(), HashMap::new());
}
config.perm &= !PermName::PERM_WRITE;

Check warning on line 270 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L270

Added line #L270 was not covered by tests
}
self.create_and_update_queue_data(&broker_name, config);
}
if self.is_broker_topic_config_changed(&cluster_name, &broker_addr, data_version)
|| register_first

Check warning on line 275 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L272-L275

Added lines #L272 - L275 were not covered by tests
{
for (topic, vtq_info) in topic_queue_mapping_info_map {
if !self.topic_queue_mapping_info_table.contains_key(topic) {

Check warning on line 278 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L277-L278

Added lines #L277 - L278 were not covered by tests
self.topic_queue_mapping_info_table
.get_mut(topic)
.unwrap()
.insert(
vtq_info.bname.as_ref().unwrap().to_string(),
vtq_info.clone(),
);
.insert(topic.to_string(), HashMap::new());

Check warning on line 280 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L280

Added line #L280 was not covered by tests
}
self.topic_queue_mapping_info_table

Check warning on line 282 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L282

Added line #L282 was not covered by tests
.get_mut(topic)
.unwrap()
.insert(
vtq_info.bname.as_ref().unwrap().to_string(),
vtq_info.clone(),
);

Check warning on line 288 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L286-L288

Added lines #L286 - L288 were not covered by tests
}
}
}
Expand All @@ -301,11 +300,10 @@ impl RouteInfoManager {
.expect("Time went backwards")
.as_millis() as i64,
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME,
if let Some(data_version) = topic_config_serialize_wrapper.data_version() {
data_version.clone()
} else {
DataVersion::default()
},
topic_config_serialize_wrapper

Check warning on line 303 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L303

Added line #L303 was not covered by tests
.topic_config_serialize_wrapper
.data_version()
.clone(),

Check warning on line 306 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L306

Added line #L306 was not covered by tests
ha_server_addr.clone(),
remote_addr,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ pub struct TopicConfigAndMappingSerializeWrapper {
#[serde(rename = "mappingDataVersion")]
pub mapping_data_version: DataVersion,

#[serde(rename = "topicConfigTable")]
pub topic_config_table: Option<HashMap<String, TopicConfig>>,
#[serde(flatten)]
pub topic_config_serialize_wrapper: TopicConfigSerializeWrapper,

Check warning on line 39 in rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs#L39

Added line #L39 was not covered by tests
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TopicConfigSerializeWrapper {
#[serde(rename = "topicConfigTable")]
pub topic_config_table: HashMap<String, TopicConfig>,
#[serde(rename = "dataVersion")]
pub data_version: Option<DataVersion>,
pub data_version: DataVersion,
}

impl TopicConfigAndMappingSerializeWrapper {
Expand All @@ -54,12 +59,17 @@ impl TopicConfigAndMappingSerializeWrapper {
pub fn mapping_data_version(&self) -> &DataVersion {
&self.mapping_data_version
}
pub fn topic_config_serialize_wrapper(&self) -> &TopicConfigSerializeWrapper {
&self.topic_config_serialize_wrapper
}
}

pub fn topic_config_table(&self) -> &Option<HashMap<String, TopicConfig>> {
impl TopicConfigSerializeWrapper {
pub fn topic_config_table(&self) -> &HashMap<String, TopicConfig> {
&self.topic_config_table
}

pub fn data_version(&self) -> &Option<DataVersion> {
pub fn data_version(&self) -> &DataVersion {
&self.data_version
}
}
Expand All @@ -70,8 +80,16 @@ impl Default for TopicConfigAndMappingSerializeWrapper {
topic_queue_mapping_info_map: HashMap::new(),
topic_queue_mapping_detail_map: HashMap::new(),
mapping_data_version: DataVersion::new(),
topic_config_table: None,
data_version: None,
topic_config_serialize_wrapper: TopicConfigSerializeWrapper::default(),
}

Check warning on line 84 in rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs#L84

Added line #L84 was not covered by tests
}
}

impl Default for TopicConfigSerializeWrapper {
fn default() -> Self {
Self {
topic_config_table: HashMap::new(),
data_version: DataVersion::new(),
}
}
}
Expand All @@ -87,9 +105,18 @@ mod tests {
let wrapper = TopicConfigAndMappingSerializeWrapper::default();
assert!(wrapper.topic_queue_mapping_info_map.is_empty());
assert!(wrapper.topic_queue_mapping_detail_map.is_empty());
//assert_eq!(wrapper.mapping_data_version, DataVersion::new());
assert!(wrapper.topic_config_table.is_none());
assert!(wrapper.data_version.is_none());
assert_eq!(wrapper.mapping_data_version, DataVersion::new());
assert_eq!(
wrapper
.topic_config_serialize_wrapper()
.topic_config_table()
.is_empty(),
true
);
assert_eq!(
wrapper.topic_config_serialize_wrapper().data_version(),
&DataVersion::new()
);
}

#[test]
Expand All @@ -100,8 +127,8 @@ mod tests {
let topic_queue_mapping_detail = TopicQueueMappingDetail::default();
let data_version = DataVersion::default();

wrapper.topic_config_table = Some(HashMap::new());
wrapper.data_version = Some(data_version.clone());
let topic_config_serialize_wrapper = TopicConfigSerializeWrapper::default();
wrapper.topic_config_serialize_wrapper = topic_config_serialize_wrapper.clone();
wrapper
.topic_queue_mapping_info_map
.insert("test".to_string(), topic_queue_mapping_info.clone());
Expand All @@ -117,8 +144,10 @@ mod tests {
wrapper.topic_queue_mapping_detail_map(),
&HashMap::from([("test".to_string(), topic_queue_mapping_detail)])
);
//assert_eq!(wrapper.mapping_data_version(), &DataVersion::new());
assert_eq!(wrapper.topic_config_table(), &Some(HashMap::new()));
assert_eq!(wrapper.data_version(), &Some(data_version));
assert_eq!(wrapper.mapping_data_version(), &data_version);
assert_eq!(
wrapper.topic_config_serialize_wrapper(),
&topic_config_serialize_wrapper
);
}
}

0 comments on commit 5267061

Please sign in to comment.