Skip to content

Commit

Permalink
readd stream; sync working
Browse files Browse the repository at this point in the history
  • Loading branch information
d-roak committed Sep 9, 2024
1 parent 6692c0e commit 858b75d
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 89 deletions.
18 changes: 5 additions & 13 deletions packages/network/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import * as lp from "it-length-prefixed";
import { type Libp2p, createLibp2p } from "libp2p";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";
import { Message } from "./proto/messages_pb.js";
import { uint8ArrayToStream } from "./stream.js";

export * from "./stream.js";

// snake_casing to match the JSON config
export interface TopologyNetworkNodeConfig {
Expand Down Expand Up @@ -206,13 +209,7 @@ export class TopologyNetworkNode {
const connection = await this._node?.dial([multiaddr(`/p2p/${peerId}`)]);
const stream = <Stream>await connection?.newStream(protocols);
const messageBuffer = Message.encode(message).finish();
stream.sink(lp.encode([messageBuffer]));

// stringToStream(stream, message);

console.log(
`topology::network::sendMessage: Successfuly sent message to peer: ${peerId} with message: ${message}`,
);
uint8ArrayToStream(stream, messageBuffer);
} catch (e) {
console.error("topology::network::sendMessage:", e);
}
Expand All @@ -231,12 +228,7 @@ export class TopologyNetworkNode {
const connection = await this._node?.dial(peerId);
const stream: Stream = (await connection?.newStream(protocols)) as Stream;
const messageBuffer = Message.encode(message).finish();
console.log(message);
stream.sink(lp.encode([messageBuffer]));

console.log(
`topology::network::sendMessageRandomTopicPeer: Successfuly sent message to peer: ${peerId} with message: ${message}`,
);
uint8ArrayToStream(stream, messageBuffer);
} catch (e) {
console.error("topology::network::sendMessageRandomTopicPeer:", e);
}
Expand Down
45 changes: 45 additions & 0 deletions packages/network/src/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Adapted from here: https://github.com/libp2p/js-libp2p-examples/blob/main/examples/js-libp2p-example-chat/src/stream.js
// The MIT License (MIT)
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

import type { Stream } from "@libp2p/interface";
import * as lp from "it-length-prefixed";
import map from "it-map";
import { pipe } from "it-pipe";

export async function uint8ArrayToStream(stream: Stream, input: Uint8Array) {
await pipe(input, (source) => lp.encode([source]), stream.sink);
console.log("uint8ArrayToStream", input);
}

export async function streamToUint8Array(stream: Stream) {
return await pipe(
stream.source,
(source) => lp.decode(source),
(source) => map(source, (buf) => buf.subarray()),
async (source) => {
const output: Uint8Array[] = [];
for await (const msg of source) {
output.push(msg);
}
console.log("streamToUint8Array", output);
return output[0];
},
);
}
127 changes: 92 additions & 35 deletions packages/node/src/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import type { Stream } from "@libp2p/interface";
import { NetworkPb } from "@topology-foundation/network";
import { type TopologyObject, ObjectPb } from "@topology-foundation/object";
import * as lp from "it-length-prefixed";
import { NetworkPb, streamToUint8Array } from "@topology-foundation/network";
import {
type TopologyObject,
ObjectPb,
type Vertex,
} from "@topology-foundation/object";
import type { TopologyNode } from "./index.js";

/*
Expand All @@ -13,15 +16,10 @@ export async function topologyMessagesHandler(
stream?: Stream,
data?: Uint8Array,
) {
console.log("topology::node::messageHandler", stream, data);
let message: NetworkPb.Message;
if (stream) {
console.log("lp decode", lp.decode(stream.source));
const buf = (await lp.decode(stream.source).return()).value;
console.log(buf);
message = NetworkPb.Message.decode(
new Uint8Array(buf ? buf.subarray() : []),
);
const byteArray = await streamToUint8Array(stream);
message = NetworkPb.Message.decode(byteArray);
} else if (data) {
message = NetworkPb.Message.decode(data);
} else {
Expand Down Expand Up @@ -49,7 +47,16 @@ export async function topologyMessagesHandler(
);
break;
case NetworkPb.Message_MessageType.SYNC_ACCEPT:
syncAcceptHandler(node, message.data);
if (!stream) {
console.error("topology::node::messageHandler", "Stream is undefined");
return;
}
syncAcceptHandler(
node,
stream.protocol ?? "/topology/message/0.0.1",
message.sender,
message.data,
);
break;
case NetworkPb.Message_MessageType.SYNC_REJECT:
syncRejectHandler(node, message.data);
Expand All @@ -66,13 +73,12 @@ export async function topologyMessagesHandler(
*/
function updateHandler(node: TopologyNode, data: Uint8Array) {
const object_operations = ObjectPb.TopologyObjectBase.decode(data);
console.log("topology::node::updateHandler", object_operations);

const object = node.objectStore.get(object_operations.id);
if (!object) {
console.error("topology::node::updateHandler", "Object not found");
return false;
}

for (const v of object_operations.vertices) {
const vertex = object.vertices.find((x) => x.hash === v.hash);
if (!vertex) {
Expand All @@ -94,46 +100,98 @@ function syncHandler(
data: Uint8Array,
) {
// (might send reject) <- TODO: when should we reject?
const syncMessage = NetworkPb.Sync.decode(data);
const object = node.objectStore.get(syncMessage.objectId);
if (!object) {
console.error("topology::node::syncHandler", "Object not found");
return;
}

const diff: Set<NetworkPb.Vertex> = new Set(object.vertices);
const missing: string[] = [];
for (const h of syncMessage.vertexHashes) {
const vertex = object.vertices.find((v) => v.hash === h);
if (vertex) {
diff.delete(vertex);
} else {
missing.push(h);
}
}

// process, calculate diffs, and send back
if (diff.size === 0 && missing.length === 0) return;

const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.SYNC_ACCEPT,
// add data here
data: new Uint8Array(0),
data: NetworkPb.SyncAccept.encode(
NetworkPb.SyncAccept.create({
objectId: object.id,
diff: [...diff],
missing,
}),
).finish(),
});

node.networkNode.sendMessage(sender, [protocol], message);
}

/*
data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] }
operations array contain the full remote operations array
*/
function syncAcceptHandler(node: TopologyNode, data: Uint8Array) {
// don't blindly accept, validate the operations
// might have have appeared in the meantime
const object_operations = ObjectPb.TopologyObjectBase.decode(data);
const object: TopologyObject | undefined = node.objectStore.get(
object_operations.id,
);
function syncAcceptHandler(
node: TopologyNode,
protocol: string,
sender: string,
data: Uint8Array,
) {
const syncAcceptMessage = NetworkPb.SyncAccept.decode(data);
const object = node.objectStore.get(syncAcceptMessage.objectId);
if (!object) {
console.error("topology::node::syncAcceptHandler", "Object not found");
return false;
return;
}

object_operations.vertices.filter((v1) => {
if (object?.vertices.find((v2) => v1.hash === v2.hash)) {
return false;
}
return true;
const vertices: Vertex[] = syncAcceptMessage.diff.map((v) => {
return {
hash: v.hash,
nodeId: v.nodeId,
operation: {
type: v.operation?.type ?? "",
value: v.operation?.value,
},
dependencies: v.dependencies,
};
});
object.vertices.push(...object_operations.vertices);
node.objectStore.put(object.id, object);

return true;
// TODO missing sending back the diff
if (vertices.length !== 0) {
object.merge(vertices);
node.objectStore.put(object.id, object);
}

// send missing vertices
const diff: NetworkPb.Vertex[] = [];
for (const h of syncAcceptMessage.missing) {
const vertex = object.vertices.find((v) => v.hash === h);
if (vertex) {
diff.push(vertex);
}
}

if (diff.length === 0) return;

const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.SYNC_ACCEPT,
data: NetworkPb.SyncAccept.encode(
NetworkPb.SyncAccept.create({
objectId: object.id,
diff,
missing: [],
}),
).finish(),
});
node.networkNode.sendMessage(sender, [protocol], message);
}

/* data: { id: string } */
Expand All @@ -148,9 +206,7 @@ export function topologyObjectChangesHandler(
node: TopologyNode,
obj: TopologyObject,
originFn: string,
vertices: ObjectPb.Vertex[],
) {
console.log("topology::node::objectChangesHandler", obj, originFn, vertices);
switch (originFn) {
case "merge":
node.objectStore.put(obj.id, obj);
Expand All @@ -159,6 +215,7 @@ export function topologyObjectChangesHandler(
node.objectStore.put(obj.id, obj);
// send vertices to the pubsub group
const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.UPDATE,
data: ObjectPb.TopologyObjectBase.encode(obj).finish(),
});
Expand Down
9 changes: 6 additions & 3 deletions packages/node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,15 @@ export class TopologyNode {
) {
const object = new TopologyObject(this.networkNode.peerId, cro, id, abi);
operations.createObject(this, object);
operations.subscribeObject(this, object.id, sync, peerId);
operations.subscribeObject(this, object.id);
if (sync) {
operations.syncObject(this, object.id, peerId);
}
return object;
}

async subscribeObject(id: string, sync?: boolean, peerId?: string) {
return operations.subscribeObject(this, id, sync, peerId);
async subscribeObject(id: string) {
return operations.subscribeObject(this, id);
}

unsubscribeObject(id: string, purge?: boolean) {
Expand Down
37 changes: 5 additions & 32 deletions packages/node/src/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,17 @@ enum OPERATIONS {

export function createObject(node: TopologyNode, object: TopologyObject) {
node.objectStore.put(object.id, object);
object.subscribe((obj, originFn, vertices) =>
topologyObjectChangesHandler(node, obj, originFn, vertices),
object.subscribe((obj, originFn, _) =>
topologyObjectChangesHandler(node, obj, originFn),
);
}

/* data: { id: string } */
export async function subscribeObject(
node: TopologyNode,
objectId: string,
sync?: boolean,
peerId?: string,
) {
export async function subscribeObject(node: TopologyNode, objectId: string) {
node.networkNode.subscribe(objectId);
node.networkNode.addGroupMessageHandler(objectId, async (e) =>
topologyMessagesHandler(node, undefined, e.detail.msg.data),
);

if (!sync) return;
// complies with format, since the operations array is empty
const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.SYNC,
data: new Uint8Array(0),
});

console.log(message, "message");

if (!peerId) {
await node.networkNode.sendGroupMessageRandomPeer(
objectId,
["/topology/message/0.0.1"],
message,
);
} else {
await node.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message,
);
}
}

export function unsubscribeObject(
Expand All @@ -88,9 +59,11 @@ export async function syncObject(
return;
}
const data = NetworkPb.Sync.create({
objectId,
vertexHashes: object.vertices.map((v) => v.hash),
});
const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.SYNC,
data: NetworkPb.Sync.encode(data).finish(),
});
Expand Down
Loading

0 comments on commit 858b75d

Please sign in to comment.