-
Notifications
You must be signed in to change notification settings - Fork 59
amop
maggie edited this page Sep 15, 2020
·
1 revision
- 模块介绍
- 重构任务
- 重构设计
- 接口说明
-
主要功能
-
话题订阅,话题相关消息推送的功能
- 单拨: 随机发给一个订阅方
- 组播: 广播给所有订阅方 (AMOP_MULBROADCAST)
-
话题认证。在普通的配置下,任何一个监听了某topic的接收者都能接受到发送者推送的消息。但在某些场景下,发送者只希望特定的接收者能接收到消息,不希望无关的接收者能任意的监听此topic。在此场景下,需要使用Topic认证功能。
-
-
主要处理的消息类型
AMOP_REQUEST(0x30), // type for request from sdk
AMOP_RESPONSE(0x31), // type for response to sdk
AMOP_MULBROADCAST(0x35), // type for mult broadcast
AMOP_CLIENT_TOPICS(0x32), // type for topic request
REQUEST_TOPICCERT(0x37), // type request verify
UPDATE_TOPIICSTATUS(0x38), // type for update status
- 适配新的Network和Channel模块
- 新增Topic动态更新功能。认证Topic能够再运行过程中动态更新。
- 解决Topic长度受限的问题。Channel协议长度字段只有1为,Topic长度JSON序列化之后不超过255字节。
- 优雅关闭,代码优化
toml配置方式:
# Configure a "need verify AMOP topic" as a topic message sender.
[[amop]]
topicName = "PrivateTopic1"
# Public keys of the nodes that you want to send AMOP message of this topic to.
publicKeys = [ "conf/amop/consumer_public_key_1.pem" ]
# Configure a "need verify AMOP topic" as a topic subscriber.
[[amop]]
topicName = "PrivateTopic2"
# Your private key that used to subscriber verification.
privateKey = "conf/amop/consumer_private_key.p12"
password = "123456"
重构前:SDK用户需要在Service启动之前设置好订阅的Topics,SDK的启动后,会向节点同步自己订阅的Topics。不支持Topic动态更新。
重构方法: 新增3个添加Topic的接口、一个移除Topic的接口
public interface Amop {
void subscribeTopic(String topicName, AmopCallback callback); //订阅话题
void subscribePrivateTopics(
String topicName, KeyManager privateKeyManager, AmopCallback callback);//订阅私有话题
void publishPrivateTopic(String topicName, List<KeyManager> publicKeyManagers);//设置私有话题
void unsubscribeTopic(String topicName);//取消订阅
void sendAmopMsg(AmopMsgOut content, ResponseCallback callback);//发送AMOP消息
void broadcastAmopMsg(AmopMsgOut content);//广播AMOP消息
Set<String> getSubTopics();//获取订阅的消息
void setCallback(AmopCallback cb);//设置回调函数
void start();
void stop();
}
新增话题后,会触发syncTopicsToNode()方法,将话题与节点同步。
问题:用户添加了认证话题后,是否要将该配置写入配置文件?
- 不写入配置文件,则SDK重启后话题无法继续。
- 写入配置文件,则需要自动生成配置项,写入原配置文件中。
重构前:
- AMOP消息格式如下,其中topic length是固定的1字节长度,那么这就限制了Topic的长度最长为255字节。
-
我们会在用户定义的TopicName前加上一些标注,和一个UUID标注认证话题的一个订阅房。如下所示。一个Topic的长度在69+。
public static final String verifyChannelPrefix = "#!$VerifyChannel_"; public static final String pushChannelPrefix = "#!$PushChannel_"; public static final String topicNeedVerifyPrefix = "#!$TopicNeedVerify_";
String topicName = "TopicName"; StringBuilder topicString =new StringBuilder(); stringBuilder.append(verifyChannelPrefix); stringBuilder.append(topicNeedVerifyPrefix); stringBuilder.append(topicName) stringBuilder.append('_'); stringBuilder.append(UUID.randomUUID().toString().replaceAll("-", ""));
当发送一个更新Topics的消息时,Topic字段需要带上节点所订阅的所有主题。如果这样,可能4个Topic就已经用完了255字节。
这种情况,需要修改协议。
重构方案:
-
消息格式:采用普遍消息的消息格式。
其中Data部分包含内容:
{ topic: Topic1_#!$VerifyChannel_#!$TopicNeedVerify_Topic2_e0d3184269914257986648e63df5ce07 }
删除topic lengh字段。
-
虽然SDK新增了“动态订阅”功能,但每次发送订阅消息时,仍然需要传输全量的Topic,以减少节点方面对AMOP功能的修改。(否则,节点可能需要新增一个更复杂的AMOP管理模块)
AMOP配置:
# Configure a "need verify AMOP topic" as a topic message sender.
[[amop]]
topicName = "PrivateTopic1"
# Public keys of the nodes that you want to send AMOP message of this topic to.
publicKeys = [ "conf/amop/consumer_public_key_1.pem" ]
# Configure a "need verify AMOP topic" as a topic subscriber.
[[amop]]
topicName = "PrivateTopic2"
# Your private key that used to subscriber verification.
privateKey = "conf/amop/consumer_private_key.p12"
password = "123456"
AMOP 接口:
/*
* Copyright 2014-2020 [fisco-dev]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.fisco.bcos.sdk.amop;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.fisco.bcos.sdk.amop.topic.TopicManager;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.crypto.keystore.KeyManager;
/**
* AMOP module interface.
*
* @author Maggie
*/
public interface Amop {
/**
* Create a Amop object.
*
* @param channel
* @param config
* @return Amop instance
*/
static Amop build(Channel channel, ConfigOption config) {
return new AmopImp(channel, config);
}
/**
* Subscribe a normal topic.
*
* @param topicName
* @param callback callback is called when receive a msg relate to this topic
*/
void subscribeTopic(String topicName, AmopCallback callback);
/**
* Subscribe a private topic which need verify.
*
* @param topicName
* @param privateKeyManager the private key you used to prove your identity.
* @param callback callback is called when receive a msg relate to this topic
*/
void subscribePrivateTopics(
String topicName, KeyManager privateKeyManager, AmopCallback callback);
/**
* Config a topic which is need verification, after that user can send message to verified
* subscriber.
*
* @param topicName
* @param publicKeyManagers the public keys of the target organizations that you want to
* communicate with
*/
void publishPrivateTopic(String topicName, List<KeyManager> publicKeyManagers);
/**
* Unsubscribe a topic.
*
* @param topicName
*/
void unsubscribeTopic(String topicName);
/**
* Send amop msg
*
* @param content
* @param callback
*/
void sendAmopMsg(AmopMsgOut content, ResponseCallback callback);
/**
* Send amop msg
*
* @param content
*/
void broadcastAmopMsg(AmopMsgOut content);
/**
* Get all subscribe topics.
*
* @return topic name list
*/
Set<String> getSubTopics();
/**
* Get list of subscribers to a topic
*
* @param topicName the topic you want to query
* @return List of subscribers
*/
List<String> getTopicSubscribers(String topicName);
/**
* set amop default callback
*
* @param cb the amop callback
*/
void setCallback(AmopCallback cb);
/** Start. */
void start();
/** Stop. */
void stop();
/** If configured private topic, wait until finish verify */
void waitFinishPrivateTopicVerify();
/**
* generate message sequence string
*
* @return Sequence string
*/
static String newSeq() {
String seq = UUID.randomUUID().toString().replaceAll("-", "");
return seq;
}
TopicManager getTopicManager();
void sendSubscribe();
}
AMOP 使用方法:
String configFile = "conf/config.yaml";
BcosSDK sdk = new BcosSDK(configFile);
Amop amop = sdk.getAmop();
AmopCallback defaultCallback = new TestAmopCallback();
amop.setCallback(defaultCallback);
// 订阅Topic
AmopCallback t1Callback = new TestAmopCallback();
amop.subscribeTopic("topic1111",t1Callback);
// 订阅Private Topic
AmopCallback t1Callback = new TestAmopCallback();
KeyManager km = new PEMManager("keys/privateKey.pem");
amop.subscribePrivateTopic("privateSub1",km,t1Callback);
// 设置一个Private Topic
KeyManager km = new PEMManager("keys/publicKey.pem");
List<KeyManager> list = new ArrayList<>();
list.add(km);
amop.setupPrivateTopic("privateSend2",list);
// 取消订阅
amop.unsubscribeTopic("topic1111");
// 发送消息
mopMsgOut out = new AmopMsgOut();
out.setTopic("topic1111");
out.getTimeout(5000);
out.setContent("Tell you something".getBytes());
ResponseCallback cb = new TestResponseCallback();
amop.sendAmopMsg(out, cb);
amop.broadcastAmopMsg(out, cb);
// 其它操作
List<String> topics = amop.getSubTopics(); //获取自己订阅的和设置的所有Topic
amop.getTopicSubscribers("topic1111") //获取某个Topic的订阅者。
public class TestAmopCallback extends AmopCallback {
@Override
public void onSubscribedTopicMsg(AmopMsgIn msg) {
// do something.
}
}