diff --git a/README.md b/README.md
index bb9cdc3..0cde675 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,10 @@ Infrastructure for scalable, reliable, and economical Multi-Modal Model API serv
 - Swarms Memory API
 - OES API
 - Swarms Multi-Agent API
+    - Parallel API
+    - Sequential API
+    - Hierarchical API
 Etc
+
 # License
 MIT
diff --git a/api.py b/api.py
index a255c56..d50cfbd 100644
--- a/api.py
+++ b/api.py
@@ -111,8 +111,4 @@ async def create_agent(request: Request, agent_input: AgentInput):
             max_loops=agent_input.max_loops,
-            autosave=agent_input.autosave,
             dynamic_temperature_enabled=agent_input.dynamic_temperature_enabled,
-            dashboard=agent_input.dashboard,
-            verbose=agent_input.verbose,
             streaming_on=agent_input.streaming_on,
-            saved_state_path=agent_input.saved_state_path,
             sop=agent_input.sop,
@@ -200,8 +196,4 @@ async def agent_completions(agent_input: AgentInput):
             max_loops=max_loops,
-            autosave=agent_input.autosave,
             dynamic_temperature_enabled=agent_input.dynamic_temperature_enabled,
-            dashboard=agent_input.dashboard,
-            verbose=agent_input.verbose,
             streaming_on=agent_input.streaming_on,
-            saved_state_path=agent_input.saved_state_path,
             sop=agent_input.sop,
diff --git a/parallel_swarm_api.py b/parallel_swarm_api.py
new file mode 100644
index 0000000..9024be7
--- /dev/null
+++ b/parallel_swarm_api.py
@@ -0,0 +1,117 @@
+import os
+from concurrent.futures import ThreadPoolExecutor
+
+from fastapi import FastAPI, Request
+from fastapi.middleware.cors import CORSMiddleware
+from swarms_cloud.schema.agent_api_schemas import (
+    ParallelSwarmAPIInput,
+    ParallelSwarmAPIOutput,
+)
+from swarms_cloud.schema.swarm_schema import SwarmAPISchema, AllSwarmsSchema
+from swarms_cloud.utils.create_agent import create_agent_sync
+
+# Create a FastAPI app
+app = FastAPI(
+    debug=True,
+    title="Parallel Swarm API",
+    version="0.1.0",
+)
+
+# Load the middleware to handle CORS
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+def _run_agents_in_parallel(swarm_input: ParallelSwarmAPIInput) -> list:
+    """Build every agent in the request and run the task on all of them.
+
+    Results come back in the same order as ``swarm_input.agents`` so each
+    output can be paired with the agent that produced it.
+    """
+    agents = [create_agent_sync(agent) for agent in swarm_input.agents]
+    task = swarm_input.task
+
+    with ThreadPoolExecutor() as executor:
+        # executor.map preserves input order; as_completed() would yield in
+        # completion order and lose the agent/result pairing.
+        return list(executor.map(lambda agent: agent.run(task), agents))
+
+
+@app.get("/")
+def read_root():
+    return {"Hello": "World"}
+
+
+@app.get("/health")
+def health():
+    return {"status": "ok"}
+
+
+@app.get("/version")
+def version():
+    return {"version": "0.1.0"}
+
+
+@app.post("/v1/swarms/parallel/create/{swarm_id}", response_model=SwarmAPISchema)
+def create_parallel_swarm(request: Request, swarm_input: ParallelSwarmAPIInput):
+    """Create a parallel swarm and return its configuration."""
+    # NOTE(review): the task is still executed here and its results are
+    # discarded, as before -- confirm whether "create" should run anything.
+    _run_agents_in_parallel(swarm_input)
+
+    # response_model is SwarmAPISchema, so return the config rather than None.
+    return swarm_input.config
+
+
+@app.post(
+    "/v1/swarms/parallel/{swarm_id}/completions", response_model=ParallelSwarmAPIOutput
+)
+def run_parallel_swarm_completions(
+    request: Request, swarm_input: ParallelSwarmAPIInput
+):
+    """Run the task on every agent of the swarm in parallel."""
+    results = _run_agents_in_parallel(swarm_input)
+
+    # TODO: wrap each entry of `results` in AgentOutput and return
+    # ParallelSwarmAPIOutput(config=swarm_input.config, completions=[...]).
+    # NOTE(review): until then this handler returns None, which does not
+    # satisfy response_model=ParallelSwarmAPIOutput.
+
+
+# NOTE(review): a read-only listing would conventionally be GET; kept as POST.
+@app.post("/v1/swarms", response_model=AllSwarmsSchema)
+def get_all_swarms(request: Request):
+    return AllSwarmsSchema(
+        swarms=[
+            SwarmAPISchema(
+                id="1",
+                swarm_name="Swarm API",
+                swarm_description="Swarm API description",
+                created_at=1628584185,
+                owned_by="TGSC",
+                tags=["tag_1", "agent"],
+                use_cases={
+                    "use_case_1": "Use case 1 description",
+                    "use_case_2": "Use case 2 description",
+                },
+            )
+        ]
+    )
+
+
+if __name__ == "__main__":
+    import uvicorn
+
+    uvicorn.run(
+        app,
+        host="0.0.0.0",
+        # os.getenv returns a str (or None when unset); uvicorn needs an int.
+        port=int(os.getenv("AGENT_PORT", "8000")),
+        use_colors=True,
+        log_level="info",
+    )
diff --git a/pyproject.toml b/pyproject.toml
index 30ef48e..37ae319 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "swarms-cloud"
-version = "0.3.4"
+version = "0.3.7"
 description = "Swarms Cloud - Pytorch"
 license = "MIT"
 authors = ["Kye Gomez "]
diff --git a/swarms_cloud/schema/__init__.py b/swarms_cloud/schema/__init__.py
index db91738..d85e224 100644
--- a/swarms_cloud/schema/__init__.py
+++ b/swarms_cloud/schema/__init__.py
@@ -28,7 +28,6 @@
     GenerateResponse,
     GenerationConfig,
     ModelCard,
-    ModelList,
     ModelPermission,
     UsageInfo,
 )
diff --git a/swarms_cloud/schema/agent_api_schemas.py b/swarms_cloud/schema/agent_api_schemas.py
index 0431552..4bdaca1 100644
--- a/swarms_cloud/schema/agent_api_schemas.py
+++ b/swarms_cloud/schema/agent_api_schemas.py
@@ -6,11 +6,12 @@ from swarms_cloud.schema.cog_vlm_schemas import (
     AgentChatCompletionResponse,
 )
+from swarms_cloud.schema.swarm_schema import SwarmAPISchema
 
 
 # Define the input model using Pydantic
 class AgentInput(BaseModel):
-    id: str = (uuid.uuid4().hex,)
-    created_at: int = time.time()
+    id: str = Field(default_factory=lambda: uuid.uuid4().hex)
+    created_at: int = Field(default_factory=lambda: int(time.time()))
     owned_by: Optional[str] = Field(None, description="The owner of the agent.")
     agent_name: str = "Swarm Agent"
@@ -18,12 +19,8 @@ class AgentInput(BaseModel):
     agent_description: str = None
     model_name: str = "OpenAIChat"
     max_loops: int = 1
-    autosave: bool = False
     dynamic_temperature_enabled: bool = False
-    dashboard: bool = False
-    verbose: bool = False
     streaming_on: bool = False
-    saved_state_path: str = "agent_saved_state.json"
     sop: str = None
     sop_list: List[str] = None
     user_name: str = "User"
@@ -80,7 +77,7 @@ class ModelSchema(BaseModel):
 
 class ModelList(BaseModel):
     object: str = "list"
-    data: List[ModelSchema] = []
+    data: List[ModelSchema] = Field(..., description="The list of models available.")
 
 
 # Define the output model using Pydantic
@@ -88,12 +85,6 @@ class AgentOutput(BaseModel):
     completions: AgentChatCompletionResponse
 
 
-class MultipleAgentOutputs(BaseModel):
-    agents: List[AgentOutput] = Field(
-        ..., description="The list of agents and their completions."
-    )
-
-
 class ParallelSwarmAPIInput(BaseModel):
     """
     Represents a parallel swarm API.
@@ -105,11 +96,13 @@ class ParallelSwarmAPIInput(BaseModel):
         owned_by (str): The owner of the API.
     """
 
-    id: str = (uuid.uuid4().hex,)
-    swarm_name: str = "Swarm API"
-    agents: List[AgentInput] = []
-    created_at: int = time.time()
-    owned_by: str = "TGSC"
+    config: SwarmAPISchema = Field(
+        ..., description="The configuration for the swarm API."
+    )
+    agents: List[AgentInput] = Field(
+        ..., description="The list of agents in the swarm."
+    )
+    task: str = Field(..., description="The task to be performed by the agents.")
 
 
 class ParallelSwarmAPIOutput(BaseModel):
@@ -123,10 +116,33 @@ class ParallelSwarmAPIOutput(BaseModel):
         owned_by (str): The owner of the API.
     """
 
-    id: str = uuid.uuid4().hex
-    swarm_name: str = "Swarm API"
-    completions: MultipleAgentOutputs = Field(
-        ..., description="The list of agents in the swarm."
+    config: SwarmAPISchema = Field(
+        ..., description="The configuration for the swarm API."
     )
-    created_at: int = time.time()
-    owned_by: str = "TGSC"
+    completions: List[AgentOutput] = Field(
+        ..., description="The list of agents and their completions."
+    )
+
+
+# full_example = ParallelSwarmAPIOutput(
+#     completions=[
+#         AgentOutput(
+#             completions=AgentChatCompletionResponse(
+#                 agent_name="Agent 1",
+#                 completion="Completion 1",
+#                 created_at=1628584185,
+#                 owned_by="TGSC",
+#             )
+#         ),
+#         AgentOutput(
+#             completions=AgentChatCompletionResponse(
+#                 agent_name="Agent 2",
+#                 completion="Completion 2",
+#                 created_at=1628584185,
+#                 owned_by="TGSC",
+#             )
+#         ),
+#     ]
+# )
+
+# print(full_example.dict())
diff --git a/swarms_cloud/schema/cog_vlm_schemas.py b/swarms_cloud/schema/cog_vlm_schemas.py
index 7aaf8b7..5ee965f 100644
--- a/swarms_cloud/schema/cog_vlm_schemas.py
+++ b/swarms_cloud/schema/cog_vlm_schemas.py
@@ -88,6 +88,7 @@ class UsageInfo(BaseModel):
     prompt_tokens: int = 0
     total_tokens: int = 0
     completion_tokens: Optional[int] = 0
+    tokens_per_second: Optional[float] = Field(default_factory=lambda: 0.0)
 
 
 class ChatCompletionResponse(BaseModel):
@@ -104,7 +105,7 @@ class ChatCompletionResponse(BaseModel):
 
 class AgentChatCompletionResponse(BaseModel):
-    id: str = f"agent-{uuid.uuid4().hex}"
-    agent: str = Field(
+    id: str = Field(default_factory=lambda: f"agent-{uuid.uuid4().hex}")
+    agent_name: str = Field(
         ...,
         description="The name of the agent that generated the completion response.",
     )
@@ -115,7 +116,6 @@ class AgentChatCompletionResponse(BaseModel):
     created: Optional[int] = Field(default_factory=lambda: int(time.time()))
     usage: Optional[UsageInfo] = None
     completion_time: Optional[float] = Field(default_factory=lambda: 0.0)
-    tokens_per_second: Optional[float] = Field(default_factory=lambda: 0.0)
 
 
 # out = AgentChatCompletionResponse(
diff --git a/swarms_cloud/schema/swarm_schema.py b/swarms_cloud/schema/swarm_schema.py
new file mode 100644
index 0000000..a3d06fa
--- /dev/null
+++ b/swarms_cloud/schema/swarm_schema.py
@@ -0,0 +1,63 @@
+from pydantic import BaseModel, Field
+from typing import Dict, Optional, List
+import time
+import uuid
+
+
+class SwarmAPISchema(BaseModel):
+    """Configuration and metadata describing a swarm exposed by the API."""
+
+    id: str = Field(default_factory=lambda: uuid.uuid4().hex)
+    swarm_name: Optional[str] = Field(default="Swarm API")
+    swarm_description: Optional[str] = Field(default="Swarm API description")
+    created_at: Optional[int] = Field(default_factory=lambda: int(time.time()))
+    owned_by: Optional[str] = Field(
+        default="TGSC",
+        description="The owner of the API.",
+        # `examples` takes a list of example values, not a bare value.
+        examples=["TGSC"],
+    )
+    # Required (default=...), although an explicit null is accepted.
+    tags: Optional[list] = Field(
+        default=...,
+        description="The tags for the API.",
+        examples=[["tag_1", "agent"]],
+    )
+    # Required (default=...), although an explicit null is accepted.
+    use_cases: Optional[Dict[str, str]] = Field(
+        default=...,
+        description="The use cases for the API.",
+        examples=[
+            {
+                "use_case_1": "Use case 1 description",
+                "use_case_2": "Use case 2 description",
+            }
+        ],
+    )
+
+
+class AllSwarmsSchema(BaseModel):
+    """Response wrapper listing every known swarm."""
+
+    swarms: Optional[List[SwarmAPISchema]] = Field(
+        default=...,
+        description="The list of all swarms.",
+        examples=[],
+    )
+
+
+# example = {
+#     "swarm_name": "Swarm API",
+#     "swarm_description": "Swarm API description",
+#     "created_at": 1628584185,
+#     "owned_by": "TGSC",
+#     "tags": ["tag_1", "agent"],
+#     "use_cases": {
+#         "use_case_1": "Use case 1 description",
+#         "use_case_2": "Use case 2 description",
+#     },
+# }
+
+# # Define the input model using Pydantic
+# out = SwarmAPISchema(**example)
+# print(out)
diff --git a/swarms_cloud/utils/__init__.py b/swarms_cloud/utils/__init__.py
index fbaa654..105c84a 100644
--- a/swarms_cloud/utils/__init__.py
+++ b/swarms_cloud/utils/__init__.py
@@ -1,9 +1,10 @@
 from swarms_cloud.utils.api_key_generator import generate_api_key
-from swarms_cloud.utils.calculate_pricing import calculate_pricing, count_tokens
-from swarms_cloud.utils.rate_limiter import rate_limiter
+from swarms_cloud.utils.calculate_pricing import calculate_pricing, count_tokens_hf
 from swarms_cloud.utils.check_model_list import (
     create_error_response,
 )
+from swarms_cloud.utils.count_tokens import count_tokens, count_tokens_async
+from swarms_cloud.utils.rate_limiter import rate_limiter
 
 __all__ = [
     "generate_api_key",
@@ -11,4 +12,6 @@
     "count_tokens",
     "rate_limiter",
     "create_error_response",
+    "count_tokens_async",
+    "count_tokens_hf",
 ]
diff --git a/swarms_cloud/utils/calculate_pricing.py b/swarms_cloud/utils/calculate_pricing.py
index 58c0cd5..e9950e5 100644
--- a/swarms_cloud/utils/calculate_pricing.py
+++ b/swarms_cloud/utils/calculate_pricing.py
@@ -4,7 +4,9 @@
 from transformers import AutoTokenizer, PreTrainedTokenizer
 
 
-def count_tokens(texts: List[str], tokenizer: PreTrainedTokenizer, model: str) -> int:
+def count_tokens_hf(
+    texts: List[str], tokenizer: PreTrainedTokenizer, model: str
+) -> int:
     """
     Counts the total number of tokens in a list of texts using a tokenizer.
 
diff --git a/swarms_cloud/utils/count_tokens.py b/swarms_cloud/utils/count_tokens.py
new file mode 100644
index 0000000..9805749
--- /dev/null
+++ b/swarms_cloud/utils/count_tokens.py
@@ -0,0 +1,49 @@
+import asyncio
+
+import tiktoken
+from fastapi import HTTPException
+
+
+def count_tokens(text: str) -> int:
+    """
+    Counts the number of tokens in the given text.
+
+    Args:
+        text (str): The input text to count tokens from.
+
+    Returns:
+        int: The number of tokens in the text.
+
+    Raises:
+        HTTPException: If there is an error counting tokens.
+    """
+    try:
+        # Get the encoding for the specific model
+        enc = tiktoken.get_encoding("cl100k_base")
+
+        # Encode the text and count the tokens
+        return len(enc.encode(text))
+    except Exception as e:
+        # Chain the cause so the original traceback is preserved.
+        raise HTTPException(
+            status_code=400, detail=f"Error counting tokens: {e}"
+        ) from e
+
+
+async def count_tokens_async(text: str) -> int:
+    """
+    Counts the number of tokens in the given text off the event loop thread.
+
+    Args:
+        text (str): The input text.
+
+    Returns:
+        int: The number of tokens in the text.
+
+    Raises:
+        HTTPException: If there is an error counting tokens.
+    """
+    # Delegate to count_tokens in the default executor so both variants share
+    # one implementation; get_running_loop() is the in-coroutine accessor.
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(None, count_tokens, text)
diff --git a/swarms_cloud/utils/create_agent.py b/swarms_cloud/utils/create_agent.py
new file mode 100644
index 0000000..ac95f23
--- /dev/null
+++ b/swarms_cloud/utils/create_agent.py
@@ -0,0 +1,83 @@
+import asyncio
+import logging
+import os
+
+from pydantic import ValidationError
+from swarms import Agent, OpenAIChat
+
+from swarms_cloud.schema.agent_api_schemas import AgentInput
+
+logger = logging.getLogger(__name__)
+
+
+def _build_agent(input_data: AgentInput) -> Agent:
+    """Validate ``input_data`` and construct the agent it describes.
+
+    Shared by the sync and async entry points so the two cannot drift apart.
+    """
+    try:
+        # Validate input data
+        input_data.validate(input_data.dict())
+
+        # Initialize the model
+        model = OpenAIChat(
+            api_key=os.getenv("OPENAI_API_KEY"),
+            model_name=input_data.model_name,
+            temperature=0.1,
+        )
+
+        # Initialize and return the agent
+        return Agent(
+            agent_name=input_data.agent_name,
+            system_prompt=input_data.system_prompt,
+            llm=model,
+            max_loops=input_data.max_loops,
+            dynamic_temperature_enabled=input_data.dynamic_temperature_enabled,
+            streaming_on=input_data.streaming_on,
+            sop=input_data.sop,
+            sop_list=input_data.sop_list,
+            user_name=input_data.user_name,
+            retry_attempts=input_data.retry_attempts,
+            context_length=input_data.context_length,
+            max_tokens=input_data.max_tokens,
+            tool_schema=input_data.tool_schema,
+            # long_term_memory=input_data.long_term_memory,
+            # tools=input_data.tools,
+        )
+    except ValidationError as e:
+        logger.error("Validation Error: %s", e)
+        raise
+
+
+def create_agent_sync(input_data: AgentInput) -> Agent:
+    """
+    Creates an agent based on the provided input data synchronously.
+
+    Args:
+        input_data (AgentInput): The input data model for the agent configuration.
+
+    Returns:
+        Agent: The configured agent ready to run.
+
+    Raises:
+        ValidationError: If validation fails for the input data.
+    """
+    return _build_agent(input_data)
+
+
+async def create_agent_async(input_data: AgentInput) -> Agent:
+    """
+    Creates an agent based on the provided input data asynchronously.
+
+    Args:
+        input_data (AgentInput): The input data model for the agent configuration.
+
+    Returns:
+        Agent: The configured agent ready to run.
+
+    Raises:
+        ValidationError: If validation fails for the input data.
+    """
+    # Yield to the event loop once; replace with a real async step if needed.
+    await asyncio.sleep(0)
+    return _build_agent(input_data)