From 3971a0a2aa577c903c5bdba4652b099cea304e1c Mon Sep 17 00:00:00 2001 From: ponderingdemocritus Date: Thu, 30 Jan 2025 21:35:27 +1100 Subject: [PATCH] remove user id --- examples/example-api.ts | 6 +- examples/example-server.ts | 9 +- packages/core/src/core/orchestrator.ts | 276 +++++++++------------ packages/core/src/core/schedule-service.ts | 18 +- packages/core/src/core/types/index.ts | 4 + 5 files changed, 142 insertions(+), 171 deletions(-) diff --git a/examples/example-api.ts b/examples/example-api.ts index 0326e31e..cb9d0de9 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -220,9 +220,11 @@ async function main() { const outputs: any = await orchestrator.dispatchToInput( "user_chat", { - content: userMessage, - userId, + headers: { + "x-user-id": userId, + }, }, + userMessage, userId ); diff --git a/examples/example-server.ts b/examples/example-server.ts index 4655e5b0..02eed6ed 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -150,16 +150,15 @@ wss.on("connection", (ws) => { throw new Error("userId is required"); } - orchestrator.initializeOrchestrator(userId); - // Process the message using the orchestrator with the provided userId const outputs = await orchestrator.dispatchToInput( "user_chat", { - content: userMessage, - userId: userId, + headers: { + "x-user-id": userId, + }, }, - userId, + userMessage, orchestratorId ? orchestratorId : undefined ); diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index ce5f6dca..bdb55099 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,7 +1,7 @@ import { Logger } from "./logger"; import { RoomManager } from "./room-manager"; import type { BaseProcessor } from "./processor"; -import type { Memory, ProcessedResult, VectorDB } from "./types"; +import type { AgentRequest, Memory, ProcessedResult, VectorDB } from "./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; @@ -28,11 +28,6 @@ export class Orchestrator { */ private readonly orchestratorDb: OrchestratorDb; - /** - * User ID associated with the orchestrator. - */ - public userId: string; - /** * Map of unsubscribe functions for various handlers. * Keyed by handler name. @@ -43,6 +38,7 @@ export class Orchestrator { * Other references in your system. Adjust as needed. */ public readonly vectorDb: VectorDB; + constructor( private readonly roomManager: RoomManager, vectorDb: VectorDB, @@ -51,9 +47,7 @@ export class Orchestrator { config?: LoggerConfig ) { this.vectorDb = vectorDb; - this.orchestratorDb = orchestratorDb; - this.logger = new Logger( config ?? { level: LogLevel.ERROR, @@ -62,9 +56,6 @@ export class Orchestrator { } ); - // Initialize userId to an empty string - this.userId = ""; - this.logger.info( "Orchestrator.constructor", "Orchestrator initialized" @@ -75,10 +66,6 @@ export class Orchestrator { return this.ioHandlers.get(name); } - public initializeOrchestrator(userId: string) { - this.userId = userId; - } - /** * Primary method to register any IOHandler (input or output). * - If it's an input with an interval, schedule it for recurring runs. @@ -100,12 +87,13 @@ export class Orchestrator { this.logger.info( "Orchestrator.registerIOHandler", "Starting stream", - { - data, - } + { data } ); + // Simulate a request-like object here if you want a consistent approach. + // this will register as an agent request + const fakeRequest: AgentRequest = { headers: {} }; // Whenever data arrives, pass it into runAutonomousFlow - await this.runAutonomousFlow(data, handler.name, this.userId); + await this.runAutonomousFlow(fakeRequest, data, handler.name); }); this.unsubscribers.set(handler.name, unsubscribe); } @@ -137,10 +125,13 @@ export class Orchestrator { } /** - * Executes a handler with role="output" by name, passing data to it. - * This is effectively "dispatchToOutput." + * Dispatches data to a registered *output* handler by name, passing in a request plus data. */ - public async dispatchToOutput(name: string, data: T): Promise { + public async dispatchToOutput( + name: string, + request: AgentRequest, + data: T + ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); @@ -153,6 +144,7 @@ export class Orchestrator { this.logger.debug("Orchestrator.dispatchToOutput", "Executing output", { name, data, + headers: request.headers, }); try { @@ -177,33 +169,13 @@ export class Orchestrator { } /** - * Dispatches data to a registered action handler and returns its result. - * - * @param name - The name of the registered action handler to dispatch to - * @param data - The data to pass to the action handler - * @returns Promise resolving to the action handler's result - * @throws Error if no handler is found with the given name or if it's not an action handler - * - * @example - * ```ts - * // Register an action handler - * orchestrator.registerIOHandler({ - * name: "sendEmail", - * role: "action", - * handler: async (data: {to: string, body: string}) => { - * // Send email logic - * return {success: true}; - * } - * }); - * - * // Dispatch to the action - * const result = await orchestrator.dispatchToAction("sendEmail", { - * to: "user@example.com", - * body: "Hello world" - * }); - * ``` + * Dispatches data to a registered *action* handler by name, passing in a request plus data. */ - public async dispatchToAction(name: string, data: T): Promise { + public async dispatchToAction( + name: string, + request: AgentRequest, + data: T + ): Promise { const handler = this.ioHandlers.get(name); if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); @@ -221,6 +193,7 @@ export class Orchestrator { { name, data, + headers: request.headers, } ); return result; @@ -237,16 +210,67 @@ export class Orchestrator { } } + /** + * Dispatches data to a registered *input* handler by name, passing in a request plus data. + * Then continues through the autonomous flow. + */ + public async dispatchToInput( + name: string, + request: AgentRequest, + data: T, + orchestratorId?: string + ): Promise { + const handler = this.ioHandlers.get(name); + if (!handler) throw new Error(`No IOHandler: ${name}`); + if (!handler.execute) { + throw new Error(`Handler "${name}" has no execute method`); + } + if (handler.role !== "input") { + throw new Error(`Handler "${name}" is not role=input`); + } + + try { + // Possibly run a transformation or normalizing step inside `handler.execute` + const result = await handler.execute(data); + + if (result) { + return await this.runAutonomousFlow( + request, + result, + handler.name, + orchestratorId + ); + } + return []; + } catch (error) { + this.logger.error( + "Orchestrator.dispatchToInput", + `dispatchToInput Error: ${ + error instanceof Error ? error.message : String(error) + }` + ); + throw error; + } + } + /** * Takes some incoming piece of data, processes it through the system, - * and handles any follow-on "action" or "output" suggestions in a chain. + * and handles any follow-on "action" or "output" suggestions in a queue. + * + * @param request A request-like object (headers, etc.) from which we extract user info + * @param initialData The data payload to process + * @param sourceName The IOHandler name that provided this data + * @param orchestratorId An optional existing orchestrator record ID to tie into */ private async runAutonomousFlow( + request: AgentRequest, initialData: unknown, sourceName: string, - userId: string, orchestratorId?: string ) { + // For illustration, extract userId from headers. Adjust the header name as needed. + const userId = request.headers["x-user-id"] || "agent"; + const queue: Array<{ data: unknown; source: string }> = []; // If the initial data is already an array, enqueue each item @@ -258,35 +282,36 @@ export class Orchestrator { queue.push({ data: initialData, source: sourceName }); } - // You can keep track of any "outputs" you need to return or do something with + // Optionally store final outputs to return or do something with them const outputs: Array<{ name: string; data: any }> = []; - // check if we have an orchestratorId + // If orchestratorId is provided, verify it in the DB or create a new record if (orchestratorId) { - // check if it exists in the db - const existingOrchestrator = + const existing = await this.orchestratorDb.getOrchestratorById(orchestratorId); - - if (!existingOrchestrator) { + if (!existing) { orchestratorId = await this.orchestratorDb.createOrchestrator(userId); } } - // Create a new orchestrator record if we have a userId + // Otherwise, create a new orchestrator record if needed + if (!orchestratorId) { + orchestratorId = + await this.orchestratorDb.createOrchestrator(userId); + } + // Record initial data as an input message if (orchestratorId) { - // Record the initial input await this.orchestratorDb.addMessage( orchestratorId, HandlerRole.INPUT, sourceName, initialData ); - this.logger.debug( "Orchestrator.runAutonomousFlow", - "Created orchestrator record", + "Created or continued orchestrator record", { orchestratorId, userId, @@ -294,11 +319,11 @@ export class Orchestrator { ); } - // Keep processing while there is something in the queue + // Process items in a queue while (queue.length > 0) { const { data, source } = queue.shift()!; - // Record any action results if we have an orchestratorId + // Record each chunk of data if you want if (orchestratorId) { await this.orchestratorDb.addMessage( orchestratorId, @@ -321,40 +346,32 @@ export class Orchestrator { ); } - // processContent now returns an array of ProcessedResult + // processContent can return an array of ProcessedResult const processedResults = await this.processContent( data, source, userId ); - // If there's nothing to process further, continue if (!processedResults || processedResults.length === 0) { continue; } - // Now handle each ProcessedResult for (const processed of processedResults) { - // If the processor says it's already been handled, skip - if (processed.alreadyProcessed) { - continue; - } + // If the item was already processed, skip + if (processed.alreadyProcessed) continue; - // If any tasks need to be scheduled in the DB, do so + // Possibly schedule any tasks in the DB if (processed.updateTasks) { for (const task of processed.updateTasks) { const now = Date.now(); const nextRunAt = new Date( now + (task.intervalMs ?? 0) ); - this.logger.info( "Orchestrator.runAutonomousFlow", `Scheduling task ${task.name}`, - { - nextRunAt, - intervalMs: task.intervalMs, - } + { nextRunAt, intervalMs: task.intervalMs } ); await this.orchestratorDb.createTask( @@ -370,13 +387,13 @@ export class Orchestrator { } } - // For each suggested output + // For each suggested output or action for (const output of processed.suggestedOutputs ?? []) { const handler = this.ioHandlers.get(output.name); if (!handler) { this.logger.warn( - "No handler found for suggested output", - output.name + "Orchestrator.runAutonomousFlow", + `No handler found for suggested output: ${output.name}` ); continue; } @@ -384,7 +401,11 @@ export class Orchestrator { if (handler.role === HandlerRole.OUTPUT) { // e.g. send a Slack message outputs.push({ name: output.name, data: output.data }); - await this.dispatchToOutput(output.name, output.data); + await this.dispatchToOutput( + output.name, + request, + output.data + ); this.logger.debug( "Orchestrator.runAutonomousFlow", @@ -395,7 +416,6 @@ export class Orchestrator { } ); - // Record output in DB if (orchestratorId) { await this.orchestratorDb.addMessage( orchestratorId, @@ -408,6 +428,7 @@ export class Orchestrator { // e.g. fetch data from an external API const actionResult = await this.dispatchToAction( output.name, + request, output.data ); @@ -420,7 +441,6 @@ export class Orchestrator { } ); - // Record action in DB if (orchestratorId) { await this.orchestratorDb.addMessage( orchestratorId, @@ -433,8 +453,7 @@ export class Orchestrator { ); } - // If the action returns new data (array or single), - // feed it back into the queue to continue the flow + // If the action returns new data, queue it up if (actionResult) { if (Array.isArray(actionResult)) { for (const item of actionResult) { @@ -452,6 +471,7 @@ export class Orchestrator { } } else { this.logger.warn( + "Orchestrator.runAutonomousFlow", "Suggested output has an unrecognized role", handler.role ); @@ -460,78 +480,14 @@ export class Orchestrator { } } - // If you want, you can return the final outputs array or handle it differently + // Return the final outputs array, or handle them in your own way return outputs; } /** - * Dispatches data to a registered input handler and processes the result through the autonomous flow. - * - * @param name - The name of the input handler to dispatch to - * @param data - The data to pass to the input handler - * @returns An array of output suggestions generated from processing the input - * - * @example - * ```ts - * // Register a chat input handler - * orchestrator.registerIOHandler({ - * name: "user_chat", - * role: "input", - * handler: async (message) => { - * return { - * type: "chat", - * content: message.content, - * metadata: { userId: message.userId } - * }; - * } - * }); - * - * // Dispatch a message to the chat handler - * const outputs = await orchestrator.dispatchToInput("user_chat", { - * content: "Hello AI!", - * userId: "user123" - * }); - * ``` - * - * @throws {Error} If no handler is found with the given name - * @throws {Error} If the handler's role is not "input" + * Processes *any* content by splitting it into items (if needed) and + * calling the single-item processor. */ - public async dispatchToInput( - name: string, - data: T, - userId: string, - orchestratorId?: string - ): Promise { - const handler = this.ioHandlers.get(name); - if (!handler) throw new Error(`No IOHandler: ${name}`); - if (!handler.execute) - throw new Error(`Handler "${name}" has no execute method`); - if (handler.role !== "input") { - throw new Error(`Handler "${name}" is not role=input`); - } - - try { - const result = await handler.execute(data); - - if (result) { - return await this.runAutonomousFlow( - result, - handler.name, - userId, - orchestratorId - ); - } - return []; - } catch (error) { - this.logger.error( - "dispatchToInput Error", - `dispatchToInput Error: ${ - error instanceof Error ? error.message : String(error) - }` - ); - } - } - public async processContent( content: any, source: string, @@ -540,6 +496,7 @@ export class Orchestrator { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { + // Example delay to show chunk processing, remove if not needed await new Promise((resolve) => setTimeout(resolve, 5000)); const result = await this.processContentItem( item, @@ -562,11 +519,10 @@ export class Orchestrator { } /** - * Process a single piece of content. This is where we: - * - Retrieve memories for the content's room (if any) - * - Let the "master" processor do an initial pass - * - Potentially use a child processor (either from `.nextProcessor` or from `canHandle()`) - * - Save the result to memory, mark as processed, etc. + * Processes a single item of content: + * - Retrieves prior memories from its room + * - Lets the "master" processor handle it + * - Optionally saves the result to memory/marks it processed */ private async processContentItem( content: any, @@ -575,14 +531,13 @@ export class Orchestrator { ): Promise { let memories: Memory[] = []; - // If the content includes some "room" identifier + // If the content indicates a "room" property if (content.room) { const hasProcessed = await this.roomManager.hasProcessedContentInRoom( content.contentId, content.room ); - if (hasProcessed) { this.logger.debug( "Orchestrator.processContentItem", @@ -596,13 +551,11 @@ export class Orchestrator { return null; } - // Make sure the room is created or retrieved const room = await this.roomManager.ensureRoom( content.room, source, userId ); - // Get prior memories from that room memories = await this.roomManager.getMemoriesFromRoom(room.id); this.logger.debug( @@ -622,12 +575,11 @@ export class Orchestrator { const availableOutputs = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.OUTPUT ); - const availableActions = Array.from(this.ioHandlers.values()).filter( (h) => h.role === HandlerRole.ACTION ); - // Processor has nested logic for processing content and returns the final result + // Processor's main entry point const result = await this.processor.process( content, JSON.stringify(memories), @@ -637,7 +589,7 @@ export class Orchestrator { } ); - // Save and mark processed if we have a room + // If there's a room, save the memory and mark processed if (content.room && result) { await this.roomManager.addMemory( content.room, diff --git a/packages/core/src/core/schedule-service.ts b/packages/core/src/core/schedule-service.ts index 5197fbdd..0f768d85 100644 --- a/packages/core/src/core/schedule-service.ts +++ b/packages/core/src/core/schedule-service.ts @@ -52,19 +52,33 @@ export class SchedulerService { case HandlerRole.INPUT: await this.orchestrator.dispatchToInput( task.handlerName, - data, - task.userId + { + headers: { + "x-user-id": task.userId, + }, + }, + data ); break; case HandlerRole.ACTION: await this.orchestrator.dispatchToAction( task.handlerName, + { + headers: { + "x-user-id": task.userId, + }, + }, data ); break; case HandlerRole.OUTPUT: await this.orchestrator.dispatchToOutput( task.handlerName, + { + headers: { + "x-user-id": task.userId, + }, + }, data ); break; diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 7184c664..3cec67fd 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -610,3 +610,7 @@ export interface ActionIOHandler extends BaseIOHandler { /** Union type of all possible IO handler types */ export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler; + +export interface AgentRequest { + headers: Record; +}