Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve flaky tests #1615

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 71 additions & 72 deletions tests/slack_sdk/socket_mode/mock_socket_mode_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import asyncio
import logging
import os
import threading
import time
from typing import Any, Dict
from urllib.error import URLError
from urllib.request import urlopen
from unittest import TestCase

from aiohttp import WSMsgType, web

Expand All @@ -28,89 +33,40 @@
socket_mode_disconnect_message = """{"type":"disconnect","reason":"too_many_websockets","num_connections":2,"debug_info":{"host":"applink-111-xxx"},"connection_info":{"app_id":"A111"}}"""


def start_socket_mode_server(self, port: int):
def start_thread_socket_mode_server(self, port: int):
logger = logging.getLogger(__name__)
state = {}
state: Dict[str, Any] = {}

def reset_server_state():
state.update(
hello_sent=False,
disconnect=False,
envelopes_to_consume=list(socket_mode_envelopes),
)

self.reset_server_state = reset_server_state

async def link(request):
ws = web.WebSocketResponse()
await ws.prepare(request)

async for msg in ws:
if msg.type != WSMsgType.TEXT:
continue

message = msg.data
logger.debug(f"Server received a message: {message}")

if not state["hello_sent"]:
state["hello_sent"] = True
await ws.send_str(socket_mode_hello_message)

if state["envelopes_to_consume"]:
e = state["envelopes_to_consume"].pop(0)
logger.debug(f"Send an envelope: {e}")
await ws.send_str(e)

await ws.send_str(message)

return ws

app = web.Application()
app.add_routes([web.get("/link", link)])
runner = web.AppRunner(app)

def run_server():
reset_server_state()

self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, "127.0.0.1", port, reuse_port=True)
loop.run_until_complete(site.start())
async def health(request: web.Request):
wr = web.Response()
await wr.prepare(request)
wr.set_status(200)
return wr

# run until it's stopped from the main thread
loop.run_forever()

loop.run_until_complete(runner.cleanup())
loop.run_until_complete(asyncio.sleep(1))
loop.close()

return run_server


def start_socket_mode_server_with_disconnection(self, port: int):
logger = logging.getLogger(__name__)
state = {}

def reset_server_state():
state.update(
hello_sent=False,
disconnect_sent=False,
envelopes_to_consume=list(socket_mode_envelopes),
)

self.reset_server_state = reset_server_state
async def disconnect(request: web.Request):
state["disconnect"] = True
wr = web.Response()
await wr.prepare(request)
wr.set_status(200)
return wr

async def link(request):
disconnected = False
connected = True
ws = web.WebSocketResponse()
await ws.prepare(request)

async for msg in ws:
# To ensure disconnect message is received and handled,
# need to keep this ws alive to bypass client ping-pong check.
if msg.type == WSMsgType.PING:
t = time.time()
await ws.pong(f"sdk-ping-pong:{t}")
await ws.pong(f"sdk-ping-pong:{time.time()}")
continue
if msg.type != WSMsgType.TEXT:
continue
Expand All @@ -122,14 +78,14 @@ async def link(request):
state["hello_sent"] = True
await ws.send_str(socket_mode_hello_message)

if not state["disconnect_sent"]:
if state["disconnect"]:
state["hello_sent"] = False
state["disconnect_sent"] = True
disconnected = True
state["disconnect"] = False
connected = False
await ws.send_str(socket_mode_disconnect_message)
logger.debug(f"Disconnect message sent")
logger.debug("Disconnect message sent")

if state["envelopes_to_consume"] and not disconnected:
if state["envelopes_to_consume"] and connected:
e = state["envelopes_to_consume"].pop(0)
logger.debug(f"Send an envelope: {e}")
await ws.send_str(e)
Expand All @@ -139,7 +95,13 @@ async def link(request):
return ws

app = web.Application()
app.add_routes([web.get("/link", link)])
app.add_routes(
[
web.get("/link", link),
web.get("/health", health),
web.get("/disconnect", disconnect),
]
)
runner = web.AppRunner(app)

def run_server():
Expand All @@ -155,7 +117,44 @@ def run_server():
loop.run_forever()

loop.run_until_complete(runner.cleanup())
loop.run_until_complete(asyncio.sleep(1))
loop.close()

return run_server


def start_socket_mode_server(test, port: int):
test.sm_thread = threading.Thread(target=start_thread_socket_mode_server(test, port))
test.sm_thread.daemon = True
test.sm_thread.start()
wait_for_socket_mode_server(port, 4)


def stop_socket_mode_server(test: TestCase):
# An event loop runs in a thread and executes all callbacks and Tasks in
# its thread. While a Task is running in the event loop, no other Tasks
# can run in the same thread. When a Task executes an await expression, the
# running Task gets suspended, and the event loop executes the next Task.
# To schedule a callback from another OS thread, the loop.call_soon_threadsafe() method should be used.
# https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading
test.loop.call_soon_threadsafe(test.loop.stop)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this line is creating more stability; taken from bolt-python

test.sm_thread.join(timeout=5)


def wait_for_socket_mode_server(port: int, timeout: int):
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
urlopen(f"http://127.0.0.1:{port}/health")
return
except URLError:
time.sleep(0.01)


def request_socket_mode_server_disconnect(port: int, timeout: int):
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
urlopen(f"http://127.0.0.1:{port}/disconnect")
return
except URLError:
time.sleep(0.01)
18 changes: 3 additions & 15 deletions tests/slack_sdk/socket_mode/test_interactions_builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time
import unittest
from random import randint
from threading import Thread

import pytest

Expand All @@ -17,6 +16,7 @@
start_socket_mode_server,
socket_mode_envelopes,
socket_mode_hello_message,
stop_socket_mode_server,
)
from tests.slack_sdk.socket_mode.mock_web_api_handler import MockHandler
from tests.mock_web_api_server import setup_mock_web_api_server, cleanup_mock_web_api_server
Expand All @@ -33,9 +33,11 @@ def setUp(self):
token="xoxb-api_test",
base_url="http://localhost:8888",
)
start_socket_mode_server(self, 3011)

def tearDown(self):
cleanup_mock_web_api_server(self)
stop_socket_mode_server(self)

def test_buffer_size_validation(self):
try:
Expand All @@ -54,11 +56,6 @@ def test_interactions(self):
# we believe that the same situation never happens in the production usage.
sys.setrecursionlimit(10000)

t = Thread(target=start_socket_mode_server(self, 3011))
t.daemon = True
t.start()
time.sleep(2) # wait for the server

try:
buffer_size_list = [1024, 9000, 35, 49] + list([randint(16, 128) for _ in range(10)])
for buffer_size in buffer_size_list:
Expand Down Expand Up @@ -123,17 +120,10 @@ def socket_mode_request_handler(client: BaseSocketModeClient, request: SocketMod
# Restore the default value
sys.setrecursionlimit(default_recursion_limit)
client.close()
self.loop.stop()
t.join(timeout=5)

self.logger.info(f"Passed with buffer size: {buffer_size_list}")

def test_send_message_while_disconnection(self):
t = Thread(target=start_socket_mode_server(self, 3011))
t.daemon = True
t.start()
time.sleep(2) # wait for the server

try:
self.reset_server_state()
client = SocketModeClient(
Expand All @@ -157,5 +147,3 @@ def test_send_message_while_disconnection(self):
client.send_message("foo")
finally:
client.close()
self.loop.stop()
t.join(timeout=5)
17 changes: 3 additions & 14 deletions tests/slack_sdk/socket_mode/test_interactions_websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time
import unittest
from random import randint
from threading import Thread

from websocket import WebSocketException

Expand All @@ -17,6 +16,7 @@
start_socket_mode_server,
socket_mode_envelopes,
socket_mode_hello_message,
stop_socket_mode_server,
)
from tests.slack_sdk.socket_mode.mock_web_api_handler import MockHandler
from tests.mock_web_api_server import setup_mock_web_api_server, cleanup_mock_web_api_server
Expand All @@ -31,15 +31,13 @@ def setUp(self):
token="xoxb-api_test",
base_url="http://localhost:8888",
)
start_socket_mode_server(self, 3012)

def tearDown(self):
cleanup_mock_web_api_server(self)
stop_socket_mode_server(self)

def test_interactions(self):
t = Thread(target=start_socket_mode_server(self, 3012))
t.daemon = True
t.start()

received_messages = []
received_socket_mode_requests = []

Expand All @@ -63,7 +61,6 @@ def socket_mode_request_handler(client: BaseSocketModeClient, request: SocketMod
client.socket_mode_request_listeners.append(socket_mode_request_handler)

try:
time.sleep(1) # wait for the server
client.wss_uri = "ws://0.0.0.0:3012/link"
client.connect()
time.sleep(1) # wait for the message receiver
Expand Down Expand Up @@ -91,17 +88,11 @@ def socket_mode_request_handler(client: BaseSocketModeClient, request: SocketMod
self.assertEqual(len(socket_mode_envelopes), len(received_socket_mode_requests))
finally:
client.close()
self.loop.stop()
t.join(timeout=5)

def test_send_message_while_disconnection(self):
if is_ci_unstable_test_skip_enabled():
# this test tends to fail on the GitHub Actions platform
return
t = Thread(target=start_socket_mode_server(self, 3012))
t.daemon = True
t.start()
time.sleep(2) # wait for the server

try:
client = SocketModeClient(
Expand Down Expand Up @@ -129,5 +120,3 @@ def test_send_message_while_disconnection(self):
client.send_message("foo")
finally:
client.close()
self.loop.stop()
t.join(timeout=5)
Loading
Loading