# maestro-gpt4o.py
import os
import re
from rich.console import Console
from rich.panel import Panel
from datetime import datetime
import json
from openai import OpenAI
from anthropic import Anthropic
from tavily import TavilyClient
# Initialize OpenAI and Anthropic API clients
openai_client = OpenAI(api_key="YOUR API KEY")
anthropic_client = Anthropic(api_key="YOUR API KEY")
# Available OpenAI models
ORCHESTRATOR_MODEL = "gpt-4o"
SUB_AGENT_MODEL = "gpt-4o"
# Available Claude models for Anthropic API
REFINER_MODEL = "claude-3-opus-20240229"
# Initialize the Rich Console
console = Console()
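# calculate_subagent_cost looks up per-million-token pricing for the given Claude
# model and returns the dollar cost of a single call. It is only called for the
# Claude refiner model below; GPT models are not in the pricing table.
# Worked example with the hard-coded prices, assuming 1,000 input and 500 output
# tokens on claude-3-opus-20240229:
#   (1_000 / 1_000_000) * 15.00 + (500 / 1_000_000) * 75.00 = 0.0525 USD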
def calculate_subagent_cost(model, input_tokens, output_tokens):
    # Pricing information per model (USD per million tokens)
    pricing = {
        "claude-3-opus-20240229": {"input_cost_per_mtok": 15.00, "output_cost_per_mtok": 75.00},
        "claude-3-haiku-20240307": {"input_cost_per_mtok": 0.25, "output_cost_per_mtok": 1.25},
        "claude-3-sonnet-20240229": {"input_cost_per_mtok": 3.00, "output_cost_per_mtok": 15.00},
    }
    # Calculate cost
    input_cost = (input_tokens / 1_000_000) * pricing[model]["input_cost_per_mtok"]
    output_cost = (output_tokens / 1_000_000) * pricing[model]["output_cost_per_mtok"]
    total_cost = input_cost + output_cost
    return total_cost
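# gpt_orchestrator asks the GPT-4o orchestrator model to either (a) declare the
# objective finished by prefixing its reply with 'The task is complete:' or
# (b) produce a prompt for the next sub-task. When use_search is True it also
# asks for a JSON object like {"search_query": "<question>"}, which is parsed
# out of the reply and returned separately.
# Returns a (response_text, file_content, search_query) tuple.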
def gpt_orchestrator(objective, file_content=None, previous_results=None, use_search=False):
    console.print("\n[bold]Calling Orchestrator for your objective[/bold]")
    previous_results_text = "\n".join(previous_results) if previous_results else "None"
    if file_content:
        console.print(Panel(f"File content:\n{file_content}", title="[bold blue]File Content[/bold blue]", title_align="left", border_style="blue"))
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": f"Based on the following objective{' and file content' if file_content else ''}, and the previous sub-task results (if any), please break down the objective into the next sub-task, and create a concise and detailed prompt for a subagent so it can execute that task. IMPORTANT!!! when dealing with code tasks make sure you check the code for errors and provide fixes and support as part of the next sub-task. If you find any bugs or have suggestions for better code, please include them in the next sub-task prompt. Please assess if the objective has been fully achieved. If the previous sub-task results comprehensively address all aspects of the objective, include the phrase 'The task is complete:' at the beginning of your response. If the objective is not yet fully achieved, break it down into the next sub-task and create a concise and detailed prompt for a subagent to execute that task.\n\nObjective: {objective}" + ('\nFile content:\n' + file_content if file_content else '') + f"\n\nPrevious sub-task results:\n{previous_results_text}"}
    ]
    if use_search:
        messages.append({"role": "user", "content": "Please also generate a JSON object containing a single 'search_query' key, which represents a question that, when asked online, would yield important information for solving the subtask. The question should be specific and targeted to elicit the most relevant and helpful resources. Format your JSON like this, with no additional text before or after:\n{\"search_query\": \"<question>\"}\n"})
    gpt_response = openai_client.chat.completions.create(
        model=ORCHESTRATOR_MODEL,
        messages=messages,
        max_tokens=4096
    )
    response_text = gpt_response.choices[0].message.content
    usage = gpt_response.usage
    console.print(Panel(response_text, title="[bold green]GPT Orchestrator[/bold green]", title_align="left", border_style="green", subtitle="Sending task to the sub-agent 👇"))
    console.print(f"Input Tokens: {usage.prompt_tokens}, Output Tokens: {usage.completion_tokens}, Total Tokens: {usage.total_tokens}")
    search_query = None
    if use_search:
        json_match = re.search(r'{.*}', response_text, re.DOTALL)
        if json_match:
            json_string = json_match.group()
            try:
                search_query = json.loads(json_string)["search_query"]
                console.print(Panel(f"Search Query: {search_query}", title="[bold blue]Search Query[/bold blue]", title_align="left", border_style="blue"))
                response_text = response_text.replace(json_string, "").strip()
            except json.JSONDecodeError as e:
                console.print(Panel(f"Error parsing JSON: {e}", title="[bold red]JSON Parsing Error[/bold red]", title_align="left", border_style="red"))
                console.print(Panel("Skipping search query extraction.", title="[bold yellow]Search Query Extraction Skipped[/bold yellow]", title_align="left", border_style="yellow"))
        else:
            search_query = None
    return response_text, file_content, search_query
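# gpt_sub_agent executes one sub-task prompt with GPT-4o. All previously
# completed tasks are passed in as the system message so the sub-agent has the
# running context, and an optional Tavily QnA search result is appended to the
# user messages. If the completion uses ~4000 output tokens, the function calls
# itself again with a continuation prompt and concatenates the answers.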
def gpt_sub_agent(prompt, search_query=None, previous_gpt_tasks=None, use_search=False, continuation=False):
    if previous_gpt_tasks is None:
        previous_gpt_tasks = []
    continuation_prompt = "Continuing from the previous answer, please complete the response."
    system_message = "Previous gpt tasks:\n" + "\n".join(f"Task: {task['task']}\nResult: {task['result']}" for task in previous_gpt_tasks)
    if continuation:
        prompt = continuation_prompt
    qna_response = None
    if search_query and use_search:
        tavily = TavilyClient(api_key="YOUR_API_KEY")
        qna_response = tavily.qna_search(query=search_query)
        console.print(f"QnA response: {qna_response}", style="yellow")
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": prompt}
    ]
    if qna_response:
        messages.append({"role": "user", "content": f"\nSearch Results:\n{qna_response}"})
    gpt_response = openai_client.chat.completions.create(
        model=SUB_AGENT_MODEL,
        messages=messages,
        max_tokens=4096
    )
    response_text = gpt_response.choices[0].message.content
    usage = gpt_response.usage
    console.print(Panel(response_text, title="[bold blue]GPT Sub-agent Result[/bold blue]", title_align="left", border_style="blue", subtitle="Task completed, sending result to the orchestrator 👇"))
    console.print(f"Input Tokens: {usage.prompt_tokens}, Output Tokens: {usage.completion_tokens}, Total Tokens: {usage.total_tokens}")
    if usage.completion_tokens >= 4000:  # Threshold set to 4000 as a precaution
        console.print("[bold yellow]Warning:[/bold yellow] Output may be truncated. Attempting to continue the response.")
        continuation_response_text = gpt_sub_agent(prompt, search_query, previous_gpt_tasks, use_search, continuation=True)
        response_text += continuation_response_text
    return response_text
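# anthropic_refine sends the objective and all sub-task results to Claude 3 Opus
# and asks it to merge them into one cohesive final output. For coding
# objectives the prompt requests a project name, a <folder_structure> JSON
# block, and 'Filename: <name>' code blocks, which the post-processing below
# parses to write the project to disk. The call cost is reported via
# calculate_subagent_cost, and the request is continued once (with the partial
# result appended) if the output appears truncated (>= 4000 output tokens).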
def anthropic_refine(objective, sub_task_results, filename, projectname, continuation=False):
    console.print("\nCalling Opus to provide the refined final output for your objective:")
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Objective: " + objective + "\n\nSub-task results:\n" + "\n".join(sub_task_results) + "\n\nPlease review and refine the sub-task results into a cohesive final output. Add any missing information or details as needed. When working on code projects, ONLY AND ONLY IF THE PROJECT IS CLEARLY A CODING ONE please provide the following:\n1. Project Name: Create a concise and appropriate project name that fits the project based on what it's creating. The project name should be no more than 20 characters long.\n2. Folder Structure: Provide the folder structure as a valid JSON object, where each key represents a folder or file, and nested keys represent subfolders. Use null values for files. Ensure the JSON is properly formatted without any syntax errors. Please make sure all keys are enclosed in double quotes, and ensure objects are correctly encapsulated with braces, separating items with commas as necessary.\nWrap the JSON object in <folder_structure> tags.\n3. Code Files: For each code file, include ONLY the file name NEVER EVER USE THE FILE PATH OR ANY OTHER FORMATTING YOU ONLY USE THE FOLLOWING format 'Filename: <filename>' followed by the code block enclosed in triple backticks, with the language identifier after the opening backticks, like this:\n\n```python\n<code>\n```"}
            ]
        }
    ]
    opus_response = anthropic_client.messages.create(
        model=REFINER_MODEL,
        max_tokens=4096,
        messages=messages
    )
    response_text = opus_response.content[0].text.strip()
    console.print(f"Input Tokens: {opus_response.usage.input_tokens}, Output Tokens: {opus_response.usage.output_tokens}")
    total_cost = calculate_subagent_cost(REFINER_MODEL, opus_response.usage.input_tokens, opus_response.usage.output_tokens)
    console.print(f"Refine Cost: ${total_cost:.4f}")
    if opus_response.usage.output_tokens >= 4000 and not continuation:  # Threshold set to 4000 as a precaution
        console.print("[bold yellow]Warning:[/bold yellow] Output may be truncated. Attempting to continue the response.")
        continuation_response_text = anthropic_refine(objective, sub_task_results + [response_text], filename, projectname, continuation=True)
        response_text += "\n" + continuation_response_text
    console.print(Panel(response_text, title="[bold green]Final Output[/bold green]", title_align="left", border_style="green"))
    return response_text
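# create_folder_structure creates the top-level project folder, then delegates
# to create_folders_and_files to materialise the nested structure returned by
# the refiner.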
def create_folder_structure(project_name, folder_structure, code_blocks):
    try:
        os.makedirs(project_name, exist_ok=True)
        console.print(Panel(f"Created project folder: [bold]{project_name}[/bold]", title="[bold green]Project Folder[/bold green]", title_align="left", border_style="green"))
    except OSError as e:
        console.print(Panel(f"Error creating project folder: [bold]{project_name}[/bold]\nError: {e}", title="[bold red]Project Folder Creation Error[/bold red]", title_align="left", border_style="red"))
        return
    create_folders_and_files(project_name, folder_structure, code_blocks)
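# create_folders_and_files walks the folder-structure dict recursively: dict
# values become subfolders, null (None) values become files whose contents are
# looked up by filename in the (filename, code) pairs extracted from the
# refined output. A hypothetical structure such as
#   {"my_app": {"main.py": None, "utils": {"helpers.py": None}}}
# would produce my_app/main.py and my_app/utils/helpers.py under current_path.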
def create_folders_and_files(current_path, structure, code_blocks):
    for key, value in structure.items():
        path = os.path.join(current_path, key)
        if isinstance(value, dict):
            try:
                os.makedirs(path, exist_ok=True)
                console.print(Panel(f"Created folder: [bold]{path}[/bold]", title="[bold blue]Folder Creation[/bold blue]", title_align="left", border_style="blue"))
                create_folders_and_files(path, value, code_blocks)
            except OSError as e:
                console.print(Panel(f"Error creating folder: [bold]{path}[/bold]\nError: {e}", title="[bold red]Folder Creation Error[/bold red]", title_align="left", border_style="red"))
        else:
            code_content = next((code for file, code in code_blocks if file == key), None)
            if code_content:
                try:
                    with open(path, 'w') as file:
                        file.write(code_content)
                    console.print(Panel(f"Created file: [bold]{path}[/bold]", title="[bold green]File Creation[/bold green]", title_align="left", border_style="green"))
                except IOError as e:
                    console.print(Panel(f"Error creating file: [bold]{path}[/bold]\nError: {e}", title="[bold red]File Creation Error[/bold red]", title_align="left", border_style="red"))
            else:
                console.print(Panel(f"Code content not found for file: [bold]{key}[/bold]", title="[bold yellow]Missing Code Content[/bold yellow]", title_align="left", border_style="yellow"))
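# read_file returns the full text of a user-supplied file so it can be handed
# to the orchestrator as extra context.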
def read_file(file_path):
    with open(file_path, 'r') as file:
        content = file.read()
    return content
# Get the objective from user input
objective = input("Please enter your objective: ")

# Ask the user if they want to provide a file path
provide_file = input("Do you want to provide a file path? (y/n): ").lower() == 'y'
if provide_file:
    file_path = input("Please enter the file path: ")
    if os.path.exists(file_path):
        file_content = read_file(file_path)
    else:
        print(f"File not found: {file_path}")
        file_content = None
else:
    file_content = None

# Ask the user if they want to use search
use_search = input("Do you want to use search? (y/n): ").lower() == 'y'

task_exchanges = []
gpt_tasks = []
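# Main orchestration loop: keep asking the orchestrator for the next sub-task
# and running it with the sub-agent until the orchestrator replies with
# 'The task is complete:'. Each (prompt, result) pair is recorded so later
# iterations and the final refinement step can see the full history.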
while True:
    previous_results = [result for _, result in task_exchanges]
    if not task_exchanges:
        gpt_result, file_content_for_gpt, search_query = gpt_orchestrator(objective, file_content, previous_results, use_search)
    else:
        gpt_result, _, search_query = gpt_orchestrator(objective, previous_results=previous_results, use_search=use_search)
    if "The task is complete:" in gpt_result:
        final_output = gpt_result.replace("The task is complete:", "").strip()
        break
    else:
        sub_task_prompt = gpt_result
        if file_content_for_gpt and not gpt_tasks:
            sub_task_prompt = f"{sub_task_prompt}\n\nFile content:\n{file_content_for_gpt}"
        sub_task_result = gpt_sub_agent(sub_task_prompt, search_query, gpt_tasks, use_search)
        gpt_tasks.append({"task": sub_task_prompt, "result": sub_task_result})
        task_exchanges.append((sub_task_prompt, sub_task_result))
        file_content_for_gpt = None
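# Once the loop ends, hand everything to Claude Opus for refinement, then parse
# the refined output for an optional project name, folder structure, and code
# files so a project directory can be generated.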
sanitized_objective = re.sub(r'\W+', '_', objective)
timestamp = datetime.now().strftime("%H-%M-%S")
refined_output = anthropic_refine(objective, [result for _, result in task_exchanges], timestamp, sanitized_objective)

project_name_match = re.search(r'Project Name: (.*)', refined_output)
project_name = project_name_match.group(1).strip() if project_name_match else sanitized_objective

folder_structure_match = re.search(r'<folder_structure>(.*?)</folder_structure>', refined_output, re.DOTALL)
folder_structure = {}
if folder_structure_match:
    json_string = folder_structure_match.group(1).strip()
    try:
        folder_structure = json.loads(json_string)
    except json.JSONDecodeError as e:
        console.print(Panel(f"Error parsing JSON: {e}", title="[bold red]JSON Parsing Error[/bold red]", title_align="left", border_style="red"))
        console.print(Panel(f"Invalid JSON string: [bold]{json_string}[/bold]", title="[bold red]Invalid JSON String[/bold red]", title_align="left", border_style="red"))

# Extract 'Filename: <name>' headers and their fenced code blocks from the refined output
code_blocks = re.findall(r'Filename: (\S+)\s*```[\w]*\n(.*?)\n```', refined_output, re.DOTALL)
create_folder_structure(project_name, folder_structure, code_blocks)
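# Finally, write a markdown log of the full exchange (every sub-task prompt and
# result, plus the refined output) to the working directory, named with a
# timestamp and a truncated, sanitised version of the objective.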
max_length = 25
truncated_objective = sanitized_objective[:max_length] if len(sanitized_objective) > max_length else sanitized_objective
filename = f"{timestamp}_{truncated_objective}.md"

exchange_log = f"Objective: {objective}\n\n"
exchange_log += "=" * 40 + " Task Breakdown " + "=" * 40 + "\n\n"
for i, (prompt, result) in enumerate(task_exchanges, start=1):
    exchange_log += f"Task {i}:\n"
    exchange_log += f"Prompt: {prompt}\n"
    exchange_log += f"Result: {result}\n\n"
exchange_log += "=" * 40 + " Refined Final Output " + "=" * 40 + "\n\n"
exchange_log += refined_output

console.print(f"\n[bold]Refined Final output:[/bold]\n{refined_output}")

with open(filename, 'w') as file:
    file.write(exchange_log)

print(f"\nFull exchange log saved to {filename}")