Skip to content

Commit

Permalink
feature: add ability to upload recent drawings to twitter media
Browse files Browse the repository at this point in the history
  • Loading branch information
shihao-guo committed Jan 25, 2025
1 parent 126cca1 commit c5c56f0
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 14 deletions.
116 changes: 108 additions & 8 deletions my_digital_being/activities/activity_post_recent_memory_tweet.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Dict, Any, List
from urllib.parse import urlparse

from framework.activity_decorator import activity, ActivityBase, ActivityResult
from framework.api_management import api_manager
Expand Down Expand Up @@ -87,7 +88,11 @@ async def execute(self, shared_data) -> ActivityResult:
new_memories=new_memories,
)

# 6) Use chat skill to generate the tweet text
# 6) Extract drawing URLs and upload to Twitter media
drawing_urls = self._extract_drawing_urls(new_memories)
media_ids = await self._upload_drawings_to_twitter(drawing_urls)

# 7) Use chat skill to generate the tweet text
chat_response = await chat_skill.get_chat_completion(
prompt=prompt_text,
system_prompt=(
Expand All @@ -103,8 +108,8 @@ async def execute(self, shared_data) -> ActivityResult:
if len(tweet_text) > self.max_length:
tweet_text = tweet_text[: self.max_length - 3] + "..."

# 7) Post to Twitter via Composio
post_result = self._post_tweet_via_composio(tweet_text)
# 8) Post to Twitter via Composio
post_result = self._post_tweet_via_composio(tweet_text, media_ids)
if not post_result["success"]:
error_msg = post_result.get(
"error", "Unknown error posting tweet via Composio"
Expand All @@ -119,7 +124,7 @@ async def execute(self, shared_data) -> ActivityResult:
else None
)

# 8) Return success, storing the new memories in "data" so we can skip them next time
# 9) Return success, storing the new memories in "data" so we can skip them next time
logger.info(
f"Successfully posted tweet about recent memories: {tweet_text[:50]}..."
)
Expand Down Expand Up @@ -259,20 +264,23 @@ def _build_chat_prompt(
)
return prompt

def _post_tweet_via_composio(self, tweet_text: str) -> Dict[str, Any]:
def _post_tweet_via_composio(self, tweet_text: str, media_ids: List[str]) -> Dict[str, Any]:
"""
Same as your original PostTweetActivity approach to tweeting via Composio.
Post tweet via Composio with optional media_ids.
"""
try:
from framework.composio_integration import composio_manager

logger.info(
f"Posting tweet via Composio action='{self.composio_action}', text='{tweet_text[:50]}...'"
f"Posting tweet via Composio action='{self.composio_action}', text='{tweet_text[:50]}...', media_count={len(media_ids)}"
)

response = composio_manager._toolset.execute_action(
action=self.composio_action,
params={"text": tweet_text},
params={
"text": tweet_text,
"media__media__ids": media_ids if media_ids else None
},
entity_id="MyDigitalBeing",
)

Expand All @@ -291,3 +299,95 @@ def _post_tweet_via_composio(self, tweet_text: str) -> Dict[str, Any]:
except Exception as e:
logger.error(f"Error in Composio tweet post: {e}", exc_info=True)
return {"success": False, "error": str(e)}

def _extract_drawing_urls(self, memories: List[str]) -> List[str]:
"""
Extract URLs from all DrawActivity entries in memories.
Returns a list of valid URLs, empty list if none found.
"""
drawing_urls = []

for memory in memories:
if memory.startswith("DrawActivity =>"):
try:
# Extract the JSON-like string after '=>'
data_str = memory.split("=>")[1].strip()
# Convert string representation to dict
data = eval(data_str)

# Extract URL from image_data
if 'image_data' in data and 'url' in data['image_data']:
url = data['image_data']['url']
# Validate URL
result = urlparse(url)
if all([result.scheme, result.netloc]):
drawing_urls.append(url)
else:
logger.warning(f"Invalid URL format found in DrawActivity: {url}")
except Exception as e:
logger.error(f"Failed to extract drawing URL: {e}")
continue

return drawing_urls

async def _upload_drawings_to_twitter(self, drawing_urls: List[str]) -> List[str]:
"""
Downloads images from URLs and uploads them to Twitter via Composio.
Returns a list of Twitter media IDs.
"""
import aiohttp
import base64
from framework.composio_integration import composio_manager

media_ids = []

if not drawing_urls:
return media_ids

async with aiohttp.ClientSession() as session:
for url in drawing_urls:
try:
# Download image
async with session.get(url) as response:
if response.status != 200:
logger.warning(f"Failed to download image from {url}: {response.status}")
continue

image_data = await response.read()

# Convert to base64
base64_image = base64.b64encode(image_data).decode('utf-8')

# Extract filename from URL or use default
filename = url.split('/')[-1].split('?')[0] or 'image.png'

# Upload to Twitter via Composio
upload_response = composio_manager._toolset.execute_action(
action="TWITTER_MEDIA_UPLOAD_MEDIA",
params={
"media": {
"name": filename,
"content": base64_image
}
},
entity_id="MyDigitalBeing"
)

# Composio returns 'successfull' instead of 'successful'
if upload_response.get("successful") or upload_response.get("successfull"):
media_id = upload_response.get("media_id") or upload_response.get("data", {}).get("media_id")
if media_id:
media_ids.append(media_id)
logger.info(f"Successfully uploaded image to Twitter, media_id: {media_id}")
else:
logger.warning(f"Upload succeeded but no media_id returned. Response: {upload_response}")
else:
error = upload_response.get("error", "Unknown error")
logger.warning(f"Failed to upload image to Twitter: {error}")

except Exception as e:
logger.error(f"Error uploading image to Twitter: {e}", exc_info=True)
continue

logger.debug(f"Uploaded drawing URLs to Twitter media: {media_ids}")
return media_ids
40 changes: 34 additions & 6 deletions my_digital_being/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,33 @@
from pathlib import Path
from typing import Dict, Any, Set, Union, Tuple
from datetime import datetime
import os
import warnings

import websockets
from websockets.server import serve
from websockets.legacy.server import WebSocketServerProtocol
from websockets.server import serve as ws_serve
from websockets.server import WebSocketServerProtocol

# Import api_manager at top-level (not again inside any function)
from framework.api_management import api_manager
from framework.main import DigitalBeing
from framework.skill_config import DynamicComposioSkills

logging.basicConfig(level=logging.INFO)
warnings.filterwarnings("ignore", message="Valid config keys have changed in V2")

def configure_logging():
"""Configure logging with proper format and level"""
log_level = os.getenv('LOGLEVEL', 'INFO').upper()
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
force=True # Force reconfiguration of the root logger
)
# Suppress websockets debug logs unless specifically wanted
if log_level != 'DEBUG':
logging.getLogger('websockets').setLevel(logging.INFO)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -723,7 +739,7 @@ async def start_server(self):
"""Start the server using websockets.serve()."""
try:
await self.initialize()
async with serve(
async with ws_serve(
self.handle_websocket,
self.host,
self.port,
Expand All @@ -736,6 +752,18 @@ async def start_server(self):
raise


if __name__ == "__main__":
async def main():
# Configure logging first thing
configure_logging()

logger.debug("Logging configured with level: %s", os.getenv('LOGLEVEL', 'INFO'))
logger.info("Starting Digital Being server...")
logger.debug("Creating server instance...")

server = DigitalBeingServer()
asyncio.run(server.start_server())
logger.debug("Starting server...")
await server.start_server()


if __name__ == "__main__":
asyncio.run(main())

0 comments on commit c5c56f0

Please sign in to comment.