Skip to content

Commit

Permalink
refactor!: abandon NonExclusiveWritableStream (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
hasundue authored Oct 17, 2023
1 parent 45725e3 commit abf06ef
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 168 deletions.
38 changes: 28 additions & 10 deletions core/nodes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { NostrMessage } from "./protocol.d.ts";
import type { Logger } from "./types.ts";
import { WebSocketLike } from "./websockets.ts";
import { NonExclusiveWritableStream } from "./streams.ts";

export interface NostrNodeConfig<
F extends FunctionParameterTypeRecord = FunctionParameterTypeRecord,
Expand All @@ -22,7 +21,8 @@ export class NostrNode<
W extends NostrMessage = NostrMessage,
E extends EventDataTypeRecord = EventDataTypeRecord,
F extends FunctionParameterTypeRecord = FunctionParameterTypeRecord,
> extends NonExclusiveWritableStream<W> {
> extends WritableStream<W> implements EventTarget {
readonly #eventTarget = new EventTarget();
readonly config: Readonly<NostrNodeConfig<F>>;
protected readonly functions: NostrNodeFunctionSet<F> = {};
protected readonly aborter = new AbortController();
Expand All @@ -39,13 +39,23 @@ export class NostrNode<
this.config.modules.forEach((m) => this.addModule(m));
}

send(msg: W) {
return this.ws.send(JSON.stringify(msg));
}

get status(): WebSocket["readyState"] {
return this.ws.readyState;
}

close() {
async close() {
this.aborter.abort();
return super.close();
try {
await super.close();
} catch (err) {
if (super.locked) { // This should not happen.
throw err;
} // Otherwise the stream is already closed, which is fine.
}
}

addModule(module: NostrNodeModule<F>) {
Expand Down Expand Up @@ -75,27 +85,35 @@ export class NostrNode<
}
}

override addEventListener = <T extends EventType<E>>(
addEventListener = <T extends EventType<E>>(
type: T,
listener: NostrNodeEventListenerOrEventListenerObject<E, T> | null,
options?: AddEventListenerOptions,
) => {
super.addEventListener(
return this.#eventTarget.addEventListener(
type,
listener as EventListenerOrEventListenerObject,
{ signal: this.aborter.signal, ...options },
);
};

declare removeEventListener: <T extends EventType<E>>(
removeEventListener = <T extends EventType<E>>(
type: T,
listener: NostrNodeEventListenerOrEventListenerObject<E, T> | null,
options?: boolean | EventListenerOptions,
) => void;
) => {
return this.#eventTarget.removeEventListener(
type,
listener as EventListenerOrEventListenerObject,
options,
);
};

declare dispatchEvent: <T extends EventType<E>>(
dispatchEvent = <T extends EventType<E>>(
event: NostrNodeEvent<E, T>,
) => boolean;
) => {
return this.#eventTarget.dispatchEvent(event);
};
}

// ------------------------------
Expand Down
6 changes: 4 additions & 2 deletions core/nodes_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { MockWebSocket } from "../lib/testing.ts";

describe("NostrNode", () => {
let node: NostrNode;
let writer: WritableStreamDefaultWriter;

beforeAll(() => {
node = new NostrNode(new MockWebSocket());
Expand All @@ -23,12 +24,13 @@ describe("NostrNode", () => {
});

it("should be connected to the WebSocket after a message is sent", async () => {
await node.getWriter().write(["NOTICE", "test"]);
writer = node.getWriter();
await writer.write(["NOTICE", "test"]);
writer.releaseLock();
assertEquals(node.status, WebSocket.OPEN);
});

it("should close the WebSocket when the node is closed", async () => {
await node.getWriter().write(["NOTICE", "test"]);
await node.close();
assertEquals(node.status, WebSocket.CLOSED);
});
Expand Down
5 changes: 2 additions & 3 deletions core/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type {
SubscriptionFilter,
SubscriptionId,
} from "./protocol.d.ts";
import { NonExclusiveWritableStream } from "./streams.ts";
import { LazyWebSocket } from "./websockets.ts";
import {
NostrNode,
Expand Down Expand Up @@ -164,9 +163,9 @@ export class Relay extends NostrNode<
// RelayLikes
// ----------------------

export interface RelayLike
extends NonExclusiveWritableStream<ClientToRelayMessage> {
export interface RelayLike extends WritableStream<ClientToRelayMessage> {
readonly config: RelayLikeConfig;
send: Relay["send"];
subscribe: Relay["subscribe"];
publish: Relay["publish"];
}
Expand Down
42 changes: 0 additions & 42 deletions core/streams.ts

This file was deleted.

89 changes: 0 additions & 89 deletions core/streams_test.ts

This file was deleted.

3 changes: 2 additions & 1 deletion lib/events_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ describe("EventPublisher", () => {
});

// FIXME: runtime error
it.ignore("should be connectable to a relay", () => {
it("should be connectable to a relay", async () => {
relay = new Relay("wss://example.com");
const writable = pipeThroughFrom(relay, publisher);
assertInstanceOf(writable, WritableStream);
await writable.close();
});
});
11 changes: 6 additions & 5 deletions lib/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,21 @@ import {
RelayLikeOptions,
SubscriptionOptions,
} from "../core/relays.ts";
import { NonExclusiveWritableStream } from "../core/streams.ts";
import { Distinctor, merge } from "../lib/streams.ts";

/**
* A pool of relays that can be used as a single relay.
*/
export class RelayGroup extends NonExclusiveWritableStream<ClientToRelayMessage>
export class RelayGroup extends WritableStream<ClientToRelayMessage>
implements RelayLike {
readonly config: Readonly<RelayLikeConfig>;
#relays_read: RelayLike[];
#relays_write: RelayLike[];

constructor(readonly relays: RelayLike[], options?: RelayLikeOptions) {
const writers = relays.filter((r) => r.config.write)
.map((r) => r.getWriter());
super({
async write(msg) {
await Promise.all(writers.map((r) => r.write(msg)));
await Promise.all(relays.map((r) => r.send(msg)));
},
});
this.config = {
Expand Down Expand Up @@ -62,6 +59,10 @@ export class RelayGroup extends NonExclusiveWritableStream<ClientToRelayMessage>
// NostrNode methods
// ----------------------

async send(msg: ClientToRelayMessage) {
await Promise.all(this.relays.map((r) => r.send(msg)));
}

async close() {
await Promise.all(this.relays.map((r) => r.close()));
}
Expand Down
12 changes: 4 additions & 8 deletions nips/01/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@ import { ClientModule, ClientSubscriptionEvent } from "../../core/clients.ts";

export default {
async handleClientToRelayMessage({ message, client }) {
const messenger = client.getWriter();
const kind = message[0];
if (kind === "EVENT") {
const event = message[1];

// This should throw if the event is not acceptable.
await client.callFunction("acceptEvent", { event, client });

await messenger.ready;
return messenger.write(["OK", event.id, true, ""]);
return client.send(["OK", event.id, true, ""]);
}
if (kind === "CLOSE") {
const sid = message[1];
const sub = client.subscriptions.get(sid);
if (!sub) {
return messenger.write(["NOTICE", `Unknown subscription ID: ${sid}`]);
return client.send(["NOTICE", `Unknown subscription ID: ${sid}`]);
}
client.subscriptions.delete(sid);
return sub.close();
Expand All @@ -28,9 +25,8 @@ export default {
client.subscriptions.set(
sid,
new WritableStream<NostrEvent>({
write: async (event) => {
await messenger.ready;
return messenger.write(["EVENT", sid, event]);
write(event) {
return client.send(["EVENT", sid, event]);
},
}),
);
Expand Down
12 changes: 4 additions & 8 deletions nips/01/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,16 @@ export default {
},

publishEvent({ event, relay }) {
const writer = relay.getWriter();
writer.ready.then(() => writer.write(["EVENT", event]));
return relay.send(["EVENT", event]);
},

startSubscription({ filters, id, relay }) {
const messenger = relay.getWriter();
return messenger.write(["REQ", id, ...filters]);
return relay.send(["REQ", id, ...filters]);
},

async closeSubscription({ id, relay }) {
const messenger = relay.getWriter();
closeSubscription({ id, relay }) {
if (relay.ws.readyState === WebSocket.OPEN) {
await messenger.write(["CLOSE", id]);
return relay.send(["CLOSE", id]);
}
return messenger.close();
},
} satisfies RelayModule["default"];

0 comments on commit abf06ef

Please sign in to comment.