Skip to content

Commit

Permalink
cleanup index
Browse files Browse the repository at this point in the history
  • Loading branch information
d-roak committed Jul 31, 2024
1 parent cdfac10 commit 8ee79bb
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 154 deletions.
14 changes: 13 additions & 1 deletion packages/node/src/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from "@topology-foundation/network";
import { TopologyNode } from ".";

export async function topologyMessageHandler(node: TopologyNode, stream: Stream) {
export async function topologyMessagesHandler(node: TopologyNode, stream: Stream) {
const buf = (await lp.decode(stream.source).return()).value;
const message = Message.decode(new Uint8Array(buf ? buf.subarray() : []))

Expand All @@ -31,6 +31,18 @@ export async function topologyMessageHandler(node: TopologyNode, stream: Stream)

function updateHandler(node: TopologyNode, data: Uint8Array) {
//
/*
this.objectStore.put(object.id, object);
// not dialed, emitted through pubsub
const message = `{
"type": "object_update",
"data": [${uint8ArrayFromString(update_data)}]
}`;
this.networkNode.broadcastMessage(
object.id,
uint8ArrayFromString(message),
);
*/
}

function syncHandler(node: TopologyNode, protocol: string, sender: string, data: Uint8Array) {
Expand Down
163 changes: 21 additions & 142 deletions packages/node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import { TopologyObject } from "@topology-foundation/object";
import { TopologyObjectStore } from "./store";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";
import { toString as uint8ArrayToString } from "uint8arrays/to-string";
import { OPERATIONS } from "./operations.js";
import * as lp from "it-length-prefixed";
import { topologyMessagesHandler } from "./handlers";

export * from "./operations.js";

Expand All @@ -38,161 +38,40 @@ export class TopologyNode {

this.networkNode.addMessageHandler(
["/topology/message/0.0.1"],
async ({ stream }) => {
let input = await streamToString(stream);
if (!input) return;

// const stream: Stream = (await connection?.newStream(protocols)) as Stream;
// let messageBuffer = Message.encode(message).finish();
// stream.sink(lp.encode([messageBuffer]))

const buf = (await lp.decode(stream.source).return()).value;
const message = Message.decode(new Uint8Array(buf ? buf.subarray() : []))

// const message = JSON.parse(input);
switch (message["type"]) {
case "object_fetch": {
const objectId = uint8ArrayToString(
new Uint8Array(message["data"]),
);
const object = <TopologyObject>this.getObject(objectId);
const object_message = `{
"type": "object",
"data": [${uint8ArrayFromString(JSON.stringify(object, (_key, value) => (value instanceof Set ? [...value] : value)))}]
}`;
await this.networkNode.sendMessage(
message["sender"],
[<string>stream.protocol],
object_message,
);
// await stringToStream(stream, object_message);
break;
}
case "object": {
const object = JSON.parse(
uint8ArrayToString(new Uint8Array(message["data"])),
);
this.objectStore.put(object["id"], object);
break;
}
case "object_sync": {
const objectId = uint8ArrayToString(
new Uint8Array(message["data"]),
);
const object = <TopologyObject>this.getObject(objectId);
const object_message = `{
"type": "object_merge",
"data": [${uint8ArrayFromString(JSON.stringify(object))}]
}`;
await this.networkNode.sendMessage(
message["sender"],
[<string>stream.protocol],
object_message,
);
break;
}
case "object_merge": {
const object = JSON.parse(
uint8ArrayToString(new Uint8Array(message["data"])),
);
const local = this.objectStore.get(object["id"]);
if (local) {
// TODO: merge requires a merge function in wasm
// local.merge(object);
this.objectStore.put(object["id"], local);
}
break;
}
default: {
return;
}
}
},
);
}


/// Subscribe to the object's PubSub group
/// and fetch it from a peer
async subscribeObject(objectId: string, fetch = false, peerId = "") {
this.networkNode.subscribe(objectId);
if (!fetch) return;
const message = `{
"type": "object_fetch",
"sender": "${this.networkNode.peerId}",
"data": [${uint8ArrayFromString(objectId)}]
}`;

if (!peerId) {
await this.networkNode.sendGroupMessageRandomPeer(
objectId,
["/topology/message/0.0.1"],
message,
);
} else {
await this.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message,
);
}
}

async syncObject(objectId: string, peerId = "") {
const message = `{
"type": "object_sync",
"sender": "${this.networkNode.peerId}",
"data": [${uint8ArrayFromString(objectId)}]
}`;

if (!peerId) {
await this.networkNode.sendGroupMessageRandomPeer(
objectId,
["/topology/message/0.0.1"],
message,
);
} else {
await this.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message,
);
}
}

/// Get the object from the local Object Store
getObject(objectId: string) {
return this.objectStore.get(objectId);
}

updateObject(object: TopologyObject, update_data: string) {
this.objectStore.put(object.id, object);
// not dialed, emitted through pubsub
const message = `{
"type": "object_update",
"data": [${uint8ArrayFromString(update_data)}]
}`;
this.networkNode.broadcastMessage(
object.id,
uint8ArrayFromString(message),
async ({ stream }) => topologyMessagesHandler(this, stream)
);
}

addCustomGroup(group: string) {
this.networkNode.subscribe(group);
}

sendGroupMessage(group: string, message: Uint8Array) {
this.networkNode.broadcastMessage(group, message);
}

addCustomGroupMessageHandler(
group: string,
handler: EventHandler<CustomEvent<GossipsubMessage>>,
) {
this.networkNode.addGroupMessageHandler(handler);
}

sendGroupMessage(group: string, data: Uint8Array) {
const message = Message.create({
sender: this.networkNode.peerId,
type: Message_MessageType.CUSTOM,
data,
})
this.networkNode.broadcastMessage(group, message);
}

addCustomMessageHandler(protocol: string | string[], handler: StreamHandler) {
this.networkNode.addMessageHandler(protocol, handler);
}

sendCustomMessage(peerId: string, protocol: string, data: Uint8Array) {
const message = Message.create({
sender: this.networkNode.peerId,
type: Message_MessageType.CUSTOM,
data,
});
this.networkNode.sendMessage(peerId, [protocol], message);
}
}
54 changes: 44 additions & 10 deletions packages/node/src/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ export enum OPERATIONS {
SUBSCRIBE,
/* Unsubscribe from a PubSub group */
UNSUBSCRIBE,

/* Actively send the CRO RIBLT to a random peer */
SYNC
}

export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) {
/* Utility function to execute object operations apart of calling the functions directly */
export async function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) {
switch (operation) {
case OPERATIONS.CREATE:
// data = CRO
Expand All @@ -30,7 +30,7 @@ export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS
break;
case OPERATIONS.SUBSCRIBE:
// data = CRO_ID
subscribeObject(node, data)
await subscribeObject(node, data)
break;
case OPERATIONS.UNSUBSCRIBE:
// data = CRO_ID
Expand All @@ -39,21 +39,21 @@ export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS
case OPERATIONS.SYNC:
// data = CRO
// TODO: data = [CRO_ID, RIBLT]
syncObject(node, data)
await syncObject(node, data)
break;
default:
console.error("topology::node::executeObjectOperation", "Invalid operation");
break;
}
}

function createObject(node: TopologyNode, data: Uint8Array) {
export function createObject(node: TopologyNode, data: Uint8Array) {
const object = TopologyObject.decode(data)
node.networkNode.subscribe(object.id);
node.objectStore.put(object.id, object);
}

function updateObject(node: TopologyNode, data: Uint8Array) {
export function updateObject(node: TopologyNode, data: Uint8Array) {
// TODO: should just send the object diff, not the full object
// this is handler, we want the action of sending
const object = TopologyObject.decode(data)
Expand All @@ -70,19 +70,41 @@ function updateObject(node: TopologyNode, data: Uint8Array) {
);
}

function subscribeObject(node: TopologyNode, data: Uint8Array) {
export async function subscribeObject(node: TopologyNode, data: Uint8Array, fetch?: boolean, peerId?: string) {
// process data as only the object id and not the full obj
// need to create the obj anyway to sync empty obj
const object = TopologyObject.decode(data)
node.networkNode.subscribe(object.id);

if (!fetch) return;
const message = Message.create({
sender: node.networkNode.peerId,
type: Message_MessageType.SYNC,
data
});

if (!peerId) {
await node.networkNode.sendGroupMessageRandomPeer(
object.id,
["/topology/message/0.0.1"],
message,
);
} else {
await node.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message,
);
}
}

function unsubscribeObject(node: TopologyNode, data: Uint8Array) {
export function unsubscribeObject(node: TopologyNode, data: Uint8Array) {
// process data as only the object id and not the full obj
const object = TopologyObject.decode(data)
node.networkNode.unsubscribe(object.id);
}

function syncObject(node: TopologyNode, data: Uint8Array) {
export async function syncObject(node: TopologyNode, data: Uint8Array, peerId?: string) {
// Send sync request to a random peer
const object = TopologyObject.decode(data)

Expand All @@ -92,5 +114,17 @@ function syncObject(node: TopologyNode, data: Uint8Array) {
})

// TODO: check how to do it better
node.networkNode.sendGroupMessageRandomPeer(object.id, ["/topology/message"], message)
if (!peerId) {
await node.networkNode.sendGroupMessageRandomPeer(
object.id,
["/topology/message/0.0.1"],
message,
);
} else {
await node.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message,
);
}
}
2 changes: 1 addition & 1 deletion packages/node/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const VERSION = "0.0.22";
export const VERSION = "0.0.23-5";

0 comments on commit 8ee79bb

Please sign in to comment.