-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfastapi_app.py
136 lines (117 loc) · 4.65 KB
/
fastapi_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
import logging
import os
import json
# Initialize FastAPI app
# Module-level ASGI application object. The CORS middleware, the /static mount
# and the route decorators later in this file all attach to this instance.
app = FastAPI()
# Setup logging
# basicConfig runs at import time and configures the *root* logger at INFO.
# It is a no-op when the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger shared by ConnectionManager and the route handlers below.
logger = logging.getLogger(__name__)
# Add ConnectionManager class
class ConnectionManager:
    """Registry of open WebSocket clients plus helpers to fan messages out.

    All broadcast helpers serialize a small JSON envelope with a ``type`` key
    (``market_data``, ``maker_output`` or ``order_update``) and send it as a
    text frame to every registered client.
    """

    # Substrings whose presence marks a message as market data.
    _MARKET_DATA_INDICATORS = (
        "MarketDataSnapshotFullRefresh",
        "MDEntryType",
        "MDReqID",
        "NoMDEntries",
    )

    def __init__(self):
        # Every accepted client socket that has not been removed yet.
        self.active_connections: set[WebSocket] = set()
        # Raw maker-output messages replayed to each new client by connect().
        # Nothing in this class appends to it; presumably it is filled by
        # code elsewhere -- TODO confirm.
        self.market_maker_output = []

    def format_fix_message(self, message: str) -> str:
        """Render a SOH-delimited FIX message as ``tag=value | tag=value``.

        Fields are split on ``chr(1)``; whitespace around each tag and value
        is stripped, and fields without an ``=`` (including empty ones) are
        dropped.

        Args:
            message: Raw FIX message text.

        Returns:
            The formatted string, or ``message`` unchanged if formatting
            raised (for example because a non-string was passed in).
        """
        try:
            pairs = []
            for field in message.split(chr(1)):
                # A field containing "=" is necessarily non-blank, so this
                # single check also filters out empty fragments.
                if "=" not in field:
                    continue
                tag, value = field.split("=", 1)
                pairs.append(f"{tag.strip()}={value.strip()}")
            return " | ".join(pairs)
        except Exception as e:
            # Best-effort formatter: log and fall back to the raw input.
            logger.error(f"Error formatting FIX message: {e}")
            return message

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket``, register it, and replay maker-output history.

        Each entry of ``market_maker_output`` is sent as a ``maker_output``
        JSON message after passing through ``format_fix_message``.
        """
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(f"New WebSocket connection. Total connections: {len(self.active_connections)}")
        for message in self.market_maker_output:
            await websocket.send_json({
                "type": "maker_output",
                "message": self.format_fix_message(message),
            })

    def disconnect(self, websocket: WebSocket):
        """Unregister ``websocket``; safe to call for an unknown socket.

        ``broadcast`` may already have evicted the socket after a failed
        send, so ``discard`` is used instead of ``remove`` to avoid raising
        ``KeyError`` on the second removal.
        """
        self.active_connections.discard(websocket)
        logger.info(f"WebSocket disconnected. Remaining connections: {len(self.active_connections)}")

    async def broadcast_market_data(self, data: str):
        """Broadcast ``data`` wrapped as ``{"type": "market_data", "data": ...}``."""
        message = {
            "type": "market_data",
            "data": data,
        }
        await self.broadcast(json.dumps(message))

    async def broadcast_maker_output(self, message: str):
        """Broadcast ``message`` as ``maker_output`` unless it is market data.

        The message is sent as-is; unlike the history replay in ``connect``
        it is not passed through ``format_fix_message``.
        """
        if self._is_market_data_message(message):
            return
        ws_message = {
            "type": "maker_output",
            "message": message,
        }
        await self.broadcast(json.dumps(ws_message))

    async def broadcast_order_update(self, message: str):
        """Broadcast ``message`` as ``order_update`` unless it is market data."""
        if self._is_market_data_message(message):
            return
        ws_message = {
            "type": "order_update",
            "order": message,
        }
        await self.broadcast(json.dumps(ws_message))

    def _is_market_data_message(self, message: str) -> bool:
        """Return True if ``message`` contains any market-data indicator substring."""
        return any(
            indicator in message for indicator in self._MARKET_DATA_INDICATORS
        )

    async def broadcast(self, message: str):
        """Send ``message`` as a text frame to every registered client.

        Clients whose send raises are logged and dropped from the registry;
        the error is not propagated to the caller.
        """
        stale = set()
        # Iterate over a snapshot: every ``await`` yields to the event loop,
        # where connect()/disconnect() may resize the live set. Iterating the
        # set directly would then raise "Set changed size during iteration".
        for connection in tuple(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Error broadcasting message: {e}")
                stale.add(connection)
        # difference_update tolerates sockets that were already removed while
        # the sends above were in flight.
        self.active_connections.difference_update(stale)
# Create manager instance
# Single process-wide registry of WebSocket clients used by the /ws endpoint.
manager = ConnectionManager()
# Add CORS middleware
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The CORS spec does not allow "Access-Control-Allow-Origin: *" on credentialed
# requests, and FastAPI's CORS docs say origins must be listed explicitly when
# credentials are enabled. Confirm the intended origin list and tighten this.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files
# Serves the "static" directory at /static. The directory path is relative,
# so it is resolved against the process working directory.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def get():
    """Serve the landing page by returning the contents of ``static/index.html``.

    The file is re-read on every request, and the relative path is resolved
    against the process working directory.

    Returns:
        An ``HTMLResponse`` whose body is the file's text.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
    """
    # Decode explicitly as UTF-8 instead of the platform-dependent default
    # (e.g. cp1252 on Windows), which would garble non-ASCII characters.
    # Assumes index.html is saved as UTF-8 -- TODO confirm.
    with open('static/index.html', 'r', encoding='utf-8') as f:
        return HTMLResponse(content=f.read())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a client with ``manager`` and log every text frame it sends.

    Incoming messages are only logged; nothing is sent back from this loop.
    The client is unregistered when the receive loop ends for any reason.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            logger.info(f"Received message from WebSocket: {data}")
    except WebSocketDisconnect:
        # Normal client close; unregistering happens in ``finally``.
        pass
    finally:
        # Unregister on every exit path, not just WebSocketDisconnect, so an
        # unexpected error in the receive loop cannot leave a dead socket in
        # the manager's registry.
        manager.disconnect(websocket)