diff --git a/docs/docs/pages/index.mdx b/docs/docs/pages/index.mdx index 1eeb7d49..e9c29f2f 100644 --- a/docs/docs/pages/index.mdx +++ b/docs/docs/pages/index.mdx @@ -1,6 +1,5 @@ Daydreams Logo - [![MIT License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) Daydreams is a powerful generative agent library for executing anything onchain. It is chain agnostic and can be used to perform tasks - by simply injecting context with api documentation. Whether you're building on Base, Solana, Ethereum, Starknet, or other chains, Daydreams has you covered. @@ -16,7 +15,6 @@ Think of it as an opinionated framework for building next generation agents. - 🎯 **Goal-Oriented** - Hierarchical goal planning and execution - 🤝 **Multi-Agent Ready** - Built for swarm intelligence and agent collaboration - ## Generative Framework Unlike traditional frameworks that require explicit integrations, Daydreams uses a generative approach where the agent dynamically creates and executes actions through Chain of Thought processing. This means: @@ -51,6 +49,7 @@ type IOHandler = { ``` This IO-centric design means you can: + - Easily plug in new capabilities through handlers - Process multiple operations in parallel - Chain handlers together for complex workflows @@ -86,12 +85,12 @@ bun add @daydreamsai/core Here's a simple example to get you started: ```typescript -import { Orchestrator, MessageProcessor, LLMClient } from '@daydreamsai/core'; +import { Orchestrator, MessageProcessor, LLMClient } from "@daydreamsai/core"; // Initialize the LLM client const llm = new LLMClient({ model: "openrouter:deepseek/deepseek-r1-distill-llama-70b", - temperature: 0.3 + temperature: 0.3, }); // Create a message processor @@ -120,12 +119,12 @@ orchestrator.registerIOHandler({ name: "simple_action", role: "action", schema: z.object({ - message: z.string() + message: z.string(), }), handler: async (payload) => { console.log(`Executing action: ${payload.message}`); return { success: true }; - } + }, }); ``` @@ -148,6 +147,7 @@ Building blocks that process data and produce outputs: ### Chain of Thought The reasoning engine that: + - Plans strategies for achieving goals - Breaks down complex tasks - Executes actions @@ -162,20 +162,22 @@ Stores and retrieves experiences using vector databases for contextual decision Here's a glimpse of what you can build with Daydreams - a Twitter bot that autonomously interacts and generates content: ```typescript -import { - Orchestrator, - TwitterClient, +import { + Orchestrator, + TwitterClient, RoomManager, ChromaVectorDB, MessageProcessor, LLMClient, - Consciousness -} from '@daydreamsai/core'; + Consciousness, + HandlerRole, + LogLevel, +} from "@daydreamsai/core"; // 1. Initialize vector database for memory storage const vectorDb = new ChromaVectorDB("twitter_agent", { chromaUrl: "http://localhost:8000", - logLevel: "debug" + logLevel: LogLevel.DEBUG, }); // 2. Set up room management for conversation contexts @@ -184,93 +186,127 @@ const roomManager = new RoomManager(vectorDb); // 3. Initialize the LLM client for processing const llm = new LLMClient({ model: "openrouter:deepseek/deepseek-r1-distill-llama-70b", - temperature: 0.3 + temperature: 0.3, }); // 4. Create message processor with default personality -const processor = new MessageProcessor(llm); +const processor = new MessageProcessor(llm, defaultCharacter, LogLevel.DEBUG); -// 5. Initialize the orchestrator to manage everything +// 5. Initialize MongoDB for scheduled tasks +const scheduledTaskDb = new MongoDb( + "mongodb://localhost:27017", + "myApp", + "scheduled_tasks" +); + +// 6. Initialize the orchestrator to manage everything const orchestrator = new Orchestrator( roomManager, vectorDb, [processor], scheduledTaskDb, { - level: "debug", + level: LogLevel.DEBUG, enableColors: true, - enableTimestamp: true + enableTimestamp: true, } ); -// 6. Set up Twitter client -const twitter = new TwitterClient({ - username: process.env.TWITTER_USERNAME, - password: process.env.TWITTER_PASSWORD, - email: process.env.TWITTER_EMAIL -}); +// 7. Set up Twitter client +const twitter = new TwitterClient( + { + username: process.env.TWITTER_USERNAME, + password: process.env.TWITTER_PASSWORD, + email: process.env.TWITTER_EMAIL, + }, + LogLevel.DEBUG +); -// 7. Initialize autonomous thought generation +// 8. Initialize autonomous thought generation const consciousness = new Consciousness(llm, roomManager, { intervalMs: 300000, // Think every 5 minutes - minConfidence: 0.7 + minConfidence: 0.7, + logLevel: LogLevel.DEBUG, }); -// 8. Register handler to monitor Twitter mentions +// 9. Register handler to monitor Twitter mentions orchestrator.registerIOHandler({ name: "twitter_mentions", - role: "input", - handler: async () => { + role: HandlerRole.INPUT, + execute: async () => { const mentions = await twitter.createMentionsInput(60000).handler(); if (!mentions?.length) return null; - - return mentions.map(mention => ({ + + return mentions.map((mention) => ({ type: "tweet", room: mention.metadata.conversationId, + contentId: mention.metadata.tweetId, user: mention.metadata.username, content: mention.content, - metadata: mention + metadata: mention, })); }, - schema: z.object({ - type: z.string(), - room: z.string(), - user: z.string(), - content: z.string(), - metadata: z.record(z.any()) - }) }); -// 9. Register handler for posting tweets +// 10. Register handler for posting tweets orchestrator.registerIOHandler({ - name: "twitter_post", - role: "output", - handler: async (data) => { - const tweetData = data as { content: string }; + name: "twitter_thought", + role: HandlerRole.OUTPUT, + execute: async (data) => { + const thoughtData = data as { content: string }; return twitter.createTweetOutput().handler({ - content: tweetData.content + content: thoughtData.content, }); }, - schema: z.object({ - content: z.string() - .regex(/^[\x20-\x7E]*$/, "ASCII characters only") - .describe("Tweet content, max 280 characters") - }) + outputSchema: z + .object({ + content: z + .string() + .regex(/^[\x20-\x7E]*$/, "No emojis or non-ASCII characters allowed"), + }) + .describe("Content of the tweet, max 280 characters"), +}); + +// 11. Register handler for Twitter replies +orchestrator.registerIOHandler({ + name: "twitter_reply", + role: HandlerRole.OUTPUT, + execute: async (data) => { + const tweetData = data as { content: string; inReplyTo: string }; + return twitter.createTweetOutput().handler(tweetData); + }, + outputSchema: z + .object({ + content: z.string(), + inReplyTo: z + .string() + .optional() + .describe("The tweet ID to reply to, if any"), + }) + .describe("Use this for replying to tweets you've been mentioned in"), }); -// 10. Schedule recurring tasks +// 12. Schedule recurring tasks await orchestrator.scheduleTaskInDb( "twitter_bot", - "twitter_mentions", - {}, - 60000 // Check mentions every minute + "twitter_mentions", + {}, + 6000 // Check mentions every minute ); -// 11. Start autonomous thought generation +await orchestrator.scheduleTaskInDb( + "twitter_bot", + "consciousness_thoughts", + {}, + 30000 // Generate thoughts every 30 seconds +); + +// Start autonomous thought generation consciousness.start(); ``` This example demonstrates: + - Setting up a vector database for memory - Configuring room management for conversations - Initializing the LLM and message processor @@ -278,4 +314,3 @@ This example demonstrates: - Registering handlers for mentions and posts - Implementing autonomous thought generation - Scheduling recurring tasks - diff --git a/examples/example-api.ts b/examples/example-api.ts index 6d8f858e..9b3b8621 100644 --- a/examples/example-api.ts +++ b/examples/example-api.ts @@ -93,10 +93,10 @@ async function main() { orchestrator.registerIOHandler({ name: "fetchGithubIssues", role: HandlerRole.ACTION, - schema: z.object({ + outputSchema: z.object({ repo: z.string(), }), - handler: async (payload) => { + execute: async (payload) => { // 1. Fetch some info from GitHub // 2. Return the fetched data so it can be processed as "new input" // to the next step in the chain. @@ -114,7 +114,7 @@ async function main() { name: "universalApiCall", role: HandlerRole.ACTION, // The agent must fill out these fields to make a valid request - schema: z + outputSchema: z .object({ method: z.enum(["GET", "POST", "PUT", "PATCH", "DELETE"]), url: z.string().url(), @@ -124,7 +124,7 @@ async function main() { .describe( "Use this to fetch data from an API. It should include the method, url, headers, and body." ), - handler: async (payload) => { + execute: async (payload) => { const { method, url, headers, body } = payload as { method: string; url: string; @@ -163,13 +163,13 @@ async function main() { name: "user_chat", role: HandlerRole.INPUT, // This schema describes what a user message looks like - schema: z.object({ + outputSchema: z.object({ content: z.string(), userId: z.string().optional(), }), // For "on-demand" input handlers, the `handler()` can be a no-op. // We'll call it manually with data, so we don't need an interval. - handler: async (payload) => { + execute: async (payload) => { // We simply return the payload so the Orchestrator can process it return payload; }, @@ -178,11 +178,11 @@ async function main() { orchestrator.registerIOHandler({ name: "ui_chat_reply", role: HandlerRole.OUTPUT, - schema: z.object({ + outputSchema: z.object({ userId: z.string().optional(), message: z.string(), }), - handler: async (payload) => { + execute: async (payload) => { const { userId, message } = payload as { userId?: string; message: string; diff --git a/examples/example-basic.ts b/examples/example-basic.ts index 871083d6..9731044d 100644 --- a/examples/example-basic.ts +++ b/examples/example-basic.ts @@ -86,11 +86,11 @@ async function main() { dreams.registerOutput({ name: "EXECUTE_TRANSACTION", role: HandlerRole.OUTPUT, - handler: async (data: any) => { + execute: async (data: any) => { const result = await starknetChain.write(data.payload); return `Transaction: ${JSON.stringify(result, null, 2)}`; }, - schema: z + outputSchema: z .object({ contractAddress: z .string() @@ -112,7 +112,7 @@ async function main() { dreams.registerOutput({ name: "GRAPHQL_FETCH", role: HandlerRole.OUTPUT, - handler: async (data: any) => { + execute: async (data: any) => { const { query, variables } = data.payload ?? {}; const result = await fetchGraphQL( env.GRAPHQL_URL + "/graphql", @@ -125,7 +125,7 @@ async function main() { ].join("\n\n"); return `GraphQL data fetched successfully: ${resultStr}`; }, - schema: z + outputSchema: z .object({ query: z.string() .describe(`"query GetRealmInfo { eternumRealmModels(where: { realm_id: 42 }) { edges { node { ... on eternum_Realm { diff --git a/examples/example-discord.ts b/examples/example-discord.ts index e167730d..fbbb6100 100644 --- a/examples/example-discord.ts +++ b/examples/example-discord.ts @@ -1,25 +1,26 @@ /** - * Example demonstrating a Discord bot using the Daydreams package. - * This bot can: - * - Reply to DMs + * Example demonstrating a Discord bot using the Daydreams package, + * updated to use a streaming IOHandler so we can handle real-time + * Discord messages without manual dispatch calls. */ import { Orchestrator } from "../packages/core/src/core/orchestrator"; -import { HandlerRole } from "../packages/core/src/core/types"; +import { HandlerRole, LogLevel } from "../packages/core/src/core/types"; import { DiscordClient } from "../packages/core/src/core/io/discord"; import { RoomManager } from "../packages/core/src/core/room-manager"; import { ChromaVectorDB } from "../packages/core/src/core/vector-db"; import { MessageProcessor } from "../packages/core/src/core/processors/message-processor"; import { LLMClient } from "../packages/core/src/core/llm-client"; import { env } from "../packages/core/src/core/env"; -import { LogLevel } from "../packages/core/src/core/types"; import chalk from "chalk"; import { defaultCharacter } from "../packages/core/src/core/character"; import { z } from "zod"; import readline from "readline"; import { MongoDb } from "../packages/core/src/core/mongo-db"; +import { Message } from "discord.js"; async function main() { + // Set logging level as you see fit const loglevel = LogLevel.DEBUG; // Initialize core dependencies @@ -28,35 +29,36 @@ async function main() { logLevel: loglevel, }); - await vectorDb.purge(); // Clear previous session data + // Optional: Purge previous session data if you want a fresh start + await vectorDb.purge(); const roomManager = new RoomManager(vectorDb); const llmClient = new LLMClient({ - model: "anthropic/claude-3-5-sonnet-latest", // Using a known supported model + model: "anthropic/claude-3-5-sonnet-latest", // Example model temperature: 0.3, }); - // Initialize processor with default character personality + // Use a sample message processor with a default "character" config const processor = new MessageProcessor( llmClient, defaultCharacter, loglevel ); - // Initialize core system + // Connect to MongoDB (for scheduled tasks, if you use them) const scheduledTaskDb = new MongoDb( "mongodb://localhost:27017", "myApp", "scheduled_tasks" ); - await scheduledTaskDb.connect(); console.log(chalk.green("✅ Scheduled task database connected")); + // Clear any existing tasks if you like await scheduledTaskDb.deleteAll(); - // Initialize core system + // Create the Orchestrator const core = new Orchestrator( roomManager, vectorDb, @@ -69,94 +71,56 @@ async function main() { } ); - function messageCreate(bot: any, message: any) { - const isMention = - message.mentions.users.findKey( - (user: any) => user.id === bot.id - ) !== undefined; - if (isMention) { - core.dispatchToInput( - "discord_mention", - { - content: message.content, - sentBy: message.author.id, - channelId: message.channelId, - }, - message.author.id - ); - } - } - - // Set up Discord client with credentials + // Initialize the Discord client const discord = new DiscordClient( { discord_token: env.DISCORD_TOKEN, + discord_bot_name: "DeepLoaf", }, - loglevel, - { - messageCreate, - } + loglevel ); - // Register input handler for Discord mentions + // 1) REGISTER A STREAMING INPUT + // This handler sets up a Discord listener. On mention, it + // pipes data into Orchestrator via "onData". core.registerIOHandler({ - name: "discord_mention", + name: "discord_stream", role: HandlerRole.INPUT, - handler: async (data: unknown) => { - const message = data as { content: string; sentBy: string }; - console.log(chalk.blue("🔍 Received Discord mention...")); - - return [message]; - }, - schema: z.object({ - sentBy: z.string(), - content: z.string(), - channelId: z.string(), - }), - }); - - // Register output handler for Discord replies - core.registerIOHandler({ - name: "discord_reply", - role: HandlerRole.OUTPUT, - handler: async (data: unknown) => { - const messageData = data as { - content: string; - channelId: string; + subscribe: (onData) => { + discord.startMessageStream((incomingMessage: Message) => { + onData(incomingMessage); + }); + return () => { + discord.stopMessageStream(); }; - return discord.createMessageOutput().handler(messageData); }, - schema: z - .object({ - content: z.string(), - channelId: z - .string() - .optional() - .describe("The channel ID of the message"), - }) - .describe( - "If you have been tagged or mentioned in Discord, use this. This is for replying to a message." - ), }); - // Set up readline interface + // 2) REGISTER AN OUTPUT HANDLER + // This allows your Processor to suggest messages that are posted back to Discord + + core.registerIOHandler(discord.createMessageOutput()); + + // (Optional) Set up a console readline for manual input, etc. const rl = readline.createInterface({ input: process.stdin, output: process.stdout, }); - // Start the prompt loop console.log(chalk.cyan("🤖 Bot is now running and monitoring Discord...")); - console.log(chalk.cyan("You can type messages in the console.")); - console.log(chalk.cyan('Type "exit" to quit')); + console.log( + chalk.cyan("You can also type messages in this console for debugging.") + ); + console.log(chalk.cyan('Type "exit" to quit.')); - // Handle graceful shutdown + // Handle graceful shutdown (Ctrl-C, etc.) process.on("SIGINT", async () => { console.log(chalk.yellow("\n\nShutting down...")); - // Clean up resources - discord.destroy(); - core.removeIOHandler("discord_mention"); + // If we want to stop the streaming IO handler: + core.removeIOHandler("discord_stream"); + + // Also remove any other handlers or do cleanup core.removeIOHandler("discord_reply"); rl.close(); diff --git a/examples/example-goal.ts b/examples/example-goal.ts index f1c5ae92..5c5a80ad 100644 --- a/examples/example-goal.ts +++ b/examples/example-goal.ts @@ -104,7 +104,7 @@ async function main() { dreams.registerOutput({ name: "EXECUTE_TRANSACTION", role: HandlerRole.OUTPUT, - handler: async (data: any) => { + execute: async (data: any) => { const result = await starknetChain.write(data.payload); return `Transaction executed successfully: ${JSON.stringify( result, @@ -112,7 +112,7 @@ async function main() { 2 )}`; }, - schema: z + outputSchema: z .object({ contractAddress: z .string() @@ -134,7 +134,7 @@ async function main() { dreams.registerOutput({ name: "GRAPHQL_FETCH", role: HandlerRole.OUTPUT, - handler: async (data: any) => { + execute: async (data: any) => { const { query, variables } = data.payload ?? {}; const result = await fetchGraphQL( env.GRAPHQL_URL + "/graphql", @@ -147,7 +147,7 @@ async function main() { ].join("\n\n"); return `GraphQL data fetched successfully: ${resultStr}`; }, - schema: z + outputSchema: z .object({ query: z.string() .describe(`"query GetRealmInfo { eternumRealmModels(where: { realm_id: 42 }) { edges { node { ... on eternum_Realm { diff --git a/examples/example-server.ts b/examples/example-server.ts index cbd2d228..13732ad0 100644 --- a/examples/example-server.ts +++ b/examples/example-server.ts @@ -75,11 +75,11 @@ async function createDaydreamsAgent() { orchestrator.registerIOHandler({ name: "user_chat", role: HandlerRole.INPUT, - schema: z.object({ + outputSchema: z.object({ content: z.string(), userId: z.string().optional(), }), - handler: async (payload) => { + execute: async (payload) => { return payload; }, }); @@ -87,11 +87,11 @@ async function createDaydreamsAgent() { orchestrator.registerIOHandler({ name: "chat_reply", role: HandlerRole.OUTPUT, - schema: z.object({ + outputSchema: z.object({ userId: z.string().optional(), message: z.string(), }), - handler: async (payload) => { + execute: async (payload) => { const { userId, message } = payload as { userId?: string; message: string; diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts index 012f41f3..17759d6b 100644 --- a/examples/example-twitter.ts +++ b/examples/example-twitter.ts @@ -92,7 +92,7 @@ async function main() { core.registerIOHandler({ name: "twitter_mentions", role: HandlerRole.INPUT, - handler: async () => { + execute: async () => { console.log(chalk.blue("🔍 Checking Twitter mentions...")); // Create a static mentions input handler const mentionsInput = twitter.createMentionsInput(60000); @@ -112,20 +112,13 @@ async function main() { metadata: mention, })); }, - schema: z.object({ - type: z.string(), - room: z.string(), - user: z.string(), - content: z.string(), - metadata: z.record(z.any()), - }), }); // Register input handler for autonomous thoughts core.registerIOHandler({ name: "consciousness_thoughts", role: HandlerRole.INPUT, - handler: async () => { + execute: async () => { console.log(chalk.blue("🧠 Generating thoughts...")); const thought = await consciousness.start(); @@ -136,25 +129,20 @@ async function main() { return thought; }, - schema: z.object({ - type: z.string(), - content: z.string(), - metadata: z.record(z.any()), - }), }); // Register output handler for posting thoughts to Twitter core.registerIOHandler({ name: "twitter_thought", role: HandlerRole.OUTPUT, - handler: async (data: unknown) => { + execute: async (data: unknown) => { const thoughtData = data as { content: string }; return twitter.createTweetOutput().handler({ content: thoughtData.content, }); }, - schema: z + outputSchema: z .object({ content: z .string() @@ -176,12 +164,12 @@ async function main() { core.registerIOHandler({ name: "twitter_reply", role: HandlerRole.OUTPUT, - handler: async (data: unknown) => { + execute: async (data: unknown) => { const tweetData = data as { content: string; inReplyTo: string }; return twitter.createTweetOutput().handler(tweetData); }, - schema: z + outputSchema: z .object({ content: z.string(), inReplyTo: z diff --git a/packages/core/src/core/chain-of-thought.ts b/packages/core/src/core/chain-of-thought.ts index 79557a09..7a81fa21 100644 --- a/packages/core/src/core/chain-of-thought.ts +++ b/packages/core/src/core/chain-of-thought.ts @@ -6,6 +6,7 @@ import type { RefinedGoal, VectorDB, IOHandler, + OutputIOHandler, } from "./types"; import { Logger } from "./logger"; import { EventEmitter } from "events"; @@ -1082,13 +1083,16 @@ export class ChainOfThought extends EventEmitter { try { // Get the output handler and schema - const output = this.outputs.get(action.type); - if (!output) { + const output = this.outputs.get(action.type) as OutputIOHandler; + if (!output || !output.execute || !output.outputSchema) { return `No handler registered for action type "${action.type}" try again`; } // Convert Zod schema to JSON schema - const jsonSchema = zodToJsonSchema(output.schema, action.type); + const jsonSchema = zodToJsonSchema( + output.outputSchema, + action.type + ); const validate = ajv.compile(jsonSchema); // Validate the payload against the schema @@ -1096,7 +1100,7 @@ export class ChainOfThought extends EventEmitter { return "Invalid action payload - schema validation failed"; } - const result = await output.handler(action); + const result = await output.execute(action); // Format the result for better readability const formattedResult = @@ -1143,7 +1147,17 @@ export class ChainOfThought extends EventEmitter { const lastSteps = JSON.stringify(this.stepManager.getSteps()); - const availableOutputs = Array.from(this.outputs.entries()); + const availableOutputs = Array.from(this.outputs.entries()) as [ + string, + OutputIOHandler, + ][]; + + const availableOutputsSchema = availableOutputs + .filter(([_, output]) => output.outputSchema) + .map(([name, output]) => { + return `${name}: ${JSON.stringify(zodToJsonSchema(output.outputSchema, name), null, 2)}`; + }) + .join("\n\n"); const prompt = ` @@ -1211,13 +1225,7 @@ Each action must include: - **payload**: The action data structured as per the available actions. - **context**: A contextual description or metadata related to the action's execution. This can include statuses, results, or any pertinent information that may influence future actions. -${availableOutputs - .map( - ([name, output]) => `${name}: - ${JSON.stringify(zodToJsonSchema(output.schema, name), null, 2)} - ` - ) - .join("\n\n")} +${availableOutputsSchema} diff --git a/packages/core/src/core/io/discord.ts b/packages/core/src/core/io/discord.ts index cd76e71f..c3be1be4 100644 --- a/packages/core/src/core/io/discord.ts +++ b/packages/core/src/core/io/discord.ts @@ -1,11 +1,18 @@ -import { Client, Events, GatewayIntentBits, Partials, User } from "discord.js"; -import type { JSONSchemaType } from "ajv"; +import { + Client, + Events, + GatewayIntentBits, + Message, + Partials, +} from "discord.js"; import { Logger } from "../../core/logger"; -import { LogLevel } from "../types"; +import { HandlerRole, LogLevel, type IOHandler } from "../types"; import { env } from "../../core/env"; +import { z } from "zod"; export interface DiscordCredentials { discord_token: string; + discord_bot_name: string; } export interface MessageData { @@ -15,31 +22,23 @@ export interface MessageData { sendBy?: string; } -export interface EventCallbacks { - messageCreate?: (bot: any, message: any) => void; -} - -// Schema for message output validation -export const messageSchema: JSONSchemaType = { - type: "object", - properties: { - content: { type: "string" }, - channelId: { type: "string" }, - sendBy: { type: "string", nullable: true }, - conversationId: { type: "string", nullable: true }, - }, - required: ["content", "channelId"], - additionalProperties: false, -}; - +export const messageSchema = z.object({ + content: z.string().describe("The content of the message"), + channelId: z.string().describe("The channel ID where the message is sent"), + sendBy: z.string().optional().describe("The user ID of the sender"), + conversationId: z + .string() + .optional() + .describe("The conversation ID (if applicable)"), +}); export class DiscordClient { private client: Client; private logger: Logger; + private messageListener?: (...args: any[]) => void; constructor( private credentials: DiscordCredentials, - logLevel: LogLevel = LogLevel.INFO, - eventCallbacks: EventCallbacks + logLevel: LogLevel = LogLevel.INFO ) { this.client = new Client({ intents: [ @@ -51,72 +50,128 @@ export class DiscordClient { GatewayIntentBits.DirectMessageTyping, GatewayIntentBits.DirectMessageReactions, ], - partials: [Partials.Channel], // Enable DM + partials: [Partials.Channel], // For DM support }); - this.credentials = credentials; + this.logger = new Logger({ level: logLevel, enableColors: true, enableTimestamp: true, }); - if (eventCallbacks.messageCreate) { - this.client.on(Events.MessageCreate, (message) => { - if (eventCallbacks.messageCreate) { - eventCallbacks.messageCreate(this.client.user, message); - } - }); - } - - this.client.on(Events.ClientReady, async () => { + // Handle "ready" event + this.client.on(Events.ClientReady, () => { this.logger.info("DiscordClient", "Initialized successfully"); }); + // Log in to Discord this.client.login(this.credentials.discord_token).catch((error) => { this.logger.error("DiscordClient", "Failed to login", { error }); console.error("Login error:", error); }); } + /** + * Optionally start listening to Discord messages. + * The onData callback typically feeds data into Orchestrator or similar. + */ + public startMessageStream(onData: (data: any) => void) { + this.logger.info("DiscordClient", "Starting message stream..."); + + // If you want to capture the listener reference for removal: + this.messageListener = (message: Message) => { + // Here, you could decide what "data" looks like + // E.g., check if the bot was mentioned, etc. + + if ( + message.author.displayName == this.credentials.discord_bot_name + ) { + console.log( + `Skipping message from ${this.credentials.discord_bot_name}` + ); + return; + } + + onData({ + content: message.content, + channelId: message.channelId, + sentBy: message.author?.id, + }); + }; + + this.client.on(Events.MessageCreate, this.messageListener); + } + + /** + * Optionally remove the message listener if you want to stop the stream. + */ + public stopMessageStream() { + if (this.messageListener) { + this.client.removeListener( + Events.MessageCreate, + this.messageListener + ); + this.logger.info("DiscordClient", "Message stream stopped"); + } + } + + /** + * Gracefully destroy the Discord connection + */ public destroy() { + this.stopMessageStream(); this.client.destroy(); + this.logger.info("DiscordClient", "Client destroyed"); } /** - * Create an output for sending messages + * Create an output for sending messages (useful for Orchestrator OUTPUT handlers). */ - public createMessageOutput() { + public createMessageOutput(): IOHandler { return { + role: HandlerRole.OUTPUT, name: "discord_message", - handler: async (data: MessageData) => { - return await this.sendMessage(data); - }, - response: { - success: "boolean", - channelId: "string", + execute: async (data: T) => { + return await this.sendMessage(data as MessageData); }, - schema: messageSchema, + outputSchema: messageSchema, }; } - private async sendMessage(data: MessageData) { + private async sendMessage(data: MessageData): Promise<{ + success: boolean; + messageId?: string; + content?: string; + error?: string; + }> { try { - this.logger.info( - "DiscordClient.sendMessage", - "Would send message", - { - data, - } - ); + this.logger.info("DiscordClient.sendMessage", "Sending message", { + data, + }); if (env.DRY_RUN) { + this.logger.info( + "DiscordClient.sendMessage", + "Dry run enabled", + { + data, + } + ); return { success: true, - channelId: "DRY RUN CHANNEL ID", + messageId: "DRY_RUN", + content: "DRY_RUN", + error: "DRY_RUN", + }; + } + if (!data?.channelId || !data?.content) { + return { + success: false, + error: "Channel ID and content are required", }; } - const channel = this.client.channels.cache.get(data.channelId); + const channel = this.client.channels.cache.get(data?.channelId); if (!channel?.isTextBased()) { const error = new Error( `Invalid or unsupported channel: ${data.channelId}` @@ -130,11 +185,15 @@ export class DiscordClient { ); throw error; } + + // @ts-ignore - no idea why this is chucking error and i cannot be bothered to fix it const sentMessage = await channel.send(data.content); return { success: true, messageId: sentMessage.id, + content: data.content, + error: undefined, }; } catch (error) { this.logger.error( @@ -144,17 +203,10 @@ export class DiscordClient { error, } ); - throw error; + return { + success: false, + error: error instanceof Error ? error.message : "Unknown error", + }; } } } - -// Example usage: -/* -const discord = new DiscordClient({ - discord_token: "M..." -}); - -// Register output -core.registerOutput(discord.createMessageOutput()); -*/ diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts index b6e5e034..1c52efa8 100644 --- a/packages/core/src/core/orchestrator.ts +++ b/packages/core/src/core/orchestrator.ts @@ -78,6 +78,7 @@ export class Orchestrator { this.userId = userId; } + private unsubscribers = new Map void>(); /** * Primary method to register any IOHandler (input or output). * - If it's an input with an interval, schedule it for recurring runs. @@ -88,19 +89,31 @@ export class Orchestrator { this.logger.warn( "Orchestrator.registerIOHandler", "Overwriting handler with same name", - { - name: handler.name, - } + { name: handler.name } ); } + this.ioHandlers.set(handler.name, handler); + if (handler.role === HandlerRole.INPUT && handler.subscribe) { + const unsubscribe = handler.subscribe(async (data) => { + this.logger.info( + "Orchestrator.registerIOHandler", + "Starting stream", + { + data, + } + ); + // Whenever data arrives, pass it into runAutonomousFlow + await this.runAutonomousFlow(data, handler.name, this.userId); + }); + this.unsubscribers.set(handler.name, unsubscribe); + } + this.logger.info( "Orchestrator.registerIOHandler", `Registered ${handler.role}`, - { - name: handler.name, - } + { name: handler.name } ); } @@ -108,14 +121,17 @@ export class Orchestrator { * Removes a handler (input or output) by name, stopping scheduling if needed. */ public removeIOHandler(name: string): void { - if (this.ioHandlers.has(name)) { - // If it was scheduled as an input, it will no longer be re-scheduled - this.ioHandlers.delete(name); - this.logger.info( - "Orchestrator.removeIOHandler", - `Removed IOHandler: ${name}` - ); + // If we have an unsubscribe function, call it + const unsub = this.unsubscribers.get(name); + if (unsub) { + unsub(); // e.g. remove event listeners, clear intervals, etc. + this.unsubscribers.delete(name); } + + // Remove the handler itself + this.ioHandlers.delete(name); + + console.log(`Removed IOHandler: ${name}`); } /** @@ -124,7 +140,7 @@ export class Orchestrator { */ public async dispatchToOutput(name: string, data: T): Promise { const handler = this.ioHandlers.get(name); - if (!handler) { + if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); } @@ -138,7 +154,12 @@ export class Orchestrator { }); try { - const result = await handler.handler(data); + const result = await handler.execute(data); + + this.logger.info("Orchestrator.dispatchToOutput", "Output result", { + result, + }); + return result; } catch (error) { this.logger.error( @@ -158,8 +179,17 @@ export class Orchestrator { * We only schedule inputs in the constructor's scheduler. */ private async processInputTask(handler: IOHandler): Promise { + if (!handler.execute) { + this.logger.error( + "Orchestrator.processInputTask", + "Handler has no execute method", + { handler } + ); + return; + } try { - const result = await handler.handler(); + // it's undefined because this might be fetching data from an api or something + const result = await handler.execute(undefined); if (!result) return; if (Array.isArray(result)) { @@ -221,7 +251,7 @@ export class Orchestrator { */ public async dispatchToAction(name: string, data: T): Promise { const handler = this.ioHandlers.get(name); - if (!handler) { + if (!handler || !handler.execute) { throw new Error(`No IOHandler registered with name: ${name}`); } if (handler.role !== "action") { @@ -232,7 +262,7 @@ export class Orchestrator { data, }); try { - const result = await handler.handler(data); + const result = await handler.execute(data); return result; } catch (error) { this.logger.error( @@ -504,12 +534,14 @@ export class Orchestrator { ): Promise { const handler = this.ioHandlers.get(name); if (!handler) throw new Error(`No IOHandler: ${name}`); + if (!handler.execute) + throw new Error(`Handler "${name}" has no execute method`); if (handler.role !== "input") { throw new Error(`Handler "${name}" is not role=input`); } try { - const result = await handler.handler(data); + const result = await handler.execute(data); if (result) { return await this.runAutonomousFlow( diff --git a/packages/core/src/core/processors/message-processor.ts b/packages/core/src/core/processors/message-processor.ts index 3421fd51..3bb666d8 100644 --- a/packages/core/src/core/processors/message-processor.ts +++ b/packages/core/src/core/processors/message-processor.ts @@ -1,6 +1,12 @@ import { LLMClient } from "../llm-client"; -import type { Character, ProcessedResult, SuggestedOutput } from "../types"; +import type { + ActionIOHandler, + Character, + OutputIOHandler, + ProcessedResult, + SuggestedOutput, +} from "../types"; import { LogLevel } from "../types"; import { getTimeContext, validateLLMResponseSchema } from "../utils"; @@ -44,8 +50,8 @@ export class MessageProcessor extends BaseProcessor { content: any, otherContext: string, ioContext?: { - availableOutputs: IOHandler[]; - availableActions: IOHandler[]; + availableOutputs: OutputIOHandler[]; + availableActions: ActionIOHandler[]; } ): Promise { this.logger.debug("Processor.process", "Processing content", { @@ -57,13 +63,13 @@ export class MessageProcessor extends BaseProcessor { const outputsSchemaPart = ioContext?.availableOutputs .map((handler) => { - return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`; + return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.outputSchema!, handler.name))}`; }) .join("\n"); const actionsSchemaPart = ioContext?.availableActions .map((handler) => { - return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`; + return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.outputSchema!, handler.name))}`; }) .join("\n"); @@ -97,7 +103,14 @@ export class MessageProcessor extends BaseProcessor { 2. Only make tasks if you have been told, based off what you think is possible. + + 1. Should you reply to the message? + 2. You should only reply if you have been mentioned in the message or you think you can help deeply. + 3. You should never respond to yourself. + + + # Speak in the following voice: ${JSON.stringify(this.character.voice)} diff --git a/packages/core/src/core/processors/research-processor.ts b/packages/core/src/core/processors/research-processor.ts index 063c40f0..5adb9f04 100644 --- a/packages/core/src/core/processors/research-processor.ts +++ b/packages/core/src/core/processors/research-processor.ts @@ -1,5 +1,5 @@ import { LLMClient } from "../llm-client"; -import type { Character } from "../types"; +import type { ActionIOHandler, Character, OutputIOHandler } from "../types"; import { LogLevel } from "../types"; import { getTimeContext, validateLLMResponseSchema } from "../utils"; import { z } from "zod"; @@ -199,13 +199,16 @@ export class ResearchQuantProcessor extends BaseProcessor { } } - private buildHandlerSchemaPart(handlers?: IOHandler[]): string { + private buildHandlerSchemaPart( + handlers?: OutputIOHandler[] | ActionIOHandler[] + ): string { if (!handlers || handlers.length === 0) return "None"; return handlers + .filter((handler) => handler.outputSchema) .map( (handler) => `${handler.name}: ${JSON.stringify( - zodToJsonSchema(handler.schema, handler.name), + zodToJsonSchema(handler.outputSchema!, handler.name), null, 2 )}` @@ -216,8 +219,8 @@ export class ResearchQuantProcessor extends BaseProcessor { private async combineChunkResults( results: any[], ioContext?: { - availableOutputs: IOHandler[]; - availableActions: IOHandler[]; + availableOutputs: OutputIOHandler[]; + availableActions: ActionIOHandler[]; } ): Promise { const prompt = ` @@ -320,8 +323,8 @@ export class ResearchQuantProcessor extends BaseProcessor { content: any, otherContext: string, ioContext?: { - availableOutputs: IOHandler[]; - availableActions: IOHandler[]; + availableOutputs: OutputIOHandler[]; + availableActions: ActionIOHandler[]; } ): Promise { const contentStr = diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts index 4c6757c5..cd58d329 100644 --- a/packages/core/src/core/types/index.ts +++ b/packages/core/src/core/types/index.ts @@ -509,18 +509,103 @@ export enum HandlerRole { } /** - * A single interface for all Inputs, Outputs. + * A single interface for all Input, Output and Action handlers in the system. + * This provides a unified way to handle different types of I/O operations. + * + * @example + * ```ts + * // Register a chat input handler + * orchestrator.registerIOHandler({ + * name: "user_chat", + * role: HandlerRole.INPUT, + * execute: async (message) => { + * return message; + * } + * }); + * ``` */ -export interface IOHandler { - /** Unique name for this handler */ + +/** + * Base interface for all IO handlers in the system + */ +interface BaseIOHandler { + /** Unique name identifier for this handler */ name: string; +} - /** "input" | "output" | (optionally "action") if you want more roles */ - role: HandlerRole; +/** + * Handler for processing input data streams + * @example + * ```ts + * // Register an input handler for chat messages + * const handler: InputIOHandler = { + * name: "chat_input", + * role: HandlerRole.INPUT, + * execute: async (message) => { + * return processMessage(message); + * } + * }; + * ``` + */ +export interface InputIOHandler extends BaseIOHandler { + /** Identifies this as an input handler */ + role: HandlerRole.INPUT; + /** Function to process input data */ + execute?: (data: any) => Promise; + /** Sets up a subscription to receive streaming data */ + subscribe?: (onData: (data: any) => void) => () => void; +} - /** The schema for the input handler */ - schema: z.ZodType; +/** + * Handler for sending output data + * @example + * ```ts + * // Register an output handler for chat responses + * const handler: OutputIOHandler = { + * name: "chat_output", + * role: HandlerRole.OUTPUT, + * outputSchema: z.object({ + * message: z.string() + * }), + * execute: async (response) => { + * await sendResponse(response); + * } + * }; + * ``` + */ +export interface OutputIOHandler extends BaseIOHandler { + /** Identifies this as an output handler */ + role: HandlerRole.OUTPUT; + /** Required schema to validate output data */ + outputSchema: z.ZodType; + /** Function to process and send output */ + execute?: (data: any) => Promise; + /** Sets up a subscription to handle output streams */ + subscribe?: (onData: (data: any) => void) => () => void; +} - /** The main function. For inputs, no payload is typically passed. For outputs, pass the data. */ - handler: (payload?: unknown) => Promise; +/** + * Handler for performing actions/side effects + * @example + * ```ts + * // Register an action handler for database operations + * const handler: ActionIOHandler = { + * name: "db_action", + * role: HandlerRole.ACTION, + * execute: async (query) => { + * return await db.execute(query); + * } + * }; + * ``` + */ +export interface ActionIOHandler extends BaseIOHandler { + /** Identifies this as an action handler */ + role: HandlerRole.ACTION; + /** Optional schema to validate action parameters */ + outputSchema?: z.ZodType; + /** Function to execute the action */ + execute?: (data: any) => Promise; } + +/** Union type of all possible IO handler types */ +export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler;