diff --git a/std/watch.ts b/std/watch.ts index a074546..0237838 100644 --- a/std/watch.ts +++ b/std/watch.ts @@ -1,31 +1,52 @@ +import { WebSocketEventType, WebSocketLike } from "@lophus/lib/websockets"; import { AnyEventTypeRecord } from "@lophus/core/nodes"; import { EventType, Node, NodeEvent } from "@lophus/core/nodes"; import type { InterNodeMessage } from "@lophus/core/protocol"; -interface WatchChainable< +interface WatchNodeChainable< R extends AnyEventTypeRecord, > { >(...events: T[]): ReadableStream>; } +interface WatchWebSocketChainable { + ( + ...events: T[] + ): ReadableStream; +} + export function watch( ...nodes: Node[] -): WatchChainable { +): WatchNodeChainable; + +export function watch( + ...wss: WebSocketLike[] +): WatchWebSocketChainable; + +export function watch( + ...targets: Node[] | WebSocketLike[] +): WatchNodeChainable | WatchWebSocketChainable { const aborter = new AbortController(); - return >(...events: T[]) => - new ReadableStream>({ - start(controller) { - nodes.forEach((node) => - events.forEach((type) => - node.addEventListener(type, (event) => { - // De-prioritize to regular listeners - queueMicrotask(() => controller.enqueue(event)); - }, { signal: aborter.signal }) - ) - ); - }, - cancel() { - aborter.abort(); + return | WebSocketEventType>( + ...events: T[] + ) => { + return new ReadableStream( + { + start(controller) { + targets.forEach((target) => + events.forEach((type) => + // @ts-ignore we do not type this strictly for readability + target.addEventListener(type, (event) => { + // de-prioritize to regular listeners + queueMicrotask(() => controller.enqueue(event)); + }, { signal: aborter.signal }) + ) + ); + }, + cancel() { + aborter.abort(); + }, }, - }); + ); + }; } diff --git a/std/watch_test.ts b/std/watch_test.ts index 2ccf31c..2b242f8 100644 --- a/std/watch_test.ts +++ b/std/watch_test.ts @@ -4,7 +4,46 @@ import { MockWebSocket } from "@lophus/lib/testing"; import { Relay } from "@lophus/nips/relays"; import { watch } from "./watch.ts"; -describe("watch - Relay", () => { +describe("watch - websockets", () => { + const ws = new MockWebSocket(); + + it("should create a chainable from a websocket", () => { + const chainable = watch(ws); + assertExists(chainable.call); + }); + + it("should create a stream of events from multiple websockets", () => { + const chainable = watch(ws, ws); + assertExists(chainable.call); + }); + + it("should create a stream of events from a websocket", () => { + const stream = watch(ws)("message"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should create a stream of events of multiple types from a websocket", () => { + const stream = watch(ws)("message", "open"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should create a stream of events of multiple types from multiple websockets", () => { + const stream = watch(ws, ws)("message", "open"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should receive an event from a websocket", async () => { + const stream = watch(ws)("message"); + const reader = stream.getReader(); + ws.dispatchEvent(new MessageEvent("message", { data: "test" })); + const { value } = await reader.read(); + assertExists(value); + assertEquals(value.type, "message"); + assertEquals(value.data, "test"); + }); +}); + +describe("watch - relays", () => { let relay: Relay; beforeAll(() => {