Skip to content

Commit

Permalink
refactor(lib/streams): rename classes
Browse files Browse the repository at this point in the history
  • Loading branch information
hasundue committed Mar 21, 2024
1 parent bf5b1d5 commit 590053c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
18 changes: 8 additions & 10 deletions lib/streams.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
* TransformStream which filters out duplicate values from a stream.
*/
export class Distinctor<R = unknown, T = unknown>
export class DistinctStream<R = unknown, T = unknown>
extends TransformStream<R, R> {
#seen: Set<T>;
constructor(protected readonly fn: (value: R) => T) {
constructor(protected readonly selector: (value: R) => T) {
super({
transform: (value, controller) => {
const key = fn(value);
const key = selector(value);

if (!this.#seen.has(key)) {
this.#seen.add(key);
Expand All @@ -19,15 +19,13 @@ export class Distinctor<R = unknown, T = unknown>
}
}

export type LogLevel = "error" | "warn" | "info" | "debug";

export interface ConsoleLoggerOptions {
level?: LogLevel;
export interface ConsoleLogStreamOptions {
level?: "error" | "warn" | "info" | "debug";
}

export class ConsoleLogger<W = unknown> extends WritableStream<W> {
readonly level: LogLevel;
constructor(options?: ConsoleLoggerOptions) {
export class ConsoleLogStream<W = unknown> extends WritableStream<W> {
readonly level: ConsoleLogStreamOptions["level"];
constructor(options?: ConsoleLogStreamOptions) {
const level = options?.level ?? "info";
super({
write(chunk) {
Expand Down
18 changes: 9 additions & 9 deletions lib/streams_test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { assert, assertArrayIncludes, assertEquals } from "@std/assert";
import { beforeEach, describe, it } from "@std/testing/bdd";
import { ConsoleLogger, Distinctor } from "@lophus/lib/streams";
import { ConsoleLogStream, DistinctStream } from "@lophus/lib/streams";

describe("Distinctor", () => {
it("filters out duplicate values from a stream", async () => {
describe("DistinctStream", () => {
it("should filter out duplicate values from a stream", async () => {
const stream = new ReadableStream<number>({
start(controller) {
controller.enqueue(1);
Expand All @@ -14,14 +14,14 @@ describe("Distinctor", () => {
},
});
const values = await Array.fromAsync(
stream.pipeThrough(new Distinctor((v) => v)),
stream.pipeThrough(new DistinctStream((v) => v)),
);
assertEquals(values.length, 3);
assertArrayIncludes(values, [1, 2, 3]);
});
});

describe("ConsoleLogger", () => {
describe("ConsoleLogStream", () => {
let output = "";

beforeEach(() => {
Expand All @@ -35,13 +35,13 @@ describe("ConsoleLogger", () => {
});

it("should be a writable stream", () => {
const logger = new ConsoleLogger();
const logger = new ConsoleLogStream();
assert(logger instanceof WritableStream);
});

it("should log to console", async () => {
const logger = new ConsoleLogger();
await logger.getWriter().write("ConsoleLogger");
assertEquals(output, "ConsoleLogger");
const logger = new ConsoleLogStream();
await logger.getWriter().write("ConsoleLogStream");
assertEquals(output, "ConsoleLogStream");
});
});
4 changes: 2 additions & 2 deletions std/pools.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { mergeReadableStreams as merge } from "@std/streams";
import { Distinctor } from "@lophus/lib/streams";
import { DistinctStream } from "@lophus/lib/streams";
import type {
ClientToRelayMessage,
EventKind,
Expand Down Expand Up @@ -47,7 +47,7 @@ export class RelayPool implements RelayLike {
opts: Partial<SubscriptionOptions> = {},
): ReadableStream<NostrEvent<K>> {
const subs = this.#relays_read.map((r) => r.subscribe(filter, opts));
return merge(...subs).pipeThrough(new Distinctor((m) => m.id));
return merge(...subs).pipeThrough(new DistinctStream((m) => m.id));
}

async publish<K extends EventKind>(
Expand Down

0 comments on commit 590053c

Please sign in to comment.