Skip to content

Commit

Permalink
added subprocess test client
Browse files Browse the repository at this point in the history
  • Loading branch information
aranvir authored and cofin committed Aug 25, 2024
1 parent b1f7134 commit 469a572
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 0 deletions.
38 changes: 38 additions & 0 deletions docs/examples/testing/subprocess_sse_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
Assemble components into an app that shall be tested
"""

from typing import AsyncIterator

from redis.asyncio import Redis

from litestar import Litestar, get
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.response import ServerSentEvent


@get("/notify/{topic:str}")
async def get_notified(topic: str, channels: ChannelsPlugin) -> ServerSentEvent:
async def generator() -> AsyncIterator[bytes]:
async with channels.start_subscription([topic]) as subscriber:
async for event in subscriber.iter_events():
yield event

return ServerSentEvent(generator(), event_type="Notifier")


def create_test_app() -> Litestar:
redis_instance = Redis()
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)

return Litestar(
route_handlers=[
get_notified,
],
plugins=[channels_instance],
)


app = create_test_app()
72 changes: 72 additions & 0 deletions docs/examples/testing/test_subprocess_sse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Test the app running in a subprocess
"""

import asyncio
import pathlib
import sys
from typing import AsyncIterator

import httpx
import httpx_sse
import pytest
from redis.asyncio import Redis

from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.testing import subprocess_async_client

if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

pytestmark = pytest.mark.anyio


@pytest.fixture(scope="session")
def anyio_backend() -> str:
return "asyncio"


ROOT = pathlib.Path(__file__).parent


@pytest.fixture(name="async_client", scope="session")
async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]:
async with subprocess_async_client(workdir=ROOT, app="subprocess_sse_app:app") as client:
yield client


@pytest.fixture(name="redis_channels")
async def fx_redis_channels() -> AsyncIterator[ChannelsPlugin]:
# Expects separate redis set-up
redis_instance = Redis()
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)
await channels_instance._on_startup()
yield channels_instance
await channels_instance._on_shutdown()


async def test_subprocess_async_client(async_client: httpx.AsyncClient, redis_channels: ChannelsPlugin) -> None:
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the
regular async test client.
"""
topic = "demo"
message = "hello"

running = asyncio.Event()
running.set()

async def send_notifications() -> None:
while running.is_set():
await redis_channels.wait_published(message, channels=[topic])
await asyncio.sleep(0.1)

task = asyncio.create_task(send_notifications())

async with httpx_sse.aconnect_sse(async_client, "GET", f"/notify/{topic}") as event_source:
async for event in event_source.aiter_sse():
assert event.data == message
running.clear()
break
await task
13 changes: 13 additions & 0 deletions docs/usage/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,19 @@ But also this:
assert response.text == "healthy"
Creating an external test app
-------------------

The test clients make use of the capability to directly load an ASGI app into an httpx Client without having to run an actual server like uvicorn. For most test cases, this is sufficient but there are some situations where this will not work. For example, when using server-sent events with an infinite generator, it will lock up the test client, since it tries to first exhaust the generator and then return to the test function.

Litestar offers two helper functions called :func:`subprocess_sync_client <litestar.testing.client.subprocess_client.subprocess_sync_client>` and :func:`subprocess_async_client <litestar.testing.client.subprocess_client.subprocess_async_client>` that will launch a litestar instance with uvicorn in a subprocess and set up an httpx client for running tests. You can either load your actual app file or create subsets from it as you would with the regular test client setup. An example is shown below.

.. literalinclude:: /examples/testing/subprocess_sse_app.py
:language: python

.. literalinclude:: /examples/testing/test_subprocess_sse.py
:language: python

RequestFactory
--------------

Expand Down
3 changes: 3 additions & 0 deletions litestar/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from litestar.testing.client.async_client import AsyncTestClient
from litestar.testing.client.base import BaseTestClient
from litestar.testing.client.subprocess_client import subprocess_async_client, subprocess_sync_client
from litestar.testing.client.sync_client import TestClient
from litestar.testing.helpers import create_async_test_client, create_test_client
from litestar.testing.request_factory import RequestFactory
Expand All @@ -13,4 +14,6 @@
"RequestFactory",
"TestClient",
"WebSocketTestSession",
"subprocess_sync_client",
"subprocess_async_client",
)
70 changes: 70 additions & 0 deletions litestar/testing/client/subprocess_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pathlib
import socket
import subprocess
import time
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncIterator, Iterator

import httpx


class StartupError(RuntimeError):
pass


def _get_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Bind to a free port provided by the host
try:
sock.bind(("localhost", 0))
except OSError as e:
raise StartupError("Could not find an open port") from e
else:
port: int = sock.getsockname()[1]
return port


@contextmanager
def run_app(workdir: pathlib.Path, app: str) -> Iterator[str]:
"""Launch a litestar application in a subprocess with a random available port."""
port = _get_available_port()
proc = subprocess.Popen(
args=["litestar", "--app", app, "run", "--port", str(port)],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
cwd=workdir,
)
url = f"http://127.0.0.1:{port}"
for _ in range(100):
try:
httpx.get(url, timeout=0.1)
break
except httpx.TransportError:
time.sleep(1)
yield url
proc.kill()


@asynccontextmanager
async def subprocess_async_client(workdir: pathlib.Path, app: str) -> AsyncIterator[httpx.AsyncClient]:
"""Provides an async httpx client for a litestar app launched in a subprocess.
Args:
workdir: Path to the directory in which the app module resides.
app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app"
"""
with run_app(workdir=workdir, app=app) as url:
async with httpx.AsyncClient(base_url=url) as client:
yield client


@contextmanager
def subprocess_sync_client(workdir: pathlib.Path, app: str) -> Iterator[httpx.Client]:
"""Provides a sync httpx client for a litestar app launched in a subprocess.
Args:
workdir: Path to the directory in which the app module resides.
app: Uvicorn app string that can be resolved in the provided working directory, e.g.: "app:app"
"""
with run_app(workdir=workdir, app=app) as url, httpx.Client(base_url=url) as client:
yield client
Empty file.
38 changes: 38 additions & 0 deletions tests/unit/test_testing/test_sub_client/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
Assemble components into an app that shall be tested
"""

from typing import AsyncIterator

from redis.asyncio import Redis

from litestar import Litestar, get
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.response import ServerSentEvent


@get("/notify/{topic:str}")
async def get_notified(topic: str, channels: ChannelsPlugin) -> ServerSentEvent:
async def generator() -> AsyncIterator[bytes]:
async with channels.start_subscription([topic]) as subscriber:
async for event in subscriber.iter_events():
yield event

return ServerSentEvent(generator(), event_type="Notifier")


def create_test_app() -> Litestar:
redis_instance = Redis()
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)

return Litestar(
route_handlers=[
get_notified,
],
plugins=[channels_instance],
)


app = create_test_app()
108 changes: 108 additions & 0 deletions tests/unit/test_testing/test_sub_client/test_subprocess_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Test the app running in a subprocess
"""

import asyncio
import pathlib
import sys
import threading
from typing import Any, AsyncIterator, Iterator

import httpx
import httpx_sse
import pytest
from redis.asyncio import Redis

from litestar.channels import ChannelsPlugin
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.testing import subprocess_async_client, subprocess_sync_client

if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

pytestmark = pytest.mark.anyio


@pytest.fixture(scope="session")
def anyio_backend() -> str:
return "asyncio"


ROOT = pathlib.Path(__file__).parent


@pytest.fixture(name="async_client", scope="session")
async def fx_async_client() -> AsyncIterator[httpx.AsyncClient]:
async with subprocess_async_client(workdir=ROOT, app="demo:app") as client:
yield client


@pytest.fixture(name="sync_client", scope="session")
def fx_sync_client() -> Iterator[httpx.Client]:
with subprocess_sync_client(workdir=ROOT, app="demo:app") as client:
yield client


@pytest.fixture(name="redis_channels")
async def fx_redis_channels(redis_service: Any) -> AsyncIterator[ChannelsPlugin]:
redis_instance = Redis()
channels_backend = RedisChannelsPubSubBackend(redis=redis_instance)
channels_instance = ChannelsPlugin(backend=channels_backend, arbitrary_channels_allowed=True)
await channels_instance._on_startup()
yield channels_instance
await channels_instance._on_shutdown()


async def test_subprocess_async_client(async_client: httpx.AsyncClient, redis_channels: ChannelsPlugin) -> None:
"""Demonstrates functionality of the async client with an infinite SSE source that cannot be tested with the
regular async test client.
"""
topic = "demo"
message = "hello"

running = asyncio.Event()
running.set()

async def send_notifications() -> None:
while running.is_set():
await redis_channels.wait_published(message, channels=[topic])
await asyncio.sleep(0.1)

task = asyncio.create_task(send_notifications())

async with httpx_sse.aconnect_sse(async_client, "GET", f"/notify/{topic}") as event_source:
async for event in event_source.aiter_sse():
assert event.data == message
running.clear()
break
await task


async def test_subprocess_sync_client(sync_client: httpx.Client, redis_channels: ChannelsPlugin) -> None:
"""Demonstrates functionality of the sync client with an infinite SSE source that cannot be tested with the
regular sync test client.
"""
topic = "demo"
message = "hello"

running = threading.Event()
running.set()

async def send_notifications() -> None:
while running.is_set():
await redis_channels.wait_published(message, channels=[topic])
await asyncio.sleep(0.1)

task = asyncio.create_task(send_notifications())

def consume_notifications() -> None:
with httpx_sse.connect_sse(sync_client, "GET", f"/notify/{topic}") as event_source:
for event in event_source.iter_sse():
assert event.data == message
running.clear()
break

thread_consume = threading.Thread(target=consume_notifications, daemon=True)
thread_consume.start()
await task
thread_consume.join()

0 comments on commit 469a572

Please sign in to comment.