Skip to content

Commit

Permalink
initial restructuring and handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
d-roak committed Jul 30, 2024
1 parent a06c2c8 commit c4da7ac
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 38 deletions.
14 changes: 10 additions & 4 deletions packages/network/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { bootstrap } from "@libp2p/bootstrap";
import { webTransport } from "@libp2p/webtransport";
import { autoNAT } from "@libp2p/autonat";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";
import * as lp from "it-length-prefixed";
import { Message } from "./proto/messages_pb.js";

// snake_casing to match the JSON config
Expand Down Expand Up @@ -187,8 +188,8 @@ export class TopologyNetworkNode {

async broadcastMessage(topic: string, message: Message) {
try {
if (this._pubsub?.getSubscribers(topic)?.length === 0) return;
await this._pubsub?.publish(topic, message);
let messageBuffer = Message.encode(message).finish();
await this._pubsub?.publish(topic, messageBuffer);

console.log(
"topology::network::broadcastMessage: Successfuly broadcasted message to topic",
Expand All @@ -203,7 +204,10 @@ export class TopologyNetworkNode {
try {
const connection = await this._node?.dial([multiaddr(`/p2p/${peerId}`)]);
const stream = <Stream>await connection?.newStream(protocols);
stringToStream(stream, message);
let messageBuffer = Message.encode(message).finish();
stream.sink(lp.encode([messageBuffer]))

// stringToStream(stream, message);

console.log(
`topology::network::sendMessage: Successfuly sent message to peer: ${peerId} with message: ${message}`,
Expand All @@ -225,7 +229,9 @@ export class TopologyNetworkNode {

const connection = await this._node?.dial(peerId);
const stream: Stream = (await connection?.newStream(protocols)) as Stream;
Message.encode(message, stream.sink)
let messageBuffer = Message.encode(message).finish();
stream.sink(lp.encode([messageBuffer]))

// stringToStream(stream, message);

console.log(
Expand Down
8 changes: 4 additions & 4 deletions packages/network/src/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ message Message {
SYNC = 1;
SYNC_ACCEPT = 2;
SYNC_REJECT = 3;
CUSTOM = 4;
}

string id = 1;
string sender = 2;
MessageType type = 3;
bytes data = 4;
string sender = 1;
MessageType type = 2;
bytes data = 3;
}
38 changes: 14 additions & 24 deletions packages/network/src/proto/messages_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions packages/node/src/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Stream } from "@libp2p/interface";
import * as lp from "it-length-prefixed";
import {
Message,
Message_MessageType,
} from "@topology-foundation/network";
import { TopologyNode } from ".";

export async function topologyMessageHandler(node: TopologyNode, stream: Stream) {
const buf = (await lp.decode(stream.source).return()).value;
const message = Message.decode(new Uint8Array(buf ? buf.subarray() : []))

switch (message.type) {
case Message_MessageType.UPDATE:
updateHandler(node, message.data);
break;
case Message_MessageType.SYNC:
syncHandler(node, stream.protocol ?? "", message.sender, message.data);
break;
case Message_MessageType.SYNC_ACCEPT:
syncAcceptHandler(node, message.data);
break;
case Message_MessageType.SYNC_REJECT:
syncRejectHandler(node, message.data);
break;
default:
console.error("topology::node::messageHandler", "Invalid operation");
break;
}
}

function updateHandler(node: TopologyNode, data: Uint8Array) {
//
}

function syncHandler(node: TopologyNode, protocol: string, sender: string, data: Uint8Array) {
// Receive RBILT & send back
// (might send reject) <- TODO: when should we reject?
const message = Message.create({
sender: node.networkNode.peerId,
type: Message_MessageType.SYNC_ACCEPT,
// add data here
data: new Uint8Array(0),
});

node.networkNode.sendMessage(sender, [protocol], message);
}

function syncAcceptHandler(node: TopologyNode, data: Uint8Array) {
// Process RBILT
}

function syncRejectHandler(node: TopologyNode, data: Uint8Array) {
// Ask sync from another peer
}
12 changes: 11 additions & 1 deletion packages/node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { GossipsubMessage } from "@chainsafe/libp2p-gossipsub";
import { EventHandler, StreamHandler } from "@libp2p/interface";
import {
Message,
Message_MessageType,
TopologyNetworkNode,
TopologyNetworkNodeConfig,
streamToString,
Expand All @@ -10,6 +12,7 @@ import { TopologyObjectStore } from "./store";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";
import { toString as uint8ArrayToString } from "uint8arrays/to-string";
import { OPERATIONS } from "./operations.js";
import * as lp from "it-length-prefixed";

export * from "./operations.js";

Expand Down Expand Up @@ -39,7 +42,14 @@ export class TopologyNode {
let input = await streamToString(stream);
if (!input) return;

const message = JSON.parse(input);
// const stream: Stream = (await connection?.newStream(protocols)) as Stream;
// let messageBuffer = Message.encode(message).finish();
// stream.sink(lp.encode([messageBuffer]))

const buf = (await lp.decode(stream.source).return()).value;
const message = Message.decode(new Uint8Array(buf ? buf.subarray() : []))

// const message = JSON.parse(input);
switch (message["type"]) {
case "object_fetch": {
const objectId = uint8ArrayToString(
Expand Down
12 changes: 7 additions & 5 deletions packages/node/src/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ export enum OPERATIONS {
SYNC
}

function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) {
export function executeObjectOperation(node: TopologyNode, operation: OPERATIONS, data: Uint8Array) {
switch (operation) {
case OPERATIONS.CREATE:
// data = CRO
createObject(node, data);
break;
case OPERATIONS.UPDATE:
// data = CRO
// data = [CRO_ID, OPERATION]
updateObject(node, data)
break;
case OPERATIONS.SUBSCRIBE:
// data = TopologyObjectId
// data = CRO_ID
subscribeObject(node, data)
break;
case OPERATIONS.UNSUBSCRIBE:
// data = ObjectId
// data = CRO_ID
unsubscribeObject(node, data)
break;
case OPERATIONS.SYNC:
// data = CRO
// TODO: data = RIBLT
// TODO: data = [CRO_ID, RIBLT]
syncObject(node, data)
break;
default:
Expand Down Expand Up @@ -90,5 +90,7 @@ function syncObject(node: TopologyNode, data: Uint8Array) {
type: Message_MessageType.SYNC,
data: data
})

// TODO: check how to do it better
node.networkNode.sendGroupMessageRandomPeer(object.id, ["/topology/message"], message)
}

0 comments on commit c4da7ac

Please sign in to comment.