Skip to content
maggie edited this page Sep 15, 2020 · 1 revision

AMOP模块重构

  • 模块介绍
  • 重构任务
  • 重构设计
  • 接口说明

1. 模块介绍

AMOP模块

  • 主要功能

    • 话题订阅,话题相关消息推送的功能

      • 单拨: 随机发给一个订阅方
      • 组播: 广播给所有订阅方 (AMOP_MULBROADCAST)

      img

    • 话题认证。在普通的配置下,任何一个监听了某topic的接收者都能接受到发送者推送的消息。但在某些场景下,发送者只希望特定的接收者能接收到消息,不希望无关的接收者能任意的监听此topic。在此场景下,需要使用Topic认证功能。

      img

  • 主要处理的消息类型

AMOP_REQUEST(0x30), // type for request from sdk
AMOP_RESPONSE(0x31), // type for response to sdk
AMOP_MULBROADCAST(0x35), // type for mult broadcast
AMOP_CLIENT_TOPICS(0x32), // type for topic request
REQUEST_TOPICCERT(0x37), // type request verify
UPDATE_TOPIICSTATUS(0x38), // type for update status

2.重构任务

  • 适配新的Network和Channel模块
  • 新增Topic动态更新功能。认证Topic能够再运行过程中动态更新。
  • 解决Topic长度受限的问题。Channel协议长度字段只有1为,Topic长度JSON序列化之后不超过255字节。
  • 优雅关闭,代码优化

3. 重构设计

重构点1. 适配新的Network和Channel模块

toml配置方式:

# Configure a "need verify AMOP topic" as a topic message sender.
[[amop]]
topicName = "PrivateTopic1"
# Public keys of the nodes that you want to send AMOP message of this topic to.
publicKeys = [ "conf/amop/consumer_public_key_1.pem" ]

# Configure a "need verify AMOP topic" as a topic subscriber.
[[amop]]
topicName = "PrivateTopic2"
# Your private key that used to subscriber verification.
privateKey = "conf/amop/consumer_private_key.p12"
password = "123456"

重构点2. 新增 Topic动态更新功能

重构前:SDK用户需要在Service启动之前设置好订阅的Topics,SDK的启动后,会向节点同步自己订阅的Topics。不支持Topic动态更新。

重构方法: 新增3个添加Topic的接口、一个移除Topic的接口

public interface Amop {
    void subscribeTopic(String topicName, AmopCallback callback); //订阅话题
    void subscribePrivateTopics(
            String topicName, KeyManager privateKeyManager, AmopCallback callback);//订阅私有话题
    void publishPrivateTopic(String topicName, List<KeyManager> publicKeyManagers);//设置私有话题
    void unsubscribeTopic(String topicName);//取消订阅
    void sendAmopMsg(AmopMsgOut content, ResponseCallback callback);//发送AMOP消息
    void broadcastAmopMsg(AmopMsgOut content);//广播AMOP消息
    Set<String> getSubTopics();//获取订阅的消息
    void setCallback(AmopCallback cb);//设置回调函数
    void start();
    void stop();
}

新增话题后,会触发syncTopicsToNode()方法,将话题与节点同步。

问题:用户添加了认证话题后,是否要将该配置写入配置文件?

  • 不写入配置文件,则SDK重启后话题无法继续。
  • 写入配置文件,则需要自动生成配置项,写入原配置文件中。

重构点3. 解决Topic长度受限的问题

重构前:

  • AMOP消息格式如下,其中topic length是固定的1字节长度,那么这就限制了Topic的长度最长为255字节。
  • 我们会在用户定义的TopicName前加上一些标注,和一个UUID标注认证话题的一个订阅房。如下所示。一个Topic的长度在69+。

    public static final String verifyChannelPrefix = "#!$VerifyChannel_";
    public static final String pushChannelPrefix = "#!$PushChannel_";
    public static final String topicNeedVerifyPrefix = "#!$TopicNeedVerify_";
    String topicName = "TopicName";
    StringBuilder topicString =new StringBuilder();
    stringBuilder.append(verifyChannelPrefix);
    stringBuilder.append(topicNeedVerifyPrefix);
    stringBuilder.append(topicName)
    stringBuilder.append('_');
    stringBuilder.append(UUID.randomUUID().toString().replaceAll("-", ""));

    当发送一个更新Topics的消息时,Topic字段需要带上节点所订阅的所有主题。如果这样,可能4个Topic就已经用完了255字节。

    这种情况,需要修改协议。

重构方案:

  • 消息格式:采用普遍消息的消息格式。

    其中Data部分包含内容:

    {
    	topic: Topic1_#!$VerifyChannel_#!$TopicNeedVerify_Topic2_e0d3184269914257986648e63df5ce07
    }
    

    删除topic lengh字段。

  • 虽然SDK新增了“动态订阅”功能,但每次发送订阅消息时,仍然需要传输全量的Topic,以减少节点方面对AMOP功能的修改。(否则,节点可能需要新增一个更复杂的AMOP管理模块)

4. 接口说明

AMOP配置:

# Configure a "need verify AMOP topic" as a topic message sender.
[[amop]]
topicName = "PrivateTopic1"
# Public keys of the nodes that you want to send AMOP message of this topic to.
publicKeys = [ "conf/amop/consumer_public_key_1.pem" ]

# Configure a "need verify AMOP topic" as a topic subscriber.
[[amop]]
topicName = "PrivateTopic2"
# Your private key that used to subscriber verification.
privateKey = "conf/amop/consumer_private_key.p12"
password = "123456"

AMOP 接口:

/*
 * Copyright 2014-2020  [fisco-dev]
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.fisco.bcos.sdk.amop;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.fisco.bcos.sdk.amop.topic.TopicManager;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.crypto.keystore.KeyManager;

/**
 * AMOP module interface.
 *
 * @author Maggie
 */
public interface Amop {
    /**
     * Create a Amop object.
     *
     * @param channel
     * @param config
     * @return Amop instance
     */
    static Amop build(Channel channel, ConfigOption config) {
        return new AmopImp(channel, config);
    }

    /**
     * Subscribe a normal topic.
     *
     * @param topicName
     * @param callback callback is called when receive a msg relate to this topic
     */
    void subscribeTopic(String topicName, AmopCallback callback);

    /**
     * Subscribe a private topic which need verify.
     *
     * @param topicName
     * @param privateKeyManager the private key you used to prove your identity.
     * @param callback callback is called when receive a msg relate to this topic
     */
    void subscribePrivateTopics(
            String topicName, KeyManager privateKeyManager, AmopCallback callback);

    /**
     * Config a topic which is need verification, after that user can send message to verified
     * subscriber.
     *
     * @param topicName
     * @param publicKeyManagers the public keys of the target organizations that you want to
     *     communicate with
     */
    void publishPrivateTopic(String topicName, List<KeyManager> publicKeyManagers);

    /**
     * Unsubscribe a topic.
     *
     * @param topicName
     */
    void unsubscribeTopic(String topicName);

    /**
     * Send amop msg
     *
     * @param content
     * @param callback
     */
    void sendAmopMsg(AmopMsgOut content, ResponseCallback callback);

    /**
     * Send amop msg
     *
     * @param content
     */
    void broadcastAmopMsg(AmopMsgOut content);

    /**
     * Get all subscribe topics.
     *
     * @return topic name list
     */
    Set<String> getSubTopics();

    /**
     * Get list of subscribers to a topic
     *
     * @param topicName the topic you want to query
     * @return List of subscribers
     */
    List<String> getTopicSubscribers(String topicName);

    /**
     * set amop default callback
     *
     * @param cb the amop callback
     */
    void setCallback(AmopCallback cb);

    /** Start. */
    void start();

    /** Stop. */
    void stop();

    /** If configured private topic, wait until finish verify */
    void waitFinishPrivateTopicVerify();

    /**
     * generate message sequence string
     *
     * @return Sequence string
     */
    static String newSeq() {
        String seq = UUID.randomUUID().toString().replaceAll("-", "");
        return seq;
    }

    TopicManager getTopicManager();

    void sendSubscribe();
}

AMOP 使用方法:

String configFile = "conf/config.yaml";
BcosSDK sdk = new BcosSDK(configFile);

Amop amop = sdk.getAmop();
AmopCallback defaultCallback = new TestAmopCallback();
amop.setCallback(defaultCallback);

// 订阅Topic
AmopCallback t1Callback = new TestAmopCallback();
amop.subscribeTopic("topic1111",t1Callback);

// 订阅Private Topic
AmopCallback t1Callback = new TestAmopCallback();
KeyManager km = new PEMManager("keys/privateKey.pem");
amop.subscribePrivateTopic("privateSub1",km,t1Callback);

// 设置一个Private Topic
KeyManager km = new PEMManager("keys/publicKey.pem");
List<KeyManager> list = new ArrayList<>();
list.add(km);
amop.setupPrivateTopic("privateSend2",list);
 
// 取消订阅
amop.unsubscribeTopic("topic1111");

// 发送消息
mopMsgOut out = new AmopMsgOut();
out.setTopic("topic1111");
out.getTimeout(5000);
out.setContent("Tell you something".getBytes());
ResponseCallback cb = new TestResponseCallback();
amop.sendAmopMsg(out, cb);
amop.broadcastAmopMsg(out, cb);

// 其它操作
List<String> topics = amop.getSubTopics(); //获取自己订阅的和设置的所有Topic
amop.getTopicSubscribers("topic1111") //获取某个Topic的订阅者。
  
public class TestAmopCallback extends AmopCallback {

  @Override
  public void onSubscribedTopicMsg(AmopMsgIn msg) {
    // do something.
  }
}