Skip to content

Commit

Permalink
Test separate scheduler container
Browse files Browse the repository at this point in the history
  • Loading branch information
richardr1126 committed Jan 15, 2025
1 parent 10b15da commit cec3f5f
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 13 deletions.
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ services:
tty: true
depends_on:
- postgres

# Standalone scheduler service: runs the image built from the ./scheduler
# directory as its own container instead of inside the firehose process.
scheduler:
container_name: scheduler
image: richardr1126/cosmere-db-scheduler
build:
context: ./scheduler
# Configuration is read from the .env file listed here.
env_file:
- .env
# Started after the postgres and firehose services.
# NOTE(review): the short-form depends_on only orders start-up; it does not
# wait for the database to accept connections — confirm the scheduler copes.
depends_on:
- postgres
- firehose
# Restart automatically unless the container was stopped explicitly.
restart: unless-stopped
# Keep STDIN open and allocate a pseudo-TTY for the container.
stdin_open: true
tty: true

web:
container_name: web
Expand Down
1 change: 0 additions & 1 deletion firehose/data_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from atproto import models, Client, IdResolver
from utils.logger import logger
from database import db, Post
from db_scheduler import init_client
import json
from pathlib import Path

Expand Down
6 changes: 1 addition & 5 deletions firehose/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
)
from atproto.exceptions import FirehoseError

from database import SubscriptionState, db
from utils.logger import logger
from database import db, Post, SubscriptionState, SessionState, Requests
import db_scheduler as db_scheduler
from utils.logger import logger

# Define the types of records we're interested in and their corresponding namespace IDs
_INTERESTED_RECORDS = {
Expand Down Expand Up @@ -97,8 +95,6 @@ def run(name, operations_callback, stream_stop_event=None):
db.create_tables([Post, SubscriptionState, SessionState, Requests])
logger.info("Database connected and tables created.")

db_scheduler.start()

while stream_stop_event is None or not stream_stop_event.is_set():
try:
# Start the main run loop
Expand Down
4 changes: 2 additions & 2 deletions firehose/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SessionState(BaseModel):
service = peewee.CharField(unique=True)
session_string = peewee.TextField(null=True)

# table for storing dids
class Requests(BaseModel):
indexed_at = peewee.DateTimeField(default=datetime.now(timezone.utc), index=True)
did = peewee.CharField(null=True, default=None, index=True)
did = peewee.CharField(null=True, default=None, index=True)

1 change: 0 additions & 1 deletion firehose/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ psycopg2-binary
atproto
peewee
python-dotenv
apscheduler
18 changes: 18 additions & 0 deletions scheduler/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Image for the standalone scheduler container: a slim Python 3.11 base that
# installs the requirements and runs db_scheduler.py as its main process.
FROM python:3.11-slim

# All following paths (COPY targets, CMD) are relative to /app.
WORKDIR /app

# Install system dependencies: a C compiler and the PostgreSQL client headers,
# then delete the apt package lists in the same layer to keep the image small.
# NOTE(review): requirements.txt lists psycopg2-binary; confirm whether gcc and
# libpq-dev are still required.
RUN apt-get update && apt-get install -y \
gcc \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*

# Copy requirements first to leverage Docker cache: the pip layer is rebuilt
# only when requirements.txt changes, not on every source edit.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code (the rest of the build context) into /app.
COPY . .

# Exec-form CMD: start the scheduler script directly with the Python interpreter.
CMD ["python", "db_scheduler.py"]
33 changes: 33 additions & 0 deletions scheduler/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime, timedelta, timezone
from utils.config import POSTGRES_DB, POSTGRES_PASSWORD, POSTGRES_USER
import peewee

# Shared Postgres connection used by every model in this module. The host
# name 'postgres' and port 5432 are fixed here; the database name and the
# credentials come from utils.config.
db = peewee.PostgresqlDatabase(
    POSTGRES_DB,
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
    host='postgres',
    port=5432,
)


class BaseModel(peewee.Model):
    """Common base class that binds every derived model to ``db``."""

    class Meta:
        database = db

class Post(BaseModel):
    """A stored post row.

    Fields:
        uri: indexed string identifying the post.
        cid: string content identifier stored alongside the uri.
        reply_parent / reply_root: optional strings, NULL when not set.
        indexed_at: timestamp taken (in UTC) when the row object is created,
            unless the caller supplies one; indexed.
        author: optional indexed string.
        interactions: indexed big-integer counter that starts at 0.
    """

    uri = peewee.CharField(index=True)
    cid = peewee.CharField()
    reply_parent = peewee.CharField(null=True, default=None)
    reply_root = peewee.CharField(null=True, default=None)
    # The default must be a callable: a bare ``datetime.now(timezone.utc)``
    # would be evaluated once when the class is defined, so every row created
    # by a long-running process would carry the process start time.
    indexed_at = peewee.DateTimeField(
        default=lambda: datetime.now(timezone.utc),
        index=True,
    )
    author = peewee.CharField(null=True, default=None, index=True)
    interactions = peewee.BigIntegerField(default=0, index=True)

# At most one row per service (``service`` is unique), each with a
# big-integer ``cursor`` value.
# NOTE(review): presumably the position to resume the firehose subscription
# from — confirm against the code that reads and writes this table.
class SubscriptionState(BaseModel):
service = peewee.CharField(unique=True)
cursor = peewee.BigIntegerField()

# At most one row per service (``service`` is unique), each holding an
# optional text ``session_string`` (NULL allowed).
# NOTE(review): presumably a serialized client session reused across
# restarts — confirm against the code that reads and writes this table.
class SessionState(BaseModel):
service = peewee.CharField(unique=True)
session_string = peewee.TextField(null=True)

# table for storing dids
class Requests(BaseModel):
    """One row per recorded request: an optional DID plus a creation timestamp.

    Fields:
        indexed_at: timestamp taken (in UTC) when the row object is created,
            unless the caller supplies one; indexed.
        did: optional indexed string, NULL when not set.
    """

    # The default must be a callable: a bare ``datetime.now(timezone.utc)``
    # would be evaluated once when the class is defined, so every row created
    # by a long-running process would carry the process start time.
    indexed_at = peewee.DateTimeField(
        default=lambda: datetime.now(timezone.utc),
        index=True,
    )
    did = peewee.CharField(null=True, default=None, index=True)
26 changes: 22 additions & 4 deletions firehose/db_scheduler.py → scheduler/db_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,36 @@
from typing import Optional
import peewee
import time
import signal
import sys

from utils.logger import logger
from utils.config import HANDLE, PASSWORD
from database import db, Post, SessionState

# Main Function
def start():
def main():
# Initialize Client
client = init_client()

# Start Scheduler
scheduler = start_scheduler(client, schedule_hydration=True)

# for job in scheduler.get_jobs():
# job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately
for job in scheduler.get_jobs():
job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately

# Handle graceful shutdown
def signal_handler(sig, frame):
logger.info("Shutting down scheduler...")
shutdown_scheduler(scheduler)
sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Keep the main thread alive
while True:
signal.pause() # Wait for signals

# Postgres database management functions
def clear_old_posts(clear_days: int):
Expand Down Expand Up @@ -323,4 +338,7 @@ def shutdown_scheduler(scheduler: BackgroundScheduler):
db.close()
logger.info("Database connection closed by force")
except peewee.PeeweeException as e:
logger.error(f"Error closing database: {e}")
logger.error(f"Error closing database: {e}")

if __name__ == '__main__':
main()
5 changes: 5 additions & 0 deletions scheduler/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
atproto
apscheduler
peewee
psycopg2-binary
python-dotenv
Empty file added scheduler/utils/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions scheduler/utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os

def _require_env(name, message=None):
    """Return environment variable *name*, raising RuntimeError when it is unset.

    *message* overrides the standard "You should set ..." error text.
    """
    value = os.environ.get(name)
    if value is None:
        if message is None:
            message = f'You should set "{name}" environment variable first.'
        raise RuntimeError(message)
    return value


# Settings are validated in a fixed order, so the first missing variable is
# the one reported when the module is imported.
POSTGRES_USER = _require_env('POSTGRES_USER')
POSTGRES_PASSWORD = _require_env('POSTGRES_PASSWORD')
POSTGRES_DB = _require_env('POSTGRES_DB')
HOSTNAME = _require_env('HOSTNAME')

# SERVICE_DID is optional: when it is not set, derive a did:web from HOSTNAME.
SERVICE_DID = os.environ.get('SERVICE_DID', f'did:web:{HOSTNAME}')

CHRONOLOGICAL_TRENDING_URI = _require_env(
    'CHRONOLOGICAL_TRENDING_URI',
    'Publish your feed first (run publish_feed.py) to obtain Feed URI. '
    'Set this URI to "CHRONOLOGICAL_TRENDING_URI" environment variable.',
)

# Account credentials; both are mandatory.
HANDLE = _require_env('HANDLE')
PASSWORD = _require_env('PASSWORD')
8 changes: 8 additions & 0 deletions scheduler/utils/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import logging

# Log line layout: "[HH:MM:SS] [LEVEL] message".
_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'
_LOG_DATE_FORMAT = '%H:%M:%S'

# Configure the root logger when this module is first imported; INFO and
# above are emitted.
logging.basicConfig(
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
    level=logging.INFO,
)

# Shared logger instance imported by the rest of the application.
logger = logging.getLogger(__name__)

0 comments on commit cec3f5f

Please sign in to comment.