-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: processor refactor #45
Conversation
WalkthroughThis pull request introduces a significant refactoring of the processor architecture in the core system. The changes involve replacing the Changes
Sequence DiagramsequenceDiagram
participant O as Orchestrator
participant P as MessageProcessor
participant R as Room
O->>P: processContent(content, source)
P->>P: canHandle(content)
alt Content can be handled
P->>R: Process content
P-->>O: Return ProcessedResult
else Content cannot be handled
O-->>O: Log "No suitable processor"
O-->>O: Return null
end
Possibly related PRs
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
packages/core/src/core/processor.ts (1)
Line range hint
140-165
: Prompt construction is thorough but watch for potential injection.
If user input is included without sanitization, malicious content might alter your prompt in unexpected ways. Consider sanitizing or bounding user-provided content to prevent injection attempts.-const contentStr = ... +const sanitizedContent = sanitizeContent(contentStr); +const prompt = ...
🧹 Nitpick comments (11)
packages/core/src/core/processor.ts (5)
13-50
: Solid approach to an abstract BaseProcessor.
By definingBaseProcessor
withcanHandle()
andprocess()
as abstract methods, you've cleanly enforced a contract for specialized processors. This design fosters extensibility when multiple processors are added. Consider adding inline documentation or test coverage to ensure all subclasses meet the contract consistently.
52-70
: MessageProcessor extends BaseProcessor effectively.
The constructor and call tosuper()
are correct. However, consider improving theTODO: fix this
placeholder incanHandle()
so it doesn’t always returntrue
. If it's truly universal, rename or remove theTODO
.
75-109
: Consider removing or refactoring the commented-out logic.
Large blocks of commented-out code can confuse future maintainers. If you plan on reviving it, track it in version control or place it behind a feature flag. Otherwise, remove it and rely on source history.
280-289
: Robust error handling on LLM processing failures.
Returning a partialProcessedResult
ensures the rest of the system can continue without throwing unhandled exceptions. Consider logging more context if repeated failures occur or add metrics for observability.
298-364
: Remove or convert commented-out logic into a plugin/feature-flag system.
Leaving large chunks of commented code can create confusion. Store any transitional features in version control or place behind a toggle if you plan to reintroduce them soon.packages/core/src/core/orchestrator.ts (3)
49-53
: Populate processor map by name while ensuring uniqueness.
Consider validating that the name is non-duplicate or raising an error if a name collision occurs.
71-119
: processContent method is well-structured and easy to follow.
- It confirms room existence.
- It locates the first capable processor.
- If none is found, logs a fallback.
Consider letting the user specify which processor to apply or fallback to a default for more fine-grained control.
Line range hint
304-364
: Multi-processor workflow in runAutonomousFlow is well-implemented.
Looping until queue is empty ensures that new content from actions is fully processed. Consider whether you might need concurrency control in high-throughput environments.examples/example-api.ts (1)
63-63
: LGTM! Architecture improvement.The change to accept an array of processors (
[processor]
) aligns with the PR objective of enabling multiple processors. This enhancement provides better extensibility and flexibility in processing pipelines.packages/core/src/core/utils.ts (2)
255-264
: Add configuration options for time thresholds.The function has hardcoded time thresholds. Consider making these configurable to support different use cases.
+interface TimeThresholds { + veryRecent: number; // hours + recent: number; + thisWeek: number; + thisMonth: number; +} + +const DEFAULT_TIME_THRESHOLDS: TimeThresholds = { + veryRecent: 24, + recent: 72, + thisWeek: 168, + thisMonth: 720, +}; + -export function getTimeContext(timestamp: Date): string { +export function getTimeContext( + timestamp: Date, + thresholds: TimeThresholds = DEFAULT_TIME_THRESHOLDS +): string { const now = new Date(); const hoursDiff = (now.getTime() - timestamp.getTime()) / (1000 * 60 * 60); - if (hoursDiff < 24) return "very_recent"; - if (hoursDiff < 72) return "recent"; - if (hoursDiff < 168) return "this_week"; - if (hoursDiff < 720) return "this_month"; + if (hoursDiff < thresholds.veryRecent) return "very_recent"; + if (hoursDiff < thresholds.recent) return "recent"; + if (hoursDiff < thresholds.thisWeek) return "this_week"; + if (hoursDiff < thresholds.thisMonth) return "this_month"; return "older"; }
299-303
: Consider adding type information for better type safety.The
relevantData
object could benefit from explicit type information to prevent potential runtime errors.+interface RelevantData { + content: any; + type?: string; +} + -const relevantData = { +const relevantData: RelevantData = { content: item.content || item, type: item.type, };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
examples/example-api.ts
(3 hunks)examples/example-twitter.ts
(3 hunks)packages/core/src/core/orchestrator.ts
(6 hunks)packages/core/src/core/processor.ts
(4 hunks)packages/core/src/core/utils.ts
(1 hunks)
🔇 Additional comments (13)
packages/core/src/core/processor.ts (6)
4-5
: Great use of typed imports.
No concerns here; everything is well-aligned with TypeScript best practices.
29-32
: Good addition of a self-identifying getName() method.
This method simplifies debugging and ensures each processor can be referenced by name.
34-38
: Abstract canHandle method is well-defined.
It correctly enforces explicit capability checks in subclasses.
39-50
: Abstract process method sets a clear contract.
It encourages uniform handling of content while letting subclasses implement specialized logic.
122-133
: Well-structured schema generation for outputs and actions.
This approach, mapping over theioContext
handlers, is concise. If there are many handlers, watch for potential performance overhead when stringifying large schemas. Otherwise, this is fine.
261-265
: Appropriate fallback usage of optional chaining.
This ensures you gracefully handle cases where no outputs are available.packages/core/src/core/orchestrator.ts (3)
4-4
: Good usage of typed imports for new processor types.
Encourages a clear boundary between the orchestrator and its processors.
24-24
: Use of Map for processors is well-structured.
This data structure allows for quick lookups by processor name and supports dynamic additions or removals if needed in the future.
44-44
: Constructor signature updated to support multiple processors.
This elegantly sets the stage for a genuinely extensible system.examples/example-twitter.ts (3)
15-15
: Replacing Processor import with MessageProcessor is consistent with your new design.
Shows a clear adoption of the extendedBaseProcessor
.
44-44
: Initialization of MessageProcessor with the default character is straightforward.
Consider validating that the model and character traits are compatible to avoid unexpected LLM behavior.
65-65
: Passing an array of processors is a future-proof design choice.
Even if there's only one active processor, the array-based approach positions the system for multi-processor scaling.examples/example-api.ts (1)
42-45
: Verify the removal of vectorDb parameter.The
MessageProcessor
no longer requiresvectorDb
. Please confirm that vector database functionality is handled appropriately in the new architecture.✅ Verification successful
Vector database functionality has been properly decoupled
The removal of
vectorDb
fromMessageProcessor
is part of an intentional architectural improvement. The vector database functionality is now handled by a dedicatedVectorDB
service, which is directly used by components that need it, following better separation of concerns.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check if vectorDb is used elsewhere in MessageProcessor or its parent classes # Look for vectorDb usage in processor-related files rg -A 5 "vectorDb" packages/core/src/core/processor.tsLength of output: 746
Script:
#!/bin/bash # Search for vector database related files and classes fd "vector|memory" --type f --exec rg -l "class.*(?i)(vector|memory)" {} \; # Search for new implementations of similar functionality ast-grep --pattern 'class $_ implements $_ { $$$ findSimilar$_($$$) { $$$ } $$$ }'Length of output: 215
Script:
#!/bin/bash # Check the vector-db.ts implementation and its usage cat packages/core/src/core/vector-db.ts # Look for where vector-db is being used rg -l "import.*vector-db"Length of output: 56082
export function generateContentId(content: any): string { | ||
try { | ||
// 1. Special handling for Twitter mentions/tweets array | ||
if (Array.isArray(content) && content[0]?.type === "tweet") { | ||
// Use the newest tweet's ID as the marker | ||
const newestTweet = content[0]; | ||
return `tweet_batch_${newestTweet.metadata.tweetId}`; | ||
} | ||
|
||
// 2. Single tweet handling | ||
if (content?.type === "tweet") { | ||
return `tweet_${content.metadata.tweetId}`; | ||
} | ||
|
||
// 3. If it's a plain string, fallback to hashing the string but also add a small random/time factor. | ||
// This ensures repeated user messages with the same text won't collapse to the same ID. | ||
if (typeof content === "string") { | ||
// Add a short suffix: e.g. timestamp + small random | ||
const suffix = `${Date.now()}_${Math.random() | ||
.toString(36) | ||
.slice(2, 6)}`; | ||
return `content_${hashString(content)}_${suffix}`; | ||
} | ||
|
||
// 4. For arrays (non-tweets), attempt to find known IDs or hash the items | ||
if (Array.isArray(content)) { | ||
const ids = content.map((item) => { | ||
// Check if there's an explicit .id | ||
if (item.id) return item.id; | ||
// Check for item.metadata?.id | ||
if (item.metadata?.id) return item.metadata.id; | ||
|
||
// Otherwise, hash the item | ||
const relevantData = { | ||
content: item.content || item, | ||
type: item.type, | ||
}; | ||
return hashString(JSON.stringify(relevantData)); | ||
}); | ||
|
||
// Join them, but also add a short suffix so different array orders don't collide | ||
const suffix = `${Date.now()}_${Math.random() | ||
.toString(36) | ||
.slice(2, 6)}`; | ||
return `array_${ids.join("_").slice(0, 100)}_${suffix}`; | ||
} | ||
|
||
// 5. For single objects, check .id first | ||
if (content.id) { | ||
return `obj_${content.id}`; | ||
} | ||
|
||
// 6. Special handling for "internal_thought" or "consciousness" | ||
if ( | ||
content.type === "internal_thought" || | ||
content.source === "consciousness" | ||
) { | ||
const thoughtData = { | ||
content: content.content, | ||
timestamp: content.timestamp, | ||
}; | ||
return `thought_${hashString(JSON.stringify(thoughtData))}`; | ||
} | ||
|
||
// 7. Then check if there's a metadata.id | ||
if (content.metadata?.id) { | ||
return `obj_${content.metadata.id}`; | ||
} | ||
|
||
// 8. Or any metadata key ending with 'id' | ||
if (content.metadata) { | ||
for (const [key, value] of Object.entries(content.metadata)) { | ||
if (key.toLowerCase().endsWith("id") && value) { | ||
return `obj_${value}`; | ||
} | ||
} | ||
} | ||
|
||
// 9. Finally, fallback to hashing the object, | ||
// but add a random/time suffix so repeated content isn't auto-deduplicated. | ||
const relevantData = { | ||
content: content.content || content, | ||
type: content.type, | ||
// Include source if available | ||
...(content.source && | ||
content.source !== "consciousness" && { | ||
source: content.source, | ||
}), | ||
}; | ||
const baseHash = hashString(JSON.stringify(relevantData)); | ||
const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`; | ||
return `obj_${baseHash}_${suffix}`; | ||
} catch (error) { | ||
return `fallback_${Date.now()}`; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor long function into smaller, focused functions.
The generateContentId
function is quite long (95+ lines) and handles multiple content types. Consider breaking it down into smaller, specialized functions for better maintainability.
+function generateTweetId(content: any, isBatch: boolean = false): string {
+ if (isBatch) {
+ const newestTweet = content[0];
+ return `tweet_batch_${newestTweet.metadata.tweetId}`;
+ }
+ return `tweet_${content.metadata.tweetId}`;
+}
+
+function generateStringId(content: string): string {
+ const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
+ return `content_${hashString(content)}_${suffix}`;
+}
+
+function generateArrayId(content: any[]): string {
+ const ids = content.map((item) => {
+ if (item.id) return item.id;
+ if (item.metadata?.id) return item.metadata.id;
+ const relevantData = {
+ content: item.content || item,
+ type: item.type,
+ };
+ return hashString(JSON.stringify(relevantData));
+ });
+ const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
+ return `array_${ids.join("_").slice(0, 100)}_${suffix}`;
+}
+
+function generateThoughtId(content: any): string {
+ const thoughtData = {
+ content: content.content,
+ timestamp: content.timestamp,
+ };
+ return `thought_${hashString(JSON.stringify(thoughtData))}`;
+}
+
export function generateContentId(content: any): string {
try {
- // 1. Special handling for Twitter mentions/tweets array
if (Array.isArray(content) && content[0]?.type === "tweet") {
- const newestTweet = content[0];
- return `tweet_batch_${newestTweet.metadata.tweetId}`;
+ return generateTweetId(content, true);
}
- // 2. Single tweet handling
if (content?.type === "tweet") {
- return `tweet_${content.metadata.tweetId}`;
+ return generateTweetId(content);
}
- // 3. If it's a plain string...
if (typeof content === "string") {
- const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
- return `content_${hashString(content)}_${suffix}`;
+ return generateStringId(content);
}
- // 4. For arrays (non-tweets)...
if (Array.isArray(content)) {
- const ids = content.map((item) => {
- if (item.id) return item.id;
- if (item.metadata?.id) return item.metadata.id;
- const relevantData = {
- content: item.content || item,
- type: item.type,
- };
- return hashString(JSON.stringify(relevantData));
- });
- const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
- return `array_${ids.join("_").slice(0, 100)}_${suffix}`;
+ return generateArrayId(content);
}
if (content.id) {
return `obj_${content.id}`;
}
- // 6. Special handling for "internal_thought" or "consciousness"
if (
content.type === "internal_thought" ||
content.source === "consciousness"
) {
- const thoughtData = {
- content: content.content,
- timestamp: content.timestamp,
- };
- return `thought_${hashString(JSON.stringify(thoughtData))}`;
+ return generateThoughtId(content);
}
// Rest of the function remains the same...
Also, consider adding JSDoc documentation to improve code maintainability:
/**
* Generates a unique content identifier based on the content type and properties.
*
* @param content - The content to generate an ID for. Can be a tweet, string, array, or object.
* @returns A unique string identifier for the content.
*
* @example
* ```typescript
* // For a tweet
* const tweetId = generateContentId({ type: 'tweet', metadata: { tweetId: '123' } });
* // Returns: 'tweet_123'
*
* // For a string
* const stringId = generateContentId('hello');
* // Returns: 'content_<hash>_<timestamp>_<random>'
* ```
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🔭 Outside diff range comments (1)
packages/core/src/core/processor.ts (1)
Line range hint
74-264
: Consider refactoring for better maintainability.The process method is quite long and handles multiple responsibilities. Consider:
- Extract schema construction to separate methods
- Move prompt template to a separate file
- Extract LLM response handling logic
+private constructOutputSchema(outputs: IOHandler[]): string { + return outputs + .map((handler) => + `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`) + .join("\n"); +} +private constructPrompt(content: any, otherContext: string, ioContext?: { + availableOutputs: IOHandler[]; + availableActions: IOHandler[]; +}): string { + const contentStr = typeof content === "string" ? content : JSON.stringify(content); + const outputsSchemaPart = ioContext?.availableOutputs + ? this.constructOutputSchema(ioContext.availableOutputs) + : ""; + const actionsSchemaPart = ioContext?.availableActions + ? this.constructOutputSchema(ioContext.availableActions) + : ""; + + return `Analyze the following content...`; // Move template to separate file +} async process( content: any, otherContext: string, ioContext?: { availableOutputs: IOHandler[]; availableActions: IOHandler[]; } ): Promise<ProcessedResult> { this.logger.debug("Processor.process", "Processing content", { content, }); - const contentStr = typeof content === "string" ? content : JSON.stringify(content); - // ... rest of the implementation + const prompt = this.constructPrompt(content, otherContext, ioContext);
🧹 Nitpick comments (6)
packages/core/src/core/room-manager.ts (1)
229-251
: LGTM! Consider adding input validation and documentation.The implementation looks good with proper error handling and data mapping. Consider these minor improvements:
- Add JSDoc documentation describing the method's purpose and parameters
- Add input validation for negative limit values
+/** + * Retrieves memories from a specific room. + * @param roomId - The ID of the room to fetch memories from + * @param limit - Optional maximum number of memories to retrieve + * @returns Promise resolving to an array of Memory objects + */ public async getMemoriesFromRoom( roomId: string, limit?: number ): Promise<Memory[]> { + if (limit !== undefined && limit < 0) { + throw new Error("Limit cannot be negative"); + } if (!this.vectorDb) { throw new Error("VectorDB required for getting memories"); }packages/core/src/core/vector-db.ts (1)
1675-1708
: LGTM! Consider adding sorting and filtering options.The implementation is solid with good error handling and logging. Consider these enhancements:
- Add sorting options (e.g., by timestamp)
- Add metadata filtering capabilities
- Add JSDoc documentation
+/** + * Gets all memories from a specific room's collection. + * @param roomId - The ID of the room to fetch memories from + * @param limit - Optional maximum number of memories to retrieve + * @param options - Optional parameters for sorting and filtering + * @returns Promise resolving to an array of memories with content and metadata + */ public async getMemoriesFromRoom( roomId: string, - limit?: number + limit?: number, + options?: { + sortBy?: { field: string; order: 'asc' | 'desc' }; + where?: Record<string, any>; + } ): Promise<{ content: string; metadata?: Record<string, any> }[]> { try { const collection = await this.getCollectionForRoom(roomId); // Get all documents from the collection, with optional limit const results = await collection.get({ limit, + where: options?.where, include: ["documents", "metadatas"] as IncludeEnum[], }); if (!results.ids.length) { return []; } + let memories = results.ids.map((_, idx) => ({ + content: results.documents[idx] || "", + metadata: results.metadatas?.[idx] || {}, + })); + + // Apply sorting if specified + if (options?.sortBy) { + memories.sort((a, b) => { + const aVal = a.metadata?.[options.sortBy!.field]; + const bVal = b.metadata?.[options.sortBy!.field]; + return options.sortBy!.order === 'asc' + ? (aVal > bVal ? 1 : -1) + : (aVal < bVal ? 1 : -1); + }); + } + + return memories;examples/example-twitter.ts (1)
106-113
: LGTM! Consider adding type safety for metadata.The return structure improvements add valuable context. Consider:
- Define a specific type for the mention metadata instead of using
any
- Update the schema to validate the metadata structure
+interface TwitterMentionMetadata { + conversationId: string; + tweetId: string; + username: string; + timestamp: string; +} return mentions.map((mention) => ({ type: "tweet", room: mention.metadata.conversationId, messageId: mention.metadata.tweetId, user: mention.metadata.username, content: mention.content, - metadata: mention, + metadata: mention as TwitterMentionMetadata, })); schema: z.object({ type: z.string(), room: z.string(), user: z.string(), content: z.string(), - metadata: z.record(z.any()), + metadata: z.object({ + conversationId: z.string(), + tweetId: z.string(), + username: z.string(), + timestamp: z.string(), + }), }),Also applies to: 117-118
packages/core/src/core/processor.ts (1)
12-49
: LGTM! Consider enhancing error handling and documentation.The abstract class provides a good foundation. Consider:
- Add JSDoc for abstract methods
- Add error handling guidelines for implementations
export abstract class BaseProcessor { protected logger: Logger; constructor( protected metadata: { name: string; description: string }, protected loggerLevel: LogLevel = LogLevel.ERROR, protected character: Character, protected llmClient: LLMClient ) { this.logger = new Logger({ level: loggerLevel, enableColors: true, enableTimestamp: true, }); } public getName(): string { return this.metadata.name; } + /** + * Determines if this processor can handle the given content. + * @param content - The content to check + * @returns true if the processor can handle the content, false otherwise + * @throws {Error} if content validation fails + */ public abstract canHandle(content: any): boolean; + /** + * Processes the given content and returns a result. + * @param content - The content to process + * @param otherContext - Additional context for processing + * @param ioContext - Optional context with available handlers + * @returns Promise resolving to the processed result + * @throws {Error} if processing fails or validation errors occur + */ public abstract process( content: any, otherContext: string, ioContext?: { availableOutputs: IOHandler[]; availableActions: IOHandler[]; } ): Promise<ProcessedResult>;packages/core/src/core/orchestrator.ts (2)
4-4
: Remove unused importMessageProcessor
.The
MessageProcessor
type is imported but never used in this file.-import type { BaseProcessor, MessageProcessor } from "./processor"; +import type { BaseProcessor } from "./processor";
254-262
: Improve processor selection and error handling.Two suggestions for improvement:
- Extract processor finding logic into a reusable method
- Uncomment the warning log for better debugging when no processor is found
-// 2) Find the first processor that can handle it -const processor = Array.from(this.processors.values()).find((p) => - p.canHandle(data) -); +// 2) Find the first processor that can handle it +const processor = this.findSuitableProcessor(data); +private findSuitableProcessor(data: unknown): BaseProcessor | undefined { + return Array.from(this.processors.values()).find((p) => + p.canHandle(data) + ); +} if (!processor) { - // this.logger.warn("No processor found for data", { source }); + this.logger.warn("No processor found for data", { source }); continue; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
examples/example-twitter.ts
(4 hunks)packages/core/src/core/orchestrator.ts
(6 hunks)packages/core/src/core/processor.ts
(3 hunks)packages/core/src/core/room-manager.ts
(1 hunks)packages/core/src/core/vector-db.ts
(1 hunks)
🔇 Additional comments (1)
examples/example-twitter.ts (1)
44-47
: LGTM! Processor initialization updated correctly.The changes properly implement the new processor architecture:
- MessageProcessor instantiated with required parameters
- Processor passed as array to Orchestrator
Also applies to: 65-65
packages/core/src/core/processor.ts
Outdated
// TODO: fix this | ||
public canHandle(content: any): boolean { | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the canHandle implementation.
The current implementation with return true
and a TODO comment needs to be addressed:
- It bypasses proper content validation
- Could lead to processing invalid content
- // TODO: fix this
- public canHandle(content: any): boolean {
- return true;
- }
+ public canHandle(content: any): boolean {
+ if (!content) {
+ return false;
+ }
+
+ // Validate content is string or has string representation
+ return (
+ typeof content === "string" ||
+ (typeof content === "object" && content.toString !== Object.prototype.toString)
+ );
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// TODO: fix this | |
public canHandle(content: any): boolean { | |
return true; | |
} | |
public canHandle(content: any): boolean { | |
if (!content) { | |
return false; | |
} | |
// Validate content is string or has string representation | |
return ( | |
typeof content === "string" || | |
(typeof content === "object" && content.toString !== Object.prototype.toString) | |
); | |
} |
// 4) Save to memory (like you do in processInputTask) | ||
await this.roomManager.addMemory( | ||
room.id, | ||
JSON.stringify(processed.content), | ||
JSON.stringify(processed?.content), | ||
{ | ||
source, | ||
type: "input", | ||
...processed.metadata, | ||
...processed.enrichedContext, | ||
...processed?.metadata, | ||
...processed?.enrichedContext, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential JSON serialization errors.
The JSON.stringify
call could throw an error if the content contains circular references or undefined values.
await this.roomManager.addMemory(
room.id,
- JSON.stringify(processed?.content),
+ JSON.stringify(processed?.content, (_, value) =>
+ typeof value === 'undefined' ? null : value
+ ),
{
source,
type: "input",
...processed?.metadata,
...processed?.enrichedContext,
}
);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// 4) Save to memory (like you do in processInputTask) | |
await this.roomManager.addMemory( | |
room.id, | |
JSON.stringify(processed.content), | |
JSON.stringify(processed?.content), | |
{ | |
source, | |
type: "input", | |
...processed.metadata, | |
...processed.enrichedContext, | |
...processed?.metadata, | |
...processed?.enrichedContext, | |
// 4) Save to memory (like you do in processInputTask) | |
await this.roomManager.addMemory( | |
room.id, | |
JSON.stringify(processed?.content, (_, value) => | |
typeof value === 'undefined' ? null : value | |
), | |
{ | |
source, | |
type: "input", | |
...processed?.metadata, | |
...processed?.enrichedContext, |
this.processors = new Map( | ||
processors.map((p) => { | ||
return [p.getName(), p]; | ||
}) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add validation for duplicate processor names.
When initializing the processors Map, there's no validation to ensure processor names are unique. If two processors return the same name, the latter will silently override the former.
this.processors = new Map(
- processors.map((p) => {
- return [p.getName(), p];
- })
+ processors.map((p) => {
+ const name = p.getName();
+ if (this.processors.has(name)) {
+ throw new Error(`Duplicate processor name: ${name}`);
+ }
+ return [name, p];
+ })
);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
this.processors = new Map( | |
processors.map((p) => { | |
return [p.getName(), p]; | |
}) | |
); | |
const processorNames = new Set(); | |
processors.forEach((p) => { | |
const name = p.getName(); | |
if (processorNames.has(name)) { | |
throw new Error(`Duplicate processor name: ${name}`); | |
} | |
processorNames.add(name); | |
}); | |
this.processors = new Map( | |
processors.map((p) => { | |
return [p.getName(), p]; | |
}) | |
); |
public async processContent( | ||
content: any, | ||
source: string | ||
): Promise<ProcessedResult | null> { | ||
let memories: Memory[] = []; | ||
|
||
if (content.room) { | ||
const room = await this.roomManager.ensureRoom( | ||
content.room, | ||
source | ||
); | ||
memories = await this.roomManager.getMemoriesFromRoom(room.id); | ||
|
||
this.logger.debug( | ||
"Orchestrator.processContent", | ||
"Processing content with context", | ||
{ | ||
content, | ||
source, | ||
roomId: room.id, | ||
relevantMemories: memories, | ||
} | ||
); | ||
} | ||
|
||
const processor = Array.from(this.processors.values()).find((p) => | ||
p.canHandle(content) | ||
); | ||
|
||
if (!processor) { | ||
this.logger.debug( | ||
"Orchestrator.processContent", | ||
"No suitable processor found for content", | ||
{ content } | ||
); | ||
return null; | ||
} | ||
|
||
const availableOutputs = Array.from(this.ioHandlers.values()).filter( | ||
(h) => h.role === HandlerRole.OUTPUT | ||
); | ||
|
||
const availableActions = Array.from(this.ioHandlers.values()).filter( | ||
(h) => h.role === HandlerRole.ACTION | ||
); | ||
|
||
return processor.process(content, JSON.stringify(memories), { | ||
availableOutputs, | ||
availableActions, | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve type safety and performance of processContent method.
Several suggestions for improvement:
- Add proper typing for the content parameter
- Reuse the processor finding logic
- Restructure for early returns
- Memoize handler filtering
+private availableOutputHandlers = new Map<HandlerRole, IOHandler[]>();
+
+private getHandlersByRole(role: HandlerRole): IOHandler[] {
+ if (!this.availableOutputHandlers.has(role)) {
+ this.availableOutputHandlers.set(
+ role,
+ Array.from(this.ioHandlers.values()).filter((h) => h.role === role)
+ );
+ }
+ return this.availableOutputHandlers.get(role) || [];
+}
+
-public async processContent(
+public async processContent<T extends Record<string, unknown>>(
- content: any,
+ content: T,
source: string
): Promise<ProcessedResult | null> {
let memories: Memory[] = [];
-
- if (content.room) {
- const room = await this.roomManager.ensureRoom(
- content.room,
- source
- );
- memories = await this.roomManager.getMemoriesFromRoom(room.id);
-
- this.logger.debug(
- "Orchestrator.processContent",
- "Processing content with context",
- {
- content,
- source,
- roomId: room.id,
- relevantMemories: memories,
- }
- );
- }
-
- const processor = Array.from(this.processors.values()).find((p) =>
- p.canHandle(content)
- );
+
+ if ('room' in content) {
+ const room = await this.roomManager.ensureRoom(content.room as string, source);
+ memories = await this.roomManager.getMemoriesFromRoom(room.id);
+
+ this.logger.debug("Orchestrator.processContent", "Processing content with context", {
+ content,
+ source,
+ roomId: room.id,
+ relevantMemories: memories,
+ });
+ }
+
+ const processor = this.findSuitableProcessor(content);
if (!processor) {
this.logger.debug(
@@ -569,7 +569,7 @@
return null;
}
- const availableOutputs = Array.from(this.ioHandlers.values()).filter(
- (h) => h.role === HandlerRole.OUTPUT
- );
-
- const availableActions = Array.from(this.ioHandlers.values()).filter(
- (h) => h.role === HandlerRole.ACTION
- );
+ const availableOutputs = this.getHandlersByRole(HandlerRole.OUTPUT);
+ const availableActions = this.getHandlersByRole(HandlerRole.ACTION);
return processor.process(content, JSON.stringify(memories), {
availableOutputs,
availableActions,
});
}
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (10)
packages/core/src/core/processors/message-processor.ts (3)
12-28
: Consider making the processor name configurable.
Right now, the constructor hardcodes"message"
as the processor name in its metadata. If you plan to add more message-like processors in the future, you might benefit from passing in a configurable name to avoid confusion.- name: "message", + name: processorName ?? "message",
35-105
: Extract prompt-building into a separate method.
Theprocess
method constructs a lengthy prompt inline, making it cumbersome to maintain. Consider moving this logic into a dedicated helper inside this class or a utility file.async process(...) { - const prompt = `Analyze the following content...` // many lines + const prompt = this.buildPrompt(contentStr, otherContext, ioContext); ... } private buildPrompt(contentStr: string, otherContext: string, ioContext?: ...): string { return `Analyze the following content...`; }
107-225
: Include error details or logs in the fallback object.
Currently, the catch block sets a generic fallback with minimal metadata. Including at least the error message or a truncated stack trace could help downstream debugging.- return { - content, - metadata: {}, - ... - }; + return { + content, + metadata: { error: String(error) }, + ... + };packages/core/src/core/processor.ts (2)
13-28
: Emphasize usage guidelines for metadata.
Currently, metadata is flexible but might need constraints. If future processors rely on certain structure, consider validating or partially typing it within the constructor.
50-68
: Use stronger types for the content parameter.
BothcanHandle
andprocess
acceptcontent: any
, which can lead to runtime errors if the data structure differs from expectations. Consider introducing a generic or union type if feasible.packages/core/src/core/room-manager.ts (2)
253-274
: Fix incorrect method name in error log.The error log message uses
markContentAsProcessed
instead ofhasProcessedContentInRoom
.Apply this diff to fix the error log message:
- "RoomManager.markContentAsProcessed", + "RoomManager.hasProcessedContentInRoom",
276-301
: Add return type annotation for better type safety.The method's return type can be explicitly annotated for better type safety and documentation.
Apply this diff to add the return type annotation:
- public async markContentAsProcessed( + public async markContentAsProcessed( contentId: string, roomId: string - ): Promise<boolean> { + ): Promise<boolean> {packages/core/src/core/orchestrator.ts (2)
241-335
: Consider implementing rate limiting for the processing queue.The queue processing loop could benefit from rate limiting to prevent overwhelming the system when processing large arrays of data.
Apply this diff to add rate limiting:
while (queue.length > 0) { const { data, source } = queue.shift()!; + + // Add rate limiting + if (queue.length > 0) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } const processedResults = await this.processContent(data, source);
541-563
: Consider extracting the delay value to a configurable constant.The hardcoded delay value of 5000ms in the array processing loop should be configurable.
Apply this diff to make the delay configurable:
+private static readonly BATCH_PROCESSING_DELAY_MS = 5000; + public async processContent( content: any, source: string ): Promise<ProcessedResult[]> { if (Array.isArray(content)) { const allResults: ProcessedResult[] = []; for (const item of content) { // (Optional) Delay to throttle - await new Promise((resolve) => setTimeout(resolve, 5000)); + await new Promise((resolve) => + setTimeout(resolve, Orchestrator.BATCH_PROCESSING_DELAY_MS) + );packages/core/src/core/vector-db.ts (1)
1672-1708
: Consider adding sorting options for memory retrieval.The
getMemoriesFromRoom
method could benefit from sorting options to allow retrieval of memories in chronological order or by other metadata fields.Apply this diff to add sorting options:
public async getMemoriesFromRoom( roomId: string, - limit?: number + limit?: number, + sortBy?: { field: string; order: 'asc' | 'desc' } ): Promise<{ content: string; metadata?: Record<string, any> }[]> { try { const collection = await this.getCollectionForRoom(roomId); + const orderBy = sortBy ? { [sortBy.field]: sortBy.order } : undefined; const results = await collection.get({ limit, + orderBy, include: ["documents", "metadatas"] as IncludeEnum[], });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
examples/example-api.ts
(3 hunks)examples/example-twitter.ts
(4 hunks)packages/core/src/core/orchestrator.ts
(5 hunks)packages/core/src/core/processor.ts
(1 hunks)packages/core/src/core/processors/message-processor.ts
(1 hunks)packages/core/src/core/room-manager.ts
(1 hunks)packages/core/src/core/vector-db.ts
(4 hunks)
🔇 Additional comments (13)
packages/core/src/core/processors/message-processor.ts (1)
30-33
: ** Fix the canHandle implementation.**
This comment was raised in a previous review. The currentcanHandle
method always returnstrue
and contains a TODO, which can cause unintended processing of invalid or empty content.examples/example-twitter.ts (5)
15-15
: Well-adapted replacement of the Processor class.
Switching fromProcessor
toMessageProcessor
aligns with the new abstract architecture and clarifies handling of message content.
44-44
: Processor instantiation looks correct.
PassingllmClient
,defaultCharacter
, andloglevel
appears consistent with the newMessageProcessor
constructor signature.
65-65
: Multiple processor support is well-utilized.
Providing an array of processors allows for more extensible scenarios, such as adding specialized processors later.
106-113
: Potential user data exposure in returned metadata.
Returning the entire mention metadata may leak personally identifiable information if it contains sensitive fields. Ensure you filter or sanitize any fields that shouldn't be exposed downstream.
117-118
: Room and user fields are now explicitly required.
This schema change makes sense for better structuring and referencing tweets in conversation contexts.examples/example-api.ts (3)
14-14
: LGTM!The import statement has been correctly updated to use the new
MessageProcessor
class from the refactored processor module.
63-63
: LGTM!The orchestrator initialization has been correctly updated to pass the processor as an array, aligning with the new multi-processor architecture.
42-45
: Verify the removal of vectorDb parameter.The processor instantiation has been simplified, but the removal of the
vectorDb
parameter needs verification to ensure it doesn't affect the functionality.Run the following script to verify the parameter usage:
✅ Verification successful
The removal of
vectorDb
parameter is safe ✅The
MessageProcessor
class implementation shows no direct dependency onvectorDb
. Vector storage operations are properly handled by theRoomManager
andOrchestrator
components, maintaining clean separation of concerns.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check if vectorDb is used in MessageProcessor or its parent classes # Test: Search for vectorDb usage in processor-related files ast-grep --pattern $'class $_ { $$$ constructor($params) { $$$ this.vectorDb $$$ } $$$ }'Length of output: 110
Script:
#!/bin/bash # Find MessageProcessor implementation and check for vectorDb usage echo "=== Finding MessageProcessor files ===" fd -e ts -e js MessageProcessor echo -e "\n=== Checking for vectorDb usage ===" rg -l "vectorDb" --type ts --type js echo -e "\n=== Checking MessageProcessor class implementation ===" ast-grep --pattern 'class MessageProcessor { $$$ }' echo -e "\n=== Checking for vectorDb imports ===" rg "import.*vectorDb" --type ts --type jsLength of output: 690
Script:
#!/bin/bash # Show context around vectorDb usage and MessageProcessor implementation echo "=== VectorDb usage context ===" rg "vectorDb" -B 2 -A 2 --type ts --type js echo -e "\n=== MessageProcessor implementation ===" rg "class MessageProcessor" -B 2 -A 10 --type ts --type js echo -e "\n=== Checking for vectorDb method calls or property access ===" ast-grep --pattern 'vectorDb.$_'Length of output: 10938
packages/core/src/core/room-manager.ts (1)
229-251
: LGTM!The
getMemoriesFromRoom
method is well-implemented with proper error handling, null checks, and memory object formatting.packages/core/src/core/orchestrator.ts (2)
49-53
: Add validation for duplicate processor names.When initializing the processors Map, there's no validation to ensure processor names are unique. If two processors return the same name, the latter will silently override the former.
639-647
: Handle potential JSON serialization errors.The
JSON.stringify
call could throw an error if the content contains circular references or undefined values.packages/core/src/core/vector-db.ts (1)
Line range hint
1594-1671
: LGTM!The methods have been correctly made public with proper error handling, logging, and documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/core/src/core/index.ts (1)
22-40
: Consider maintaining consistent alphabetical ordering of exports.While most exports follow alphabetical order, there are some inconsistencies. Consider reordering for better maintainability:
export { BaseProcessor, Chains, - ChainOfThought, ChromaVectorDB, Consciousness, defaultCharacter, GoalManager, IO, LLMClient, Logger, Orchestrator, Processors, Providers, Room, RoomManager, StepManager, TaskScheduler, Types, Utils, };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
packages/core/src/core/index.ts
(2 hunks)packages/core/src/core/processors/index.ts
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- packages/core/src/core/processors/index.ts
🔇 Additional comments (2)
packages/core/src/core/index.ts (2)
5-5
: LGTM! Import changes align with the processor abstraction.The replacement of
Processor
withBaseProcessor
and addition of theProcessors
namespace effectively support the new architecture for multiple processor implementations.Also applies to: 19-19
22-40
: Verify removal of unused exports.The
ChainOfThought
export appears to be unused based on the AI summary which indicates it was meant to be removed.
Summary by CodeRabbit
Release Notes
New Features
MessageProcessor
class for handling messages.Improvements
Technical Updates
Processor
with abstractBaseProcessor
andMessageProcessor
.