Skip to content

Commit

Permalink
fix: add an exception catch to the streaming function (#451)
Browse files Browse the repository at this point in the history
Co-authored-by: droak <[email protected]>
  • Loading branch information
magnified103 and d-roak authored Feb 10, 2025
1 parent 36286ca commit a78078f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
19 changes: 12 additions & 7 deletions packages/node/src/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ import { deserializeStateMessage, serializeStateMessage } from "./utils.js";
*/
export async function drpMessagesHandler(node: DRPNode, stream?: Stream, data?: Uint8Array) {
let message: NetworkPb.Message;
if (stream) {
const byteArray = await streamToUint8Array(stream);
message = NetworkPb.Message.decode(byteArray);
} else if (data) {
message = NetworkPb.Message.decode(data);
} else {
log.error("::messageHandler: Stream and data are undefined");
try {
if (stream) {
const byteArray = await streamToUint8Array(stream);
message = NetworkPb.Message.decode(byteArray);
} else if (data) {
message = NetworkPb.Message.decode(data);
} else {
log.error("::messageHandler: Stream and data are undefined");
return;
}
} catch (err) {
log.error("::messageHandler: Error decoding message", err);
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,52 @@
import { Connection, IdentifyResult, Libp2p } from "@libp2p/interface";
import type { Connection, IdentifyResult, Libp2p, Stream } from "@libp2p/interface";
import { SetDRP } from "@ts-drp/blueprints";
import { DRPNetworkNode, DRPNetworkNodeConfig, NetworkPb } from "@ts-drp/network";
import { DRPNetworkNode, type DRPNetworkNodeConfig, NetworkPb } from "@ts-drp/network";
import { DrpType } from "@ts-drp/object";
import { DRPObject, ObjectACL } from "@ts-drp/object";
import { type DRPObject, ObjectACL } from "@ts-drp/object";
import { raceEvent } from "race-event";
import { beforeAll, describe, expect, test, afterAll } from "vitest";
import { beforeAll, describe, expect, test, afterAll, vi } from "vitest";

import { signGeneratedVertices } from "../src/handlers.js";
import { drpMessagesHandler, signGeneratedVertices } from "../src/handlers.js";
import { DRPNode } from "../src/index.js";

describe("drpMessagesHandler inputs", () => {
let node: DRPNode;
const consoleSpy = vi.spyOn(console, "error");

beforeAll(async () => {
node = new DRPNode();
});

test("normal inputs", async () => {
await drpMessagesHandler(node);
expect(consoleSpy).toHaveBeenLastCalledWith(
"drp::node ::messageHandler: Stream and data are undefined"
);

const msg = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: -1,
data: new Uint8Array(),
});
await drpMessagesHandler(node, undefined, msg.data);
expect(consoleSpy).toHaveBeenLastCalledWith("drp::node ::messageHandler: Invalid operation");

await drpMessagesHandler(
node,
{
close: async () => {},
closeRead: async () => {},
closeWrite: async () => {},
} as Stream,
undefined
);
expect(consoleSpy).toHaveBeenLastCalledWith(
"drp::node ::messageHandler: Error decoding message",
new Error("Empty pipeline")
);
});
});

describe("Handle message correctly", () => {
const controller = new AbortController();
let node1: DRPNode;
Expand Down

0 comments on commit a78078f

Please sign in to comment.