From c913f90cc0cc1708b8fe6d4c1d402ed5cacd0539 Mon Sep 17 00:00:00 2001 From: Shihao Guo Date: Wed, 29 Jan 2025 12:10:47 -0800 Subject: [PATCH 1/3] refactor xapiskill and add image generation --- .../activities/activity_post_a_tweet.py | 97 ++++---- .../activity_post_recent_memory_tweet.py | 112 +-------- my_digital_being/skills/skill_x_api.py | 216 +++++++++--------- 3 files changed, 171 insertions(+), 254 deletions(-) diff --git a/my_digital_being/activities/activity_post_a_tweet.py b/my_digital_being/activities/activity_post_a_tweet.py index b7368a6..458b09e 100644 --- a/my_digital_being/activities/activity_post_a_tweet.py +++ b/my_digital_being/activities/activity_post_a_tweet.py @@ -1,10 +1,12 @@ import logging -from typing import Dict, Any, List +from typing import Dict, Any, List, Tuple from framework.activity_decorator import activity, ActivityBase, ActivityResult from framework.api_management import api_manager from framework.memory import Memory from skills.skill_chat import chat_skill +from skills.skill_generate_image import ImageGenerationSkill +from skills.skill_x_api import XAPISkill logger = logging.getLogger(__name__) @@ -12,8 +14,8 @@ @activity( name="post_a_tweet", energy_cost=0.4, - cooldown=10000, # 1 hour - required_skills=["twitter_posting"], + cooldown=3600, # 1 hour + required_skills=["twitter_posting", "image_generation"], ) class PostTweetActivity(ActivityBase): """ @@ -26,11 +28,13 @@ class PostTweetActivity(ActivityBase): def __init__(self): super().__init__() self.max_length = 280 - # The Composio action name from your logs - self.composio_action = "TWITTER_CREATION_OF_A_POST" # If you know your Twitter username, you can embed it in the link # or fetch it dynamically. Otherwise, substitute accordingly: self.twitter_username = "YourUserName" + # set this to True if you want to generate an image for the tweet + self.image_generation_enabled = True + self.default_size = (1024, 1024) # Added for image generation + self.default_format = "png" # Added for image generation async def execute(self, shared_data) -> ActivityResult: try: @@ -61,8 +65,18 @@ async def execute(self, shared_data) -> ActivityResult: if len(tweet_text) > self.max_length: tweet_text = tweet_text[: self.max_length - 3] + "..." - # 4) Post the tweet via Composio - post_result = self._post_tweet_via_composio(tweet_text) + # 4) Generate an image based on the tweet text + if self.image_generation_enabled: + image_prompt, media_urls = await self._generate_image_for_tweet(tweet_text, personality_data) + else: + image_prompt, media_urls = None, [] + + # 5) Post the tweet via X API + x_api = XAPISkill({ + "enabled": True, + "twitter_username": self.twitter_username + }) + post_result = await x_api.post_tweet(tweet_text, media_urls) if not post_result["success"]: error_msg = post_result.get( "error", "Unknown error posting tweet via Composio" @@ -77,7 +91,7 @@ async def execute(self, shared_data) -> ActivityResult: else None ) - # 5) Return success, adding link & prompt in metadata + # 6) Return success, adding link & prompt in metadata logger.info(f"Successfully posted tweet: {tweet_text[:50]}...") return ActivityResult( success=True, @@ -88,7 +102,9 @@ async def execute(self, shared_data) -> ActivityResult: "model": chat_response["data"].get("model"), "finish_reason": chat_response["data"].get("finish_reason"), "tweet_link": tweet_link, - "prompt_used": prompt_text, # <--- includes the full prompt + "prompt_used": prompt_text, + "image_prompt_used": image_prompt, + "image_count": len(media_urls), }, ) @@ -159,38 +175,37 @@ def _build_chat_prompt( f"but not repeating old tweets. Avoid hashtags or repeated phrases.\n" ) - def _post_tweet_via_composio(self, tweet_text: str) -> Dict[str, Any]: + def _build_image_prompt(self, tweet_text: str, personality: Dict[str, Any]) -> str: + personality_str = "\n".join(f"{t}: {v}" for t, v in personality.items()) + return f"Our digital being has these personality traits:\n" \ + f"{personality_str}\n\n" \ + f"And is creating a tweet with the text: {tweet_text}\n\n" \ + f"Generate an image that represents the story of the tweet and reflects the personality traits. Do not include the tweet text in the image." + + async def _generate_image_for_tweet(self, tweet_text: str, personality_data: Dict[str, Any]) -> Tuple[str, List[str]]: """ - Post a tweet using the "Creation of a post" Composio action. - The response returns {'successfull': True, ...}, not 'success'. - We'll check 'successfull' or fallback if needed. + Generate an image for the tweet and upload it to Twitter. + Returns a tuple of (image_prompt, media_urls). + If generation fails, returns (None, []). """ - try: - from framework.composio_integration import composio_manager - - logger.info( - f"Posting tweet via Composio action='{self.composio_action}', text='{tweet_text[:50]}...'" - ) - - response = composio_manager._toolset.execute_action( - action=self.composio_action, - params={"text": tweet_text}, - entity_id="MyDigitalBeing", + logger.info("Decided to generate an image for tweet") + image_skill = ImageGenerationSkill({ + "enabled": True, + "max_generations_per_day": 50, + "supported_formats": ["png", "jpg"], + }) + + if await image_skill.can_generate(): + image_prompt = self._build_image_prompt(tweet_text, personality_data) + image_result = await image_skill.generate_image( + prompt=image_prompt, + size=self.default_size, + format=self.default_format ) - - # The actual success key is "successfull" (with 2 Ls) - success_val = response.get("success", response.get("successfull")) - if success_val: - data_section = response.get("data", {}) - nested_data = data_section.get("data", {}) - tweet_id = nested_data.get("id") - return {"success": True, "tweet_id": tweet_id} - else: - return { - "success": False, - "error": response.get("error", "Unknown or missing success key"), - } - - except Exception as e: - logger.error(f"Error in Composio tweet post: {e}", exc_info=True) - return {"success": False, "error": str(e)} + + if image_result.get("success") and image_result.get("image_data", {}).get("url"): + return image_prompt, [image_result["image_data"]["url"]] + else: + logger.warning("Image generation not available, proceeding with text-only tweet") + + return None, [] diff --git a/my_digital_being/activities/activity_post_recent_memory_tweet.py b/my_digital_being/activities/activity_post_recent_memory_tweet.py index 24e96bf..2508fea 100644 --- a/my_digital_being/activities/activity_post_recent_memory_tweet.py +++ b/my_digital_being/activities/activity_post_recent_memory_tweet.py @@ -6,6 +6,7 @@ from framework.api_management import api_manager from framework.memory import Memory from skills.skill_chat import chat_skill +from skills.skill_x_api import XAPISkill logger = logging.getLogger(__name__) @@ -27,7 +28,6 @@ class PostRecentMemoriesTweetActivity(ActivityBase): def __init__(self, num_activities_to_fetch: int = 10): super().__init__() self.max_length = 280 - self.composio_action = "TWITTER_CREATION_OF_A_POST" self.twitter_username = "YourUserName" # Activity types to ignore in memory results @@ -88,9 +88,8 @@ async def execute(self, shared_data) -> ActivityResult: new_memories=new_memories, ) - # 6) Extract drawing URLs and upload to Twitter media + # 6) Extract drawing URLs from memories drawing_urls = self._extract_drawing_urls(new_memories) - media_ids = await self._upload_drawings_to_twitter(drawing_urls) # 7) Use chat skill to generate the tweet text chat_response = await chat_skill.get_chat_completion( @@ -108,8 +107,12 @@ async def execute(self, shared_data) -> ActivityResult: if len(tweet_text) > self.max_length: tweet_text = tweet_text[: self.max_length - 3] + "..." - # 8) Post to Twitter via Composio - post_result = self._post_tweet_via_composio(tweet_text, media_ids) + # 8) Post to Twitter via X API with any extracted images + x_api = XAPISkill({ + "enabled": True, + "twitter_username": self.twitter_username + }) + post_result = await x_api.post_tweet(tweet_text, drawing_urls) if not post_result["success"]: error_msg = post_result.get( "error", "Unknown error posting tweet via Composio" @@ -141,6 +144,7 @@ async def execute(self, shared_data) -> ActivityResult: "prompt_used": prompt_text, "model": chat_response["data"].get("model"), "finish_reason": chat_response["data"].get("finish_reason"), + "image_count": len(drawing_urls), }, ) @@ -264,42 +268,6 @@ def _build_chat_prompt( ) return prompt - def _post_tweet_via_composio(self, tweet_text: str, media_ids: List[str]) -> Dict[str, Any]: - """ - Post tweet via Composio with optional media_ids. - """ - try: - from framework.composio_integration import composio_manager - - logger.info( - f"Posting tweet via Composio action='{self.composio_action}', text='{tweet_text[:50]}...', media_count={len(media_ids)}" - ) - - response = composio_manager._toolset.execute_action( - action=self.composio_action, - params={ - "text": tweet_text, - "media__media__ids": media_ids if media_ids else None - }, - entity_id="MyDigitalBeing", - ) - - success_val = response.get("success", response.get("successfull")) - if success_val: - data_section = response.get("data", {}) - nested_data = data_section.get("data", {}) - tweet_id = nested_data.get("id") - return {"success": True, "tweet_id": tweet_id} - else: - return { - "success": False, - "error": response.get("error", "Unknown or missing success key"), - } - - except Exception as e: - logger.error(f"Error in Composio tweet post: {e}", exc_info=True) - return {"success": False, "error": str(e)} - def _extract_drawing_urls(self, memories: List[str]) -> List[str]: """ Extract URLs from all DrawActivity entries in memories. @@ -329,65 +297,3 @@ def _extract_drawing_urls(self, memories: List[str]) -> List[str]: continue return drawing_urls - - async def _upload_drawings_to_twitter(self, drawing_urls: List[str]) -> List[str]: - """ - Downloads images from URLs and uploads them to Twitter via Composio. - Returns a list of Twitter media IDs. - """ - import aiohttp - import base64 - from framework.composio_integration import composio_manager - - media_ids = [] - - if not drawing_urls: - return media_ids - - async with aiohttp.ClientSession() as session: - for url in drawing_urls: - try: - # Download image - async with session.get(url) as response: - if response.status != 200: - logger.warning(f"Failed to download image from {url}: {response.status}") - continue - - image_data = await response.read() - - # Convert to base64 - base64_image = base64.b64encode(image_data).decode('utf-8') - - # Extract filename from URL or use default - filename = url.split('/')[-1].split('?')[0] or 'image.png' - - # Upload to Twitter via Composio - upload_response = composio_manager._toolset.execute_action( - action="TWITTER_MEDIA_UPLOAD_MEDIA", - params={ - "media": { - "name": filename, - "content": base64_image - } - }, - entity_id="MyDigitalBeing" - ) - - # Composio returns 'successfull' instead of 'successful' - if upload_response.get("successful") or upload_response.get("successfull"): - media_id = upload_response.get("media_id") or upload_response.get("data", {}).get("media_id") - if media_id: - media_ids.append(media_id) - logger.info(f"Successfully uploaded image to Twitter, media_id: {media_id}") - else: - logger.warning(f"Upload succeeded but no media_id returned. Response: {upload_response}") - else: - error = upload_response.get("error", "Unknown error") - logger.warning(f"Failed to upload image to Twitter: {error}") - - except Exception as e: - logger.error(f"Error uploading image to Twitter: {e}", exc_info=True) - continue - - logger.debug(f"Uploaded drawing URLs to Twitter media: {media_ids}") - return media_ids diff --git a/my_digital_being/skills/skill_x_api.py b/my_digital_being/skills/skill_x_api.py index 4cf966b..5d8b75c 100644 --- a/my_digital_being/skills/skill_x_api.py +++ b/my_digital_being/skills/skill_x_api.py @@ -1,24 +1,21 @@ """X (Twitter) API integration skill.""" -import os import logging -from typing import Dict, Any, Optional -import requests -from requests_oauthlib import OAuth1Session -from framework.skill_config import SkillConfig -from framework.api_management import api_manager +import base64 +import aiohttp +from typing import Dict, Any, Optional, List +from framework.composio_integration import composio_manager logger = logging.getLogger(__name__) class XAPIError(Exception): """Custom exception for X API errors""" - pass class XAPISkill: - """Skill for interacting with X (Twitter) API.""" + """Skill for interacting with X (Twitter) API via Composio.""" def __init__(self, config: Dict[str, Any]): """Initialize skill configuration.""" @@ -27,129 +24,128 @@ def __init__(self, config: Dict[str, Any]): self.rate_limit = config.get("rate_limit", 100) self.cooldown_period = config.get("cooldown_period", 300) self.posts_count = 0 - self.skill_config = SkillConfig("twitter_posting") - self.oauth_session: Optional[OAuth1Session] = None + self.twitter_username = config.get("twitter_username", "YourUserName") # Get from config + + # Composio action names + self.post_action = "TWITTER_CREATION_OF_A_POST" + self.media_upload_action = "TWITTER_MEDIA_UPLOAD_MEDIA" - async def initialize(self) -> bool: - """Initialize the X API skill with required credentials.""" - try: - # Register required API keys - required_keys = [ - "API_KEY", - "API_SECRET", - "ACCESS_TOKEN", - "ACCESS_TOKEN_SECRET", - ] - api_manager.register_required_keys("twitter_posting", required_keys) - - # Check for missing credentials - missing_keys = [] - for key in required_keys: - if not self.skill_config.get_api_key(key): - missing_keys.append(key) - - if missing_keys: - logger.info(f"Missing X API credentials: {missing_keys}") - return False # Let the front-end handle credential requests - - # Try to authenticate if we have all credentials - return await self.authenticate() - - except Exception as e: - logger.error(f"Failed to initialize X API skill: {e}") - return False + if not self.twitter_username: + logger.warning("No twitter_username provided in config") def can_post(self) -> bool: """Check if posting is allowed based on rate limits.""" return self.enabled and self.posts_count < self.rate_limit - async def authenticate(self) -> bool: - """Set up OAuth session for X API.""" + async def upload_media(self, media_url: str) -> Optional[str]: + """ + Download image from URL and upload to Twitter via Composio. + Returns media ID if successful, None otherwise. + """ try: - api_key = self.skill_config.get_api_key("API_KEY") - api_secret = self.skill_config.get_api_key("API_SECRET") - access_token = self.skill_config.get_api_key("ACCESS_TOKEN") - access_token_secret = self.skill_config.get_api_key("ACCESS_TOKEN_SECRET") - - if not all([api_key, api_secret, access_token, access_token_secret]): - logger.error("Missing required X API credentials") - return False - - self.oauth_session = OAuth1Session( - client_key=api_key, - client_secret=api_secret, - resource_owner_key=access_token, - resource_owner_secret=access_token_secret, + logger.info(f"Downloading media from URL: {media_url}") + async with aiohttp.ClientSession() as session: + async with session.get(media_url) as response: + if response.status != 200: + logger.warning(f"Failed to download image from {media_url}: {response.status}") + return None + + image_data = await response.read() + + # Convert to base64 + base64_image = base64.b64encode(image_data).decode('utf-8') + + # Extract filename from URL or use default + filename = media_url.split('/')[-1].split('?')[0] or 'image.png' + + # Upload to Twitter via Composio + logger.info(f"Uploading media to Twitter: {filename}") + upload_response = composio_manager._toolset.execute_action( + action=self.media_upload_action, + params={ + "media": { + "name": filename, + "content": base64_image + } + }, + entity_id="MyDigitalBeing" ) - return True - + + if upload_response.get("successful") or upload_response.get("successfull"): + media_id = upload_response.get("media_id") or upload_response.get("data", {}).get("media_id") + if media_id: + logger.info(f"Successfully uploaded image to Twitter, media_id: {media_id}") + return media_id + + logger.warning(f"Failed to upload image to Twitter: {upload_response.get('error', 'Unknown error')}") + return None + except Exception as e: - logger.error(f"Authentication failed: {e}") - return False + logger.error(f"Error uploading image to Twitter: {e}", exc_info=True) + return None - async def post_tweet( - self, text: str, media_path: Optional[str] = None - ) -> Dict[str, Any]: - """Post a tweet with optional media attachment.""" + async def post_tweet(self, text: str, media_urls: List[str] = None) -> Dict[str, Any]: + """ + Post a tweet with optional media attachments using Composio. + Handles media upload internally if media_urls are provided. + Returns dict with success status and tweet data. + """ if not self.can_post(): return {"success": False, "error": "Rate limit exceeded or skill disabled"} - if not self.oauth_session: - if not await self.authenticate(): - return {"success": False, "error": "Authentication failed"} - try: - # Handle media upload if provided - media_id = None - if media_path and os.path.exists(media_path): - media_id = await self._upload_media(media_path) - - # Prepare tweet payload - post_payload = {"text": text} - if media_id: - post_payload["media"] = {"media_ids": [media_id]} - - # Post tweet - response = self.oauth_session.post( - "https://api.twitter.com/2/tweets", json=post_payload + # First handle media uploads if any + media_ids = [] + if media_urls: + for url in media_urls: + media_id = await self.upload_media(url) + if media_id: + media_ids.append(media_id) + + # Now post the tweet with any media IDs we collected + logger.info( + f"Posting tweet via Composio action='{self.post_action}', " + f"text='{text[:50]}...', media_count={len(media_ids)}" ) - if response.status_code != 201: - error_data = response.json() if response.text else {} - raise XAPIError(f"Failed to post tweet: {error_data}") - - self.posts_count += 1 - return { - "success": True, - "tweet_id": response.json()["data"]["id"], - "content": text, - } + params = {"text": text} + if media_ids: + params["media__media__ids"] = media_ids - except Exception as e: - logger.error(f"Failed to post tweet: {e}") - return {"success": False, "error": str(e)} - - async def _upload_media(self, media_path: str) -> Optional[str]: - """Upload media to X and return media_id.""" - try: - with open(media_path, "rb") as f: - files = {"media": f} - upload_response = self.oauth_session.post( - "https://upload.twitter.com/1.1/media/upload.json", files=files - ) + response = composio_manager._toolset.execute_action( + action=self.post_action, + params=params, + entity_id="MyDigitalBeing", + ) - if upload_response.status_code != 200: - logger.error( - f"Failed to upload media. Status code: {upload_response.status_code}" + # The actual success key is "successfull" (with 2 Ls) + success_val = response.get("success", response.get("successfull")) + if success_val: + data_section = response.get("data", {}) + nested_data = data_section.get("data", {}) + tweet_id = nested_data.get("id") + + tweet_link = ( + f"https://twitter.com/{self.twitter_username}/status/{tweet_id}" + if tweet_id else None ) - return None - - media_data = upload_response.json() - return media_data.get("media_id_string") + + self.posts_count += 1 + return { + "success": True, + "tweet_id": tweet_id, + "content": text, + "tweet_link": tweet_link, + "media_count": len(media_ids) + } + else: + error_msg = response.get("error", "Unknown or missing success key") + logger.error(f"Tweet posting failed: {error_msg}") + return {"success": False, "error": error_msg} except Exception as e: - logger.error(f"Media upload failed: {e}") - return None + logger.error(f"Failed to post tweet: {e}", exc_info=True) + return {"success": False, "error": str(e)} def reset_counts(self): """Reset the post counter.""" From f20323ac301816e6b9da8b70caa53e5b3d493254 Mon Sep 17 00:00:00 2001 From: Shihao Guo Date: Fri, 31 Jan 2025 12:45:50 -0800 Subject: [PATCH 2/3] default image generation to false --- my_digital_being/activities/activity_post_a_tweet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/my_digital_being/activities/activity_post_a_tweet.py b/my_digital_being/activities/activity_post_a_tweet.py index 458b09e..0615c5e 100644 --- a/my_digital_being/activities/activity_post_a_tweet.py +++ b/my_digital_being/activities/activity_post_a_tweet.py @@ -32,7 +32,7 @@ def __init__(self): # or fetch it dynamically. Otherwise, substitute accordingly: self.twitter_username = "YourUserName" # set this to True if you want to generate an image for the tweet - self.image_generation_enabled = True + self.image_generation_enabled = False self.default_size = (1024, 1024) # Added for image generation self.default_format = "png" # Added for image generation From dd7bb8d7b13fd1eb801b383c1acb07a6239deb9a Mon Sep 17 00:00:00 2001 From: Shihao Guo Date: Tue, 4 Feb 2025 09:18:26 -0800 Subject: [PATCH 3/3] update image upload method due to composio api change --- my_digital_being/skills/skill_x_api.py | 79 ++++++++++++++++---------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/my_digital_being/skills/skill_x_api.py b/my_digital_being/skills/skill_x_api.py index 5d8b75c..48d8567 100644 --- a/my_digital_being/skills/skill_x_api.py +++ b/my_digital_being/skills/skill_x_api.py @@ -3,8 +3,10 @@ import logging import base64 import aiohttp +import os from typing import Dict, Any, Optional, List from framework.composio_integration import composio_manager +from pathlib import Path logger = logging.getLogger(__name__) @@ -26,6 +28,15 @@ def __init__(self, config: Dict[str, Any]): self.posts_count = 0 self.twitter_username = config.get("twitter_username", "YourUserName") # Get from config + # Create storage directory if it doesn't exist + # Get the project root directory (2 levels up from this file) + current_file = Path(__file__) + project_root = current_file.parent.parent + self.storage_path = project_root / "storage" / "images" + self.storage_path.mkdir(parents=True, exist_ok=True) + + logger.info(f"Image storage path: {self.storage_path}") + # Composio action names self.post_action = "TWITTER_CREATION_OF_A_POST" self.media_upload_action = "TWITTER_MEDIA_UPLOAD_MEDIA" @@ -42,6 +53,7 @@ async def upload_media(self, media_url: str) -> Optional[str]: Download image from URL and upload to Twitter via Composio. Returns media ID if successful, None otherwise. """ + local_path = None try: logger.info(f"Downloading media from URL: {media_url}") async with aiohttp.ClientSession() as session: @@ -49,40 +61,49 @@ async def upload_media(self, media_url: str) -> Optional[str]: if response.status != 200: logger.warning(f"Failed to download image from {media_url}: {response.status}") return None - + + # Extract filename and create local path + filename = media_url.split('/')[-1].split('?')[0] or 'image.png' + local_path = self.storage_path / filename + + # Save the file locally image_data = await response.read() - - # Convert to base64 - base64_image = base64.b64encode(image_data).decode('utf-8') - - # Extract filename from URL or use default - filename = media_url.split('/')[-1].split('?')[0] or 'image.png' - - # Upload to Twitter via Composio - logger.info(f"Uploading media to Twitter: {filename}") - upload_response = composio_manager._toolset.execute_action( - action=self.media_upload_action, - params={ - "media": { - "name": filename, - "content": base64_image - } - }, - entity_id="MyDigitalBeing" - ) - - if upload_response.get("successful") or upload_response.get("successfull"): - media_id = upload_response.get("media_id") or upload_response.get("data", {}).get("media_id") - if media_id: - logger.info(f"Successfully uploaded image to Twitter, media_id: {media_id}") - return media_id - - logger.warning(f"Failed to upload image to Twitter: {upload_response.get('error', 'Unknown error')}") - return None + with open(local_path, 'wb') as f: + f.write(image_data) + + logger.info(f"Saved image to {local_path}") + + # Upload to Twitter via Composio + logger.info(f"Uploading media to Twitter") + upload_response = composio_manager._toolset.execute_action( + action=self.media_upload_action, + params={ + "media": str(local_path) # Just pass the file path as a string + }, + entity_id="MyDigitalBeing" + ) + if upload_response.get("successful"): + media_id = upload_response.get("data", {}).get("media_id") + if media_id: + logger.info(f"Successfully uploaded image to Twitter, media_id: {media_id}") + return media_id + + logger.warning(f"Failed to upload image to Twitter: {upload_response.get('error', 'Unknown error')}") + return None + except Exception as e: logger.error(f"Error uploading image to Twitter: {e}", exc_info=True) return None + + finally: + # Clean up the temporary file + if local_path and local_path.exists(): + try: + local_path.unlink() + logger.info(f"Cleaned up temporary file: {local_path}") + except Exception as e: + logger.warning(f"Failed to clean up temporary file {local_path}: {e}") async def post_tweet(self, text: str, media_urls: List[str] = None) -> Dict[str, Any]: """