From 6defcee7e77d2dc570b2977d535bc3bb372cce03 Mon Sep 17 00:00:00 2001 From: hasundue Date: Tue, 17 Oct 2023 15:02:00 +0900 Subject: [PATCH] refactor!: rename RelayPool to RelayGroup and modify the API --- core/nodes.ts | 4 +- core/relays.ts | 28 +++++++++----- core/websockets.ts | 45 ++++++++-------------- core/x/async.ts | 1 - deno.lock | 1 - lib/{pools.ts => relays.ts} | 31 +++++++-------- lib/relays_test.ts | 75 +++++++++++++++++++++++++++++++++++++ lib/streams.ts | 29 ++++++++++++-- lib/streams_test.ts | 26 ++++++++++++- nips/01/relays.ts | 7 +--- nips/01/relays_test.ts | 5 +-- 11 files changed, 180 insertions(+), 72 deletions(-) rename lib/{pools.ts => relays.ts} (68%) create mode 100644 lib/relays_test.ts diff --git a/core/nodes.ts b/core/nodes.ts index 7b4e073..2a494a4 100644 --- a/core/nodes.ts +++ b/core/nodes.ts @@ -1,6 +1,6 @@ import type { NostrMessage } from "./protocol.d.ts"; import type { Logger } from "./types.ts"; -import { WebSocketLike, WebSocketReadyState } from "./websockets.ts"; +import { WebSocketLike } from "./websockets.ts"; import { NonExclusiveWritableStream } from "./streams.ts"; export interface NostrNodeConfig< @@ -39,7 +39,7 @@ export class NostrNode< this.config.modules.forEach((m) => this.addModule(m)); } - get status(): WebSocketReadyState { + get status(): WebSocket["readyState"] { return this.ws.readyState; } diff --git a/core/relays.ts b/core/relays.ts index 124b986..36dbaed 100644 --- a/core/relays.ts +++ b/core/relays.ts @@ -37,12 +37,6 @@ export class ConnectionClosed extends Error {} // Interfaces // ---------------------- -export interface RelayLike - extends NonExclusiveWritableStream { - subscribe: Relay["subscribe"]; - publish: Relay["publish"]; -} - export interface RelayConfig extends NostrNodeConfig { url: RelayUrl; @@ -131,10 +125,6 @@ export class Relay extends NostrNode< ); this.callFunction("startSubscription", { controller, ...context }); }, - pull: async () => { - await this.ws.ready(); - // TODO: backpressure - }, cancel: (reason) => { return this.callFunction("closeSubscription", { reason, ...context }); }, @@ -170,6 +160,24 @@ export class Relay extends NostrNode< } } +// ---------------------- +// RelayLikes +// ---------------------- + +export interface RelayLike + extends NonExclusiveWritableStream { + readonly config: RelayLikeConfig; + subscribe: Relay["subscribe"]; + publish: Relay["publish"]; +} + +export type RelayLikeConfig = Omit< + RelayConfig, + "url" | keyof NostrNodeConfig +>; + +export type RelayLikeOptions = Partial; + // ---------------------- // Events // ---------------------- diff --git a/core/websockets.ts b/core/websockets.ts index 8e05ae2..b2bf283 100644 --- a/core/websockets.ts +++ b/core/websockets.ts @@ -1,10 +1,8 @@ -import { Notify } from "./x/async.ts"; - export type WebSocketEventType = keyof WebSocketEventMap; export interface WebSocketLike { readonly url: string; - readonly readyState: WebSocketReadyState; + readonly readyState: WebSocket["readyState"]; send( data: string | ArrayBufferLike | Blob | ArrayBufferView, ): void | Promise; @@ -25,14 +23,11 @@ type EventListenerOptionsMap = Map< */ export class LazyWebSocket implements WebSocketLike { #ws?: WebSocket; - #createWebSocket: () => WebSocket; - + readonly #createWebSocket: () => WebSocket; readonly #eventListenerMap = new Map< WebSocketEventType, EventListenerOptionsMap >(); - readonly #notifier = new Notify(); - readonly url: string; constructor( @@ -41,11 +36,6 @@ export class LazyWebSocket implements WebSocketLike { ) { this.#createWebSocket = () => { const ws = new WebSocket(url, protocols); - for (const type of ["close", "open"] as const) { - ws.addEventListener(type, () => { - this.#notifier.notifyAll(); - }); - } this.#eventListenerMap.forEach((map, type) => { map.forEach((options, listener) => { ws.addEventListener(type, listener, options); @@ -60,21 +50,26 @@ export class LazyWebSocket implements WebSocketLike { return this.#ws ??= this.#createWebSocket(); } + #once(type: WebSocketEventType): Promise { + return new Promise((resolve) => { + this.#created().addEventListener(type, () => resolve(), { once: true }); + }); + } + async #ready(): Promise { this.#ws = this.#created(); switch (this.#ws.readyState) { case WebSocket.CONNECTING: - await this.#notifier.notified(); - /* falls through */ - case WebSocket.OPEN: break; + case WebSocket.OPEN: + return this.#ws; case WebSocket.CLOSING: - await this.#notifier.notified(); + await this.#once("close"); /* falls through */ case WebSocket.CLOSED: this.#ws = this.#createWebSocket(); - await this.#notifier.notified(); } + await this.#once("open"); return this.#ws; } @@ -93,13 +88,13 @@ export class LazyWebSocket implements WebSocketLike { } switch (this.#ws.readyState) { case WebSocket.CONNECTING: - await this.#notifier.notified(); + await this.#once("open"); /* falls through */ case WebSocket.OPEN: this.#ws.close(code, reason); /* falls through */ case WebSocket.CLOSING: - await this.#notifier.notified(); + await this.#once("close"); /* falls through */ case WebSocket.CLOSED: break; @@ -107,7 +102,7 @@ export class LazyWebSocket implements WebSocketLike { this.#ws = undefined; } - get readyState(): WebSocketReadyState { + get readyState(): WebSocket["readyState"] { return this.#ws ? this.#ws.readyState : WebSocket.CLOSED; } @@ -138,13 +133,3 @@ export class LazyWebSocket implements WebSocketLike { return this.#ws?.dispatchEvent(event) ?? false; }; } - -/** - * The ready state of a WebSocket. - */ -export enum WebSocketReadyState { - CONNECTING = 0, - OPEN = 1, - CLOSING = 2, - CLOSED = 3, -} diff --git a/core/x/async.ts b/core/x/async.ts index 06b05e5..81a1734 100644 --- a/core/x/async.ts +++ b/core/x/async.ts @@ -1,2 +1 @@ -export { Notify } from "https://deno.land/x/async@v2.0.2/notify.ts"; export { Lock } from "https://deno.land/x/async@v2.0.2/lock.ts"; diff --git a/deno.lock b/deno.lock index 9e389aa..f996195 100644 --- a/deno.lock +++ b/deno.lock @@ -107,7 +107,6 @@ "https://deno.land/std@0.204.0/testing/types.ts": "6832887bea73ee3549dda953a8df41f8bfa8790219ab56a3b94b984a71a3b3e9", "https://deno.land/x/async@v2.0.2/lock.ts": "fd7dc5b54c72d0093072adddb1691e1ae4a801c55b2015589454ea60f1c59adb", "https://deno.land/x/async@v2.0.2/mutex.ts": "312dcad7468c82f84fd018be157df451361ed19bdc12fd59af8d12b2e6c3ae28", - "https://deno.land/x/async@v2.0.2/notify.ts": "3127ab5835c97527fdf978a5ec5844d71a2fd62a75988db6f175aac6f61259d0", "https://deno.land/x/deno_cache@0.5.2/auth_tokens.ts": "5d1d56474c54a9d152e44d43ea17c2e6a398dd1e9682c69811a313567c01ee1e", "https://deno.land/x/deno_cache@0.5.2/cache.ts": "92ce8511e1e5c00fdf53a41619aa77d632ea8e0fc711324322e4d5ebf8133911", "https://deno.land/x/deno_cache@0.5.2/deno_dir.ts": "1ea355b8ba11c630d076b222b197cfc937dd81e5a4a260938997da99e8ff93a0", diff --git a/lib/pools.ts b/lib/relays.ts similarity index 68% rename from lib/pools.ts rename to lib/relays.ts index 6cb14f6..8370ae5 100644 --- a/lib/pools.ts +++ b/lib/relays.ts @@ -2,13 +2,12 @@ import type { ClientToRelayMessage, EventKind, NostrEvent, - RelayUrl, SubscriptionFilter, } from "../core/protocol.d.ts"; import { - Relay, - RelayInit, RelayLike, + RelayLikeConfig, + RelayLikeOptions, SubscriptionOptions, } from "../core/relays.ts"; import { NonExclusiveWritableStream } from "../core/streams.ts"; @@ -17,26 +16,28 @@ import { Distinctor, merge } from "../lib/streams.ts"; /** * A pool of relays that can be used as a single relay. */ -export class RelayPool extends NonExclusiveWritableStream +export class RelayGroup extends NonExclusiveWritableStream implements RelayLike { - readonly relays: Relay[]; - - #relays_read: Relay[]; - - constructor(...init: (RelayUrl | RelayInit)[]) { - const relays = init.map((i) => new Relay(i)); + readonly config: Readonly; + #relays_read: RelayLike[]; + #relays_write: RelayLike[]; + constructor(readonly relays: RelayLike[], options?: RelayLikeOptions) { const writers = relays.filter((r) => r.config.write) .map((r) => r.getWriter()); - super({ async write(msg) { await Promise.all(writers.map((r) => r.write(msg))); }, - }, { highWaterMark: Math.max(...relays.map((r) => r.config.nbuffer)) }); - - this.relays = relays; + }); + this.config = { + name: relays.map((r) => r.config.name).join(", "), + read: true, + write: true, + ...options, + }; this.#relays_read = this.relays.filter((r) => r.config.read); + this.#relays_write = this.relays.filter((r) => r.config.write); } // ---------------------- @@ -54,7 +55,7 @@ export class RelayPool extends NonExclusiveWritableStream async publish( msg: NostrEvent, ) { - await Promise.all(this.relays.map((r) => r.publish(msg))); + await Promise.all(this.#relays_write.map((r) => r.publish(msg))); } // ---------------------- diff --git a/lib/relays_test.ts b/lib/relays_test.ts new file mode 100644 index 0000000..9ed1b24 --- /dev/null +++ b/lib/relays_test.ts @@ -0,0 +1,75 @@ +import { afterAll, beforeAll, describe, it } from "../lib/std/testing.ts"; +import { assertEquals, assertInstanceOf } from "../lib/std/assert.ts"; +import { NostrEvent } from "../core/protocol.d.ts"; +import { Relay } from "../core/relays.ts?nips=1"; +import { RelayGroup } from "../lib/relays.ts"; +import { MockWebSocket } from "../lib/testing.ts"; + +describe("RelayGroup", () => { + let relays: Relay[]; + let group: RelayGroup; + let sub: ReadableStream; + + // ---------------------- + // Setup + // ---------------------- + beforeAll(() => { + globalThis.WebSocket = MockWebSocket; + relays = [ + new Relay("ws://localhost:80", { + name: "relay-1", + read: true, + write: true, + }), + new Relay("ws://localhost:81", { + name: "relay-2", + read: true, + write: false, + }), + new Relay("ws://localhost:82", { + name: "relay-3", + read: false, + write: true, + }), + ]; + }); + afterAll(() => { + group.close(); + }); + + // ---------------------- + // Constructor + // ---------------------- + it("should create a group of relays", () => { + group = new RelayGroup(relays); + assertInstanceOf(group, RelayGroup); + }); + it("should not have a url", () => { + // @ts-expect-error RelayGroup does not have a url + assertEquals(group.url, undefined); + }); + it("should have a default name", () => { + assertEquals(group.config.name, "relay-1, relay-2, relay-3"); + }); + it("should have a custom name if provided", () => { + const group = new RelayGroup(relays, { name: "custom" }); + assertEquals(group.config.name, "custom"); + }); + it("should have default read and write config", () => { + assertEquals(group.config.read, true); + assertEquals(group.config.write, true); + }); + it("should have custom read and write config if provided", () => { + const group = new RelayGroup(relays, { read: false, write: false }); + assertEquals(group.config.read, false); + assertEquals(group.config.write, false); + }); + + // ---------------------- + // Subscription + // ---------------------- + it("should create a subscription", () => { + sub = group.subscribe({ kinds: [1] }, { id: "test-group" }); + assertInstanceOf(sub, ReadableStream); + }); +}); diff --git a/lib/streams.ts b/lib/streams.ts index 4e64151..d41a44f 100644 --- a/lib/streams.ts +++ b/lib/streams.ts @@ -1,12 +1,9 @@ -export { mergeReadableStreams as merge } from "./std/streams.ts"; - /** * TransformStream which filters out duplicate values from a stream. */ export class Distinctor extends TransformStream { #seen: Set; - constructor(protected readonly fn: (value: R) => T) { super({ transform: (value, controller) => { @@ -35,3 +32,29 @@ export class Transformer }); } } + +export function merge( + ...streams: ReadableStream[] +) { + const readers = streams.map((r) => r.getReader()); + return new ReadableStream({ + async pull(controller) { + await Promise.any(readers.map(async (r) => { + const { value, done } = await r.read(); + if (done) { + readers.splice(readers.indexOf(r), 1); + r.releaseLock(); + } + if (value) { + controller.enqueue(value); + } + })).catch((e) => { + if (e instanceof AggregateError) { + controller.close(); + } else { + throw e; + } + }); + }, + }); +} diff --git a/lib/streams_test.ts b/lib/streams_test.ts index 3fc718b..7b8a7e6 100644 --- a/lib/streams_test.ts +++ b/lib/streams_test.ts @@ -1,7 +1,7 @@ import { describe, it } from "../lib/std/testing.ts"; import { assertArrayIncludes, assertEquals } from "../lib/std/assert.ts"; import { collect } from "../lib/x/streamtools.ts"; -import { Distinctor } from "./streams.ts"; +import { Distinctor, merge } from "./streams.ts"; describe("Distinctor", () => { it("filters out duplicate values from a stream", async () => { @@ -19,3 +19,27 @@ describe("Distinctor", () => { assertArrayIncludes(values, [1, 2, 3]); }); }); + +describe("merge", () => { + it("merges multiple streams into one", async () => { + const streams = [ + new ReadableStream({ + start(controller) { + controller.enqueue(1); + controller.enqueue(2); + controller.close(); + }, + }), + new ReadableStream({ + start(controller) { + controller.enqueue(3); + controller.enqueue(4); + controller.close(); + }, + }), + ]; + const values = await collect(merge(...streams)); + assertEquals(values.length, 4); + assertArrayIncludes(values, [1, 2, 3, 4]); + }); +}); diff --git a/nips/01/relays.ts b/nips/01/relays.ts index 70dd2c5..5b969c7 100644 --- a/nips/01/relays.ts +++ b/nips/01/relays.ts @@ -66,12 +66,7 @@ export default { startSubscription({ filters, id, relay }) { const messenger = relay.getWriter(); - const request = () => messenger.write(["REQ", id, ...filters]); - if (relay.ws.readyState === WebSocket.OPEN) { - request(); - } - // To start the subscription when the relay (re)connects. - relay.ws.addEventListener("open", request); + return messenger.write(["REQ", id, ...filters]); }, async closeSubscription({ id, relay }) { diff --git a/nips/01/relays_test.ts b/nips/01/relays_test.ts index 3d25022..7cbb587 100644 --- a/nips/01/relays_test.ts +++ b/nips/01/relays_test.ts @@ -33,10 +33,9 @@ describe("NIP-01/Relay", () => { it("should have loaded NIP-01 module", () => { assertEquals(relay.config.modules.length, 1); }); - it("should not connect when a subscription is created", () => { + it("should create a subscription", () => { sub_1 = relay.subscribe({ kinds: [1] }, { id: "test-1" }); - assert(sub_1 instanceof ReadableStream); - assertEquals(relay.status, WebSocket.CLOSED); + assertInstanceOf(sub_1, ReadableStream); }); it("should receive text notes", async () => { const reader = sub_1.getReader();