Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ponderingdemocritus committed Jan 25, 2025
1 parent 2e3b338 commit 3b7d0a7
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 96 deletions.
11 changes: 10 additions & 1 deletion examples/example-twitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,19 @@ async function main() {
return null;
}

return mentions;
return mentions.map((mention) => ({
type: "tweet",
room: mention.metadata.conversationId,
messageId: mention.metadata.tweetId,
user: mention.metadata.username,
content: mention.content,
metadata: mention,
}));
},
schema: z.object({
type: z.string(),
room: z.string(),
user: z.string(),
content: z.string(),
metadata: z.record(z.any()),
}),
Expand Down
118 changes: 60 additions & 58 deletions packages/core/src/core/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Logger } from "./logger";
import { RoomManager } from "./room-manager";
import { TaskScheduler } from "./task-scheduler";
import type { BaseProcessor, MessageProcessor } from "./processor";
import type { ProcessedResult, VectorDB } from "./types";
import type { Memory, ProcessedResult, VectorDB } from "./types";
import { HandlerRole, LogLevel, type LoggerConfig } from "./types";
import type { IOHandler } from "./types";
import type { ScheduledTaskMongoDb } from "./scheduled-db";
Expand Down Expand Up @@ -68,56 +68,6 @@ export class Orchestrator {
this.startPolling();
}

public async processContent(
content: any,
source: string
): Promise<ProcessedResult | null> {
const room = await this.roomManager.ensureRoom(source, "core");

// 1) Find the first processor that can handle it
const processor = Array.from(this.processors.values()).find((p) =>
p.canHandle(content)
);

const canHandle = processor !== undefined;

if (!canHandle || !processor) {
// No processor found; fallback
console.log("No suitable processor found for content:", content);
return null;
}

const availableOutputs = Array.from(this.ioHandlers.values()).filter(
(h) => h.role === HandlerRole.OUTPUT
);

const availableActions = Array.from(this.ioHandlers.values()).filter(
(h) => h.role === HandlerRole.ACTION
);

// 2) Process
const processedResult = await processor.process(content, room, {
availableOutputs,
availableActions,
});

// 3) If you want to store memory, do it here
// if (!processedResult.alreadyProcessed) {
// await this.roomManager.addMemory(
// room.id,
// JSON.stringify(processedResult.content),
// {
// source,
// type: "input",
// ...processedResult.metadata,
// ...processedResult.enrichedContext,
// }
// );
// }

return processedResult;
}

/**
* Primary method to register any IOHandler (input or output).
* - If it's an input with an interval, schedule it for recurring runs.
Expand Down Expand Up @@ -312,26 +262,26 @@ export class Orchestrator {
}

// 3) Process with the found processor
const processed = await processor.process(data, room);
const processed = await this.processContent(data, source);

// If the processor thinks we've already processed it, we skip
if (processed.alreadyProcessed) {
if (processed?.alreadyProcessed) {
continue;
}

// 4) Save to memory (like you do in processInputTask)
await this.roomManager.addMemory(
room.id,
JSON.stringify(processed.content),
JSON.stringify(processed?.content),
{
source,
type: "input",
...processed.metadata,
...processed.enrichedContext,
...processed?.metadata,
...processed?.enrichedContext,
}
);

if (processed.updateTasks) {
if (processed?.updateTasks) {
for (const task of processed.updateTasks) {
await this.scheduleTaskInDb(
task.name,
Expand All @@ -342,7 +292,7 @@ export class Orchestrator {
}

// 5) For each suggested output, see if it's an action or an output
for (const output of processed.suggestedOutputs) {
for (const output of processed?.suggestedOutputs ?? []) {
const handler = this.ioHandlers.get(output.name);
if (!handler) {
this.logger.warn(
Expand Down Expand Up @@ -583,6 +533,58 @@ export class Orchestrator {
}
}

public async processContent(
content: any,
source: string
): Promise<ProcessedResult | null> {
let memories: Memory[] = [];

if (content.room) {
const room = await this.roomManager.ensureRoom(
content.room,
source
);
memories = await this.roomManager.getMemoriesFromRoom(room.id);

this.logger.debug(
"Orchestrator.processContent",
"Processing content with context",
{
content,
source,
roomId: room.id,
relevantMemories: memories,
}
);
}

const processor = Array.from(this.processors.values()).find((p) =>
p.canHandle(content)
);

if (!processor) {
this.logger.debug(
"Orchestrator.processContent",
"No suitable processor found for content",
{ content }
);
return null;
}

const availableOutputs = Array.from(this.ioHandlers.values()).filter(
(h) => h.role === HandlerRole.OUTPUT
);

const availableActions = Array.from(this.ioHandlers.values()).filter(
(h) => h.role === HandlerRole.ACTION
);

return processor.process(content, JSON.stringify(memories), {
availableOutputs,
availableActions,
});
}

/**
* Stops all scheduled tasks and shuts down the orchestrator.
*/
Expand Down
43 changes: 6 additions & 37 deletions packages/core/src/core/processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { LLMClient } from "./llm-client";
import { Logger } from "./logger";
import { Room } from "./room";

import type { Character, ProcessedResult, SuggestedOutput } from "./types";
import { LogLevel } from "./types";
Expand Down Expand Up @@ -41,7 +40,7 @@ export abstract class BaseProcessor {
*/
public abstract process(
content: any,
room: Room,
otherContext: string,
ioContext?: {
availableOutputs: IOHandler[];
availableActions: IOHandler[];
Expand Down Expand Up @@ -74,51 +73,19 @@ export class MessageProcessor extends BaseProcessor {

async process(
content: any,
room: Room,
otherContext: string,
ioContext?: {
availableOutputs: IOHandler[];
availableActions: IOHandler[];
}
): Promise<ProcessedResult> {
this.logger.debug("Processor.process", "Processing content", {
content,
roomId: room.id,
});

// const contentId = this.generateContentId(content);

// const hasProcessed = await this.hasProcessedContent(contentId, room);

// if (hasProcessed) {
// return {
// content,
// metadata: {},
// enrichedContext: {
// timeContext: getTimeContext(new Date()),
// summary: "",
// topics: [],
// relatedMemories: [],
// sentiment: "neutral",
// entities: [],
// intent: "unknown",
// availableOutputs: Array.from(this.ioHandlers.keys()),
// },
// suggestedOutputs: [],
// alreadyProcessed: true,
// };
// }

const contentStr =
typeof content === "string" ? content : JSON.stringify(content);

// TODO: fix this abstraction
// // Get related memories first since we'll need them for context
// const relatedMemories = await this.vectorDb.findSimilarInRoom(
// contentStr,
// room.id,
// 3
// );

const outputsSchemaPart = ioContext?.availableOutputs
.map((handler) => {
return `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`;
Expand All @@ -133,9 +100,11 @@ export class MessageProcessor extends BaseProcessor {

const prompt = `Analyze the following content and provide a complete analysis:
# New Content to process:
${contentStr}
# New Content to process:
${contentStr}
# Other context:
${otherContext}
# Available Outputs:
${outputsSchemaPart}
Expand Down
24 changes: 24 additions & 0 deletions packages/core/src/core/room-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,28 @@ export class RoomManager {
await this.vectorDb.deleteRoom(roomId);
this.logger.info("RoomManager.deleteRoom", "Room deleted", { roomId });
}

public async getMemoriesFromRoom(
roomId: string,
limit?: number
): Promise<Memory[]> {
if (!this.vectorDb) {
throw new Error("VectorDB required for getting memories");
}

const room = await this.getRoom(roomId);
if (!room) {
throw new Error(`Room ${roomId} not found`);
}

const memories = await this.vectorDb.getMemoriesFromRoom(roomId, limit);

return memories.map((memory) => ({
id: memory.metadata?.memoryId,
roomId: roomId,
content: memory.content,
timestamp: new Date(memory.metadata?.timestamp),
metadata: memory.metadata,
}));
}
}
38 changes: 38 additions & 0 deletions packages/core/src/core/vector-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1668,4 +1668,42 @@ export class ChromaVectorDB implements VectorDB {
throw error;
}
}

/**
* Gets all memories from a specific room's collection, optionally limited to a certain number
*/
public async getMemoriesFromRoom(
roomId: string,
limit?: number
): Promise<{ content: string; metadata?: Record<string, any> }[]> {
try {
const collection = await this.getCollectionForRoom(roomId);

// Get all documents from the collection, with optional limit
const results = await collection.get({
limit,
include: ["documents", "metadatas"] as IncludeEnum[],
});

if (!results.ids.length) {
return [];
}

return results.ids.map((_, idx) => ({
content: results.documents[idx] || "",
metadata: results.metadatas?.[idx] || {},
}));
} catch (error) {
this.logger.error(
"ChromaVectorDB.getMemoriesFromRoom",
"Failed to get memories",
{
error:
error instanceof Error ? error.message : String(error),
roomId,
}
);
throw error;
}
}
}

0 comments on commit 3b7d0a7

Please sign in to comment.