Skip to content

Commit

Permalink
Add seperate docker container for firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
richardr1126 committed Dec 13, 2024
1 parent 6d984be commit 396f8c0
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ COPY . .
EXPOSE 8000

# Runs when the container is started
CMD honcho start
CMD gunicorn web.app:app --bind 0.0.0.0:8000 --workers 4
17 changes: 15 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
services:
app:
image: richardr1126/cosmere-feed-bsky
firehose:
container_name: firehose
image: richardr1126/firehose
build:
context: ./firehose
env_file:
- .env
volumes:
- feeddata:/var/data
stdin_open: true
tty: true

web:
container_name: web
image: richardr1126/cosmere-feed
build:
context: .
env_file:
Expand Down
16 changes: 16 additions & 0 deletions firehose/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.12.8-slim-bookworm

# create a volume for the sqlite database, so that it persists between container restarts
# need persistent storage attached to server
VOLUME /var/data/
WORKDIR /usr/src/app/

# Copy package files and install dependencies
COPY ../requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application code
COPY . .

# Runs when the container is started
CMD python start_stream.py
2 changes: 1 addition & 1 deletion firehose/data_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import defaultdict
from atproto import models, Client, IdResolver
from utils.logger import logger
from firehose.database import db, Post, init_client
from database import db, Post, init_client

PHRASES = [
'17th shard',
Expand Down
12 changes: 6 additions & 6 deletions firehose/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from atproto.exceptions import FirehoseError

from firehose.database import SubscriptionState, db
from database import SubscriptionState, db
from utils.logger import logger

# Define the types of records we're interested in and their corresponding namespace IDs
Expand Down Expand Up @@ -123,7 +123,7 @@ def _run(name, operations_callback, stream_stop_event=None):
SubscriptionState.create(service=name, cursor=0)

# Initialize the firehose client w/o a cursor for now
client = FirehoseSubscribeReposClient(params)
client = FirehoseSubscribeReposClient()

def on_message_handler(message: firehose_models.MessageFrame) -> None:
"""
Expand Down Expand Up @@ -156,10 +156,10 @@ def on_message_handler(message: firehose_models.MessageFrame) -> None:
# Update the client's parameters with the new cursor
client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq))
# Persist the new cursor in the database
try:
SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()
except Exception as e:
logger.error(f"Failed to update cursor in database: {e}")
# try:
# SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()
# except Exception as e:
# logger.error(f"Failed to update cursor in database: {e}")

if not commit.blocks:
# Skip if there are no blocks to process
Expand Down
4 changes: 4 additions & 0 deletions firehose/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
atproto
peewee
python-dotenv
apscheduler
4 changes: 2 additions & 2 deletions start_stream.py → firehose/start_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from utils import config
from utils.logger import logger
import firehose.data_stream as data_stream
from firehose.data_filter import operations_callback
import data_stream as data_stream
from data_filter import operations_callback

def main():
stream_stop_event = threading.Event()
Expand Down
27 changes: 27 additions & 0 deletions firehose/utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
from utils.logger import logger

SERVICE_DID = os.environ.get('SERVICE_DID', None)
HOSTNAME = os.environ.get('HOSTNAME', None)
HANDLE = os.environ.get('HANDLE', None)
PASSWORD = os.environ.get('PASSWORD', None)

if HOSTNAME is None:
raise RuntimeError('You should set "HOSTNAME" environment variable first.')

if SERVICE_DID is None:
SERVICE_DID = f'did:web:{HOSTNAME}'


CHRONOLOGICAL_TRENDING_URI = os.environ.get('CHRONOLOGICAL_TRENDING_URI')
if CHRONOLOGICAL_TRENDING_URI is None:
raise RuntimeError('Publish your feed first (run publish_feed.py) to obtain Feed URI. '
'Set this URI to "CHRONOLOGICAL_TRENDING_URI" environment variable.')

# logger.info(f'HANDLE: {HANDLE}')
# logger.info(f'PASSWORD: {PASSWORD}')
if HANDLE is None:
raise RuntimeError('You should set "HANDLE" environment variable first.')

if PASSWORD is None:
raise RuntimeError('You should set "PASSWORD" environment variable first.')
4 changes: 4 additions & 0 deletions firehose/utils/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

0 comments on commit 396f8c0

Please sign in to comment.