|
1 |
| -import json |
| 1 | +import asyncio |
2 | 2 | import logging
|
3 | 3 | import threading
|
4 | 4 | import time
|
5 |
| -import requests |
6 |
| -from typing import List |
7 | 5 | from unittest import TestCase
|
| 6 | +from urllib.error import URLError |
| 7 | +from urllib.request import urlopen |
| 8 | + |
| 9 | +from aiohttp import WSMsgType, web |
8 | 10 |
|
9 | 11 | socket_mode_envelopes = [
|
10 | 12 | """{"envelope_id":"57d6a792-4d35-4d0b-b6aa-3361493e1caf","payload":{"type":"shortcut","token":"xxx","action_ts":"1610198080.300836","team":{"id":"T111","domain":"seratch"},"user":{"id":"U111","username":"seratch","team_id":"T111"},"is_enterprise_install":false,"enterprise":null,"callback_id":"do-something","trigger_id":"111.222.xxx"},"type":"interactive","accepts_response_payload":false}""",
|
11 | 13 | """{"envelope_id":"1d3c79ab-0ffb-41f3-a080-d19e85f53649","payload":{"token":"xxx","team_id":"T111","team_domain":"xxx","channel_id":"C111","channel_name":"random","user_id":"U111","user_name":"seratch","command":"/hello-socket-mode","text":"","api_app_id":"A111","response_url":"https://hooks.slack.com/commands/T111/111/xxx","trigger_id":"111.222.xxx"},"type":"slash_commands","accepts_response_payload":true}""",
|
12 | 14 | """{"envelope_id":"08cfc559-d933-402e-a5c1-79e135afaae4","payload":{"token":"xxx","team_id":"T111","api_app_id":"A111","event":{"client_msg_id":"c9b466b5-845c-49c6-a371-57ae44359bf1","type":"message","text":"<@W111>","user":"U111","ts":"1610197986.000300","team":"T111","blocks":[{"type":"rich_text","block_id":"1HBPc","elements":[{"type":"rich_text_section","elements":[{"type":"user","user_id":"U111"}]}]}],"channel":"C111","event_ts":"1610197986.000300","channel_type":"channel"},"type":"event_callback","event_id":"Ev111","event_time":1610197986,"authorizations":[{"enterprise_id":null,"team_id":"T111","user_id":"U111","is_bot":true,"is_enterprise_install":false}],"is_ext_shared_channel":false,"event_context":"1-message-T111-C111"},"type":"events_api","accepts_response_payload":false,"retry_attempt":1,"retry_reason":"timeout"}""",
|
13 | 15 | ]
|
14 | 16 |
|
15 |
| -from flask import Flask |
16 |
| -from flask_sockets import Sockets |
17 |
| - |
18 | 17 |
|
19 | 18 | def start_thread_socket_mode_server(test: TestCase, port: int):
|
20 |
| - def _start_thread_socket_mode_server(): |
21 |
| - logger = logging.getLogger(__name__) |
22 |
| - app: Flask = Flask(__name__) |
| 19 | + logger = logging.getLogger(__name__) |
| 20 | + state = {} |
| 21 | + |
| 22 | + def reset_server_state(): |
| 23 | + state.update( |
| 24 | + envelopes_to_consume=list(socket_mode_envelopes), |
| 25 | + ) |
| 26 | + |
| 27 | + test.reset_server_state = reset_server_state |
| 28 | + |
| 29 | + async def health(request: web.Request): |
| 30 | + wr = web.Response() |
| 31 | + await wr.prepare(request) |
| 32 | + wr.set_status(200) |
| 33 | + return wr |
| 34 | + |
| 35 | + async def link(request: web.Request): |
| 36 | + ws = web.WebSocketResponse() |
| 37 | + await ws.prepare(request) |
| 38 | + |
| 39 | + async for msg in ws: |
| 40 | + if msg.type != WSMsgType.TEXT: |
| 41 | + continue |
23 | 42 |
|
24 |
| - @app.route("/state") |
25 |
| - def state(): |
26 |
| - return json.dumps({"success": True}), 200, {"ContentType": "application/json"} |
| 43 | + if state["envelopes_to_consume"]: |
| 44 | + e = state["envelopes_to_consume"].pop(0) |
| 45 | + logger.debug(f"Send an envelope: {e}") |
| 46 | + await ws.send_str(e) |
27 | 47 |
|
28 |
| - sockets: Sockets = Sockets(app) |
| 48 | + message = msg.data |
| 49 | + logger.debug(f"Server received a message: {message}") |
29 | 50 |
|
30 |
| - envelopes_to_consume: List[str] = list(socket_mode_envelopes) |
| 51 | + await ws.send_str(message) |
31 | 52 |
|
32 |
| - @sockets.route("/link") |
33 |
| - def link(ws): |
34 |
| - while not ws.closed: |
35 |
| - message = ws.read_message() |
36 |
| - if message is not None: |
37 |
| - if len(envelopes_to_consume) > 0: |
38 |
| - e = envelopes_to_consume.pop(0) |
39 |
| - logger.debug(f"Send an envelope: {e}") |
40 |
| - ws.send(e) |
| 53 | + return ws |
41 | 54 |
|
42 |
| - logger.debug(f"Server received a message: {message}") |
43 |
| - ws.send(message) |
| 55 | + app = web.Application() |
| 56 | + app.add_routes( |
| 57 | + [ |
| 58 | + web.get("/link", link), |
| 59 | + web.get("/health", health), |
| 60 | + ] |
| 61 | + ) |
| 62 | + runner = web.AppRunner(app) |
44 | 63 |
|
45 |
| - from gevent import pywsgi |
46 |
| - from geventwebsocket.handler import WebSocketHandler |
| 64 | + def run_server(): |
| 65 | + reset_server_state() |
47 | 66 |
|
48 |
| - server = pywsgi.WSGIServer(("", port), app, handler_class=WebSocketHandler) |
49 |
| - test.server = server |
50 |
| - server.serve_forever(stop_timeout=1) |
| 67 | + test.loop = asyncio.new_event_loop() |
| 68 | + asyncio.set_event_loop(test.loop) |
| 69 | + test.loop.run_until_complete(runner.setup()) |
| 70 | + site = web.TCPSite(runner, "127.0.0.1", port, reuse_port=True) |
| 71 | + test.loop.run_until_complete(site.start()) |
51 | 72 |
|
52 |
| - return _start_thread_socket_mode_server |
| 73 | + # run until it's stopped from the main thread |
| 74 | + test.loop.run_forever() |
| 75 | + |
| 76 | + test.loop.run_until_complete(runner.cleanup()) |
| 77 | + test.loop.close() |
| 78 | + |
| 79 | + return run_server |
53 | 80 |
|
54 | 81 |
|
55 | 82 | def start_socket_mode_server(test, port: int):
|
56 | 83 | test.sm_thread = threading.Thread(target=start_thread_socket_mode_server(test, port))
|
57 | 84 | test.sm_thread.daemon = True
|
58 | 85 | test.sm_thread.start()
|
59 |
| - wait_for_socket_mode_server(port, 4) # wait for the server |
| 86 | + wait_for_socket_mode_server(port, 4) |
60 | 87 |
|
61 | 88 |
|
62 |
| -def wait_for_socket_mode_server(port: int, secs: int): |
| 89 | +def wait_for_socket_mode_server(port: int, timeout: int): |
63 | 90 | start_time = time.time()
|
64 |
| - while (time.time() - start_time) < secs: |
65 |
| - response = requests.get(url=f"http://localhost:{port}/state") |
66 |
| - if response.ok: |
67 |
| - break |
68 |
| - time.sleep(0.01) |
69 |
| - |
70 |
| - |
71 |
| -def stop_socket_mode_server(test): |
72 |
| - test.server.stop() |
73 |
| - test.server.close() |
74 |
| - |
75 |
| - |
76 |
| -async def stop_socket_mode_server_async(test: TestCase): |
77 |
| - test.server.stop() |
78 |
| - test.server.close() |
| 91 | + while (time.time() - start_time) < timeout: |
| 92 | + try: |
| 93 | + urlopen(f"http://127.0.0.1:{port}/health") |
| 94 | + return |
| 95 | + except URLError: |
| 96 | + time.sleep(0.01) |
| 97 | + |
| 98 | + |
| 99 | +def stop_socket_mode_server(test: TestCase): |
| 100 | + # An event loop runs in a thread and executes all callbacks and Tasks in |
| 101 | + # its thread. While a Task is running in the event loop, no other Tasks |
| 102 | + # can run in the same thread. When a Task executes an await expression, the |
| 103 | + # running Task gets suspended, and the event loop executes the next Task. |
| 104 | + # To schedule a callback from another OS thread, the loop.call_soon_threadsafe() method should be used. |
| 105 | + # https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading |
| 106 | + test.loop.call_soon_threadsafe(test.loop.stop) |
| 107 | + test.sm_thread.join(timeout=5) |
0 commit comments