From 806082f651fd6fdfa35a7215e086bb305928d49d Mon Sep 17 00:00:00 2001 From: Max Rux Date: Fri, 10 Jan 2025 14:26:52 -0600 Subject: [PATCH] init --- agents/example.json | 4 + src/actions/agentipy_actions.py | 219 +++++++++++++ src/connection_manager.py | 100 +++--- src/connections/agentipy_connection.py | 410 +++++++++++++++++++++++++ 4 files changed, 698 insertions(+), 35 deletions(-) create mode 100644 src/actions/agentipy_actions.py create mode 100644 src/connections/agentipy_connection.py diff --git a/agents/example.json b/agents/example.json index 1184ca5..d1bbdb8 100644 --- a/agents/example.json +++ b/agents/example.json @@ -60,6 +60,10 @@ { "name": "galadriel", "model": "gpt-3.5-turbo" + }, + { + "name": "agentipy", + "rpc": "https://api.mainnet-beta.solana.com" } ], "tasks": [ diff --git a/src/actions/agentipy_actions.py b/src/actions/agentipy_actions.py new file mode 100644 index 0000000..ad4a1ad --- /dev/null +++ b/src/actions/agentipy_actions.py @@ -0,0 +1,219 @@ +import logging +from src.action_handler import register_action + +logger = logging.getLogger("agent") + + +@register_action("agentipy-transfer") +def agentipy_transfer(agent, **kwargs): + """Transfer SOL or SPL tokens""" + agent.logger.info("\nšŸ’ø INITIATING TRANSFER") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="transfer", + params=[ + kwargs.get("to_address"), + kwargs.get("amount"), + kwargs.get("token_mint", None), + ], + ) + agent.logger.info("āœ… Transfer completed!") + return result + except Exception as e: + agent.logger.error(f"āŒ Transfer failed: {str(e)}") + return False + + +@register_action("agentipy-swap") +def agentipy_swap(agent, **kwargs): + """Swap tokens using Jupiter""" + agent.logger.info("\nšŸ”„ INITIATING TOKEN SWAP") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="trade", + params=[ + kwargs.get("output_mint"), + kwargs.get("input_amount"), + kwargs.get("input_mint", None), + kwargs.get("slippage_bps", 100), + ], + ) + agent.logger.info("āœ… Swap completed!") + return result + except Exception as e: + agent.logger.error(f"āŒ Swap failed: {str(e)}") + return False + + +@register_action("agentipy-balance") +def agentipy_balance(agent, **kwargs): + """Check SOL or token balance""" + agent.logger.info("\nšŸ’° CHECKING BALANCE") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="get-balance", + params=[kwargs.get("token_address", None)], + ) + agent.logger.info(f"Balance: {result}") + return result + except Exception as e: + agent.logger.error(f"āŒ Balance check failed: {str(e)}") + return None + + +@register_action("agentipy-stake") +def agentipy_stake(agent, **kwargs): + """Stake SOL""" + agent.logger.info("\nšŸŽÆ INITIATING SOL STAKE") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="stake", + params=[kwargs.get("amount")], + ) + agent.logger.info("āœ… Staking completed!") + return result + except Exception as e: + agent.logger.error(f"āŒ Staking failed: {str(e)}") + return False + + +@register_action("agentipy-lend") +def agentipy_lend(agent, **kwargs): + """Lend assets using Lulo""" + agent.logger.info("\nšŸ¦ INITIATING LENDING") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="lend-assets", + params=[kwargs.get("amount")], + ) + agent.logger.info("āœ… Lending completed!") + return result + except Exception as e: + agent.logger.error(f"āŒ Lending failed: {str(e)}") + return False + + +@register_action("agentipy-request-funds") +def request_faucet_funds(agent, **kwargs): + """Request faucet funds for testing""" + agent.logger.info("\nšŸš° REQUESTING FAUCET FUNDS") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", action_name="request-faucet", params=[] + ) + agent.logger.info("āœ… Faucet request completed!") + return result + except Exception as e: + agent.logger.error(f"āŒ Faucet request failed: {str(e)}") + return False + + +@register_action("agentipy-deploy-token") +def agentipy_deploy_token(agent, **kwargs): + """Deploy a new token""" + agent.logger.info("\nšŸŖ™ DEPLOYING NEW TOKEN") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="deploy-token", + params=[kwargs.get("decimals", 9)], + ) + agent.logger.info("āœ… Token deployed!") + return result + except Exception as e: + agent.logger.error(f"āŒ Token deployment failed: {str(e)}") + return False + + +@register_action("agentipy-get-price") +def agentipy_get_price(agent, **kwargs): + """Get token price""" + agent.logger.info("\nšŸ’² FETCHING TOKEN PRICE") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="fetch-price", + params=[kwargs.get("token_id")], + ) + agent.logger.info(f"Price: {result}") + return result + except Exception as e: + agent.logger.error(f"āŒ Price fetch failed: {str(e)}") + return None + + +@register_action("agentipy-get-tps") +def agentipy_get_tps(agent, **kwargs): + """Get current Solana TPS""" + agent.logger.info("\nšŸ“Š FETCHING CURRENT TPS") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", action_name="get-tps", params=[] + ) + agent.logger.info(f"Current TPS: {result}") + return result + except Exception as e: + agent.logger.error(f"āŒ TPS fetch failed: {str(e)}") + return None + + +@register_action("agentipy-get-token-by-ticker") +def get_token_data_by_ticker(agent, **kwargs): + """Get token data by ticker""" + agent.logger.info("\nšŸ” FETCHING TOKEN DATA BY TICKER") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="get-token-by-ticker", + params=[kwargs.get("ticker")], + ) + agent.logger.info("āœ… Token data retrieved!") + return result + except Exception as e: + agent.logger.error(f"āŒ Token data fetch failed: {str(e)}") + return None + + +@register_action("agentipy-get-token-by-address") +def get_token_data_by_address(agent, **kwargs): + """Get token data by address""" + agent.logger.info("\nšŸ” FETCHING TOKEN DATA BY ADDRESS") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="get-token-by-address", + params=[kwargs.get("mint")], + ) + agent.logger.info("āœ… Token data retrieved!") + return result + except Exception as e: + agent.logger.error(f"āŒ Token data fetch failed: {str(e)}") + return None + + +@register_action("agentipy-launch-pump-token") +def launch_pump_fun_token(agent, **kwargs): + """Launch a Pump & Fun token""" + agent.logger.info("\nšŸš€ LAUNCHING PUMP & FUN TOKEN") + try: + result = agent.connection_manager.perform_action( + connection_name="agentipy", + action_name="launch-pump-token", + params=[ + kwargs.get("token_name"), + kwargs.get("token_ticker"), + kwargs.get("description"), + kwargs.get("image_url"), + kwargs.get("options", {}), + ], + ) + agent.logger.info("āœ… Token launched successfully!") + return result + except Exception as e: + agent.logger.error(f"āŒ Token launch failed: {str(e)}") + return False diff --git a/src/connection_manager.py b/src/connection_manager.py index 657a1d7..f1e3aac 100644 --- a/src/connection_manager.py +++ b/src/connection_manager.py @@ -1,5 +1,6 @@ import logging from typing import Any, List, Optional, Type, Dict +from src.connections.agentipy_connection import AgentipyConnection from src.connections.base_connection import BaseConnection from src.connections.anthropic_connection import AnthropicConnection from src.connections.eternalai_connection import EternalAIConnection @@ -14,12 +15,13 @@ logger = logging.getLogger("connection_manager") + class ConnectionManager: def __init__(self, agent_config): - self.connections : Dict[str, BaseConnection] = {} + self.connections: Dict[str, BaseConnection] = {} for config in agent_config: self._register_connection(config) - + @staticmethod def _class_name_to_type(class_name: str) -> Type[BaseConnection]: if class_name == "twitter": @@ -42,13 +44,15 @@ def _class_name_to_type(class_name: str) -> Type[BaseConnection]: return HyperbolicConnection elif class_name == "galadriel": return GaladrielConnection + elif class_name == "agentipy": + return AgentipyConnection return None - + def _register_connection(self, config_dic: Dict[str, Any]) -> None: """ Create and register a new connection with configuration - + Args: name: Identifier for the connection connection_class: The connection class to instantiate @@ -62,12 +66,14 @@ def _register_connection(self, config_dic: Dict[str, Any]) -> None: except Exception as e: logging.error(f"Failed to initialize connection {name}: {e}") - def _check_connection(self, connection_string: str)-> bool: + def _check_connection(self, connection_string: str) -> bool: try: connection = self.connections[connection_string] return connection.is_configured(verbose=True) except KeyError: - logging.error("\nUnknown connection. Try 'list-connections' to see all supported connections.") + logging.error( + "\nUnknown connection. Try 'list-connections' to see all supported connections." + ) return False except Exception as e: logging.error(f"\nAn error occurred: {e}") @@ -78,15 +84,19 @@ def configure_connection(self, connection_name: str) -> bool: try: connection = self.connections[connection_name] success = connection.configure() - + if success: - logging.info(f"\nāœ… SUCCESSFULLY CONFIGURED CONNECTION: {connection_name}") + logging.info( + f"\nāœ… SUCCESSFULLY CONFIGURED CONNECTION: {connection_name}" + ) else: logging.error(f"\nāŒ ERROR CONFIGURING CONNECTION: {connection_name}") return success - + except KeyError: - logging.error("\nUnknown connection. Try 'list-connections' to see all supported connections.") + logging.error( + "\nUnknown connection. Try 'list-connections' to see all supported connections." + ) return False except Exception as e: logging.error(f"\nAn error occurred: {e}") @@ -96,19 +106,25 @@ def list_connections(self) -> None: """List all available connections and their status""" logging.info("\nAVAILABLE CONNECTIONS:") for name, connection in self.connections.items(): - status = "āœ… Configured" if connection.is_configured() else "āŒ Not Configured" + status = ( + "āœ… Configured" if connection.is_configured() else "āŒ Not Configured" + ) logging.info(f"- {name}: {status}") def list_actions(self, connection_name: str) -> None: """List all available actions for a specific connection""" try: connection = self.connections[connection_name] - + if connection.is_configured(): - logging.info(f"\nāœ… {connection_name} is configured. You can use any of its actions.") + logging.info( + f"\nāœ… {connection_name} is configured. You can use any of its actions." + ) else: - logging.info(f"\nāŒ {connection_name} is not configured. You must configure a connection to use its actions.") - + logging.info( + f"\nāŒ {connection_name} is not configured. You must configure a connection to use its actions." + ) + logging.info("\nAVAILABLE ACTIONS:") for action_name, action in connection.actions.items(): logging.info(f"- {action_name}: {action.description}") @@ -116,56 +132,70 @@ def list_actions(self, connection_name: str) -> None: for param in action.parameters: req = "required" if param.required else "optional" logging.info(f" - {param.name} ({req}): {param.description}") - + except KeyError: - logging.error("\nUnknown connection. Try 'list-connections' to see all supported connections.") + logging.error( + "\nUnknown connection. Try 'list-connections' to see all supported connections." + ) except Exception as e: logging.error(f"\nAn error occurred: {e}") - def perform_action(self, connection_name: str, action_name: str, params: List[Any]) -> Optional[Any]: + def perform_action( + self, connection_name: str, action_name: str, params: List[Any] + ) -> Optional[Any]: """Perform an action on a specific connection with given parameters""" try: connection = self.connections[connection_name] - + if not connection.is_configured(): - logging.error(f"\nError: Connection '{connection_name}' is not configured") + logging.error( + f"\nError: Connection '{connection_name}' is not configured" + ) return None - + if action_name not in connection.actions: - logging.error(f"\nError: Unknown action '{action_name}' for connection '{connection_name}'") + logging.error( + f"\nError: Unknown action '{action_name}' for connection '{connection_name}'" + ) return None - + action = connection.actions[action_name] - + # Convert list of params to kwargs dictionary, handling both required and optional params kwargs = {} param_index = 0 - + # Add provided parameters up to the number provided for i, param in enumerate(action.parameters): if param_index < len(params): kwargs[param.name] = params[param_index] param_index += 1 - + # Validate all required parameters are present missing_required = [ - param.name for param in action.parameters + param.name + for param in action.parameters if param.required and param.name not in kwargs ] - + if missing_required: - logging.error(f"\nError: Missing required parameters: {', '.join(missing_required)}") + logging.error( + f"\nError: Missing required parameters: {', '.join(missing_required)}" + ) return None - + return connection.perform_action(action_name, kwargs) - + except Exception as e: - logging.error(f"\nAn error occurred while trying action {action_name} for {connection_name} connection: {e}") + logging.error( + f"\nAn error occurred while trying action {action_name} for {connection_name} connection: {e}" + ) return None def get_model_providers(self) -> List[str]: """Get a list of all LLM provider connections""" return [ - name for name, conn in self.connections.items() - if conn.is_configured() and getattr(conn, 'is_llm_provider', lambda: False) - ] \ No newline at end of file + name + for name, conn in self.connections.items() + if conn.is_configured() and getattr(conn, "is_llm_provider", lambda: False) + ] diff --git a/src/connections/agentipy_connection.py b/src/connections/agentipy_connection.py new file mode 100644 index 0000000..da2290d --- /dev/null +++ b/src/connections/agentipy_connection.py @@ -0,0 +1,410 @@ +import logging +import os +import requests +import asyncio +from typing import Dict, Any, Optional + +from src.connections.base_connection import BaseConnection, Action, ActionParameter +from src.types import JupiterTokenData +from src.constants import LAMPORTS_PER_SOL, SPL_TOKENS + + +from agentipy import SolanaAgentKit + +from dotenv import load_dotenv, set_key + +from jupiter_python_sdk.jupiter import Jupiter + +from solana.rpc.async_api import AsyncClient +from solana.rpc.commitment import Confirmed + +from solders.keypair import Keypair # type: ignore + +from solders.pubkey import Pubkey # type: ignore + +logger = logging.getLogger("connections.solana_connection") + + +class AgentipyConnectionError(Exception): + """Base exception for Solana connection errors""" + + pass + + +class AgentipyConfigurationError(AgentipyConnectionError): + """Raised when there are configuration/credential issues""" + + pass + + +class AgentipyConnection(BaseConnection): + def __init__(self, config: Dict[str, Any]): + logger.info("Initializing Solana connection...") + super().__init__(config) + + @property + def is_llm_provider(self) -> bool: + return False + + def _get_connection_async(self) -> AsyncClient: + conn = AsyncClient(self.config["rpc"]) + return conn + + def _get_wallet(self): + creds = self._get_credentials() + return Keypair.from_base58_string(creds["SOLANA_PRIVATE_KEY"]) + + def _get_credentials(self) -> Dict[str, str]: + """Get Solana credentials from environment with validation""" + logger.debug("Retrieving Solana Credentials") + load_dotenv() + required_vars = {"SOLANA_PRIVATE_KEY": "solana wallet private key"} + credentials = {} + missing = [] + + for env_var, description in required_vars.items(): + value = os.getenv(env_var) + if not value: + missing.append(description) + credentials[env_var] = value + + if missing: + error_msg = f"Missing Solana credentials: {', '.join(missing)}" + raise AgentipyConfigurationError(error_msg) + + Keypair.from_base58_string(credentials["SOLANA_PRIVATE_KEY"]) + logger.debug("All required credentials found") + return credentials + + def _get_agentipy(self) -> SolanaAgentKit: + priv_key = self._get_credentials()["SOLANA_PRIVATE_KEY"] + agentipy = SolanaAgentKit(priv_key, self.config["rpc"]) + return agentipy + + def _get_jupiter(self, keypair, async_client): + jupiter = Jupiter( + async_client=async_client, + keypair=keypair, + quote_api_url="https://quote-api.jup.ag/v6/quote?", + swap_api_url="https://quote-api.jup.ag/v6/swap", + open_order_api_url="https://jup.ag/api/limit/v1/createOrder", + cancel_orders_api_url="https://jup.ag/api/limit/v1/cancelOrders", + query_open_orders_api_url="https://jup.ag/api/limit/v1/openOrders?wallet=", + query_order_history_api_url="https://jup.ag/api/limit/v1/orderHistory", + query_trade_history_api_url="https://jup.ag/api/limit/v1/tradeHistory", + ) + return jupiter + + def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]: + """Validate Solana configuration from JSON""" + required_fields = ["rpc"] + missing_fields = [field for field in required_fields if field not in config] + if missing_fields: + raise ValueError( + f"Missing required configuration fields: {', '.join(missing_fields)}" + ) + + if not isinstance(config["rpc"], str): + raise ValueError("rpc must be a positive integer") + + return config + + def register_actions(self) -> None: + """Register available Solana actions""" + self.actions = { + "transfer": Action( + name="transfer", + parameters=[ + ActionParameter("to_address", True, str, "Destination address"), + ActionParameter("amount", True, float, "Amount to transfer"), + ActionParameter( + "token_mint", + False, + str, + "Token mint address (optional for SOL)", + ), + ], + description="Transfer SOL or SPL tokens", + ), + "trade": Action( + name="trade", + parameters=[ + ActionParameter( + "output_mint", True, str, "Output token mint address" + ), + ActionParameter("input_amount", True, float, "Input amount"), + ActionParameter( + "input_mint", False, str, "Input token mint (optional for SOL)" + ), + ActionParameter( + "slippage_bps", False, int, "Slippage in basis points" + ), + ], + description="Swap tokens using Jupiter", + ), + "get-balance": Action( + name="get-balance", + parameters=[ + ActionParameter( + "token_address", + False, + str, + "Token mint address (optional for SOL)", + ) + ], + description="Check SOL or token balance", + ), + "stake": Action( + name="stake", + parameters=[ + ActionParameter("amount", True, float, "Amount of SOL to stake") + ], + description="Stake SOL", + ), + "lend-assets": Action( + name="lend-assets", + parameters=[ActionParameter("amount", True, float, "Amount to lend")], + description="Lend assets", + ), + "request-faucet": Action( + name="request-faucet", + parameters=[], + description="Request funds from faucet for testing", + ), + "deploy-token": Action( + name="deploy-token", + parameters=[ + ActionParameter( + "decimals", False, int, "Token decimals (default 9)" + ) + ], + description="Deploy a new token", + ), + "fetch-price": Action( + name="fetch-price", + parameters=[ + ActionParameter( + "token_id", True, str, "Token ID to fetch price for" + ) + ], + description="Get token price", + ), + "get-tps": Action( + name="get-tps", parameters=[], description="Get current Solana TPS" + ), + "get-token-by-ticker": Action( + name="get-token-by-ticker", + parameters=[ + ActionParameter("ticker", True, str, "Token ticker symbol") + ], + description="Get token data by ticker symbol", + ), + "get-token-by-address": Action( + name="get-token-by-address", + parameters=[ActionParameter("mint", True, str, "Token mint address")], + description="Get token data by mint address", + ), + "launch-pump-token": Action( + name="launch-pump-token", + parameters=[ + ActionParameter("token_name", True, str, "Name of the token"), + ActionParameter("token_ticker", True, str, "Token ticker symbol"), + ActionParameter("description", True, str, "Token description"), + ActionParameter("image_url", True, str, "Token image URL"), + ActionParameter("options", False, dict, "Additional token options"), + ], + description="Launch a Pump & Fun token", + ), + } + + def configure(self) -> bool: + """Sets up Solana credentials""" + logger.info("\nšŸ”‘ SOLANA CREDENTIALS SETUP") + + if self.is_configured(): + logger.info("\nSolana is already configured.") + response = input("Do you want to reconfigure? (y/n): ") + if response.lower() != "y": + return True + + logger.info("\nšŸ“ To get your Solana private key:") + logger.info("1. Export your private key from your wallet") + logger.info("2. Make sure it's in base58 format") + logger.info("3. Never share this key with anyone") + + private_key = input("\nEnter your Solana private key: ") + + try: + # Validate the private key format by attempting to create a keypair + Keypair.from_base58_string(private_key) + + if not os.path.exists(".env"): + with open(".env", "w") as f: + f.write("") + + set_key(".env", "SOLANA_PRIVATE_KEY", private_key) + load_dotenv(override=True) + + logger.info("\nāœ… Solana configuration successfully saved!") + logger.info("Your private key has been stored in the .env file.") + return True + + except Exception as e: + logger.error(f"\nāŒ Configuration failed: {e}") + return False + + def is_configured(self, verbose: bool = False) -> bool: + """Check if Solana credentials are configured and valid""" + try: + # First check if credentials exist and key is valid + load_dotenv(override=True) + private_key = os.getenv("SOLANA_PRIVATE_KEY") + if not private_key: + if verbose: + logger.debug("Solana private key not found in environment") + return False + + # Validate the key format + Keypair.from_base58_string(private_key) + + # We successfully validated the private key exists and is in correct format + if verbose: + logger.debug("Solana configuration is valid") + return True + + except Exception as e: + if verbose: + error_msg = str(e) + if isinstance(e, AgentipyConfigurationError): + error_msg = f"Configuration error: {error_msg}" + elif isinstance(e, AgentipyConnectionError): + error_msg = f"API validation error: {error_msg}" + logger.debug(f"Solana Configuration validation failed: {error_msg}") + return False + + def transfer( + self, to_address: str, amount: float, token_mint: Optional[str] = None + ) -> str: + logger.info(f"Transferring {amount} to {to_address}") + agent = self._get_agentipy() + res = agent.transfer(to_address, amount, token_mint) + res = asyncio.run(res) + return res + + def trade( + self, + output_mint: str, + input_amount: float, + input_mint: Optional[str] = SPL_TOKENS["USDC"], + slippage_bps: int = 100, + ) -> str: + logger.info(f"Swapping {input_amount} for {output_mint}") + agent = self._get_agentipy() + res = agent.trade(output_mint, input_amount, input_mint, slippage_bps) + res = asyncio.run(res) + return res + + def get_balance(self, token_address: str = None) -> float: + logger.info("Agentipy Fetching balance") + if token_address: + logger.info(f"Fetching balance for {token_address}") + token_address = Pubkey(token_address) + else: + token_address = None + agent = self._get_agentipy() + res = agent.get_balance(token_address) + res = asyncio.run(res) + return res + + # good + def stake(self, amount: float) -> str: + logger.info(f"Staking {amount} SOL") + agent = self._get_agentipy() + res = agent.stake(amount) + res = asyncio.run(res) + return res + + # todo: test on mainnet + def lend_assets(self, amount: float) -> str: + logger.info(f"Lending {amount} assets") + agent = self._get_agentipy() + res = agent.lend_assets(amount) + res = asyncio.run(res) + return res + + def request_faucet(self) -> str: + logger.info("Requesting faucet funds") + agent = self._get_agentipy() + res = agent.request_faucet_funds() + res = asyncio.run(res) + return res + + def deploy_token(self, decimals: int = 9) -> str: + logger.info(f"Deploying token with {decimals} decimals") + agent = self._get_agentipy() + res = agent.deploy_token(decimals) + res = asyncio.run(res) + return res + + # good + def fetch_price(self, token_id: str) -> float: + logger.info(f"Fetching price for {token_id}") + agent = self._get_agentipy() + res = agent.fetch_price(token_id) + res = asyncio.run(res) + return res + + # good + def get_tps(self) -> int: + logger.info("Fetching TPS") + agent = self._get_agentipy() + res = agent.get_tps() + res = asyncio.run(res) + return res + + # good + def get_token_by_ticker(self, ticker: str) -> str: + logger.info(f"Fetching token data for {ticker}") + agent = self._get_agentipy() + res = agent.get_token_data_by_ticker(ticker) + res = asyncio.run(res) + return res + + # good + def get_token_by_address(self, mint: str) -> Dict[str, Any]: + logger.info(f"Fetching token data for {mint}") + agent = self._get_agentipy() + res = agent.get_token_data_by_address(mint) + res = asyncio.run(res) + return res + + # todo: test on mainnet + def launch_pump_token( + self, + token_name: str, + token_ticker: str, + description: str, + image_url: str, + options: Optional[Dict[str, Any]] = None, + ) -> str: + logger.info(f"Launching Pump & Fun token {token_name}") + agent = self._get_agentipy() + res = agent.launch_pump_fun_token( + token_name, token_ticker, description, image_url, options + ) + res = asyncio.run(res) + return res + + def perform_action(self, action_name: str, kwargs) -> Any: + """Execute a Solana action with validation""" + if action_name not in self.actions: + raise KeyError(f"Unknown action: {action_name}") + + action = self.actions[action_name] + errors = action.validate_params(kwargs) + if errors: + raise ValueError(f"Invalid parameters: {', '.join(errors)}") + + method_name = action_name.replace("-", "_") + method = getattr(self, method_name) + return method(**kwargs)