diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 1f92f1ee..19ca06c2 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -6,7 +6,7 @@ import { } from "@topology-foundation/network"; import { TopologyNode } from "."; -export async function topologyMessageHandler(node: TopologyNode, stream: Stream) { +export async function topologyMessagesHandler(node: TopologyNode, stream: Stream) { const buf = (await lp.decode(stream.source).return()).value; const message = Message.decode(new Uint8Array(buf ? buf.subarray() : [])) @@ -31,6 +31,18 @@ export async function topologyMessageHandler(node: TopologyNode, stream: Stream) function updateHandler(node: TopologyNode, data: Uint8Array) { // + /* + this.objectStore.put(object.id, object); + // not dialed, emitted through pubsub + const message = `{ + "type": "object_update", + "data": [${uint8ArrayFromString(update_data)}] + }`; + this.networkNode.broadcastMessage( + object.id, + uint8ArrayFromString(message), + ); + */ } function syncHandler(node: TopologyNode, protocol: string, sender: string, data: Uint8Array) { diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index 995da97e..87c96e87 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -11,8 +11,8 @@ import { TopologyObject } from "@topology-foundation/object"; import { TopologyObjectStore } from "./store"; import { fromString as uint8ArrayFromString } from "uint8arrays/from-string"; import { toString as uint8ArrayToString } from "uint8arrays/to-string"; -import { OPERATIONS } from "./operations.js"; import * as lp from "it-length-prefixed"; +import { topologyMessagesHandler } from "./handlers"; export * from "./operations.js"; @@ -38,143 +38,7 @@ export class TopologyNode { this.networkNode.addMessageHandler( ["/topology/message/0.0.1"], - async ({ stream }) => { - let input = await streamToString(stream); - if (!input) return; - - // const stream: Stream = (await connection?.newStream(protocols)) as Stream; - // let messageBuffer = Message.encode(message).finish(); - // stream.sink(lp.encode([messageBuffer])) - - const buf = (await lp.decode(stream.source).return()).value; - const message = Message.decode(new Uint8Array(buf ? buf.subarray() : [])) - - // const message = JSON.parse(input); - switch (message["type"]) { - case "object_fetch": { - const objectId = uint8ArrayToString( - new Uint8Array(message["data"]), - ); - const object = this.getObject(objectId); - const object_message = `{ - "type": "object", - "data": [${uint8ArrayFromString(JSON.stringify(object, (_key, value) => (value instanceof Set ? [...value] : value)))}] - }`; - await this.networkNode.sendMessage( - message["sender"], - [stream.protocol], - object_message, - ); - // await stringToStream(stream, object_message); - break; - } - case "object": { - const object = JSON.parse( - uint8ArrayToString(new Uint8Array(message["data"])), - ); - this.objectStore.put(object["id"], object); - break; - } - case "object_sync": { - const objectId = uint8ArrayToString( - new Uint8Array(message["data"]), - ); - const object = this.getObject(objectId); - const object_message = `{ - "type": "object_merge", - "data": [${uint8ArrayFromString(JSON.stringify(object))}] - }`; - await this.networkNode.sendMessage( - message["sender"], - [stream.protocol], - object_message, - ); - break; - } - case "object_merge": { - const object = JSON.parse( - uint8ArrayToString(new Uint8Array(message["data"])), - ); - const local = this.objectStore.get(object["id"]); - if (local) { - // TODO: merge requires a merge function in wasm - // local.merge(object); - this.objectStore.put(object["id"], local); - } - break; - } - default: { - return; - } - } - }, - ); - } - - - /// Subscribe to the object's PubSub group - /// and fetch it from a peer - async subscribeObject(objectId: string, fetch = false, peerId = "") { - this.networkNode.subscribe(objectId); - if (!fetch) return; - const message = `{ - "type": "object_fetch", - "sender": "${this.networkNode.peerId}", - "data": [${uint8ArrayFromString(objectId)}] - }`; - - if (!peerId) { - await this.networkNode.sendGroupMessageRandomPeer( - objectId, - ["/topology/message/0.0.1"], - message, - ); - } else { - await this.networkNode.sendMessage( - peerId, - ["/topology/message/0.0.1"], - message, - ); - } - } - - async syncObject(objectId: string, peerId = "") { - const message = `{ - "type": "object_sync", - "sender": "${this.networkNode.peerId}", - "data": [${uint8ArrayFromString(objectId)}] - }`; - - if (!peerId) { - await this.networkNode.sendGroupMessageRandomPeer( - objectId, - ["/topology/message/0.0.1"], - message, - ); - } else { - await this.networkNode.sendMessage( - peerId, - ["/topology/message/0.0.1"], - message, - ); - } - } - - /// Get the object from the local Object Store - getObject(objectId: string) { - return this.objectStore.get(objectId); - } - - updateObject(object: TopologyObject, update_data: string) { - this.objectStore.put(object.id, object); - // not dialed, emitted through pubsub - const message = `{ - "type": "object_update", - "data": [${uint8ArrayFromString(update_data)}] - }`; - this.networkNode.broadcastMessage( - object.id, - uint8ArrayFromString(message), + async ({ stream }) => topologyMessagesHandler(this, stream) ); } @@ -182,17 +46,32 @@ export class TopologyNode { this.networkNode.subscribe(group); } - sendGroupMessage(group: string, message: Uint8Array) { - this.networkNode.broadcastMessage(group, message); - } - addCustomGroupMessageHandler( + group: string, handler: EventHandler>, ) { this.networkNode.addGroupMessageHandler(handler); } + sendGroupMessage(group: string, data: Uint8Array) { + const message = Message.create({ + sender: this.networkNode.peerId, + type: Message_MessageType.CUSTOM, + data, + }) + this.networkNode.broadcastMessage(group, message); + } + addCustomMessageHandler(protocol: string | string[], handler: StreamHandler) { this.networkNode.addMessageHandler(protocol, handler); } + + sendCustomMessage(peerId: string, protocol: string, data: Uint8Array) { + const message = Message.create({ + sender: this.networkNode.peerId, + type: Message_MessageType.CUSTOM, + data, + }); + this.networkNode.sendMessage(peerId, [protocol], message); + } } diff --git a/packages/node/src/operations.ts b/packages/node/src/operations.ts index 86ee3b82..6c256542 100644 --- a/packages/node/src/operations.ts +++ b/packages/node/src/operations.ts @@ -13,12 +13,12 @@ export enum OPERATIONS { SUBSCRIBE, /* Unsubscribe from a PubSub group */ UNSUBSCRIBE, - /* Actively send the CRO RIBLT to a random peer */ SYNC } -export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) { +/* Utility function to execute object operations apart of calling the functions directly */ +export async function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) { switch (operation) { case OPERATIONS.CREATE: // data = CRO @@ -30,7 +30,7 @@ export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS break; case OPERATIONS.SUBSCRIBE: // data = CRO_ID - subscribeObject(node, data) + await subscribeObject(node, data) break; case OPERATIONS.UNSUBSCRIBE: // data = CRO_ID @@ -39,7 +39,7 @@ export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS case OPERATIONS.SYNC: // data = CRO // TODO: data = [CRO_ID, RIBLT] - syncObject(node, data) + await syncObject(node, data) break; default: console.error("topology::node::executeObjectOperation", "Invalid operation"); @@ -47,13 +47,13 @@ export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS } } -function createObject(node: TopologyNode, data: Uint8Array) { +export function createObject(node: TopologyNode, data: Uint8Array) { const object = TopologyObject.decode(data) node.networkNode.subscribe(object.id); node.objectStore.put(object.id, object); } -function updateObject(node: TopologyNode, data: Uint8Array) { +export function updateObject(node: TopologyNode, data: Uint8Array) { // TODO: should just send the object diff, not the full object // this is handler, we want the action of sending const object = TopologyObject.decode(data) @@ -70,19 +70,41 @@ function updateObject(node: TopologyNode, data: Uint8Array) { ); } -function subscribeObject(node: TopologyNode, data: Uint8Array) { +export async function subscribeObject(node: TopologyNode, data: Uint8Array, fetch?: boolean, peerId?: string) { // process data as only the object id and not the full obj + // need to create the obj anyway to sync empty obj const object = TopologyObject.decode(data) node.networkNode.subscribe(object.id); + + if (!fetch) return; + const message = Message.create({ + sender: node.networkNode.peerId, + type: Message_MessageType.SYNC, + data + }); + + if (!peerId) { + await node.networkNode.sendGroupMessageRandomPeer( + object.id, + ["/topology/message/0.0.1"], + message, + ); + } else { + await node.networkNode.sendMessage( + peerId, + ["/topology/message/0.0.1"], + message, + ); + } } -function unsubscribeObject(node: TopologyNode, data: Uint8Array) { +export function unsubscribeObject(node: TopologyNode, data: Uint8Array) { // process data as only the object id and not the full obj const object = TopologyObject.decode(data) node.networkNode.unsubscribe(object.id); } -function syncObject(node: TopologyNode, data: Uint8Array) { +export async function syncObject(node: TopologyNode, data: Uint8Array, peerId?: string) { // Send sync request to a random peer const object = TopologyObject.decode(data) @@ -92,5 +114,17 @@ function syncObject(node: TopologyNode, data: Uint8Array) { }) // TODO: check how to do it better - node.networkNode.sendGroupMessageRandomPeer(object.id, ["/topology/message"], message) + if (!peerId) { + await node.networkNode.sendGroupMessageRandomPeer( + object.id, + ["/topology/message/0.0.1"], + message, + ); + } else { + await node.networkNode.sendMessage( + peerId, + ["/topology/message/0.0.1"], + message, + ); + } } diff --git a/packages/node/src/version.ts b/packages/node/src/version.ts index 85ceb686..2d05f320 100644 --- a/packages/node/src/version.ts +++ b/packages/node/src/version.ts @@ -1 +1 @@ -export const VERSION = "0.0.22"; +export const VERSION = "0.0.23-5";