From cec3f5f20ca2aef1ebf6f1874ee31d17ba712a77 Mon Sep 17 00:00:00 2001 From: Richard Roberson Date: Wed, 15 Jan 2025 13:53:26 -0700 Subject: [PATCH] Test seperate scheduler container --- docker-compose.yml | 14 +++++++++ firehose/data_filter.py | 1 - firehose/data_stream.py | 6 +--- firehose/database.py | 4 +-- firehose/requirements.txt | 1 - scheduler/Dockerfile | 18 ++++++++++++ scheduler/database.py | 33 +++++++++++++++++++++ {firehose => scheduler}/db_scheduler.py | 26 ++++++++++++++--- scheduler/requirements.txt | 5 ++++ scheduler/utils/__init__.py | 0 scheduler/utils/config.py | 38 +++++++++++++++++++++++++ scheduler/utils/logger.py | 8 ++++++ 12 files changed, 141 insertions(+), 13 deletions(-) create mode 100644 scheduler/Dockerfile create mode 100644 scheduler/database.py rename {firehose => scheduler}/db_scheduler.py (95%) create mode 100644 scheduler/requirements.txt create mode 100644 scheduler/utils/__init__.py create mode 100644 scheduler/utils/config.py create mode 100644 scheduler/utils/logger.py diff --git a/docker-compose.yml b/docker-compose.yml index e4b0542..e6d7a7d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,20 @@ services: tty: true depends_on: - postgres + + scheduler: + container_name: scheduler + image: richardr1126/cosmere-db-scheduler + build: + context: ./scheduler + env_file: + - .env + depends_on: + - postgres + - firehose + restart: unless-stopped + stdin_open: true + tty: true web: container_name: web diff --git a/firehose/data_filter.py b/firehose/data_filter.py index b87ec3e..23e3646 100644 --- a/firehose/data_filter.py +++ b/firehose/data_filter.py @@ -4,7 +4,6 @@ from atproto import models, Client, IdResolver from utils.logger import logger from database import db, Post -from db_scheduler import init_client import json from pathlib import Path diff --git a/firehose/data_stream.py b/firehose/data_stream.py index 529734b..33e7bee 100644 --- a/firehose/data_stream.py +++ b/firehose/data_stream.py @@ -10,10 +10,8 @@ ) from atproto.exceptions import FirehoseError -from database import SubscriptionState, db -from utils.logger import logger from database import db, Post, SubscriptionState, SessionState, Requests -import db_scheduler as db_scheduler +from utils.logger import logger # Define the types of records we're interested in and their corresponding namespace IDs _INTERESTED_RECORDS = { @@ -97,8 +95,6 @@ def run(name, operations_callback, stream_stop_event=None): db.create_tables([Post, SubscriptionState, SessionState, Requests]) logger.info("Database connected and tables created.") - db_scheduler.start() - while stream_stop_event is None or not stream_stop_event.is_set(): try: # Start the main run loop diff --git a/firehose/database.py b/firehose/database.py index 14abc27..185f3c8 100644 --- a/firehose/database.py +++ b/firehose/database.py @@ -27,7 +27,7 @@ class SessionState(BaseModel): service = peewee.CharField(unique=True) session_string = peewee.TextField(null=True) -# table for storing dids class Requests(BaseModel): indexed_at = peewee.DateTimeField(default=datetime.now(timezone.utc), index=True) - did = peewee.CharField(null=True, default=None, index=True) \ No newline at end of file + did = peewee.CharField(null=True, default=None, index=True) + diff --git a/firehose/requirements.txt b/firehose/requirements.txt index aa5e065..e9c308a 100644 --- a/firehose/requirements.txt +++ b/firehose/requirements.txt @@ -2,4 +2,3 @@ psycopg2-binary atproto peewee python-dotenv -apscheduler \ No newline at end of file diff --git a/scheduler/Dockerfile b/scheduler/Dockerfile new file mode 100644 index 0000000..d43bd91 --- /dev/null +++ b/scheduler/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first to leverage Docker cache +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +CMD ["python", "db_scheduler.py"] \ No newline at end of file diff --git a/scheduler/database.py b/scheduler/database.py new file mode 100644 index 0000000..14abc27 --- /dev/null +++ b/scheduler/database.py @@ -0,0 +1,33 @@ +from datetime import datetime, timedelta, timezone +from utils.config import POSTGRES_DB, POSTGRES_PASSWORD, POSTGRES_USER +import peewee + +# Database setup +db = peewee.PostgresqlDatabase(POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host='postgres', port=5432) + +# Database Models +class BaseModel(peewee.Model): + class Meta: + database = db + +class Post(BaseModel): + uri = peewee.CharField(index=True) + cid = peewee.CharField() + reply_parent = peewee.CharField(null=True, default=None) + reply_root = peewee.CharField(null=True, default=None) + indexed_at = peewee.DateTimeField(default=datetime.now(timezone.utc), index=True) + author = peewee.CharField(null=True, default=None, index=True) + interactions = peewee.BigIntegerField(default=0, index=True) + +class SubscriptionState(BaseModel): + service = peewee.CharField(unique=True) + cursor = peewee.BigIntegerField() + +class SessionState(BaseModel): + service = peewee.CharField(unique=True) + session_string = peewee.TextField(null=True) + +# table for storing dids +class Requests(BaseModel): + indexed_at = peewee.DateTimeField(default=datetime.now(timezone.utc), index=True) + did = peewee.CharField(null=True, default=None, index=True) \ No newline at end of file diff --git a/firehose/db_scheduler.py b/scheduler/db_scheduler.py similarity index 95% rename from firehose/db_scheduler.py rename to scheduler/db_scheduler.py index b5ae8f8..51653e4 100644 --- a/firehose/db_scheduler.py +++ b/scheduler/db_scheduler.py @@ -8,21 +8,36 @@ from typing import Optional import peewee import time +import signal +import sys from utils.logger import logger from utils.config import HANDLE, PASSWORD from database import db, Post, SessionState # Main Function -def start(): +def main(): # Initialize Client client = init_client() # Start Scheduler scheduler = start_scheduler(client, schedule_hydration=True) - # for job in scheduler.get_jobs(): - # job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately + for job in scheduler.get_jobs(): + job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately + + # Handle graceful shutdown + def signal_handler(sig, frame): + logger.info("Shutting down scheduler...") + shutdown_scheduler(scheduler) + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Keep the main thread alive + while True: + signal.pause() # Wait for signals # Postgres database management functions def clear_old_posts(clear_days: int): @@ -323,4 +338,7 @@ def shutdown_scheduler(scheduler: BackgroundScheduler): db.close() logger.info("Database connection closed by force") except peewee.PeeweeException as e: - logger.error(f"Error closing database: {e}") \ No newline at end of file + logger.error(f"Error closing database: {e}") + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/scheduler/requirements.txt b/scheduler/requirements.txt new file mode 100644 index 0000000..b544795 --- /dev/null +++ b/scheduler/requirements.txt @@ -0,0 +1,5 @@ +atproto +apscheduler +peewee +psycopg2-binary +python-dotenv \ No newline at end of file diff --git a/scheduler/utils/__init__.py b/scheduler/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scheduler/utils/config.py b/scheduler/utils/config.py new file mode 100644 index 0000000..d6f5d15 --- /dev/null +++ b/scheduler/utils/config.py @@ -0,0 +1,38 @@ +import os + +SERVICE_DID = os.environ.get('SERVICE_DID', None) +HOSTNAME = os.environ.get('HOSTNAME', None) +HANDLE = os.environ.get('HANDLE', None) +PASSWORD = os.environ.get('PASSWORD', None) +POSTGRES_USER = os.environ.get('POSTGRES_USER', None) +POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD', None) +POSTGRES_DB = os.environ.get('POSTGRES_DB', None) + +if POSTGRES_USER is None: + raise RuntimeError('You should set "POSTGRES_USER" environment variable first.') + +if POSTGRES_PASSWORD is None: + raise RuntimeError('You should set "POSTGRES_PASSWORD" environment variable first.') + +if POSTGRES_DB is None: + raise RuntimeError('You should set "POSTGRES_DB" environment variable first.') + +if HOSTNAME is None: + raise RuntimeError('You should set "HOSTNAME" environment variable first.') + +if SERVICE_DID is None: + SERVICE_DID = f'did:web:{HOSTNAME}' + + +CHRONOLOGICAL_TRENDING_URI = os.environ.get('CHRONOLOGICAL_TRENDING_URI') +if CHRONOLOGICAL_TRENDING_URI is None: + raise RuntimeError('Publish your feed first (run publish_feed.py) to obtain Feed URI. ' + 'Set this URI to "CHRONOLOGICAL_TRENDING_URI" environment variable.') + +# logger.info(f'HANDLE: {HANDLE}') +# logger.info(f'PASSWORD: {PASSWORD}') +if HANDLE is None: + raise RuntimeError('You should set "HANDLE" environment variable first.') + +if PASSWORD is None: + raise RuntimeError('You should set "PASSWORD" environment variable first.') diff --git a/scheduler/utils/logger.py b/scheduler/utils/logger.py new file mode 100644 index 0000000..d115111 --- /dev/null +++ b/scheduler/utils/logger.py @@ -0,0 +1,8 @@ +import logging + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] [%(levelname)s] %(message)s', + datefmt='%H:%M:%S' +)