Skip to content

Commit

Permalink
fix message forwarding during pairing
Browse files Browse the repository at this point in the history
  • Loading branch information
tibroc committed Aug 8, 2024
1 parent d01b0f0 commit c4bc682
Showing 1 changed file with 51 additions and 22 deletions.
73 changes: 51 additions & 22 deletions pairing-server/bbb_pairing_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def generate_pin_task(connection_id):
print(f"pin task: cancelled for channel {connection_id}")
return
except Exception as e:
print("exception 2")
print("pin task: exception 2")
traceback.print_exception(e)
return

Expand All @@ -53,21 +53,24 @@ async def handle_room() -> None:
forward_task = None
connection_id = None
last_pin = None
await websocket.send(json.dumps({'hello': 'starting ;)'}))
try:
data = await websocket.receive()
data = json.loads(data)
print(f"@ws_room: Received data: {json.dumps(data, indent=2)}")
if 'config' not in data:
return
config = data['config']
connection_id = room_connections.inc()
room_config[connection_id] = config
print(f'room config: {connection_id} is {config}')
print(f'@ws_room: New room has ID {connection_id}')
to_room[connection_id] = asyncio.Queue()
pin_task = asyncio.create_task(generate_pin_task(
connection_id), name=f"pin_generate_{connection_id}")
while True:
msg = await to_room[connection_id].get()
print(f"room queue received {msg}")
to_room[connection_id].task_done()
print(f"@ws_room: Room queue received: {json.dumps(msg, indent=2)}")
if 'pin' in msg:
pin = msg['pin']
if last_pin:
Expand All @@ -93,9 +96,10 @@ async def handle_room() -> None:
}
)
)
print(f"@ws_room: tp_plugin-q: {to_plugin}")
forward_task = asyncio.create_task(
forward_from_queue_to_websocket(
to_room[connection_id], websocket)
forward_from_ws_to_q(
to_plugin[msg['connection_id']], websocket)
)
elif 'stop_pin_generation' in msg:
pin_task.cancel()
Expand All @@ -105,7 +109,7 @@ async def handle_room() -> None:
name=f"pin_generate_{connection_id}")

finally:
print("room close")
print("@ws_room: Closing room.")
if pin_task:
pin_task.cancel()
if forward_task:
Expand All @@ -114,6 +118,7 @@ async def handle_room() -> None:
q = to_room[connection_id]
while not q.empty():
await q.get()
q.task_done()
del to_room[connection_id]
del q

Expand All @@ -128,46 +133,49 @@ async def handle_ws() -> None:
forward_task = None
try:
data = await websocket.receive_json()
print(f"plugin received {data}")
print(f"@ws: Plugin received data: {json.dumps(data, indent=2)}")
pin = data['pin']
if pin not in pin_to_room:
await websocket.send(json.dumps({'status': 404, 'msg': 'PIN not found'}))
return
room_connection_id = pin_to_room[pin]
config = room_config.get(room_connection_id)
print(f"plugin found config {config}")
print(f"@ws: Plugin found config: {json.dumps(config, indent=2)}")
if config is None:
print("no config")
print("@ws: No config!")
await websocket.send(json.dumps({'status': 500, 'msg': 'client config not found. This is a bug'}))
return
if not validate_client_config(config):
print("@ws: Invalid config!")
await websocket.send(json.dumps({'status': 500, 'msg': 'invalid client config'}))
return
print("config is valid")
print("sende stop_pin_generation")
print("@ws: Config is valid.")
print("@ws: Will send stop_pin_generation.")
to_room_queue = to_room[room_connection_id]
await to_room_queue.put({'stop_pin_generation': True})
pin_generation_stopped = True
print("gesendet")
print("@ws: stop_pin_generation event send.")
await websocket.send_json({'status': 200, 'msg': 'ok', 'config': config})
# jetzt bekommen wir die URLs
print("Warte auf URLS")
print("@ws: Waiting for join URLs!")
data = await websocket.receive_json()
print(f"empfangen: {data}")
print(f"@ws: received: {json.dumps(data, indent=2)}")
if 'urls' not in data:
await websocket.send(json.dumps({'status': 500, 'msg': 'invalid format. Expecting urls'}))
return
to_plugin[pin] = asyncio.Queue()
await to_room_queue.put({'start': True, 'urls': data['urls'], 'pairing_pin': pairing_pin})
plugin_connection_id = pin
to_plugin[plugin_connection_id] = asyncio.Queue()
print(f"@ws: Generated plugin queue with key: {pin}")
await to_room_queue.put({'start': True, 'urls': data['urls'], 'pairing_pin': pairing_pin, 'connection_id': plugin_connection_id})
await websocket.send(json.dumps({'status': 200, 'msg': 'pairing', 'pairing_pin': pairing_pin}))
forward_task = asyncio.create_task(
forward_from_queue_to_websocket(to_plugin[pin], websocket))
forward_from_queue_to_websocket(to_plugin[plugin_connection_id], websocket))
while True:
data = await websocket.receive()
data = json.loads(data)
print(f"@ws: Data received from room: {data}")
await to_room.put(data)

# wait for session terminate
# wait for session to terminate

except KeyError:
await websocket.send(json.dumps({'status': 500, 'msg': 'invalid format'}))
Expand All @@ -176,27 +184,48 @@ async def handle_ws() -> None:
traceback.print_exception(e)
return
finally:
print("closing plugin")
print("@ws: Closing plugin.")
if pin_generation_stopped:
await to_room_queue.put({'start_pin_generation': True})
if forward_task:
print("@ws Cancelling forward_task!")
forward_task.cancel()
if pin in to_plugin:
while not to_plugin[pin].empty():
await to_plugin[pin].get()
msg = await to_plugin[pin].get()
print(f"@ws: still in queue: {msg}")
to_plugin[pin].task_done()
del to_plugin[pin]
await websocket.close(1007)
print("closed plugin")
print("@ws: Plugin closed.")
return


async def forward_from_queue_to_websocket(q, websocket):
print(f"Created from Q to WS task: {q}")
try:
while True:
print("entered while loop!")
data = await q.get()
data = json.loads(data)
if 'type' in data and data['type'] == 'ping':
continue
print(f"Forwarding data from queue to websocket: {data}")
await websocket.send_json(data)
q.task_done()
except asyncio.CancelledError:
return


async def forward_from_ws_to_q(q, websocket):
try:
while True:
data = await websocket.receive()
data = json.loads(data)
if 'type' in data and data['type'] == 'ping':
continue
print(f"Forwarding data from websocket to queue: {data}")
await q.put(json.dumps(data))
except asyncio.CancelledError:
return

Expand Down

0 comments on commit c4bc682

Please sign in to comment.