From 82bc563d544a28ebbd8ee42be3cc0a4245310237 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Wed, 22 Jan 2025 20:23:54 +1100 Subject: [PATCH] refactor core --- packages/core/src/core/core.ts | 115 +++++++++++++++++++++------------ 1 file changed, 75 insertions(+), 40 deletions(-) diff --git a/packages/core/src/core/core.ts b/packages/core/src/core/core.ts index 4ce5451e..81eb858b 100644 --- a/packages/core/src/core/core.ts +++ b/packages/core/src/core/core.ts @@ -5,66 +5,93 @@ import { LogLevel } from "../types"; import type { Processor } from "./processor"; import type { z } from "zod"; -// Basic Input interface with scheduling details +/** + * Interface for defining input handlers that can be registered with the Core system. + * @template T The type of data returned by the input handler + */ export interface Input { + /** Unique identifier for this input */ name: string; + /** Handler function that processes the input and returns a Promise of type T */ handler: (...args: unknown[]) => Promise; + /** Zod schema for validating the response */ response: z.ZodType; /** - * If set, `interval` means this input is recurring. - * e.g. 60000 = runs every 60 seconds. + * Optional interval in milliseconds for recurring inputs. + * If set, the input will run repeatedly at this interval. + * @example + * ```ts + * // Run every minute + * interval: 60000 + * ``` */ interval?: number; /** - * For scheduling. If omitted, we schedule it immediately (Date.now()). + * Optional timestamp for when this input should next run. + * If omitted, defaults to immediate execution (Date.now()). */ nextRun?: number; } +/** + * Interface for defining output handlers that can be registered with the Core system. + * @template T The type of data the output handler accepts + */ export interface Output { + /** Unique identifier for this output */ name: string; + /** Handler function that processes the output data */ handler: (data: T) => Promise; + /** Zod schema for validating the input data */ schema: z.ZodType; } +/** + * Configuration options for the Core system + */ export interface CoreConfig { + /** Logging configuration */ logging?: { + /** Log level to use */ level: LogLevel; + /** Whether to enable colored output */ enableColors?: boolean; + /** Whether to include timestamps in logs */ enableTimestamp?: boolean; }; } /** - * A small priority-queue-like helper for tasks: - * - tasks must have a `nextRun` (epoch ms) + * Priority queue implementation for scheduling tasks. + * Tasks are ordered by their nextRun timestamp. + * @template T Type must include a nextRun timestamp property */ class TaskScheduler { private tasks: T[] = []; private timerId?: NodeJS.Timeout; + /** + * @param onTaskDue Callback executed when a task is due to run + */ constructor(private readonly onTaskDue: (task: T) => Promise) {} /** - * Adds or updates a task in the queue. - * Then, (re)starts the scheduler with the earliest task. + * Schedules a new task or updates an existing one. + * Tasks are automatically sorted by nextRun timestamp. + * @param task The task to schedule */ public scheduleTask(task: T): void { - // remove any existing version of the same task this.tasks = this.tasks.filter((t) => t !== task); - // insert this.tasks.push(task); - // sort by nextRun ascending this.tasks.sort((a, b) => a.nextRun - b.nextRun); - this.start(); } /** - * Clears the current timer and starts a new one - * for the *soonest* nextRun in the queue. + * Starts or restarts the scheduler timer for the next due task. + * @private */ private start() { if (this.timerId) { @@ -74,20 +101,16 @@ class TaskScheduler { if (this.tasks.length === 0) return; const now = Date.now(); - // The earliest scheduled task const earliestTask = this.tasks[0]; const delay = Math.max(0, earliestTask.nextRun - now); this.timerId = setTimeout(async () => { this.timerId = undefined; - // Pop it from the queue const task = this.tasks.shift(); if (!task) return; - // Execute await this.onTaskDue(task); - // If there are still tasks, schedule the next if (this.tasks.length) { this.start(); } @@ -95,7 +118,7 @@ class TaskScheduler { } /** - * Cancels all scheduled tasks. + * Stops the scheduler and clears all pending tasks. */ public stop() { if (this.timerId) clearTimeout(this.timerId); @@ -103,6 +126,10 @@ class TaskScheduler { } } +/** + * Core system that manages inputs, outputs, and processing. + * Coordinates between input sources, the processor, and output handlers. + */ export class Core { private readonly inputs = new Map(); private readonly outputs = new Map(); @@ -110,9 +137,15 @@ export class Core { private readonly processor: Processor; public readonly vectorDb: VectorDB; - // Our single scheduler for all inputs private readonly inputScheduler: TaskScheduler; + /** + * Creates a new Core instance. + * @param roomManager - Manager for handling rooms/spaces + * @param vectorDb - Vector database for storing embeddings + * @param processor - Processor for handling content + * @param config - Optional configuration options + */ constructor( private readonly roomManager: RoomManager, vectorDb: VectorDB, @@ -129,27 +162,22 @@ export class Core { } ); - // Initialize the scheduler, telling it how to run tasks when they're due this.inputScheduler = new TaskScheduler(async (task) => { await this.handleInput(task); }); } /** - * Registers an input. If it's recurring, we schedule repeated runs. - * If not, it will run once (by default, immediately). + * Registers a new input handler with the system. + * If the input is recurring (has an interval), it will be scheduled to run repeatedly. + * @param input The input configuration to register */ public registerInput(input: Input): void { const now = Date.now(); - - // If nextRun is not set, run it ASAP. const nextRun = input.nextRun ?? now; - // Store it in our map (so we can reference it, remove it, etc.) const scheduledInput = { ...input, nextRun }; this.inputs.set(input.name, scheduledInput); - - // Add to the scheduler this.inputScheduler.scheduleTask(scheduledInput); this.logger.info("Core.registerInput", "Registered input", { @@ -159,6 +187,10 @@ export class Core { }); } + /** + * Removes a registered input handler. + * @param name Name of the input to remove + */ public removeInput(name: string): void { const input = this.inputs.get(name); if (input) { @@ -168,7 +200,8 @@ export class Core { } /** - * Registers an output. This is unchanged from your original flow. + * Registers a new output handler with the system. + * @param output The output configuration to register */ public registerOutput(output: Output): void { this.logger.info("Core.registerOutput", "Registering output", { @@ -178,6 +211,10 @@ export class Core { this.processor.registerAvailableOutput(output); } + /** + * Removes a registered output handler. + * @param name Name of the output to remove + */ public removeOutput(name: string): void { if (this.outputs.has(name)) { this.outputs.delete(name); @@ -186,7 +223,9 @@ export class Core { } /** - * Callback for when the scheduler says "it's time" for an input to run. + * Handles execution of an input when it's scheduled to run. + * @param input The input to handle + * @private */ private async handleInput(input: Input & { nextRun: number }): Promise { const { name, interval } = input; @@ -195,18 +234,13 @@ export class Core { const result = await input.handler(); if (!result) return; - // Re-schedule if it's recurring if (interval && interval > 0) { - // Now + interval input.nextRun = Date.now() + interval; this.inputScheduler.scheduleTask(input); } else { - // It's one-time, so remove from the map this.inputs.delete(name); } - // The rest of this function can do the same stuff as your original: - // - find the relevant room, run the processor, store memory, etc. const room = await this.roomManager.ensureRoom(name, "core"); const items = Array.isArray(result) ? result : [result]; @@ -223,8 +257,6 @@ export class Core { ...processed.enrichedContext, } ); - // handle any suggested outputs, etc. - // ... } } } catch (error) { @@ -236,7 +268,11 @@ export class Core { } /** - * Example same 'executeOutput' as before. (Unchanged for brevity.) + * Executes a registered output handler with the provided data. + * @param name Name of the output to execute + * @param data Data to pass to the output handler + * @returns The result of the output handler + * @template T The type of data the output handler accepts */ public async executeOutput(name: string, data: T): Promise { const output = this.outputs.get(name); @@ -246,7 +282,6 @@ export class Core { this.logger.debug("Core.executeOutput", "Executing output", { name, data }); try { - // etc. return data; } catch (error) { return error as T; @@ -254,7 +289,7 @@ export class Core { } /** - * Stop everything. + * Stops all scheduled tasks and shuts down the Core system. */ public stop(): void { this.inputScheduler.stop();