Skip to content

ability to display docker compose output and live logs of the services #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ Get the list of docker_compose commands to be executed for test clean-up actions
Override this fixture in your tests if you need to change clean-up actions.
Returning anything that would evaluate to False will skip this command.

## Docker Live Output

```python
@pytest.fixture(scope="session")
def http_service(docker_ip, docker_services):
docker_services.display_live_logs("service_name")
```

```bash
pytest --capture=tee-sys <test directory>
```

# Development

Use of a virtual environment is recommended. See the
Expand Down
82 changes: 73 additions & 9 deletions src/pytest_docker/plugin.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from concurrent.futures import Future, ThreadPoolExecutor
import contextlib
import os
import re
import subprocess
import time
import timeit
from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union
from types import TracebackType
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union

import attr
import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest

_MAX_LOG_WORKERS = 100


@pytest.fixture
def container_scope_fixture(request: FixtureRequest) -> Any:
Expand All @@ -21,7 +25,7 @@ def containers_scope(fixture_name: str, config: Config) -> Any: # pylint: disab
return config.getoption("--container-scope", "session")


def execute(command: str, success_codes: Iterable[int] = (0,)) -> Union[bytes, Any]:
def execute_and_get_output(command: str, success_codes: Iterable[int] = (0,)) -> Union[bytes, Any]:
"""Run a shell command."""
try:
output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
Expand All @@ -38,6 +42,20 @@ def execute(command: str, success_codes: Iterable[int] = (0,)) -> Union[bytes, A
return output


def execute(command: str, success_codes: Iterable[int] = (0,)) -> None:
try:
process = subprocess.run(command, stderr=subprocess.STDOUT, shell=True)
returncode = process.returncode
except subprocess.CalledProcessError as error:
returncode = error.returncode
command = error.cmd

if returncode not in success_codes:
raise Exception(
'Command {} returned {}'.format(command, returncode)
)


def get_docker_ip() -> Union[str, Any]:
# When talking to the Docker daemon via a UNIX socket, route all TCP
# traffic to docker containers via the TCP loopback interface.
Expand All @@ -59,9 +77,18 @@ def docker_ip() -> Union[str, Any]:


@attr.s(frozen=True)
class Services:
_docker_compose: Any = attr.ib()
_services: Dict[Any, Dict[Any, Any]] = attr.ib(init=False, default=attr.Factory(dict))
class Services(contextlib.AbstractContextManager): # type: ignore
_docker_compose: "DockerComposeExecutor" = attr.ib()
_services: Dict[Any, Dict[Any, Any]] = attr.ib(
init=False, default=attr.Factory(dict)
)
_live_logs: Dict[str, Future[Any]] = attr.ib(init=False, default=attr.Factory(dict))
_thread_pool_executor: ThreadPoolExecutor = attr.ib(
init=False,
default=attr.Factory(
lambda: ThreadPoolExecutor(max_workers=_MAX_LOG_WORKERS, thread_name_prefix="docker_")
),
)

def port_for(self, service: str, container_port: int) -> int:
"""Return the "host" port for `service` and `container_port`.
Expand All @@ -83,7 +110,7 @@ def port_for(self, service: str, container_port: int) -> int:
if cache is not None:
return cache

output = self._docker_compose.execute("port %s %d" % (service, container_port))
output = self._docker_compose.execute_and_get_output("port %s %d" % (service, container_port))
endpoint = output.strip().decode("utf-8")
if not endpoint:
raise ValueError('Could not detect port for "%s:%d".' % (service, container_port))
Expand Down Expand Up @@ -119,6 +146,36 @@ def wait_until_responsive(

raise Exception("Timeout reached while waiting on service!")

def display_live_logs(self, service: str) -> None:
"""Run `logs` command with the follow flag to show live logs of a service."""
if service in self._live_logs:
return

if len(self._live_logs) == _MAX_LOG_WORKERS:
raise NotImplementedError(
f"""\
{_MAX_LOG_WORKERS} worker threads are supported to display live logs. \
Please submit a PR if you want to change that."""
)

self._live_logs[service] = self._thread_pool_executor.submit(
self._docker_compose.execute, f"logs {service} -f"
)

def close(self) -> None:
for _, fut in self._live_logs.items():
_ = fut.cancel()
self._thread_pool_executor.shutdown(wait=False)

def __exit__(
self,
_exc_type: Optional[Type[BaseException]],
__exc_value: Optional[BaseException],
__traceback: Optional[TracebackType],
) -> None:
self.close()
return None


def str_to_list(arg: Union[str, List[Any], Tuple[Any]]) -> Union[List[Any], Tuple[Any]]:
if isinstance(arg, (list, tuple)):
Expand All @@ -132,12 +189,18 @@ class DockerComposeExecutor:
_compose_files: Any = attr.ib(converter=str_to_list)
_compose_project_name: str = attr.ib()

def execute(self, subcommand: str) -> Union[bytes, Any]:
def execute_and_get_output(self, subcommand: str) -> Union[bytes, Any]:
return execute_and_get_output(self._format_cmd(subcommand))

def execute(self, subcommand: str) -> None:
execute(self._format_cmd(subcommand))

def _format_cmd(self, subcommand: str) -> str:
command = self._compose_command
for compose_file in self._compose_files:
command += ' -f "{}"'.format(compose_file)
command += ' -p "{}" {}'.format(self._compose_project_name, subcommand)
return execute(command)
return command


@pytest.fixture(scope=containers_scope)
Expand Down Expand Up @@ -213,7 +276,8 @@ def get_docker_services(

try:
# Let test(s) run.
yield Services(docker_compose)
with Services(docker_compose) as services:
yield services
finally:
# Clean up.
if docker_cleanup:
Expand Down
Loading