Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add live tracker backend #9

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from horizon.middleware_manager import uya_online_tracker
from horizon.middleware_manager import dl_online_tracker

from horizon.uya_live_tracker import uya_live_tracker


ALLOWED_ORIGINS: list[str] = [
"https://www.rac-horizon.com",
Expand All @@ -39,13 +41,16 @@
# Add background tasks
@app.on_event("startup")
async def start_background_tasks():
await uya_live_tracker.start(asyncio.get_event_loop())

asyncio.create_task(uya_online_tracker.refresh_token())
asyncio.create_task(uya_online_tracker.update_recent_stat_changes())
asyncio.create_task(uya_online_tracker.poll_active_online())
asyncio.create_task(dl_online_tracker.refresh_token())
#asyncio.create_task(dl_online_tracker.update_recent_stat_changes()) # Will work once DL middleware is updated
asyncio.create_task(dl_online_tracker.poll_active_online())


# Add sub-APIs.
app.include_router(deadlocked_stats_router)
app.include_router(uya_stats_router)
Expand Down
5 changes: 4 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ services:
container_name: fastapi_app
ports:
- "8000:8000"
- "8888:8888"
environment:
SOCKET_IP: ${SOCKET_IP}
depends_on:
db:
condition: service_healthy # FastAPI waits until PostgreSQL is healthy
Expand All @@ -28,4 +31,4 @@ services:
test: ["CMD-SHELL", "pg_isready -U ${TRACKER_POSTGRES_USER}"]
interval: 10s
timeout: 5s
retries: 5
retries: 5
87 changes: 87 additions & 0 deletions horizon/uya_live_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import logging
import asyncio
from time import sleep
import json
from logging import handlers
from datetime import datetime
from livetrackerbackend import LiveTrackerBackend

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

import websockets


class UyaLiveTracker():
def __init__(self, port:int=8888, read_tick_rate:int=60, write_tick_rate:int=15):

self._backend = LiveTrackerBackend(server_ip=os.getenv('SOCKET_IP'), log_level='INFO')
self._ip = '0.0.0.0'
self._port = port

self._connected = set()

# Ticks per second
self._read_tick_rate = read_tick_rate / 60
self._write_tick_rate = write_tick_rate / 60

self._world_state = []


async def start(self, loop):
self._backend.start(loop)
await self.start_websocket()

loop.create_task(self.read_prod_socket())
loop.create_task(self.flush())

async def start_websocket(self):
await websockets.serve(self.on_websocket_connection, '0.0.0.0', self._port)
logger.info(f"Websocket serving on ('0.0.0.0', {self._port}) ...")

async def read_prod_socket(self):
while True:
try:
self._world_state = self._backend.get_world_states()
except Exception as e:
logger.error("read_prod_socket failed to update!", exc_info=True)

await asyncio.sleep(self._read_tick_rate)

async def flush(self):
while True:
try:
data = json.dumps(self._world_state)

if len(self._connected) > 0:
for connection in self._connected:
try:
await connection.send(data)
except Exception:
connection.connected = False
except Exception as e:
logger.error("flush failed to update!", exc_info=True)

await asyncio.sleep(self._write_tick_rate)

async def on_websocket_connection(self, websocket, path):
logger.info(f"Websocket client connected: {websocket.remote_address}")
# Register.
self._connected.add(websocket)
websocket.connected = True
try:
while websocket.connected:
await asyncio.sleep(.001)
finally:
logger.info("Websocket disconnected!")
# Unregister.
self._connected.remove(websocket)


uya_live_tracker = UyaLiveTracker()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ psycopg2
requests
urllib3
aiohttp
asyncpg
asyncpg
git+https://github.com/Horizon-Private-Server/horizon-uya-bot.git
12 changes: 12 additions & 0 deletions scripts/test_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import asyncio
import websockets

async def read_websocket():
uri = f"ws://localhost:8888"
async with websockets.connect(uri,ping_interval=None) as websocket:
while True:
data = await websocket.recv()
print(data)


asyncio.new_event_loop().run_until_complete(read_websocket())