Skip to content

Commit

Permalink
[ISSUE #908]Implement mq producer other methods-1 (#911)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Aug 19, 2024
1 parent 2d1e9a7 commit d2b5e0a
Show file tree
Hide file tree
Showing 9 changed files with 634 additions and 124 deletions.
12 changes: 12 additions & 0 deletions rocketmq-client/src/base/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils;
use rocketmq_common::utils::name_server_address_utils::NAMESRV_ENDPOINT_PATTERN;
use rocketmq_common::utils::network_util::NetworkUtil;
Expand Down Expand Up @@ -135,6 +136,17 @@ impl ClientConfig {
)
}

pub fn queue_with_namespace(&mut self, queue: &mut MessageQueue) {
if let Some(namespace) = self.get_namespace() {
if !namespace.is_empty() {
queue.set_topic(NamespaceUtil::wrap_namespace(
namespace.as_str(),
queue.get_topic(),
));
}
}
}

pub fn get_namespace(&mut self) -> Option<String> {
let namespace_initialized = self.namespace_initialized.load(Ordering::Acquire);
if namespace_initialized {
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct MQClientInstance {
*/
admin_ext_table: Arc<RwLock<HashMap<String, Box<dyn MQAdminExtInner>>>>,
mq_client_api_impl: ArcRefCellWrapper<MQClientAPIImpl>,
mq_admin_impl: Arc<MQAdminImpl>,
pub(crate) mq_admin_impl: ArcRefCellWrapper<MQAdminImpl>,
topic_route_table: Arc<RwLock<HashMap<String /* Topic */, TopicRouteData>>>,
topic_end_points_table:
Arc<RwLock<HashMap<String /* Topic */, HashMap<MessageQueue, String /* brokerName */>>>>,
Expand Down Expand Up @@ -122,7 +122,7 @@ impl MQClientInstance {
consumer_table: Arc::new(Default::default()),
admin_ext_table: Arc::new(Default::default()),
mq_client_api_impl,
mq_admin_impl: Arc::new(MQAdminImpl {}),
mq_admin_impl: ArcRefCellWrapper::new(MQAdminImpl {}),
topic_route_table: Arc::new(Default::default()),
topic_end_points_table: Arc::new(Default::default()),
lock_namesrv: Default::default(),
Expand Down
38 changes: 38 additions & 0 deletions rocketmq-client/src/implementation/mq_admin_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;

use crate::base::client_config::ClientConfig;

pub struct MQAdminImpl {}

impl MQAdminImpl {
pub fn new() -> Self {
MQAdminImpl {}
}
}

impl MQAdminImpl {
pub fn parse_publish_message_queues(
&mut self,
message_queue_array: &[MessageQueue],
client_config: &mut ClientConfig,
) -> Vec<MessageQueue> {
let mut message_queues = Vec::new();
for message_queue in message_queue_array {
let user_topic = NamespaceUtil::without_namespace_with_namespace(
message_queue.get_topic(),
client_config
.get_namespace()
.unwrap_or("".to_string())
.as_str(),
);

let message_queue = MessageQueue::from_parts(
user_topic,
message_queue.get_broker_name(),
message_queue.get_queue_id(),
);
message_queues.push(message_queue);
}
message_queues
}
}
Loading

0 comments on commit d2b5e0a

Please sign in to comment.