Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abstract db for other services, move sheduler out of core #74

Merged
merged 6 commits into from
Jan 30, 2025

Conversation

ponderingdemocritus
Copy link
Contributor

@ponderingdemocritus ponderingdemocritus commented Jan 30, 2025

Summary by CodeRabbit

  • Refactor

    • Reorganized MongoDB database interaction and task scheduling services
    • Updated import paths for core modules
    • Simplified ID handling by converting ObjectId to string
    • Removed TaskScheduler and replaced it with a new SchedulerService
  • New Features

    • Introduced SchedulerService for more robust task management
    • Added new interfaces for orchestrator and memory management
    • Added MasterProcessor for enhanced message processing capabilities
    • Introduced ChainOfThoughtProcessor for complex reasoning tasks
    • Enhanced message handling through the restructuring of processors
  • Chores

    • Improved logging and error handling in core services
    • Streamlined database interaction methods
    • Enhanced accessibility of properties in various classes

Copy link

vercel bot commented Jan 30, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
daydreams ✅ Ready (Inspect) Visit Preview 💬 Add feedback Jan 30, 2025 10:36am

Copy link
Contributor

coderabbitai bot commented Jan 30, 2025

Walkthrough

This pull request introduces significant architectural changes to the core system, focusing on restructuring the MongoDB database interaction, task scheduling, and orchestrator management. The modifications include moving the MongoDb class to a new directory, replacing the TaskScheduler with a new SchedulerService, and updating method signatures to use string-based identifiers. Additionally, the introduction of the MasterProcessor enhances how message processing is managed. These changes aim to improve the modularity and flexibility of the system's core components.

Changes

File Change Summary
examples/*-*.ts Updated MongoDb import path from ../packages/core/src/core/mongo-db to ../packages/core/src/core/db/mongo-db, added MasterProcessor, and updated processor management in main functions.
packages/core/src/core/orchestrator.ts Replaced mongoDb with orchestratorDb, removed TaskScheduler, updated method signatures to use string IDs.
packages/core/src/core/index.ts Removed TaskScheduler import/export, added SchedulerService.
packages/core/src/core/memory.ts Added new interfaces for ScheduledTask, OrchestratorMessage, OrchestratorChat, OrchestratorDb, and MemoryManager.
packages/core/src/core/db/mongo-db.ts Implemented OrchestratorDb interface, changed method signatures to use string IDs.
packages/core/src/core/schedule-service.ts Introduced new SchedulerService with task polling and management capabilities.
packages/core/src/core/processors/* Added MasterProcessor, updated processor management and exports.
packages/core/src/core/types/index.ts Added nextProcessor property to ProcessedResult interface.
packages/core/src/core/task-scheduler.ts Removed TaskScheduler class.

Possibly related PRs

Poem

🐰 Hop, hop, through code's new design,
Refactoring paths with a rabbit's fine line!
MongoDB moves, schedulers dance,
Strings replace IDs in this coding romance.
A system transformed, more modular and bright,
Our rabbit's code leaps to a new height! 🚀

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (3)
packages/core/src/core/db/mongo-db.ts (1)

Line range hint 212-239: Unsuccessful queries if _id is stored as ObjectId.

The code uses { _id: orchestratorId } but orchestratorId is a string. If the _id in MongoDB is an ObjectId, this query will fail to match. Convert the string to an ObjectId or store _id as a string consistently:

await this.orchestratorCollection.updateOne(
-  { _id: orchestratorId },
+  { _id: new ObjectId(orchestratorId) },
  ...
);

...

return this.orchestratorCollection.findOne({
-  _id: orchestratorId,
+  _id: new ObjectId(orchestratorId),
});

Also applies to: 261-265, 268-283

packages/core/src/core/orchestrator.ts (1)

Line range hint 258-282: Ensure getOrchestratorById returns the correct record.

Because _id in Mongo might be stored as an ObjectId while orchestratorId is a string, this call risks returning null. Consider either storing _id as a string or casting from string to ObjectId in the query. Otherwise, each new orchestrator attempt will create a new record unnecessarily.

examples/example-server.ts (1)

Line range hint 228-252: Inconsistent ID handling in API endpoints.

The API endpoints still use ObjectId while the rest of the codebase has moved to string IDs. This inconsistency should be addressed.

-        let objectId;
-        try {
-            objectId = new ObjectId(chatId);
-        } catch (err) {
-            return res.status(400).json({ error: "Invalid chat ID format" });
-        }
-
-        const history = await scheduledTaskDb.getOrchestratorById(objectId);
+        const history = await scheduledTaskDb.getOrchestratorById(chatId);
🧹 Nitpick comments (10)
packages/core/src/core/schedule-service.ts (3)

23-32: Consider concurrency checks or leadership election for multiple service instances.

If multiple SchedulerService instances run in parallel, there is a possibility that more than one instance may pick up the same tasks. To avoid duplicate processing, consider implementing a “leadership election” or adding a stronger guard (like an atomic “compare and set” on task status) to ensure that only one scheduler processes a pending task at a time.


90-119: Validate scheduling parameters to prevent misconfigured intervals.

If an end-user sets a negative or extremely large interval, it could result in scheduling anomalies. A quick validation step (e.g., ensuring intervalMs >= 0) helps avoid unexpected behavior.


120-125: Consider graceful shutdown.

Although stop() clears the polling interval, any in-flight tasks won’t be canceled. If you need a truly graceful shutdown, ensure you also wait for currently running tasks to finish before exiting your application.

packages/core/src/core/db/mongo-db.ts (2)

133-133: Use a "failed" status for errors.
Marking tasks as “completed” or “failed” is a good approach to categorize the final outcome. Make sure that any relevant logs or metrics reflect the difference so that failed tasks can be retried or inspected.


150-150: Revert the status to “pending” only after verifying concurrency.

If multiple service instances run, one instance could update the same task’s nextRunAt, while another instance is concurrently running it. Consider an additional check to confirm the current status before re-pending the task.

packages/core/src/core/orchestrator.ts (1)

356-379: Scheduling tasks here is consistent but verify concurrency approach.

Creating tasks directly in the Orchestrator helps chain further actions. However, if multiple Orchestrator instances run concurrently, tasks may be scheduled more than once. Consider a uniqueness clause or deduplication approach to avoid duplicates for the same user/event.

packages/core/src/core/memory.ts (2)

33-34: Consider adding error types for database operations.

The connect and close methods could benefit from specific error types to handle different failure scenarios.

-    connect(): Promise<void>;
-    close(): Promise<void>;
+    connect(): Promise<void | DatabaseConnectionError>;
+    close(): Promise<void | DatabaseCloseError>;

9-9: Consider using a more specific type for taskData.

Using Record<string, any> reduces type safety. Consider defining a specific interface or type for task data based on your use cases.

-    taskData: Record<string, any>;
+    taskData: TaskData;

+interface TaskData {
+    // Define specific fields based on your use cases
+    [key: string]: string | number | boolean | object;
+}
examples/example-twitter.ts (1)

179-185: Consider extracting magic numbers for intervals.

The interval values should be defined as named constants at the top of the file for better maintainability.

+const MENTION_CHECK_INTERVAL_MS = 6000;  // Check mentions every minute
+const CONSCIOUSNESS_INTERVAL_MS = 30000;  // Think every 5 minutes

-    await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000);
-    await scheduler.scheduleTaskInDb("sleever", "consciousness_thoughts", {}, 30000);
+    await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, MENTION_CHECK_INTERVAL_MS);
+    await scheduler.scheduleTaskInDb("sleever", "consciousness_thoughts", {}, CONSCIOUSNESS_INTERVAL_MS);
examples/example-api.ts (1)

Line range hint 1-13: Update file documentation to reflect architectural changes.

The file header comments should be updated to:

  1. Remove Twitter-specific references since this is now a general API example
  2. Document the database abstraction layer usage

Apply this diff to update the documentation:

 /**
- * Example demonstrating a Twitter bot using the Daydreams package.
+ * Example demonstrating a general API integration using the Daydreams package.
  * This bot can:
- * - Monitor Twitter mentions and auto-reply
- * - Generate autonomous thoughts and tweet them
+ * - Process API requests and auto-reply
+ * - Generate autonomous thoughts
  * - Maintain conversation memory using ChromaDB
+ * - Store scheduled tasks using MongoDB
  * - Process inputs through a character-based personality
  */
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b25421 and 0696dcf.

📒 Files selected for processing (10)
  • examples/example-api.ts (1 hunks)
  • examples/example-discord.ts (1 hunks)
  • examples/example-server.ts (2 hunks)
  • examples/example-twitter.ts (8 hunks)
  • packages/core/src/core/db/mongo-db.ts (9 hunks)
  • packages/core/src/core/index.ts (2 hunks)
  • packages/core/src/core/memory.ts (1 hunks)
  • packages/core/src/core/orchestrator.ts (14 hunks)
  • packages/core/src/core/schedule-service.ts (1 hunks)
  • packages/core/src/core/task-scheduler.ts (0 hunks)
💤 Files with no reviewable changes (1)
  • packages/core/src/core/task-scheduler.ts
✅ Files skipped from review due to trivial changes (1)
  • examples/example-discord.ts
🔇 Additional comments (13)
packages/core/src/core/db/mongo-db.ts (3)

2-2: No issues found.
This import line is correct and maps to HandlerRole from ../types.


75-79: Be cautious storing _id as a string in tasks.

You are manually overriding the _id field with new ObjectId().toString(). While valid, this diverges from typical Mongo usage (where _id is ObjectId). Ensure that queries match this string-based _id, and be mindful of potential consistency issues if other parts of the code expect ObjectIds.


117-117: Atomic updates look good.

By using $set with {_id: taskId}, the code ensures consistent status updates for “running” tasks. Keep in mind that concurrency conflicts could arise if another scheduler tries to pick up the same task at the same time—assuming it's the intended design, you may want to use options like {upsert: false} or check the modifiedCount.

packages/core/src/core/orchestrator.ts (2)

Line range hint 56-65: Check ID consistency with OrchestratorDb.

The constructor now receives OrchestratorDb, which implies string-based IDs. Ensure each place referencing an Orchestrator ID uses a string or properly casts to/from an ObjectId so that queries match records in Mongo.


313-315: Logging and message additions look good.

Providing rich logs and storing messages for each step is beneficial for observability and debugging. Ensure sensitive data is properly sanitized before logging, if applicable.

Also applies to: 410-435

packages/core/src/core/index.ts (1)

19-19: LGTM! Clean replacement of TaskScheduler with SchedulerService.

The changes maintain the module's public API while moving the scheduler functionality out of core, aligning with the PR objectives.

Also applies to: 40-40

packages/core/src/core/memory.ts (1)

4-73: Well-structured interfaces with clear separation of concerns.

The interfaces provide a solid foundation for abstracting database operations and memory management.

examples/example-server.ts (2)

20-20: LGTM! Import path updated to reflect new directory structure.

The change is consistent with the architectural changes moving the MongoDb class.


154-154: LGTM! Simplified orchestratorId handling.

The change to use string IDs directly improves consistency and removes unnecessary ObjectId wrapping.

examples/example-twitter.ts (2)

76-92: Well-structured scheduler initialization with proper configuration.

Good separation of concerns and clear configuration of the scheduler service.


226-229: LGTM! Clean handler removal during shutdown.

The shutdown process properly removes all registered handlers.

examples/example-api.ts (2)

23-23: LGTM! Import path updated correctly.

The import path change aligns with the PR objective of abstracting database implementations.


Line range hint 67-71: Consider using a database interface for better abstraction.

Since the PR aims to abstract the database layer, consider updating the example to demonstrate how to use alternative database implementations.

Let's verify if an interface or base class exists for database implementations:

Also applies to: 73-74

Comment on lines 34 to 88
private async pollTasks() {
try {
const tasks = await this.context.orchestratorDb.findDueTasks();
for (const task of tasks) {
await this.context.orchestratorDb.markRunning(task._id);

const handler = this.orchestrator.getHandler(task.handlerName);
if (!handler) {
this.context.logger.warn("No handler found", "warn", {
name: task.handlerName,
});
continue;
}

// parse out data
const data = JSON.parse(task.taskData.task_data);

switch (handler.role) {
case HandlerRole.INPUT:
await this.orchestrator.dispatchToInput(
task.handlerName,
data,
task.userId
);
break;
case HandlerRole.ACTION:
await this.orchestrator.dispatchToAction(
task.handlerName,
data
);
break;
case HandlerRole.OUTPUT:
await this.orchestrator.dispatchToOutput(
task.handlerName,
data
);
break;
}

// handle recurring or complete
if (task.intervalMs) {
await this.context.orchestratorDb.updateNextRun(
task._id,
new Date(Date.now() + task.intervalMs)
);
} else {
await this.context.orchestratorDb.markCompleted(task._id);
}
}
} catch (err) {
this.context.logger.error("pollTasks error", "error", {
data: err,
});
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle task failures with a fallback flow.

If an error occurs after calling markRunning but before calling markCompleted, the task remains stuck in the “running” state. You may want to:

  1. Wrap each handler dispatch in its own try/catch block and mark the task as failed if an exception occurs.
  2. Consider a “retry” mechanism or a “failed” status to differentiate between normal completions and errors.
       for (const task of tasks) {
         try {
+          await this.context.orchestratorDb.markRunning(task._id);
           const handler = this.orchestrator.getHandler(task.handlerName);
           ...
+        } catch (error) {
+          await this.context.orchestratorDb.markCompleted(task._id, true);
+          this.context.logger.error("pollTasks error", "error", { error });
+        }
       }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +192 to +200
public async createOrchestrator(userId: string): Promise<string> {
const chat: OrchestratorChat = {
userId: userId,
createdAt: new Date(),
updatedAt: new Date(),
messages: [],
};
const result = await this.orchestratorCollection.insertOne(chat);
return result.insertedId;
return result.insertedId.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential mismatch between _id storage for orchestrators and tasks.

Here, you rely on MongoDB’s default _id (an ObjectId) and convert it to a string, whereas for tasks you store a string-based _id directly in the document. Make sure to consistently store _id as either an ObjectId or a string across your collections to avoid confusion and difficulty in referencing records.

     const chat: OrchestratorChat = {
+        _id: new ObjectId().toString(),
         userId: userId,
         ...
     };
     const result = await this.orchestratorCollection.insertOne(chat);
-    return result.insertedId.toString();
+    return chat._id;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public async createOrchestrator(userId: string): Promise<string> {
const chat: OrchestratorChat = {
userId: userId,
createdAt: new Date(),
updatedAt: new Date(),
messages: [],
};
const result = await this.orchestratorCollection.insertOne(chat);
return result.insertedId;
return result.insertedId.toString();
public async createOrchestrator(userId: string): Promise<string> {
const chat: OrchestratorChat = {
_id: new ObjectId().toString(),
userId: userId,
createdAt: new Date(),
updatedAt: new Date(),
messages: [],
};
const result = await this.orchestratorCollection.insertOne(chat);
return chat._id;

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (11)
packages/core/src/core/processors/master-processor.ts (4)

17-33: Use a constructor-level comment or docstring to clarify usage and dependencies.

The constructor’s parameters are self-explanatory but adding a short docstring clarifying how the character, llmClient, and other fields fit together would improve maintainability and onboarding.


39-46: Ensure alignment of canHandle logic with contentLimit.

Currently, canHandle checks if the content length is < 1000, matching the default in the base class constructor. If you allow further changes to contentLimit, consider referencing this.contentLimit explicitly in the docstring or method name for clarity.


215-221: Leverage optional chaining to reduce complexity.

Static analysis suggests using an optional chain for safety. However, your current check is still valid. If you prefer a fluent style, consider this refactor:

- if (childProcessor && childProcessor.canHandle(content)) {
+ if (childProcessor?.canHandle(content)) {
🧰 Tools
🪛 Biome (1.9.4)

[error] 221-221: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)


238-285: Provide more nuanced fallback handling.

When an error occurs, the fallback response always returns neutral sentiment without giving the user enough context. Consider capturing partial results (if any) or providing an error field to alert upstream consumers.

packages/core/src/core/processor.ts (5)

10-11: Encourage a typed record for child processors.

While Map<string, BaseProcessor> is adequate, using a typed object literal or an expanded interface might enable clearer relationships and reduce possible runtime errors.


15-17: Constructor parameters could benefit from partial typing or defaults.

You removed the default for loggerLevel. If there are no strict requirements that it must always be provided, consider a default to avoid collisions in other places that instantiate BaseProcessor.


34-39: Expand docstring to convey the intended usage of getDescription.

This method currently returns the processor’s description. Documenting when and how it might be used (e.g., debugging, generating help text) helps future maintainers.


58-72: Validate child processors’ compatibility prior to adding.

addProcessor throws an error if a processor name exists. That’s a good safeguard. Additionally, consider verifying that child processors have unique roles or are valid for delegation, so you avoid hidden conflicts in the chain of responsibility.


74-79: Provide fallback or log if the requested name does not exist.

When getProcessor is called with a name that does not exist, returning undefined might silence debugging signals. Logging an info or debug message could help trace errors faster in large deployments.

packages/core/src/core/orchestrator.ts (2)

218-225: Move debug logging before action execution.

The debug logging should be placed before executing the action to ensure we capture the input context even if the action fails.

     try {
+        this.logger.debug(
+            "Orchestrator.dispatchToAction",
+            "Executing action",
+            {
+                name,
+                data,
+            }
+        );
         const result = await handler.execute(data);
-        this.logger.debug(
-            "Orchestrator.dispatchToAction",
-            "Executing action",
-            {
-                name,
-                data,
-            }
-        );
         return result;

346-369: Extract task scheduling logic into a separate method.

The task scheduling logic could be extracted into a dedicated method for better maintainability and reusability.

+ private async scheduleTask(
+     userId: string,
+     task: { name: string; data: any; intervalMs?: number }
+ ): Promise<void> {
+     const now = Date.now();
+     const nextRunAt = new Date(now + (task.intervalMs ?? 0));
+
+     this.logger.info(
+         "Orchestrator.scheduleTask",
+         `Scheduling task ${task.name}`,
+         {
+             nextRunAt,
+             intervalMs: task.intervalMs,
+         }
+     );
+
+     await this.orchestratorDb.createTask(
+         userId,
+         task.name,
+         {
+             request: task.name,
+             task_data: JSON.stringify(task.data),
+         },
+         nextRunAt,
+         task.intervalMs
+     );
+ }

  // In runAutonomousFlow:
- const now = Date.now();
- const nextRunAt = new Date(now + (task.intervalMs ?? 0));
-
- this.logger.info(
-     "Orchestrator.runAutonomousFlow",
-     `Scheduling task ${task.name}`,
-     {
-         nextRunAt,
-         intervalMs: task.intervalMs,
-     }
- );
-
- await this.orchestratorDb.createTask(
-     userId,
-     task.name,
-     {
-         request: task.name,
-         task_data: JSON.stringify(task.data),
-     },
-     nextRunAt,
-     task.intervalMs
- );
+ await this.scheduleTask(userId, task);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0696dcf and ae0d158.

📒 Files selected for processing (9)
  • examples/example-api.ts (3 hunks)
  • examples/example-discord.ts (3 hunks)
  • examples/example-server.ts (3 hunks)
  • examples/example-twitter.ts (9 hunks)
  • packages/core/src/core/orchestrator.ts (18 hunks)
  • packages/core/src/core/processor.ts (2 hunks)
  • packages/core/src/core/processors/index.ts (1 hunks)
  • packages/core/src/core/processors/master-processor.ts (1 hunks)
  • packages/core/src/core/types/index.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • examples/example-api.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/core/processors/master-processor.ts

[error] 221-221: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)

🔇 Additional comments (14)
packages/core/src/core/processors/master-processor.ts (1)

48-55: Validate otherContext to avoid potential unhandled edge cases.

If otherContext is empty or has unexpected data types, it could lead to unexpected behaviors within the prompt. Consider input validation or at least logging a warning if otherContext is not a string.

packages/core/src/core/processors/index.ts (1)

3-3: Exporting MasterProcessor.

This addition successfully exposes MasterProcessor from the module, aligning with other processor exports. No concerns here.

packages/core/src/core/processor.ts (1)

53-54: Optional properties for ioContext are helpful for flexibility.

Updating these properties to optional is a good step to enhance reusability. Ensure any calling code gracefully handles undefined outputs or actions.

packages/core/src/core/types/index.ts (1)

288-288: Verify processor chain handling.

The addition of nextProcessor enables dynamic processor chaining, but we should ensure proper handling of processor chains to prevent potential issues.

Run this script to check for potential circular dependencies in processor chains:

✅ Verification successful

No processor chain implementation concerns found

The nextProcessor property is currently unused and optional. The existing processor management system has proper safeguards against duplicate processors, and the current implementation uses a simple delegation pattern without complex chaining.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for processor chain implementations to verify proper handling

# Look for processor chain implementations
echo "Searching for processor chain implementations..."
rg -A 10 "nextProcessor.*=.*" 

# Look for circular chain prevention logic
echo "Searching for circular chain prevention..."
rg -A 10 "addProcessor|getProcessor"

Length of output: 4945

examples/example-discord.ts (1)

43-57: LGTM! Clean implementation of the new processor architecture.

The initialization and registration of processors follow a clear pattern:

  1. Create master processor
  2. Create message processor
  3. Register message processor with master processor
examples/example-server.ts (2)

55-69: LGTM! Consistent implementation of the processor architecture.

The processor initialization follows the same pattern as other examples, maintaining consistency across the codebase.


163-163: Verify the impact of orchestratorId type change.

The orchestratorId is now passed directly without wrapping in ObjectId. Ensure this change is compatible with MongoDB operations.

✅ Verification successful

The orchestratorId type change is safe and compatible

The codebase consistently uses string type for orchestratorId across all MongoDB operations (find, insert, update). The removal of ObjectId wrapping aligns with the existing implementation in the database layer.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check MongoDB operations with orchestratorId

# Search for MongoDB operations using orchestratorId
rg -A 5 "orchestratorId.*=|getOrchestratorById"

Length of output: 2994


Script:

#!/bin/bash
# Check for ObjectId usage and other MongoDB operations
rg -l "ObjectId|new Mongo" && \
ast-grep --pattern 'import { ObjectId } from "mongodb"' && \
rg "orchestratorCollection.*find|orchestratorCollection.*update|orchestratorCollection.*insert"

Length of output: 966

examples/example-twitter.ts (2)

46-60: LGTM! Consistent processor architecture implementation.

The processor initialization maintains consistency with other examples in the codebase.


85-101: Well-structured scheduler implementation with proper configuration.

The SchedulerService is initialized with:

  • Appropriate logger configuration
  • Required dependencies
  • Reasonable polling interval (10 seconds)
packages/core/src/core/orchestrator.ts (5)

26-29: LGTM! Well-documented database abstraction.

The introduction of OrchestratorDb type and its documentation improves the modularity of the system by abstracting database operations.


68-71: LGTM! Enhanced logging initialization.

The addition of initialization logging helps with debugging and monitoring the orchestrator's lifecycle.


134-136: LGTM! Improved handler management logging.

The structured logging in removeIOHandler enhances debugging capabilities by providing clear context about handler removal operations.


564-570: LGTM! Well-documented content processing logic.

The comprehensive documentation clearly explains the processing steps and improves code maintainability.


36-40: Verify unsubscriber cleanup in all scenarios.

The unsubscribers map stores cleanup functions for handlers. Ensure that all scenarios (error cases, re-initialization, etc.) properly clean up these references to prevent memory leaks.

✅ Verification successful

The unsubscriber cleanup implementation is correct and complete

The code properly manages unsubscriber lifecycle:

  • Unsubscribers are created only for input handlers with subscribe functionality
  • Each unsubscriber is properly cleaned up in removeIOHandler before the handler is removed
  • Error handling and logging are in place throughout the implementation
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all places where unsubscribers are added but not cleaned up
rg -A 5 "this\.unsubscribers\.set" .
rg -A 5 "this\.unsubscribers\.delete" .

Length of output: 892


Script:

#!/bin/bash
# Search for cleanup methods and error handling
ast-grep --pattern 'class Orchestrator {
  $$$
  destroy() {
    $$$
  }
  $$$
}'

# Look for shutdown or cleanup methods
rg -B 5 "unsubscribers\.clear" .

# Check error handling in registration
rg -A 10 "registerIOHandler.*async" .

Length of output: 158


Script:

#!/bin/bash
# Get the full implementation of registerIOHandler and related methods
ast-grep --pattern 'class Orchestrator {
  $$$
  registerIOHandler($_) {
    $$$
  }
  $$$
}'

# Look for any cleanup-related methods
rg -A 5 "cleanup|dispose|destroy|shutdown" packages/core/src/core/orchestrator.ts

# Check error handling around unsubscribers
rg -A 5 "try.*{.*unsubscribers" packages/core/src/core/orchestrator.ts

Length of output: 362


Script:

#!/bin/bash
# Get the full content of orchestrator.ts
cat packages/core/src/core/orchestrator.ts

# Look for try-catch blocks
rg "try\s*{" -A 5 packages/core/src/core/orchestrator.ts

# Look for cleanup patterns with fixed regex
rg "(cleanup|dispose|destroy|shutdown)" packages/core/src/core/orchestrator.ts

Length of output: 22198

Comment on lines +188 to +194
await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute
await scheduler.scheduleTaskInDb(
"sleever",
"consciousness_thoughts",
{},
30000
); // Think every 5 minutes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Adjust task scheduling intervals to prevent rate limiting

The current intervals are too aggressive and could hit Twitter API rate limits:

  • Mentions check every 6 seconds could make up to 150 requests per 15-minute window, dangerously close to Twitter's 180 requests limit
  • Recommended changes:
    • Increase mentions check to at least 60 seconds (60000ms) - the default interval in the Twitter client
    • Keep consciousness thoughts at 30 seconds or higher since it's not API-intensive
🔗 Analysis chain

Verify task scheduling intervals.

The scheduling intervals seem quite aggressive:

  • Mentions check: 6 seconds
  • Consciousness thoughts: 30 seconds

Consider if these intervals might cause rate limiting issues with Twitter's API.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check Twitter API rate limit handling

# Search for rate limit handling in Twitter client
rg -A 10 "rate.*limit|RateLimit" 

Length of output: 782


Script:

#!/bin/bash
# Find Twitter-related files and check their content
echo "=== Twitter Files ==="
fd -t f twitter -I

echo -e "\n=== Twitter API Usage ==="
rg -i "twitter.*api|api.*twitter" -A 5

echo -e "\n=== Mentions Implementation ==="
rg "mentions" -A 10

Length of output: 18153

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (4)
packages/core/src/core/processors/chain-of-thought-processor.ts (3)

8-31: Encapsulate or validate memory usage for reliable DB abstraction.
The constructor injects memory: VectorDB directly into the ChainOfThought instance. While this is consistent with the new DB abstraction approach, consider adding validation or error handling if the injected VectorDB fails to initialize or has limited availability (e.g., unavailable network resources). Encapsulating these checks can improve the processor’s resilience and clarify the system’s resource requirements.


33-49: Validate textual content more robustly in canHandle.
The canHandle method checks for keywords ("goal", "plan", etc.) and content length but does not account for edge cases like extremely large or non-textual input (e.g., binary data, HTML). For broader usage, consider implementing defensive checks or limiting memory usage for large inputs.


211-218: Extend error information when rethrowing.
While rethrowing the error preserves the stack trace, consider adding context about which content triggered the failure for extended observability. This helps pinpoint the cause of repeated errors in logs or incident reports.

packages/core/src/core/chain-of-thought.ts (1)

31-31: Reevaluate making stepManager public.
The stepManager property was changed to public, potentially exposing it to outside modifications. If external classes only need read access, consider a getter or read-only property. This preserves encapsulation while still supporting your new scheduler architecture and DB abstraction.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ae0d158 and 6a93713.

📒 Files selected for processing (2)
  • packages/core/src/core/chain-of-thought.ts (1 hunks)
  • packages/core/src/core/processors/chain-of-thought-processor.ts (1 hunks)
🔇 Additional comments (1)
packages/core/src/core/processors/chain-of-thought-processor.ts (1)

51-73: Check for null/undefined ioContext references before usage.
Although the code checks if (ioContext) at line 66, ensure future expansions account for partial definitions (e.g., ioContext exists but availableOutputs is missing). Using optional chaining or additional checks avoids potential runtime errors if future refactors alter the shape of ioContext.

Comment on lines +75 to +170
try {
// First, decompose the content into goals
await this.chainOfThought.decomposeObjectiveIntoGoals(
typeof content === "string" ? content : JSON.stringify(content)
);

const stats = {
completed: 0,
failed: 0,
total: 0,
};

// Execute goals until completion
while (true) {
const readyGoals =
this.chainOfThought.goalManager.getReadyGoals();
const activeGoals = this.chainOfThought.goalManager
.getGoalsByHorizon("short")
.filter((g) => g.status === "active");
const pendingGoals = this.chainOfThought.goalManager
.getGoalsByHorizon("short")
.filter((g) => g.status === "pending");

// Log progress
this.logger.debug(
"ChainOfThoughtProcessor.process",
"Goal execution progress:",
JSON.stringify({
ready: readyGoals.length,
active: activeGoals.length,
pending: pendingGoals.length,
completed: stats.completed,
failed: stats.failed,
})
);

// Check if all goals are complete
if (
readyGoals.length === 0 &&
activeGoals.length === 0 &&
pendingGoals.length === 0
) {
this.logger.debug(
"ChainOfThoughtProcessor.process",
"All goals completed!",
{
ready: readyGoals.length,
active: activeGoals.length,
pending: pendingGoals.length,
completed: stats.completed,
failed: stats.failed,
}
);
break;
}

// Handle blocked goals
if (readyGoals.length === 0 && activeGoals.length === 0) {
this.logger.warn(
"ChainOfThoughtProcessor.process",
"No ready or active goals, but some goals are pending",
{
pending: pendingGoals.length,
}
);
pendingGoals.forEach((goal) => {
const blockingGoals =
this.chainOfThought.goalManager.getBlockingGoals(
goal.id
);
this.logger.warn(
"ChainOfThoughtProcessor.process",
`Pending Goal: ${goal.description}`,
{
blockedBy: blockingGoals.length,
}
);
});
break;
}

// Execute next goal
try {
await this.chainOfThought.processHighestPriorityGoal();
stats.completed++;
} catch (error) {
this.logger.error(
"ChainOfThoughtProcessor.process",
"Goal execution failed:",
error
);
stats.failed++;
}

stats.total++;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding a time-limit or iteration cap to avoid potential infinite loops.
The while loop at line 88 breaks when goals are completed or none are ready, but there's a possibility of stalling if goals remain in a conflicting state. Incorporating a maximum iteration or a time-based cutoff helps avoid indefinite processing under unexpected conditions. Furthermore, providing logs or system events when the loop terminates prematurely can aid in debugging.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
packages/core/src/core/orchestrator.ts (1)

Line range hint 502-529: Improve error handling in dispatchToInput.

The error handling could be more informative and consistent with the rest of the codebase:

 try {
     const result = await handler.execute(data);
     if (result) {
         return await this.runAutonomousFlow(
             result,
             handler.name,
             userId,
             orchestratorId
         );
     }
     return [];
 } catch (error) {
-    this.logger.error(
-        "dispatchToInput Error",
-        `dispatchToInput Error: ${
-            error instanceof Error ? error.message : String(error)
-        }`
-    );
+    this.logger.error(
+        "Orchestrator.dispatchToInput",
+        "Failed to execute input handler",
+        {
+            name,
+            error: error instanceof Error ? error.message : String(error),
+            userId,
+            orchestratorId
+        }
+    );
+    throw error; // Re-throw to allow proper error handling upstream
 }
🧹 Nitpick comments (7)
packages/core/src/core/processors/master-processor.ts (4)

17-33: Consider enhancing logging and description for better observability.

As this is a master processor responsible for content delegation:

  1. Consider using LogLevel.INFO as the default to improve observability of routing decisions
  2. Enhance the description to better reflect its role in content delegation and processing orchestration
-        logLevel: LogLevel = LogLevel.ERROR
+        logLevel: LogLevel = LogLevel.INFO
         ) {
         super(
             {
                 name: "master",
                 description:
-                    "This processor handles messages or short text inputs.",
+                    "Master processor that analyzes content, delegates to specialized processors, and orchestrates processing flows for messages and short text inputs.",
             },

48-115: Extract prompt construction to improve maintainability.

The prompt construction logic is complex and could benefit from being moved to a separate method or configuration file. This would make it easier to maintain and test the prompt template independently.

+    protected getProcessingPrompt(
+        content: string,
+        otherContext: string,
+        processorContext: string,
+        outputsSchemaPart?: string,
+        actionsSchemaPart?: string
+    ): string {
+        return `You are a master processor...`; // Move the entire prompt template here
+    }
+
     async process(
         content: any,
         otherContext: string,
         ioContext?: {
             availableOutputs: OutputIOHandler[];
             availableActions: ActionIOHandler[];
         }
     ): Promise<ProcessedResult> {

199-219: Use optional chaining as suggested by static analysis.

The code could benefit from using optional chaining for better readability and safety.

-            if (result.classification.delegateToProcessor) {
-                const childProcessor = this.getProcessor(
-                    result.classification.delegateToProcessor
-                );
-                if (childProcessor && childProcessor.canHandle(content)) {
+            const childProcessor = result.classification.delegateToProcessor
+                ? this.getProcessor(result.classification.delegateToProcessor)
+                : null;
+            if (childProcessor?.canHandle(content)) {
🧰 Tools
🪛 Biome (1.9.4)

[error] 204-204: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)


246-248: Enhance error logging for better debugging.

The error logging could be more detailed to help with debugging processing failures.

-            this.logger.error("Processor.process", "Processing failed", {
-                error,
-            });
+            this.logger.error("Processor.process", "Processing failed", {
+                error,
+                content: typeof content === 'string' ? content : JSON.stringify(content),
+                errorMessage: error instanceof Error ? error.message : String(error),
+                stack: error instanceof Error ? error.stack : undefined
+            });
packages/core/src/core/orchestrator.ts (3)

46-53: Consider reordering constructor parameters for better maintainability.

The constructor parameters mix core dependencies with optional configurations. Consider grouping required dependencies first, followed by optional parameters:

 constructor(
-    private readonly roomManager: RoomManager,
-    vectorDb: VectorDB,
-    private processor: BaseProcessor,
-    orchestratorDb: OrchestratorDb,
-    config?: LoggerConfig
+    private readonly orchestratorDb: OrchestratorDb,
+    private readonly processor: BaseProcessor,
+    private readonly roomManager: RoomManager,
+    vectorDb: VectorDB,
+    config?: LoggerConfig
 )

346-369: Refactor task scheduling logic into a separate method.

The task scheduling logic within runAutonomousFlow is complex and would be more maintainable as a separate method.

+private async scheduleTask(
+    userId: string,
+    task: { name: string; intervalMs?: number; data: unknown }
+): Promise<void> {
+    const now = Date.now();
+    const nextRunAt = new Date(now + (task.intervalMs ?? 0));
+
+    this.logger.info(
+        "Orchestrator.scheduleTask",
+        `Scheduling task ${task.name}`,
+        {
+            nextRunAt,
+            intervalMs: task.intervalMs,
+        }
+    );
+
+    await this.orchestratorDb.createTask(
+        userId,
+        task.name,
+        {
+            request: task.name,
+            task_data: JSON.stringify(task.data),
+        },
+        nextRunAt,
+        task.intervalMs
+    );
+}

 // In runAutonomousFlow:
-const now = Date.now();
-const nextRunAt = new Date(now + (task.intervalMs ?? 0));
-
-this.logger.info(
-    "Orchestrator.runAutonomousFlow",
-    `Scheduling task ${task.name}`,
-    {
-        nextRunAt,
-        intervalMs: task.intervalMs,
-    }
-);
-
-await this.orchestratorDb.createTask(
-    userId,
-    task.name,
-    {
-        request: task.name,
-        task_data: JSON.stringify(task.data),
-    },
-    nextRunAt,
-    task.intervalMs
-);
+await this.scheduleTask(userId, task);

8-8: Add JSDoc documentation for the OrchestratorDb interface.

The OrchestratorDb interface is a crucial part of the system's architecture, but it lacks proper documentation. Consider adding comprehensive JSDoc comments to document the interface's purpose, methods, and expected behavior.

+/**
+ * Interface for database operations related to orchestrator functionality.
+ * Implementations of this interface handle persistence of orchestrator state,
+ * messages, and scheduled tasks.
+ */
 import type { OrchestratorDb } from "./memory";
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6a93713 and fb2f42b.

📒 Files selected for processing (3)
  • packages/core/src/core/orchestrator.ts (18 hunks)
  • packages/core/src/core/processors/master-processor.ts (1 hunks)
  • packages/core/src/core/processors/research-processor.ts (0 hunks)
💤 Files with no reviewable changes (1)
  • packages/core/src/core/processors/research-processor.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/core/processors/master-processor.ts

[error] 204-204: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)

🔇 Additional comments (2)
packages/core/src/core/processors/master-processor.ts (2)

1-16: LGTM! Clean and well-organized imports.

The imports are properly organized, separating type imports from regular imports, and all imported entities are used in the implementation.


235-235: Address the TODO comment regarding memory abstraction.

The TODO comment indicates a need to fix the memory abstraction. This should be tracked and addressed.

Would you like me to help create an issue to track this TODO or propose a solution for the memory abstraction?

Comment on lines +39 to +46
public canHandle(content: any): boolean {
// Convert content to string for length check
const contentStr =
typeof content === "string" ? content : JSON.stringify(content);

// Check if content is short enough for message processing (<1000 chars)
return contentStr.length < this.contentLimit;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add type safety and error handling to content validation.

Several improvements could enhance the robustness of content handling:

  1. The contentLimit property is not defined in the visible code
  2. JSON.stringify could throw on circular references
  3. The content parameter could benefit from type narrowing
+    protected readonly contentLimit = 1000; // Define the limit
+
     public canHandle(content: any): boolean {
+        try {
             // Convert content to string for length check
             const contentStr =
                 typeof content === "string" ? content : JSON.stringify(content);
 
             // Check if content is short enough for message processing (<1000 chars)
             return contentStr.length < this.contentLimit;
+        } catch (error) {
+            this.logger.warn("canHandle failed", { error });
+            return false;
+        }
     }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 267 to 273
const existingOrchestrator =
await this.orchestratorDb.getOrchestratorById(orchestratorId);

if (!existingOrchestrator) {
orchestratorId = await this.mongoDb.createOrchestrator(userId);
orchestratorId =
await this.orchestratorDb.createOrchestrator(userId);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential race condition in orchestrator creation.

There's a potential race condition between checking for an existing orchestrator and creating a new one. If multiple requests arrive simultaneously, this could lead to multiple orchestrators being created for the same ID.

Consider using a transaction or implementing proper locking:

-const existingOrchestrator =
-    await this.orchestratorDb.getOrchestratorById(orchestratorId);
-
-if (!existingOrchestrator) {
-    orchestratorId =
-        await this.orchestratorDb.createOrchestrator(userId);
-}
+orchestratorId = await this.orchestratorDb.getOrCreateOrchestrator(
+    orchestratorId,
+    userId
+);

This would require adding a new atomic method getOrCreateOrchestrator to the OrchestratorDb interface.

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
packages/core/src/core/schedule-service.ts (1)

34-102: ⚠️ Potential issue

Handle task failures with a fallback flow.

If an error occurs after calling markRunning but before calling markCompleted, the task remains stuck in the "running" state.

     try {
         const tasks = await this.context.orchestratorDb.findDueTasks();
         for (const task of tasks) {
+            try {
                 await this.context.orchestratorDb.markRunning(task._id);
                 const handler = this.orchestrator.getHandler(task.handlerName);
                 ...
+            } catch (error) {
+                await this.context.orchestratorDb.markCompleted(task._id, true);
+                this.context.logger.error("Task processing error", "error", { taskId: task._id, error });
+                continue;
+            }
         }
     } catch (err) {
         this.context.logger.error("pollTasks error", "error", { data: err });
     }
packages/core/src/core/orchestrator.ts (1)

290-294: ⚠️ Potential issue

Fix potential race condition in orchestrator creation.

There's a potential race condition between checking for an existing orchestrator and creating a new one.

Consider using a transaction or implementing proper locking:

-const existing =
-    await this.orchestratorDb.getOrchestratorById(orchestratorId);
-if (!existing) {
-    orchestratorId =
-        await this.orchestratorDb.createOrchestrator(userId);
-}
+orchestratorId = await this.orchestratorDb.getOrCreateOrchestrator(
+    orchestratorId,
+    userId
+);
🧹 Nitpick comments (4)
packages/core/src/core/schedule-service.ts (1)

14-32: Add validation for the polling interval.

The polling interval should be validated to ensure it's within reasonable bounds (e.g., not too short to avoid overwhelming the system, not too long to maintain responsiveness).

 constructor(
     private context: IOrchestratorContext,
     private orchestrator: Orchestrator,
-    private pollMs: number = 10_000
+    private pollMs: number = 10_000
 ) {
+    if (pollMs < 1000) {
+        throw new Error('Polling interval must be at least 1 second');
+    }
+    if (pollMs > 3600000) {
+        throw new Error('Polling interval must not exceed 1 hour');
+    }
 }
packages/core/src/core/types/index.ts (2)

288-288: Document the nextProcessor property usage.

The nextProcessor property enables processor chaining, but its usage pattern isn't immediately clear.

Add JSDoc comments to explain:

  • Expected format of the processor name
  • How it's used in the processing pipeline
  • Example usage
+  /**
+   * Name of the next processor to handle this result.
+   * @example "sentiment-analyzer" or "translation-processor"
+   */
   nextProcessor?: string;

614-616: Consider extending AgentRequest interface.

The interface currently only includes headers, but might need to be extended in the future.

Consider adding commonly needed request properties:

 export interface AgentRequest {
     headers: Record<string, string>;
+    method?: string;
+    path?: string;
+    query?: Record<string, string>;
+    body?: unknown;
 }
packages/core/src/core/orchestrator.ts (1)

592-600: Consider batching memory operations.

The code performs multiple database operations sequentially when saving memories.

Consider batching the operations for better performance:

-await this.roomManager.addMemory(
-    content.room,
-    JSON.stringify(result.content),
-    {
-        source,
-        ...result.metadata,
-        ...result.enrichedContext,
-    }
-);
-await this.roomManager.markContentAsProcessed(
-    content.contentId,
-    content.room
-);
+await Promise.all([
+    this.roomManager.addMemory(
+        content.room,
+        JSON.stringify(result.content),
+        {
+            source,
+            ...result.metadata,
+            ...result.enrichedContext,
+        }
+    ),
+    this.roomManager.markContentAsProcessed(
+        content.contentId,
+        content.room
+    )
+]);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fb2f42b and 3971a0a.

📒 Files selected for processing (5)
  • examples/example-api.ts (4 hunks)
  • examples/example-server.ts (3 hunks)
  • packages/core/src/core/orchestrator.ts (21 hunks)
  • packages/core/src/core/schedule-service.ts (1 hunks)
  • packages/core/src/core/types/index.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • examples/example-api.ts
🔇 Additional comments (6)
packages/core/src/core/schedule-service.ts (2)

7-12: LGTM! Well-defined interface with clear dependencies.

The interface clearly defines all required dependencies for the scheduler service.


134-139: LGTM! Clean implementation of the stop method.

The method properly checks for and cleans up the polling interval.

examples/example-server.ts (3)

20-21: LGTM! Import paths updated to reflect new structure.

The import paths have been correctly updated to match the new directory structure, with MongoDb moved to the db subdirectory and MasterProcessor added from the processors subdirectory.


55-68: Verify processor initialization order.

The code initializes both MasterProcessor and MessageProcessor correctly. However, ensure that any processor-specific configurations or state are properly transferred when adding MessageProcessor to MasterProcessor.

Consider documenting the processor hierarchy and initialization requirements to help maintainers understand the correct setup order.


157-162: Ensure consistent user ID handling.

The code now correctly passes user information through request headers. However, verify that all downstream components consistently extract and use the user ID from x-user-id header.

Run the following script to verify consistent user ID handling:

✅ Verification successful

User ID handling is consistent across the codebase

The x-user-id header is consistently used for passing and extracting user information, with proper fallback handling in the orchestrator component.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for consistent user ID extraction across the codebase

# Search for x-user-id header usage
rg -A 5 'x-user-id'

# Search for userId extraction from headers
ast-grep --pattern 'headers["x-user-id"]'

Length of output: 2664


Script:

#!/bin/bash
# Search for other potential user ID header patterns
rg -i "user.?id.*header" -A 3
rg -i "header.*user.?id" -A 3

# Search for different header access patterns
ast-grep --pattern 'headers.$_["user"]'
ast-grep --pattern 'headers.$_["User"]'
ast-grep --pattern 'get.*[Hh]eader.*[Uu]ser'

# Search for direct userId access
rg "getUserId|get_user_id" -A 3

Length of output: 939

packages/core/src/core/orchestrator.ts (1)

Line range hint 26-62: LGTM! Improved initialization and logging.

The orchestrator initialization has been improved with better property documentation and consistent logging.

Comment on lines +104 to +132
public async scheduleTaskInDb(
userId: string,
handlerName: string,
data: Record<string, unknown> = {},
intervalMs?: number
): Promise<string> {
const now = Date.now();
const nextRunAt = new Date(now + (intervalMs ?? 0));

this.context.logger.info(
"SchedulerService.scheduleTaskInDb",
`Scheduling task ${handlerName}`,
{
nextRunAt,
intervalMs,
}
);

return await this.context.orchestratorDb.createTask(
userId,
handlerName,
{
request: handlerName,
task_data: JSON.stringify(data),
},
nextRunAt,
intervalMs
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate handler existence before scheduling.

The method should verify that the handler exists before scheduling a task to prevent scheduling tasks for non-existent handlers.

 public async scheduleTaskInDb(
     userId: string,
     handlerName: string,
     data: Record<string, unknown> = {},
     intervalMs?: number
 ): Promise<string> {
+    const handler = this.orchestrator.getHandler(handlerName);
+    if (!handler) {
+        throw new Error(`No handler registered with name: ${handlerName}`);
+    }
+
     const now = Date.now();
     const nextRunAt = new Date(now + (intervalMs ?? 0));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public async scheduleTaskInDb(
userId: string,
handlerName: string,
data: Record<string, unknown> = {},
intervalMs?: number
): Promise<string> {
const now = Date.now();
const nextRunAt = new Date(now + (intervalMs ?? 0));
this.context.logger.info(
"SchedulerService.scheduleTaskInDb",
`Scheduling task ${handlerName}`,
{
nextRunAt,
intervalMs,
}
);
return await this.context.orchestratorDb.createTask(
userId,
handlerName,
{
request: handlerName,
task_data: JSON.stringify(data),
},
nextRunAt,
intervalMs
);
}
public async scheduleTaskInDb(
userId: string,
handlerName: string,
data: Record<string, unknown> = {},
intervalMs?: number
): Promise<string> {
const handler = this.orchestrator.getHandler(handlerName);
if (!handler) {
throw new Error(`No handler registered with name: ${handlerName}`);
}
const now = Date.now();
const nextRunAt = new Date(now + (intervalMs ?? 0));
this.context.logger.info(
"SchedulerService.scheduleTaskInDb",
`Scheduling task ${handlerName}`,
{
nextRunAt,
intervalMs,
}
);
return await this.context.orchestratorDb.createTask(
userId,
handlerName,
{
request: handlerName,
task_data: JSON.stringify(data),
},
nextRunAt,
intervalMs
);
}

Comment on lines +364 to 386
// Possibly schedule any tasks in the DB
if (processed.updateTasks) {
for (const task of processed.updateTasks) {
await this.scheduleTaskInDb(
userId,
task.name,
task.data,
task.intervalMs
const now = Date.now();
const nextRunAt = new Date(
now + (task.intervalMs ?? 0)
);

this.logger.debug(
this.logger.info(
"Orchestrator.runAutonomousFlow",
"Scheduled task in DB",
`Scheduling task ${task.name}`,
{ nextRunAt, intervalMs: task.intervalMs }
);

await this.orchestratorDb.createTask(
userId,
task.name,
{
task,
}
request: task.name,
task_data: JSON.stringify(task.data),
},
nextRunAt,
task.intervalMs
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve task scheduling error handling.

The task scheduling code lacks error handling and validation.

Add error handling and validation:

 if (processed.updateTasks) {
     for (const task of processed.updateTasks) {
+        if (!task.name) {
+            this.logger.warn(
+                "Orchestrator.runAutonomousFlow",
+                "Skipping task with missing name"
+            );
+            continue;
+        }
+        
+        if (task.intervalMs < 0) {
+            this.logger.warn(
+                "Orchestrator.runAutonomousFlow",
+                "Invalid interval, using default",
+                { taskName: task.name, invalidInterval: task.intervalMs }
+            );
+            task.intervalMs = 60000; // Default to 1 minute
+        }
+
         const now = Date.now();
         const nextRunAt = new Date(now + (task.intervalMs ?? 0));
         
-        await this.orchestratorDb.createTask(
-            userId,
-            task.name,
-            {
-                request: task.name,
-                task_data: JSON.stringify(task.data),
-            },
-            nextRunAt,
-            task.intervalMs
-        );
+        try {
+            await this.orchestratorDb.createTask(
+                userId,
+                task.name,
+                {
+                    request: task.name,
+                    task_data: JSON.stringify(task.data),
+                },
+                nextRunAt,
+                task.intervalMs
+            );
+        } catch (error) {
+            this.logger.error(
+                "Orchestrator.runAutonomousFlow",
+                "Failed to create task",
+                { taskName: task.name, error }
+            );
+        }
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Possibly schedule any tasks in the DB
if (processed.updateTasks) {
for (const task of processed.updateTasks) {
await this.scheduleTaskInDb(
userId,
task.name,
task.data,
task.intervalMs
const now = Date.now();
const nextRunAt = new Date(
now + (task.intervalMs ?? 0)
);
this.logger.debug(
this.logger.info(
"Orchestrator.runAutonomousFlow",
"Scheduled task in DB",
`Scheduling task ${task.name}`,
{ nextRunAt, intervalMs: task.intervalMs }
);
await this.orchestratorDb.createTask(
userId,
task.name,
{
task,
}
request: task.name,
task_data: JSON.stringify(task.data),
},
nextRunAt,
task.intervalMs
);
// Possibly schedule any tasks in the DB
if (processed.updateTasks) {
for (const task of processed.updateTasks) {
if (!task.name) {
this.logger.warn(
"Orchestrator.runAutonomousFlow",
"Skipping task with missing name"
);
continue;
}
if (task.intervalMs < 0) {
this.logger.warn(
"Orchestrator.runAutonomousFlow",
"Invalid interval, using default",
{ taskName: task.name, invalidInterval: task.intervalMs }
);
task.intervalMs = 60000; // Default to 1 minute
}
const now = Date.now();
const nextRunAt = new Date(
now + (task.intervalMs ?? 0)
);
this.logger.info(
"Orchestrator.runAutonomousFlow",
`Scheduling task ${task.name}`,
{ nextRunAt, intervalMs: task.intervalMs }
);
try {
await this.orchestratorDb.createTask(
userId,
task.name,
{
request: task.name,
task_data: JSON.stringify(task.data),
},
nextRunAt,
task.intervalMs
);
} catch (error) {
this.logger.error(
"Orchestrator.runAutonomousFlow",
"Failed to create task",
{ taskName: task.name, error }
);
}

@ponderingdemocritus ponderingdemocritus merged commit f558745 into main Jan 30, 2025
9 checks passed
@ponderingdemocritus ponderingdemocritus deleted the abstract branch January 30, 2025 10:42
@coderabbitai coderabbitai bot mentioned this pull request Feb 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant