From 0696dcf7d34efe7305bf7336eafb655f1544b507 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 15:59:32 +1100 Subject: [PATCH 1/6] abstract db for other services, move sheduler out of core --- examples/example-api.ts | 9 +- examples/example-discord.ts | 2 +- examples/example-server.ts | 8 +- examples/example-twitter.ts | 48 ++- packages/core/src/core/{ => db}/mongo-db.ts | 82 ++--- packages/core/src/core/index.ts | 4 +- packages/core/src/core/memory.ts | 73 +++++ packages/core/src/core/orchestrator.ts | 324 +++++--------------- packages/core/src/core/schedule-service.ts | 125 ++++++++ packages/core/src/core/task-scheduler.ts | 62 ---- 10 files changed, 343 insertions(+), 394 deletions(-) rename packages/core/src/core/{ => db}/mongo-db.ts (82%) create mode 100644 packages/core/src/core/memory.ts create mode 100644 packages/core/src/core/schedule-service.ts delete mode 100644 packages/core/src/core/task-scheduler.ts diff --git a/examples/example-api.ts b/examples/example-api.ts index 9b3b8621..336f2497 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -20,7 +20,7 @@ import { defaultCharacter } from "../packages/core/src/core/character"; import { Consciousness } from "../packages/core/src/core/consciousness"; import { z } from "zod"; import readline from "readline"; -import { MongoDb } from "../packages/core/src/core/mongo-db"; +import { MongoDb } from "../packages/core/src/core/db/mongo-db"; async function main() { const loglevel = LogLevel.DEBUG; @@ -162,13 +162,6 @@ async function main() { orchestrator.registerIOHandler({ name: "user_chat", role: HandlerRole.INPUT, - // This schema describes what a user message looks like - outputSchema: z.object({ - content: z.string(), - userId: z.string().optional(), - }), - // For "on-demand" input handlers, the `handler()` can be a no-op. - // We'll call it manually with data, so we don't need an interval. execute: async (payload) => { // We simply return the payload so the Orchestrator can process it return payload; diff --git a/examples/example-discord.ts b/examples/example-discord.ts index fbbb6100..3551b142 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -16,7 +16,7 @@ import chalk from "chalk"; import { defaultCharacter } from "../packages/core/src/core/character"; import { z } from "zod"; import readline from "readline"; -import { MongoDb } from "../packages/core/src/core/mongo-db"; +import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { Message } from "discord.js"; async function main() { diff --git a/examples/example-server.ts b/examples/example-server.ts index 13732ad0..122d0f78 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -17,7 +17,7 @@ import { MessageProcessor } from "../packages/core/src/core/processors/message-p import { defaultCharacter } from "../packages/core/src/core/character"; import { LogLevel } from "../packages/core/src/core/types"; -import { MongoDb } from "../packages/core/src/core/mongo-db"; +import { MongoDb } from "../packages/core/src/core/db/mongo-db"; const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", @@ -75,10 +75,6 @@ async function createDaydreamsAgent() { orchestrator.registerIOHandler({ name: "user_chat", role: HandlerRole.INPUT, - outputSchema: z.object({ - content: z.string(), - userId: z.string().optional(), - }), execute: async (payload) => { return payload; }, @@ -155,7 +151,7 @@ wss.on("connection", (ws) => { userId: userId, }, userId, - orchestratorId ? new ObjectId(orchestratorId) : undefined + orchestratorId ? orchestratorId : undefined ); // Send responses back through WebSocket diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 17759d6b..3f17344c 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -21,7 +21,9 @@ import { defaultCharacter } from "../packages/core/src/core/character"; import { Consciousness } from "../packages/core/src/core/consciousness"; import { z } from "zod"; import readline from "readline"; -import { MongoDb } from "../packages/core/src/core/mongo-db"; +import { MongoDb } from "../packages/core/src/core/db/mongo-db"; +import { SchedulerService } from "../packages/core/src/core/schedule-service"; +import { Logger } from "../packages/core/src/core/logger"; async function main() { const loglevel = LogLevel.DEBUG; @@ -59,7 +61,7 @@ async function main() { await scheduledTaskDb.deleteAll(); // Initialize core system - const core = new Orchestrator( + const orchestrator = new Orchestrator( roomManager, vectorDb, [processor], @@ -71,6 +73,23 @@ async function main() { } ); + const scheduler = new SchedulerService( + { + logger: new Logger({ + level: loglevel, + enableColors: true, + enableTimestamp: true, + }), + orchestratorDb: scheduledTaskDb, + roomManager: roomManager, + vectorDb: vectorDb, + }, + orchestrator, + 10000 + ); + + scheduler.start(); + // Set up Twitter client with credentials const twitter = new TwitterClient( { @@ -89,7 +108,7 @@ async function main() { }); // Register input handler for Twitter mentions - core.registerIOHandler({ + orchestrator.registerIOHandler({ name: "twitter_mentions", role: HandlerRole.INPUT, execute: async () => { @@ -115,7 +134,7 @@ async function main() { }); // Register input handler for autonomous thoughts - core.registerIOHandler({ + orchestrator.registerIOHandler({ name: "consciousness_thoughts", role: HandlerRole.INPUT, execute: async () => { @@ -132,7 +151,7 @@ async function main() { }); // Register output handler for posting thoughts to Twitter - core.registerIOHandler({ + orchestrator.registerIOHandler({ name: "twitter_thought", role: HandlerRole.OUTPUT, execute: async (data: unknown) => { @@ -157,11 +176,16 @@ async function main() { }); // Schedule a task to run every minute - await core.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute - await core.scheduleTaskInDb("sleever", "consciousness_thoughts", {}, 30000); // Think every 5 minutes + await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute + await scheduler.scheduleTaskInDb( + "sleever", + "consciousness_thoughts", + {}, + 30000 + ); // Think every 5 minutes // Register output handler for Twitter replies - core.registerIOHandler({ + orchestrator.registerIOHandler({ name: "twitter_reply", role: HandlerRole.OUTPUT, execute: async (data: unknown) => { @@ -199,10 +223,10 @@ async function main() { // Clean up resources await consciousness.stop(); - core.removeIOHandler("twitter_mentions"); - core.removeIOHandler("consciousness_thoughts"); - core.removeIOHandler("twitter_reply"); - core.removeIOHandler("twitter_thought"); + orchestrator.removeIOHandler("twitter_mentions"); + orchestrator.removeIOHandler("consciousness_thoughts"); + orchestrator.removeIOHandler("twitter_reply"); + orchestrator.removeIOHandler("twitter_thought"); rl.close(); console.log(chalk.green("✅ Shutdown complete")); diff --git a/packages/core/src/core/mongo-db.ts b/packages/core/src/core/db/mongo-db.ts similarity index 82% rename from packages/core/src/core/mongo-db.ts rename to packages/core/src/core/db/mongo-db.ts index 56e08c2d..ffe37a71 100644 --- a/packages/core/src/core/mongo-db.ts +++ b/packages/core/src/core/db/mongo-db.ts @@ -1,35 +1,13 @@ import { MongoClient, Collection, ObjectId } from "mongodb"; -import type { HandlerRole } from "./types"; +import type { HandlerRole } from "../types"; +import type { + OrchestratorChat, + OrchestratorDb, + OrchestratorMessage, + ScheduledTask, +} from "../memory"; -// Define the shape of a scheduled task document in Mongo -export interface ScheduledTask { - _id?: ObjectId; - userId: string; - handlerName: string; // Which IOHandler to invoke - taskData: Record; // Arbitrary data passed to the handler - nextRunAt: Date; // When the task is next due - intervalMs?: number; // If present, re-schedule after each run - status: "pending" | "running" | "completed" | "failed"; - createdAt: Date; - updatedAt: Date; -} - -export interface OrchestratorMessage { - role: HandlerRole; // "input" | "output" | "action" - name: string; // The IOHandler name - data: unknown; // Arbitrary data your orchestrator is processing - timestamp: Date; -} - -export interface OrchestratorChat { - _id?: ObjectId; - userId: string; - createdAt: Date; - updatedAt: Date; - messages: OrchestratorMessage[]; -} - -export class MongoDb { +export class MongoDb implements OrchestratorDb { private client: MongoClient; private collection!: Collection; private orchestratorCollection!: Collection; @@ -94,9 +72,10 @@ export class MongoDb { taskData: Record = {}, nextRunAt: Date, intervalMs?: number - ): Promise { + ): Promise { const now = new Date(); const doc: ScheduledTask = { + _id: new ObjectId().toString(), userId, handlerName, taskData, @@ -135,7 +114,7 @@ export class MongoDb { /** * Marks a task's status as "running". Typically called right before invoking it. */ - public async markRunning(taskId: ObjectId): Promise { + public async markRunning(taskId: string): Promise { const now = new Date(); await this.collection.updateOne( { _id: taskId }, @@ -151,10 +130,7 @@ export class MongoDb { /** * Marks a task as completed (or failed). */ - public async markCompleted( - taskId: ObjectId, - failed = false - ): Promise { + public async markCompleted(taskId: string, failed = false): Promise { const now = new Date(); await this.collection.updateOne( { _id: taskId }, @@ -171,7 +147,7 @@ export class MongoDb { * Updates a task to run again in the future (if intervalMs is present). */ public async updateNextRun( - taskId: ObjectId, + taskId: string, newRunTime: Date ): Promise { const now = new Date(); @@ -213,7 +189,7 @@ export class MongoDb { * Creates a new "orchestrator" document for a user, returning its generated _id. * This can represent a "new chat/session" with the agent. */ - public async createOrchestrator(userId: string): Promise { + public async createOrchestrator(userId: string): Promise { const chat: OrchestratorChat = { userId: userId, createdAt: new Date(), @@ -221,7 +197,7 @@ export class MongoDb { messages: [], }; const result = await this.orchestratorCollection.insertOne(chat); - return result.insertedId; + return result.insertedId.toString(); } /** @@ -233,7 +209,7 @@ export class MongoDb { * @param data - The data payload to store (e.g., text, JSON from APIs, etc). */ public async addMessage( - orchestratorId: ObjectId, + orchestratorId: string, role: HandlerRole, name: string, data: unknown @@ -260,7 +236,7 @@ export class MongoDb { * Retrieves all messages in a specific orchestrator's conversation. */ public async getMessages( - orchestratorId: ObjectId + orchestratorId: string ): Promise { const doc = await this.orchestratorCollection.findOne({ _id: orchestratorId, @@ -282,21 +258,29 @@ export class MongoDb { * Retrieves a single orchestrator document by its ObjectId. */ public async getOrchestratorById( - orchestratorId: ObjectId + orchestratorId: string ): Promise { - return this.orchestratorCollection.findOne({ _id: orchestratorId }); + return this.orchestratorCollection.findOne({ + _id: orchestratorId, + }); } - public async getOrchestratorsByUserId(userId: string) { + public async getOrchestratorsByUserId( + userId: string + ): Promise { try { - const collection = this.client - .db(this.dbName) - .collection("orchestrators"); - - return await collection + const documents = await this.orchestratorCollection .find({ userId }) .sort({ createdAt: -1 }) .toArray(); + + return documents.map((doc) => ({ + _id: doc._id.toString(), + userId: doc.userId, + createdAt: doc.createdAt, + updatedAt: doc.updatedAt, + messages: doc.messages, + })); } catch (error) { console.error( "MongoDb.getOrchestratorsByUserId", diff --git a/packages/core/src/core/index.ts b/packages/core/src/core/index.ts index 995ad966..c77b2054 100644 --- a/packages/core/src/core/index.ts +++ b/packages/core/src/core/index.ts @@ -5,7 +5,6 @@ import { ChromaVectorDB } from "./vector-db"; import { BaseProcessor } from "./processor"; import { GoalManager } from "./goal-manager"; import { ChainOfThought } from "./chain-of-thought"; -import { TaskScheduler } from "./task-scheduler"; import { Logger } from "./logger"; import { Consciousness } from "./consciousness"; import { LLMClient } from "./llm-client"; @@ -17,6 +16,7 @@ import * as Chains from "./chains"; import * as IO from "./io"; import * as Types from "./types"; import * as Processors from "./processors"; +import { SchedulerService } from "./schedule-service"; export { BaseProcessor, @@ -35,7 +35,7 @@ export { Room, RoomManager, StepManager, - TaskScheduler, Types, Utils, + SchedulerService, }; diff --git a/packages/core/src/core/memory.ts b/packages/core/src/core/memory.ts new file mode 100644 index 00000000..c5410973 --- /dev/null +++ b/packages/core/src/core/memory.ts @@ -0,0 +1,73 @@ +import type { HandlerRole, Memory } from "./types"; +import type { Room } from "./room"; + +// Define interfaces matching MongoDB document shapes +export interface ScheduledTask { + _id: string; + userId: string; + handlerName: string; + taskData: Record; + nextRunAt: Date; + intervalMs?: number; + status: "pending" | "running" | "completed" | "failed"; + createdAt: Date; + updatedAt: Date; +} + +export interface OrchestratorMessage { + role: HandlerRole; + name: string; + data: unknown; + timestamp: Date; +} + +export interface OrchestratorChat { + _id?: string; + userId: string; + createdAt: Date; + updatedAt: Date; + messages: OrchestratorMessage[]; +} + +export interface OrchestratorDb { + connect(): Promise; + close(): Promise; + + // Orchestrator methods + getOrchestratorById(id: string): Promise; + getOrchestratorsByUserId(userId: string): Promise; + createOrchestrator(userId: string): Promise; + addMessage( + orchestratorId: string, + role: HandlerRole, + name: string, + data: any + ): Promise; + getMessages(orchestratorId: string): Promise; + + // Task management methods + createTask( + userId: string, + handlerName: string, + taskData: Record, + nextRunAt: Date, + intervalMs?: number + ): Promise; + findDueTasks(limit?: number): Promise; + markRunning(taskId: string): Promise; + markCompleted(taskId: string, failed?: boolean): Promise; + updateNextRun(taskId: string, newRunTime: Date): Promise; + rescheduleIfRecurring(task: ScheduledTask): Promise; + deleteAll(): Promise; +} + +export interface MemoryManager { + hasProcessedContentInRoom( + contentId: string, + roomId: string + ): Promise; + ensureRoom(roomId: string, source: string, userId?: string): Promise; + getMemoriesFromRoom(roomId: string): Promise; + addMemory(roomId: string, content: string, metadata?: any): Promise; + markContentAsProcessed(contentId: string, roomId: string): Promise; +} diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index 1c52efa8..bd0928bb 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,12 +1,11 @@ import { Logger } from "./logger"; import { RoomManager } from "./room-manager"; -import { TaskScheduler } from "./task-scheduler"; import type { BaseProcessor } from "./processor"; import type { Memory, ProcessedResult, VectorDB } from "./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; -import type { MongoDb } from "./mongo-db"; -import { ObjectId } from "mongodb"; + +import type { OrchestratorDb } from "./memory"; /** * Orchestrator system that manages both "input" and "output" handlers @@ -19,23 +18,33 @@ export class Orchestrator { */ private readonly ioHandlers = new Map(); - private pollIntervalId?: ReturnType; - + /** + * Collection of processors used to handle different types of content. + * Keyed by processor name. + */ private processors: Map = new Map(); /** - * A TaskScheduler that only schedules and runs input handlers + * Logger instance for logging messages and errors. */ - private readonly inputScheduler: TaskScheduler< - IOHandler & { nextRun: number } - >; - private readonly logger: Logger; - private readonly mongoDb: MongoDb; + /** + * orchestratorDb instance for database operations. + */ + private readonly orchestratorDb: OrchestratorDb; + /** + * User ID associated with the orchestrator. + */ public userId: string; + /** + * Map of unsubscribe functions for various handlers. + * Keyed by handler name. + */ + private unsubscribers = new Map void>(); + /** * Other references in your system. Adjust as needed. */ @@ -44,7 +53,7 @@ export class Orchestrator { private readonly roomManager: RoomManager, vectorDb: VectorDB, processors: BaseProcessor[], - mongoDb: MongoDb, + orchestratorDb: OrchestratorDb, config?: LoggerConfig ) { this.vectorDb = vectorDb; @@ -53,7 +62,7 @@ export class Orchestrator { return [p.getName(), p]; }) ); - this.mongoDb = mongoDb; + this.orchestratorDb = orchestratorDb; this.logger = new Logger( config ?? { @@ -66,19 +75,20 @@ export class Orchestrator { // Initialize userId to an empty string this.userId = ""; - // Our TaskScheduler will handle only input-type IOHandlers - this.inputScheduler = new TaskScheduler(async (handler) => { - await this.processInputTask(handler); - }); + this.logger.info( + "Orchestrator.constructor", + "Orchestrator initialized" + ); + } - this.startPolling(); + public getHandler(name: string): IOHandler | undefined { + return this.ioHandlers.get(name); } public initializeOrchestrator(userId: string) { this.userId = userId; } - private unsubscribers = new Map void>(); /** * Primary method to register any IOHandler (input or output). * - If it's an input with an interval, schedule it for recurring runs. @@ -131,7 +141,9 @@ export class Orchestrator { // Remove the handler itself this.ioHandlers.delete(name); - console.log(`Removed IOHandler: ${name}`); + this.logger.info("Orchestrator.removeIOHandler", "Removed IOHandler", { + name, + }); } /** @@ -174,54 +186,6 @@ export class Orchestrator { } } - /** - * The method the TaskScheduler calls for each scheduled input. - * We only schedule inputs in the constructor's scheduler. - */ - private async processInputTask(handler: IOHandler): Promise { - if (!handler.execute) { - this.logger.error( - "Orchestrator.processInputTask", - "Handler has no execute method", - { handler } - ); - return; - } - try { - // it's undefined because this might be fetching data from an api or something - const result = await handler.execute(undefined); - if (!result) return; - - if (Array.isArray(result)) { - for (const item of result) { - await this.runAutonomousFlow( - item, - handler.name, - this.userId - ); - } - } else { - await this.runAutonomousFlow(result, handler.name, this.userId); - } - } catch (error) { - this.logger.error( - "Orchestrator.processInputTask", - "Error processing input", - { - name: handler.name, - error: - error instanceof Error - ? { - message: error.message, - stack: error.stack, - name: error.name, - } - : error, - handlerType: handler.role, - } - ); - } - } /** * Dispatches data to a registered action handler and returns its result. * @@ -257,12 +221,18 @@ export class Orchestrator { if (handler.role !== "action") { throw new Error(`Handler "${name}" is not an action handler`); } - this.logger.debug("Orchestrator.dispatchToAction", "Executing action", { - name, - data, - }); + try { const result = await handler.execute(data); + + this.logger.debug( + "Orchestrator.dispatchToAction", + "Executing action", + { + name, + data, + } + ); return result; } catch (error) { this.logger.error( @@ -285,7 +255,7 @@ export class Orchestrator { initialData: unknown, sourceName: string, userId: string, - orchestratorId?: ObjectId + orchestratorId?: string ) { const queue: Array<{ data: unknown; source: string }> = []; @@ -304,12 +274,12 @@ export class Orchestrator { // check if we have an orchestratorId if (orchestratorId) { // check if it exists in the db - const existingOrchestrator = await this.mongoDb.getOrchestratorById( - new ObjectId(orchestratorId) - ); + const existingOrchestrator = + await this.orchestratorDb.getOrchestratorById(orchestratorId); if (!existingOrchestrator) { - orchestratorId = await this.mongoDb.createOrchestrator(userId); + orchestratorId = + await this.orchestratorDb.createOrchestrator(userId); } } @@ -317,7 +287,7 @@ export class Orchestrator { if (orchestratorId) { // Record the initial input - await this.mongoDb.addMessage( + await this.orchestratorDb.addMessage( orchestratorId, HandlerRole.INPUT, sourceName, @@ -340,7 +310,7 @@ export class Orchestrator { // Record any action results if we have an orchestratorId if (orchestratorId) { - await this.mongoDb.addMessage( + await this.orchestratorDb.addMessage( orchestratorId, HandlerRole.INPUT, source, @@ -383,20 +353,30 @@ export class Orchestrator { // If any tasks need to be scheduled in the DB, do so if (processed.updateTasks) { for (const task of processed.updateTasks) { - await this.scheduleTaskInDb( - userId, - task.name, - task.data, - task.intervalMs + const now = Date.now(); + const nextRunAt = new Date( + now + (task.intervalMs ?? 0) ); - this.logger.debug( + this.logger.info( "Orchestrator.runAutonomousFlow", - "Scheduled task in DB", + `Scheduling task ${task.name}`, { - task, + nextRunAt, + intervalMs: task.intervalMs, } ); + + await this.orchestratorDb.createTask( + userId, + task.name, + { + request: task.name, + task_data: JSON.stringify(task.data), + }, + nextRunAt, + task.intervalMs + ); } } @@ -427,7 +407,7 @@ export class Orchestrator { // Record output in DB if (orchestratorId) { - await this.mongoDb.addMessage( + await this.orchestratorDb.addMessage( orchestratorId, HandlerRole.OUTPUT, output.name, @@ -452,7 +432,7 @@ export class Orchestrator { // Record action in DB if (orchestratorId) { - await this.mongoDb.addMessage( + await this.orchestratorDb.addMessage( orchestratorId, HandlerRole.ACTION, output.name, @@ -530,7 +510,7 @@ export class Orchestrator { name: string, data: T, userId: string, - orchestratorId?: ObjectId + orchestratorId?: string ): Promise { const handler = this.ioHandlers.get(name); if (!handler) throw new Error(`No IOHandler: ${name}`); @@ -562,159 +542,6 @@ export class Orchestrator { } } - public async scheduleTaskInDb( - userId: string, - handlerName: string, - data: Record = {}, - intervalMs?: number - ): Promise { - const now = Date.now(); - const nextRunAt = new Date(now + (intervalMs ?? 0)); - - this.logger.info( - "Orchestrator.scheduleTaskInDb", - `Scheduling task ${handlerName}`, - { - nextRunAt, - intervalMs, - } - ); - - return await this.mongoDb.createTask( - userId, - handlerName, - { - request: handlerName, - task_data: JSON.stringify(data), - }, - nextRunAt, - intervalMs - ); - } - - public startPolling(everyMs = 10_000): void { - // Stop existing polling if it exists - if (this.pollIntervalId) { - clearInterval(this.pollIntervalId); - } - - this.pollIntervalId = setInterval(() => { - this.pollScheduledTasks().catch((err) => { - this.logger.error( - "Orchestrator.startPolling", - "Error in pollScheduledTasks", - err - ); - }); - }, everyMs); - - this.logger.info( - "Orchestrator.startPolling", - "Started polling for scheduled tasks", - { - intervalMs: everyMs, - } - ); - } - - private async pollScheduledTasks() { - try { - // Guard against undefined collection - if (!this.mongoDb) { - this.logger.error( - "pollScheduledTasks error", - "scheduledTaskDb is not initialized" - ); - return; - } - - const tasks = await this.mongoDb.findDueTasks(); - if (!tasks) { - return; - } - - for (const task of tasks) { - if (!task._id) { - this.logger.error( - "pollScheduledTasks error", - "Task is missing _id" - ); - continue; - } - - // 2. Mark them as 'running' (or handle concurrency the way you want) - await this.mongoDb.markRunning(task._id); - - const handler = this.ioHandlers.get(task.handlerName); - if (!handler) { - throw new Error(`No handler found: ${task.handlerName}`); - } - - const taskData = - typeof task.taskData.task_data === "string" - ? JSON.parse(task.taskData.task_data) - : task.taskData; - - if (handler.role === HandlerRole.INPUT) { - try { - await this.dispatchToInput( - task.handlerName, - taskData, - task.userId - ); - } catch (error) { - this.logger.error( - "Task execution failed", - `Task ${task._id}: ${error instanceof Error ? error.message : String(error)}` - ); - } - } else if (handler.role === HandlerRole.ACTION) { - try { - const actionResult = await this.dispatchToAction( - task.handlerName, - taskData - ); - if (actionResult) { - await this.runAutonomousFlow( - actionResult, - task.handlerName, - task.userId - ); - } - } catch (error) { - this.logger.error( - "Task execution failed", - `Task ${task._id}: ${error instanceof Error ? error.message : String(error)}` - ); - } - } else if (handler.role === HandlerRole.OUTPUT) { - try { - await this.dispatchToOutput(task.handlerName, taskData); - } catch (error) { - this.logger.error( - "Task execution failed", - `Task ${task._id}: ${error instanceof Error ? error.message : String(error)}` - ); - } - } - - // 4. If the task is recurring (interval_ms), update next_run_at - if (task.intervalMs) { - const nextRunAt = new Date(Date.now() + task.intervalMs); - await this.mongoDb.updateNextRun(task._id, nextRunAt); - } else { - // Otherwise, mark completed - await this.mongoDb.markCompleted(task._id); - } - } - } catch (err) { - this.logger.error( - "pollScheduledTasks error", - err instanceof Error ? err.message : String(err) - ); - } - } - public async processContent( content: any, source: string, @@ -841,15 +668,4 @@ export class Orchestrator { return result; } - - /** - * Stops all scheduled tasks and shuts down the orchestrator. - */ - public stop(): void { - this.inputScheduler.stop(); - if (this.pollIntervalId) { - clearInterval(this.pollIntervalId); - } - this.logger.info("Orchestrator.stop", "All scheduled inputs stopped."); - } } diff --git a/packages/core/src/core/schedule-service.ts b/packages/core/src/core/schedule-service.ts new file mode 100644 index 00000000..5197fbdd --- /dev/null +++ b/packages/core/src/core/schedule-service.ts @@ -0,0 +1,125 @@ +import { Orchestrator } from "./orchestrator"; +import { HandlerRole, type VectorDB } from "./types"; +import type { Logger } from "./logger"; +import type { RoomManager } from "./room-manager"; +import type { OrchestratorDb } from "./memory"; + +export interface IOrchestratorContext { + logger: Logger; + orchestratorDb: OrchestratorDb; + roomManager: RoomManager; + vectorDb: VectorDB; +} + +export class SchedulerService { + private intervalId?: ReturnType; + + constructor( + private context: IOrchestratorContext, + private orchestrator: Orchestrator, + private pollMs: number = 10_000 + ) {} + + public start() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + this.intervalId = setInterval(() => this.pollTasks(), this.pollMs); + this.context.logger.info( + "SchedulerService.start", + `Scheduler started polling with pollMs: ${this.pollMs}` + ); + } + + private async pollTasks() { + try { + const tasks = await this.context.orchestratorDb.findDueTasks(); + for (const task of tasks) { + await this.context.orchestratorDb.markRunning(task._id); + + const handler = this.orchestrator.getHandler(task.handlerName); + if (!handler) { + this.context.logger.warn("No handler found", "warn", { + name: task.handlerName, + }); + continue; + } + + // parse out data + const data = JSON.parse(task.taskData.task_data); + + switch (handler.role) { + case HandlerRole.INPUT: + await this.orchestrator.dispatchToInput( + task.handlerName, + data, + task.userId + ); + break; + case HandlerRole.ACTION: + await this.orchestrator.dispatchToAction( + task.handlerName, + data + ); + break; + case HandlerRole.OUTPUT: + await this.orchestrator.dispatchToOutput( + task.handlerName, + data + ); + break; + } + + // handle recurring or complete + if (task.intervalMs) { + await this.context.orchestratorDb.updateNextRun( + task._id, + new Date(Date.now() + task.intervalMs) + ); + } else { + await this.context.orchestratorDb.markCompleted(task._id); + } + } + } catch (err) { + this.context.logger.error("pollTasks error", "error", { + data: err, + }); + } + } + + public async scheduleTaskInDb( + userId: string, + handlerName: string, + data: Record = {}, + intervalMs?: number + ): Promise { + const now = Date.now(); + const nextRunAt = new Date(now + (intervalMs ?? 0)); + + this.context.logger.info( + "SchedulerService.scheduleTaskInDb", + `Scheduling task ${handlerName}`, + { + nextRunAt, + intervalMs, + } + ); + + return await this.context.orchestratorDb.createTask( + userId, + handlerName, + { + request: handlerName, + task_data: JSON.stringify(data), + }, + nextRunAt, + intervalMs + ); + } + + public stop() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + } +} diff --git a/packages/core/src/core/task-scheduler.ts b/packages/core/src/core/task-scheduler.ts deleted file mode 100644 index 0f947187..00000000 --- a/packages/core/src/core/task-scheduler.ts +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Priority queue implementation for scheduling tasks. - * Tasks are ordered by their nextRun timestamp. - * @template T Type must include a nextRun timestamp property - */ -export class TaskScheduler { - private tasks: T[] = []; - private timerId?: NodeJS.Timeout; - - /** - * @param onTaskDue Callback executed when a task is due to run - */ - constructor(private readonly onTaskDue: (task: T) => Promise) {} - - /** - * Schedules a new task or updates an existing one. - * Tasks are automatically sorted by nextRun timestamp. - * @param task The task to schedule - */ - public scheduleTask(task: T): void { - this.tasks = this.tasks.filter((t) => t !== task); - this.tasks.push(task); - this.tasks.sort((a, b) => a.nextRun - b.nextRun); - this.start(); - } - - /** - * Starts or restarts the scheduler timer for the next due task. - * @private - */ - private start() { - if (this.timerId) { - clearTimeout(this.timerId); - this.timerId = undefined; - } - if (this.tasks.length === 0) return; - - const now = Date.now(); - const earliestTask = this.tasks[0]; - const delay = Math.max(0, earliestTask.nextRun - now); - - this.timerId = setTimeout(async () => { - this.timerId = undefined; - const task = this.tasks.shift(); - if (!task) return; - - await this.onTaskDue(task); - - if (this.tasks.length) { - this.start(); - } - }, delay) as unknown as NodeJS.Timeout; - } - - /** - * Stops the scheduler and clears all pending tasks. - */ - public stop() { - if (this.timerId) clearTimeout(this.timerId); - this.tasks = []; - } -} From ae0d1581c9f0c3184801595ddbddf8c86abab54a Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 19:54:53 +1100 Subject: [PATCH 2/6] processor abstraction --- examples/example-api.ts | 22 +- examples/example-discord.ts | 15 +- examples/example-server.ts | 15 +- examples/example-twitter.ts | 13 +- packages/core/src/core/orchestrator.ts | 58 ++-- packages/core/src/core/processor.ts | 70 +++-- packages/core/src/core/processors/index.ts | 1 + .../src/core/processors/master-processor.ts | 286 ++++++++++++++++++ packages/core/src/core/types/index.ts | 1 + 9 files changed, 402 insertions(+), 79 deletions(-) create mode 100644 packages/core/src/core/processors/master-processor.ts diff --git a/examples/example-api.ts b/examples/example-api.ts index 336f2497..0326e31e 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -21,6 +21,7 @@ import { Consciousness } from "../packages/core/src/core/consciousness"; import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; +import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; async function main() { const loglevel = LogLevel.DEBUG; @@ -45,20 +46,29 @@ async function main() { temperature: 0.3, }); - const researchProcessor = new ResearchQuantProcessor( - researchClient, + const masterProcessor = new MasterProcessor( + llmClient, defaultCharacter, - loglevel, - 1000 // chunk size, depends + loglevel ); // Initialize processor with default character personality - const processor = new MessageProcessor( + const messageProcessor = new MessageProcessor( llmClient, defaultCharacter, loglevel ); + const researchProcessor = new ResearchQuantProcessor( + researchClient, + defaultCharacter, + loglevel, + 1000 // chunk size, depends + ); + + // Add processors to the master processor + masterProcessor.addProcessor([messageProcessor, researchProcessor]); + const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", "myApp", @@ -74,7 +84,7 @@ async function main() { const orchestrator = new Orchestrator( roomManager, vectorDb, - [processor, researchProcessor], + masterProcessor, scheduledTaskDb, { level: loglevel, diff --git a/examples/example-discord.ts b/examples/example-discord.ts index 3551b142..6007382d 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -18,6 +18,7 @@ import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { Message } from "discord.js"; +import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; async function main() { // Set logging level as you see fit @@ -39,13 +40,21 @@ async function main() { temperature: 0.3, }); - // Use a sample message processor with a default "character" config - const processor = new MessageProcessor( + const masterProcessor = new MasterProcessor( llmClient, defaultCharacter, loglevel ); + // Initialize processor with default character personality + const messageProcessor = new MessageProcessor( + llmClient, + defaultCharacter, + loglevel + ); + + masterProcessor.addProcessor(messageProcessor); + // Connect to MongoDB (for scheduled tasks, if you use them) const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", @@ -62,7 +71,7 @@ async function main() { const core = new Orchestrator( roomManager, vectorDb, - [processor], + masterProcessor, scheduledTaskDb, { level: loglevel, diff --git a/examples/example-server.ts b/examples/example-server.ts index 122d0f78..4655e5b0 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -18,6 +18,7 @@ import { defaultCharacter } from "../packages/core/src/core/character"; import { LogLevel } from "../packages/core/src/core/types"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; +import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", @@ -51,18 +52,26 @@ async function createDaydreamsAgent() { // 1.3. Room manager initialization const roomManager = new RoomManager(vectorDb); - // 1.4. Initialize processor with default character - const processor = new MessageProcessor( + const masterProcessor = new MasterProcessor( llmClient, defaultCharacter, loglevel ); + // Initialize processor with default character personality + const messageProcessor = new MessageProcessor( + llmClient, + defaultCharacter, + loglevel + ); + + masterProcessor.addProcessor(messageProcessor); + // 1.5. Initialize core system const orchestrator = new Orchestrator( roomManager, vectorDb, - [processor], + masterProcessor, scheduledTaskDb, { level: loglevel, diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 3f17344c..7c7eae2e 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -23,6 +23,7 @@ import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/db/mongo-db"; import { SchedulerService } from "../packages/core/src/core/schedule-service"; +import { MasterProcessor } from "../packages/core/src/core/processors/master-processor"; import { Logger } from "../packages/core/src/core/logger"; async function main() { @@ -42,13 +43,21 @@ async function main() { temperature: 0.3, }); + const masterProcessor = new MasterProcessor( + llmClient, + defaultCharacter, + loglevel + ); + // Initialize processor with default character personality - const processor = new MessageProcessor( + const messageProcessor = new MessageProcessor( llmClient, defaultCharacter, loglevel ); + masterProcessor.addProcessor(messageProcessor); + const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", "myApp", @@ -64,7 +73,7 @@ async function main() { const orchestrator = new Orchestrator( roomManager, vectorDb, - [processor], + masterProcessor, scheduledTaskDb, { level: loglevel, diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index bd0928bb..53c21131 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -18,12 +18,6 @@ export class Orchestrator { */ private readonly ioHandlers = new Map(); - /** - * Collection of processors used to handle different types of content. - * Keyed by processor name. - */ - private processors: Map = new Map(); - /** * Logger instance for logging messages and errors. */ @@ -52,16 +46,12 @@ export class Orchestrator { constructor( private readonly roomManager: RoomManager, vectorDb: VectorDB, - processors: BaseProcessor[], + private processor: BaseProcessor, orchestratorDb: OrchestratorDb, config?: LoggerConfig ) { this.vectorDb = vectorDb; - this.processors = new Map( - processors.map((p) => { - return [p.getName(), p]; - }) - ); + this.orchestratorDb = orchestratorDb; this.logger = new Logger( @@ -571,6 +561,13 @@ export class Orchestrator { return singleResult ? [singleResult] : []; } + /** + * Process a single piece of content. This is where we: + * - Retrieve memories for the content's room (if any) + * - Let the "master" processor do an initial pass + * - Potentially use a child processor (either from `.nextProcessor` or from `canHandle()`) + * - Save the result to memory, mark as processed, etc. + */ private async processContentItem( content: any, source: string, @@ -578,6 +575,7 @@ export class Orchestrator { ): Promise { let memories: Memory[] = []; + // If the content includes some "room" identifier if (content.room) { const hasProcessed = await this.roomManager.hasProcessedContentInRoom( @@ -587,7 +585,7 @@ export class Orchestrator { if (hasProcessed) { this.logger.debug( - "Orchestrator.processContent", + "Orchestrator.processContentItem", "Content already processed", { contentId: content.contentId, @@ -597,15 +595,18 @@ export class Orchestrator { ); return null; } + + // Make sure the room is created or retrieved const room = await this.roomManager.ensureRoom( content.room, source, userId ); + // Get prior memories from that room memories = await this.roomManager.getMemoriesFromRoom(room.id); this.logger.debug( - "Orchestrator.processContent", + "Orchestrator.processContentItem", "Processing content with context", { content, @@ -617,19 +618,7 @@ export class Orchestrator { ); } - const processor = Array.from(this.processors.values()).find((p) => - p.canHandle(content) - ); - - if (!processor) { - this.logger.debug( - "Orchestrator.processContent", - "No suitable processor found for content", - { content } - ); - return null; - } - + // Gather possible outputs & actions to pass to the Processor const availableOutputs = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.OUTPUT ); @@ -638,7 +627,8 @@ export class Orchestrator { (h) => h.role === HandlerRole.ACTION ); - const result = await processor.process( + // Process the content - delegation is now handled inside the processor + const result = await this.processor.process( content, JSON.stringify(memories), { @@ -647,19 +637,17 @@ export class Orchestrator { } ); - if (content.room) { - // Save the result to memory + // Save and mark processed if we have a room + if (content.room && result) { await this.roomManager.addMemory( content.room, - JSON.stringify(result?.content), + JSON.stringify(result.content), { source, - ...result?.metadata, - ...result?.enrichedContext, + ...result.metadata, + ...result.enrichedContext, } ); - - // Mark the content as processed await this.roomManager.markContentAsProcessed( content.contentId, content.room diff --git a/packages/core/src/core/processor.ts b/packages/core/src/core/processor.ts index f682b365..1e2491b4 100644 --- a/packages/core/src/core/processor.ts +++ b/packages/core/src/core/processor.ts @@ -1,31 +1,20 @@ -import { LLMClient } from "./llm-client"; -import { Logger } from "./logger"; - -import type { Character, ProcessedResult } from "./types"; -import { LogLevel } from "./types"; +// processor.ts -import { type IOHandler } from "./types"; +import { Logger } from "./logger"; +import { LogLevel, type Character, type ProcessedResult } from "./types"; +import type { IOHandler } from "./types"; -/** - * Base abstract class for content processors that handle different types of input - * and generate appropriate responses using LLM. - */ export abstract class BaseProcessor { /** Logger instance for this processor */ protected logger: Logger; + /** Map of child processors (sub-processors) that this processor can delegate to */ + public processors: Map = new Map(); - /** - * Creates a new BaseProcessor instance - * @param metadata - Metadata about this processor including name and description - * @param loggerLevel - The logging level to use - * @param character - The character personality to use for responses - * @param llmClient - The LLM client instance to use for processing - */ constructor( protected metadata: { name: string; description: string }, - protected loggerLevel: LogLevel = LogLevel.ERROR, + protected loggerLevel: LogLevel, protected character: Character, - protected llmClient: LLMClient, + protected llmClient: any, // your LLM client type protected contentLimit: number = 1000 ) { this.logger = new Logger({ @@ -37,34 +26,55 @@ export abstract class BaseProcessor { /** * Gets the name of this processor - * @returns The processor name from metadata */ public getName(): string { return this.metadata.name; } + /** + * Gets the description of this processor + */ + public getDescription(): string { + return this.metadata.description; + } + /** * Determines if this processor can handle the given content. - * @param content - The content to check - * @returns True if this processor can handle the content, false otherwise */ public abstract canHandle(content: any): boolean; /** * Processes the given content and returns a result. - * @param content - The content to process - * @param otherContext - Additional context string to consider during processing - * @param ioContext - Optional context containing available outputs and actions - * @param ioContext.availableOutputs - Array of available output handlers - * @param ioContext.availableActions - Array of available action handlers - * @returns Promise resolving to the processed result */ public abstract process( content: any, otherContext: string, ioContext?: { - availableOutputs: IOHandler[]; - availableActions: IOHandler[]; + availableOutputs?: IOHandler[]; + availableActions?: IOHandler[]; } ): Promise; + + /** + * Adds one or more child processors to this processor + */ + public addProcessor(processors: BaseProcessor | BaseProcessor[]): this { + const toAdd = Array.isArray(processors) ? processors : [processors]; + + for (const processor of toAdd) { + const name = processor.getName(); + if (this.processors.has(name)) { + throw new Error(`Processor with name '${name}' already exists`); + } + this.processors.set(name, processor); + } + return this; + } + + /** + * Gets a child processor by name + */ + public getProcessor(name: string): BaseProcessor | undefined { + return this.processors.get(name); + } } diff --git a/packages/core/src/core/processors/index.ts b/packages/core/src/core/processors/index.ts index 5af41345..09a2a84c 100644 --- a/packages/core/src/core/processors/index.ts +++ b/packages/core/src/core/processors/index.ts @@ -1,2 +1,3 @@ export { MessageProcessor } from "./message-processor"; export { ResearchQuantProcessor } from "./research-processor"; +export { MasterProcessor } from "./master-processor"; diff --git a/packages/core/src/core/processors/master-processor.ts b/packages/core/src/core/processors/master-processor.ts new file mode 100644 index 00000000..c151526c --- /dev/null +++ b/packages/core/src/core/processors/master-processor.ts @@ -0,0 +1,286 @@ +import { LLMClient } from "../llm-client"; + +import type { + ActionIOHandler, + Character, + OutputIOHandler, + ProcessedResult, + SuggestedOutput, +} from "../types"; + +import { getTimeContext, validateLLMResponseSchema } from "../utils"; +import { z } from "zod"; +import { zodToJsonSchema } from "zod-to-json-schema"; +import { BaseProcessor } from "../processor"; +import { LogLevel } from "../types"; + +export class MasterProcessor extends BaseProcessor { + constructor( + protected llmClient: LLMClient, + protected character: Character, + logLevel: LogLevel = LogLevel.ERROR + ) { + super( + { + name: "master", + description: + "This processor handles messages or short text inputs.", + }, + logLevel, + character, + llmClient + ); + } + + /** + * Logic to decide if this processor can handle the given content. + * This processor is designed to handle shorter messages and text content. + */ + public canHandle(content: any): boolean { + // Convert content to string for length check + const contentStr = + typeof content === "string" ? content : JSON.stringify(content); + + // Check if content is short enough for message processing (<1000 chars) + return contentStr.length < this.contentLimit; + } + + async process( + content: any, + otherContext: string, + ioContext?: { + availableOutputs: OutputIOHandler[]; + availableActions: ActionIOHandler[]; + } + ): Promise { + this.logger.debug("Processor.process", "Processing content", { + content, + }); + + const contentStr = + typeof content === "string" ? content : JSON.stringify(content); + + // Add child processors context + const processorContext = Array.from(this.processors.entries()) + .map(([name, processor]) => { + return `${name}: ${processor.getDescription()}`; + }) + .join("\n"); + + const outputsSchemaPart = ioContext?.availableOutputs + .map((handler) => { + return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.outputSchema!, handler.name))}`; + }) + .join("\n"); + + const actionsSchemaPart = ioContext?.availableActions + .map((handler) => { + return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.outputSchema!, handler.name))}`; + }) + .join("\n"); + + const prompt = `Analyze the following content and provide a complete analysis: + + # New Content to process: + ${contentStr} + + # Other context: + ${otherContext} + + # Available Child Processors: + ${processorContext} + + # Available Outputs: + ${outputsSchemaPart} + + # Available Actions: + ${actionsSchemaPart} + + + 1. Content classification and type + 2. Content enrichment (summary, topics, sentiment, entities, intent) + 3. Determine if any child processors should handle this content + + + + 1. Suggested outputs/actions based on the available handlers based on the content and the available handlers. + 2. If the content is a message, use the personality of the character to determine if the output was successful. + 3. If possible you should include summary of the content in the output for the user to avoid more processing. + + + + 1. Suggested tasks based on the available handlers based on the content and the available handlers. + 2. Only make tasks if you have been told, based off what you think is possible. + + + + 1. Should you reply to the message? + 2. You should only reply if you have been mentioned in the message or you think you can help deeply. + 3. You should never respond to yourself. + + + + + # Speak in the following voice: + ${JSON.stringify(this.character.voice)} + + # Use the following traits to define your behavior: + ${JSON.stringify(this.character.traits)} + + # Use the following examples to guide your behavior: + ${JSON.stringify(this.character.instructions)} + + # Use the following template to craft your message: + ${JSON.stringify(this.character.templates?.tweetTemplate)} + +`; + + try { + const result = await validateLLMResponseSchema({ + prompt, + systemPrompt: + "You are an expert system that analyzes content and provides comprehensive analysis with appropriate automated responses. You can delegate to specialized processors when needed.", + schema: z.object({ + classification: z.object({ + contentType: z.string(), + requiresProcessing: z.boolean(), + delegateToProcessor: z + .string() + .optional() + .describe( + "The name of the processor to delegate to" + ), + context: z.object({ + topic: z.string(), + urgency: z.enum(["high", "medium", "low"]), + additionalContext: z.string(), + }), + }), + enrichment: z.object({ + summary: z.string().max(1000), + topics: z.array(z.string()).max(20), + sentiment: z.enum(["positive", "negative", "neutral"]), + entities: z.array(z.string()), + intent: z + .string() + .describe("The intent of the content"), + }), + updateTasks: z + .array( + z.object({ + name: z + .string() + .describe( + "The name of the task to schedule. This should be a handler name." + ), + confidence: z + .number() + .describe("The confidence score (0-1)"), + intervalMs: z + .number() + .describe("The interval in milliseconds"), + data: z + .any() + .describe( + "The data that matches the task's schema" + ), + }) + ) + .describe( + "Suggested tasks to schedule based on the content and the available handlers. Making this will mean the handlers will be called in the future." + ), + suggestedOutputs: z.array( + z.object({ + name: z + .string() + .describe("The name of the output or action"), + data: z + .any() + .describe( + "The data that matches the output's schema. leave empty if you don't have any data to provide." + ), + confidence: z + .number() + .describe("The confidence score (0-1)"), + reasoning: z + .string() + .describe("The reasoning for the suggestion"), + }) + ), + }), + llmClient: this.llmClient, + logger: this.logger, + }); + + // Check if we should delegate to a child processor + // @dev maybe this should be elsewhere + if (result.classification.delegateToProcessor) { + const childProcessor = this.getProcessor( + result.classification.delegateToProcessor + ); + if (childProcessor && childProcessor.canHandle(content)) { + this.logger.debug( + "Processor.process", + "Delegating to child processor", + { + processor: + result.classification.delegateToProcessor, + } + ); + return childProcessor.process( + content, + otherContext, + ioContext + ); + } + } + + this.logger.debug("Processor.process", "Processed content", { + content, + result, + }); + + return { + content, + metadata: { + ...result.classification.context, + contentType: result.classification.contentType, + }, + enrichedContext: { + ...result.enrichment, + timeContext: getTimeContext(new Date()), + relatedMemories: [], // TODO: fix this abstraction + availableOutputs: ioContext?.availableOutputs.map( + (handler) => handler.name + ), + }, + updateTasks: result.updateTasks, + suggestedOutputs: + result.suggestedOutputs as SuggestedOutput[], + alreadyProcessed: false, + }; + } catch (error) { + this.logger.error("Processor.process", "Processing failed", { + error, + }); + return { + content, + metadata: {}, + enrichedContext: { + timeContext: getTimeContext(new Date()), + summary: contentStr.slice(0, 100), + topics: [], + relatedMemories: [], + sentiment: "neutral", + entities: [], + intent: "unknown", + availableOutputs: ioContext?.availableOutputs.map( + (handler) => handler.name + ), + }, + suggestedOutputs: [], + alreadyProcessed: false, + }; + } + } +} diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index cd58d329..7184c664 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -285,6 +285,7 @@ export interface ProcessedResult { suggestedOutputs: SuggestedOutput[]; isOutputSuccess?: boolean; alreadyProcessed?: boolean; + nextProcessor?: string; updateTasks?: { name: string; data?: any; From 6a937132309729cd62635e6989ba4fcd8ce0fe96 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 20:04:45 +1100 Subject: [PATCH 3/6] chain of thought --- packages/core/src/core/chain-of-thought.ts | 2 +- .../processors/chain-of-thought-processor.ts | 220 ++++++++++++++++++ 2 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 packages/core/src/core/processors/chain-of-thought-processor.ts diff --git a/packages/core/src/core/chain-of-thought.ts b/packages/core/src/core/chain-of-thought.ts index 7a81fa21..8a2764b0 100644 --- a/packages/core/src/core/chain-of-thought.ts +++ b/packages/core/src/core/chain-of-thought.ts @@ -28,7 +28,7 @@ import { LogLevel } from "./types"; const ajv = new Ajv(); export class ChainOfThought extends EventEmitter { - private stepManager: StepManager; + stepManager: StepManager; private context: ChainOfThoughtContext; private snapshots: ChainOfThoughtContext[]; private logger: Logger; diff --git a/packages/core/src/core/processors/chain-of-thought-processor.ts b/packages/core/src/core/processors/chain-of-thought-processor.ts new file mode 100644 index 00000000..99dba110 --- /dev/null +++ b/packages/core/src/core/processors/chain-of-thought-processor.ts @@ -0,0 +1,220 @@ +import { BaseProcessor } from "../processor"; +import { ChainOfThought } from "../chain-of-thought"; +import type { IOHandler, ProcessedResult, VectorDB } from "../types"; +import { LogLevel } from "../types"; +import type { LLMClient } from "../llm-client"; +import type { Character } from "../types"; + +export class ChainOfThoughtProcessor extends BaseProcessor { + private chainOfThought: ChainOfThought; + + constructor( + llmClient: LLMClient, + memory: VectorDB, + character: Character, + logLevel: LogLevel = LogLevel.ERROR + ) { + super( + { + name: "chain-of-thought", + description: + "Handles complex reasoning tasks using chain of thought processing and goal-based execution. Uses a goal manager to manage goals and a step manager to manage steps.", + }, + logLevel, + character, + llmClient + ); + + this.chainOfThought = new ChainOfThought(llmClient, memory, undefined, { + logLevel, + }); + } + + public canHandle(content: any): boolean { + const contentStr = + typeof content === "string" ? content : JSON.stringify(content); + + // Chain of thought is best for: + // 1. Complex queries requiring multiple steps + // 2. Goal-based reasoning + // 3. Content requiring planning and execution + return ( + contentStr.length > 100 && + (contentStr.includes("goal") || + contentStr.includes("plan") || + contentStr.includes("achieve") || + contentStr.includes("how to") || + contentStr.includes("steps to")) + ); + } + + public async process( + content: any, + otherContext: string, + ioContext?: { + availableOutputs?: IOHandler[]; + availableActions?: IOHandler[]; + } + ): Promise { + this.logger.debug( + "ChainOfThoughtProcessor.process", + "Processing content", + { content } + ); + + // Register available outputs and actions + if (ioContext) { + [ + ...(ioContext.availableOutputs || []), + ...(ioContext.availableActions || []), + ].forEach((handler) => { + this.chainOfThought.registerOutput(handler); + }); + } + + try { + // First, decompose the content into goals + await this.chainOfThought.decomposeObjectiveIntoGoals( + typeof content === "string" ? content : JSON.stringify(content) + ); + + const stats = { + completed: 0, + failed: 0, + total: 0, + }; + + // Execute goals until completion + while (true) { + const readyGoals = + this.chainOfThought.goalManager.getReadyGoals(); + const activeGoals = this.chainOfThought.goalManager + .getGoalsByHorizon("short") + .filter((g) => g.status === "active"); + const pendingGoals = this.chainOfThought.goalManager + .getGoalsByHorizon("short") + .filter((g) => g.status === "pending"); + + // Log progress + this.logger.debug( + "ChainOfThoughtProcessor.process", + "Goal execution progress:", + JSON.stringify({ + ready: readyGoals.length, + active: activeGoals.length, + pending: pendingGoals.length, + completed: stats.completed, + failed: stats.failed, + }) + ); + + // Check if all goals are complete + if ( + readyGoals.length === 0 && + activeGoals.length === 0 && + pendingGoals.length === 0 + ) { + this.logger.debug( + "ChainOfThoughtProcessor.process", + "All goals completed!", + { + ready: readyGoals.length, + active: activeGoals.length, + pending: pendingGoals.length, + completed: stats.completed, + failed: stats.failed, + } + ); + break; + } + + // Handle blocked goals + if (readyGoals.length === 0 && activeGoals.length === 0) { + this.logger.warn( + "ChainOfThoughtProcessor.process", + "No ready or active goals, but some goals are pending", + { + pending: pendingGoals.length, + } + ); + pendingGoals.forEach((goal) => { + const blockingGoals = + this.chainOfThought.goalManager.getBlockingGoals( + goal.id + ); + this.logger.warn( + "ChainOfThoughtProcessor.process", + `Pending Goal: ${goal.description}`, + { + blockedBy: blockingGoals.length, + } + ); + }); + break; + } + + // Execute next goal + try { + await this.chainOfThought.processHighestPriorityGoal(); + stats.completed++; + } catch (error) { + this.logger.error( + "ChainOfThoughtProcessor.process", + "Goal execution failed:", + error + ); + stats.failed++; + } + + stats.total++; + } + + // Get final state and create result + const steps = this.chainOfThought.stepManager.getSteps(); + const context = await this.chainOfThought.getBlackboardState(); + const recentExperiences = + await this.chainOfThought.memory.getRecentEpisodes(5); + + return { + content, + metadata: { + processor: "chain-of-thought", + steps: steps.length, + goalsCompleted: stats.completed, + goalsFailed: stats.failed, + successRate: + stats.total > 0 + ? (stats.completed / stats.total) * 100 + : 0, + topic: "goal-based-reasoning", + urgency: "high", + }, + enrichedContext: { + summary: steps.map((s) => s.content).join("\n"), + topics: steps.flatMap((s) => s.tags || []), + sentiment: "neutral", + entities: [], + intent: "goal-execution", + timeContext: new Date().toISOString(), + relatedMemories: recentExperiences.map( + (exp) => + `Action: ${exp.action}, Outcome: ${exp.outcome}, Importance: ${exp.importance}` + ), + availableOutputs: ioContext?.availableOutputs?.map( + (h) => h.name + ), + ...context, + }, + suggestedOutputs: [], // Chain of thought handles its own outputs + alreadyProcessed: true, // Mark as processed since CoT handles execution + }; + } catch (error) { + this.logger.error( + "ChainOfThoughtProcessor.process", + "Processing failed", + { error } + ); + throw error; + } + } +} From af34465c7f225cd1a7b84f2bbb09535e81c5f323 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 20:17:51 +1100 Subject: [PATCH 4/6] orchestrator --- packages/core/src/core/orchestrator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index 53c21131..ce5f6dca 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -627,7 +627,7 @@ export class Orchestrator { (h) => h.role === HandlerRole.ACTION ); - // Process the content - delegation is now handled inside the processor + // Processor has nested logic for processing content and returns the final result const result = await this.processor.process( content, JSON.stringify(memories), From fb2f42b18f64c8c42c6e11b8e580984b5b4ed6c3 Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 20:29:13 +1100 Subject: [PATCH 5/6] processor --- .../src/core/processors/master-processor.ts | 37 +++++-------------- .../src/core/processors/research-processor.ts | 1 - 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/packages/core/src/core/processors/master-processor.ts b/packages/core/src/core/processors/master-processor.ts index c151526c..7cf122de 100644 --- a/packages/core/src/core/processors/master-processor.ts +++ b/packages/core/src/core/processors/master-processor.ts @@ -79,7 +79,7 @@ export class MasterProcessor extends BaseProcessor { }) .join("\n"); - const prompt = `Analyze the following content and provide a complete analysis: + const prompt = `You are a master processor that can delegate to child processors. Decide on what do to with the following content: # New Content to process: ${contentStr} @@ -96,6 +96,11 @@ export class MasterProcessor extends BaseProcessor { # Available Actions: ${actionsSchemaPart} + + 1. Decide on what do to with the content. If you an output or action is suggested, you should use it. + 2. If you can't decide, delegate to a child processor or just return. + + 1. Content classification and type 2. Content enrichment (summary, topics, sentiment, entities, intent) @@ -107,32 +112,6 @@ export class MasterProcessor extends BaseProcessor { 2. If the content is a message, use the personality of the character to determine if the output was successful. 3. If possible you should include summary of the content in the output for the user to avoid more processing. - - - 1. Suggested tasks based on the available handlers based on the content and the available handlers. - 2. Only make tasks if you have been told, based off what you think is possible. - - - - 1. Should you reply to the message? - 2. You should only reply if you have been mentioned in the message or you think you can help deeply. - 3. You should never respond to yourself. - - - - - # Speak in the following voice: - ${JSON.stringify(this.character.voice)} - - # Use the following traits to define your behavior: - ${JSON.stringify(this.character.traits)} - - # Use the following examples to guide your behavior: - ${JSON.stringify(this.character.instructions)} - - # Use the following template to craft your message: - ${JSON.stringify(this.character.templates?.tweetTemplate)} - `; try { @@ -212,6 +191,10 @@ export class MasterProcessor extends BaseProcessor { logger: this.logger, }); + this.logger.debug("MasterProcessor.process", "Result", { + result, + }); + // Check if we should delegate to a child processor // @dev maybe this should be elsewhere if (result.classification.delegateToProcessor) { diff --git a/packages/core/src/core/processors/research-processor.ts b/packages/core/src/core/processors/research-processor.ts index 5adb9f04..732a0700 100644 --- a/packages/core/src/core/processors/research-processor.ts +++ b/packages/core/src/core/processors/research-processor.ts @@ -4,7 +4,6 @@ import { LogLevel } from "../types"; import { getTimeContext, validateLLMResponseSchema } from "../utils"; import { z } from "zod"; import { zodToJsonSchema } from "zod-to-json-schema"; -import { type IOHandler } from "../types"; import { BaseProcessor } from "../processor"; import { encodingForModel } from "js-tiktoken"; From 3971a0a2aa577c903c5bdba4652b099cea304e1c Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 21:35:27 +1100 Subject: [PATCH 6/6] remove user id --- examples/example-api.ts | 6 +- examples/example-server.ts | 9 +- packages/core/src/core/orchestrator.ts | 276 +++++++++------------ packages/core/src/core/schedule-service.ts | 18 +- packages/core/src/core/types/index.ts | 4 + 5 files changed, 142 insertions(+), 171 deletions(-) diff --git a/examples/example-api.ts b/examples/example-api.ts index 0326e31e..cb9d0de9 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -220,9 +220,11 @@ async function main() { const outputs: any = await orchestrator.dispatchToInput( "user_chat", { - content: userMessage, - userId, + headers: { + "x-user-id": userId, + }, }, + userMessage, userId ); diff --git a/examples/example-server.ts b/examples/example-server.ts index 4655e5b0..02eed6ed 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -150,16 +150,15 @@ wss.on("connection", (ws) => { throw new Error("userId is required"); } - orchestrator.initializeOrchestrator(userId); - // Process the message using the orchestrator with the provided userId const outputs = await orchestrator.dispatchToInput( "user_chat", { - content: userMessage, - userId: userId, + headers: { + "x-user-id": userId, + }, }, - userId, + userMessage, orchestratorId ? orchestratorId : undefined ); diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index ce5f6dca..bdb55099 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,7 +1,7 @@ import { Logger } from "./logger"; import { RoomManager } from "./room-manager"; import type { BaseProcessor } from "./processor"; -import type { Memory, ProcessedResult, VectorDB } from "./types"; +import type { AgentRequest, Memory, ProcessedResult, VectorDB } from "./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; @@ -28,11 +28,6 @@ export class Orchestrator { */ private readonly orchestratorDb: OrchestratorDb; - /** - * User ID associated with the orchestrator. - */ - public userId: string; - /** * Map of unsubscribe functions for various handlers. * Keyed by handler name. @@ -43,6 +38,7 @@ export class Orchestrator { * Other references in your system. Adjust as needed. */ public readonly vectorDb: VectorDB; + constructor( private readonly roomManager: RoomManager, vectorDb: VectorDB, @@ -51,9 +47,7 @@ export class Orchestrator { config?: LoggerConfig ) { this.vectorDb = vectorDb; - this.orchestratorDb = orchestratorDb; - this.logger = new Logger( config ?? { level: LogLevel.ERROR, @@ -62,9 +56,6 @@ export class Orchestrator { } ); - // Initialize userId to an empty string - this.userId = ""; - this.logger.info( "Orchestrator.constructor", "Orchestrator initialized" @@ -75,10 +66,6 @@ export class Orchestrator { return this.ioHandlers.get(name); } - public initializeOrchestrator(userId: string) { - this.userId = userId; - } - /** * Primary method to register any IOHandler (input or output). * - If it's an input with an interval, schedule it for recurring runs. @@ -100,12 +87,13 @@ export class Orchestrator { this.logger.info( "Orchestrator.registerIOHandler", "Starting stream", - { - data, - } + { data } ); + // Simulate a request-like object here if you want a consistent approach. + // this will register as an agent request + const fakeRequest: AgentRequest = { headers: {} }; // Whenever data arrives, pass it into runAutonomousFlow - await this.runAutonomousFlow(data, handler.name, this.userId); + await this.runAutonomousFlow(fakeRequest, data, handler.name); }); this.unsubscribers.set(handler.name, unsubscribe); } @@ -137,10 +125,13 @@ export class Orchestrator { } /** - * Executes a handler with role="output" by name, passing data to it. - * This is effectively "dispatchToOutput." + * Dispatches data to a registered *output* handler by name, passing in a request plus data. */ - public async dispatchToOutput(name: string, data: T): Promise { + public async dispatchToOutput( + name: string, + request: AgentRequest, + data: T + ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); @@ -153,6 +144,7 @@ export class Orchestrator { this.logger.debug("Orchestrator.dispatchToOutput", "Executing output", { name, data, + headers: request.headers, }); try { @@ -177,33 +169,13 @@ export class Orchestrator { } /** - * Dispatches data to a registered action handler and returns its result. - * - * @param name - The name of the registered action handler to dispatch to - * @param data - The data to pass to the action handler - * @returns Promise resolving to the action handler's result - * @throws Error if no handler is found with the given name or if it's not an action handler - * - * @example - * ```ts - * // Register an action handler - * orchestrator.registerIOHandler({ - * name: "sendEmail", - * role: "action", - * handler: async (data: {to: string, body: string}) => { - * // Send email logic - * return {success: true}; - * } - * }); - * - * // Dispatch to the action - * const result = await orchestrator.dispatchToAction("sendEmail", { - * to: "user@example.com", - * body: "Hello world" - * }); - * ``` + * Dispatches data to a registered *action* handler by name, passing in a request plus data. */ - public async dispatchToAction(name: string, data: T): Promise { + public async dispatchToAction( + name: string, + request: AgentRequest, + data: T + ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); @@ -221,6 +193,7 @@ export class Orchestrator { { name, data, + headers: request.headers, } ); return result; @@ -237,16 +210,67 @@ export class Orchestrator { } } + /** + * Dispatches data to a registered *input* handler by name, passing in a request plus data. + * Then continues through the autonomous flow. + */ + public async dispatchToInput( + name: string, + request: AgentRequest, + data: T, + orchestratorId?: string + ): Promise { + const handler = this.ioHandlers.get(name); + if (!handler) throw new Error(`No IOHandler: ${name}`); + if (!handler.execute) { + throw new Error(`Handler "${name}" has no execute method`); + } + if (handler.role !== "input") { + throw new Error(`Handler "${name}" is not role=input`); + } + + try { + // Possibly run a transformation or normalizing step inside `handler.execute` + const result = await handler.execute(data); + + if (result) { + return await this.runAutonomousFlow( + request, + result, + handler.name, + orchestratorId + ); + } + return []; + } catch (error) { + this.logger.error( + "Orchestrator.dispatchToInput", + `dispatchToInput Error: ${ + error instanceof Error ? error.message : String(error) + }` + ); + throw error; + } + } + /** * Takes some incoming piece of data, processes it through the system, - * and handles any follow-on "action" or "output" suggestions in a chain. + * and handles any follow-on "action" or "output" suggestions in a queue. + * + * @param request A request-like object (headers, etc.) from which we extract user info + * @param initialData The data payload to process + * @param sourceName The IOHandler name that provided this data + * @param orchestratorId An optional existing orchestrator record ID to tie into */ private async runAutonomousFlow( + request: AgentRequest, initialData: unknown, sourceName: string, - userId: string, orchestratorId?: string ) { + // For illustration, extract userId from headers. Adjust the header name as needed. + const userId = request.headers["x-user-id"] || "agent"; + const queue: Array<{ data: unknown; source: string }> = []; // If the initial data is already an array, enqueue each item @@ -258,35 +282,36 @@ export class Orchestrator { queue.push({ data: initialData, source: sourceName }); } - // You can keep track of any "outputs" you need to return or do something with + // Optionally store final outputs to return or do something with them const outputs: Array<{ name: string; data: any }> = []; - // check if we have an orchestratorId + // If orchestratorId is provided, verify it in the DB or create a new record if (orchestratorId) { - // check if it exists in the db - const existingOrchestrator = + const existing = await this.orchestratorDb.getOrchestratorById(orchestratorId); - - if (!existingOrchestrator) { + if (!existing) { orchestratorId = await this.orchestratorDb.createOrchestrator(userId); } } - // Create a new orchestrator record if we have a userId + // Otherwise, create a new orchestrator record if needed + if (!orchestratorId) { + orchestratorId = + await this.orchestratorDb.createOrchestrator(userId); + } + // Record initial data as an input message if (orchestratorId) { - // Record the initial input await this.orchestratorDb.addMessage( orchestratorId, HandlerRole.INPUT, sourceName, initialData ); - this.logger.debug( "Orchestrator.runAutonomousFlow", - "Created orchestrator record", + "Created or continued orchestrator record", { orchestratorId, userId, @@ -294,11 +319,11 @@ export class Orchestrator { ); } - // Keep processing while there is something in the queue + // Process items in a queue while (queue.length > 0) { const { data, source } = queue.shift()!; - // Record any action results if we have an orchestratorId + // Record each chunk of data if you want if (orchestratorId) { await this.orchestratorDb.addMessage( orchestratorId, @@ -321,40 +346,32 @@ export class Orchestrator { ); } - // processContent now returns an array of ProcessedResult + // processContent can return an array of ProcessedResult const processedResults = await this.processContent( data, source, userId ); - // If there's nothing to process further, continue if (!processedResults || processedResults.length === 0) { continue; } - // Now handle each ProcessedResult for (const processed of processedResults) { - // If the processor says it's already been handled, skip - if (processed.alreadyProcessed) { - continue; - } + // If the item was already processed, skip + if (processed.alreadyProcessed) continue; - // If any tasks need to be scheduled in the DB, do so + // Possibly schedule any tasks in the DB if (processed.updateTasks) { for (const task of processed.updateTasks) { const now = Date.now(); const nextRunAt = new Date( now + (task.intervalMs ?? 0) ); - this.logger.info( "Orchestrator.runAutonomousFlow", `Scheduling task ${task.name}`, - { - nextRunAt, - intervalMs: task.intervalMs, - } + { nextRunAt, intervalMs: task.intervalMs } ); await this.orchestratorDb.createTask( @@ -370,13 +387,13 @@ export class Orchestrator { } } - // For each suggested output + // For each suggested output or action for (const output of processed.suggestedOutputs ?? []) { const handler = this.ioHandlers.get(output.name); if (!handler) { this.logger.warn( - "No handler found for suggested output", - output.name + "Orchestrator.runAutonomousFlow", + `No handler found for suggested output: ${output.name}` ); continue; } @@ -384,7 +401,11 @@ export class Orchestrator { if (handler.role === HandlerRole.OUTPUT) { // e.g. send a Slack message outputs.push({ name: output.name, data: output.data }); - await this.dispatchToOutput(output.name, output.data); + await this.dispatchToOutput( + output.name, + request, + output.data + ); this.logger.debug( "Orchestrator.runAutonomousFlow", @@ -395,7 +416,6 @@ export class Orchestrator { } ); - // Record output in DB if (orchestratorId) { await this.orchestratorDb.addMessage( orchestratorId, @@ -408,6 +428,7 @@ export class Orchestrator { // e.g. fetch data from an external API const actionResult = await this.dispatchToAction( output.name, + request, output.data ); @@ -420,7 +441,6 @@ export class Orchestrator { } ); - // Record action in DB if (orchestratorId) { await this.orchestratorDb.addMessage( orchestratorId, @@ -433,8 +453,7 @@ export class Orchestrator { ); } - // If the action returns new data (array or single), - // feed it back into the queue to continue the flow + // If the action returns new data, queue it up if (actionResult) { if (Array.isArray(actionResult)) { for (const item of actionResult) { @@ -452,6 +471,7 @@ export class Orchestrator { } } else { this.logger.warn( + "Orchestrator.runAutonomousFlow", "Suggested output has an unrecognized role", handler.role ); @@ -460,78 +480,14 @@ export class Orchestrator { } } - // If you want, you can return the final outputs array or handle it differently + // Return the final outputs array, or handle them in your own way return outputs; } /** - * Dispatches data to a registered input handler and processes the result through the autonomous flow. - * - * @param name - The name of the input handler to dispatch to - * @param data - The data to pass to the input handler - * @returns An array of output suggestions generated from processing the input - * - * @example - * ```ts - * // Register a chat input handler - * orchestrator.registerIOHandler({ - * name: "user_chat", - * role: "input", - * handler: async (message) => { - * return { - * type: "chat", - * content: message.content, - * metadata: { userId: message.userId } - * }; - * } - * }); - * - * // Dispatch a message to the chat handler - * const outputs = await orchestrator.dispatchToInput("user_chat", { - * content: "Hello AI!", - * userId: "user123" - * }); - * ``` - * - * @throws {Error} If no handler is found with the given name - * @throws {Error} If the handler's role is not "input" + * Processes *any* content by splitting it into items (if needed) and + * calling the single-item processor. */ - public async dispatchToInput( - name: string, - data: T, - userId: string, - orchestratorId?: string - ): Promise { - const handler = this.ioHandlers.get(name); - if (!handler) throw new Error(`No IOHandler: ${name}`); - if (!handler.execute) - throw new Error(`Handler "${name}" has no execute method`); - if (handler.role !== "input") { - throw new Error(`Handler "${name}" is not role=input`); - } - - try { - const result = await handler.execute(data); - - if (result) { - return await this.runAutonomousFlow( - result, - handler.name, - userId, - orchestratorId - ); - } - return []; - } catch (error) { - this.logger.error( - "dispatchToInput Error", - `dispatchToInput Error: ${ - error instanceof Error ? error.message : String(error) - }` - ); - } - } - public async processContent( content: any, source: string, @@ -540,6 +496,7 @@ export class Orchestrator { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { + // Example delay to show chunk processing, remove if not needed await new Promise((resolve) => setTimeout(resolve, 5000)); const result = await this.processContentItem( item, @@ -562,11 +519,10 @@ export class Orchestrator { } /** - * Process a single piece of content. This is where we: - * - Retrieve memories for the content's room (if any) - * - Let the "master" processor do an initial pass - * - Potentially use a child processor (either from `.nextProcessor` or from `canHandle()`) - * - Save the result to memory, mark as processed, etc. + * Processes a single item of content: + * - Retrieves prior memories from its room + * - Lets the "master" processor handle it + * - Optionally saves the result to memory/marks it processed */ private async processContentItem( content: any, @@ -575,14 +531,13 @@ export class Orchestrator { ): Promise { let memories: Memory[] = []; - // If the content includes some "room" identifier + // If the content indicates a "room" property if (content.room) { const hasProcessed = await this.roomManager.hasProcessedContentInRoom( content.contentId, content.room ); - if (hasProcessed) { this.logger.debug( "Orchestrator.processContentItem", @@ -596,13 +551,11 @@ export class Orchestrator { return null; } - // Make sure the room is created or retrieved const room = await this.roomManager.ensureRoom( content.room, source, userId ); - // Get prior memories from that room memories = await this.roomManager.getMemoriesFromRoom(room.id); this.logger.debug( @@ -622,12 +575,11 @@ export class Orchestrator { const availableOutputs = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.OUTPUT ); - const availableActions = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.ACTION ); - // Processor has nested logic for processing content and returns the final result + // Processor's main entry point const result = await this.processor.process( content, JSON.stringify(memories), @@ -637,7 +589,7 @@ export class Orchestrator { } ); - // Save and mark processed if we have a room + // If there's a room, save the memory and mark processed if (content.room && result) { await this.roomManager.addMemory( content.room, diff --git a/packages/core/src/core/schedule-service.ts b/packages/core/src/core/schedule-service.ts index 5197fbdd..0f768d85 100644 --- a/packages/core/src/core/schedule-service.ts +++ b/packages/core/src/core/schedule-service.ts @@ -52,19 +52,33 @@ export class SchedulerService { case HandlerRole.INPUT: await this.orchestrator.dispatchToInput( task.handlerName, - data, - task.userId + { + headers: { + "x-user-id": task.userId, + }, + }, + data ); break; case HandlerRole.ACTION: await this.orchestrator.dispatchToAction( task.handlerName, + { + headers: { + "x-user-id": task.userId, + }, + }, data ); break; case HandlerRole.OUTPUT: await this.orchestrator.dispatchToOutput( task.handlerName, + { + headers: { + "x-user-id": task.userId, + }, + }, data ); break; diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 7184c664..3cec67fd 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -610,3 +610,7 @@ export interface ActionIOHandler extends BaseIOHandler { /** Union type of all possible IO handler types */ export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler; + +export interface AgentRequest { + headers: Record; +}