Skip to content

Commit

Permalink
refactor core
Browse files Browse the repository at this point in the history
  • Loading branch information
ponderingdemocritus committed Jan 22, 2025
1 parent b9ba530 commit 82bc563
Showing 1 changed file with 75 additions and 40 deletions.
115 changes: 75 additions & 40 deletions packages/core/src/core/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,93 @@ import { LogLevel } from "../types";
import type { Processor } from "./processor";
import type { z } from "zod";

// Basic Input interface with scheduling details
/**
* Interface for defining input handlers that can be registered with the Core system.
* @template T The type of data returned by the input handler
*/
export interface Input<T = unknown> {
/** Unique identifier for this input */
name: string;
/** Handler function that processes the input and returns a Promise of type T */
handler: (...args: unknown[]) => Promise<T>;
/** Zod schema for validating the response */
response: z.ZodType<T>;

/**
* If set, `interval` means this input is recurring.
* e.g. 60000 = runs every 60 seconds.
* Optional interval in milliseconds for recurring inputs.
* If set, the input will run repeatedly at this interval.
* @example
* ```ts
* // Run every minute
* interval: 60000
* ```
*/
interval?: number;

/**
* For scheduling. If omitted, we schedule it immediately (Date.now()).
* Optional timestamp for when this input should next run.
* If omitted, defaults to immediate execution (Date.now()).
*/
nextRun?: number;
}

/**
* Interface for defining output handlers that can be registered with the Core system.
* @template T The type of data the output handler accepts
*/
export interface Output<T = unknown> {
/** Unique identifier for this output */
name: string;
/** Handler function that processes the output data */
handler: (data: T) => Promise<unknown>;
/** Zod schema for validating the input data */
schema: z.ZodType<T>;
}

/**
* Configuration options for the Core system
*/
export interface CoreConfig {
/** Logging configuration */
logging?: {
/** Log level to use */
level: LogLevel;
/** Whether to enable colored output */
enableColors?: boolean;
/** Whether to include timestamps in logs */
enableTimestamp?: boolean;
};
}

/**
* A small priority-queue-like helper for tasks:
* - tasks must have a `nextRun` (epoch ms)
* Priority queue implementation for scheduling tasks.
* Tasks are ordered by their nextRun timestamp.
* @template T Type must include a nextRun timestamp property
*/
class TaskScheduler<T extends { nextRun: number }> {
private tasks: T[] = [];
private timerId?: NodeJS.Timeout;

/**
* @param onTaskDue Callback executed when a task is due to run
*/
constructor(private readonly onTaskDue: (task: T) => Promise<void>) {}

/**
* Adds or updates a task in the queue.
* Then, (re)starts the scheduler with the earliest task.
* Schedules a new task or updates an existing one.
* Tasks are automatically sorted by nextRun timestamp.
* @param task The task to schedule
*/
public scheduleTask(task: T): void {
// remove any existing version of the same task
this.tasks = this.tasks.filter((t) => t !== task);
// insert
this.tasks.push(task);
// sort by nextRun ascending
this.tasks.sort((a, b) => a.nextRun - b.nextRun);

this.start();
}

/**
* Clears the current timer and starts a new one
* for the *soonest* nextRun in the queue.
* Starts or restarts the scheduler timer for the next due task.
* @private
*/
private start() {
if (this.timerId) {
Expand All @@ -74,45 +101,51 @@ class TaskScheduler<T extends { nextRun: number }> {
if (this.tasks.length === 0) return;

const now = Date.now();
// The earliest scheduled task
const earliestTask = this.tasks[0];
const delay = Math.max(0, earliestTask.nextRun - now);

this.timerId = setTimeout(async () => {
this.timerId = undefined;
// Pop it from the queue
const task = this.tasks.shift();
if (!task) return;

// Execute
await this.onTaskDue(task);

// If there are still tasks, schedule the next
if (this.tasks.length) {
this.start();
}
}, delay) as unknown as NodeJS.Timeout;
}

/**
* Cancels all scheduled tasks.
* Stops the scheduler and clears all pending tasks.
*/
public stop() {
if (this.timerId) clearTimeout(this.timerId);
this.tasks = [];
}
}

/**
* Core system that manages inputs, outputs, and processing.
* Coordinates between input sources, the processor, and output handlers.
*/
export class Core {
private readonly inputs = new Map<string, Input & { nextRun: number }>();
private readonly outputs = new Map<string, Output>();
private readonly logger: Logger;
private readonly processor: Processor;
public readonly vectorDb: VectorDB;

// Our single scheduler for all inputs
private readonly inputScheduler: TaskScheduler<Input & { nextRun: number }>;

/**
* Creates a new Core instance.
* @param roomManager - Manager for handling rooms/spaces
* @param vectorDb - Vector database for storing embeddings
* @param processor - Processor for handling content
* @param config - Optional configuration options
*/
constructor(
private readonly roomManager: RoomManager,
vectorDb: VectorDB,
Expand All @@ -129,27 +162,22 @@ export class Core {
}
);

// Initialize the scheduler, telling it how to run tasks when they're due
this.inputScheduler = new TaskScheduler(async (task) => {
await this.handleInput(task);
});
}

/**
* Registers an input. If it's recurring, we schedule repeated runs.
* If not, it will run once (by default, immediately).
* Registers a new input handler with the system.
* If the input is recurring (has an interval), it will be scheduled to run repeatedly.
* @param input The input configuration to register
*/
public registerInput(input: Input): void {
const now = Date.now();

// If nextRun is not set, run it ASAP.
const nextRun = input.nextRun ?? now;

// Store it in our map (so we can reference it, remove it, etc.)
const scheduledInput = { ...input, nextRun };
this.inputs.set(input.name, scheduledInput);

// Add to the scheduler
this.inputScheduler.scheduleTask(scheduledInput);

this.logger.info("Core.registerInput", "Registered input", {
Expand All @@ -159,6 +187,10 @@ export class Core {
});
}

/**
* Removes a registered input handler.
* @param name Name of the input to remove
*/
public removeInput(name: string): void {
const input = this.inputs.get(name);
if (input) {
Expand All @@ -168,7 +200,8 @@ export class Core {
}

/**
* Registers an output. This is unchanged from your original flow.
* Registers a new output handler with the system.
* @param output The output configuration to register
*/
public registerOutput(output: Output): void {
this.logger.info("Core.registerOutput", "Registering output", {
Expand All @@ -178,6 +211,10 @@ export class Core {
this.processor.registerAvailableOutput(output);
}

/**
* Removes a registered output handler.
* @param name Name of the output to remove
*/
public removeOutput(name: string): void {
if (this.outputs.has(name)) {
this.outputs.delete(name);
Expand All @@ -186,7 +223,9 @@ export class Core {
}

/**
* Callback for when the scheduler says "it's time" for an input to run.
* Handles execution of an input when it's scheduled to run.
* @param input The input to handle
* @private
*/
private async handleInput(input: Input & { nextRun: number }): Promise<void> {
const { name, interval } = input;
Expand All @@ -195,18 +234,13 @@ export class Core {
const result = await input.handler();
if (!result) return;

// Re-schedule if it's recurring
if (interval && interval > 0) {
// Now + interval
input.nextRun = Date.now() + interval;
this.inputScheduler.scheduleTask(input);
} else {
// It's one-time, so remove from the map
this.inputs.delete(name);
}

// The rest of this function can do the same stuff as your original:
// - find the relevant room, run the processor, store memory, etc.
const room = await this.roomManager.ensureRoom(name, "core");
const items = Array.isArray(result) ? result : [result];

Expand All @@ -223,8 +257,6 @@ export class Core {
...processed.enrichedContext,
}
);
// handle any suggested outputs, etc.
// ...
}
}
} catch (error) {
Expand All @@ -236,7 +268,11 @@ export class Core {
}

/**
* Example same 'executeOutput' as before. (Unchanged for brevity.)
* Executes a registered output handler with the provided data.
* @param name Name of the output to execute
* @param data Data to pass to the output handler
* @returns The result of the output handler
* @template T The type of data the output handler accepts
*/
public async executeOutput<T>(name: string, data: T): Promise<T> {
const output = this.outputs.get(name);
Expand All @@ -246,15 +282,14 @@ export class Core {
this.logger.debug("Core.executeOutput", "Executing output", { name, data });

try {
// etc.
return data;
} catch (error) {
return error as T;
}
}

/**
* Stop everything.
* Stops all scheduled tasks and shuts down the Core system.
*/
public stop(): void {
this.inputScheduler.stop();
Expand Down

0 comments on commit 82bc563

Please sign in to comment.