From c4da7ac3f09af32b1fa5f978010e68c388bd9d9a Mon Sep 17 00:00:00 2001 From: droak Date: Wed, 31 Jul 2024 00:29:36 +0900 Subject: [PATCH] initial restructuring and handlers --- packages/network/src/node.ts | 14 ++++-- packages/network/src/proto/messages.proto | 8 ++-- packages/network/src/proto/messages_pb.ts | 38 ++++++---------- packages/node/src/handlers.ts | 55 +++++++++++++++++++++++ packages/node/src/index.ts | 12 ++++- packages/node/src/operations.ts | 12 ++--- 6 files changed, 101 insertions(+), 38 deletions(-) create mode 100644 packages/node/src/handlers.ts diff --git a/packages/network/src/node.ts b/packages/network/src/node.ts index f4742e2f..a3e34e19 100644 --- a/packages/network/src/node.ts +++ b/packages/network/src/node.ts @@ -24,6 +24,7 @@ import { bootstrap } from "@libp2p/bootstrap"; import { webTransport } from "@libp2p/webtransport"; import { autoNAT } from "@libp2p/autonat"; import { fromString as uint8ArrayFromString } from "uint8arrays/from-string"; +import * as lp from "it-length-prefixed"; import { Message } from "./proto/messages_pb.js"; // snake_casing to match the JSON config @@ -187,8 +188,8 @@ export class TopologyNetworkNode { async broadcastMessage(topic: string, message: Message) { try { - if (this._pubsub?.getSubscribers(topic)?.length === 0) return; - await this._pubsub?.publish(topic, message); + let messageBuffer = Message.encode(message).finish(); + await this._pubsub?.publish(topic, messageBuffer); console.log( "topology::network::broadcastMessage: Successfuly broadcasted message to topic", @@ -203,7 +204,10 @@ export class TopologyNetworkNode { try { const connection = await this._node?.dial([multiaddr(`/p2p/${peerId}`)]); const stream = await connection?.newStream(protocols); - stringToStream(stream, message); + let messageBuffer = Message.encode(message).finish(); + stream.sink(lp.encode([messageBuffer])) + + // stringToStream(stream, message); console.log( `topology::network::sendMessage: Successfuly sent message to peer: ${peerId} with message: ${message}`, @@ -225,7 +229,9 @@ export class TopologyNetworkNode { const connection = await this._node?.dial(peerId); const stream: Stream = (await connection?.newStream(protocols)) as Stream; - Message.encode(message, stream.sink) + let messageBuffer = Message.encode(message).finish(); + stream.sink(lp.encode([messageBuffer])) + // stringToStream(stream, message); console.log( diff --git a/packages/network/src/proto/messages.proto b/packages/network/src/proto/messages.proto index 4eb466b6..6aa971fe 100644 --- a/packages/network/src/proto/messages.proto +++ b/packages/network/src/proto/messages.proto @@ -7,10 +7,10 @@ message Message { SYNC = 1; SYNC_ACCEPT = 2; SYNC_REJECT = 3; + CUSTOM = 4; } - string id = 1; - string sender = 2; - MessageType type = 3; - bytes data = 4; + string sender = 1; + MessageType type = 2; + bytes data = 3; } diff --git a/packages/network/src/proto/messages_pb.ts b/packages/network/src/proto/messages_pb.ts index 8bf675de..da36bdfc 100644 --- a/packages/network/src/proto/messages_pb.ts +++ b/packages/network/src/proto/messages_pb.ts @@ -10,7 +10,6 @@ import _m0 from "protobufjs/minimal"; export const protobufPackage = "topology.network"; export interface Message { - id: string; sender: string; type: Message_MessageType; data: Uint8Array; @@ -21,6 +20,7 @@ export enum Message_MessageType { SYNC = 1, SYNC_ACCEPT = 2, SYNC_REJECT = 3, + CUSTOM = 4, UNRECOGNIZED = -1, } @@ -38,6 +38,9 @@ export function message_MessageTypeFromJSON(object: any): Message_MessageType { case 3: case "SYNC_REJECT": return Message_MessageType.SYNC_REJECT; + case 4: + case "CUSTOM": + return Message_MessageType.CUSTOM; case -1: case "UNRECOGNIZED": default: @@ -55,6 +58,8 @@ export function message_MessageTypeToJSON(object: Message_MessageType): string { return "SYNC_ACCEPT"; case Message_MessageType.SYNC_REJECT: return "SYNC_REJECT"; + case Message_MessageType.CUSTOM: + return "CUSTOM"; case Message_MessageType.UNRECOGNIZED: default: return "UNRECOGNIZED"; @@ -62,22 +67,19 @@ export function message_MessageTypeToJSON(object: Message_MessageType): string { } function createBaseMessage(): Message { - return { id: "", sender: "", type: 0, data: new Uint8Array(0) }; + return { sender: "", type: 0, data: new Uint8Array(0) }; } export const Message = { encode(message: Message, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - if (message.id !== "") { - writer.uint32(10).string(message.id); - } if (message.sender !== "") { - writer.uint32(18).string(message.sender); + writer.uint32(10).string(message.sender); } if (message.type !== 0) { - writer.uint32(24).int32(message.type); + writer.uint32(16).int32(message.type); } if (message.data.length !== 0) { - writer.uint32(34).bytes(message.data); + writer.uint32(26).bytes(message.data); } return writer; }, @@ -94,24 +96,17 @@ export const Message = { break; } - message.id = reader.string(); - continue; - case 2: - if (tag !== 18) { - break; - } - message.sender = reader.string(); continue; - case 3: - if (tag !== 24) { + case 2: + if (tag !== 16) { break; } message.type = reader.int32() as any; continue; - case 4: - if (tag !== 34) { + case 3: + if (tag !== 26) { break; } @@ -128,7 +123,6 @@ export const Message = { fromJSON(object: any): Message { return { - id: isSet(object.id) ? globalThis.String(object.id) : "", sender: isSet(object.sender) ? globalThis.String(object.sender) : "", type: isSet(object.type) ? message_MessageTypeFromJSON(object.type) : 0, data: isSet(object.data) ? bytesFromBase64(object.data) : new Uint8Array(0), @@ -137,9 +131,6 @@ export const Message = { toJSON(message: Message): unknown { const obj: any = {}; - if (message.id !== "") { - obj.id = message.id; - } if (message.sender !== "") { obj.sender = message.sender; } @@ -157,7 +148,6 @@ export const Message = { }, fromPartial, I>>(object: I): Message { const message = createBaseMessage(); - message.id = object.id ?? ""; message.sender = object.sender ?? ""; message.type = object.type ?? 0; message.data = object.data ?? new Uint8Array(0); diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts new file mode 100644 index 00000000..1f92f1ee --- /dev/null +++ b/packages/node/src/handlers.ts @@ -0,0 +1,55 @@ +import { Stream } from "@libp2p/interface"; +import * as lp from "it-length-prefixed"; +import { + Message, + Message_MessageType, +} from "@topology-foundation/network"; +import { TopologyNode } from "."; + +export async function topologyMessageHandler(node: TopologyNode, stream: Stream) { + const buf = (await lp.decode(stream.source).return()).value; + const message = Message.decode(new Uint8Array(buf ? buf.subarray() : [])) + + switch (message.type) { + case Message_MessageType.UPDATE: + updateHandler(node, message.data); + break; + case Message_MessageType.SYNC: + syncHandler(node, stream.protocol ?? "", message.sender, message.data); + break; + case Message_MessageType.SYNC_ACCEPT: + syncAcceptHandler(node, message.data); + break; + case Message_MessageType.SYNC_REJECT: + syncRejectHandler(node, message.data); + break; + default: + console.error("topology::node::messageHandler", "Invalid operation"); + break; + } +} + +function updateHandler(node: TopologyNode, data: Uint8Array) { + // +} + +function syncHandler(node: TopologyNode, protocol: string, sender: string, data: Uint8Array) { + // Receive RBILT & send back + // (might send reject) <- TODO: when should we reject? + const message = Message.create({ + sender: node.networkNode.peerId, + type: Message_MessageType.SYNC_ACCEPT, + // add data here + data: new Uint8Array(0), + }); + + node.networkNode.sendMessage(sender, [protocol], message); +} + +function syncAcceptHandler(node: TopologyNode, data: Uint8Array) { + // Process RBILT +} + +function syncRejectHandler(node: TopologyNode, data: Uint8Array) { + // Ask sync from another peer +} diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index 75efe54f..995da97e 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -1,6 +1,8 @@ import { GossipsubMessage } from "@chainsafe/libp2p-gossipsub"; import { EventHandler, StreamHandler } from "@libp2p/interface"; import { + Message, + Message_MessageType, TopologyNetworkNode, TopologyNetworkNodeConfig, streamToString, @@ -10,6 +12,7 @@ import { TopologyObjectStore } from "./store"; import { fromString as uint8ArrayFromString } from "uint8arrays/from-string"; import { toString as uint8ArrayToString } from "uint8arrays/to-string"; import { OPERATIONS } from "./operations.js"; +import * as lp from "it-length-prefixed"; export * from "./operations.js"; @@ -39,7 +42,14 @@ export class TopologyNode { let input = await streamToString(stream); if (!input) return; - const message = JSON.parse(input); + // const stream: Stream = (await connection?.newStream(protocols)) as Stream; + // let messageBuffer = Message.encode(message).finish(); + // stream.sink(lp.encode([messageBuffer])) + + const buf = (await lp.decode(stream.source).return()).value; + const message = Message.decode(new Uint8Array(buf ? buf.subarray() : [])) + + // const message = JSON.parse(input); switch (message["type"]) { case "object_fetch": { const objectId = uint8ArrayToString( diff --git a/packages/node/src/operations.ts b/packages/node/src/operations.ts index 93f5da2a..86ee3b82 100644 --- a/packages/node/src/operations.ts +++ b/packages/node/src/operations.ts @@ -18,27 +18,27 @@ export enum OPERATIONS { SYNC } -function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) { +export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) { switch (operation) { case OPERATIONS.CREATE: // data = CRO createObject(node, data); break; case OPERATIONS.UPDATE: - // data = CRO + // data = [CRO_ID, OPERATION] updateObject(node, data) break; case OPERATIONS.SUBSCRIBE: - // data = TopologyObjectId + // data = CRO_ID subscribeObject(node, data) break; case OPERATIONS.UNSUBSCRIBE: - // data = ObjectId + // data = CRO_ID unsubscribeObject(node, data) break; case OPERATIONS.SYNC: // data = CRO - // TODO: data = RIBLT + // TODO: data = [CRO_ID, RIBLT] syncObject(node, data) break; default: @@ -90,5 +90,7 @@ function syncObject(node: TopologyNode, data: Uint8Array) { type: Message_MessageType.SYNC, data: data }) + + // TODO: check how to do it better node.networkNode.sendGroupMessageRandomPeer(object.id, ["/topology/message"], message) }