Skip to content

Commit

Permalink
refactor!(lib/events): modify the interface of EventPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
hasundue committed Oct 17, 2023
1 parent 1e76f3a commit a16a7b2
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 17 deletions.
1 change: 0 additions & 1 deletion core/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { Lock } from "./x/async.ts";
export class NonExclusiveWritableStream<W = unknown> extends EventTarget
implements WritableStream<W> {
readonly locked = false;

readonly #writer: Lock<WritableStreamDefaultWriter<W>>;

constructor(
Expand Down
1 change: 1 addition & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 6 additions & 16 deletions lib/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import type {
ClientToRelayMessage,
EventContentFor,
EventKind,
PrivateKey,
RelayLike,
TagFor,
} from "../mod.ts";
import { Stringified } from "../core/types.ts";
Expand All @@ -14,23 +12,15 @@ export interface EventInit<K extends EventKind = EventKind> {
content: EventContentFor<K> | Stringified<EventContentFor<K>>;
}

import { Signer } from "./signs.ts";
import type { Signer } from "./signs.ts";

export class EventPublisher<K extends EventKind = EventKind>
extends WritableStream<EventInit<K>> {
#signer: Signer;
#messenger: WritableStreamDefaultWriter<ClientToRelayMessage>;

constructor(relayLike: RelayLike, nsec: PrivateKey) {
extends TransformStream<EventInit<K>, ClientToRelayMessage<"EVENT", K>> {
constructor(readonly signer: Signer) {
super({
write: (event) => this.publish(event),
close: () => this.#messenger.releaseLock(),
transform: (init, controller) => {
controller.enqueue(["EVENT", this.signer.sign(init)]);
},
});
this.#signer = new Signer(nsec);
this.#messenger = relayLike.getWriter();
}

publish<K extends EventKind>(event: EventInit<K>): Promise<void> {
return this.#messenger.write(["EVENT", this.#signer.sign(event)]);
}
}
47 changes: 47 additions & 0 deletions lib/events_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { afterAll, beforeAll, describe, it } from "./std/testing.ts";
import { assertEquals, assertInstanceOf } from "./std/assert.ts";
import { pipeThroughFrom } from "./x/streamtools.ts";
import type {} from "../core/protocol.d.ts";
import { Relay } from "../core/relays.ts?nips=1";
import { PrivateKey, Signer } from "./signs.ts";
import { EventInit, EventPublisher } from "./events.ts";
import { MockWebSocket } from "../lib/testing.ts";

describe("EventPublisher", () => {
let relay: Relay;
let signer: Signer;
let publisher: EventPublisher<1>;

beforeAll(() => {
globalThis.WebSocket = MockWebSocket;
signer = new Signer(PrivateKey.generate());
});
afterAll(() => {
relay?.close();
});

it("should be a TransformStream", () => {
publisher = new EventPublisher(signer);
assertInstanceOf(publisher, TransformStream);
});

// FIXME: dangling promise
it.ignore("should transform EventInit to ClientToRelayMessage", async () => {
const { readable, writable } = publisher;
const reader = readable.getReader();
const writer = writable.getWriter();
const init = { kind: 1, content: "hello" } satisfies EventInit<1>;
await writer.write(init);
const { value } = await reader.read();
assertEquals(value, ["EVENT", signer.sign(init)]);
await writer.close();
await reader.cancel();
});

// FIXME: runtime error
it.ignore("should be connectable to a relay", () => {
relay = new Relay("wss://example.com");
const writable = pipeThroughFrom(relay, publisher);
assertInstanceOf(writable, WritableStream);
});
});
1 change: 1 addition & 0 deletions lib/x/streamtools.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { collect } from "https://deno.land/x/[email protected]/collect.ts";
export { pop } from "https://deno.land/x/[email protected]/pop.ts";
export { pipeThroughFrom } from "https://deno.land/x/[email protected]/pipe_through_from.ts";

0 comments on commit a16a7b2

Please sign in to comment.