diff --git a/docs/docs/pages/index.mdx b/docs/docs/pages/index.mdx
index 1eeb7d49..e9c29f2f 100644
--- a/docs/docs/pages/index.mdx
+++ b/docs/docs/pages/index.mdx
@@ -1,6 +1,5 @@
-
[](https://opensource.org/licenses/MIT)
Daydreams is a powerful generative agent library for executing anything onchain. It is chain agnostic and can be used to perform tasks - by simply injecting context with api documentation. Whether you're building on Base, Solana, Ethereum, Starknet, or other chains, Daydreams has you covered.
@@ -16,7 +15,6 @@ Think of it as an opinionated framework for building next generation agents.
- 🎯 **Goal-Oriented** - Hierarchical goal planning and execution
- 🤝 **Multi-Agent Ready** - Built for swarm intelligence and agent collaboration
-
## Generative Framework
Unlike traditional frameworks that require explicit integrations, Daydreams uses a generative approach where the agent dynamically creates and executes actions through Chain of Thought processing. This means:
@@ -51,6 +49,7 @@ type IOHandler = {
```
This IO-centric design means you can:
+
- Easily plug in new capabilities through handlers
- Process multiple operations in parallel
- Chain handlers together for complex workflows
@@ -86,12 +85,12 @@ bun add @daydreamsai/core
Here's a simple example to get you started:
```typescript
-import { Orchestrator, MessageProcessor, LLMClient } from '@daydreamsai/core';
+import { Orchestrator, MessageProcessor, LLMClient } from "@daydreamsai/core";
// Initialize the LLM client
const llm = new LLMClient({
model: "openrouter:deepseek/deepseek-r1-distill-llama-70b",
- temperature: 0.3
+ temperature: 0.3,
});
// Create a message processor
@@ -120,12 +119,12 @@ orchestrator.registerIOHandler({
name: "simple_action",
role: "action",
schema: z.object({
- message: z.string()
+ message: z.string(),
}),
handler: async (payload) => {
console.log(`Executing action: ${payload.message}`);
return { success: true };
- }
+ },
});
```
@@ -148,6 +147,7 @@ Building blocks that process data and produce outputs:
### Chain of Thought
The reasoning engine that:
+
- Plans strategies for achieving goals
- Breaks down complex tasks
- Executes actions
@@ -162,20 +162,22 @@ Stores and retrieves experiences using vector databases for contextual decision
Here's a glimpse of what you can build with Daydreams - a Twitter bot that autonomously interacts and generates content:
```typescript
-import {
- Orchestrator,
- TwitterClient,
+import {
+ Orchestrator,
+ TwitterClient,
RoomManager,
ChromaVectorDB,
MessageProcessor,
LLMClient,
- Consciousness
-} from '@daydreamsai/core';
+ Consciousness,
+ HandlerRole,
+ LogLevel,
+} from "@daydreamsai/core";
// 1. Initialize vector database for memory storage
const vectorDb = new ChromaVectorDB("twitter_agent", {
chromaUrl: "http://localhost:8000",
- logLevel: "debug"
+ logLevel: LogLevel.DEBUG,
});
// 2. Set up room management for conversation contexts
@@ -184,93 +186,127 @@ const roomManager = new RoomManager(vectorDb);
// 3. Initialize the LLM client for processing
const llm = new LLMClient({
model: "openrouter:deepseek/deepseek-r1-distill-llama-70b",
- temperature: 0.3
+ temperature: 0.3,
});
// 4. Create message processor with default personality
-const processor = new MessageProcessor(llm);
+const processor = new MessageProcessor(llm, defaultCharacter, LogLevel.DEBUG);
-// 5. Initialize the orchestrator to manage everything
+// 5. Initialize MongoDB for scheduled tasks
+const scheduledTaskDb = new MongoDb(
+ "mongodb://localhost:27017",
+ "myApp",
+ "scheduled_tasks"
+);
+
+// 6. Initialize the orchestrator to manage everything
const orchestrator = new Orchestrator(
roomManager,
vectorDb,
[processor],
scheduledTaskDb,
{
- level: "debug",
+ level: LogLevel.DEBUG,
enableColors: true,
- enableTimestamp: true
+ enableTimestamp: true,
}
);
-// 6. Set up Twitter client
-const twitter = new TwitterClient({
- username: process.env.TWITTER_USERNAME,
- password: process.env.TWITTER_PASSWORD,
- email: process.env.TWITTER_EMAIL
-});
+// 7. Set up Twitter client
+const twitter = new TwitterClient(
+ {
+ username: process.env.TWITTER_USERNAME,
+ password: process.env.TWITTER_PASSWORD,
+ email: process.env.TWITTER_EMAIL,
+ },
+ LogLevel.DEBUG
+);
-// 7. Initialize autonomous thought generation
+// 8. Initialize autonomous thought generation
const consciousness = new Consciousness(llm, roomManager, {
intervalMs: 300000, // Think every 5 minutes
- minConfidence: 0.7
+ minConfidence: 0.7,
+ logLevel: LogLevel.DEBUG,
});
-// 8. Register handler to monitor Twitter mentions
+// 9. Register handler to monitor Twitter mentions
orchestrator.registerIOHandler({
name: "twitter_mentions",
- role: "input",
- handler: async () => {
+ role: HandlerRole.INPUT,
+ execute: async () => {
const mentions = await twitter.createMentionsInput(60000).handler();
if (!mentions?.length) return null;
-
- return mentions.map(mention => ({
+
+ return mentions.map((mention) => ({
type: "tweet",
room: mention.metadata.conversationId,
+ contentId: mention.metadata.tweetId,
user: mention.metadata.username,
content: mention.content,
- metadata: mention
+ metadata: mention,
}));
},
- schema: z.object({
- type: z.string(),
- room: z.string(),
- user: z.string(),
- content: z.string(),
- metadata: z.record(z.any())
- })
});
-// 9. Register handler for posting tweets
+// 10. Register handler for posting tweets
orchestrator.registerIOHandler({
- name: "twitter_post",
- role: "output",
- handler: async (data) => {
- const tweetData = data as { content: string };
+ name: "twitter_thought",
+ role: HandlerRole.OUTPUT,
+ execute: async (data) => {
+ const thoughtData = data as { content: string };
return twitter.createTweetOutput().handler({
- content: tweetData.content
+ content: thoughtData.content,
});
},
- schema: z.object({
- content: z.string()
- .regex(/^[\x20-\x7E]*$/, "ASCII characters only")
- .describe("Tweet content, max 280 characters")
- })
+ outputSchema: z
+ .object({
+ content: z
+ .string()
+ .regex(/^[\x20-\x7E]*$/, "No emojis or non-ASCII characters allowed"),
+ })
+ .describe("Content of the tweet, max 280 characters"),
+});
+
+// 11. Register handler for Twitter replies
+orchestrator.registerIOHandler({
+ name: "twitter_reply",
+ role: HandlerRole.OUTPUT,
+ execute: async (data) => {
+ const tweetData = data as { content: string; inReplyTo: string };
+ return twitter.createTweetOutput().handler(tweetData);
+ },
+ outputSchema: z
+ .object({
+ content: z.string(),
+ inReplyTo: z
+ .string()
+ .optional()
+ .describe("The tweet ID to reply to, if any"),
+ })
+ .describe("Use this for replying to tweets you've been mentioned in"),
});
-// 10. Schedule recurring tasks
+// 12. Schedule recurring tasks
await orchestrator.scheduleTaskInDb(
"twitter_bot",
- "twitter_mentions",
- {},
- 60000 // Check mentions every minute
+ "twitter_mentions",
+ {},
+ 6000 // Check mentions every minute
);
-// 11. Start autonomous thought generation
+await orchestrator.scheduleTaskInDb(
+ "twitter_bot",
+ "consciousness_thoughts",
+ {},
+ 30000 // Generate thoughts every 30 seconds
+);
+
+// Start autonomous thought generation
consciousness.start();
```
This example demonstrates:
+
- Setting up a vector database for memory
- Configuring room management for conversations
- Initializing the LLM and message processor
@@ -278,4 +314,3 @@ This example demonstrates:
- Registering handlers for mentions and posts
- Implementing autonomous thought generation
- Scheduling recurring tasks
-
diff --git a/examples/example-api.ts b/examples/example-api.ts
index 6d8f858e..9b3b8621 100644
--- a/examples/example-api.ts
+++ b/examples/example-api.ts
@@ -93,10 +93,10 @@ async function main() {
orchestrator.registerIOHandler({
name: "fetchGithubIssues",
role: HandlerRole.ACTION,
- schema: z.object({
+ outputSchema: z.object({
repo: z.string(),
}),
- handler: async (payload) => {
+ execute: async (payload) => {
// 1. Fetch some info from GitHub
// 2. Return the fetched data so it can be processed as "new input"
// to the next step in the chain.
@@ -114,7 +114,7 @@ async function main() {
name: "universalApiCall",
role: HandlerRole.ACTION,
// The agent must fill out these fields to make a valid request
- schema: z
+ outputSchema: z
.object({
method: z.enum(["GET", "POST", "PUT", "PATCH", "DELETE"]),
url: z.string().url(),
@@ -124,7 +124,7 @@ async function main() {
.describe(
"Use this to fetch data from an API. It should include the method, url, headers, and body."
),
- handler: async (payload) => {
+ execute: async (payload) => {
const { method, url, headers, body } = payload as {
method: string;
url: string;
@@ -163,13 +163,13 @@ async function main() {
name: "user_chat",
role: HandlerRole.INPUT,
// This schema describes what a user message looks like
- schema: z.object({
+ outputSchema: z.object({
content: z.string(),
userId: z.string().optional(),
}),
// For "on-demand" input handlers, the `handler()` can be a no-op.
// We'll call it manually with data, so we don't need an interval.
- handler: async (payload) => {
+ execute: async (payload) => {
// We simply return the payload so the Orchestrator can process it
return payload;
},
@@ -178,11 +178,11 @@ async function main() {
orchestrator.registerIOHandler({
name: "ui_chat_reply",
role: HandlerRole.OUTPUT,
- schema: z.object({
+ outputSchema: z.object({
userId: z.string().optional(),
message: z.string(),
}),
- handler: async (payload) => {
+ execute: async (payload) => {
const { userId, message } = payload as {
userId?: string;
message: string;
diff --git a/examples/example-basic.ts b/examples/example-basic.ts
index 871083d6..9731044d 100644
--- a/examples/example-basic.ts
+++ b/examples/example-basic.ts
@@ -86,11 +86,11 @@ async function main() {
dreams.registerOutput({
name: "EXECUTE_TRANSACTION",
role: HandlerRole.OUTPUT,
- handler: async (data: any) => {
+ execute: async (data: any) => {
const result = await starknetChain.write(data.payload);
return `Transaction: ${JSON.stringify(result, null, 2)}`;
},
- schema: z
+ outputSchema: z
.object({
contractAddress: z
.string()
@@ -112,7 +112,7 @@ async function main() {
dreams.registerOutput({
name: "GRAPHQL_FETCH",
role: HandlerRole.OUTPUT,
- handler: async (data: any) => {
+ execute: async (data: any) => {
const { query, variables } = data.payload ?? {};
const result = await fetchGraphQL(
env.GRAPHQL_URL + "/graphql",
@@ -125,7 +125,7 @@ async function main() {
].join("\n\n");
return `GraphQL data fetched successfully: ${resultStr}`;
},
- schema: z
+ outputSchema: z
.object({
query: z.string()
.describe(`"query GetRealmInfo { eternumRealmModels(where: { realm_id: 42 }) { edges { node { ... on eternum_Realm {
diff --git a/examples/example-discord.ts b/examples/example-discord.ts
index e167730d..fbbb6100 100644
--- a/examples/example-discord.ts
+++ b/examples/example-discord.ts
@@ -1,25 +1,26 @@
/**
- * Example demonstrating a Discord bot using the Daydreams package.
- * This bot can:
- * - Reply to DMs
+ * Example demonstrating a Discord bot using the Daydreams package,
+ * updated to use a streaming IOHandler so we can handle real-time
+ * Discord messages without manual dispatch calls.
*/
import { Orchestrator } from "../packages/core/src/core/orchestrator";
-import { HandlerRole } from "../packages/core/src/core/types";
+import { HandlerRole, LogLevel } from "../packages/core/src/core/types";
import { DiscordClient } from "../packages/core/src/core/io/discord";
import { RoomManager } from "../packages/core/src/core/room-manager";
import { ChromaVectorDB } from "../packages/core/src/core/vector-db";
import { MessageProcessor } from "../packages/core/src/core/processors/message-processor";
import { LLMClient } from "../packages/core/src/core/llm-client";
import { env } from "../packages/core/src/core/env";
-import { LogLevel } from "../packages/core/src/core/types";
import chalk from "chalk";
import { defaultCharacter } from "../packages/core/src/core/character";
import { z } from "zod";
import readline from "readline";
import { MongoDb } from "../packages/core/src/core/mongo-db";
+import { Message } from "discord.js";
async function main() {
+ // Set logging level as you see fit
const loglevel = LogLevel.DEBUG;
// Initialize core dependencies
@@ -28,35 +29,36 @@ async function main() {
logLevel: loglevel,
});
- await vectorDb.purge(); // Clear previous session data
+ // Optional: Purge previous session data if you want a fresh start
+ await vectorDb.purge();
const roomManager = new RoomManager(vectorDb);
const llmClient = new LLMClient({
- model: "anthropic/claude-3-5-sonnet-latest", // Using a known supported model
+ model: "anthropic/claude-3-5-sonnet-latest", // Example model
temperature: 0.3,
});
- // Initialize processor with default character personality
+ // Use a sample message processor with a default "character" config
const processor = new MessageProcessor(
llmClient,
defaultCharacter,
loglevel
);
- // Initialize core system
+ // Connect to MongoDB (for scheduled tasks, if you use them)
const scheduledTaskDb = new MongoDb(
"mongodb://localhost:27017",
"myApp",
"scheduled_tasks"
);
-
await scheduledTaskDb.connect();
console.log(chalk.green("✅ Scheduled task database connected"));
+ // Clear any existing tasks if you like
await scheduledTaskDb.deleteAll();
- // Initialize core system
+ // Create the Orchestrator
const core = new Orchestrator(
roomManager,
vectorDb,
@@ -69,94 +71,56 @@ async function main() {
}
);
- function messageCreate(bot: any, message: any) {
- const isMention =
- message.mentions.users.findKey(
- (user: any) => user.id === bot.id
- ) !== undefined;
- if (isMention) {
- core.dispatchToInput(
- "discord_mention",
- {
- content: message.content,
- sentBy: message.author.id,
- channelId: message.channelId,
- },
- message.author.id
- );
- }
- }
-
- // Set up Discord client with credentials
+ // Initialize the Discord client
const discord = new DiscordClient(
{
discord_token: env.DISCORD_TOKEN,
+ discord_bot_name: "DeepLoaf",
},
- loglevel,
- {
- messageCreate,
- }
+ loglevel
);
- // Register input handler for Discord mentions
+ // 1) REGISTER A STREAMING INPUT
+ // This handler sets up a Discord listener. On mention, it
+ // pipes data into Orchestrator via "onData".
core.registerIOHandler({
- name: "discord_mention",
+ name: "discord_stream",
role: HandlerRole.INPUT,
- handler: async (data: unknown) => {
- const message = data as { content: string; sentBy: string };
- console.log(chalk.blue("🔍 Received Discord mention..."));
-
- return [message];
- },
- schema: z.object({
- sentBy: z.string(),
- content: z.string(),
- channelId: z.string(),
- }),
- });
-
- // Register output handler for Discord replies
- core.registerIOHandler({
- name: "discord_reply",
- role: HandlerRole.OUTPUT,
- handler: async (data: unknown) => {
- const messageData = data as {
- content: string;
- channelId: string;
+ subscribe: (onData) => {
+ discord.startMessageStream((incomingMessage: Message) => {
+ onData(incomingMessage);
+ });
+ return () => {
+ discord.stopMessageStream();
};
- return discord.createMessageOutput().handler(messageData);
},
- schema: z
- .object({
- content: z.string(),
- channelId: z
- .string()
- .optional()
- .describe("The channel ID of the message"),
- })
- .describe(
- "If you have been tagged or mentioned in Discord, use this. This is for replying to a message."
- ),
});
- // Set up readline interface
+ // 2) REGISTER AN OUTPUT HANDLER
+ // This allows your Processor to suggest messages that are posted back to Discord
+
+ core.registerIOHandler(discord.createMessageOutput());
+
+ // (Optional) Set up a console readline for manual input, etc.
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
- // Start the prompt loop
console.log(chalk.cyan("🤖 Bot is now running and monitoring Discord..."));
- console.log(chalk.cyan("You can type messages in the console."));
- console.log(chalk.cyan('Type "exit" to quit'));
+ console.log(
+ chalk.cyan("You can also type messages in this console for debugging.")
+ );
+ console.log(chalk.cyan('Type "exit" to quit.'));
- // Handle graceful shutdown
+ // Handle graceful shutdown (Ctrl-C, etc.)
process.on("SIGINT", async () => {
console.log(chalk.yellow("\n\nShutting down..."));
- // Clean up resources
- discord.destroy();
- core.removeIOHandler("discord_mention");
+ // If we want to stop the streaming IO handler:
+ core.removeIOHandler("discord_stream");
+
+ // Also remove any other handlers or do cleanup
core.removeIOHandler("discord_reply");
rl.close();
diff --git a/examples/example-goal.ts b/examples/example-goal.ts
index f1c5ae92..5c5a80ad 100644
--- a/examples/example-goal.ts
+++ b/examples/example-goal.ts
@@ -104,7 +104,7 @@ async function main() {
dreams.registerOutput({
name: "EXECUTE_TRANSACTION",
role: HandlerRole.OUTPUT,
- handler: async (data: any) => {
+ execute: async (data: any) => {
const result = await starknetChain.write(data.payload);
return `Transaction executed successfully: ${JSON.stringify(
result,
@@ -112,7 +112,7 @@ async function main() {
2
)}`;
},
- schema: z
+ outputSchema: z
.object({
contractAddress: z
.string()
@@ -134,7 +134,7 @@ async function main() {
dreams.registerOutput({
name: "GRAPHQL_FETCH",
role: HandlerRole.OUTPUT,
- handler: async (data: any) => {
+ execute: async (data: any) => {
const { query, variables } = data.payload ?? {};
const result = await fetchGraphQL(
env.GRAPHQL_URL + "/graphql",
@@ -147,7 +147,7 @@ async function main() {
].join("\n\n");
return `GraphQL data fetched successfully: ${resultStr}`;
},
- schema: z
+ outputSchema: z
.object({
query: z.string()
.describe(`"query GetRealmInfo { eternumRealmModels(where: { realm_id: 42 }) { edges { node { ... on eternum_Realm {
diff --git a/examples/example-server.ts b/examples/example-server.ts
index cbd2d228..13732ad0 100644
--- a/examples/example-server.ts
+++ b/examples/example-server.ts
@@ -75,11 +75,11 @@ async function createDaydreamsAgent() {
orchestrator.registerIOHandler({
name: "user_chat",
role: HandlerRole.INPUT,
- schema: z.object({
+ outputSchema: z.object({
content: z.string(),
userId: z.string().optional(),
}),
- handler: async (payload) => {
+ execute: async (payload) => {
return payload;
},
});
@@ -87,11 +87,11 @@ async function createDaydreamsAgent() {
orchestrator.registerIOHandler({
name: "chat_reply",
role: HandlerRole.OUTPUT,
- schema: z.object({
+ outputSchema: z.object({
userId: z.string().optional(),
message: z.string(),
}),
- handler: async (payload) => {
+ execute: async (payload) => {
const { userId, message } = payload as {
userId?: string;
message: string;
diff --git a/examples/example-twitter.ts b/examples/example-twitter.ts
index 012f41f3..17759d6b 100644
--- a/examples/example-twitter.ts
+++ b/examples/example-twitter.ts
@@ -92,7 +92,7 @@ async function main() {
core.registerIOHandler({
name: "twitter_mentions",
role: HandlerRole.INPUT,
- handler: async () => {
+ execute: async () => {
console.log(chalk.blue("🔍 Checking Twitter mentions..."));
// Create a static mentions input handler
const mentionsInput = twitter.createMentionsInput(60000);
@@ -112,20 +112,13 @@ async function main() {
metadata: mention,
}));
},
- schema: z.object({
- type: z.string(),
- room: z.string(),
- user: z.string(),
- content: z.string(),
- metadata: z.record(z.any()),
- }),
});
// Register input handler for autonomous thoughts
core.registerIOHandler({
name: "consciousness_thoughts",
role: HandlerRole.INPUT,
- handler: async () => {
+ execute: async () => {
console.log(chalk.blue("🧠 Generating thoughts..."));
const thought = await consciousness.start();
@@ -136,25 +129,20 @@ async function main() {
return thought;
},
- schema: z.object({
- type: z.string(),
- content: z.string(),
- metadata: z.record(z.any()),
- }),
});
// Register output handler for posting thoughts to Twitter
core.registerIOHandler({
name: "twitter_thought",
role: HandlerRole.OUTPUT,
- handler: async (data: unknown) => {
+ execute: async (data: unknown) => {
const thoughtData = data as { content: string };
return twitter.createTweetOutput().handler({
content: thoughtData.content,
});
},
- schema: z
+ outputSchema: z
.object({
content: z
.string()
@@ -176,12 +164,12 @@ async function main() {
core.registerIOHandler({
name: "twitter_reply",
role: HandlerRole.OUTPUT,
- handler: async (data: unknown) => {
+ execute: async (data: unknown) => {
const tweetData = data as { content: string; inReplyTo: string };
return twitter.createTweetOutput().handler(tweetData);
},
- schema: z
+ outputSchema: z
.object({
content: z.string(),
inReplyTo: z
diff --git a/packages/core/src/core/chain-of-thought.ts b/packages/core/src/core/chain-of-thought.ts
index 79557a09..7a81fa21 100644
--- a/packages/core/src/core/chain-of-thought.ts
+++ b/packages/core/src/core/chain-of-thought.ts
@@ -6,6 +6,7 @@ import type {
RefinedGoal,
VectorDB,
IOHandler,
+ OutputIOHandler,
} from "./types";
import { Logger } from "./logger";
import { EventEmitter } from "events";
@@ -1082,13 +1083,16 @@ export class ChainOfThought extends EventEmitter {
try {
// Get the output handler and schema
- const output = this.outputs.get(action.type);
- if (!output) {
+ const output = this.outputs.get(action.type) as OutputIOHandler;
+ if (!output || !output.execute || !output.outputSchema) {
return `No handler registered for action type "${action.type}" try again`;
}
// Convert Zod schema to JSON schema
- const jsonSchema = zodToJsonSchema(output.schema, action.type);
+ const jsonSchema = zodToJsonSchema(
+ output.outputSchema,
+ action.type
+ );
const validate = ajv.compile(jsonSchema);
// Validate the payload against the schema
@@ -1096,7 +1100,7 @@ export class ChainOfThought extends EventEmitter {
return "Invalid action payload - schema validation failed";
}
- const result = await output.handler(action);
+ const result = await output.execute(action);
// Format the result for better readability
const formattedResult =
@@ -1143,7 +1147,17 @@ export class ChainOfThought extends EventEmitter {
const lastSteps = JSON.stringify(this.stepManager.getSteps());
- const availableOutputs = Array.from(this.outputs.entries());
+ const availableOutputs = Array.from(this.outputs.entries()) as [
+ string,
+ OutputIOHandler,
+ ][];
+
+ const availableOutputsSchema = availableOutputs
+ .filter(([_, output]) => output.outputSchema)
+ .map(([name, output]) => {
+ return `${name}: ${JSON.stringify(zodToJsonSchema(output.outputSchema, name), null, 2)}`;
+ })
+ .join("\n\n");
const prompt = `
@@ -1211,13 +1225,7 @@ Each action must include:
- **payload**: The action data structured as per the available actions.
- **context**: A contextual description or metadata related to the action's execution. This can include statuses, results, or any pertinent information that may influence future actions.
-${availableOutputs
- .map(
- ([name, output]) => `${name}:
- ${JSON.stringify(zodToJsonSchema(output.schema, name), null, 2)}
- `
- )
- .join("\n\n")}
+${availableOutputsSchema}
diff --git a/packages/core/src/core/io/discord.ts b/packages/core/src/core/io/discord.ts
index cd76e71f..c3be1be4 100644
--- a/packages/core/src/core/io/discord.ts
+++ b/packages/core/src/core/io/discord.ts
@@ -1,11 +1,18 @@
-import { Client, Events, GatewayIntentBits, Partials, User } from "discord.js";
-import type { JSONSchemaType } from "ajv";
+import {
+ Client,
+ Events,
+ GatewayIntentBits,
+ Message,
+ Partials,
+} from "discord.js";
import { Logger } from "../../core/logger";
-import { LogLevel } from "../types";
+import { HandlerRole, LogLevel, type IOHandler } from "../types";
import { env } from "../../core/env";
+import { z } from "zod";
export interface DiscordCredentials {
discord_token: string;
+ discord_bot_name: string;
}
export interface MessageData {
@@ -15,31 +22,23 @@ export interface MessageData {
sendBy?: string;
}
-export interface EventCallbacks {
- messageCreate?: (bot: any, message: any) => void;
-}
-
-// Schema for message output validation
-export const messageSchema: JSONSchemaType = {
- type: "object",
- properties: {
- content: { type: "string" },
- channelId: { type: "string" },
- sendBy: { type: "string", nullable: true },
- conversationId: { type: "string", nullable: true },
- },
- required: ["content", "channelId"],
- additionalProperties: false,
-};
-
+export const messageSchema = z.object({
+ content: z.string().describe("The content of the message"),
+ channelId: z.string().describe("The channel ID where the message is sent"),
+ sendBy: z.string().optional().describe("The user ID of the sender"),
+ conversationId: z
+ .string()
+ .optional()
+ .describe("The conversation ID (if applicable)"),
+});
export class DiscordClient {
private client: Client;
private logger: Logger;
+ private messageListener?: (...args: any[]) => void;
constructor(
private credentials: DiscordCredentials,
- logLevel: LogLevel = LogLevel.INFO,
- eventCallbacks: EventCallbacks
+ logLevel: LogLevel = LogLevel.INFO
) {
this.client = new Client({
intents: [
@@ -51,72 +50,128 @@ export class DiscordClient {
GatewayIntentBits.DirectMessageTyping,
GatewayIntentBits.DirectMessageReactions,
],
- partials: [Partials.Channel], // Enable DM
+ partials: [Partials.Channel], // For DM support
});
- this.credentials = credentials;
+
this.logger = new Logger({
level: logLevel,
enableColors: true,
enableTimestamp: true,
});
- if (eventCallbacks.messageCreate) {
- this.client.on(Events.MessageCreate, (message) => {
- if (eventCallbacks.messageCreate) {
- eventCallbacks.messageCreate(this.client.user, message);
- }
- });
- }
-
- this.client.on(Events.ClientReady, async () => {
+ // Handle "ready" event
+ this.client.on(Events.ClientReady, () => {
this.logger.info("DiscordClient", "Initialized successfully");
});
+ // Log in to Discord
this.client.login(this.credentials.discord_token).catch((error) => {
this.logger.error("DiscordClient", "Failed to login", { error });
console.error("Login error:", error);
});
}
+ /**
+ * Optionally start listening to Discord messages.
+ * The onData callback typically feeds data into Orchestrator or similar.
+ */
+ public startMessageStream(onData: (data: any) => void) {
+ this.logger.info("DiscordClient", "Starting message stream...");
+
+ // If you want to capture the listener reference for removal:
+ this.messageListener = (message: Message) => {
+ // Here, you could decide what "data" looks like
+ // E.g., check if the bot was mentioned, etc.
+
+ if (
+ message.author.displayName == this.credentials.discord_bot_name
+ ) {
+ console.log(
+ `Skipping message from ${this.credentials.discord_bot_name}`
+ );
+ return;
+ }
+
+ onData({
+ content: message.content,
+ channelId: message.channelId,
+ sentBy: message.author?.id,
+ });
+ };
+
+ this.client.on(Events.MessageCreate, this.messageListener);
+ }
+
+ /**
+ * Optionally remove the message listener if you want to stop the stream.
+ */
+ public stopMessageStream() {
+ if (this.messageListener) {
+ this.client.removeListener(
+ Events.MessageCreate,
+ this.messageListener
+ );
+ this.logger.info("DiscordClient", "Message stream stopped");
+ }
+ }
+
+ /**
+ * Gracefully destroy the Discord connection
+ */
public destroy() {
+ this.stopMessageStream();
this.client.destroy();
+ this.logger.info("DiscordClient", "Client destroyed");
}
/**
- * Create an output for sending messages
+ * Create an output for sending messages (useful for Orchestrator OUTPUT handlers).
*/
- public createMessageOutput() {
+ public createMessageOutput(): IOHandler {
return {
+ role: HandlerRole.OUTPUT,
name: "discord_message",
- handler: async (data: MessageData) => {
- return await this.sendMessage(data);
- },
- response: {
- success: "boolean",
- channelId: "string",
+ execute: async (data: T) => {
+ return await this.sendMessage(data as MessageData);
},
- schema: messageSchema,
+ outputSchema: messageSchema,
};
}
- private async sendMessage(data: MessageData) {
+ private async sendMessage(data: MessageData): Promise<{
+ success: boolean;
+ messageId?: string;
+ content?: string;
+ error?: string;
+ }> {
try {
- this.logger.info(
- "DiscordClient.sendMessage",
- "Would send message",
- {
- data,
- }
- );
+ this.logger.info("DiscordClient.sendMessage", "Sending message", {
+ data,
+ });
if (env.DRY_RUN) {
+ this.logger.info(
+ "DiscordClient.sendMessage",
+ "Dry run enabled",
+ {
+ data,
+ }
+ );
return {
success: true,
- channelId: "DRY RUN CHANNEL ID",
+ messageId: "DRY_RUN",
+ content: "DRY_RUN",
+ error: "DRY_RUN",
+ };
+ }
+ if (!data?.channelId || !data?.content) {
+ return {
+ success: false,
+ error: "Channel ID and content are required",
};
}
- const channel = this.client.channels.cache.get(data.channelId);
+ const channel = this.client.channels.cache.get(data?.channelId);
if (!channel?.isTextBased()) {
const error = new Error(
`Invalid or unsupported channel: ${data.channelId}`
@@ -130,11 +185,15 @@ export class DiscordClient {
);
throw error;
}
+
+ // @ts-ignore - no idea why this is chucking error and i cannot be bothered to fix it
const sentMessage = await channel.send(data.content);
return {
success: true,
messageId: sentMessage.id,
+ content: data.content,
+ error: undefined,
};
} catch (error) {
this.logger.error(
@@ -144,17 +203,10 @@ export class DiscordClient {
error,
}
);
- throw error;
+ return {
+ success: false,
+ error: error instanceof Error ? error.message : "Unknown error",
+ };
}
}
}
-
-// Example usage:
-/*
-const discord = new DiscordClient({
- discord_token: "M..."
-});
-
-// Register output
-core.registerOutput(discord.createMessageOutput());
-*/
diff --git a/packages/core/src/core/orchestrator.ts b/packages/core/src/core/orchestrator.ts
index b6e5e034..1c52efa8 100644
--- a/packages/core/src/core/orchestrator.ts
+++ b/packages/core/src/core/orchestrator.ts
@@ -78,6 +78,7 @@ export class Orchestrator {
this.userId = userId;
}
+ private unsubscribers = new Map void>();
/**
* Primary method to register any IOHandler (input or output).
* - If it's an input with an interval, schedule it for recurring runs.
@@ -88,19 +89,31 @@ export class Orchestrator {
this.logger.warn(
"Orchestrator.registerIOHandler",
"Overwriting handler with same name",
- {
- name: handler.name,
- }
+ { name: handler.name }
);
}
+
this.ioHandlers.set(handler.name, handler);
+ if (handler.role === HandlerRole.INPUT && handler.subscribe) {
+ const unsubscribe = handler.subscribe(async (data) => {
+ this.logger.info(
+ "Orchestrator.registerIOHandler",
+ "Starting stream",
+ {
+ data,
+ }
+ );
+ // Whenever data arrives, pass it into runAutonomousFlow
+ await this.runAutonomousFlow(data, handler.name, this.userId);
+ });
+ this.unsubscribers.set(handler.name, unsubscribe);
+ }
+
this.logger.info(
"Orchestrator.registerIOHandler",
`Registered ${handler.role}`,
- {
- name: handler.name,
- }
+ { name: handler.name }
);
}
@@ -108,14 +121,17 @@ export class Orchestrator {
* Removes a handler (input or output) by name, stopping scheduling if needed.
*/
public removeIOHandler(name: string): void {
- if (this.ioHandlers.has(name)) {
- // If it was scheduled as an input, it will no longer be re-scheduled
- this.ioHandlers.delete(name);
- this.logger.info(
- "Orchestrator.removeIOHandler",
- `Removed IOHandler: ${name}`
- );
+ // If we have an unsubscribe function, call it
+ const unsub = this.unsubscribers.get(name);
+ if (unsub) {
+ unsub(); // e.g. remove event listeners, clear intervals, etc.
+ this.unsubscribers.delete(name);
}
+
+ // Remove the handler itself
+ this.ioHandlers.delete(name);
+
+ console.log(`Removed IOHandler: ${name}`);
}
/**
@@ -124,7 +140,7 @@ export class Orchestrator {
*/
public async dispatchToOutput(name: string, data: T): Promise {
const handler = this.ioHandlers.get(name);
- if (!handler) {
+ if (!handler || !handler.execute) {
throw new Error(`No IOHandler registered with name: ${name}`);
}
@@ -138,7 +154,12 @@ export class Orchestrator {
});
try {
- const result = await handler.handler(data);
+ const result = await handler.execute(data);
+
+ this.logger.info("Orchestrator.dispatchToOutput", "Output result", {
+ result,
+ });
+
return result;
} catch (error) {
this.logger.error(
@@ -158,8 +179,17 @@ export class Orchestrator {
* We only schedule inputs in the constructor's scheduler.
*/
private async processInputTask(handler: IOHandler): Promise {
+ if (!handler.execute) {
+ this.logger.error(
+ "Orchestrator.processInputTask",
+ "Handler has no execute method",
+ { handler }
+ );
+ return;
+ }
try {
- const result = await handler.handler();
+ // it's undefined because this might be fetching data from an api or something
+ const result = await handler.execute(undefined);
if (!result) return;
if (Array.isArray(result)) {
@@ -221,7 +251,7 @@ export class Orchestrator {
*/
public async dispatchToAction(name: string, data: T): Promise {
const handler = this.ioHandlers.get(name);
- if (!handler) {
+ if (!handler || !handler.execute) {
throw new Error(`No IOHandler registered with name: ${name}`);
}
if (handler.role !== "action") {
@@ -232,7 +262,7 @@ export class Orchestrator {
data,
});
try {
- const result = await handler.handler(data);
+ const result = await handler.execute(data);
return result;
} catch (error) {
this.logger.error(
@@ -504,12 +534,14 @@ export class Orchestrator {
): Promise {
const handler = this.ioHandlers.get(name);
if (!handler) throw new Error(`No IOHandler: ${name}`);
+ if (!handler.execute)
+ throw new Error(`Handler "${name}" has no execute method`);
if (handler.role !== "input") {
throw new Error(`Handler "${name}" is not role=input`);
}
try {
- const result = await handler.handler(data);
+ const result = await handler.execute(data);
if (result) {
return await this.runAutonomousFlow(
diff --git a/packages/core/src/core/processors/message-processor.ts b/packages/core/src/core/processors/message-processor.ts
index 3421fd51..3bb666d8 100644
--- a/packages/core/src/core/processors/message-processor.ts
+++ b/packages/core/src/core/processors/message-processor.ts
@@ -1,6 +1,12 @@
import { LLMClient } from "../llm-client";
-import type { Character, ProcessedResult, SuggestedOutput } from "../types";
+import type {
+ ActionIOHandler,
+ Character,
+ OutputIOHandler,
+ ProcessedResult,
+ SuggestedOutput,
+} from "../types";
import { LogLevel } from "../types";
import { getTimeContext, validateLLMResponseSchema } from "../utils";
@@ -44,8 +50,8 @@ export class MessageProcessor extends BaseProcessor {
content: any,
otherContext: string,
ioContext?: {
- availableOutputs: IOHandler[];
- availableActions: IOHandler[];
+ availableOutputs: OutputIOHandler[];
+ availableActions: ActionIOHandler[];
}
): Promise {
this.logger.debug("Processor.process", "Processing content", {
@@ -57,13 +63,13 @@ export class MessageProcessor extends BaseProcessor {
const outputsSchemaPart = ioContext?.availableOutputs
.map((handler) => {
- return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`;
+ return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.outputSchema!, handler.name))}`;
})
.join("\n");
const actionsSchemaPart = ioContext?.availableActions
.map((handler) => {
- return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`;
+ return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.outputSchema!, handler.name))}`;
})
.join("\n");
@@ -97,7 +103,14 @@ export class MessageProcessor extends BaseProcessor {
2. Only make tasks if you have been told, based off what you think is possible.
+
+ 1. Should you reply to the message?
+ 2. You should only reply if you have been mentioned in the message or you think you can help deeply.
+ 3. You should never respond to yourself.
+
+
+
# Speak in the following voice:
${JSON.stringify(this.character.voice)}
diff --git a/packages/core/src/core/processors/research-processor.ts b/packages/core/src/core/processors/research-processor.ts
index 063c40f0..5adb9f04 100644
--- a/packages/core/src/core/processors/research-processor.ts
+++ b/packages/core/src/core/processors/research-processor.ts
@@ -1,5 +1,5 @@
import { LLMClient } from "../llm-client";
-import type { Character } from "../types";
+import type { ActionIOHandler, Character, OutputIOHandler } from "../types";
import { LogLevel } from "../types";
import { getTimeContext, validateLLMResponseSchema } from "../utils";
import { z } from "zod";
@@ -199,13 +199,16 @@ export class ResearchQuantProcessor extends BaseProcessor {
}
}
- private buildHandlerSchemaPart(handlers?: IOHandler[]): string {
+ private buildHandlerSchemaPart(
+ handlers?: OutputIOHandler[] | ActionIOHandler[]
+ ): string {
if (!handlers || handlers.length === 0) return "None";
return handlers
+ .filter((handler) => handler.outputSchema)
.map(
(handler) =>
`${handler.name}: ${JSON.stringify(
- zodToJsonSchema(handler.schema, handler.name),
+ zodToJsonSchema(handler.outputSchema!, handler.name),
null,
2
)}`
@@ -216,8 +219,8 @@ export class ResearchQuantProcessor extends BaseProcessor {
private async combineChunkResults(
results: any[],
ioContext?: {
- availableOutputs: IOHandler[];
- availableActions: IOHandler[];
+ availableOutputs: OutputIOHandler[];
+ availableActions: ActionIOHandler[];
}
): Promise {
const prompt = `
@@ -320,8 +323,8 @@ export class ResearchQuantProcessor extends BaseProcessor {
content: any,
otherContext: string,
ioContext?: {
- availableOutputs: IOHandler[];
- availableActions: IOHandler[];
+ availableOutputs: OutputIOHandler[];
+ availableActions: ActionIOHandler[];
}
): Promise {
const contentStr =
diff --git a/packages/core/src/core/types/index.ts b/packages/core/src/core/types/index.ts
index 4c6757c5..cd58d329 100644
--- a/packages/core/src/core/types/index.ts
+++ b/packages/core/src/core/types/index.ts
@@ -509,18 +509,103 @@ export enum HandlerRole {
}
/**
- * A single interface for all Inputs, Outputs.
+ * A single interface for all Input, Output and Action handlers in the system.
+ * This provides a unified way to handle different types of I/O operations.
+ *
+ * @example
+ * ```ts
+ * // Register a chat input handler
+ * orchestrator.registerIOHandler({
+ * name: "user_chat",
+ * role: HandlerRole.INPUT,
+ * execute: async (message) => {
+ * return message;
+ * }
+ * });
+ * ```
*/
-export interface IOHandler {
- /** Unique name for this handler */
+
+/**
+ * Base interface for all IO handlers in the system
+ */
+interface BaseIOHandler {
+ /** Unique name identifier for this handler */
name: string;
+}
- /** "input" | "output" | (optionally "action") if you want more roles */
- role: HandlerRole;
+/**
+ * Handler for processing input data streams
+ * @example
+ * ```ts
+ * // Register an input handler for chat messages
+ * const handler: InputIOHandler = {
+ * name: "chat_input",
+ * role: HandlerRole.INPUT,
+ * execute: async (message) => {
+ * return processMessage(message);
+ * }
+ * };
+ * ```
+ */
+export interface InputIOHandler extends BaseIOHandler {
+ /** Identifies this as an input handler */
+ role: HandlerRole.INPUT;
+ /** Function to process input data */
+ execute?: (data: any) => Promise;
+ /** Sets up a subscription to receive streaming data */
+ subscribe?: (onData: (data: any) => void) => () => void;
+}
- /** The schema for the input handler */
- schema: z.ZodType;
+/**
+ * Handler for sending output data
+ * @example
+ * ```ts
+ * // Register an output handler for chat responses
+ * const handler: OutputIOHandler = {
+ * name: "chat_output",
+ * role: HandlerRole.OUTPUT,
+ * outputSchema: z.object({
+ * message: z.string()
+ * }),
+ * execute: async (response) => {
+ * await sendResponse(response);
+ * }
+ * };
+ * ```
+ */
+export interface OutputIOHandler extends BaseIOHandler {
+ /** Identifies this as an output handler */
+ role: HandlerRole.OUTPUT;
+ /** Required schema to validate output data */
+ outputSchema: z.ZodType;
+ /** Function to process and send output */
+ execute?: (data: any) => Promise;
+ /** Sets up a subscription to handle output streams */
+ subscribe?: (onData: (data: any) => void) => () => void;
+}
- /** The main function. For inputs, no payload is typically passed. For outputs, pass the data. */
- handler: (payload?: unknown) => Promise;
+/**
+ * Handler for performing actions/side effects
+ * @example
+ * ```ts
+ * // Register an action handler for database operations
+ * const handler: ActionIOHandler = {
+ * name: "db_action",
+ * role: HandlerRole.ACTION,
+ * execute: async (query) => {
+ * return await db.execute(query);
+ * }
+ * };
+ * ```
+ */
+export interface ActionIOHandler extends BaseIOHandler {
+ /** Identifies this as an action handler */
+ role: HandlerRole.ACTION;
+ /** Optional schema to validate action parameters */
+ outputSchema?: z.ZodType;
+ /** Function to execute the action */
+ execute?: (data: any) => Promise;
}
+
+/** Union type of all possible IO handler types */
+export type IOHandler = InputIOHandler | OutputIOHandler | ActionIOHandler;