From 09a3713471944cac0d6f2872e313b52d4db37a2a Mon Sep 17 00:00:00 2001 From: hasundue Date: Wed, 4 Oct 2023 07:47:04 +0900 Subject: [PATCH] feat(relay): respond to EVENT messages with OK messages --- client.ts | 3 +-- deno.json | 2 +- relay.ts | 44 +++++++++++++++++++++++++++++--------------- tests/relay_test.ts | 17 ++++++++++++++--- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/client.ts b/client.ts index d91df80..39cb24b 100644 --- a/client.ts +++ b/client.ts @@ -176,8 +176,7 @@ export class Relay extends NostrNode { (resolve, reject) => { this.#published.set(event.id, { resolve, reject }); }, - ).finally(async () => { - await this.#publisher.ready; + ).finally(() => { this.#published.delete(event.id); }); diff --git a/deno.json b/deno.json index 6f3d482..15977d4 100644 --- a/deno.json +++ b/deno.json @@ -4,7 +4,7 @@ "check": "deno check ./**/*.ts", "test": "deno test -A --no-check", "build": "mkdir -p ./dist && deno run -A ./bin/bundle.ts", - "dev": "deno fmt && deno task -q check && deno task -q test" + "dev": "deno fmt && deno lint && deno task -q check && deno task -q test" }, "exclude": [ "dist/", diff --git a/relay.ts b/relay.ts index 4722cad..8d51972 100644 --- a/relay.ts +++ b/relay.ts @@ -43,33 +43,47 @@ export class Client extends NostrNode { enqueueRequest = controller.enqueue.bind(controller); }, }); - this.ws.addEventListener("message", (ev: MessageEvent) => { + const writer = this.getWriter(); + this.ws.addEventListener("message", async (ev: MessageEvent) => { // TODO: Validate the type of the message. const msg = JSON.parse(ev.data) as ClientToRelayMessage; + // TODO: Apply backpressure when a queue is full. + const kind = msg[0]; if (kind === "EVENT") { - return enqueueEvent(msg[1]); + const event = msg[1]; + + // TODO: Validate the event and send OkMessage if necessary. + + await writer.ready; + writer.write(["OK", event.id, true, ""]); + return enqueueEvent(event); } - const id = msg[1]; + const sid = msg[1]; if (kind === "CLOSE") { - const sub = this.subscriptions.get(id); + const sub = this.subscriptions.get(sid); if (!sub) { + this.config.logger?.warn?.("Unknown subscription:", sid); return; } - this.subscriptions.delete(id); + this.subscriptions.delete(sid); return sub.close(); } - // kind === "REQ" - const filter = msg[2]; - const writer = this.getWriter(); - const writable = new WritableStream({ - write: (event) => { - return writer.write(["EVENT", id, event]); - }, - }); - this.subscriptions.set(id, writable); - return enqueueRequest([id, filter]); + if (kind === "REQ") { + const filter = msg[2]; + this.subscriptions.set( + sid, + new WritableStream({ + write: async (event) => { + await writer.ready; + return writer.write(["EVENT", sid, event]); + }, + }), + ); + return enqueueRequest([sid, filter]); + } + this.config.logger?.warn?.("Unknown message kind:", kind); }); } diff --git a/tests/relay_test.ts b/tests/relay_test.ts index 2aec24a..6a50b8c 100644 --- a/tests/relay_test.ts +++ b/tests/relay_test.ts @@ -1,4 +1,9 @@ -import { EventMessage, NostrEvent, SubscriptionId } from "../core/nips/01.ts"; +import { + EventMessage, + NostrEvent, + OkMessage, + SubscriptionId, +} from "../core/nips/01.ts"; import { Client } from "../relay.ts"; import { afterAll, beforeAll, describe, it } from "../lib/std/testing.ts"; import { assert, assertEquals } from "../lib/std/assert.ts"; @@ -27,8 +32,13 @@ describe("Client", () => { it("should return a ReadableStream of events", () => { assert(client.events instanceof ReadableStream); }); - it("should receive events", async () => { - const ev = { kind: 0 }; + it("should receive an event and send a OK message", async () => { + const ev = { id: "test-ok", kind: 0 }; + const received = new Promise((resolve) => { + ws.remote.addEventListener("message", (ev: MessageEvent) => { + resolve(JSON.parse(ev.data)); + }); + }); ws.dispatchEvent( new MessageEvent("message", { data: JSON.stringify(["EVENT", ev]), @@ -37,6 +47,7 @@ describe("Client", () => { const reader = client.events.getReader(); const { value } = await reader.read(); assertEquals(value, ev); + assertEquals(await received, ["OK", "test-ok", true, ""]); }); });