Skip to content

Commit

Permalink
feat(std): watch for websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
hasundue committed Mar 22, 2024
1 parent b539414 commit 5ac39ec
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 18 deletions.
55 changes: 38 additions & 17 deletions std/watch.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,52 @@
import { WebSocketEventType, WebSocketLike } from "@lophus/lib/websockets";
import { AnyEventTypeRecord } from "@lophus/core/nodes";
import { EventType, Node, NodeEvent } from "@lophus/core/nodes";
import type { InterNodeMessage } from "@lophus/core/protocol";

interface WatchChainable<
interface WatchNodeChainable<
R extends AnyEventTypeRecord,
> {
<T extends EventType<R>>(...events: T[]): ReadableStream<NodeEvent<R, T>>;
}

interface WatchWebSocketChainable {
<T extends WebSocketEventType>(
...events: T[]
): ReadableStream<WebSocketEventMap[T]>;
}

export function watch<R extends AnyEventTypeRecord>(
...nodes: Node<InterNodeMessage, R>[]
): WatchChainable<R> {
): WatchNodeChainable<R>;

export function watch(
...wss: WebSocketLike[]
): WatchWebSocketChainable;

export function watch<R extends AnyEventTypeRecord>(
...targets: Node<InterNodeMessage, R>[] | WebSocketLike[]
): WatchNodeChainable<R> | WatchWebSocketChainable {
const aborter = new AbortController();
return <T extends EventType<R>>(...events: T[]) =>
new ReadableStream<NodeEvent<R, T>>({
start(controller) {
nodes.forEach((node) =>
events.forEach((type) =>
node.addEventListener(type, (event) => {
// De-prioritize to regular listeners
queueMicrotask(() => controller.enqueue(event));
}, { signal: aborter.signal })
)
);
},
cancel() {
aborter.abort();
return <T extends EventType<R> | WebSocketEventType>(
...events: T[]
) => {
return new ReadableStream(
{
start(controller) {
targets.forEach((target) =>
events.forEach((type) =>
// @ts-ignore we do not type this strictly for readability
target.addEventListener(type, (event) => {
// de-prioritize to regular listeners
queueMicrotask(() => controller.enqueue(event));
}, { signal: aborter.signal })
)
);
},
cancel() {
aborter.abort();
},

Check warning on line 48 in std/watch.ts

View check run for this annotation

Codecov / codecov/patch

std/watch.ts#L46-L48

Added lines #L46 - L48 were not covered by tests
},
});
);
};
}
41 changes: 40 additions & 1 deletion std/watch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,46 @@ import { MockWebSocket } from "@lophus/lib/testing";
import { Relay } from "@lophus/nips/relays";
import { watch } from "./watch.ts";

describe("watch - Relay", () => {
describe("watch - websockets", () => {
const ws = new MockWebSocket();

it("should create a chainable from a websocket", () => {
const chainable = watch(ws);
assertExists(chainable.call);
});

it("should create a stream of events from multiple websockets", () => {
const chainable = watch(ws, ws);
assertExists(chainable.call);
});

it("should create a stream of events from a websocket", () => {
const stream = watch(ws)("message");
assertInstanceOf(stream, ReadableStream);
});

it("should create a stream of events of multiple types from a websocket", () => {
const stream = watch(ws)("message", "open");
assertInstanceOf(stream, ReadableStream);
});

it("should create a stream of events of multiple types from multiple websockets", () => {
const stream = watch(ws, ws)("message", "open");
assertInstanceOf(stream, ReadableStream);
});

it("should receive an event from a websocket", async () => {
const stream = watch(ws)("message");
const reader = stream.getReader();
ws.dispatchEvent(new MessageEvent("message", { data: "test" }));
const { value } = await reader.read();
assertExists(value);
assertEquals(value.type, "message");
assertEquals(value.data, "test");
});
});

describe("watch - relays", () => {
let relay: Relay;

beforeAll(() => {
Expand Down

0 comments on commit 5ac39ec

Please sign in to comment.