Skip to content

Commit

Permalink
Test new atproto with firehose restarting
Browse files Browse the repository at this point in the history
  • Loading branch information
richardr1126 committed Feb 20, 2025
1 parent 03f820d commit 84e8d96
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 9 deletions.
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ services:
restart: unless-stopped
healthcheck:
test: ["CMD", "python", "health_check.py"] # Command to check the container's health
interval: 60s # Time between each health check
timeout: 10s # Time allowed for the health check to complete
interval: 15m # Time between each health check
timeout: 30s # Time allowed for the health check to complete
retries: 2 # Number of failed health checks before marking as unhealthy
start_period: 60s # Grace period before starting health checks after container starts
start_period: 60s # Time to wait before starting the health checks
depends_on:
- postgres
labels:
Expand Down
4 changes: 2 additions & 2 deletions firehose/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _run(name, operations_callback, stream_stop_event=None):
# If no state exists, create an initial subscription state with cursor 0
SubscriptionState.create(service=name, cursor=0)

# Initialize the firehose client w/o a cursor for now
# Initialize the firehose client
client = FirehoseSubscribeReposClient(params)

def on_message_handler(message: firehose_models.MessageFrame) -> None:
Expand Down Expand Up @@ -163,7 +163,7 @@ def on_message_handler(message: firehose_models.MessageFrame) -> None:
logger.info(f'Cursor -> {commit.seq}')
# Update the client's parameters with the new cursor
client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq))
# Persist the new cursor in the database
# Persist the new cursor in the database with the last indexed timestamp
with db.atomic():
SubscriptionState.update(
cursor=commit.seq,
Expand Down
5 changes: 4 additions & 1 deletion firehose/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ def is_healthy():

if not state or not state.last_indexed_at:
return False
return (datetime.now(timezone.utc) - state.last_indexed_at) < timedelta(minutes=5)

# Check if the firehose last log is within the last 15 minutes
# If it is, the health check is considered healthy
return (datetime.now(timezone.utc) - state.last_indexed_at) < timedelta(minutes=15)

if __name__ == "__main__":
if is_healthy():
Expand Down
2 changes: 1 addition & 1 deletion firehose/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
psycopg2-binary
atproto==0.0.56
atproto
peewee
python-dotenv
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
atproto==0.0.56
atproto
peewee
Flask
python-dotenv
Expand Down
2 changes: 1 addition & 1 deletion scheduler/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
atproto==0.0.56
atproto
apscheduler
peewee
psycopg2-binary
Expand Down

0 comments on commit 84e8d96

Please sign in to comment.