Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abstract db for other services, move sheduler out of core #74

Merged
merged 6 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions examples/example-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import { defaultCharacter } from "../packages/core/src/core/character";
import { Consciousness } from "../packages/core/src/core/consciousness";
import { z } from "zod";
import readline from "readline";
import { MongoDb } from "../packages/core/src/core/mongo-db";
import { MongoDb } from "../packages/core/src/core/db/mongo-db";
import { MasterProcessor } from "../packages/core/src/core/processors/master-processor";

async function main() {
const loglevel = LogLevel.DEBUG;
Expand All @@ -45,20 +46,29 @@ async function main() {
temperature: 0.3,
});

const researchProcessor = new ResearchQuantProcessor(
researchClient,
const masterProcessor = new MasterProcessor(
llmClient,
defaultCharacter,
loglevel,
1000 // chunk size, depends
loglevel
);

// Initialize processor with default character personality
const processor = new MessageProcessor(
const messageProcessor = new MessageProcessor(
llmClient,
defaultCharacter,
loglevel
);

const researchProcessor = new ResearchQuantProcessor(
researchClient,
defaultCharacter,
loglevel,
1000 // chunk size, depends
);

// Add processors to the master processor
masterProcessor.addProcessor([messageProcessor, researchProcessor]);

const scheduledTaskDb = new MongoDb(
"mongodb://localhost:27017",
"myApp",
Expand All @@ -74,7 +84,7 @@ async function main() {
const orchestrator = new Orchestrator(
roomManager,
vectorDb,
[processor, researchProcessor],
masterProcessor,
scheduledTaskDb,
{
level: loglevel,
Expand Down Expand Up @@ -162,13 +172,6 @@ async function main() {
orchestrator.registerIOHandler({
name: "user_chat",
role: HandlerRole.INPUT,
// This schema describes what a user message looks like
outputSchema: z.object({
content: z.string(),
userId: z.string().optional(),
}),
// For "on-demand" input handlers, the `handler()` can be a no-op.
// We'll call it manually with data, so we don't need an interval.
execute: async (payload) => {
// We simply return the payload so the Orchestrator can process it
return payload;
Expand Down Expand Up @@ -217,9 +220,11 @@ async function main() {
const outputs: any = await orchestrator.dispatchToInput(
"user_chat",
{
content: userMessage,
userId,
headers: {
"x-user-id": userId,
},
},
userMessage,
userId
);

Expand Down
17 changes: 13 additions & 4 deletions examples/example-discord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import chalk from "chalk";
import { defaultCharacter } from "../packages/core/src/core/character";
import { z } from "zod";
import readline from "readline";
import { MongoDb } from "../packages/core/src/core/mongo-db";
import { MongoDb } from "../packages/core/src/core/db/mongo-db";
import { Message } from "discord.js";
import { MasterProcessor } from "../packages/core/src/core/processors/master-processor";

async function main() {
// Set logging level as you see fit
Expand All @@ -39,13 +40,21 @@ async function main() {
temperature: 0.3,
});

// Use a sample message processor with a default "character" config
const processor = new MessageProcessor(
const masterProcessor = new MasterProcessor(
llmClient,
defaultCharacter,
loglevel
);

// Initialize processor with default character personality
const messageProcessor = new MessageProcessor(
llmClient,
defaultCharacter,
loglevel
);

masterProcessor.addProcessor(messageProcessor);

// Connect to MongoDB (for scheduled tasks, if you use them)
const scheduledTaskDb = new MongoDb(
"mongodb://localhost:27017",
Expand All @@ -62,7 +71,7 @@ async function main() {
const core = new Orchestrator(
roomManager,
vectorDb,
[processor],
masterProcessor,
scheduledTaskDb,
{
level: loglevel,
Expand Down
32 changes: 18 additions & 14 deletions examples/example-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import { MessageProcessor } from "../packages/core/src/core/processors/message-p
import { defaultCharacter } from "../packages/core/src/core/character";

import { LogLevel } from "../packages/core/src/core/types";
import { MongoDb } from "../packages/core/src/core/mongo-db";
import { MongoDb } from "../packages/core/src/core/db/mongo-db";
import { MasterProcessor } from "../packages/core/src/core/processors/master-processor";

const scheduledTaskDb = new MongoDb(
"mongodb://localhost:27017",
Expand Down Expand Up @@ -51,18 +52,26 @@ async function createDaydreamsAgent() {
// 1.3. Room manager initialization
const roomManager = new RoomManager(vectorDb);

// 1.4. Initialize processor with default character
const processor = new MessageProcessor(
const masterProcessor = new MasterProcessor(
llmClient,
defaultCharacter,
loglevel
);

// Initialize processor with default character personality
const messageProcessor = new MessageProcessor(
llmClient,
defaultCharacter,
loglevel
);

masterProcessor.addProcessor(messageProcessor);

// 1.5. Initialize core system
const orchestrator = new Orchestrator(
roomManager,
vectorDb,
[processor],
masterProcessor,
scheduledTaskDb,
{
level: loglevel,
Expand All @@ -75,10 +84,6 @@ async function createDaydreamsAgent() {
orchestrator.registerIOHandler({
name: "user_chat",
role: HandlerRole.INPUT,
outputSchema: z.object({
content: z.string(),
userId: z.string().optional(),
}),
execute: async (payload) => {
return payload;
},
Expand Down Expand Up @@ -145,17 +150,16 @@ wss.on("connection", (ws) => {
throw new Error("userId is required");
}

orchestrator.initializeOrchestrator(userId);

// Process the message using the orchestrator with the provided userId
const outputs = await orchestrator.dispatchToInput(
"user_chat",
{
content: userMessage,
userId: userId,
headers: {
"x-user-id": userId,
},
},
userId,
orchestratorId ? new ObjectId(orchestratorId) : undefined
userMessage,
orchestratorId ? orchestratorId : undefined
);

// Send responses back through WebSocket
Expand Down
61 changes: 47 additions & 14 deletions examples/example-twitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import { defaultCharacter } from "../packages/core/src/core/character";
import { Consciousness } from "../packages/core/src/core/consciousness";
import { z } from "zod";
import readline from "readline";
import { MongoDb } from "../packages/core/src/core/mongo-db";
import { MongoDb } from "../packages/core/src/core/db/mongo-db";
import { SchedulerService } from "../packages/core/src/core/schedule-service";
import { MasterProcessor } from "../packages/core/src/core/processors/master-processor";
import { Logger } from "../packages/core/src/core/logger";

async function main() {
const loglevel = LogLevel.DEBUG;
Expand All @@ -40,13 +43,21 @@ async function main() {
temperature: 0.3,
});

const masterProcessor = new MasterProcessor(
llmClient,
defaultCharacter,
loglevel
);

// Initialize processor with default character personality
const processor = new MessageProcessor(
const messageProcessor = new MessageProcessor(
llmClient,
defaultCharacter,
loglevel
);

masterProcessor.addProcessor(messageProcessor);

const scheduledTaskDb = new MongoDb(
"mongodb://localhost:27017",
"myApp",
Expand All @@ -59,10 +70,10 @@ async function main() {
await scheduledTaskDb.deleteAll();

// Initialize core system
const core = new Orchestrator(
const orchestrator = new Orchestrator(
roomManager,
vectorDb,
[processor],
masterProcessor,
scheduledTaskDb,
{
level: loglevel,
Expand All @@ -71,6 +82,23 @@ async function main() {
}
);

const scheduler = new SchedulerService(
{
logger: new Logger({
level: loglevel,
enableColors: true,
enableTimestamp: true,
}),
orchestratorDb: scheduledTaskDb,
roomManager: roomManager,
vectorDb: vectorDb,
},
orchestrator,
10000
);

scheduler.start();

// Set up Twitter client with credentials
const twitter = new TwitterClient(
{
Expand All @@ -89,7 +117,7 @@ async function main() {
});

// Register input handler for Twitter mentions
core.registerIOHandler({
orchestrator.registerIOHandler({
name: "twitter_mentions",
role: HandlerRole.INPUT,
execute: async () => {
Expand All @@ -115,7 +143,7 @@ async function main() {
});

// Register input handler for autonomous thoughts
core.registerIOHandler({
orchestrator.registerIOHandler({
name: "consciousness_thoughts",
role: HandlerRole.INPUT,
execute: async () => {
Expand All @@ -132,7 +160,7 @@ async function main() {
});

// Register output handler for posting thoughts to Twitter
core.registerIOHandler({
orchestrator.registerIOHandler({
name: "twitter_thought",
role: HandlerRole.OUTPUT,
execute: async (data: unknown) => {
Expand All @@ -157,11 +185,16 @@ async function main() {
});

// Schedule a task to run every minute
await core.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute
await core.scheduleTaskInDb("sleever", "consciousness_thoughts", {}, 30000); // Think every 5 minutes
await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute
await scheduler.scheduleTaskInDb(
"sleever",
"consciousness_thoughts",
{},
30000
); // Think every 5 minutes
Comment on lines +188 to +194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Adjust task scheduling intervals to prevent rate limiting

The current intervals are too aggressive and could hit Twitter API rate limits:

  • Mentions check every 6 seconds could make up to 150 requests per 15-minute window, dangerously close to Twitter's 180 requests limit
  • Recommended changes:
    • Increase mentions check to at least 60 seconds (60000ms) - the default interval in the Twitter client
    • Keep consciousness thoughts at 30 seconds or higher since it's not API-intensive
🔗 Analysis chain

Verify task scheduling intervals.

The scheduling intervals seem quite aggressive:

  • Mentions check: 6 seconds
  • Consciousness thoughts: 30 seconds

Consider if these intervals might cause rate limiting issues with Twitter's API.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check Twitter API rate limit handling

# Search for rate limit handling in Twitter client
rg -A 10 "rate.*limit|RateLimit" 

Length of output: 782


Script:

#!/bin/bash
# Find Twitter-related files and check their content
echo "=== Twitter Files ==="
fd -t f twitter -I

echo -e "\n=== Twitter API Usage ==="
rg -i "twitter.*api|api.*twitter" -A 5

echo -e "\n=== Mentions Implementation ==="
rg "mentions" -A 10

Length of output: 18153


// Register output handler for Twitter replies
core.registerIOHandler({
orchestrator.registerIOHandler({
name: "twitter_reply",
role: HandlerRole.OUTPUT,
execute: async (data: unknown) => {
Expand Down Expand Up @@ -199,10 +232,10 @@ async function main() {

// Clean up resources
await consciousness.stop();
core.removeIOHandler("twitter_mentions");
core.removeIOHandler("consciousness_thoughts");
core.removeIOHandler("twitter_reply");
core.removeIOHandler("twitter_thought");
orchestrator.removeIOHandler("twitter_mentions");
orchestrator.removeIOHandler("consciousness_thoughts");
orchestrator.removeIOHandler("twitter_reply");
orchestrator.removeIOHandler("twitter_thought");
rl.close();

console.log(chalk.green("✅ Shutdown complete"));
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/core/chain-of-thought.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { LogLevel } from "./types";
const ajv = new Ajv();

export class ChainOfThought extends EventEmitter {
private stepManager: StepManager;
stepManager: StepManager;
private context: ChainOfThoughtContext;
private snapshots: ChainOfThoughtContext[];
private logger: Logger;
Expand Down
Loading