Skip to content

Commit

Permalink
double way streams returning the objects
Browse files Browse the repository at this point in the history
  • Loading branch information
d-roak committed Jun 11, 2024
1 parent 81ed5ed commit f636b65
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 103 deletions.
19 changes: 13 additions & 6 deletions examples/canvas/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { TopologyNode } from "@topology-foundation/node";
import { Canvas } from "./objects/canvas";
import { Canvas, ICanvas } from "./objects/canvas";
import { TopologyObject } from "@topology-foundation/object";
import { Pixel } from "./objects/pixel";

const node = new TopologyNode();
let canvasCRO: Canvas;

const render = () => {
const canvas = canvasCRO.canvas();
const canvas = canvasCRO.canvas;
const canvas_element = <HTMLDivElement>document.getElementById("canvas");
canvas_element.style.display = "inline-grid";

Expand All @@ -20,7 +22,7 @@ const render = () => {
pixel.id = `${x}-${y}`;
pixel.style.width = "25px";
pixel.style.height = "25px";
pixel.style.backgroundColor = `rgb(${canvas[x][y][0]}, ${canvas[x][y][1]}, ${canvas[x][y][2]})`;
pixel.style.backgroundColor = `rgb(${canvas[x][y].color()[0]}, ${canvas[x][y].color()[1]}, ${canvas[x][y].color()[2]})`;
pixel.style.cursor = "pointer";
pixel.addEventListener("click", () => paint_pixel(pixel));
canvas_element.appendChild(pixel);
Expand Down Expand Up @@ -62,10 +64,15 @@ async function init() {
.value;
try {
// TODO don't create a new canvas
canvasCRO = new Canvas(5, 10);
//await node.sub
// canvasCRO = new Canvas(5, 10);

await node.subscribeObject(croId);
canvasCRO = <Canvas>await node.getObject(croId);

let object = (await node.getObject(croId)) as Canvas;
console.log(object);

canvasCRO = object;
console.log(canvasCRO);

(<HTMLSpanElement>document.getElementById("canvasId")).innerText = croId;
node.sendObjectUpdate(croId);
Expand Down
67 changes: 40 additions & 27 deletions examples/canvas/src/objects/canvas.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
import { TopologyObject } from "@topology-foundation/object";
import { Pixel } from "./pixel";
import { IPixel, Pixel } from "./pixel";

export class Canvas extends TopologyObject {
private static _width: number;
private static _height: number;
private _canvas: Pixel[][];
export interface ICanvas {
width: number;
height: number;
canvas: IPixel[][];
splash(
node_id: string,
offset: [number, number],
size: [number, number],
rgb: [number, number, number],
): void;
paint(
nodeId: string,
offset: [number, number],
rgb: [number, number, number],
): void;
pixel(x: number, y: number): IPixel;
merge(peerCanvas: Canvas): void;
}

export class Canvas extends TopologyObject implements ICanvas {
width: number;
height: number;
canvas: IPixel[][];

constructor(width: number, height: number) {
super();
Canvas._init(width, height);
this._canvas = Array(width).fill(Array(height).fill(new Pixel()));
}

private static _init(width: number, height: number) {
this._width = width;
this._height = height;
this.width = width;
this.height = height;
this.canvas = Array(width).fill(Array(height).fill(new Pixel()));
}

splash(
Expand All @@ -23,16 +38,12 @@ export class Canvas extends TopologyObject {
size: [number, number],
rgb: [number, number, number],
): void {
if (offset[0] < 0 || Canvas._width < offset[0]) return;
if (offset[1] < 0 || Canvas._height < offset[1]) return;
if (offset[0] < 0 || this.width < offset[0]) return;
if (offset[1] < 0 || this.height < offset[1]) return;

for (let x = offset[0]; x < Canvas._width || x < offset[0] + size[0]; x++) {
for (
let y = offset[1];
y < Canvas._height || y < offset[1] + size[1];
y++
) {
this._canvas[x][y].paint(node_id, rgb);
for (let x = offset[0]; x < this.width || x < offset[0] + size[0]; x++) {
for (let y = offset[1]; y < this.height || y < offset[1] + size[1]; y++) {
this.canvas[x][y].paint(node_id, rgb);
}
}
}
Expand All @@ -42,22 +53,24 @@ export class Canvas extends TopologyObject {
offset: [number, number],
rgb: [number, number, number],
): void {
if (offset[0] < 0 || this._canvas.length < offset[0]) return;
if (offset[1] < 0 || this._canvas[offset[0]].length < offset[1]) return;
if (offset[0] < 0 || this.canvas.length < offset[0]) return;
if (offset[1] < 0 || this.canvas[offset[0]].length < offset[1]) return;

this._canvas[offset[0]][offset[1]].paint(nodeId, rgb);
this.canvas[offset[0]][offset[1]].paint(nodeId, rgb);
}

/*
canvas(): [number, number, number][][] {
return this._canvas.map((row) => row.map((pixel) => pixel.color()));
}
*/

pixel(x: number, y: number): Pixel {
return this._canvas[x][y];
pixel(x: number, y: number): IPixel {
return this.canvas[x][y];
}

merge(peerCanvas: Canvas): void {
this._canvas.forEach((row, x) =>
this.canvas.forEach((row, x) =>
row.forEach((pixel, y) => pixel.merge(peerCanvas.pixel(x, y))),
);
}
Expand Down
48 changes: 29 additions & 19 deletions examples/canvas/src/objects/pixel.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,50 @@
import { GCounter } from "@topology-foundation/crdt";
import { GCounter, IGCounter } from "@topology-foundation/crdt";
import { TopologyObject } from "@topology-foundation/object";

export class Pixel extends TopologyObject {
private _red: GCounter;
private _green: GCounter;
private _blue: GCounter;
export interface IPixel {
red: IGCounter;
green: IGCounter;
blue: IGCounter;
color(): [number, number, number];
paint(nodeId: string, rgb: [number, number, number]): void;
counters(): [IGCounter, IGCounter, IGCounter];
merge(peerPixel: IPixel): void;
}

export class Pixel extends TopologyObject implements IPixel {
red: IGCounter;
green: IGCounter;
blue: IGCounter;

constructor() {
super();
this._red = new GCounter({});
this._green = new GCounter({});
this._blue = new GCounter({});
this.red = new GCounter({});
this.green = new GCounter({});
this.blue = new GCounter({});
}

color(): [number, number, number] {
return [
this._red.value() % 256,
this._green.value() % 256,
this._blue.value() % 256,
this.red.value() % 256,
this.green.value() % 256,
this.blue.value() % 256,
];
}

paint(nodeId: string, rgb: [number, number, number]): void {
this._red.increment(nodeId, rgb[0]);
this._green.increment(nodeId, rgb[1]);
this._blue.increment(nodeId, rgb[2]);
this.red.increment(nodeId, rgb[0]);
this.green.increment(nodeId, rgb[1]);
this.blue.increment(nodeId, rgb[2]);
}

counters(): [GCounter, GCounter, GCounter] {
return [this._red, this._green, this._blue];
counters(): [IGCounter, IGCounter, IGCounter] {
return [this.red, this.green, this.blue];
}

merge(peerPixel: Pixel): void {
let peerCounters = peerPixel.counters();
this._red.merge(peerCounters[0]);
this._green.merge(peerCounters[1]);
this._blue.merge(peerCounters[2]);
this.red.merge(peerCounters[0]);
this.green.merge(peerCounters[1]);
this.blue.merge(peerCounters[2]);
}
}
46 changes: 24 additions & 22 deletions packages/crdt/src/builtins/GCounter/index.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,50 @@
export interface IGCounter {
globalCounter: number;
counts: { [nodeKey: string]: number };
value(): number;
increment(nodeId: string, amount: number): void;
compare(peerCounter: IGCounter): boolean;
merge(peerCounter: IGCounter): void;
}

/// GCounter with support for state and op changes
export class GCounter {
private _globalCounter: number;
export class GCounter implements IGCounter {
globalCounter: number;
// instead of standard incremental id for replicas
// we map the counter with the node id
private _counts: { [nodeId: string]: number };
counts: { [nodeId: string]: number };

constructor(counts: { [nodeId: string]: number }) {
this._globalCounter = Object.values(counts).reduce((a, b) => a + b, 0);
this._counts = counts;
this.globalCounter = Object.values(counts).reduce((a, b) => a + b, 0);
this.counts = counts;
}

value(): number {
return this._globalCounter;
return this.globalCounter;
}

increment(nodeId: string, amount: number): void {
this._globalCounter += amount;
this._counts[nodeId] += amount;
}

counts(): { [nodeKey: string]: number } {
return this._counts;
this.globalCounter += amount;
this.counts[nodeId] += amount;
}

compare(peerCounter: GCounter): boolean {
for (let key in Object.keys(this._counts)) {
if (this._counts[key] > peerCounter.counts()[key]) {
compare(peerCounter: IGCounter): boolean {
for (let key in Object.keys(this.counts)) {
if (this.counts[key] > peerCounter.counts[key]) {
return false;
}
}
return true;
}

merge(peerCounter: GCounter): void {
merge(peerCounter: IGCounter): void {
let temp: { [nodeKey: string]: number } = Object.assign(
{},
this._counts,
peerCounter.counts(),
this.counts,
peerCounter.counts,
);
Object.keys(temp).forEach((key) => {
this._counts[key] = Math.max(
this._counts[key],
peerCounter.counts()[key],
);
this.counts[key] = Math.max(this.counts[key], peerCounter.counts[key]);
});
}
}
2 changes: 1 addition & 1 deletion packages/crdt/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// TODO dumb export logic, fix this
export { GCounter } from "./builtins/GCounter";
export { IGCounter, GCounter } from "./builtins/GCounter";
export { PNCounter } from "./builtins/PNCounter";
export { GSet } from "./builtins/GSet";
export { TwoPSet } from "./builtins/2PSet";
40 changes: 31 additions & 9 deletions packages/network/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export class TopologyNetworkNode {
"topology::network::start: Successfuly started topology network w/ peer_id",
this.peerId,
);

this._node.addEventListener("peer:connect", (evt) => {
console.log("topology::network::peer::connect: ", evt.detail.toString());
});
}

subscribe(topic: string) {
Expand All @@ -78,6 +82,7 @@ export class TopologyNetworkNode {

try {
this._pubsub?.subscribe(topic);
this._pubsub?.getPeers();
console.log(
"topology::network::subscribe: Successfuly subscribed the topic",
topic,
Expand Down Expand Up @@ -121,22 +126,39 @@ export class TopologyNetworkNode {
}

async sendMessage(peerId: string, protocols: string[], message: string) {
const connection = await this._node?.dial([multiaddr(peerId)]);
const stream: Stream = (await connection?.newStream(protocols)) as Stream;
stringToStream(stream, message);
try {
const connection = await this._node?.dial([multiaddr(`/p2p/${peerId}`)]);
const stream = <Stream>await connection?.newStream(protocols);
stringToStream(stream, message);

console.log(
`topology::network::sendMessage: Successfuly sent message to peer: ${peerId} with message: ${message}`,
);
} catch (e) {
console.error("topology::network::sendMessage:", e);
}
}

async sendMessageRandomTopicPeer(
topic: string,
protocols: string[],
message: string,
) {
const peers = this._pubsub?.getSubscribers(topic);
if (!peers) return;
const peerId = peers[Math.floor(Math.random() * peers.length)].toString();
const connection = await this._node?.dial([multiaddr(peerId)]);
const stream: Stream = (await connection?.newStream(protocols)) as Stream;
stringToStream(stream, message);
try {
const peers = this._pubsub?.getSubscribers(topic);
if (!peers || peers.length === 0) throw Error("Topic wo/ peers");
const peerId = peers[Math.floor(Math.random() * peers.length)];

const connection = await this._node?.dial(peerId);
const stream: Stream = (await connection?.newStream(protocols)) as Stream;
stringToStream(stream, message);

console.log(
`topology::network::sendMessageRandomTopicPeer: Successfuly sent message to peer: ${peerId} with message: ${message}`,
);
} catch (e) {
console.error("topology::network::sendMessageRandomTopicPeer:", e);
}
}

addPubsubEventListener(
Expand Down
9 changes: 5 additions & 4 deletions packages/network/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import { pipe } from "it-pipe";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";
import { toString as uint8ArrayToString } from "uint8arrays/to-string";

export function stringToStream(stream: Stream, input: string) {
return pipe(
export async function stringToStream(stream: Stream, input: string) {
await pipe(
input,
(source) => map(source, (string) => uint8ArrayFromString(string)),
(source) => lp.encode(source),
Expand All @@ -42,10 +42,11 @@ export async function streamToString(stream: Stream) {
(source) => lp.decode(source),
(source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
async function (source) {
let output: string[] = [];
for await (const msg of source) {
// one-line json obj
return msg.toString().replace("\n", "");
output.push(msg.toString().replace("\n", ""));
}
return output.join("").trim();
},
);
}
Loading

0 comments on commit f636b65

Please sign in to comment.