Skip to content

Commit

Permalink
feat(core/relays): implement reconnection logic (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
hasundue authored Feb 26, 2024
1 parent 5b27059 commit 947df5e
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 76 deletions.
44 changes: 33 additions & 11 deletions core/relays.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Stringified } from "./types.ts";
import type {
ClientToRelayMessage,
EventId,
EventKind,
NostrEvent,
RelayToClientMessage,
RelayToClientMessageType,
RelayUrl,
SubscriptionFilter,
SubscriptionId,
Expand Down Expand Up @@ -97,6 +99,13 @@ export class Relay extends NostrNode<
);
}

dispatch<T extends RelayEventType>(
type: T,
data: RelayEventTypeRecord[T],
): void {
this.dispatchEvent(new RelayEvent(type, data));
}

subscribe<K extends EventKind>(
filter: SubscriptionFilter<K> | SubscriptionFilter<K>[],
options: Partial<SubscriptionOptions> = {},
Expand All @@ -108,19 +117,21 @@ export class Relay extends NostrNode<
filters: [filter].flat(),
options,
};
const resubscribe = () => this.dispatch("resubscribe", { ...context });
return new ReadableStream<NostrEvent<K>>({
start: (controller) => {
this.dispatchEvent(
new RelayEvent("subscribe", { ...context, controller }),
this.addEventListener(
context.id,
() => this.ws.removeEventListener("close", resubscribe),
);
this.dispatch("subscribe", { ...context, controller });
},
pull: (controller) => {
this.dispatchEvent(new RelayEvent("pull", { ...context, controller }));
pull: () => {
this.ws.addEventListener("close", resubscribe, { once: true });
this.ws.ready();
},
cancel: (reason) => {
this.dispatchEvent(
new RelayEvent("unsubscribe", { ...context, reason }),
);
this.dispatch("unsubscribe", { ...context, reason });
},
}, new CountQueuingStrategy({ highWaterMark: options.nbuffer }));
}
Expand All @@ -133,9 +144,7 @@ export class Relay extends NostrNode<
*/
publish<K extends EventKind>(event: NostrEvent<K>): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.dispatchEvent(
new RelayEvent("publish", { event, resolve, reject }),
);
this.dispatch("publish", { event, resolve, reject });
this.ws.addEventListener(
"close",
() => reject(new ConnectionClosed()),
Expand Down Expand Up @@ -183,12 +192,25 @@ export interface PublicationContext {
reject: (reason: unknown) => void;
}

type SubscriptionMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends
SubscriptionId ? RelayToClientMessage<T> : never;
}[RelayToClientMessageType];

type PublicationMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends EventId
? RelayToClientMessage<T>
: never;
}[RelayToClientMessageType];

export interface RelayEventTypeRecord {
message: RelayToClientMessage;
subscribe: SubscriptionContextWithController;
pull: SubscriptionContextWithController;
resubscribe: SubscriptionContext;
unsubscribe: SubscriptionContextWithReason;
publish: PublicationContext;
[id: SubscriptionId]: SubscriptionMessage;
[id: EventId]: PublicationMessage;
}

export type RelayEventType = keyof RelayEventTypeRecord;
Expand Down
4 changes: 4 additions & 0 deletions core/websockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type EventListenerOptionsMap = Map<
*/
export class LazyWebSocket implements WebSocketLike {
#ws?: WebSocket;
readonly #ac = new AbortController();
readonly #createWebSocket: () => WebSocket;
readonly #eventListenerMap = new Map<
WebSocketEventType,
Expand Down Expand Up @@ -86,6 +87,7 @@ export class LazyWebSocket implements WebSocketLike {
if (!this.#ws) {
return;
}
this.#ac.abort();
switch (this.#ws.readyState) {
case WebSocket.CONNECTING:
await this.#once("open");
Expand All @@ -111,6 +113,8 @@ export class LazyWebSocket implements WebSocketLike {
listener: EventListenerOrEventListenerObject,
options: boolean | AddEventListenerOptions = {},
) => {
options = typeof options === "boolean" ? { capture: options } : options;
options = { signal: this.#ac.signal, ...options };
this.#ws?.addEventListener(type, listener, options);
const map = this.#eventListenerMap.get(type);
if (map) {
Expand Down
16 changes: 13 additions & 3 deletions lib/testing.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
type MessageEventData = string | ArrayBufferLike | Blob | ArrayBufferView;

export class MockWebSocket extends EventTarget implements WebSocket {
/**
* A list of all instances of MockWebSocket.
* An instance is removed from this list when it is closed.
*/
static get instances(): MockWebSocket[] {
return this.#instances;
return Array.from(this.#instances);
}
static #instances = new Set<MockWebSocket>();

static get first(): MockWebSocket | undefined {
return this.instances[0];
}
static #instances: MockWebSocket[] = [];

constructor(url?: string | URL, protocols?: string | string[]) {
super();
this.url = url?.toString() ?? "";
this.protocol = protocols ? [...protocols].flat()[0] : "";
MockWebSocket.#instances.push(this);
MockWebSocket.#instances.add(this);
// Simulate async behavior of WebSocket as much as possible.
queueMicrotask(() => {
this.#readyState = 1;
Expand Down Expand Up @@ -50,9 +58,11 @@ export class MockWebSocket extends EventTarget implements WebSocket {
if (this.#remote) {
this.#remote.#readyState = 3;
this.#remote.dispatchEvent(new CloseEvent("close", { code, reason }));
MockWebSocket.#instances.delete(this.#remote);
}
this.#readyState = 3;
this.dispatchEvent(new CloseEvent("close", { code, reason }));
MockWebSocket.#instances.delete(this);
});
}

Expand Down
37 changes: 5 additions & 32 deletions nips/01/relays.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,5 @@
import type {
EventId,
RelayToClientMessage,
RelayToClientMessageType,
SubscriptionId,
} from "../../core/protocol.d.ts";
import { EventRejected, RelayEvent, RelayModule } from "../../core/relays.ts";

type SubscriptionMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends
SubscriptionId ? RelayToClientMessage<T> : never;
}[RelayToClientMessageType];

type PublicationMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends EventId
? RelayToClientMessage<T>
: never;
}[RelayToClientMessageType];

type ExtentionalEventTypeRecord =
& {
[id in SubscriptionId]: SubscriptionMessage;
}
& {
[id in EventId]: PublicationMessage;
};

declare module "../../core/relays.ts" {
// deno-lint-ignore no-empty-interface
interface RelayEventTypeRecord extends ExtentionalEventTypeRecord {}
}

export class SubscriptionClosed extends Error {}

const install: RelayModule["default"] = (relay) => {
Expand Down Expand Up @@ -65,11 +35,14 @@ const install: RelayModule["default"] = (relay) => {
}
}
});
return relay.send(["REQ", id, ...filters]);
relay.send(["REQ", id, ...filters]);
});
relay.addEventListener("resubscribe", ({ data: { id, filters } }) => {
relay.send(["REQ", id, ...filters]);
});
relay.addEventListener("unsubscribe", ({ data: { id } }) => {
if (relay.status === WebSocket.OPEN) {
return relay.send(["CLOSE", id]);
relay.send(["CLOSE", id]);
}
});
relay.addEventListener("publish", (ev) => {
Expand Down
92 changes: 62 additions & 30 deletions nips/01/relays_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import {
} from "../../core/relays.ts?nips=1";
import { SubscriptionClosed } from "../../nips/01/relays.ts";

describe("NIP-01/Relay", () => {
function getRemoteSocket() {
return MockWebSocket.instances[0].remote;
}

describe("Relay (NIP-01)", () => {
const url = "wss://localhost:8080";
let relay: Relay;
let sub_0: ReadableStream<NostrEvent<0>>;
Expand All @@ -30,6 +34,7 @@ describe("NIP-01/Relay", () => {
globalThis.WebSocket = MockWebSocket;
relay = new Relay(url);
});

afterAll(() => {
if (relay.status === WebSocket.OPEN) {
return relay.close();
Expand All @@ -39,33 +44,33 @@ describe("NIP-01/Relay", () => {
it("should have loaded NIP-01 module", () => {
assertEquals(relay.config.modules.length, 1);
});

it("should create a subscription", () => {
sub_1 = relay.subscribe({ kinds: [1] }, { id: "test-1" });
assertInstanceOf(sub_1, ReadableStream);
});

it("should receive text notes", async () => {
const reader = sub_1.getReader();
const read = reader.read();
MockWebSocket.instances[0].remote.send(
JSON.stringify(["EVENT", "test-1", { kind: 1 }]),
);
getRemoteSocket().send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
const { value, done } = await read;
assert(!done);
assertEquals(value.kind, 1);
reader.releaseLock();
});

it("should be able to open multiple subscriptions", () => {
sub_0 = relay.subscribe({ kinds: [0], limit: 1 }, {
id: "test-0",
});
sub_0 = relay.subscribe({ kinds: [0] }, { id: "test-0" });
assert(sub_0 instanceof ReadableStream);
});

it("should recieve metas and notes simultaneously", async () => {
const reader_0 = sub_0.getReader();
const reader_1 = sub_1.getReader();
const ws = MockWebSocket.instances[0];
ws.remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
ws.remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
const remote = getRemoteSocket();
remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
const [{ value: value_0 }, { value: value_1 }] = await Promise.all([
reader_0.read(),
reader_1.read(),
Expand All @@ -77,18 +82,54 @@ describe("NIP-01/Relay", () => {
reader_0.releaseLock();
reader_1.releaseLock();
});

it("should close a subscription with an error when receiving a CLOSED message", async () => {
getRemoteSocket().send(JSON.stringify(
[
"CLOSED",
"test-1" as SubscriptionId,
"error: test",
] satisfies RelayToClientMessage<"CLOSED">,
));
const reader = sub_1.getReader();
try {
await reader.read();
} catch (e) {
assertInstanceOf(e, SubscriptionClosed);
assertEquals(e.message, "error: test");
} finally {
reader.releaseLock();
}
});

it("should reconnect if connection is closed while waiting for an event", async () => {
const reader = sub_0.getReader();
const read = reader.read();
getRemoteSocket().close();
const reconnected = new Promise<true>((resolve) => {
relay.ws.addEventListener("open", () => resolve(true));
});
assert(await reconnected);
// We must use a new instance of MockWebSocket.
getRemoteSocket().send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
const { value, done } = await read;
assert(!done);
assertEquals(value.kind, 0);
reader.releaseLock();
});

it("should publish an event and recieve an accepting OK message", async () => {
const eid = "test-true" as EventId;
const ws = MockWebSocket.instances[0];
const remote = getRemoteSocket();
const arrived = new Promise<true>((resolve) => {
ws.remote.addEventListener(
remote.addEventListener(
"message",
(ev: MessageEvent<string>) => {
// deno-fmt-ignore
const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">;
if (event.id === eid) {
assertEquals(event.kind, 1);
ws.remote.send(
remote.send(
JSON.stringify(
["OK", eid, true, ""] satisfies RelayToClientMessage<"OK">,
),
Expand All @@ -102,21 +143,22 @@ describe("NIP-01/Relay", () => {
await relay.publish({ id: eid, kind: 1 } as any);
assert(await arrived);
});

it("should receieve a rejecting OK message and throw EventRejected", async () => {
const eid = "test-false" as EventId;
// deno-fmt-ignore
const msg = ["OK", eid, false, "error: test"] satisfies RelayToClientMessage<"OK">
const ws = MockWebSocket.instances[0];
const remote = getRemoteSocket();
const arrived = new Promise<true>((resolve) => {
ws.remote.addEventListener(
remote.addEventListener(
"message",
(ev: MessageEvent<string>) => {
// deno-fmt-ignore
const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">;
if (event.id === eid) {
assertEquals(event.kind, 1);
resolve(true);
ws.remote.send(JSON.stringify(msg));
remote.send(JSON.stringify(msg));
}
},
);
Expand All @@ -132,26 +174,16 @@ describe("NIP-01/Relay", () => {
}
await arrived;
});

it("should throw ConnectionClosed when connection is closed before recieving an OK message", async () => {
const event = { id: "test-close" as EventId, kind: 1 };
// deno-lint-ignore no-explicit-any
const published = relay.publish(event as any).catch((e) => e);
MockWebSocket.instances[0].remote.close();
assertInstanceOf(await published, ConnectionClosed);
});
it("should close a subscription with an error when receiving a CLOSED message", async () => {
MockWebSocket.instances[0].remote.send(JSON.stringify(
[
"CLOSED",
"test-1" as SubscriptionId,
"error: test",
] satisfies RelayToClientMessage<"CLOSED">,
));
getRemoteSocket().close();
try {
await sub_1.getReader().read();
await published;
} catch (e) {
assertInstanceOf(e, SubscriptionClosed);
assertEquals(e.message, "error: test");
assertInstanceOf(e, ConnectionClosed);
}
});
});

0 comments on commit 947df5e

Please sign in to comment.