Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: rename RelayPool to RelayGroup and modify the API #19

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/nodes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { NostrMessage } from "./protocol.d.ts";
import type { Logger } from "./types.ts";
import { WebSocketLike, WebSocketReadyState } from "./websockets.ts";
import { WebSocketLike } from "./websockets.ts";
import { NonExclusiveWritableStream } from "./streams.ts";

export interface NostrNodeConfig<
Expand Down Expand Up @@ -39,7 +39,7 @@ export class NostrNode<
this.config.modules.forEach((m) => this.addModule(m));
}

get status(): WebSocketReadyState {
get status(): WebSocket["readyState"] {
return this.ws.readyState;
}

Expand Down
28 changes: 18 additions & 10 deletions core/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ export class ConnectionClosed extends Error {}
// Interfaces
// ----------------------

export interface RelayLike
extends NonExclusiveWritableStream<ClientToRelayMessage> {
subscribe: Relay["subscribe"];
publish: Relay["publish"];
}

export interface RelayConfig
extends NostrNodeConfig<RelayFunctionParameterTypeRecord> {
url: RelayUrl;
Expand Down Expand Up @@ -131,10 +125,6 @@ export class Relay extends NostrNode<
);
this.callFunction("startSubscription", { controller, ...context });
},
pull: async () => {
await this.ws.ready();
// TODO: backpressure
},
cancel: (reason) => {
return this.callFunction("closeSubscription", { reason, ...context });
},
Expand Down Expand Up @@ -170,6 +160,24 @@ export class Relay extends NostrNode<
}
}

// ----------------------
// RelayLikes
// ----------------------

export interface RelayLike
extends NonExclusiveWritableStream<ClientToRelayMessage> {
readonly config: RelayLikeConfig;
subscribe: Relay["subscribe"];
publish: Relay["publish"];
}

export type RelayLikeConfig = Omit<
RelayConfig,
"url" | keyof NostrNodeConfig<RelayFunctionParameterTypeRecord>
>;

export type RelayLikeOptions = Partial<RelayLikeConfig>;

// ----------------------
// Events
// ----------------------
Expand Down
45 changes: 15 additions & 30 deletions core/websockets.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Notify } from "./x/async.ts";

export type WebSocketEventType = keyof WebSocketEventMap;

export interface WebSocketLike {
readonly url: string;
readonly readyState: WebSocketReadyState;
readonly readyState: WebSocket["readyState"];
send(
data: string | ArrayBufferLike | Blob | ArrayBufferView,
): void | Promise<void>;
Expand All @@ -25,14 +23,11 @@
*/
export class LazyWebSocket implements WebSocketLike {
#ws?: WebSocket;
#createWebSocket: () => WebSocket;

readonly #createWebSocket: () => WebSocket;
readonly #eventListenerMap = new Map<
WebSocketEventType,
EventListenerOptionsMap
>();
readonly #notifier = new Notify();

readonly url: string;

constructor(
Expand All @@ -41,11 +36,6 @@
) {
this.#createWebSocket = () => {
const ws = new WebSocket(url, protocols);
for (const type of ["close", "open"] as const) {
ws.addEventListener(type, () => {
this.#notifier.notifyAll();
});
}
this.#eventListenerMap.forEach((map, type) => {
map.forEach((options, listener) => {
ws.addEventListener(type, listener, options);
Expand All @@ -60,21 +50,26 @@
return this.#ws ??= this.#createWebSocket();
}

#once(type: WebSocketEventType): Promise<void> {
return new Promise<void>((resolve) => {
this.#created().addEventListener(type, () => resolve(), { once: true });
});
}

async #ready(): Promise<WebSocket> {
this.#ws = this.#created();
switch (this.#ws.readyState) {
case WebSocket.CONNECTING:
await this.#notifier.notified();
/* falls through */
case WebSocket.OPEN:
break;
case WebSocket.OPEN:
return this.#ws;
case WebSocket.CLOSING:
await this.#notifier.notified();
await this.#once("close");

Check warning on line 67 in core/websockets.ts

View check run for this annotation

Codecov / codecov/patch

core/websockets.ts#L67

Added line #L67 was not covered by tests
/* falls through */
case WebSocket.CLOSED:
this.#ws = this.#createWebSocket();
await this.#notifier.notified();
}
await this.#once("open");
return this.#ws;
}

Expand All @@ -93,21 +88,21 @@
}
switch (this.#ws.readyState) {
case WebSocket.CONNECTING:
await this.#notifier.notified();
await this.#once("open");

Check warning on line 91 in core/websockets.ts

View check run for this annotation

Codecov / codecov/patch

core/websockets.ts#L91

Added line #L91 was not covered by tests
/* falls through */
case WebSocket.OPEN:
this.#ws.close(code, reason);
/* falls through */
case WebSocket.CLOSING:
await this.#notifier.notified();
await this.#once("close");
/* falls through */
case WebSocket.CLOSED:
break;
}
this.#ws = undefined;
}

get readyState(): WebSocketReadyState {
get readyState(): WebSocket["readyState"] {
return this.#ws ? this.#ws.readyState : WebSocket.CLOSED;
}

Expand Down Expand Up @@ -138,13 +133,3 @@
return this.#ws?.dispatchEvent(event) ?? false;
};
}

/**
* The ready state of a WebSocket.
*/
export enum WebSocketReadyState {
CONNECTING = 0,
OPEN = 1,
CLOSING = 2,
CLOSED = 3,
}
1 change: 0 additions & 1 deletion core/x/async.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export { Notify } from "https://deno.land/x/[email protected]/notify.ts";
export { Lock } from "https://deno.land/x/[email protected]/lock.ts";
1 change: 0 additions & 1 deletion deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions lib/pools.ts → lib/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
ClientToRelayMessage,
EventKind,
NostrEvent,
RelayUrl,
SubscriptionFilter,
} from "../core/protocol.d.ts";
import {
Relay,
RelayInit,
RelayLike,
RelayLikeConfig,
RelayLikeOptions,
SubscriptionOptions,
} from "../core/relays.ts";
import { NonExclusiveWritableStream } from "../core/streams.ts";
Expand All @@ -17,26 +16,28 @@
/**
* A pool of relays that can be used as a single relay.
*/
export class RelayPool extends NonExclusiveWritableStream<ClientToRelayMessage>
export class RelayGroup extends NonExclusiveWritableStream<ClientToRelayMessage>
implements RelayLike {
readonly relays: Relay[];

#relays_read: Relay[];

constructor(...init: (RelayUrl | RelayInit)[]) {
const relays = init.map((i) => new Relay(i));
readonly config: Readonly<RelayLikeConfig>;
#relays_read: RelayLike[];
#relays_write: RelayLike[];

constructor(readonly relays: RelayLike[], options?: RelayLikeOptions) {
const writers = relays.filter((r) => r.config.write)
.map((r) => r.getWriter());

super({
async write(msg) {
await Promise.all(writers.map((r) => r.write(msg)));
},
}, { highWaterMark: Math.max(...relays.map((r) => r.config.nbuffer)) });

this.relays = relays;
});
this.config = {
name: relays.map((r) => r.config.name).join(", "),
read: true,
write: true,
...options,
};
this.#relays_read = this.relays.filter((r) => r.config.read);
this.#relays_write = this.relays.filter((r) => r.config.write);
}

// ----------------------
Expand All @@ -54,7 +55,7 @@
async publish<K extends EventKind>(
msg: NostrEvent<K>,
) {
await Promise.all(this.relays.map((r) => r.publish(msg)));
await Promise.all(this.#relays_write.map((r) => r.publish(msg)));

Check warning on line 58 in lib/relays.ts

View check run for this annotation

Codecov / codecov/patch

lib/relays.ts#L58

Added line #L58 was not covered by tests
}

// ----------------------
Expand Down
75 changes: 75 additions & 0 deletions lib/relays_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { afterAll, beforeAll, describe, it } from "../lib/std/testing.ts";
import { assertEquals, assertInstanceOf } from "../lib/std/assert.ts";
import { NostrEvent } from "../core/protocol.d.ts";
import { Relay } from "../core/relays.ts?nips=1";
import { RelayGroup } from "../lib/relays.ts";
import { MockWebSocket } from "../lib/testing.ts";

describe("RelayGroup", () => {
let relays: Relay[];
let group: RelayGroup;
let sub: ReadableStream<NostrEvent>;

// ----------------------
// Setup
// ----------------------
beforeAll(() => {
globalThis.WebSocket = MockWebSocket;
relays = [
new Relay("ws://localhost:80", {
name: "relay-1",
read: true,
write: true,
}),
new Relay("ws://localhost:81", {
name: "relay-2",
read: true,
write: false,
}),
new Relay("ws://localhost:82", {
name: "relay-3",
read: false,
write: true,
}),
];
});
afterAll(() => {
group.close();
});

// ----------------------
// Constructor
// ----------------------
it("should create a group of relays", () => {
group = new RelayGroup(relays);
assertInstanceOf(group, RelayGroup);
});
it("should not have a url", () => {
// @ts-expect-error RelayGroup does not have a url
assertEquals(group.url, undefined);
});
it("should have a default name", () => {
assertEquals(group.config.name, "relay-1, relay-2, relay-3");
});
it("should have a custom name if provided", () => {
const group = new RelayGroup(relays, { name: "custom" });
assertEquals(group.config.name, "custom");
});
it("should have default read and write config", () => {
assertEquals(group.config.read, true);
assertEquals(group.config.write, true);
});
it("should have custom read and write config if provided", () => {
const group = new RelayGroup(relays, { read: false, write: false });
assertEquals(group.config.read, false);
assertEquals(group.config.write, false);
});

// ----------------------
// Subscription
// ----------------------
it("should create a subscription", () => {
sub = group.subscribe({ kinds: [1] }, { id: "test-group" });
assertInstanceOf(sub, ReadableStream);
});
});
29 changes: 26 additions & 3 deletions lib/streams.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
export { mergeReadableStreams as merge } from "./std/streams.ts";

/**
* TransformStream which filters out duplicate values from a stream.
*/
export class Distinctor<R = unknown, T = unknown>
extends TransformStream<R, R> {
#seen: Set<T>;

constructor(protected readonly fn: (value: R) => T) {
super({
transform: (value, controller) => {
Expand Down Expand Up @@ -35,3 +32,29 @@
});
}
}

export function merge<T>(
...streams: ReadableStream<T>[]
) {
const readers = streams.map((r) => r.getReader());
return new ReadableStream<T>({
async pull(controller) {
await Promise.any(readers.map(async (r) => {
const { value, done } = await r.read();
if (done) {
readers.splice(readers.indexOf(r), 1);
r.releaseLock();
}
if (value) {
controller.enqueue(value);
}
})).catch((e) => {
if (e instanceof AggregateError) {
controller.close();
} else {
throw e;
}

Check warning on line 56 in lib/streams.ts

View check run for this annotation

Codecov / codecov/patch

lib/streams.ts#L55-L56

Added lines #L55 - L56 were not covered by tests
});
},
});
}
Loading