-
Notifications
You must be signed in to change notification settings - Fork 430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LangChain primitive usage #110
Comments
I integrated with langchain and crewai I'll upload my code later |
That's super cool! Would you consider joining our team (it's an opensource team, just for research and open-source developing, not as official research group or company:) ) and working on making data-centric agent learning more practical and accessible? I know currently it's not always working due to the limitations of LLMs' reasoning abilities, but we believe it's the future for agent research! |
Sure, I see how powerful this can be. Watching it choose it's own agents
and make them up on the fly was wild. I was very excited when I found the
research paper.
…On Wed, Jul 24, 2024, 10:07 AM Wangchunshu Zhou ***@***.***> wrote:
I integrated with langchain and crewai I'll upload my code later
That's super cool! Would you consider joining our team (it's an opensource
team, just for research and open-source developing, not as official
research group or company:) ) and working on making data-centric agent
learning more practical and accessible? I know currently it's not always
working due to the limitations of LLMs' reasoning abilities, but we believe
it's the future for agent research!
—
Reply to this email directly, view it on GitHub
<#110 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ATTUXRPXCETLJSIBTZFZSWTZN6YJPAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDENBYGA3DINZVG4>
.
You are receiving this because you commented.Message ID:
***@***.***>
|
hi @zdondada , I'm currently experimenting with auto-update in the Langchain framework, but I've run into some issues and would love to see your awesome work!
|
Ok I just got access to GitHub Models so I was planning on going back in
the project and trying out new models. Do you have a discord or anything?
…On Tue, Sep 10, 2024, 3:03 AM Zhou guanya ***@***.***> wrote:
hi @zdondada <https://github.com/zdondada> , I'm currently experimenting
with auto-update in the Langchain framework, but I've run into some issues
and would love to see your awesome work!
I integrated with langchain and crewai I'll upload my code later
—
Reply to this email directly, view it on GitHub
<#110 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ATTUXRKJEFQOF5CLSTIGZGDZV2KTDAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGMZZHAZTEMJQGY>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
Yeah, I have Discord—my username is guanya5365. Looking forward to chatting more!
安心
***@***.***
…------------------ 原始邮件 ------------------
发件人: ***@***.***>;
发送时间: 2024年9月11日(星期三) 凌晨2:15
收件人: ***@***.***>;
抄送: ***@***.***>; ***@***.***>;
主题: Re: [aiwaves-cn/agents] LangChain primitive usage (Issue #110)
Ok I just got access to GitHub Models so I was planning on going back in
the project and trying out new models. Do you have a discord or anything?
On Tue, Sep 10, 2024, 3:03 AM Zhou guanya ***@***.***> wrote:
> hi @zdondada <" rel="noopener" target="_blank">https://github.com/zdondada>; , I'm currently experimenting
> with auto-update in the Langchain framework, but I've run into some issues
> and would love to see your awesome work!
>
> I integrated with langchain and crewai I'll upload my code later
>
> —
> Reply to this email directly, view it on GitHub
> <" rel="noopener" target="_blank">#110 (comment)>;,
> or unsubscribe
> <" rel="noopener" target="_blank">https://github.com/notifications/unsubscribe-auth/ATTUXRKJEFQOF5CLSTIGZGDZV2KTDAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGMZZHAZTEMJQGY>;
> .
> You are receiving this because you were mentioned.Message ID:
> ***@***.***>
>
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you commented.Message ID: ***@***.***>
|
Here's one thing I was messing with:
# Advanced EvoMARL+ Prototype with Extended Features
# Using CrewAI, LangChain, and Gemini Pro API
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process
from langchain.agents import Tool
from langchain.llms import GooglePalm
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from google.generativeai import GenerativeModel, generate_text
from typing import List, Dict, Tuple
import gym
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
# Load environment variables
load_dotenv()
# Initialize Gemini Pro API
genai_api_key = os.getenv("GENAI_API_KEY")
gemini_model = GenerativeModel(model_name="gemini-pro",
api_key=genai_api_key)
# Initialize Google Palm as a fallback
palm_llm = GooglePalm(google_api_key=os.getenv("GOOGLE_API_KEY"))
# Define a more advanced Graph Neural Network
class AdvancedGNN(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
super(AdvancedGNN, self).__init__()
self.layers = nn.ModuleList([nn.Linear(input_dim if i == 0 else
hidden_dim, hidden_dim) for i in range(num_layers)])
self.output = nn.Linear(hidden_dim, output_dim)
self.activation = nn.ReLU()
def forward(self, x, adj_matrix):
for layer in self.layers:
x = self.activation(layer(torch.matmul(adj_matrix, x)))
return self.output(x)
# Define a simple environment for multi-agent reinforcement learning
class SimpleMultiAgentEnv(gym.Env):
def __init__(self, num_agents):
super(SimpleMultiAgentEnv, self).__init__()
self.num_agents = num_agents
self.action_space = gym.spaces.Discrete(4) # 4 actions: up, down,
left, right
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(2,))
self.reset()
def reset(self):
self.agents = np.random.uniform(-1, 1, (self.num_agents, 2))
return self._get_obs()
def step(self, actions):
for i, action in enumerate(actions):
if action == 0: # up
self.agents[i, 1] = min(1, self.agents[i, 1] + 0.1)
elif action == 1: # down
self.agents[i, 1] = max(-1, self.agents[i, 1] - 0.1)
elif action == 2: # left
self.agents[i, 0] = max(-1, self.agents[i, 0] - 0.1)
elif action == 3: # right
self.agents[i, 0] = min(1, self.agents[i, 0] + 0.1)
rewards = self._calculate_rewards()
done = False # For simplicity, we'll say the episode never ends
return self._get_obs(), rewards, done, {}
def _get_obs(self):
return self.agents
def _calculate_rewards(self):
# Reward based on distance to center
distances = np.sqrt(np.sum(self.agents**2, axis=1))
return -distances # Negative distance as reward (closer to center
is better)
# Enhanced EvoMARLTools
class EvoMARLTools:
@staticmethod
def evolve_neural_network(input_dim: int, hidden_dim: int, output_dim:
int, num_layers: int) -> AdvancedGNN:
population_size = 20
generations = 10
mutation_rate = 0.1
def create_individual():
return AdvancedGNN(input_dim, hidden_dim, output_dim,
num_layers)
def mutate(individual):
for param in individual.parameters():
if np.random.rand() < mutation_rate:
param.data += torch.randn_like(param.data) * 0.1
return individual
def fitness(individual, X, adj_matrix, y):
with torch.no_grad():
outputs = individual(X, adj_matrix)
loss = nn.MSELoss()(outputs, y)
return -loss.item()
population = [create_individual() for _ in range(population_size)]
X = torch.randn(10, input_dim)
adj_matrix = torch.randint(0, 2, (10, 10)).float()
y = torch.randn(10, output_dim)
for generation in range(generations):
fitnesses = [fitness(ind, X, adj_matrix, y) for ind in
population]
sorted_pop = [x for _, x in sorted(zip(fitnesses, population),
key=lambda pair: pair[0], reverse=True)]
new_population = sorted_pop[:2]
while len(new_population) < population_size:
parent1, parent2 = np.random.choice(sorted_pop[:5], 2,
replace=False)
child = create_individual()
# Implement crossover
for c_param, p1_param, p2_param in zip(child.parameters(),
parent1.parameters(), parent2.parameters()):
c_param.data.copy_(torch.where(torch.rand_like(c_param)
< 0.5, p1_param, p2_param))
child = mutate(child)
new_population.append(child)
population = new_population
best_individual = max(population, key=lambda ind: fitness(ind, X,
adj_matrix, y))
return best_individual
@staticmethod
def optimize_swarm_behavior(n_agents: int, n_dimensions: int,
iterations: int = 100) -> List[Dict[str, float]]:
class Particle:
def __init__(self):
self.position = np.random.rand(n_dimensions)
self.velocity = np.random.rand(n_dimensions)
self.best_position = self.position.copy()
self.best_score = float('-inf')
def objective_function(x):
return -(np.sum(x**2) + np.sum(np.sin(x))) # More complex
objective function
particles = [Particle() for _ in range(n_agents)]
global_best_position = np.random.rand(n_dimensions)
global_best_score = float('-inf')
for _ in range(iterations):
for particle in particles:
score = objective_function(particle.position)
if score > particle.best_score:
particle.best_score = score
particle.best_position = particle.position.copy()
if score > global_best_score:
global_best_score = score
global_best_position = particle.position.copy()
for particle in particles:
inertia = 0.5
cognitive = 1.5
social = 1.5
cognitive_velocity = cognitive * np.random.rand() *
(particle.best_position - particle.position)
social_velocity = social * np.random.rand() *
(global_best_position - particle.position)
particle.velocity = inertia * particle.velocity +
cognitive_velocity + social_velocity
particle.position += particle.velocity
particle.position = np.clip(particle.position, 0, 1)
return [{"position": p.position.tolist(), "best_score":
p.best_score} for p in particles]
@staticmethod
def human_feedback_integration(actions: List[str], feedback:
List[float], previous_weights: Dict[str, float] = None) -> Dict[str, float]:
action_scores = {}
for action, score in zip(actions, feedback):
if action in action_scores:
action_scores[action].append(score)
else:
action_scores[action] = [score]
weighted_scores = {}
for action, scores in action_scores.items():
if previous_weights and action in previous_weights:
# Incorporate previous weights for continuous learning
weighted_scores[action] = 0.7 * np.average(scores,
weights=np.linspace(0.5, 1, len(scores))) + 0.3 * previous_weights[action]
else:
weighted_scores[action] = np.average(scores,
weights=np.linspace(0.5, 1, len(scores)))
return weighted_scores
@staticmethod
def hierarchical_task_decomposition(task_description: str) -> Dict[str,
List[str]]:
prompt = f"Decompose the following task into a hierarchical
structure with main tasks and subtasks, considering dependencies and
parallel execution possibilities: {task_description}"
response = gemini_model.generate_content(prompt)
lines = response.text.split('\n')
hierarchy = {}
current_main_task = None
for line in lines:
if line.startswith('- '): # Main task
current_main_task = line[2:].strip()
hierarchy[current_main_task] = {"subtasks": [],
"dependencies": [], "parallel": False}
elif line.startswith(' - ') and current_main_task: # Subtask
subtask = line[4:].strip()
if "Dependency:" in subtask:
dependency = subtask.split("Dependency:")[1].strip()
hierarchy[current_main_task]["dependencies"].append(dependency)
elif "Parallel" in subtask:
hierarchy[current_main_task]["parallel"] = True
else:
hierarchy[current_main_task]["subtasks"].append(subtask)
return hierarchy
@staticmethod
def transfer_learning(source_model: nn.Module, target_data:
torch.Tensor, fine_tune_layers: List[str] = None) -> nn.Module:
if fine_tune_layers:
for name, param in source_model.named_parameters():
if any(layer in name for layer in fine_tune_layers):
param.requires_grad = True
else:
param.requires_grad = False
else:
for param in source_model.parameters():
param.requires_grad = False
if hasattr(source_model, 'output'):
source_model.output =
nn.Linear(source_model.output.in_features, target_data.shape[1])
else:
raise AttributeError("Source model does not have an 'output'
layer. Please adjust the architecture.")
optimizer = optim.Adam(filter(lambda p: p.requires_grad,
source_model.parameters()), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(200): # Increased epochs for better fine-tuning
optimizer.zero_grad()
output = source_model(target_data,
torch.eye(target_data.shape[0]))
loss = criterion(output, target_data)
loss.backward()
optimizer.step()
if epoch % 50 == 0:
print(f"Epoch {epoch}, Loss: {loss.item()}")
return source_model
@staticmethod
def multi_agent_reinforcement_learning(num_agents: int, num_episodes:
int) -> Tuple[List[float], List[AdvancedGNN]]:
env = SimpleMultiAgentEnv(num_agents)
agents = [AdvancedGNN(input_dim=2, hidden_dim=64, output_dim=4,
num_layers=2) for _ in range(num_agents)]
optimizers = [optim.Adam(agent.parameters(), lr=0.001) for agent in
agents]
episode_rewards = []
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
for _ in range(100): # 100 steps per episode
actions = []
log_probs = []
for i, agent in enumerate(agents):
agent_state = torch.FloatTensor(state[i])
action_probs =
torch.softmax(agent(agent_state.unsqueeze(0), torch.eye(1)), dim=1)
action = torch.multinomial(action_probs, 1).item()
actions.append(action)
log_probs.append(torch.log(action_probs[0, action]))
next_state, rewards, done, _ = env.step(actions)
episode_reward += sum(rewards)
# Update agents
for i, (agent, optimizer, log_prob, reward) in
enumerate(zip(agents, optimizers, log_probs, rewards)):
optimizer.zero_grad()
loss = -log_prob * reward # Simple policy gradient
loss.backward()
optimizer.step()
state = next_state
if done:
break
episode_rewards.append(episode_reward)
if episode % 10 == 0:
print(f"Episode {episode}, Average Reward:
{np.mean(episode_rewards[-10:])}")
return episode_rewards, agents
@staticmethod
def visualize_multi_agent_system(agents: np.ndarray):
plt.figure(figsize=(8, 8))
plt.scatter(agents[:, 0], agents[:, 1], c='blue', s=50)
plt.xlim(-1, 1)
plt.ylim(-1, 1)
plt.title("Multi-Agent System Visualization")
plt.xlabel("X coordinate")
plt.ylabel("Y coordinate")
plt.grid(True)
plt.show()
@staticmethod
def cluster_agent_behaviors(agent_data: np.ndarray, n_clusters: int =
3) -> np.ndarray:
kmeans = KMeans(n_clusters=n_clusters)
clusters = kmeans.fit_predict(agent_data)
return clusters
# Create enhanced tools
evolve_nn_tool = Tool(
name="Evolve Neural Network",
func=EvoMARLTools.evolve_neural_network,
description="Evolves an advanced Graph Neural Network architecture"
)
optimize_swarm_tool = Tool(
name="Optimize Swarm Behavior",
func=EvoMARLTools.optimize_swarm_behavior,
description="Optimizes the behavior of a swarm using enhanced Particle
Swarm Optimization"
)
human_feedback_tool = Tool(
name="Integrate Human Feedback",
func=EvoMARLTools.human_feedback_integration,
description="Integrates human feedback into the system using weighted
averages and continuous learning"
)
task_decomposition_tool = Tool(
name="Hierarchical Task Decomposition",
func=EvoMARLTools.hierarchical_task_decomposition,
description="Decomposes a high-level task into a hierarchical structure
of subtasks with dependencies and parallelization"
)
transfer_learning_tool = Tool(
name="Transfer Learning",
func=EvoMARLTools.transfer_learning,
description="Applies transfer learning to adapt a pre-trained model to
new data"
)
reinforcement_learning_tool = Tool(
name="Multi-Agent Reinforcement Learning",
func=EvoMARLTools.multi_agent_reinforcement_learning,
description="Trains multiple agents in a reinforcement learning setting"
)
visualization_tool = Tool(
name="Visualize Multi-Agent System",
func=EvoMARLTools.visualize_multi_agent_system,
description="Visualizes the positions and interactions of agents in a
multi-agent system"
)
clustering_tool = Tool(
name="Cluster Agent Behaviors",
func=EvoMARLTools.cluster_agent_behaviors,
description="Clusters agent behaviors using K-Means clustering"
)
# Define more advanced agents
advanced_architect_agent = Agent(
role='Advanced System Architect',
goal='Design and refine the architecture of EvoMARL+ with advanced
features',
tools=[evolve_nn_tool, task_decomposition_tool],
llm=palm_llm
)
advanced_ml_engineer_agent = Agent(
role='Advanced ML Engineer',
goal='Implement and fine-tune complex machine learning models and
algorithms',
tools=[transfer_learning_tool, reinforcement_learning_tool],
llm=palm_llm
)
advanced_human_interaction_agent = Agent(
role='Human-AI Interaction Specialist',
goal='Enhance the interpretability and human integration aspects of the
system',
tools=[human_feedback_tool, visualization_tool],
llm=palm_llm
)
advanced_swarm_intelligence_agent = Agent(
role='Advanced Swarm Intelligence Expert',
goal='Optimize and innovate in swarm behavior algorithms using advanced
techniques',
tools=[optimize_swarm_tool, clustering_tool],
llm=palm_llm
)
# Define advanced tasks
advanced_design_task = Task(
description='Refine the EvoMARL+ architecture with cutting-edge
methodologies and tools',
agent=advanced_architect_agent
)
advanced_ml_implementation_task = Task(
description='Integrate and optimize advanced ML techniques, including
transfer learning and reinforcement learning',
agent=advanced_ml_engineer_agent
)
advanced_human_integration_task = Task(
description='Develop robust human-AI interaction interfaces and
feedback mechanisms',
agent=advanced_human_interaction_agent
)
advanced_swarm_optimization_task = Task(
description='Innovate in swarm intelligence, focusing on optimization
and clustering of agent behaviors',
agent=advanced_swarm_intelligence_agent
)
# Create an advanced crew
advanced_evomarl_crew = Crew(
agents=[advanced_architect_agent, advanced_ml_engineer_agent,
advanced_human_interaction_agent, advanced_swarm_intelligence_agent],
tasks=[advanced_design_task, advanced_ml_implementation_task,
advanced_human_integration_task, advanced_swarm_optimization_task],
process=Process.parallel # Utilize parallel processing for efficiency
)
# Run the advanced crew
advanced_result = advanced_evomarl_crew.kickoff()
print(advanced_result)
# Example of using LangChain with Gemini Pro for specific EvoMARL+ advanced
tasks
advanced_evomarl_prompt = PromptTemplate(
input_variables=["component"],
template="Provide a comprehensive and technical guide on implementing
and optimizing the {component} component in EvoMARL+, highlighting advanced
techniques, potential challenges, and best practices."
)
advanced_evomarl_chain = LLMChain(llm=palm_llm,
prompt=advanced_evomarl_prompt)
# Example usage
advanced_component = "Hierarchical Reinforcement Learning"
advanced_explanation = advanced_evomarl_chain.run(advanced_component)
print(f"Comprehensive Guide for {advanced_component}:")
print(advanced_explanation)
On Tue, Sep 10, 2024 at 10:02 PM Zhou guanya ***@***.***>
wrote:
… Yeah, I have Discord—my username is guanya5365. Looking forward to
chatting more!
安心
***@***.***
------------------ 原始邮件 ------------------
发件人: ***@***.***>;
发送时间: 2024年9月11日(星期三) 凌晨2:15
收件人: ***@***.***>;
抄送: ***@***.***>; ***@***.***>;
主题: Re: [aiwaves-cn/agents] LangChain primitive usage (Issue #110)
Ok I just got access to GitHub Models so I was planning on going back in
the project and trying out new models. Do you have a discord or anything?
On Tue, Sep 10, 2024, 3:03 AM Zhou guanya ***@***.***> wrote:
> hi @zdondada <" rel="noopener" target="_blank">
https://github.com/zdondada>; , I'm currently experimenting
> with auto-update in the Langchain framework, but I've run into some
issues
> and would love to see your awesome work!
>
> I integrated with langchain and crewai I'll upload my code later
>
> —
> Reply to this email directly, view it on GitHub
> <" rel="noopener" target="_blank">
#110 (comment)>;,
> or unsubscribe
> <" rel="noopener" target="_blank">
https://github.com/notifications/unsubscribe-auth/ATTUXRKJEFQOF5CLSTIGZGDZV2KTDAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGMZZHAZTEMJQGY>;
> .
> You are receiving this because you were mentioned.Message ID:
> ***@***.***>
>
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you commented.Message ID: ***@***.***>
—
Reply to this email directly, view it on GitHub
<#110 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ATTUXRJO7MUKTOGHY4LSXELZV6QDRAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGNBSGQ3DOMRUGA>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
Then I was messing with this one (I had a bunch of different iterations,
I'm closing out a bunch of open notepads and sending you what I have as I
come across it, lol.)
import json
import ast
import re
from typing import Any, List, Optional
from collections import Counter
import nltk
from nltk.corpus import stopwords
import spacy
from spacy import displacy
import ast
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
import logging
import chromadb
from chromadb.config import Settings
from google.generativeai import GenerativeModel, configure
from langchain.llms.base import LLM
from pydantic import BaseModel, Field
from crewai import Agent, Task, Crew, Process
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s
- %(message)s')
# Configure Google Gemini API
configure(api_key="AIzaSyBSzFDPZhK-2MD6o-uvWVnxmdBa26SO14A")
# Initialize ChromaDB
chroma_client = chromadb.Client(Settings(persist_directory="./db"))
memory_collection = chroma_client.get_or_create_collection("agent_memory")
class GeminiWrapper(LLM):
model: Any
def __init__(self):
super().__init__()
self.model = GenerativeModel('gemini-pro')
self.callbacks = None
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
response = self.model.generate_content(prompt)
return response.text
@Property
def _llm_type(self) -> str:
return "gemini"
gemini_llm = GeminiWrapper()
class PersistentMemoryAgent(Agent, BaseModel):
memory_id: str = Field(default="")
def __init__(self, **data):
super().__init__(**data)
self.memory_id = f"{self.role}_memory"
def remember(self, key, value):
memory_collection.upsert(
documents=[json.dumps(value)],
metadatas=[{"key": key}],
ids=[f"{self.memory_id}_{key}"]
)
def recall(self, key):
results = memory_collection.get(
ids=[f"{self.memory_id}_{key}"],
include=["documents"]
)
if results['documents']:
return json.loads(results['documents'][0])
return None
class DynamicAgentPipeline:
def __init__(self):
self.agents = []
self.llm = gemini_llm
def create_agent(self, role, goal, backstory):
agent = PersistentMemoryAgent(
role=role,
goal=goal,
backstory=backstory,
verbose=True,
allow_delegation=True,
llm=self.llm
)
self.agents.append(agent)
return agent
def get_or_create_agent(self, role, goal, backstory):
for agent in self.agents:
if agent.role == role:
return agent
return self.create_agent(role, goal, backstory)
class SymbolicLearningFramework:
def __init__(self, pipeline):
self.pipeline = pipeline
def forward_pass(self, input_data):
current_output = input_data
trajectory = []
for agent in self.pipeline.agents:
task = Task(
description=f"Process the following input:
{current_output}",
agent=agent,
expected_output="Processed output"
)
crew = Crew(agents=[agent], tasks=[task],
process=Process.sequential)
current_output = crew.kickoff()
trajectory.append((agent, current_output))
return current_output, trajectory
def compute_language_loss(self, output, expected_output):
loss_computer = self.pipeline.get_or_create_agent(
"Loss Computer",
"Compute the discrepancy between expected and actual outcomes",
"Expert in evaluating outputs and providing detailed feedback"
)
loss_task = Task(
description=f"Compute the loss between the output: '{output}'
and the expected output: '{expected_output}'. Provide a detailed analysis
and a numeric score from 0 to 10.",
agent=loss_computer,
expected_output="A detailed analysis and a numeric loss score"
)
loss_crew = Crew(agents=[loss_computer], tasks=[loss_task],
process=Process.sequential)
return loss_crew.kickoff()
def backward_pass(self, trajectory, loss):
gradients = []
for agent, output in reversed(trajectory):
gradient_computer = self.pipeline.get_or_create_agent(
f"Gradient Computer for {agent.role}",
f"Compute gradients for improving {agent.role}'s
performance",
f"Expert in analyzing performance and suggesting
improvements for {agent.role}"
)
gradient_task = Task(
description=f"Analyze the output: '{output}' and the
overall loss: '{loss}'. Provide specific suggestions for improving
{agent.role}'s performance. Include both textual analysis and a numeric
impact score from -1 to 1 for each suggestion.",
agent=gradient_computer,
expected_output="Specific improvement suggestions with
impact scores"
)
gradient_crew = Crew(agents=[gradient_computer],
tasks=[gradient_task], process=Process.sequential)
gradients.append((agent, gradient_crew.kickoff()))
return gradients
def apply_gradients(self, gradients):
for agent, gradient in gradients:
optimizer = self.pipeline.get_or_create_agent(
f"Optimizer for {agent.role}",
f"Optimize {agent.role}'s performance based on computed
gradients",
f"Expert in fine-tuning agent parameters and strategies for
{agent.role}"
)
optimization_task = Task(
description=f"Given the following gradient information for
{agent.role}: '{gradient}', provide specific updates to the agent's
parameters, including its role description, goal, and backstory. Format the
updates as a JSON object.",
agent=optimizer,
expected_output="JSON object with updated agent parameters"
)
optimization_crew = Crew(agents=[optimizer],
tasks=[optimization_task], process=Process.sequential)
updates = optimization_crew.kickoff()
# Apply the updates to the agent
self.update_agent(agent, updates)
def update_agent(self, agent, updates):
try:
updates_dict = json.loads(updates.string_output() if
hasattr(updates, 'string_output') else str(updates))
if 'role' in updates_dict:
agent.role = updates_dict['role']
if 'goal' in updates_dict:
agent.goal = updates_dict['goal']
if 'backstory' in updates_dict:
agent.backstory = updates_dict['backstory']
# Store the updates in persistent memory
agent.remember('latest_update', updates_dict)
except json.JSONDecodeError:
print(f"Error decoding JSON updates: {updates}")
def extract_numeric_value(self, text):
if hasattr(text, 'string_output'):
text = text.string_output()
numeric_values = re.findall(r'\d+(?:\.\d+)?', str(text))
if numeric_values:
return float(numeric_values[0])
else:
print(f"No numeric value found in: {text}")
return 10.0 # Default high loss
def train(self, input_data, expected_output, epochs=1, min_loss=2,
max_retries=3, additional_feedback=""):
best_output = None
best_loss = float('inf')
learning_rate = 0.1
consecutive_errors = 0
for epoch in range(epochs):
retry_count = 0
while retry_count < max_retries:
try:
output, trajectory = self.forward_pass(input_data)
loss = self.compute_language_loss(output,
expected_output)
loss_value = self.extract_numeric_value(loss)
if loss_value < best_loss:
best_loss = loss_value
best_output = output
if loss_value <= min_loss:
print(f"Training completed. Loss: {loss_value}")
return output
gradients = self.backward_pass(trajectory, loss_value)
# Incorporate additional feedback into the gradient
computation
if additional_feedback:
feedback_task = Task(
description=f"Analyze the following feedback
and suggest improvements: {additional_feedback}",
agent=self.pipeline.get_or_create_agent("Feedback Analyzer", "Analyze
feedback and suggest improvements", "Expert in code review and
optimization"),
expected_output="Improvement suggestions based
on feedback"
)
feedback_crew = Crew(agents=[feedback_task.agent],
tasks=[feedback_task], process=Process.sequential)
feedback_suggestions = feedback_crew.kickoff()
gradients.append((feedback_task.agent,
feedback_suggestions))
self.apply_gradients(gradients)
print(f"Epoch {epoch + 1}/{epochs} - Loss:
{loss_value}")
consecutive_errors = 0 # Reset error counter on
successful epoch
break # Exit retry loop if successful
except Exception as e:
retry_count += 1
consecutive_errors += 1
print(f"Error during training (Attempt
{retry_count}/{max_retries}): {e}")
if retry_count == max_retries:
print(f"Skipping epoch {epoch + 1} due to repeated
errors")
if consecutive_errors > 3:
learning_rate *= 0.5 # Reduce learning rate if
we're having repeated issues
print(f"Adjusting learning rate to {learning_rate}")
if consecutive_errors > 5:
print("Too many consecutive errors. Terminating
training early.")
return best_output if best_output is not None else
input_data
return best_output if best_output is not None else output
class DynamicTaskManager:
def __init__(self, framework):
self.framework = framework
def decompose_task(self, task_description):
decomposer = self.framework.pipeline.get_or_create_agent(
"Task Decomposer",
"Break down complex tasks into manageable subtasks",
"Expert in task analysis and strategic planning"
)
decompose_task = Task(
description=f"Analyze the following task and break it down into
subtasks: '{task_description}'. For each subtask, specify the required
agent role, goal, and backstory. Format the output as a JSON array.",
agent=decomposer,
expected_output="A JSON array of subtasks with roles, goals,
and backstories"
)
decompose_crew = Crew(agents=[decomposer], tasks=[decompose_task],
process=Process.sequential)
result = decompose_crew.kickoff()
return result.string_output() if hasattr(result, 'string_output')
else str(result)
def execute_task(self, task_description):
subtasks = self.decompose_task(task_description)
try:
subtasks_data = json.loads(subtasks)
if isinstance(subtasks_data, dict) and 'subtasks' in
subtasks_data:
subtasks_list = subtasks_data['subtasks']
elif isinstance(subtasks_data, list):
subtasks_list = subtasks_data
else:
raise ValueError("Unexpected subtasks format")
except json.JSONDecodeError:
print(f"Error decoding JSON. Raw output: {subtasks}")
return "Error: Unable to parse subtasks"
except ValueError as e:
print(f"Error: {str(e)}. Raw output: {subtasks}")
return "Error: Unexpected subtasks format"
final_output = ""
for subtask in subtasks_list:
if not isinstance(subtask, dict):
print(f"Error: Subtask is not a dictionary. Subtask:
{subtask}")
continue
agent = self.framework.pipeline.get_or_create_agent(
subtask.get('role', 'Default Role'),
subtask.get('goal', 'Complete the subtask'),
subtask.get('backstory', '')
)
subtask_execution = Task(
description=subtask.get('goal', str(subtask)), # Using
'goal' as the task description
agent=agent,
expected_output="Completed subtask output"
)
subtask_crew = Crew(agents=[agent], tasks=[subtask_execution],
process=Process.sequential)
subtask_output = subtask_crew.kickoff()
final_output += subtask_output.string_output() if
hasattr(subtask_output, 'string_output') else str(
subtask_output) + "\n"
return final_output
class ContinuousLearner:
def __init__(self, framework, task_manager):
self.framework = framework
self.task_manager = task_manager
def learn_from_experience(self, task_description, expected_output,
max_attempts=5, learning_rate=0.1):
for attempt in range(max_attempts):
actual_output = self.task_manager.execute_task(task_description)
# For coding tasks, validate syntax first
if "Write a Python function" in task_description:
is_valid, error_message =
validate_python_syntax(actual_output)
if not is_valid:
print(f"Syntax Error (Attempt {attempt + 1}):
{error_message}")
loss_value = 10 # High loss for syntax errors
else:
# If syntax is valid, check for correctness
error = execute_python_safely(actual_output)
if error:
print(f"Runtime Error (Attempt {attempt + 1}):
{error}")
loss_value = 8 # High loss for runtime errors
else:
loss =
self.framework.compute_language_loss(actual_output, expected_output)
loss_value =
self.framework.extract_numeric_value(loss)
else:
# For non-coding tasks, compute loss as before
loss = self.framework.compute_language_loss(actual_output,
expected_output)
loss_value = self.framework.extract_numeric_value(loss)
print(f"Attempt {attempt + 1}, Loss: {loss_value}")
if loss_value <= 2: # Satisfactory performance
print(f"Task completed successfully on attempt {attempt +
1}. Loss: {loss_value}")
return actual_output
# Provide more specific feedback for improvement
if "Write a Python function" in task_description:
if not is_valid:
feedback = f"The code has a syntax error:
{error_message}. Please fix the syntax and ensure the code is valid Python."
elif error:
feedback = f"The code has a runtime error: {error}.
Please fix the logic to ensure the function runs correctly."
else:
feedback = "The code syntax is correct, but it may not
meet all requirements. Please review the function logic and output."
else:
feedback = "The output does not fully meet the expected
criteria. Please improve based on the given requirements."
# Adjust learning rate based on loss
adjusted_learning_rate = learning_rate * loss_value / 10
self.framework.train(task_description, expected_output,
epochs=int(1 / adjusted_learning_rate), min_loss=2,
additional_feedback=feedback)
print(f"Failed to achieve satisfactory performance after
{max_attempts} attempts.")
return actual_output
def optimize_pipeline(self):
optimizer = self.framework.pipeline.get_or_create_agent(
"Pipeline Optimizer",
"Optimize the overall agent pipeline for improved performance",
"Expert in system architecture and optimization strategies"
)
optimize_task = Task(
description="Analyze the current agent pipeline and suggest
optimizations. Consider adding, removing, or modifying agent roles to
improve overall system performance. Provide suggestions in a JSON format.",
agent=optimizer,
expected_output="JSON-formatted optimization suggestions"
)
optimize_crew = Crew(agents=[optimizer], tasks=[optimize_task],
process=Process.sequential)
optimizations = optimize_crew.kickoff()
try:
# Handle different types of output
if isinstance(optimizations, str):
optimizations_str = optimizations
elif hasattr(optimizations, 'string_output'):
optimizations_str = optimizations.string_output()
else:
optimizations_str = str(optimizations)
# Remove backticks and "json" identifier if present
optimizations_str = re.sub(r'^```json\s*|\s*```$', '',
optimizations_str).strip()
optimization_list = json.loads(optimizations_str)
# Check if 'optimizations' key exists and contains a list
if isinstance(optimization_list, dict) and 'optimizations' in
optimization_list:
optimization_list = optimization_list['optimizations']
elif not isinstance(optimization_list, list):
raise ValueError("Unexpected optimizations format. Expected
a list or a dictionary with 'optimizations' key")
for opt in optimization_list:
action = opt.get('action')
if action == 'add':
self.framework.pipeline.create_agent(opt.get('role'),
opt.get('goal'), opt.get('backstory'))
elif action == 'remove':
self.framework.pipeline.agents = [agent for agent in
self.framework.pipeline.agents if agent.role != opt.get('role')]
elif action == 'modify':
agent = next((a for a in self.framework.pipeline.agents
if a.role == opt.get('role')), None)
if agent:
self.framework.update_agent(agent,
json.dumps(opt.get('updates')))
except json.JSONDecodeError:
print(f"Error decoding JSON optimizations: {optimizations_str}")
except ValueError as e:
print(f"Error processing optimizations: {e}")
except Exception as e:
print(f"Unexpected error during pipeline optimization: {e}")
# Modify the DynamicTaskManager class to handle the JSON parsing issue
class DynamicTaskManager:
def __init__(self, framework):
self.framework = framework
def decompose_task(self, task_description):
decomposer = self.framework.pipeline.get_or_create_agent(
"Task Decomposer",
"Break down complex tasks into manageable subtasks",
"Expert in task analysis and strategic planning"
)
decompose_task = Task(
description=f"Analyze the following task and break it down into
subtasks: '{task_description}'. For each subtask, specify the required
agent role, goal, and backstory. Format the output as a JSON array.",
agent=decomposer,
expected_output="A JSON array of subtasks with roles, goals,
and backstories"
)
decompose_crew = Crew(agents=[decomposer], tasks=[decompose_task],
process=Process.sequential)
result = decompose_crew.kickoff()
return result.string_output() if hasattr(result, 'string_output')
else str(result)
def execute_task(self, task_description):
subtasks = self.decompose_task(task_description)
try:
# Remove backticks and "json" identifier if present
subtasks_str = re.sub(r'^```json\s*|\s*```$', '',
subtasks).strip()
subtasks_data = json.loads(subtasks_str)
if isinstance(subtasks_data, dict) and 'subtasks' in
subtasks_data:
subtasks_list = subtasks_data['subtasks']
elif isinstance(subtasks_data, list):
subtasks_list = subtasks_data
else:
raise ValueError("Unexpected subtasks format")
except json.JSONDecodeError:
print(f"Error decoding JSON. Raw output: {subtasks}")
return "Error: Unable to parse subtasks"
except ValueError as e:
print(f"Error: {str(e)}. Raw output: {subtasks}")
return "Error: Unexpected subtasks format"
final_output = ""
for subtask in subtasks_list:
if not isinstance(subtask, dict):
print(f"Error: Subtask is not a dictionary. Subtask:
{subtask}")
continue
agent = self.framework.pipeline.get_or_create_agent(
subtask.get('role', 'Default Role'),
subtask.get('goal', 'Complete the subtask'),
subtask.get('backstory', '')
)
subtask_execution = Task(
description=subtask.get('goal', str(subtask)),
agent=agent,
expected_output="Completed subtask output"
)
subtask_crew = Crew(agents=[agent], tasks=[subtask_execution],
process=Process.sequential)
subtask_output = subtask_crew.kickoff()
final_output += subtask_output.string_output() if
hasattr(subtask_output, 'string_output') else str(subtask_output) + "\n"
return final_output
def execute_python_safely(code):
# First, validate the syntax
is_valid, error_message = validate_python_syntax(code)
if not is_valid:
return f"Syntax Error: {error_message}"
# If syntax is valid, try to execute
try:
exec(code)
return None # No error
except Exception as e:
return str(e) # Return the error message
def load_manuscript(file_path: str) -> str:
"""Load the manuscript content from a file."""
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
except FileNotFoundError:
logging.error(f"File not found: {file_path}")
raise
except IOError as e:
logging.error(f"Error reading file {file_path}: {str(e)}")
raise
def split_into_chapters(manuscript_content: str) -> List[str]:
"""Split the manuscript content into chapters."""
chapters = manuscript_content.split("Chapter")[1:] # Assumes chapters
start with "Chapter"
return [chapter.strip() for chapter in chapters]
class GeminiAnalyzer:
def __init__(self):
self.model = GenerativeModel('gemini-pro')
def _safe_json_loads(self, text):
try:
return json.loads(text)
except json.JSONDecodeError:
logging.error(f"Failed to parse JSON: {text}")
return {"error": "Failed to parse response as JSON"}
def analyze_chapter(self, chapter_content: str, chapter_number: int) ->
dict:
prompt = f"""
Analyze the following chapter (Chapter {chapter_number}) of a
manuscript:
{chapter_content[:2000]}... # Truncated for API limits
Provide a detailed analysis including:
1. Main themes and ideas
2. Character development (if applicable)
3. Plot progression (if applicable)
4. Writing style and tone
5. Pacing and flow
6. Strengths of the chapter
7. Areas for potential improvement
8. How this chapter fits into the overall narrative (based on what
you can infer)
Format your response as a JSON object with these keys.
"""
try:
response = self.model.generate_content(prompt)
logging.info(f"Raw response for Chapter {chapter_number}:
{response.text}")
return self._safe_json_loads(response.text)
except Exception as e:
logging.error(f"Error analyzing Chapter {chapter_number}:
{str(e)}")
return {"error": f"Failed to analyze Chapter {chapter_number}"}
def analyze_transitions(self, prev_chapter: str, next_chapter: str,
chapter_number: int) -> dict:
prompt = f"""
Analyze the transition between these two consecutive chapters:
End of Chapter {chapter_number}:
{prev_chapter[-500:]}
Beginning of Chapter {chapter_number + 1}:
{next_chapter[:500]}
Provide an analysis of:
1. Continuity and flow between chapters
2. Effectiveness of the transition
3. Any cliffhangers or hooks used
4. Suggestions for improving the transition (if necessary)
Format your response as a JSON object with these keys.
"""
try:
response = self.model.generate_content(prompt)
logging.info(f"Raw response for transition
{chapter_number}-{chapter_number+1}: {response.text}")
return self._safe_json_loads(response.text)
except Exception as e:
logging.error(f"Error analyzing transition
{chapter_number}-{chapter_number+1}: {str(e)}")
return {"error": f"Failed to analyze transition
{chapter_number}-{chapter_number+1}"}
def analyze_overall_structure(self, chapter_summaries: List[dict]) ->
dict:
prompt = f"""
Based on the following summaries of each chapter, analyze the
overall structure of the manuscript:
{json.dumps(chapter_summaries, indent=2)}
Provide an analysis of:
1. Overall narrative arc
2. Pacing throughout the manuscript
3. Character development across chapters
4. Consistency in themes and tone
5. Strengths of the overall structure
6. Areas for potential improvement in the manuscript's structure
7. Suggestions for enhancing the flow and coherence of the narrative
Format your response as a JSON object with these keys.
"""
try:
response = self.model.generate_content(prompt)
logging.info(f"Raw response for overall structure:
{response.text}")
return self._safe_json_loads(response.text)
except Exception as e:
logging.error(f"Error analyzing overall structure: {str(e)}")
return {"error": "Failed to analyze overall structure"}
def analyze_manuscript(file_path: str) -> dict:
"""Perform a deep analysis of the manuscript using Gemini."""
logging.info("Starting manuscript analysis")
manuscript_content = load_manuscript(file_path)
chapters = split_into_chapters(manuscript_content)
analyzer = GeminiAnalyzer()
chapter_analyses = []
transitions = []
for i, chapter in enumerate(chapters):
logging.info(f"Analyzing Chapter {i+1}")
chapter_analysis = analyzer.analyze_chapter(chapter, i+1)
chapter_analyses.append(chapter_analysis)
if i < len(chapters) - 1:
transition_analysis = analyzer.analyze_transitions(chapter,
chapters[i+1], i+1)
transitions.append(transition_analysis)
overall_analysis = analyzer.analyze_overall_structure(chapter_analyses)
logging.info("Manuscript analysis completed")
return {
'overall_analysis': overall_analysis,
'chapter_analyses': chapter_analyses,
'transitions': transitions
}
def print_analysis_results(analysis: dict):
"""Print the analysis results in a readable format."""
print("\n=== Overall Manuscript Analysis ===")
for key, value in analysis['overall_analysis'].items():
print(f"\n{key.replace('_', ' ').title()}:")
print(value)
print("\n=== Chapter-by-Chapter Analysis ===")
for i, chapter in enumerate(analysis['chapter_analyses']):
print(f"\nChapter {i+1}:")
for key, value in chapter.items():
print(f"\n{key.replace('_', ' ').title()}:")
print(value)
print("\n=== Transitions Between Chapters ===")
for i, transition in enumerate(analysis['transitions']):
print(f"\nTransition between Chapter {i+1} and Chapter {i+2}:")
for key, value in transition.items():
print(f"\n{key.replace('_', ' ').title()}:")
print(value)
def main():
try:
manuscript_path = "book.txt" # Update this path if your manuscript
is located elsewhere
analysis_results = analyze_manuscript(manuscript_path)
print_analysis_results(analysis_results)
except Exception as e:
logging.error(f"An error occurred during the manuscript analysis:
{str(e)}")
raise
if __name__ == "__main__":
main()
…On Wed, Sep 18, 2024 at 9:43 PM Zach Bennett ***@***.***> wrote:
Here's one thing I was messing with:
# Advanced EvoMARL+ Prototype with Extended Features
# Using CrewAI, LangChain, and Gemini Pro API
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process
from langchain.agents import Tool
from langchain.llms import GooglePalm
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from google.generativeai import GenerativeModel, generate_text
from typing import List, Dict, Tuple
import gym
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
# Load environment variables
load_dotenv()
# Initialize Gemini Pro API
genai_api_key = os.getenv("GENAI_API_KEY")
gemini_model = GenerativeModel(model_name="gemini-pro",
api_key=genai_api_key)
# Initialize Google Palm as a fallback
palm_llm = GooglePalm(google_api_key=os.getenv("GOOGLE_API_KEY"))
# Define a more advanced Graph Neural Network
class AdvancedGNN(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
super(AdvancedGNN, self).__init__()
self.layers = nn.ModuleList([nn.Linear(input_dim if i == 0 else
hidden_dim, hidden_dim) for i in range(num_layers)])
self.output = nn.Linear(hidden_dim, output_dim)
self.activation = nn.ReLU()
def forward(self, x, adj_matrix):
for layer in self.layers:
x = self.activation(layer(torch.matmul(adj_matrix, x)))
return self.output(x)
# Define a simple environment for multi-agent reinforcement learning
class SimpleMultiAgentEnv(gym.Env):
def __init__(self, num_agents):
super(SimpleMultiAgentEnv, self).__init__()
self.num_agents = num_agents
self.action_space = gym.spaces.Discrete(4) # 4 actions: up, down,
left, right
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(2,))
self.reset()
def reset(self):
self.agents = np.random.uniform(-1, 1, (self.num_agents, 2))
return self._get_obs()
def step(self, actions):
for i, action in enumerate(actions):
if action == 0: # up
self.agents[i, 1] = min(1, self.agents[i, 1] + 0.1)
elif action == 1: # down
self.agents[i, 1] = max(-1, self.agents[i, 1] - 0.1)
elif action == 2: # left
self.agents[i, 0] = max(-1, self.agents[i, 0] - 0.1)
elif action == 3: # right
self.agents[i, 0] = min(1, self.agents[i, 0] + 0.1)
rewards = self._calculate_rewards()
done = False # For simplicity, we'll say the episode never ends
return self._get_obs(), rewards, done, {}
def _get_obs(self):
return self.agents
def _calculate_rewards(self):
# Reward based on distance to center
distances = np.sqrt(np.sum(self.agents**2, axis=1))
return -distances # Negative distance as reward (closer to center
is better)
# Enhanced EvoMARLTools
class EvoMARLTools:
@staticmethod
def evolve_neural_network(input_dim: int, hidden_dim: int, output_dim:
int, num_layers: int) -> AdvancedGNN:
population_size = 20
generations = 10
mutation_rate = 0.1
def create_individual():
return AdvancedGNN(input_dim, hidden_dim, output_dim,
num_layers)
def mutate(individual):
for param in individual.parameters():
if np.random.rand() < mutation_rate:
param.data += torch.randn_like(param.data) * 0.1
return individual
def fitness(individual, X, adj_matrix, y):
with torch.no_grad():
outputs = individual(X, adj_matrix)
loss = nn.MSELoss()(outputs, y)
return -loss.item()
population = [create_individual() for _ in range(population_size)]
X = torch.randn(10, input_dim)
adj_matrix = torch.randint(0, 2, (10, 10)).float()
y = torch.randn(10, output_dim)
for generation in range(generations):
fitnesses = [fitness(ind, X, adj_matrix, y) for ind in
population]
sorted_pop = [x for _, x in sorted(zip(fitnesses, population),
key=lambda pair: pair[0], reverse=True)]
new_population = sorted_pop[:2]
while len(new_population) < population_size:
parent1, parent2 = np.random.choice(sorted_pop[:5], 2,
replace=False)
child = create_individual()
# Implement crossover
for c_param, p1_param, p2_param in zip(child.parameters(),
parent1.parameters(), parent2.parameters()):
c_param.data.copy_(torch.where(torch.rand_like(c_param) < 0.5, p1_param,
p2_param))
child = mutate(child)
new_population.append(child)
population = new_population
best_individual = max(population, key=lambda ind: fitness(ind, X,
adj_matrix, y))
return best_individual
@staticmethod
def optimize_swarm_behavior(n_agents: int, n_dimensions: int,
iterations: int = 100) -> List[Dict[str, float]]:
class Particle:
def __init__(self):
self.position = np.random.rand(n_dimensions)
self.velocity = np.random.rand(n_dimensions)
self.best_position = self.position.copy()
self.best_score = float('-inf')
def objective_function(x):
return -(np.sum(x**2) + np.sum(np.sin(x))) # More complex
objective function
particles = [Particle() for _ in range(n_agents)]
global_best_position = np.random.rand(n_dimensions)
global_best_score = float('-inf')
for _ in range(iterations):
for particle in particles:
score = objective_function(particle.position)
if score > particle.best_score:
particle.best_score = score
particle.best_position = particle.position.copy()
if score > global_best_score:
global_best_score = score
global_best_position = particle.position.copy()
for particle in particles:
inertia = 0.5
cognitive = 1.5
social = 1.5
cognitive_velocity = cognitive * np.random.rand() *
(particle.best_position - particle.position)
social_velocity = social * np.random.rand() *
(global_best_position - particle.position)
particle.velocity = inertia * particle.velocity +
cognitive_velocity + social_velocity
particle.position += particle.velocity
particle.position = np.clip(particle.position, 0, 1)
return [{"position": p.position.tolist(), "best_score":
p.best_score} for p in particles]
@staticmethod
def human_feedback_integration(actions: List[str], feedback:
List[float], previous_weights: Dict[str, float] = None) -> Dict[str, float]:
action_scores = {}
for action, score in zip(actions, feedback):
if action in action_scores:
action_scores[action].append(score)
else:
action_scores[action] = [score]
weighted_scores = {}
for action, scores in action_scores.items():
if previous_weights and action in previous_weights:
# Incorporate previous weights for continuous learning
weighted_scores[action] = 0.7 * np.average(scores,
weights=np.linspace(0.5, 1, len(scores))) + 0.3 * previous_weights[action]
else:
weighted_scores[action] = np.average(scores,
weights=np.linspace(0.5, 1, len(scores)))
return weighted_scores
@staticmethod
def hierarchical_task_decomposition(task_description: str) ->
Dict[str, List[str]]:
prompt = f"Decompose the following task into a hierarchical
structure with main tasks and subtasks, considering dependencies and
parallel execution possibilities: {task_description}"
response = gemini_model.generate_content(prompt)
lines = response.text.split('\n')
hierarchy = {}
current_main_task = None
for line in lines:
if line.startswith('- '): # Main task
current_main_task = line[2:].strip()
hierarchy[current_main_task] = {"subtasks": [],
"dependencies": [], "parallel": False}
elif line.startswith(' - ') and current_main_task: # Subtask
subtask = line[4:].strip()
if "Dependency:" in subtask:
dependency = subtask.split("Dependency:")[1].strip()
hierarchy[current_main_task]["dependencies"].append(dependency)
elif "Parallel" in subtask:
hierarchy[current_main_task]["parallel"] = True
else:
hierarchy[current_main_task]["subtasks"].append(subtask)
return hierarchy
@staticmethod
def transfer_learning(source_model: nn.Module, target_data:
torch.Tensor, fine_tune_layers: List[str] = None) -> nn.Module:
if fine_tune_layers:
for name, param in source_model.named_parameters():
if any(layer in name for layer in fine_tune_layers):
param.requires_grad = True
else:
param.requires_grad = False
else:
for param in source_model.parameters():
param.requires_grad = False
if hasattr(source_model, 'output'):
source_model.output =
nn.Linear(source_model.output.in_features, target_data.shape[1])
else:
raise AttributeError("Source model does not have an 'output'
layer. Please adjust the architecture.")
optimizer = optim.Adam(filter(lambda p: p.requires_grad,
source_model.parameters()), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(200): # Increased epochs for better fine-tuning
optimizer.zero_grad()
output = source_model(target_data,
torch.eye(target_data.shape[0]))
loss = criterion(output, target_data)
loss.backward()
optimizer.step()
if epoch % 50 == 0:
print(f"Epoch {epoch}, Loss: {loss.item()}")
return source_model
@staticmethod
def multi_agent_reinforcement_learning(num_agents: int, num_episodes:
int) -> Tuple[List[float], List[AdvancedGNN]]:
env = SimpleMultiAgentEnv(num_agents)
agents = [AdvancedGNN(input_dim=2, hidden_dim=64, output_dim=4,
num_layers=2) for _ in range(num_agents)]
optimizers = [optim.Adam(agent.parameters(), lr=0.001) for agent
in agents]
episode_rewards = []
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
for _ in range(100): # 100 steps per episode
actions = []
log_probs = []
for i, agent in enumerate(agents):
agent_state = torch.FloatTensor(state[i])
action_probs =
torch.softmax(agent(agent_state.unsqueeze(0), torch.eye(1)), dim=1)
action = torch.multinomial(action_probs, 1).item()
actions.append(action)
log_probs.append(torch.log(action_probs[0, action]))
next_state, rewards, done, _ = env.step(actions)
episode_reward += sum(rewards)
# Update agents
for i, (agent, optimizer, log_prob, reward) in
enumerate(zip(agents, optimizers, log_probs, rewards)):
optimizer.zero_grad()
loss = -log_prob * reward # Simple policy gradient
loss.backward()
optimizer.step()
state = next_state
if done:
break
episode_rewards.append(episode_reward)
if episode % 10 == 0:
print(f"Episode {episode}, Average Reward:
{np.mean(episode_rewards[-10:])}")
return episode_rewards, agents
@staticmethod
def visualize_multi_agent_system(agents: np.ndarray):
plt.figure(figsize=(8, 8))
plt.scatter(agents[:, 0], agents[:, 1], c='blue', s=50)
plt.xlim(-1, 1)
plt.ylim(-1, 1)
plt.title("Multi-Agent System Visualization")
plt.xlabel("X coordinate")
plt.ylabel("Y coordinate")
plt.grid(True)
plt.show()
@staticmethod
def cluster_agent_behaviors(agent_data: np.ndarray, n_clusters: int =
3) -> np.ndarray:
kmeans = KMeans(n_clusters=n_clusters)
clusters = kmeans.fit_predict(agent_data)
return clusters
# Create enhanced tools
evolve_nn_tool = Tool(
name="Evolve Neural Network",
func=EvoMARLTools.evolve_neural_network,
description="Evolves an advanced Graph Neural Network architecture"
)
optimize_swarm_tool = Tool(
name="Optimize Swarm Behavior",
func=EvoMARLTools.optimize_swarm_behavior,
description="Optimizes the behavior of a swarm using enhanced Particle
Swarm Optimization"
)
human_feedback_tool = Tool(
name="Integrate Human Feedback",
func=EvoMARLTools.human_feedback_integration,
description="Integrates human feedback into the system using weighted
averages and continuous learning"
)
task_decomposition_tool = Tool(
name="Hierarchical Task Decomposition",
func=EvoMARLTools.hierarchical_task_decomposition,
description="Decomposes a high-level task into a hierarchical
structure of subtasks with dependencies and parallelization"
)
transfer_learning_tool = Tool(
name="Transfer Learning",
func=EvoMARLTools.transfer_learning,
description="Applies transfer learning to adapt a pre-trained model to
new data"
)
reinforcement_learning_tool = Tool(
name="Multi-Agent Reinforcement Learning",
func=EvoMARLTools.multi_agent_reinforcement_learning,
description="Trains multiple agents in a reinforcement learning
setting"
)
visualization_tool = Tool(
name="Visualize Multi-Agent System",
func=EvoMARLTools.visualize_multi_agent_system,
description="Visualizes the positions and interactions of agents in a
multi-agent system"
)
clustering_tool = Tool(
name="Cluster Agent Behaviors",
func=EvoMARLTools.cluster_agent_behaviors,
description="Clusters agent behaviors using K-Means clustering"
)
# Define more advanced agents
advanced_architect_agent = Agent(
role='Advanced System Architect',
goal='Design and refine the architecture of EvoMARL+ with advanced
features',
tools=[evolve_nn_tool, task_decomposition_tool],
llm=palm_llm
)
advanced_ml_engineer_agent = Agent(
role='Advanced ML Engineer',
goal='Implement and fine-tune complex machine learning models and
algorithms',
tools=[transfer_learning_tool, reinforcement_learning_tool],
llm=palm_llm
)
advanced_human_interaction_agent = Agent(
role='Human-AI Interaction Specialist',
goal='Enhance the interpretability and human integration aspects of
the system',
tools=[human_feedback_tool, visualization_tool],
llm=palm_llm
)
advanced_swarm_intelligence_agent = Agent(
role='Advanced Swarm Intelligence Expert',
goal='Optimize and innovate in swarm behavior algorithms using
advanced techniques',
tools=[optimize_swarm_tool, clustering_tool],
llm=palm_llm
)
# Define advanced tasks
advanced_design_task = Task(
description='Refine the EvoMARL+ architecture with cutting-edge
methodologies and tools',
agent=advanced_architect_agent
)
advanced_ml_implementation_task = Task(
description='Integrate and optimize advanced ML techniques, including
transfer learning and reinforcement learning',
agent=advanced_ml_engineer_agent
)
advanced_human_integration_task = Task(
description='Develop robust human-AI interaction interfaces and
feedback mechanisms',
agent=advanced_human_interaction_agent
)
advanced_swarm_optimization_task = Task(
description='Innovate in swarm intelligence, focusing on optimization
and clustering of agent behaviors',
agent=advanced_swarm_intelligence_agent
)
# Create an advanced crew
advanced_evomarl_crew = Crew(
agents=[advanced_architect_agent, advanced_ml_engineer_agent,
advanced_human_interaction_agent, advanced_swarm_intelligence_agent],
tasks=[advanced_design_task, advanced_ml_implementation_task,
advanced_human_integration_task, advanced_swarm_optimization_task],
process=Process.parallel # Utilize parallel processing for efficiency
)
# Run the advanced crew
advanced_result = advanced_evomarl_crew.kickoff()
print(advanced_result)
# Example of using LangChain with Gemini Pro for specific EvoMARL+
advanced tasks
advanced_evomarl_prompt = PromptTemplate(
input_variables=["component"],
template="Provide a comprehensive and technical guide on implementing
and optimizing the {component} component in EvoMARL+, highlighting advanced
techniques, potential challenges, and best practices."
)
advanced_evomarl_chain = LLMChain(llm=palm_llm,
prompt=advanced_evomarl_prompt)
# Example usage
advanced_component = "Hierarchical Reinforcement Learning"
advanced_explanation = advanced_evomarl_chain.run(advanced_component)
print(f"Comprehensive Guide for {advanced_component}:")
print(advanced_explanation)
On Tue, Sep 10, 2024 at 10:02 PM Zhou guanya ***@***.***>
wrote:
> Yeah, I have Discord—my username is guanya5365. Looking forward to
> chatting more!
>
>
> 安心
> ***@***.***
>
>
>
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: ***@***.***>;
> 发送时间: 2024年9月11日(星期三) 凌晨2:15
> 收件人: ***@***.***>;
> 抄送: ***@***.***>; ***@***.***>;
> 主题: Re: [aiwaves-cn/agents] LangChain primitive usage (Issue #110)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Ok I just got access to GitHub Models so I was planning on going back in
> the project and trying out new models. Do you have a discord or anything?
>
> On Tue, Sep 10, 2024, 3:03 AM Zhou guanya ***@***.***> wrote:
>
> > hi @zdondada <" rel="noopener" target="_blank">
> https://github.com/zdondada>; , I'm currently experimenting
> > with auto-update in the Langchain framework, but I've run into some
> issues
> > and would love to see your awesome work!
> >
> > I integrated with langchain and crewai I'll upload my code later
> >
> > —
> > Reply to this email directly, view it on GitHub
> > <" rel="noopener" target="_blank">
> #110 (comment)>;,
>
> > or unsubscribe
> > <" rel="noopener" target="_blank">
> https://github.com/notifications/unsubscribe-auth/ATTUXRKJEFQOF5CLSTIGZGDZV2KTDAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGMZZHAZTEMJQGY>;
>
> > .
> > You are receiving this because you were mentioned.Message ID:
> > ***@***.***>
> >
>
> —
> Reply to this email directly, view it on GitHub, or unsubscribe.
> You are receiving this because you commented.Message ID: ***@***.***>
>
> —
> Reply to this email directly, view it on GitHub
> <#110 (comment)>,
> or unsubscribe
> <https://github.com/notifications/unsubscribe-auth/ATTUXRJO7MUKTOGHY4LSXELZV6QDRAVCNFSM6AAAAABLMROQCOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGNBSGQ3DOMRUGA>
> .
> You are receiving this because you were mentioned.Message ID:
> ***@***.***>
>
|
I think this is the first thing I created when trying to follow the paper
that worked:
import json
import re
from typing import Any, List, Optional
import chromadb
from chromadb.config import Settings
from google.generativeai import GenerativeModel, configure
from langchain.llms.base import LLM
from pydantic import BaseModel, Field
from crewai import Agent, Task, Crew, Process
# Configure Google Gemini API
configure(api_key="YOUR_API_KEY_HERE")
# Initialize ChromaDB
chroma_client = chromadb.Client(Settings(persist_directory="./db"))
memory_collection = chroma_client.get_or_create_collection("agent_memory")
class GeminiWrapper(LLM):
model: Any
def __init__(self):
super().__init__()
self.model = GenerativeModel('gemini-pro')
self.callbacks = None
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
response = self.model.generate_content(prompt)
return response.text
@Property
def _llm_type(self) -> str:
return "gemini"
gemini_llm = GeminiWrapper()
class PersistentMemoryAgent(Agent, BaseModel):
memory_id: str = Field(default="")
def __init__(self, **data):
super().__init__(**data)
self.memory_id = f"{self.role}_memory"
def remember(self, key, value):
memory_collection.upsert(
documents=[json.dumps(value)],
metadatas=[{"key": key}],
ids=[f"{self.memory_id}_{key}"]
)
def recall(self, key):
results = memory_collection.get(
ids=[f"{self.memory_id}_{key}"],
include=["documents"]
)
if results['documents']:
return json.loads(results['documents'][0])
return None
class DynamicAgentPipeline:
def __init__(self):
self.agents = []
self.llm = gemini_llm
def create_agent(self, role, goal, backstory):
agent = PersistentMemoryAgent(
role=role,
goal=goal,
backstory=backstory,
verbose=True,
allow_delegation=True,
llm=self.llm
)
self.agents.append(agent)
return agent
def get_or_create_agent(self, role, goal, backstory):
for agent in self.agents:
if agent.role == role:
return agent
return self.create_agent(role, goal, backstory)
class SymbolicLearningFramework:
def __init__(self, pipeline):
self.pipeline = pipeline
def forward_pass(self, input_data):
current_output = input_data
trajectory = []
for agent in self.pipeline.agents:
task = Task(
description=f"Process the following input:
{current_output}",
agent=agent,
expected_output="Processed output"
)
crew = Crew(agents=[agent], tasks=[task],
process=Process.sequential)
current_output = crew.kickoff()
trajectory.append((agent, current_output))
return current_output, trajectory
def compute_language_loss(self, output, expected_output):
loss_computer = self.pipeline.get_or_create_agent(
"Loss Computer",
"Compute the discrepancy between expected and actual outcomes",
"Expert in evaluating outputs and providing detailed feedback"
)
loss_task = Task(
description=f"Compute the loss between the output: '{output}'
and the expected output: '{expected_output}'. Provide a detailed analysis
and a numeric score from 0 to 10.",
agent=loss_computer,
expected_output="A detailed analysis and a numeric loss score"
)
loss_crew = Crew(agents=[loss_computer], tasks=[loss_task],
process=Process.sequential)
return loss_crew.kickoff()
def backward_pass(self, trajectory, loss):
gradients = []
for agent, output in reversed(trajectory):
gradient_computer = self.pipeline.get_or_create_agent(
f"Gradient Computer for {agent.role}",
f"Compute gradients for improving {agent.role}'s
performance",
f"Expert in analyzing performance and suggesting
improvements for {agent.role}"
)
gradient_task = Task(
description=f"Analyze the output: '{output}' and the
overall loss: '{loss}'. Provide specific suggestions for improving
{agent.role}'s performance. Include both textual analysis and a numeric
impact score from -1 to 1 for each suggestion.",
agent=gradient_computer,
expected_output="Specific improvement suggestions with
impact scores"
)
gradient_crew = Crew(agents=[gradient_computer],
tasks=[gradient_task], process=Process.sequential)
gradients.append((agent, gradient_crew.kickoff()))
return gradients
def apply_gradients(self, gradients):
for agent, gradient in gradients:
optimizer = self.pipeline.get_or_create_agent(
f"Optimizer for {agent.role}",
f"Optimize {agent.role}'s performance based on computed
gradients",
f"Expert in fine-tuning agent parameters and strategies for
{agent.role}"
)
optimization_task = Task(
description=f"Given the following gradient information for
{agent.role}: '{gradient}', provide specific updates to the agent's
parameters, including its role description, goal, and backstory. Format the
updates as a JSON object.",
agent=optimizer,
expected_output="JSON object with updated agent parameters"
)
optimization_crew = Crew(agents=[optimizer],
tasks=[optimization_task], process=Process.sequential)
updates = optimization_crew.kickoff()
# Apply the updates to the agent
self.update_agent(agent, updates)
def update_agent(self, agent, updates):
try:
updates_dict = json.loads(updates.string_output() if
hasattr(updates, 'string_output') else str(updates))
if 'role' in updates_dict:
agent.role = updates_dict['role']
if 'goal' in updates_dict:
agent.goal = updates_dict['goal']
if 'backstory' in updates_dict:
agent.backstory = updates_dict['backstory']
# Store the updates in persistent memory
agent.remember('latest_update', updates_dict)
except json.JSONDecodeError:
print(f"Error decoding JSON updates: {updates}")
def extract_numeric_value(self, text):
if hasattr(text, 'string_output'):
text = text.string_output()
numeric_values = re.findall(r'\d+(?:\.\d+)?', str(text))
if numeric_values:
return float(numeric_values[0])
else:
print(f"No numeric value found in: {text}")
return 10.0 # Default high loss
def train(self, input_data, expected_output, epochs=1, min_loss=2,
max_retries=3):
best_output = None
best_loss = float('inf')
learning_rate = 0.1
consecutive_errors = 0
for epoch in range(epochs):
retry_count = 0
while retry_count < max_retries:
try:
output, trajectory = self.forward_pass(input_data)
loss = self.compute_language_loss(output,
expected_output)
loss_value = self.extract_numeric_value(loss)
if loss_value < best_loss:
best_loss = loss_value
best_output = output
if loss_value <= min_loss:
print(f"Training completed. Loss: {loss_value}")
return output
gradients = self.backward_pass(trajectory, loss_value)
self.apply_gradients(gradients)
print(f"Epoch {epoch + 1}/{epochs} - Loss:
{loss_value}")
consecutive_errors = 0 # Reset error counter on
successful epoch
break # Exit retry loop if successful
except Exception as e:
retry_count += 1
consecutive_errors += 1
print(f"Error during training (Attempt
{retry_count}/{max_retries}): {e}")
if retry_count == max_retries:
print(f"Skipping epoch {epoch + 1} due to repeated
errors")
if consecutive_errors > 3:
learning_rate *= 0.5 # Reduce learning rate if
we're having repeated issues
print(f"Adjusting learning rate to {learning_rate}")
if consecutive_errors > 5:
print("Too many consecutive errors. Terminating
training early.")
return best_output if best_output is not None else
input_data
return best_output if best_output is not None else output
class DynamicTaskManager:
def __init__(self, framework):
self.framework = framework
def decompose_task(self, task_description):
decomposer = self.framework.pipeline.get_or_create_agent(
"Task Decomposer",
"Break down complex tasks into manageable subtasks",
"Expert in task analysis and strategic planning"
)
decompose_task = Task(
description=f"Analyze the following task and break it down into
subtasks: '{task_description}'. For each subtask, specify the required
agent role, goal, and backstory. Format the output as a JSON array.",
agent=decomposer,
expected_output="A JSON array of subtasks with roles, goals,
and backstories"
)
decompose_crew = Crew(agents=[decomposer], tasks=[decompose_task],
process=Process.sequential)
result = decompose_crew.kickoff()
return result.string_output() if hasattr(result, 'string_output')
else str(result)
def execute_task(self, task_description):
subtasks = self.decompose_task(task_description)
try:
# Remove backticks and "json" identifier if present
subtasks_str = re.sub(r'^```json\s*|\s*```$', '',
subtasks).strip()
subtasks_data = json.loads(subtasks_str)
if isinstance(subtasks_data, dict) and 'subtasks' in
subtasks_data:
subtasks_list = subtasks_data['subtasks']
elif isinstance(subtasks_data, list):
subtasks_list = subtasks_data
else:
raise ValueError("Unexpected subtasks format")
except json.JSONDecodeError:
print(f"Error decoding JSON. Raw output: {subtasks}")
return "Error: Unable to parse subtasks"
except ValueError as e:
print(f"Error: {str(e)}. Raw output: {subtasks}")
return "Error: Unexpected subtasks format"
final_output = ""
for subtask in subtasks_list:
if not isinstance(subtask, dict):
print(f"Error: Subtask is not a dictionary. Subtask:
{subtask}")
continue
agent = self.framework.pipeline.get_or_create_agent(
subtask.get('role', 'Default Role'),
subtask.get('goal', 'Complete the subtask'),
subtask.get('backstory', '')
)
subtask_execution = Task(
description=subtask.get('goal', str(subtask)),
agent=agent,
expected_output="Completed subtask output"
)
subtask_crew = Crew(agents=[agent], tasks=[subtask_execution],
process=Process.sequential)
subtask_output = subtask_crew.kickoff()
final_output += subtask_output.string_output() if
hasattr(subtask_output, 'string_output') else str(subtask_output) + "\n"
return final_output
class ContinuousLearner:
def __init__(self, framework, task_manager):
self.framework = framework
self.task_manager = task_manager
def learn_from_experience(self, task_description, expected_output,
max_attempts=3, learning_rate=0.1):
for attempt in range(max_attempts):
actual_output = self.task_manager.execute_task(task_description)
loss = self.framework.compute_language_loss(actual_output,
expected_output)
loss_value = self.framework.extract_numeric_value(loss)
print(f"Attempt {attempt + 1}, Loss: {loss_value}")
if loss_value <= 2: # Satisfactory performance
print(f"Task completed successfully on attempt {attempt +
1}. Loss: {loss_value}")
return actual_output
# Adjust learning rate based on loss
adjusted_learning_rate = learning_rate * loss_value / 10
self.framework.train(task_description, expected_output,
epochs=int(1 / adjusted_learning_rate), min_loss=2)
print(f"Failed to achieve satisfactory performance after
{max_attempts} attempts.")
return actual_output
def optimize_pipeline(self):
optimizer = self.framework.pipeline.get_or_create_agent(
"Pipeline Optimizer",
"Optimize the overall agent pipeline for improved performance",
"Expert in system architecture and optimization strategies"
)
optimize_task = Task(
description="Analyze the current agent pipeline and suggest
optimizations. Consider adding, removing, or modifying agent roles to
improve overall system performance. Provide suggestions in a JSON format.",
agent=optimizer,
expected_output="JSON-formatted optimization suggestions"
)
optimize_crew = Crew(agents=[optimizer], tasks=[optimize_task],
process=Process.sequential)
optimizations = optimize_crew.kickoff()
try:
# Remove backticks and "json" identifier if present
optimizations_str = re.sub(r'^```json\s*|\s*```$', '',
optimizations.string_output()).strip()
optimization_list = json.loads(optimizations_str)
# Check if 'optimizations' key exists and contains a list
if isinstance(optimization_list, dict) and 'optimizations' in
optimization_list:
optimization_list = optimization_list['optimizations']
elif not isinstance(optimization_list, list):
raise ValueError("Unexpected optimizations format. Expected
a list or a dictionary with 'optimizations' key")
for opt in optimization_list:
action = opt.get('action')
if action == 'add':
self.framework.pipeline.create_agent(opt.get('role'),
opt.get('goal'), opt.get('backstory'))
elif action == 'remove':
self.framework.pipeline.agents = [agent for agent in
self.framework.pipeline.agents if agent.role != opt.get('role')]
elif action == 'modify':
agent = next((a for a in self.framework.pipeline.agents
if a.role == opt.get('role')), None)
if agent:
self.framework.update_agent(agent,
json.dumps(opt.get('updates')))
except json.JSONDecodeError:
print(f"Error decoding JSON optimizations: {optimizations}")
except ValueError as e:
print(f"Error processing optimizations: {e}")
def execute_python_safely(code):
try:
exec(code)
return None # No error
except Exception as e:
return str(e) # Return the error message
# Usage example
pipeline = DynamicAgentPipeline()
framework = SymbolicLearningFramework(pipeline)
task_manager = DynamicTaskManager(framework)
continuous_learner = ContinuousLearner(framework, task_manager)
# Initial task execution
task = "Write a comprehensive report on the impact of artificial
intelligence on healthcare"
result = task_manager.execute_task(task)
print("Initial Result:", result)
# Learn from experience
expected_output = """
1. Introduction to AI in Healthcare
2. Current Applications of AI in Healthcare
3. Benefits of AI in Healthcare
4. Challenges and Limitations
5. Future Prospects
6. Ethical Considerations
7. Conclusion
"""
improved_result = continuous_learner.learn_from_experience(task,
expected_output)
print("Improved Result:", improved_result)
# Optimize the pipeline
continuous_learner.optimize_pipeline()
# Execute a coding task
coding_task = "Write a Python function to calculate the Fibonacci sequence
up to n terms"
code_result = task_manager.execute_task(coding_task)
print("Generated Code:", code_result)
# Execute the generated code safely
error = execute_python_safely(code_result)
if error:
print(f"Error in generated code: {error}")
# Re-attempt the coding task
code_result = continuous_learner.learn_from_experience(coding_task, "A
correct Python function to calculate Fibonacci sequence")
print("Improved Code:", code_result)
# Execute the improved code
error = execute_python_safely(code_result)
if error:
print(f"Error in improved code: {error}")
else:
print("Code executed successfully")
else:
print("Code executed successfully")
if __name__ == "__main__":
# This ensures that the script only runs when it's directly executed,
not when it's imported as a module
pass # You can replace this with any specific startup code if needed
…On Wed, Sep 18, 2024 at 9:56 PM Zach Bennett ***@***.***> wrote:
Then I was messing with this one (I had a bunch of different iterations,
I'm closing out a bunch of open notepads and sending you what I have as I
come across it, lol.)
import json
import ast
import re
from typing import Any, List, Optional
from collections import Counter
import nltk
from nltk.corpus import stopwords
import spacy
from spacy import displacy
import ast
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
import logging
import chromadb
from chromadb.config import Settings
from google.generativeai import GenerativeModel, configure
from langchain.llms.base import LLM
from pydantic import BaseModel, Field
from crewai import Agent, Task, Crew, Process
logging.basicConfig(level=logging.INFO, format='%(asctime)s -
%(levelname)s - %(message)s')
# Configure Google Gemini API
configure(api_key="AIzaSyBSzFDPZhK-2MD6o-uvWVnxmdBa26SO14A")
# Initialize ChromaDB
chroma_client = chromadb.Client(Settings(persist_directory="./db"))
memory_collection = chroma_client.get_or_create_collection("agent_memory")
class GeminiWrapper(LLM):
model: Any
def __init__(self):
super().__init__()
self.model = GenerativeModel('gemini-pro')
self.callbacks = None
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
response = self.model.generate_content(prompt)
return response.text
@Property
def _llm_type(self) -> str:
return "gemini"
gemini_llm = GeminiWrapper()
class PersistentMemoryAgent(Agent, BaseModel):
memory_id: str = Field(default="")
def __init__(self, **data):
super().__init__(**data)
self.memory_id = f"{self.role}_memory"
def remember(self, key, value):
memory_collection.upsert(
documents=[json.dumps(value)],
metadatas=[{"key": key}],
ids=[f"{self.memory_id}_{key}"]
)
def recall(self, key):
results = memory_collection.get(
ids=[f"{self.memory_id}_{key}"],
include=["documents"]
)
if results['documents']:
return json.loads(results['documents'][0])
return None
class DynamicAgentPipeline:
def __init__(self):
self.agents = []
self.llm = gemini_llm
def create_agent(self, role, goal, backstory):
agent = PersistentMemoryAgent(
role=role,
goal=goal,
backstory=backstory,
verbose=True,
allow_delegation=True,
llm=self.llm
)
self.agents.append(agent)
return agent
def get_or_create_agent(self, role, goal, backstory):
for agent in self.agents:
if agent.role == role:
return agent
return self.create_agent(role, goal, backstory)
class SymbolicLearningFramework:
def __init__(self, pipeline):
self.pipeline = pipeline
def forward_pass(self, input_data):
current_output = input_data
trajectory = []
for agent in self.pipeline.agents:
task = Task(
description=f"Process the following input:
{current_output}",
agent=agent,
expected_output="Processed output"
)
crew = Crew(agents=[agent], tasks=[task],
process=Process.sequential)
current_output = crew.kickoff()
trajectory.append((agent, current_output))
return current_output, trajectory
def compute_language_loss(self, output, expected_output):
loss_computer = self.pipeline.get_or_create_agent(
"Loss Computer",
"Compute the discrepancy between expected and actual outcomes",
"Expert in evaluating outputs and providing detailed feedback"
)
loss_task = Task(
description=f"Compute the loss between the output: '{output}'
and the expected output: '{expected_output}'. Provide a detailed analysis
and a numeric score from 0 to 10.",
agent=loss_computer,
expected_output="A detailed analysis and a numeric loss score"
)
loss_crew = Crew(agents=[loss_computer], tasks=[loss_task],
process=Process.sequential)
return loss_crew.kickoff()
def backward_pass(self, trajectory, loss):
gradients = []
for agent, output in reversed(trajectory):
gradient_computer = self.pipeline.get_or_create_agent(
f"Gradient Computer for {agent.role}",
f"Compute gradients for improving {agent.role}'s
performance",
f"Expert in analyzing performance and suggesting
improvements for {agent.role}"
)
gradient_task = Task(
description=f"Analyze the output: '{output}' and the
overall loss: '{loss}'. Provide specific suggestions for improving
{agent.role}'s performance. Include both textual analysis and a numeric
impact score from -1 to 1 for each suggestion.",
agent=gradient_computer,
expected_output="Specific improvement suggestions with
impact scores"
)
gradient_crew = Crew(agents=[gradient_computer],
tasks=[gradient_task], process=Process.sequential)
gradients.append((agent, gradient_crew.kickoff()))
return gradients
def apply_gradients(self, gradients):
for agent, gradient in gradients:
optimizer = self.pipeline.get_or_create_agent(
f"Optimizer for {agent.role}",
f"Optimize {agent.role}'s performance based on computed
gradients",
f"Expert in fine-tuning agent parameters and strategies
for {agent.role}"
)
optimization_task = Task(
description=f"Given the following gradient information for
{agent.role}: '{gradient}', provide specific updates to the agent's
parameters, including its role description, goal, and backstory. Format the
updates as a JSON object.",
agent=optimizer,
expected_output="JSON object with updated agent parameters"
)
optimization_crew = Crew(agents=[optimizer],
tasks=[optimization_task], process=Process.sequential)
updates = optimization_crew.kickoff()
# Apply the updates to the agent
self.update_agent(agent, updates)
def update_agent(self, agent, updates):
try:
updates_dict = json.loads(updates.string_output() if
hasattr(updates, 'string_output') else str(updates))
if 'role' in updates_dict:
agent.role = updates_dict['role']
if 'goal' in updates_dict:
agent.goal = updates_dict['goal']
if 'backstory' in updates_dict:
agent.backstory = updates_dict['backstory']
# Store the updates in persistent memory
agent.remember('latest_update', updates_dict)
except json.JSONDecodeError:
print(f"Error decoding JSON updates: {updates}")
def extract_numeric_value(self, text):
if hasattr(text, 'string_output'):
text = text.string_output()
numeric_values = re.findall(r'\d+(?:\.\d+)?', str(text))
if numeric_values:
return float(numeric_values[0])
else:
print(f"No numeric value found in: {text}")
return 10.0 # Default high loss
def train(self, input_data, expected_output, epochs=1, min_loss=2,
max_retries=3, additional_feedback=""):
best_output = None
best_loss = float('inf')
learning_rate = 0.1
consecutive_errors = 0
for epoch in range(epochs):
retry_count = 0
while retry_count < max_retries:
try:
output, trajectory = self.forward_pass(input_data)
loss = self.compute_language_loss(output,
expected_output)
loss_value = self.extract_numeric_value(loss)
if loss_value < best_loss:
best_loss = loss_value
best_output = output
if loss_value <= min_loss:
print(f"Training completed. Loss: {loss_value}")
return output
gradients = self.backward_pass(trajectory, loss_value)
# Incorporate additional feedback into the gradient
computation
if additional_feedback:
feedback_task = Task(
description=f"Analyze the following feedback
and suggest improvements: {additional_feedback}",
agent=self.pipeline.get_or_create_agent("Feedback Analyzer", "Analyze
feedback and suggest improvements", "Expert in code review and
optimization"),
expected_output="Improvement suggestions based
on feedback"
)
feedback_crew = Crew(agents=[feedback_task.agent],
tasks=[feedback_task], process=Process.sequential)
feedback_suggestions = feedback_crew.kickoff()
gradients.append((feedback_task.agent,
feedback_suggestions))
self.apply_gradients(gradients)
print(f"Epoch {epoch + 1}/{epochs} - Loss:
{loss_value}")
consecutive_errors = 0 # Reset error counter on
successful epoch
break # Exit retry loop if successful
except Exception as e:
retry_count += 1
consecutive_errors += 1
print(f"Error during training (Attempt
{retry_count}/{max_retries}): {e}")
if retry_count == max_retries:
print(f"Skipping epoch {epoch + 1} due to repeated
errors")
if consecutive_errors > 3:
learning_rate *= 0.5 # Reduce learning rate if
we're having repeated issues
print(f"Adjusting learning rate to
{learning_rate}")
if consecutive_errors > 5:
print("Too many consecutive errors. Terminating
training early.")
return best_output if best_output is not None else
input_data
return best_output if best_output is not None else output
class DynamicTaskManager:
def __init__(self, framework):
self.framework = framework
def decompose_task(self, task_description):
decomposer = self.framework.pipeline.get_or_create_agent(
"Task Decomposer",
"Break down complex tasks into manageable subtasks",
"Expert in task analysis and strategic planning"
)
decompose_task = Task(
description=f"Analyze the following task and break it down
into subtasks: '{task_description}'. For each subtask, specify the required
agent role, goal, and backstory. Format the output as a JSON array.",
agent=decomposer,
expected_output="A JSON array of subtasks with roles, goals,
and backstories"
)
decompose_crew = Crew(agents=[decomposer], tasks=[decompose_task],
process=Process.sequential)
result = decompose_crew.kickoff()
return result.string_output() if hasattr(result, 'string_output')
else str(result)
def execute_task(self, task_description):
subtasks = self.decompose_task(task_description)
try:
subtasks_data = json.loads(subtasks)
if isinstance(subtasks_data, dict) and 'subtasks' in
subtasks_data:
subtasks_list = subtasks_data['subtasks']
elif isinstance(subtasks_data, list):
subtasks_list = subtasks_data
else:
raise ValueError("Unexpected subtasks format")
except json.JSONDecodeError:
print(f"Error decoding JSON. Raw output: {subtasks}")
return "Error: Unable to parse subtasks"
except ValueError as e:
print(f"Error: {str(e)}. Raw output: {subtasks}")
return "Error: Unexpected subtasks format"
final_output = ""
for subtask in subtasks_list:
if not isinstance(subtask, dict):
print(f"Error: Subtask is not a dictionary. Subtask:
{subtask}")
continue
agent = self.framework.pipeline.get_or_create_agent(
subtask.get('role', 'Default Role'),
subtask.get('goal', 'Complete the subtask'),
subtask.get('backstory', '')
)
subtask_execution = Task(
description=subtask.get('goal', str(subtask)), # Using
'goal' as the task description
agent=agent,
expected_output="Completed subtask output"
)
subtask_crew = Crew(agents=[agent], tasks=[subtask_execution],
process=Process.sequential)
subtask_output = subtask_crew.kickoff()
final_output += subtask_output.string_output() if
hasattr(subtask_output, 'string_output') else str(
subtask_output) + "\n"
return final_output
class ContinuousLearner:
def __init__(self, framework, task_manager):
self.framework = framework
self.task_manager = task_manager
def learn_from_experience(self, task_description, expected_output,
max_attempts=5, learning_rate=0.1):
for attempt in range(max_attempts):
actual_output =
self.task_manager.execute_task(task_description)
# For coding tasks, validate syntax first
if "Write a Python function" in task_description:
is_valid, error_message =
validate_python_syntax(actual_output)
if not is_valid:
print(f"Syntax Error (Attempt {attempt + 1}):
{error_message}")
loss_value = 10 # High loss for syntax errors
else:
# If syntax is valid, check for correctness
error = execute_python_safely(actual_output)
if error:
print(f"Runtime Error (Attempt {attempt + 1}):
{error}")
loss_value = 8 # High loss for runtime errors
else:
loss =
self.framework.compute_language_loss(actual_output, expected_output)
loss_value =
self.framework.extract_numeric_value(loss)
else:
# For non-coding tasks, compute loss as before
loss = self.framework.compute_language_loss(actual_output,
expected_output)
loss_value = self.framework.extract_numeric_value(loss)
print(f"Attempt {attempt + 1}, Loss: {loss_value}")
if loss_value <= 2: # Satisfactory performance
print(f"Task completed successfully on attempt {attempt +
1}. Loss: {loss_value}")
return actual_output
# Provide more specific feedback for improvement
if "Write a Python function" in task_description:
if not is_valid:
feedback = f"The code has a syntax error:
{error_message}. Please fix the syntax and ensure the code is valid Python."
elif error:
feedback = f"The code has a runtime error: {error}.
Please fix the logic to ensure the function runs correctly."
else:
feedback = "The code syntax is correct, but it may not
meet all requirements. Please review the function logic and output."
else:
feedback = "The output does not fully meet the expected
criteria. Please improve based on the given requirements."
# Adjust learning rate based on loss
adjusted_learning_rate = learning_rate * loss_value / 10
self.framework.train(task_description, expected_output,
epochs=int(1 / adjusted_learning_rate), min_loss=2,
additional_feedback=feedback)
print(f"Failed to achieve satisfactory performance after
{max_attempts} attempts.")
return actual_output
def optimize_pipeline(self):
optimizer = self.framework.pipeline.get_or_create_agent(
"Pipeline Optimizer",
"Optimize the overall agent pipeline for improved performance",
"Expert in system architecture and optimization strategies"
)
optimize_task = Task(
description="Analyze the current agent pipeline and suggest
optimizations. Consider adding, removing, or modifying agent roles to
improve overall system performance. Provide suggestions in a JSON format.",
agent=optimizer,
expected_output="JSON-formatted optimization suggestions"
)
optimize_crew = Crew(agents=[optimizer], tasks=[optimize_task],
process=Process.sequential)
optimizations = optimize_crew.kickoff()
try:
# Handle different types of output
if isinstance(optimizations, str):
optimizations_str = optimizations
elif hasattr(optimizations, 'string_output'):
optimizations_str = optimizations.string_output()
else:
optimizations_str = str(optimizations)
# Remove backticks and "json" identifier if present
optimizations_str = re.sub(r'^```json\s*|\s*```$', '',
optimizations_str).strip()
optimization_list = json.loads(optimizations_str)
# Check if 'optimizations' key exists and contains a list
if isinstance(optimization_list, dict) and 'optimizations' in
optimization_list:
optimization_list = optimization_list['optimizations']
elif not isinstance(optimization_list, list):
raise ValueError("Unexpected optimizations format.
Expected a list or a dictionary with 'optimizations' key")
for opt in optimization_list:
action = opt.get('action')
if action == 'add':
self.framework.pipeline.create_agent(opt.get('role'),
opt.get('goal'), opt.get('backstory'))
elif action == 'remove':
self.framework.pipeline.agents = [agent for agent in
self.framework.pipeline.agents if agent.role != opt.get('role')]
elif action == 'modify':
agent = next((a for a in
self.framework.pipeline.agents if a.role == opt.get('role')), None)
if agent:
self.framework.update_agent(agent,
json.dumps(opt.get('updates')))
except json.JSONDecodeError:
print(f"Error decoding JSON optimizations:
{optimizations_str}")
except ValueError as e:
print(f"Error processing optimizations: {e}")
except Exception as e:
print(f"Unexpected error during pipeline optimization: {e}")
# Modify the DynamicTaskManager class to handle the JSON parsing issue
class DynamicTaskManager:
def __init__(self, framework):
self.framework = framework
def decompose_task(self, task_description):
decomposer = self.framework.pipeline.get_or_create_agent(
"Task Decomposer",
"Break down complex tasks into manageable subtasks",
"Expert in task analysis and strategic planning"
)
decompose_task = Task(
description=f"Analyze the following task and break it down
into subtasks: '{task_description}'. For each subtask, specify the required
agent role, goal, and backstory. Format the output as a JSON array.",
agent=decomposer,
expected_output="A JSON array of subtasks with roles, goals,
and backstories"
)
decompose_crew = Crew(agents=[decomposer], tasks=[decompose_task],
process=Process.sequential)
result = decompose_crew.kickoff()
return result.string_output() if hasattr(result, 'string_output')
else str(result)
def execute_task(self, task_description):
subtasks = self.decompose_task(task_description)
try:
# Remove backticks and "json" identifier if present
subtasks_str = re.sub(r'^```json\s*|\s*```$', '',
subtasks).strip()
subtasks_data = json.loads(subtasks_str)
if isinstance(subtasks_data, dict) and 'subtasks' in
subtasks_data:
subtasks_list = subtasks_data['subtasks']
elif isinstance(subtasks_data, list):
subtasks_list = subtasks_data
else:
raise ValueError("Unexpected subtasks format")
except json.JSONDecodeError:
print(f"Error decoding JSON. Raw output: {subtasks}")
return "Error: Unable to parse subtasks"
except ValueError as e:
print(f"Error: {str(e)}. Raw output: {subtasks}")
return "Error: Unexpected subtasks format"
final_output = ""
for subtask in subtasks_list:
if not isinstance(subtask, dict):
print(f"Error: Subtask is not a dictionary. Subtask:
{subtask}")
continue
agent = self.framework.pipeline.get_or_create_agent(
subtask.get('role', 'Default Role'),
subtask.get('goal', 'Complete the subtask'),
subtask.get('backstory', '')
)
subtask_execution = Task(
description=subtask.get('goal', str(subtask)),
agent=agent,
expected_output="Completed subtask output"
)
subtask_crew = Crew(agents=[agent], tasks=[subtask_execution],
process=Process.sequential)
subtask_output = subtask_crew.kickoff()
final_output += subtask_output.string_output() if
hasattr(subtask_output, 'string_output') else str(subtask_output) + "\n"
return final_output
def execute_python_safely(code):
# First, validate the syntax
is_valid, error_message = validate_python_syntax(code)
if not is_valid:
return f"Syntax Error: {error_message}"
# If syntax is valid, try to execute
try:
exec(code)
return None # No error
except Exception as e:
return str(e) # Return the error message
def load_manuscript(file_path: str) -> str:
"""Load the manuscript content from a file."""
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
except FileNotFoundError:
logging.error(f"File not found: {file_path}")
raise
except IOError as e:
logging.error(f"Error reading file {file_path}: {str(e)}")
raise
def split_into_chapters(manuscript_content: str) -> List[str]:
"""Split the manuscript content into chapters."""
chapters = manuscript_content.split("Chapter")[1:] # Assumes chapters
start with "Chapter"
return [chapter.strip() for chapter in chapters]
class GeminiAnalyzer:
def __init__(self):
self.model = GenerativeModel('gemini-pro')
def _safe_json_loads(self, text):
try:
return json.loads(text)
except json.JSONDecodeError:
logging.error(f"Failed to parse JSON: {text}")
return {"error": "Failed to parse response as JSON"}
def analyze_chapter(self, chapter_content: str, chapter_number: int)
-> dict:
prompt = f"""
Analyze the following chapter (Chapter {chapter_number}) of a
manuscript:
{chapter_content[:2000]}... # Truncated for API limits
Provide a detailed analysis including:
1. Main themes and ideas
2. Character development (if applicable)
3. Plot progression (if applicable)
4. Writing style and tone
5. Pacing and flow
6. Strengths of the chapter
7. Areas for potential improvement
8. How this chapter fits into the overall narrative (based on what
you can infer)
Format your response as a JSON object with these keys.
"""
try:
response = self.model.generate_content(prompt)
logging.info(f"Raw response for Chapter {chapter_number}:
{response.text}")
return self._safe_json_loads(response.text)
except Exception as e:
logging.error(f"Error analyzing Chapter {chapter_number}:
{str(e)}")
return {"error": f"Failed to analyze Chapter {chapter_number}"}
def analyze_transitions(self, prev_chapter: str, next_chapter: str,
chapter_number: int) -> dict:
prompt = f"""
Analyze the transition between these two consecutive chapters:
End of Chapter {chapter_number}:
{prev_chapter[-500:]}
Beginning of Chapter {chapter_number + 1}:
{next_chapter[:500]}
Provide an analysis of:
1. Continuity and flow between chapters
2. Effectiveness of the transition
3. Any cliffhangers or hooks used
4. Suggestions for improving the transition (if necessary)
Format your response as a JSON object with these keys.
"""
try:
response = self.model.generate_content(prompt)
logging.info(f"Raw response for transition
{chapter_number}-{chapter_number+1}: {response.text}")
return self._safe_json_loads(response.text)
except Exception as e:
logging.error(f"Error analyzing transition
{chapter_number}-{chapter_number+1}: {str(e)}")
return {"error": f"Failed to analyze transition
{chapter_number}-{chapter_number+1}"}
def analyze_overall_structure(self, chapter_summaries: List[dict]) ->
dict:
prompt = f"""
Based on the following summaries of each chapter, analyze the
overall structure of the manuscript:
{json.dumps(chapter_summaries, indent=2)}
Provide an analysis of:
1. Overall narrative arc
2. Pacing throughout the manuscript
3. Character development across chapters
4. Consistency in themes and tone
5. Strengths of the overall structure
6. Areas for potential improvement in the manuscript's structure
7. Suggestions for enhancing the flow and coherence of the
narrative
Format your response as a JSON object with these keys.
"""
try:
response = self.model.generate_content(prompt)
logging.info(f"Raw response for overall structure:
{response.text}")
return self._safe_json_loads(response.text)
except Exception as e:
logging.error(f"Error analyzing overall structure: {str(e)}")
return {"error": "Failed to analyze overall structure"}
def analyze_manuscript(file_path: str) -> dict:
"""Perform a deep analysis of the manuscript using Gemini."""
logging.info("Starting manuscript analysis")
manuscript_content = load_manuscript(file_path)
chapters = split_into_chapters(manuscript_content)
analyzer = GeminiAnalyzer()
chapter_analyses = []
transitions = []
for i, chapter in enumerate(chapters):
logging.info(f"Analyzing Chapter {i+1}")
chapter_analysis = analyzer.analyze_chapter(chapter, i+1)
chapter_analyses.append(chapter_analysis)
if i < len(chapters) - 1:
transition_analysis = analyzer.analyze_transitions(chapter,
chapters[i+1], i+1)
transitions.append(transition_analysis)
overall_analysis = analyzer.analyze_overall_structure(chapter_analyses)
logging.info("Manuscript analysis completed")
return {
'overall_analysis': overall_analysis,
'chapter_analyses': chapter_analyses,
'transitions': transitions
}
def print_analysis_results(analysis: dict):
"""Print the analysis results in a readable format."""
print("\n=== Overall Manuscript Analysis ===")
for key, value in analysis['overall_analysis'].items():
print(f"\n{key.replace('_', ' ').title()}:")
print(value)
print("\n=== Chapter-by-Chapter Analysis ===")
for i, chapter in enumerate(analysis['chapter_analyses']):
print(f"\nChapter {i+1}:")
for key, value in chapter.items():
print(f"\n{key.replace('_', ' ').title()}:")
print(value)
print("\n=== Transitions Between Chapters ===")
for i, transition in enumerate(analysis['transitions']):
print(f"\nTransition between Chapter {i+1} and Chapter {i+2}:")
for key, value in transition.items():
print(f"\n{key.replace('_', ' ').title()}:")
print(value)
def main():
try:
manuscript_path = "book.txt" # Update this path if your
manuscript is located elsewhere
analysis_results = analyze_manuscript(manuscript_path)
print_analysis_results(analysis_results)
except Exception as e:
logging.error(f"An error occurred during the manuscript analysis:
{str(e)}")
raise
if __name__ == "__main__":
main()
On Wed, Sep 18, 2024 at 9:43 PM Zach Bennett ***@***.***>
wrote:
> Here's one thing I was messing with:
>
>
> # Advanced EvoMARL+ Prototype with Extended Features
> # Using CrewAI, LangChain, and Gemini Pro API
>
> import os
> import numpy as np
> import torch
> import torch.nn as nn
> import torch.optim as optim
> from dotenv import load_dotenv
> from crewai import Agent, Task, Crew, Process
> from langchain.agents import Tool
> from langchain.llms import GooglePalm
> from langchain.chains import LLMChain
> from langchain.prompts import PromptTemplate
> from google.generativeai import GenerativeModel, generate_text
> from typing import List, Dict, Tuple
> import gym
> import matplotlib.pyplot as plt
> from sklearn.cluster import KMeans
>
> # Load environment variables
> load_dotenv()
>
> # Initialize Gemini Pro API
> genai_api_key = os.getenv("GENAI_API_KEY")
> gemini_model = GenerativeModel(model_name="gemini-pro",
> api_key=genai_api_key)
>
> # Initialize Google Palm as a fallback
> palm_llm = GooglePalm(google_api_key=os.getenv("GOOGLE_API_KEY"))
>
> # Define a more advanced Graph Neural Network
> class AdvancedGNN(nn.Module):
> def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
> super(AdvancedGNN, self).__init__()
> self.layers = nn.ModuleList([nn.Linear(input_dim if i == 0 else
> hidden_dim, hidden_dim) for i in range(num_layers)])
> self.output = nn.Linear(hidden_dim, output_dim)
> self.activation = nn.ReLU()
>
> def forward(self, x, adj_matrix):
> for layer in self.layers:
> x = self.activation(layer(torch.matmul(adj_matrix, x)))
> return self.output(x)
>
> # Define a simple environment for multi-agent reinforcement learning
> class SimpleMultiAgentEnv(gym.Env):
> def __init__(self, num_agents):
> super(SimpleMultiAgentEnv, self).__init__()
> self.num_agents = num_agents
> self.action_space = gym.spaces.Discrete(4) # 4 actions: up,
> down, left, right
> self.observation_space = gym.spaces.Box(low=-1, high=1,
> shape=(2,))
> self.reset()
>
> def reset(self):
> self.agents = np.random.uniform(-1, 1, (self.num_agents, 2))
> return self._get_obs()
>
> def step(self, actions):
> for i, action in enumerate(actions):
> if action == 0: # up
> self.agents[i, 1] = min(1, self.agents[i, 1] + 0.1)
> elif action == 1: # down
> self.agents[i, 1] = max(-1, self.agents[i, 1] - 0.1)
> elif action == 2: # left
> self.agents[i, 0] = max(-1, self.agents[i, 0] - 0.1)
> elif action == 3: # right
> self.agents[i, 0] = min(1, self.agents[i, 0] + 0.1)
>
> rewards = self._calculate_rewards()
> done = False # For simplicity, we'll say the episode never ends
> return self._get_obs(), rewards, done, {}
>
> def _get_obs(self):
> return self.agents
>
> def _calculate_rewards(self):
> # Reward based on distance to center
> distances = np.sqrt(np.sum(self.agents**2, axis=1))
> return -distances # Negative distance as reward (closer to
> center is better)
>
> # Enhanced EvoMARLTools
> class EvoMARLTools:
> @staticmethod
> def evolve_neural_network(input_dim: int, hidden_dim: int,
> output_dim: int, num_layers: int) -> AdvancedGNN:
> population_size = 20
> generations = 10
> mutation_rate = 0.1
>
> def create_individual():
> return AdvancedGNN(input_dim, hidden_dim, output_dim,
> num_layers)
>
> def mutate(individual):
> for param in individual.parameters():
> if np.random.rand() < mutation_rate:
> param.data += torch.randn_like(param.data) * 0.1
> return individual
>
> def fitness(individual, X, adj_matrix, y):
> with torch.no_grad():
> outputs = individual(X, adj_matrix)
> loss = nn.MSELoss()(outputs, y)
> return -loss.item()
>
> population = [create_individual() for _ in range(population_size)]
>
> X = torch.randn(10, input_dim)
> adj_matrix = torch.randint(0, 2, (10, 10)).float()
> y = torch.randn(10, output_dim)
>
> for generation in range(generations):
> fitnesses = [fitness(ind, X, adj_matrix, y) for ind in
> population]
> sorted_pop = [x for _, x in sorted(zip(fitnesses,
> population), key=lambda pair: pair[0], reverse=True)]
>
> new_population = sorted_pop[:2]
> while len(new_population) < population_size:
> parent1, parent2 = np.random.choice(sorted_pop[:5], 2,
> replace=False)
> child = create_individual()
> # Implement crossover
> for c_param, p1_param, p2_param in
> zip(child.parameters(), parent1.parameters(), parent2.parameters()):
>
> c_param.data.copy_(torch.where(torch.rand_like(c_param) < 0.5, p1_param,
> p2_param))
> child = mutate(child)
> new_population.append(child)
>
> population = new_population
>
> best_individual = max(population, key=lambda ind: fitness(ind, X,
> adj_matrix, y))
> return best_individual
>
> @staticmethod
> def optimize_swarm_behavior(n_agents: int, n_dimensions: int,
> iterations: int = 100) -> List[Dict[str, float]]:
> class Particle:
> def __init__(self):
> self.position = np.random.rand(n_dimensions)
> self.velocity = np.random.rand(n_dimensions)
> self.best_position = self.position.copy()
> self.best_score = float('-inf')
>
> def objective_function(x):
> return -(np.sum(x**2) + np.sum(np.sin(x))) # More complex
> objective function
>
> particles = [Particle() for _ in range(n_agents)]
> global_best_position = np.random.rand(n_dimensions)
> global_best_score = float('-inf')
>
> for _ in range(iterations):
> for particle in particles:
> score = objective_function(particle.position)
>
> if score > particle.best_score:
> particle.best_score = score
> particle.best_position = particle.position.copy()
>
> if score > global_best_score:
> global_best_score = score
> global_best_position = particle.position.copy()
>
> for particle in particles:
> inertia = 0.5
> cognitive = 1.5
> social = 1.5
>
> cognitive_velocity = cognitive * np.random.rand() *
> (particle.best_position - particle.position)
> social_velocity = social * np.random.rand() *
> (global_best_position - particle.position)
>
> particle.velocity = inertia * particle.velocity +
> cognitive_velocity + social_velocity
> particle.position += particle.velocity
> particle.position = np.clip(particle.position, 0, 1)
>
> return [{"position": p.position.tolist(), "best_score":
> p.best_score} for p in particles]
>
> @staticmethod
> def human_feedback_integration(actions: List[str], feedback:
> List[float], previous_weights: Dict[str, float] = None) -> Dict[str, float]:
> action_scores = {}
> for action, score in zip(actions, feedback):
> if action in action_scores:
> action_scores[action].append(score)
> else:
> action_scores[action] = [score]
>
> weighted_scores = {}
> for action, scores in action_scores.items():
> if previous_weights and action in previous_weights:
> # Incorporate previous weights for continuous learning
> weighted_scores[action] = 0.7 * np.average(scores,
> weights=np.linspace(0.5, 1, len(scores))) + 0.3 * previous_weights[action]
> else:
> weighted_scores[action] = np.average(scores,
> weights=np.linspace(0.5, 1, len(scores)))
>
> return weighted_scores
>
> @staticmethod
> def hierarchical_task_decomposition(task_description: str) ->
> Dict[str, List[str]]:
> prompt = f"Decompose the following task into a hierarchical
> structure with main tasks and subtasks, considering dependencies and
> parallel execution possibilities: {task_description}"
> response = gemini_model.generate_content(prompt)
>
> lines = response.text.split('\n')
> hierarchy = {}
> current_main_task = None
>
> for line in lines:
> if line.startswith('- '): # Main task
> current_main_task = line[2:].strip()
> hierarchy[current_main_task] = {"subtasks": [],
> "dependencies": [], "parallel": False}
> elif line.startswith(' - ') and current_main_task: # Subtask
> subtask = line[4:].strip()
> if "Dependency:" in subtask:
> dependency = subtask.split("Dependency:")[1].strip()
>
> hierarchy[current_main_task]["dependencies"].append(dependency)
> elif "Parallel" in subtask:
> hierarchy[current_main_task]["parallel"] = True
> else:
>
> hierarchy[current_main_task]["subtasks"].append(subtask)
>
> return hierarchy
>
> @staticmethod
> def transfer_learning(source_model: nn.Module, target_data:
> torch.Tensor, fine_tune_layers: List[str] = None) -> nn.Module:
> if fine_tune_layers:
> for name, param in source_model.named_parameters():
> if any(layer in name for layer in fine_tune_layers):
> param.requires_grad = True
> else:
> param.requires_grad = False
> else:
> for param in source_model.parameters():
> param.requires_grad = False
>
> if hasattr(source_model, 'output'):
> source_model.output =
> nn.Linear(source_model.output.in_features, target_data.shape[1])
> else:
> raise AttributeError("Source model does not have an 'output'
> layer. Please adjust the architecture.")
>
> optimizer = optim.Adam(filter(lambda p: p.requires_grad,
> source_model.parameters()), lr=0.001)
> criterion = nn.MSELoss()
>
> for epoch in range(200): # Increased epochs for better
> fine-tuning
> optimizer.zero_grad()
> output = source_model(target_data,
> torch.eye(target_data.shape[0]))
> loss = criterion(output, target_data)
> loss.backward()
> optimizer.step()
>
> if epoch % 50 == 0:
> print(f"Epoch {epoch}, Loss: {loss.item()}")
>
> return source_model
>
> @staticmethod
> def multi_agent_reinforcement_learning(num_agents: int, num_episodes:
> int) -> Tuple[List[float], List[AdvancedGNN]]:
> env = SimpleMultiAgentEnv(num_agents)
> agents = [AdvancedGNN(input_dim=2, hidden_dim=64, output_dim=4,
> num_layers=2) for _ in range(num_agents)]
> optimizers = [optim.Adam(agent.parameters(), lr=0.001) for agent
> in agents]
>
> episode_rewards = []
>
> for episode in range(num_episodes):
> state = env.reset()
> episode_reward = 0
>
> for _ in range(100): # 100 steps per episode
> actions = []
> log_probs = []
>
> for i, agent in enumerate(agents):
> agent_state = torch.FloatTensor(state[i])
> action_probs =
> torch.softmax(agent(agent_state.unsqueeze(0), torch.eye(1)), dim=1)
> action = torch.multinomial(action_probs, 1).item()
> actions.append(action)
> log_probs.append(torch.log(action_probs[0, action]))
>
> next_state, rewards, done, _ = env.step(actions)
> episode_reward += sum(rewards)
>
> # Update agents
> for i, (agent, optimizer, log_prob, reward) in
> enumerate(zip(agents, optimizers, log_probs, rewards)):
> optimizer.zero_grad()
> loss = -log_prob * reward # Simple policy gradient
> loss.backward()
> optimizer.step()
>
> state = next_state
> if done:
> break
>
> episode_rewards.append(episode_reward)
> if episode % 10 == 0:
> print(f"Episode {episode}, Average Reward:
> {np.mean(episode_rewards[-10:])}")
>
> return episode_rewards, agents
>
> @staticmethod
> def visualize_multi_agent_system(agents: np.ndarray):
> plt.figure(figsize=(8, 8))
> plt.scatter(agents[:, 0], agents[:, 1], c='blue', s=50)
> plt.xlim(-1, 1)
> plt.ylim(-1, 1)
> plt.title("Multi-Agent System Visualization")
> plt.xlabel("X coordinate")
> plt.ylabel("Y coordinate")
> plt.grid(True)
> plt.show()
>
> @staticmethod
> def cluster_agent_behaviors(agent_data: np.ndarray, n_clusters: int =
> 3) -> np.ndarray:
> kmeans = KMeans(n_clusters=n_clusters)
> clusters = kmeans.fit_predict(agent_data)
> return clusters
>
> # Create enhanced tools
> evolve_nn_tool = Tool(
> name="Evolve Neural Network",
> func=EvoMARLTools.evolve_neural_network,
> description="Evolves an advanced Graph Neural Network architecture"
> )
>
> optimize_swarm_tool = Tool(
> name="Optimize Swarm Behavior",
> func=EvoMARLTools.optimize_swarm_behavior,
> description="Optimizes the behavior of a swarm using enhanced
> Particle Swarm Optimization"
> )
>
> human_feedback_tool = Tool(
> name="Integrate Human Feedback",
> func=EvoMARLTools.human_feedback_integration,
> description="Integrates human feedback into the system using weighted
> averages and continuous learning"
> )
>
> task_decomposition_tool = Tool(
> name="Hierarchical Task Decomposition",
> func=EvoMARLTools.hierarchical_task_decomposition,
> description="Decomposes a high-level task into a hierarchical
> structure of subtasks with dependencies and parallelization"
> )
>
> transfer_learning_tool = Tool(
> name="Transfer Learning",
> func=EvoMARLTools.transfer_learning,
> description="Applies transfer learning to adapt a pre-trained model
> to new data"
> )
>
> reinforcement_learning_tool = Tool(
> name="Multi-Agent Reinforcement Learning",
> func=EvoMARLTools.multi_agent_reinforcement_learning,
> description="Trains multiple agents in a reinforcement learning
> setting"
> )
>
> visualization_tool = Tool(
> name="Visualize Multi-Agent System",
> func=EvoMARLTools.visualize_multi_agent_system,
> description="Visualizes the positions and interactions of agents in a
> multi-agent system"
> )
>
> clustering_tool = Tool(
> name="Cluster Agent Behaviors",
> func=EvoMARLTools.cluster_agent_behaviors,
> description="Clusters agent behaviors using K-Means clustering"
> )
>
> # Define more advanced agents
> advanced_architect_agent = Agent(
> role='Advanced System Architect',
> goal='Design and refine the architecture of EvoMARL+ with advanced
> features',
> tools=[evolve_nn_tool, task_decomposition_tool],
> llm=palm_llm
> )
|
Hi, I do not fully understand why you are not using the LangChain primities as there are plenty of tools already available. Is there an intent to migrate? Also, where is the roadmap ? thanks
The text was updated successfully, but these errors were encountered: