-
Notifications
You must be signed in to change notification settings - Fork 10
/
postgresql2websocket.py
executable file
·65 lines (58 loc) · 1.95 KB
/
postgresql2websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
#
# Python >= 3.5
import asyncio
import configparser
import asyncpg
from aiohttp import web, WSCloseCode
def callback_websocket(ws):
def callback(connection, pid, channel, payload):
asyncio.ensure_future(ws.send_str(payload))
return callback
async def websocket_handler(request):
channel = request.match_info.get('channel', 'postgresql2websocket')
ws = web.WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].append(ws)
pool = request.app['pool']
async with pool.acquire() as connection:
await connection.add_listener(channel, callback_websocket(ws))
try:
async for msg in ws:
async with connection.transaction():
async for record in connection.cursor(msg.data):
asyncio.ensure_future(ws.send_json(dict(record)))
finally:
request.app['websockets'].remove(ws)
return ws
async def init_app(config):
app = web.Application()
app['pool'] = await asyncpg.create_pool(**config['postgresql'])
app.router.add_route('GET', '/{channel}', websocket_handler)
return app
async def on_shutdown(app):
for ws in app['websockets']:
await ws.close(code=WSCloseCode.GOING_AWAY,
message='Server shutdown')
def main(filename = 'postgresql2websocket.conf'):
config = configparser.ConfigParser()
if not config.read(filename):
print("Unable to read %s" % filename)
exit(1)
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init_app(config))
app['websockets'] = []
app.on_shutdown.append(on_shutdown)
try:
web.run_app(app,
host = config.get('web', 'host'),
port = config.getint('web', 'port'),
)
except KeyboardInterrupt:
pass
finally:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.close()
if __name__ == '__main__':
main()