Skip to content

Commit

Permalink
Orchestrator manager + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Cheelax committed Jan 29, 2025
1 parent 6e27048 commit 2795176
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 96 deletions.
189 changes: 142 additions & 47 deletions examples/example-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import { z } from "zod";
import express from "express";
import cors from "cors";
import { ObjectId } from "mongodb";

// ---- Import your internal classes and functions here ----
import { OrchestratorData } from "./types";
import { LLMClient } from "../packages/core/src/core/llm-client";

import { ChromaVectorDB } from "../packages/core/src/core/vector-db";
Expand All @@ -32,13 +31,16 @@ console.log(chalk.green("✅ Scheduled task database connected"));
// ------------------------------------------------------
// 1) CREATE DAYDREAMS AGENT
// ------------------------------------------------------
async function createDaydreamsAgent() {
async function createDaydreamsAgent(params?: {
model?: string;
temperature?: number;
}) {
const loglevel = LogLevel.INFO;

// 1.1. LLM Initialization
const llmClient = new LLMClient({
model: "anthropic/claude-3-5-sonnet-latest",
temperature: 0.3,
model: params?.model || "anthropic/claude-3-5-sonnet-latest",
temperature: params?.temperature || 0.3,
});

// 1.2. Vector memory initialization
Expand Down Expand Up @@ -136,20 +138,45 @@ wss.on("connection", (ws) => {
const { userId, goal: userMessage, orchestratorId } = parsed;

if (!userMessage || typeof userMessage !== "string") {
throw new Error(
"Invalid message format. Expected { goal: string, userId: string }"
);
throw new Error("Invalid message format. Expected { goal: string, userId: string }");
}

if (!userId || typeof userId !== "string") {
throw new Error("userId is required");
}

console.log("orchestratorId", orchestratorId);
// Process the message using the orchestrator with the provided userId
const currentOrchestrator = await scheduledTaskDb.getOrchestratorById(orchestratorId)
let currentOrchestrator;
if (orchestratorId) {
const orchestratorData = await scheduledTaskDb.getOrchestratorById(orchestratorId);

if (!orchestratorData) {
throw new Error(`Orchestrator not found with ID: ${orchestratorId}`);
}

currentOrchestrator = await createOrchestratorFromDb(orchestratorData as OrchestratorData);
} else {
const existingOrchestrators = await scheduledTaskDb.getOrchestratorsByUserId(userId);

if (existingOrchestrators.length > 0) {
currentOrchestrator = await createOrchestratorFromDb(existingOrchestrators[0]);
} else {
const defaultOrchestratorData = {
_id: new ObjectId(),
name: "Default Orchestrator",
userId: userId.toString(),
model: "anthropic/claude-3-5-sonnet-latest",
temperature: 0.3,
messages: [],
createdAt: new Date(),
updatedAt: new Date()
};

await scheduledTaskDb.createOrchestrator(defaultOrchestratorData);
currentOrchestrator = await createOrchestratorFromDb(defaultOrchestratorData);
}
}

const outputs = await orchestrator.dispatchToInput(
const outputs = await currentOrchestrator.dispatchToInput(
"user_chat",
{
content: userMessage,
Expand All @@ -159,9 +186,8 @@ wss.on("connection", (ws) => {
orchestratorId ? new ObjectId(orchestratorId) : undefined
);

// Send responses back through WebSocket
if (outputs && (outputs as any).length > 0) {
for (const out of outputs as any[]) {
if (outputs?.length > 0) {
for (const out of outputs) {
if (out.name === "chat_reply") {
sendJSON(ws, {
type: "response",
Expand Down Expand Up @@ -317,20 +343,18 @@ app.get("/api/orchestrators", async (req, res) => {
return res.status(400).json({ error: "userId is required" });
}

console.log(chalk.blue("[API] Listing orchestrators for user:", userId));
let orchestrators = await scheduledTaskDb.getOrchestratorsByUserId(userId);

// Récupérer les orchestrateurs depuis la base de données
let orchestrators = await scheduledTaskDb.getOrchestratorsByUserId(userId)

// Si aucun orchestrateur n'existe pour cet utilisateur, en créer un par défaut
if (orchestrators.length === 0) {
if (!orchestrators || orchestrators.length === 0) {
console.log(chalk.yellow("[API] No orchestrators found, creating default one"));

const id = new ObjectId();
const defaultOrchestrator = {
_id: id,
name: "Default Orchestrator",
userId: userId.toString(),
model: "anthropic/claude-3-5-sonnet-latest",
temperature: 0.3,
messages: [{
role: "system",
name: "system",
Expand All @@ -344,21 +368,29 @@ app.get("/api/orchestrators", async (req, res) => {
updatedAt: new Date()
};

await scheduledTaskDb.createOrchestrator(userId);
orchestrators = [defaultOrchestrator];
console.log(chalk.green("[API] Created default orchestrator"));

await scheduledTaskDb.createOrchestrator(defaultOrchestrator);

orchestrators = await scheduledTaskDb.getOrchestratorsByUserId(userId);
console.log(chalk.green("[API] Created default orchestrator"), orchestrators);
}

console.log(chalk.blue("[API] Returning orchestrators:"), orchestrators);
if (!orchestrators || orchestrators.length === 0) {
console.error(chalk.red("[API] Still no orchestrators after creation attempt"));
throw new Error("Failed to create or retrieve orchestrators");
}

res.json(orchestrators.map(orch => ({
const response = orchestrators.map(orch => ({
id: orch._id.toString(),
name: orch.name || `Chat ${new Date(orch.createdAt).toLocaleString()}`,
userId: orch.userId,
model: orch.model,
temperature: orch.temperature,
messages: orch.messages || [],
createdAt: orch.createdAt,
updatedAt: orch.updatedAt
})));
}));
res.json(response);
} catch (error) {
console.error(chalk.red("[API] Error listing orchestrators:"), error);
res.status(500).json({
Expand All @@ -371,15 +403,13 @@ app.get("/api/orchestrators", async (req, res) => {
// Créer un orchestrateur
app.post("/api/orchestrators", async (req, res) => {
try {
const { name, userId } = req.body;
const { name, userId, model, temperature } = req.body;
if (!userId) {
return res.status(400).json({ error: "userId is required" });
}

console.log(chalk.blue("[API] Creating orchestrator:"), { name, userId });
const id = new ObjectId();

// Message initial pour debug
const initialMessage = {
role: "system",
name: "system",
Expand All @@ -390,24 +420,26 @@ app.post("/api/orchestrators", async (req, res) => {
timestamp: new Date()
};

// Créer un objet simplifié pour la sauvegarde
const orchestratorData = {
_id: id,
name: name || `Orchestrator ${id}`,
userId,
model: model || "anthropic/claude-3-5-sonnet-latest",
temperature: temperature || 0.3,
messages: [initialMessage],
createdAt: new Date(),
updatedAt: new Date()
};

console.log(chalk.blue("[API] Saving orchestrator data:"), orchestratorData);
await scheduledTaskDb.createOrchestrator(userId);
await scheduledTaskDb.createOrchestrator(orchestratorData);

// Envoyer la réponse
res.json({
id: id.toString(),
name: orchestratorData.name,
userId: orchestratorData.userId,
model: orchestratorData.model,
temperature: orchestratorData.temperature
});
} catch (error) {
console.error(chalk.red("[API] Error creating orchestrator:"), error);
Expand All @@ -418,11 +450,10 @@ app.post("/api/orchestrators", async (req, res) => {
}
});

// Récupérer un orchestrateur spécifique avec ses messages
app.get("/api/orchestrators/:orchestratorId", async (req, res) => {
try {
const { orchestratorId } = req.params;
const { userId } = req.query; // Ajout de userId en query param
const { userId } = req.query;

console.log(chalk.blue("[API] Fetching orchestrator:"), { orchestratorId, userId });

Expand All @@ -433,26 +464,28 @@ app.get("/api/orchestrators/:orchestratorId", async (req, res) => {
return res.status(400).json({ error: "Invalid orchestrator ID format" });
}

// Rechercher l'orchestrateur avec l'ID et le userId
const orchestrator = await scheduledTaskDb.collection.findOne({
const orchestratorData = await scheduledTaskDb.collection.findOne({
_id: objectId,
userId: userId?.toString() // Vérifier que l'orchestrateur appartient à l'utilisateur
userId: userId?.toString()
});

if (!orchestrator) {
if (!orchestratorData) {
return res.status(404).json({ error: "Orchestrator not found" });
}

console.log(chalk.blue("[API] Found orchestrator:"), orchestrator);
await createOrchestratorFromDb(orchestratorData);

console.log(chalk.blue("[API] Found orchestrator:"), orchestratorData);

// Formater la réponse
res.json({
id: orchestrator._id.toString(),
name: orchestrator.name,
userId: orchestrator.userId,
messages: orchestrator.messages || [],
createdAt: orchestrator.createdAt,
updatedAt: orchestrator.updatedAt
id: orchestratorData._id.toString(),
name: orchestratorData.name,
userId: orchestratorData.userId,
model: orchestratorData.model,
temperature: orchestratorData.temperature,
messages: orchestratorData.messages || [],
createdAt: orchestratorData.createdAt,
updatedAt: orchestratorData.updatedAt
});
} catch (error) {
console.error(chalk.red("[API] Error fetching orchestrator:"), error);
Expand All @@ -472,3 +505,65 @@ app.listen(API_PORT, () => {
)
);
});

async function createOrchestratorFromDb(orchestratorData: OrchestratorData) {
if (!orchestratorData) {
throw new Error('Cannot create orchestrator from null data');
}

const loglevel = LogLevel.INFO;
console.log('🔍 Creating orchestrator from data:', orchestratorData);

const llmClient = new LLMClient({
model: orchestratorData.model || "anthropic/claude-3-5-sonnet-latest",
temperature: orchestratorData.temperature || 0.3,
});

const vectorDb = new ChromaVectorDB("agent_memory", {
chromaUrl: "http://localhost:8000",
logLevel: loglevel,
});

const roomManager = new RoomManager(vectorDb);
const processor = new MessageProcessor(llmClient, defaultCharacter, loglevel);

const orchestrator = new Orchestrator(
roomManager,
vectorDb,
[processor],
scheduledTaskDb,
{
level: loglevel,
enableColors: true,
enableTimestamp: true,
}
);

orchestrator.registerIOHandler({
name: "user_chat",
role: HandlerRole.INPUT,
schema: z.object({
content: z.string(),
userId: z.string().optional(),
}),
handler: async (payload) => payload,
});

orchestrator.registerIOHandler({
name: "chat_reply",
role: HandlerRole.OUTPUT,
schema: z.object({
userId: z.string().optional(),
message: z.string(),
}),
handler: async (payload) => {
const { userId, message } = payload as {
userId?: string;
message: string;
};
console.log(`Reply to user ${userId ?? "??"}: ${message}`);
},
});

return orchestrator;
}
20 changes: 20 additions & 0 deletions examples/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ObjectId } from "mongodb";

export interface OrchestratorData extends OrchestratorChat {
_id: ObjectId;
name: string;
userId: string;
model: string;
temperature: number;
messages: Array<any>;
createdAt: Date;
updatedAt: Date;
}

export interface CreateOrchestratorParams {
name?: string;
userId: string;
model?: string;
temperature?: number;
initialMessage?: any;
}
Loading

0 comments on commit 2795176

Please sign in to comment.