diff --git a/tests/slack_sdk/socket_mode/mock_socket_mode_server.py b/tests/slack_sdk/socket_mode/mock_socket_mode_server.py index 02f9b84a..561cef8f 100644 --- a/tests/slack_sdk/socket_mode/mock_socket_mode_server.py +++ b/tests/slack_sdk/socket_mode/mock_socket_mode_server.py @@ -1,7 +1,12 @@ import asyncio import logging import os +import threading import time +from typing import Any, Dict +from urllib.error import URLError +from urllib.request import urlopen +from unittest import TestCase from aiohttp import WSMsgType, web @@ -28,89 +33,40 @@ socket_mode_disconnect_message = """{"type":"disconnect","reason":"too_many_websockets","num_connections":2,"debug_info":{"host":"applink-111-xxx"},"connection_info":{"app_id":"A111"}}""" -def start_socket_mode_server(self, port: int): +def start_thread_socket_mode_server(self, port: int): logger = logging.getLogger(__name__) - state = {} + state: Dict[str, Any] = {} def reset_server_state(): state.update( hello_sent=False, + disconnect=False, envelopes_to_consume=list(socket_mode_envelopes), ) self.reset_server_state = reset_server_state - async def link(request): - ws = web.WebSocketResponse() - await ws.prepare(request) - - async for msg in ws: - if msg.type != WSMsgType.TEXT: - continue - - message = msg.data - logger.debug(f"Server received a message: {message}") - - if not state["hello_sent"]: - state["hello_sent"] = True - await ws.send_str(socket_mode_hello_message) - - if state["envelopes_to_consume"]: - e = state["envelopes_to_consume"].pop(0) - logger.debug(f"Send an envelope: {e}") - await ws.send_str(e) - - await ws.send_str(message) - - return ws - - app = web.Application() - app.add_routes([web.get("/link", link)]) - runner = web.AppRunner(app) - - def run_server(): - reset_server_state() - - self.loop = loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(runner.setup()) - site = web.TCPSite(runner, "127.0.0.1", port, reuse_port=True) - loop.run_until_complete(site.start()) + async def health(request: web.Request): + wr = web.Response() + await wr.prepare(request) + wr.set_status(200) + return wr - # run until it's stopped from the main thread - loop.run_forever() - - loop.run_until_complete(runner.cleanup()) - loop.run_until_complete(asyncio.sleep(1)) - loop.close() - - return run_server - - -def start_socket_mode_server_with_disconnection(self, port: int): - logger = logging.getLogger(__name__) - state = {} - - def reset_server_state(): - state.update( - hello_sent=False, - disconnect_sent=False, - envelopes_to_consume=list(socket_mode_envelopes), - ) - - self.reset_server_state = reset_server_state + async def disconnect(request: web.Request): + state["disconnect"] = True + wr = web.Response() + await wr.prepare(request) + wr.set_status(200) + return wr async def link(request): - disconnected = False + connected = True ws = web.WebSocketResponse() await ws.prepare(request) async for msg in ws: - # To ensure disconnect message is received and handled, - # need to keep this ws alive to bypass client ping-pong check. if msg.type == WSMsgType.PING: - t = time.time() - await ws.pong(f"sdk-ping-pong:{t}") + await ws.pong(f"sdk-ping-pong:{time.time()}") continue if msg.type != WSMsgType.TEXT: continue @@ -122,14 +78,14 @@ async def link(request): state["hello_sent"] = True await ws.send_str(socket_mode_hello_message) - if not state["disconnect_sent"]: + if state["disconnect"]: state["hello_sent"] = False - state["disconnect_sent"] = True - disconnected = True + state["disconnect"] = False + connected = False await ws.send_str(socket_mode_disconnect_message) - logger.debug(f"Disconnect message sent") + logger.debug("Disconnect message sent") - if state["envelopes_to_consume"] and not disconnected: + if state["envelopes_to_consume"] and connected: e = state["envelopes_to_consume"].pop(0) logger.debug(f"Send an envelope: {e}") await ws.send_str(e) @@ -139,7 +95,13 @@ async def link(request): return ws app = web.Application() - app.add_routes([web.get("/link", link)]) + app.add_routes( + [ + web.get("/link", link), + web.get("/health", health), + web.get("/disconnect", disconnect), + ] + ) runner = web.AppRunner(app) def run_server(): @@ -155,7 +117,44 @@ def run_server(): loop.run_forever() loop.run_until_complete(runner.cleanup()) - loop.run_until_complete(asyncio.sleep(1)) loop.close() return run_server + + +def start_socket_mode_server(test, port: int): + test.sm_thread = threading.Thread(target=start_thread_socket_mode_server(test, port)) + test.sm_thread.daemon = True + test.sm_thread.start() + wait_for_socket_mode_server(port, 4) + + +def stop_socket_mode_server(test: TestCase): + # An event loop runs in a thread and executes all callbacks and Tasks in + # its thread. While a Task is running in the event loop, no other Tasks + # can run in the same thread. When a Task executes an await expression, the + # running Task gets suspended, and the event loop executes the next Task. + # To schedule a callback from another OS thread, the loop.call_soon_threadsafe() method should be used. + # https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading + test.loop.call_soon_threadsafe(test.loop.stop) + test.sm_thread.join(timeout=5) + + +def wait_for_socket_mode_server(port: int, timeout: int): + start_time = time.time() + while (time.time() - start_time) < timeout: + try: + urlopen(f"http://127.0.0.1:{port}/health") + return + except URLError: + time.sleep(0.01) + + +def request_socket_mode_server_disconnect(port: int, timeout: int): + start_time = time.time() + while (time.time() - start_time) < timeout: + try: + urlopen(f"http://127.0.0.1:{port}/disconnect") + return + except URLError: + time.sleep(0.01) diff --git a/tests/slack_sdk/socket_mode/test_interactions_builtin.py b/tests/slack_sdk/socket_mode/test_interactions_builtin.py index 43577200..4ff576fc 100644 --- a/tests/slack_sdk/socket_mode/test_interactions_builtin.py +++ b/tests/slack_sdk/socket_mode/test_interactions_builtin.py @@ -2,7 +2,6 @@ import time import unittest from random import randint -from threading import Thread import pytest @@ -17,6 +16,7 @@ start_socket_mode_server, socket_mode_envelopes, socket_mode_hello_message, + stop_socket_mode_server, ) from tests.slack_sdk.socket_mode.mock_web_api_handler import MockHandler from tests.mock_web_api_server import setup_mock_web_api_server, cleanup_mock_web_api_server @@ -33,9 +33,11 @@ def setUp(self): token="xoxb-api_test", base_url="http://localhost:8888", ) + start_socket_mode_server(self, 3011) def tearDown(self): cleanup_mock_web_api_server(self) + stop_socket_mode_server(self) def test_buffer_size_validation(self): try: @@ -54,11 +56,6 @@ def test_interactions(self): # we believe that the same situation never happens in the production usage. sys.setrecursionlimit(10000) - t = Thread(target=start_socket_mode_server(self, 3011)) - t.daemon = True - t.start() - time.sleep(2) # wait for the server - try: buffer_size_list = [1024, 9000, 35, 49] + list([randint(16, 128) for _ in range(10)]) for buffer_size in buffer_size_list: @@ -123,17 +120,10 @@ def socket_mode_request_handler(client: BaseSocketModeClient, request: SocketMod # Restore the default value sys.setrecursionlimit(default_recursion_limit) client.close() - self.loop.stop() - t.join(timeout=5) self.logger.info(f"Passed with buffer size: {buffer_size_list}") def test_send_message_while_disconnection(self): - t = Thread(target=start_socket_mode_server(self, 3011)) - t.daemon = True - t.start() - time.sleep(2) # wait for the server - try: self.reset_server_state() client = SocketModeClient( @@ -157,5 +147,3 @@ def test_send_message_while_disconnection(self): client.send_message("foo") finally: client.close() - self.loop.stop() - t.join(timeout=5) diff --git a/tests/slack_sdk/socket_mode/test_interactions_websocket_client.py b/tests/slack_sdk/socket_mode/test_interactions_websocket_client.py index cfe86156..15df8688 100644 --- a/tests/slack_sdk/socket_mode/test_interactions_websocket_client.py +++ b/tests/slack_sdk/socket_mode/test_interactions_websocket_client.py @@ -2,7 +2,6 @@ import time import unittest from random import randint -from threading import Thread from websocket import WebSocketException @@ -17,6 +16,7 @@ start_socket_mode_server, socket_mode_envelopes, socket_mode_hello_message, + stop_socket_mode_server, ) from tests.slack_sdk.socket_mode.mock_web_api_handler import MockHandler from tests.mock_web_api_server import setup_mock_web_api_server, cleanup_mock_web_api_server @@ -31,15 +31,13 @@ def setUp(self): token="xoxb-api_test", base_url="http://localhost:8888", ) + start_socket_mode_server(self, 3012) def tearDown(self): cleanup_mock_web_api_server(self) + stop_socket_mode_server(self) def test_interactions(self): - t = Thread(target=start_socket_mode_server(self, 3012)) - t.daemon = True - t.start() - received_messages = [] received_socket_mode_requests = [] @@ -63,7 +61,6 @@ def socket_mode_request_handler(client: BaseSocketModeClient, request: SocketMod client.socket_mode_request_listeners.append(socket_mode_request_handler) try: - time.sleep(1) # wait for the server client.wss_uri = "ws://0.0.0.0:3012/link" client.connect() time.sleep(1) # wait for the message receiver @@ -91,17 +88,11 @@ def socket_mode_request_handler(client: BaseSocketModeClient, request: SocketMod self.assertEqual(len(socket_mode_envelopes), len(received_socket_mode_requests)) finally: client.close() - self.loop.stop() - t.join(timeout=5) def test_send_message_while_disconnection(self): if is_ci_unstable_test_skip_enabled(): # this test tends to fail on the GitHub Actions platform return - t = Thread(target=start_socket_mode_server(self, 3012)) - t.daemon = True - t.start() - time.sleep(2) # wait for the server try: client = SocketModeClient( @@ -129,5 +120,3 @@ def test_send_message_while_disconnection(self): client.send_message("foo") finally: client.close() - self.loop.stop() - t.join(timeout=5) diff --git a/tests/slack_sdk_async/socket_mode/test_interactions_aiohttp.py b/tests/slack_sdk_async/socket_mode/test_interactions_aiohttp.py index cfe71a83..91e28d7c 100644 --- a/tests/slack_sdk_async/socket_mode/test_interactions_aiohttp.py +++ b/tests/slack_sdk_async/socket_mode/test_interactions_aiohttp.py @@ -3,7 +3,6 @@ import time import unittest from random import randint -from threading import Thread import pytest from aiohttp import WSMessage @@ -15,10 +14,11 @@ from slack_sdk.socket_mode.aiohttp import SocketModeClient from slack_sdk.web.async_client import AsyncWebClient from tests.slack_sdk.socket_mode.mock_socket_mode_server import ( + request_socket_mode_server_disconnect, start_socket_mode_server, - start_socket_mode_server_with_disconnection, socket_mode_envelopes, socket_mode_hello_message, + stop_socket_mode_server, ) from tests.slack_sdk.socket_mode.mock_web_api_handler import MockHandler from tests.mock_web_api_server import setup_mock_web_api_server_async, cleanup_mock_web_api_server_async @@ -34,16 +34,14 @@ def setUp(self): token="xoxb-api_test", base_url="http://localhost:8888", ) + start_socket_mode_server(self, 3001) def tearDown(self): cleanup_mock_web_api_server_async(self) + stop_socket_mode_server(self) @async_test async def test_interactions(self): - t = Thread(target=start_socket_mode_server(self, 3001)) - t.daemon = True - t.start() - received_messages = [] received_socket_mode_requests = [] @@ -70,7 +68,6 @@ async def socket_mode_listener( client.socket_mode_request_listeners.append(socket_mode_listener) try: - time.sleep(1) # wait for the server client.wss_uri = "ws://0.0.0.0:3001/link" await client.connect() await asyncio.sleep(1) # wait for the message receiver @@ -97,15 +94,9 @@ async def socket_mode_listener( self.assertEqual(len(socket_mode_envelopes), len(received_socket_mode_requests)) finally: await client.close() - self.loop.stop() - t.join(timeout=5) @async_test async def test_interactions_with_disconnection(self): - t = Thread(target=start_socket_mode_server_with_disconnection(self, 3001)) - t.daemon = True - t.start() - self.disconnected = False received_messages = [] received_socket_mode_requests = [] @@ -137,11 +128,12 @@ async def socket_mode_listener( client.socket_mode_request_listeners.append(socket_mode_listener) try: - time.sleep(1) # wait for the server client.wss_uri = "ws://0.0.0.0:3001/link" await client.connect() await asyncio.sleep(1) # wait for the message receiver + request_socket_mode_server_disconnect(3001, 1) + # Because we want to check the expected messages of new session, # we need to ensure we send messaged after disconnected. count = 0 @@ -183,15 +175,9 @@ async def socket_mode_listener( self.assertEqual(len(socket_mode_envelopes), len(received_socket_mode_requests)) finally: await client.close() - self.loop.stop() - t.join(timeout=5) @async_test async def test_send_message_while_disconnection(self): - t = Thread(target=start_socket_mode_server(self, 3001)) - t.daemon = True - t.start() - client = SocketModeClient( app_token="xapp-A111-222-xyz", web_client=self.web_client, @@ -200,7 +186,6 @@ async def test_send_message_while_disconnection(self): ) try: - time.sleep(1) # wait for the server client.wss_uri = "ws://0.0.0.0:3001/link" await client.connect() await asyncio.sleep(1) # wait for the message receiver @@ -216,5 +201,3 @@ async def test_send_message_while_disconnection(self): await client.send_message("foo") finally: await client.close() - self.loop.stop() - t.join(timeout=5) diff --git a/tests/slack_sdk_async/socket_mode/test_interactions_websockets.py b/tests/slack_sdk_async/socket_mode/test_interactions_websockets.py index 99652b68..5b408dbc 100644 --- a/tests/slack_sdk_async/socket_mode/test_interactions_websockets.py +++ b/tests/slack_sdk_async/socket_mode/test_interactions_websockets.py @@ -3,7 +3,6 @@ import time import unittest from random import randint -from threading import Thread from typing import Optional import pytest @@ -14,7 +13,6 @@ from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient from slack_sdk.socket_mode.websockets import SocketModeClient from slack_sdk.web.async_client import AsyncWebClient -from tests.helpers import is_ci_unstable_test_skip_enabled from tests.slack_sdk.socket_mode.mock_socket_mode_server import ( start_socket_mode_server, socket_mode_envelopes, @@ -34,16 +32,13 @@ def setUp(self): token="xoxb-api_test", base_url="http://localhost:8888", ) + start_socket_mode_server(self, 3001) def tearDown(self): cleanup_mock_web_api_server_async(self) @async_test async def test_interactions(self): - t = Thread(target=start_socket_mode_server(self, 3002)) - t.daemon = True - t.start() - received_messages = [] received_socket_mode_requests = [] @@ -73,8 +68,7 @@ async def socket_mode_listener( client.socket_mode_request_listeners.append(socket_mode_listener) try: - time.sleep(1) # wait for the server - client.wss_uri = "ws://0.0.0.0:3002/link" + client.wss_uri = "ws://0.0.0.0:3001/link" await client.connect() await asyncio.sleep(1) # wait for the message receiver @@ -99,18 +93,9 @@ async def socket_mode_listener( self.assertEqual(len(socket_mode_envelopes), len(received_socket_mode_requests)) finally: await client.close() - self.loop.stop() - t.join(timeout=5) @async_test async def test_send_message_while_disconnection(self): - if is_ci_unstable_test_skip_enabled(): - # this test tends to fail on the GitHub Actions platform - return - t = Thread(target=start_socket_mode_server(self, 3001)) - t.daemon = True - t.start() - client = SocketModeClient( app_token="xapp-A111-222-xyz", web_client=self.web_client, @@ -119,7 +104,6 @@ async def test_send_message_while_disconnection(self): ) try: - time.sleep(1) # wait for the server client.wss_uri = "ws://0.0.0.0:3001/link" await client.connect() await asyncio.sleep(1) # wait for the message receiver @@ -135,5 +119,3 @@ async def test_send_message_while_disconnection(self): await client.send_message("foo") finally: await client.close() - self.loop.stop() - t.join(timeout=5)