diff --git a/examples/example-api.ts b/examples/example-api.ts index a2741839..2798d631 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -11,7 +11,7 @@ import { Orchestrator } from "../packages/core/src/core/orchestrator"; import { HandlerRole } from "../packages/core/src/core/types"; import { RoomManager } from "../packages/core/src/core/room-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; -import { Processor } from "../packages/core/src/core/processor"; +import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { LLMClient } from "../packages/core/src/core/llm-client"; import { LogLevel } from "../packages/core/src/core/types"; import chalk from "chalk"; @@ -39,8 +39,7 @@ async function main() { }); // Initialize processor with default character personality - const processor = new Processor( - vectorDb, + const processor = new MessageProcessor( llmClient, defaultCharacter, loglevel @@ -61,7 +60,7 @@ async function main() { const orchestrator = new Orchestrator( roomManager, vectorDb, - processor, + [processor], scheduledTaskDb, { level: loglevel, diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 394edcbe..52bad035 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -12,7 +12,7 @@ import { HandlerRole } from "../packages/core/src/core/types"; import { TwitterClient } from "../packages/core/src/core/io/twitter"; import { RoomManager } from "../packages/core/src/core/room-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; -import { Processor } from "../packages/core/src/core/processor"; +import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { LLMClient } from "../packages/core/src/core/llm-client"; import { env } from "../packages/core/src/core/env"; import { LogLevel } from "../packages/core/src/core/types"; @@ -41,8 +41,7 @@ async function main() { }); // Initialize processor with default character personality - const processor = new Processor( - vectorDb, + const processor = new MessageProcessor( llmClient, defaultCharacter, loglevel @@ -63,7 +62,7 @@ async function main() { const core = new Orchestrator( roomManager, vectorDb, - processor, + [processor], scheduledTaskDb, { level: loglevel, @@ -104,10 +103,19 @@ async function main() { return null; } - return mentions; + return mentions.map((mention) => ({ + type: "tweet", + room: mention.metadata.conversationId, + contentId: mention.metadata.tweetId, + user: mention.metadata.username, + content: mention.content, + metadata: mention, + })); }, schema: z.object({ type: z.string(), + room: z.string(), + user: z.string(), content: z.string(), metadata: z.record(z.any()), }), diff --git a/packages/core/src/core/index.ts b/packages/core/src/core/index.ts index 1c595e30..995ad966 100644 --- a/packages/core/src/core/index.ts +++ b/packages/core/src/core/index.ts @@ -2,7 +2,7 @@ import { Orchestrator } from "./orchestrator"; import { RoomManager } from "./room-manager"; import { Room } from "./room"; import { ChromaVectorDB } from "./vector-db"; -import { Processor } from "./processor"; +import { BaseProcessor } from "./processor"; import { GoalManager } from "./goal-manager"; import { ChainOfThought } from "./chain-of-thought"; import { TaskScheduler } from "./task-scheduler"; @@ -16,24 +16,26 @@ import * as Providers from "./providers"; import * as Chains from "./chains"; import * as IO from "./io"; import * as Types from 
"./types"; +import * as Processors from "./processors"; export { - Orchestrator, + BaseProcessor, + Chains, + ChainOfThought, + ChromaVectorDB, Consciousness, + defaultCharacter, + GoalManager, + IO, LLMClient, - StepManager, - TaskScheduler, Logger, - RoomManager, - Room, - ChromaVectorDB, - Processor, - GoalManager, - ChainOfThought, - Utils, - defaultCharacter, + Orchestrator, + Processors, Providers, - Chains, - IO, + Room, + RoomManager, + StepManager, + TaskScheduler, Types, + Utils, }; diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index d85bb71d..e5312cbc 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -1,8 +1,8 @@ import { Logger } from "./logger"; import { RoomManager } from "./room-manager"; import { TaskScheduler } from "./task-scheduler"; -import type { Processor } from "./processor"; -import type { VectorDB } from "./types"; +import type { BaseProcessor } from "./processor"; +import type { Memory, ProcessedResult, VectorDB } from "./types"; import { HandlerRole, LogLevel, type LoggerConfig } from "./types"; import type { IOHandler } from "./types"; import type { ScheduledTaskMongoDb } from "./scheduled-db"; @@ -21,6 +21,8 @@ export class Orchestrator { private pollIntervalId?: ReturnType; + private processors: Map = new Map(); + /** * A TaskScheduler that only schedules and runs input handlers */ @@ -29,7 +31,6 @@ export class Orchestrator { >; private readonly logger: Logger; - private readonly processor: Processor; private readonly scheduledTaskDb: ScheduledTaskMongoDb; @@ -40,12 +41,16 @@ export class Orchestrator { constructor( private readonly roomManager: RoomManager, vectorDb: VectorDB, - processor: Processor, + processors: BaseProcessor[], scheduledTaskDb: ScheduledTaskMongoDb, config?: LoggerConfig ) { this.vectorDb = vectorDb; - this.processor = processor; + this.processors = new Map( + processors.map((p) => { + return [p.getName(), p]; + }) + ); this.scheduledTaskDb = scheduledTaskDb; this.logger = new Logger( config ?? { @@ -80,8 +85,6 @@ export class Orchestrator { } this.ioHandlers.set(handler.name, handler); - this.processor.registerIOHandler(handler); - this.logger.info( "Orchestrator.registerIOHandler", `Registered ${handler.role}`, @@ -235,86 +238,101 @@ export class Orchestrator { * and handles any follow-on "action" or "output" suggestions in a chain. 
*/ private async runAutonomousFlow(initialData: unknown, sourceName: string) { - // We keep a queue of "items" to process - const queue: Array<{ data: unknown; source: string }> = [ - { data: initialData, source: sourceName }, - ]; + // A queue of "things to process" + const queue: Array<{ data: unknown; source: string }> = []; + + // If the initial data is already an array, enqueue each item + if (Array.isArray(initialData)) { + for (const item of initialData) { + queue.push({ data: item, source: sourceName }); + } + } else { + queue.push({ data: initialData, source: sourceName }); + } + // You can keep track of any "outputs" you need to return or do something with const outputs: Array<{ name: string; data: any }> = []; + // Keep processing while there is something in the queue while (queue.length > 0) { const { data, source } = queue.shift()!; - // 1) Ensure there's a room - const room = await this.roomManager.ensureRoom(source, "core"); - - // 2) Process with your existing processor logic - const processed = await this.processor.process(data, room); + // processContent now returns an array of ProcessedResult + const processedResults = await this.processContent(data, source); - // If the processor thinks we've already processed it, we skip - if (processed.alreadyProcessed) { + // If there's nothing to process further, continue + if (!processedResults || processedResults.length === 0) { continue; } - // 3) Save to memory (like you do in processInputTask) - await this.roomManager.addMemory( - room.id, - JSON.stringify(processed.content), - { - source, - type: "input", - ...processed.metadata, - ...processed.enrichedContext, + // Now handle each ProcessedResult + for (const processed of processedResults) { + // If the processor says it's already been handled, skip + if (processed.alreadyProcessed) { + continue; } - ); - if (processed.updateTasks) { - for (const task of processed.updateTasks) { - await this.scheduleTaskInDb( - task.name, - task.data, - task.intervalMs - ); - } - } - - // 4) For each suggested output, see if it’s an action or an output - for (const output of processed.suggestedOutputs) { - const handler = this.ioHandlers.get(output.name); - if (!handler) { - this.logger.warn( - "No handler found for suggested output", - output.name - ); - continue; + // If any tasks need to be scheduled in the DB, do so + if (processed.updateTasks) { + for (const task of processed.updateTasks) { + await this.scheduleTaskInDb( + task.name, + task.data, + task.intervalMs + ); + } } - if (handler.role === HandlerRole.OUTPUT) { - outputs.push({ name: output.name, data: output.data }); + // For each suggested output + for (const output of processed.suggestedOutputs ?? []) { + const handler = this.ioHandlers.get(output.name); + if (!handler) { + this.logger.warn( + "No handler found for suggested output", + output.name + ); + continue; + } - // Dispatch to an output handler (e.g. send a Slack message) - await this.dispatchToOutput(output.name, output.data); - } else if (handler.role === HandlerRole.ACTION) { - // Execute an action (e.g. fetch data from an API), wait for the result - const actionResult = await this.dispatchToAction( - output.name, - output.data - ); - // Then feed the result back into the queue, so it will be processed - if (actionResult) { - queue.push({ - data: actionResult, - source: output.name, // or keep the same source, your choice - }); + if (handler.role === HandlerRole.OUTPUT) { + // e.g. 
send a Slack message
+                        outputs.push({ name: output.name, data: output.data });
+                        await this.dispatchToOutput(output.name, output.data);
+                    } else if (handler.role === HandlerRole.ACTION) {
+                        // e.g. fetch data from an external API
+                        const actionResult = await this.dispatchToAction(
+                            output.name,
+                            output.data
+                        );
+                        // If the action returns new data (array or single),
+                        // feed it back into the queue to continue the flow
+                        if (actionResult) {
+                            if (Array.isArray(actionResult)) {
+                                for (const item of actionResult) {
+                                    queue.push({
+                                        data: item,
+                                        source: output.name,
+                                    });
+                                }
+                            } else {
+                                queue.push({
+                                    data: actionResult,
+                                    source: output.name,
+                                });
+                            }
+                        }
+                    } else {
+                        this.logger.warn(
+                            "Suggested output has an unrecognized role",
+                            handler.role
+                        );
+                    }
                 }
             }
         }
+
+        // Return the collected outputs so the caller can decide what to do with them
+        return outputs;
     }

     /**
@@ -520,6 +538,124 @@
         }
     }

+    public async processContent(
+        content: any,
+        source: string
+    ): Promise<ProcessedResult[]> {
+        // If content is already an array, process each item individually
+        if (Array.isArray(content)) {
+            const allResults: ProcessedResult[] = [];
+            for (const item of content) {
+                // (Optional) Delay to throttle
+                await new Promise((resolve) => setTimeout(resolve, 5000));
+
+                const result = await this.processContentItem(item, source);
+                if (result) {
+                    allResults.push(result);
+                }
+            }
+            return allResults;
+        }
+
+        // For a single item, wrap the result in an array
+        const singleResult = await this.processContentItem(content, source);
+        return singleResult ? [singleResult] : [];
+    }
+
+    private async processContentItem(
+        content: any,
+        source: string
+    ): Promise<ProcessedResult | null> {
+        let memories: Memory[] = [];
+
+        if (content.room) {
+            const hasProcessed =
+                await this.roomManager.hasProcessedContentInRoom(
+                    content.contentId,
+                    content.room
+                );
+
+            if (hasProcessed) {
+                this.logger.debug(
+                    "Orchestrator.processContent",
+                    "Content already processed",
+                    {
+                        contentId: content.contentId,
+                        roomId: content.room,
+                    }
+                );
+                return null;
+            }
+            const room = await this.roomManager.ensureRoom(
+                content.room,
+                source
+            );
+            memories = await this.roomManager.getMemoriesFromRoom(room.id);
+
+            this.logger.debug(
+                "Orchestrator.processContent",
+                "Processing content with context",
+                {
+                    content,
+                    source,
+                    roomId: room.id,
+                    relevantMemories: memories,
+                }
+            );
+        }
+
+        const processor = Array.from(this.processors.values()).find((p) =>
+            p.canHandle(content)
+        );
+
+        if (!processor) {
+            this.logger.debug(
+                "Orchestrator.processContent",
+                "No suitable processor found for content",
+                { content }
+            );
+            return null;
+        }
+
+        const availableOutputs = Array.from(this.ioHandlers.values()).filter(
+            (h) => h.role === HandlerRole.OUTPUT
+        );
+
+        const availableActions = Array.from(this.ioHandlers.values()).filter(
+            (h) => h.role === HandlerRole.ACTION
+        );
+
+        const result = await processor.process(
+            content,
+            JSON.stringify(memories),
+            {
+                availableOutputs,
+                availableActions,
+            }
+        );
+
+        if (content.room) {
+            // Save the result to memory
+            await this.roomManager.addMemory(
+                content.room,
+                JSON.stringify(result?.content),
+                {
+                    source,
+                    ...result?.metadata,
+                    ...result?.enrichedContext,
+                }
+            );
+
+            // Mark the content as processed
+            await this.roomManager.markContentAsProcessed(
+                content.contentId,
+                content.room
+            );
+        }
+
+        return result;
+    }
+
     /**
      * Stops all scheduled tasks and shuts down the orchestrator. 
     */
diff --git a/packages/core/src/core/processor.ts b/packages/core/src/core/processor.ts
index 625003e3..a7f424ad 100644
--- a/packages/core/src/core/processor.ts
+++ b/packages/core/src/core/processor.ts
@@ -1,442 +1,69 @@
 import { LLMClient } from "./llm-client";
 import { Logger } from "./logger";
-import { Room } from "./room";
-import type { VectorDB } from "./types";
-import type {
-    Character,
-    ProcessedResult,
-    SearchResult,
-    SuggestedOutput,
-} from "./types";
+import type { Character, ProcessedResult } from "./types";
 import { LogLevel } from "./types";
-import { hashString, validateLLMResponseSchema } from "./utils";
-import { z } from "zod";
-import { zodToJsonSchema } from "zod-to-json-schema";
-import { HandlerRole, type IOHandler } from "./types";
-
-export class Processor {
-    private logger: Logger;
-    private readonly ioHandlers = new Map<string, IOHandler>();
-
+import { type IOHandler } from "./types";
+
+/**
+ * Base abstract class for content processors that handle different types of input
+ * and generate appropriate responses using LLM.
+ */
+export abstract class BaseProcessor {
+    /** Logger instance for this processor */
+    protected logger: Logger;
+
+    /**
+     * Creates a new BaseProcessor instance
+     * @param metadata - Metadata about this processor including name and description
+     * @param loggerLevel - The logging level to use
+     * @param character - The character personality to use for responses
+     * @param llmClient - The LLM client instance to use for processing
+     */
     constructor(
-        private vectorDb: VectorDB,
-        private llmClient: LLMClient,
-        private character: Character,
-        logLevel: LogLevel = LogLevel.ERROR
+        protected metadata: { name: string; description: string },
+        protected loggerLevel: LogLevel = LogLevel.ERROR,
+        protected character: Character,
+        protected llmClient: LLMClient
     ) {
         this.logger = new Logger({
-            level: logLevel,
+            level: loggerLevel,
             enableColors: true,
             enableTimestamp: true,
         });
     }

-    public registerIOHandler(handler: IOHandler): void {
-        this.ioHandlers.set(handler.name, handler);
+    /**
+     * Gets the name of this processor
+     * @returns The processor name from metadata
+     */
+    public getName(): string {
+        return this.metadata.name;
     }

-    async process(content: any, room: Room): Promise<ProcessedResult> {
-        this.logger.debug("Processor.process", "Processing content", {
-            content,
-            roomId: room.id,
-        });
-
-        const contentId = this.generateContentId(content);
-
-        const hasProcessed = await this.hasProcessedContent(contentId, room);
-
-        if (hasProcessed) {
-            return {
-                content,
-                metadata: {},
-                enrichedContext: {
-                    timeContext: this.getTimeContext(new Date()),
-                    summary: "",
-                    topics: [],
-                    relatedMemories: [],
-                    sentiment: "neutral",
-                    entities: [],
-                    intent: "unknown",
-                    availableOutputs: Array.from(this.ioHandlers.keys()),
-                },
-                suggestedOutputs: [],
-                alreadyProcessed: true,
-            };
-        }
-
-        const contentStr =
-            typeof content === "string" ? 
content : JSON.stringify(content); - - // TODO: fix this abstraction - // // Get related memories first since we'll need them for context - // const relatedMemories = await this.vectorDb.findSimilarInRoom( - // contentStr, - // room.id, - // 3 - // ); - - const prompt = `Analyze the following content and provide a complete analysis: - -# New Content to process: -${contentStr} - - -# Available Outputs: -${Array.from(this.ioHandlers.entries()) - .filter(([_, handler]) => handler.role === HandlerRole.OUTPUT) - .map( - ([name, output]) => - `${name}: ${JSON.stringify(zodToJsonSchema(output.schema, name))}` - ) - .join("\n")} - -# Available Actions: -${Array.from(this.ioHandlers.entries()) - .filter(([_, handler]) => handler.role === HandlerRole.ACTION) - .map( - ([name, action]) => - `${name}: ${JSON.stringify(zodToJsonSchema(action.schema, name))}` - ) - .join("\n")} - - - 1. Content classification and type - 2. Content enrichment (summary, topics, sentiment, entities, intent) - - - - 1. Suggested outputs/actions based on the available handlers based on the content and the available handlers. - 2. If the content is a message, use the personality of the character to determine if the output was successful. - 3. If possible you should include summary of the content in the output for the user to avoid more processing. - - - - 1. Suggested tasks based on the available handlers based on the content and the available handlers. - 2. Only make tasks if you have been told, based off what you think is possible. - - - - # Speak in the following voice: - ${JSON.stringify(this.character.voice)} - - # Use the following traits to define your behavior: - ${JSON.stringify(this.character.traits)} - - # Use the following examples to guide your behavior: - ${JSON.stringify(this.character.instructions)} - - # Use the following template to craft your message: - ${JSON.stringify(this.character.templates?.tweetTemplate)} - -`; - - try { - const result = await validateLLMResponseSchema({ - prompt, - systemPrompt: - "You are an expert system that analyzes content and provides comprehensive analysis with appropriate automated responses.", - schema: z.object({ - classification: z.object({ - contentType: z.string(), - requiresProcessing: z.boolean(), - context: z.object({ - topic: z.string(), - urgency: z.enum(["high", "medium", "low"]), - additionalContext: z.string(), - }), - }), - enrichment: z.object({ - summary: z.string().max(1000), - topics: z.array(z.string()).max(20), - sentiment: z.enum(["positive", "negative", "neutral"]), - entities: z.array(z.string()), - intent: z - .string() - .describe("The intent of the content"), - }), - updateTasks: z - .array( - z.object({ - name: z - .string() - .describe( - "The name of the task to schedule. This should be a handler name." - ), - confidence: z - .number() - .describe("The confidence score (0-1)"), - intervalMs: z - .number() - .describe("The interval in milliseconds"), - data: z - .any() - .describe( - "The data that matches the task's schema" - ), - }) - ) - .describe( - "Suggested tasks to schedule based on the content and the available handlers. Making this will mean the handlers will be called in the future." - ), - suggestedOutputs: z.array( - z.object({ - name: z - .string() - .describe("The name of the output or action"), - data: z - .any() - .describe( - "The data that matches the output's schema. leave empty if you don't have any data to provide." 
- ), - confidence: z - .number() - .describe("The confidence score (0-1)"), - reasoning: z - .string() - .describe("The reasoning for the suggestion"), - }) - ), - }), - llmClient: this.llmClient, - logger: this.logger, - }); - - // await this.markContentAsProcessed(contentId, room); - - this.logger.debug("Processor.process", "Processed content", { - content, - result, - }); - - return { - content, - metadata: { - ...result.classification.context, - contentType: result.classification.contentType, - }, - enrichedContext: { - ...result.enrichment, - timeContext: this.getTimeContext(new Date()), - relatedMemories: [], // TODO: fix this abstraction - availableOutputs: Array.from(this.ioHandlers.keys()), - }, - updateTasks: result.updateTasks, - suggestedOutputs: - result.suggestedOutputs as SuggestedOutput[], - alreadyProcessed: false, - }; - } catch (error) { - this.logger.error("Processor.process", "Processing failed", { - error, - }); - return { - content, - metadata: {}, - enrichedContext: { - timeContext: this.getTimeContext(new Date()), - summary: contentStr.slice(0, 100), - topics: [], - relatedMemories: [], - sentiment: "neutral", - entities: [], - intent: "unknown", - availableOutputs: Array.from(this.ioHandlers.keys()), - }, - suggestedOutputs: [], - alreadyProcessed: false, - }; + /** + * Determines if this processor can handle the given content. + * @param content - The content to check + * @returns True if this processor can handle the content, false otherwise + */ + public abstract canHandle(content: any): boolean; + + /** + * Processes the given content and returns a result. + * @param content - The content to process + * @param otherContext - Additional context string to consider during processing + * @param ioContext - Optional context containing available outputs and actions + * @param ioContext.availableOutputs - Array of available output handlers + * @param ioContext.availableActions - Array of available action handlers + * @returns Promise resolving to the processed result + */ + public abstract process( + content: any, + otherContext: string, + ioContext?: { + availableOutputs: IOHandler[]; + availableActions: IOHandler[]; } - } - - private getTimeContext(timestamp: Date): string { - const now = new Date(); - const hoursDiff = - (now.getTime() - timestamp.getTime()) / (1000 * 60 * 60); - - if (hoursDiff < 24) return "very_recent"; - if (hoursDiff < 72) return "recent"; - if (hoursDiff < 168) return "this_week"; - if (hoursDiff < 720) return "this_month"; - return "older"; - } - - // Helper method to generate a consistent ID for content - private generateContentId(content: any): string { - try { - // 1. Special handling for Twitter mentions/tweets array - if (Array.isArray(content) && content[0]?.type === "tweet") { - // Use the newest tweet's ID as the marker - const newestTweet = content[0]; - return `tweet_batch_${newestTweet.metadata.tweetId}`; - } - - // 2. Single tweet handling - if (content?.type === "tweet") { - return `tweet_${content.metadata.tweetId}`; - } - - // 3. If it's a plain string, fallback to hashing the string but also add a small random/time factor. - // This ensures repeated user messages with the same text won't collapse to the same ID. - if (typeof content === "string") { - // Add a short suffix: e.g. timestamp + small random - const suffix = `${Date.now()}_${Math.random() - .toString(36) - .slice(2, 6)}`; - return `content_${hashString(content)}_${suffix}`; - } - - // 4. 
For arrays (non-tweets), attempt to find known IDs or hash the items
-            if (Array.isArray(content)) {
-                const ids = content.map((item) => {
-                    // Check if there's an explicit .id
-                    if (item.id) return item.id;
-                    // Check for item.metadata?.id
-                    if (item.metadata?.id) return item.metadata.id;
-
-                    // Otherwise, hash the item
-                    const relevantData = {
-                        content: item.content || item,
-                        type: item.type,
-                    };
-                    return hashString(JSON.stringify(relevantData));
-                });
-
-                // Join them, but also add a short suffix so different array orders don't collide
-                const suffix = `${Date.now()}_${Math.random()
-                    .toString(36)
-                    .slice(2, 6)}`;
-                return `array_${ids.join("_").slice(0, 100)}_${suffix}`;
-            }
-
-            // 5. For single objects, check .id first
-            if (content.id) {
-                return `obj_${content.id}`;
-            }
-
-            // 6. Special handling for "internal_thought" or "consciousness"
-            if (
-                content.type === "internal_thought" ||
-                content.source === "consciousness"
-            ) {
-                const thoughtData = {
-                    content: content.content,
-                    timestamp: content.timestamp,
-                };
-                return `thought_${hashString(JSON.stringify(thoughtData))}`;
-            }
-
-            // 7. Then check if there's a metadata.id
-            if (content.metadata?.id) {
-                return `obj_${content.metadata.id}`;
-            }
-
-            // 8. Or any metadata key ending with 'id'
-            if (content.metadata) {
-                for (const [key, value] of Object.entries(content.metadata)) {
-                    if (key.toLowerCase().endsWith("id") && value) {
-                        return `obj_${value}`;
-                    }
-                }
-            }
-
-            // 9. Finally, fallback to hashing the object,
-            // but add a random/time suffix so repeated content isn't auto-deduplicated.
-            const relevantData = {
-                content: content.content || content,
-                type: content.type,
-                // Include source if available
-                ...(content.source &&
-                    content.source !== "consciousness" && {
-                        source: content.source,
-                    }),
-            };
-            const baseHash = hashString(JSON.stringify(relevantData));
-            const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
-            return `obj_${baseHash}_${suffix}`;
-        } catch (error) {
-            this.logger.error(
-                "Processor.generateContentId",
-                "Error generating ID",
-                {
-                    error,
-                    content:
-                        typeof content === "object"
-                            ? JSON.stringify(content)
-                            : content,
-                }
-            );
-            return `fallback_${Date.now()}`;
-        }
-    }
-
-    // Check if we've already processed this content
-    private async hasProcessedContent(
-        contentId: string,
-        room: Room
-    ): Promise<boolean> {
-        try {
-            // Use findSimilarInRoom but with exact metadata matching
-            const results = await this.vectorDb.findSimilarInRoom(
-                contentId, // Simple query text since we're using metadata matching
-                room.id,
-                1,
-                {
-                    contentId: contentId,
-                }
-            );
-
-            return results.length > 0;
-        } catch (error) {
-            this.logger.error("Processor.hasProcessedContent", "Check failed", {
-                error: error instanceof Error ? error.message : String(error),
-                contentId,
-                roomId: room.id,
-            });
-            return false;
-        }
-    }
-
-    // Mark content as processed
-    private async markContentAsProcessed(
-        contentId: string,
-        room: Room
-    ): Promise<void> {
-        try {
-            const markerId = `processed_${contentId}`;
-
-            await this.vectorDb.storeInRoom(
-                `Processed marker for content: ${contentId}`,
-                room.id,
-                {
-                    type: "processed_marker",
-                    contentId: contentId,
-                    timestamp: new Date().toISOString(),
-                }
-            );
-
-            this.logger.debug(
-                "Processor.markContentAsProcessed",
-                "Marked content as processed",
-                {
-                    contentId,
-                    roomId: room.id,
-                    markerId,
-                }
-            );
-        } catch (error) {
-            this.logger.error(
-                "Processor.markContentAsProcessed",
-                "Failed to mark content",
-                {
-                    error:
-                        error instanceof Error ? 
error.message : String(error),
-                    contentId,
-                    roomId: room.id,
-                }
-            );
-            throw error;
-        }
-    }
+    ): Promise<ProcessedResult>;
 }
diff --git a/packages/core/src/core/processors/index.ts b/packages/core/src/core/processors/index.ts
new file mode 100644
index 00000000..526ac35e
--- /dev/null
+++ b/packages/core/src/core/processors/index.ts
@@ -0,0 +1 @@
+export { MessageProcessor } from "./message-processor";
diff --git a/packages/core/src/core/processors/message-processor.ts b/packages/core/src/core/processors/message-processor.ts
new file mode 100644
index 00000000..b39af8b0
--- /dev/null
+++ b/packages/core/src/core/processors/message-processor.ts
@@ -0,0 +1,226 @@
+import { LLMClient } from "../llm-client";
+
+import type { Character, ProcessedResult, SuggestedOutput } from "../types";
+import { LogLevel } from "../types";
+
+import { getTimeContext, validateLLMResponseSchema } from "../utils";
+import { z } from "zod";
+import { zodToJsonSchema } from "zod-to-json-schema";
+import { type IOHandler } from "../types";
+import { BaseProcessor } from "../processor";
+
+export class MessageProcessor extends BaseProcessor {
+    constructor(
+        protected llmClient: LLMClient,
+        protected character: Character,
+        logLevel: LogLevel = LogLevel.ERROR
+    ) {
+        super(
+            {
+                name: "message",
+                description:
+                    "This processor handles messages or short text inputs.",
+            },
+            logLevel,
+            character,
+            llmClient
+        );
+    }
+
+    // TODO: fix this
+    public canHandle(content: any): boolean {
+        return true;
+    }
+
+    async process(
+        content: any,
+        otherContext: string,
+        ioContext?: {
+            availableOutputs: IOHandler[];
+            availableActions: IOHandler[];
+        }
+    ): Promise<ProcessedResult> {
+        this.logger.debug("Processor.process", "Processing content", {
+            content,
+        });
+
+        const contentStr =
+            typeof content === "string" ? content : JSON.stringify(content);
+
+        const outputsSchemaPart = ioContext?.availableOutputs
+            .map((handler) => {
+                return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`;
+            })
+            .join("\n");
+
+        const actionsSchemaPart = ioContext?.availableActions
+            .map((handler) => {
+                return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`;
+            })
+            .join("\n");
+
+        const prompt = `Analyze the following content and provide a complete analysis:
+
+        # New Content to process:
+        ${contentStr}
+
+        # Other context:
+        ${otherContext}
+
+        # Available Outputs:
+        ${outputsSchemaPart}
+
+        # Available Actions:
+        ${actionsSchemaPart}
+
+        1. Content classification and type
+        2. Content enrichment (summary, topics, sentiment, entities, intent)
+
+        1. Suggested outputs/actions based on the content and the available handlers.
+        2. If the content is a message, use the personality of the character to determine if the output was successful.
+        3. If possible, include a summary of the content in the output so the user can avoid further processing.
+
+        1. Suggested tasks based on the content and the available handlers.
+        2. Only create tasks if you have been explicitly told to, based on what you think is possible. 
+ + + + # Speak in the following voice: + ${JSON.stringify(this.character.voice)} + + # Use the following traits to define your behavior: + ${JSON.stringify(this.character.traits)} + + # Use the following examples to guide your behavior: + ${JSON.stringify(this.character.instructions)} + + # Use the following template to craft your message: + ${JSON.stringify(this.character.templates?.tweetTemplate)} + +`; + + try { + const result = await validateLLMResponseSchema({ + prompt, + systemPrompt: + "You are an expert system that analyzes content and provides comprehensive analysis with appropriate automated responses.", + schema: z.object({ + classification: z.object({ + contentType: z.string(), + requiresProcessing: z.boolean(), + context: z.object({ + topic: z.string(), + urgency: z.enum(["high", "medium", "low"]), + additionalContext: z.string(), + }), + }), + enrichment: z.object({ + summary: z.string().max(1000), + topics: z.array(z.string()).max(20), + sentiment: z.enum(["positive", "negative", "neutral"]), + entities: z.array(z.string()), + intent: z + .string() + .describe("The intent of the content"), + }), + updateTasks: z + .array( + z.object({ + name: z + .string() + .describe( + "The name of the task to schedule. This should be a handler name." + ), + confidence: z + .number() + .describe("The confidence score (0-1)"), + intervalMs: z + .number() + .describe("The interval in milliseconds"), + data: z + .any() + .describe( + "The data that matches the task's schema" + ), + }) + ) + .describe( + "Suggested tasks to schedule based on the content and the available handlers. Making this will mean the handlers will be called in the future." + ), + suggestedOutputs: z.array( + z.object({ + name: z + .string() + .describe("The name of the output or action"), + data: z + .any() + .describe( + "The data that matches the output's schema. leave empty if you don't have any data to provide." 
+ ), + confidence: z + .number() + .describe("The confidence score (0-1)"), + reasoning: z + .string() + .describe("The reasoning for the suggestion"), + }) + ), + }), + llmClient: this.llmClient, + logger: this.logger, + }); + + this.logger.debug("Processor.process", "Processed content", { + content, + result, + }); + + return { + content, + metadata: { + ...result.classification.context, + contentType: result.classification.contentType, + }, + enrichedContext: { + ...result.enrichment, + timeContext: getTimeContext(new Date()), + relatedMemories: [], // TODO: fix this abstraction + availableOutputs: ioContext?.availableOutputs.map( + (handler) => handler.name + ), + }, + updateTasks: result.updateTasks, + suggestedOutputs: + result.suggestedOutputs as SuggestedOutput[], + alreadyProcessed: false, + }; + } catch (error) { + this.logger.error("Processor.process", "Processing failed", { + error, + }); + return { + content, + metadata: {}, + enrichedContext: { + timeContext: getTimeContext(new Date()), + summary: contentStr.slice(0, 100), + topics: [], + relatedMemories: [], + sentiment: "neutral", + entities: [], + intent: "unknown", + availableOutputs: ioContext?.availableOutputs.map( + (handler) => handler.name + ), + }, + suggestedOutputs: [], + alreadyProcessed: false, + }; + } + } +} diff --git a/packages/core/src/core/room-manager.ts b/packages/core/src/core/room-manager.ts index a8680dfb..01b242da 100644 --- a/packages/core/src/core/room-manager.ts +++ b/packages/core/src/core/room-manager.ts @@ -225,4 +225,78 @@ export class RoomManager { await this.vectorDb.deleteRoom(roomId); this.logger.info("RoomManager.deleteRoom", "Room deleted", { roomId }); } + + public async getMemoriesFromRoom( + roomId: string, + limit?: number + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for getting memories"); + } + + const room = await this.getRoom(roomId); + if (!room) { + throw new Error(`Room ${roomId} not found`); + } + + const memories = await this.vectorDb.getMemoriesFromRoom(roomId, limit); + + return memories.map((memory) => ({ + id: memory.metadata?.memoryId, + roomId: roomId, + content: memory.content, + timestamp: new Date(memory.metadata?.timestamp), + metadata: memory.metadata, + })); + } + + public async hasProcessedContentInRoom( + contentId: string, + roomId: string + ): Promise { + if (!this.vectorDb) { + throw new Error("VectorDB required for getting memories"); + } + + const room = await this.getRoom(roomId); + if (!room) { + this.logger.error( + "RoomManager.markContentAsProcessed", + "Room not found", + { + roomId, + } + ); + return false; + } + + return await this.vectorDb.hasProcessedContent(contentId, room); + } + + public async markContentAsProcessed( + contentId: string, + roomId: string + ): Promise { + if (!this.vectorDb) { + throw new Error( + "VectorDB required for marking content as processed" + ); + } + + const room = await this.getRoom(roomId); + + if (!room) { + this.logger.error( + "RoomManager.markContentAsProcessed", + "Room not found", + { + roomId, + } + ); + return false; + } + + await this.vectorDb.markContentAsProcessed(contentId, room); + return true; + } } diff --git a/packages/core/src/core/utils.ts b/packages/core/src/core/utils.ts index 476f4abe..246d38b4 100644 --- a/packages/core/src/core/utils.ts +++ b/packages/core/src/core/utils.ts @@ -251,3 +251,111 @@ export function hashString(str: string): string { } return Math.abs(hash).toString(36); // Convert to base36 for shorter strings } + +export function 
getTimeContext(timestamp: Date): string { + const now = new Date(); + const hoursDiff = (now.getTime() - timestamp.getTime()) / (1000 * 60 * 60); + + if (hoursDiff < 24) return "very_recent"; + if (hoursDiff < 72) return "recent"; + if (hoursDiff < 168) return "this_week"; + if (hoursDiff < 720) return "this_month"; + return "older"; +} + +export function generateContentId(content: any): string { + try { + // 1. Special handling for Twitter mentions/tweets array + if (Array.isArray(content) && content[0]?.type === "tweet") { + // Use the newest tweet's ID as the marker + const newestTweet = content[0]; + return `tweet_batch_${newestTweet.metadata.tweetId}`; + } + + // 2. Single tweet handling + if (content?.type === "tweet") { + return `tweet_${content.metadata.tweetId}`; + } + + // 3. If it's a plain string, fallback to hashing the string but also add a small random/time factor. + // This ensures repeated user messages with the same text won't collapse to the same ID. + if (typeof content === "string") { + // Add a short suffix: e.g. timestamp + small random + const suffix = `${Date.now()}_${Math.random() + .toString(36) + .slice(2, 6)}`; + return `content_${hashString(content)}_${suffix}`; + } + + // 4. For arrays (non-tweets), attempt to find known IDs or hash the items + if (Array.isArray(content)) { + const ids = content.map((item) => { + // Check if there's an explicit .id + if (item.id) return item.id; + // Check for item.metadata?.id + if (item.metadata?.id) return item.metadata.id; + + // Otherwise, hash the item + const relevantData = { + content: item.content || item, + type: item.type, + }; + return hashString(JSON.stringify(relevantData)); + }); + + // Join them, but also add a short suffix so different array orders don't collide + const suffix = `${Date.now()}_${Math.random() + .toString(36) + .slice(2, 6)}`; + return `array_${ids.join("_").slice(0, 100)}_${suffix}`; + } + + // 5. For single objects, check .id first + if (content.id) { + return `obj_${content.id}`; + } + + // 6. Special handling for "internal_thought" or "consciousness" + if ( + content.type === "internal_thought" || + content.source === "consciousness" + ) { + const thoughtData = { + content: content.content, + timestamp: content.timestamp, + }; + return `thought_${hashString(JSON.stringify(thoughtData))}`; + } + + // 7. Then check if there's a metadata.id + if (content.metadata?.id) { + return `obj_${content.metadata.id}`; + } + + // 8. Or any metadata key ending with 'id' + if (content.metadata) { + for (const [key, value] of Object.entries(content.metadata)) { + if (key.toLowerCase().endsWith("id") && value) { + return `obj_${value}`; + } + } + } + + // 9. Finally, fallback to hashing the object, + // but add a random/time suffix so repeated content isn't auto-deduplicated. 
+        const relevantData = {
+            content: content.content || content,
+            type: content.type,
+            // Include source if available
+            ...(content.source &&
+                content.source !== "consciousness" && {
+                    source: content.source,
+                }),
+        };
+        const baseHash = hashString(JSON.stringify(relevantData));
+        const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
+        return `obj_${baseHash}_${suffix}`;
+    } catch (error) {
+        return `fallback_${Date.now()}`;
+    }
+}
diff --git a/packages/core/src/core/vector-db.ts b/packages/core/src/core/vector-db.ts
index cb51f261..16f090f0 100644
--- a/packages/core/src/core/vector-db.ts
+++ b/packages/core/src/core/vector-db.ts
@@ -255,7 +255,7 @@ export class ChromaVectorDB implements VectorDB {
                 description: "Room-specific memory storage",
                 roomId,
                 platform: roomId.split("_")[0],
-                platformId: roomId.split("_")[1],
+                platformId: roomId.split("_")[0] + "_platform", // TODO: This is a hack to get the platform ID
                 created: new Date().toISOString(),
                 lastActive: new Date().toISOString(),
             },
@@ -1591,7 +1591,7 @@ export class ChromaVectorDB implements VectorDB {
     }

     // Check if we've already processed this content
-    private async hasProcessedContent(
+    public async hasProcessedContent(
         contentId: string,
         room: Room
     ): Promise<boolean> {
@@ -1625,7 +1625,7 @@ export class ChromaVectorDB implements VectorDB {
     }

     // Mark content as processed
-    private async markContentAsProcessed(
+    public async markContentAsProcessed(
         contentId: string,
         room: Room
     ): Promise<void> {
@@ -1668,4 +1668,42 @@ export class ChromaVectorDB implements VectorDB {
             throw error;
         }
     }
+
+    /**
+     * Gets all memories from a specific room's collection, optionally limited to a certain number
+     */
+    public async getMemoriesFromRoom(
+        roomId: string,
+        limit?: number
+    ): Promise<{ content: string; metadata?: Record<string, any> }[]> {
+        try {
+            const collection = await this.getCollectionForRoom(roomId);
+
+            // Get all documents from the collection, with optional limit
+            const results = await collection.get({
+                limit,
+                include: ["documents", "metadatas"] as IncludeEnum[],
+            });
+
+            if (!results.ids.length) {
+                return [];
+            }
+
+            return results.ids.map((_, idx) => ({
+                content: results.documents[idx] || "",
+                metadata: results.metadatas?.[idx] || {},
+            }));
+        } catch (error) {
+            this.logger.error(
+                "ChromaVectorDB.getMemoriesFromRoom",
+                "Failed to get memories",
+                {
+                    error:
+                        error instanceof Error ? error.message : String(error),
+                    roomId,
+                }
+            );
+            throw error;
+        }
+    }
 }
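
With this change the Orchestrator takes an array of processors instead of a single Processor and keys them by getName(). A minimal wiring sketch against the new constructor; it assumes the same llmClient, roomManager, vectorDb, scheduledTaskDb, and defaultCharacter setup as in the examples above (stubbed with declare so the snippet is self-contained), and the logger config shape is taken from the examples:

    import { Orchestrator } from "../packages/core/src/core/orchestrator";
    import { RoomManager } from "../packages/core/src/core/room-manager";
    import { ChromaVectorDB } from "../packages/core/src/core/vector-db";
    import { MessageProcessor } from "../packages/core/src/core/processors/message-processor";
    import { LLMClient } from "../packages/core/src/core/llm-client";
    import { LogLevel } from "../packages/core/src/core/types";
    import type { Character } from "../packages/core/src/core/types";
    import type { ScheduledTaskMongoDb } from "../packages/core/src/core/scheduled-db";

    // Assumed to be constructed exactly as in examples/example-api.ts.
    declare const llmClient: LLMClient;
    declare const roomManager: RoomManager;
    declare const vectorDb: ChromaVectorDB;
    declare const scheduledTaskDb: ScheduledTaskMongoDb;
    declare const defaultCharacter: Character;

    // Each processor announces itself via getName(); the orchestrator stores
    // them in a Map<string, BaseProcessor> and picks one per content item.
    const messageProcessor = new MessageProcessor(
        llmClient,
        defaultCharacter,
        LogLevel.ERROR
    );

    // Note the array: more BaseProcessor implementations can be appended here.
    const orchestrator = new Orchestrator(
        roomManager,
        vectorDb,
        [messageProcessor],
        scheduledTaskDb,
        { level: LogLevel.ERROR }
    );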
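
MessageProcessor.canHandle() currently returns true for everything (flagged with a TODO above), so routing only becomes interesting once a second processor claims a narrower slice of content. A hypothetical TweetProcessor sketch: the class, its name string, and the type check are invented for illustration, and only the BaseProcessor contract comes from the diff. Its stub result mirrors the shape of MessageProcessor's fallback branch:

    import { BaseProcessor } from "../packages/core/src/core/processor";
    import { LLMClient } from "../packages/core/src/core/llm-client";
    import { getTimeContext } from "../packages/core/src/core/utils";
    import type { Character, ProcessedResult } from "../packages/core/src/core/types";
    import { LogLevel, type IOHandler } from "../packages/core/src/core/types";

    export class TweetProcessor extends BaseProcessor {
        constructor(llmClient: LLMClient, character: Character) {
            // Same argument order as MessageProcessor's super() call:
            // metadata, logger level, character, LLM client.
            super(
                { name: "tweet", description: "Handles tweet envelopes" },
                LogLevel.ERROR,
                character,
                llmClient
            );
        }

        // Claim only items the input handler tagged as tweets.
        public canHandle(content: any): boolean {
            return content?.type === "tweet";
        }

        public async process(
            content: any,
            otherContext: string,
            ioContext?: {
                availableOutputs: IOHandler[];
                availableActions: IOHandler[];
            }
        ): Promise<ProcessedResult> {
            // A real implementation would prompt the LLM the way
            // MessageProcessor does; this stub returns a neutral result.
            return {
                content,
                metadata: {},
                enrichedContext: {
                    timeContext: getTimeContext(new Date()),
                    summary: "",
                    topics: [],
                    relatedMemories: [],
                    sentiment: "neutral",
                    entities: [],
                    intent: "unknown",
                    availableOutputs: ioContext?.availableOutputs.map(
                        (handler) => handler.name
                    ),
                },
                suggestedOutputs: [],
                alreadyProcessed: false,
            };
        }
    }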
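
Finally, processContentItem() keys everything off two optional fields on each incoming item: room selects (and if needed creates) the room whose memories become context, and contentId is what hasProcessedContentInRoom() and markContentAsProcessed() deduplicate on. The Twitter example's mention mapping emits exactly this envelope. A representative item, reusing the orchestrator from the first sketch (all field values here are made up):

    // Shape produced by the example-twitter.ts mention handler and consumed
    // by Orchestrator.processContent().
    const item = {
        type: "tweet",
        room: "1875300000000000000",      // conversationId: drives room lookup/creation
        contentId: "1875300000000000001", // dedupe key within that room
        user: "someUser",
        content: "hey @agent, what's the weather?",
        metadata: {},                     // the original mention rides along here
    };

    // Items without `room` are still processed, but they skip the
    // already-processed check and are not written back into room memory.
    const results = await orchestrator.processContent(item, "twitter");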