Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update proxy #86

Merged
merged 6 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 67 additions & 34 deletions pyleco/coordinators/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@
methods
-------
pub_sub_proxy
Listens on port 11100 and publishes to 11099, available to all IP addresses.
pub_sub_remote_proxy
Connects two local proxy servers on different computers, connecting to their
publisher port at 11099 and publishing to the local proxy at 11100
Run a publisher subscriber proxy in the current thread (blocking).
start_proxy
Start a proxy server of either type. If it is merely local, a local server
is started, otherwise a connecting proxy.
Start a proxy server, either local or remote, in its own thread.


Execute this module to start a proxy server. If no remote connection given, a
Expand All @@ -44,10 +40,10 @@
-s NAME/IP Subscribe to the local proxy of some other computer
-p NAME/IP Publish to the local proxy of some other computer


Created on Mon Jun 27 09:57:05 2022 by Benedikt Moneke
Created on Mon Jun 27 09:57:05 2022 by Benedikt Burger
"""

from __future__ import annotations
import logging
import threading
from typing import Optional
Expand All @@ -66,8 +62,14 @@


# Technical method to start the proxy server. Use `start_proxy` instead.
def pub_sub_proxy(context: zmq.Context, captured: bool = False, sub: str = "localhost",
pub: str = "localhost", offset: int = 0):
def pub_sub_proxy(
context: zmq.Context,
captured: bool = False,
sub: str = "localhost",
pub: str = "localhost",
offset: int = 0,
event: Optional[threading.Event] = None,
) -> None:
"""Run a publisher subscriber proxy in the current thread (blocking)."""
s: zmq.Socket = context.socket(zmq.XSUB)
p: zmq.Socket = context.socket(zmq.XPUB)
Expand All @@ -77,8 +79,10 @@
s.bind(f"tcp://*:{_port}")
p.bind(f"tcp://*:{_port - 1}")
else:
log.info(f"Start remote proxy server subsribing to {sub}:{_port - 1} and publishing to "
f"{pub}:{_port}.")
log.info(

Check warning on line 82 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L82

Added line #L82 was not covered by tests
f"Start remote proxy server subsribing to {sub}:{_port - 1} and publishing to "
f"{pub}:{_port}."
)
s.connect(f"tcp://{sub}:{port -1 - 2 * offset}")
p.connect(f"tcp://{pub}:{port - 2 * offset}")

Expand All @@ -88,6 +92,8 @@
c.bind("inproc://capture")
else:
c = None # type: ignore
if event is not None:
event.set()

Check warning on line 96 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L96

Added line #L96 was not covered by tests
try:
zmq.proxy_steerable(p, s, capture=c)
except zmq.ContextTerminated:
Expand All @@ -96,8 +102,13 @@
log.exception("Some other exception on proxy happened.", exc)


def start_proxy(context: Optional[zmq.Context] = None, captured: bool = False,
sub: str = "localhost", pub: str = "localhost", offset: int = 0) -> zmq.Context:
def start_proxy(
context: Optional[zmq.Context] = None,
captured: bool = False,
sub: str = "localhost",
pub: str = "localhost",
offset: int = 0,
) -> zmq.Context:
"""Start a proxy server, either local or remote, in its own thread.

Examples:
Expand All @@ -116,65 +127,87 @@
# Stop the proxy:
c.destroy()

:param context: The zmq context. If None, it generates its own context.
:param context: The zmq context.
:param bool captured: Print the captured messages.
:param str sub: Name or IP Address of the server to subscribe to.
:param str pub: Name or IP Address of the server to publish to.
:param offset: How many servers (pairs of ports) to offset from the base one.
:return: The zmq context. To stop, call `context.destroy()`.
"""
context = context or zmq.Context.instance()
thread = threading.Thread(target=pub_sub_proxy, args=(context, captured, sub, pub, offset))
event = threading.Event()
thread = threading.Thread(

Check warning on line 139 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L138-L139

Added lines #L138 - L139 were not covered by tests
target=pub_sub_proxy, args=(context, captured, sub, pub, offset, event)
)
thread.daemon = True
thread.start()
started = event.wait(1)

Check warning on line 144 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L144

Added line #L144 was not covered by tests
if not started:
raise TimeoutError("Starting of proxy server failed.")

Check warning on line 146 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L146

Added line #L146 was not covered by tests
log.info("Proxy thread started.")
return context


def main() -> None:
def main(
arguments: Optional[list[str]] = None, stop_event: Optional[threading.Event] = None
) -> None:
from pyleco.utils.parser import ArgumentParser, parse_command_line_parameters

parser = ArgumentParser(prog="Proxy server")
parser.add_argument("-s", "--sub", help="set the host name to subscribe to",
default="localhost")
parser.add_argument(

Check warning on line 157 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L157

Added line #L157 was not covered by tests
"-s", "--sub", help="set the host name to subscribe to", default="localhost"
)
parser.add_argument("-p", "--pub", help="set the host name to publish to", default="localhost")
parser.add_argument("-v", "--verbose", action="count", default=0,
help="increase the logging level by one, may be used more than once")
parser.add_argument("-c", "--captured", action="store_true", default=False,
help="log all messages sent through the proxy")
parser.add_argument(

Check warning on line 161 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L161

Added line #L161 was not covered by tests
"-v",
"--verbose",
action="count",
default=0,
help="increase the logging level by one, may be used more than once",
)
parser.add_argument(

Check warning on line 168 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L168

Added line #L168 was not covered by tests
"-c",
"--captured",
action="store_true",
default=False,
help="log all messages sent through the proxy",
)
parser.add_argument("-o", "--offset", help="shifting the port numbers.", default=0, type=int)
kwargs = parse_command_line_parameters(parser=parser, logger=log, logging_default=logging.INFO)
kwargs = parse_command_line_parameters(

Check warning on line 176 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L176

Added line #L176 was not covered by tests
parser=parser, logger=log, arguments=arguments, logging_default=logging.INFO
)

log.addHandler(logging.StreamHandler())
if kwargs.get("captured"):
log.setLevel(logging.DEBUG)
merely_local = kwargs.get("pub") == "localhost" and kwargs.get("sub") == "localhost"

if not merely_local:
log.info(f"Remote proxy from {kwargs.get('sub', 'localhost')} "
f"to {kwargs.get('pub', 'localhost')}.")
log.info(

Check warning on line 186 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L186

Added line #L186 was not covered by tests
f"Remote proxy from {kwargs.get('sub', 'localhost')} "
f"to {kwargs.get('pub', 'localhost')}."
)
else:
log.info(
"This data broker manages the data between measurement software, "
f"which publishes on port {port}, and all the consumers of data "
f" (DataLogger, Beamprofiler etc.), which subscribe on port {port - 1}."
)

context = start_proxy(**kwargs)
context = zmq.Context()
start_proxy(context=context, **kwargs)

Check warning on line 197 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L196-L197

Added lines #L196 - L197 were not covered by tests
if merely_local:
start_proxy(offset=1) # for log entries
start_proxy(context=context, offset=1) # for log entries

Check warning on line 199 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L199

Added line #L199 was not covered by tests
reader = context.socket(zmq.SUB)
reader.connect("inproc://capture")
reader.subscribe(b"")
poller = zmq.Poller()
poller.register(reader, zmq.POLLIN)
while True:
if input("Quit with q:") == "q":
context.destroy()
break
if socks := dict(poller.poll(100)):
while stop_event is None or not stop_event.is_set():
if socks := dict(poller.poll(1)):
if reader in socks:
received = reader.recv_multipart()
log.debug(f"Message brokered: {received}")
context.term()

Check warning on line 210 in pyleco/coordinators/proxy_server.py

View check run for this annotation

Codecov / codecov/patch

pyleco/coordinators/proxy_server.py#L210

Added line #L210 was not covered by tests


if __name__ == "__main__": # pragma: no cover
Expand Down
108 changes: 108 additions & 0 deletions tests/coordinators/test_proxy_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#
# This file is part of the PyLECO package.
#
# Copyright (c) 2023-2024 PyLECO Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#

"""Hangs infinitely on import on github CI."""

# from __future__ import annotations
# import threading
# from typing import Optional

# import pytest
# import zmq

# from pyleco.core import (
# PROXY_RECEIVING_PORT,
# PROXY_SENDING_PORT,
# )
# from pyleco.test import FakeContext, FakeSocket

# from pyleco.coordinators.proxy_server import pub_sub_proxy, start_proxy

# parameters: tuple[FakeSocket, FakeSocket, Optional[FakeSocket]]


# @pytest.fixture
# def fake_proxy_steerable(monkeypatch: pytest.MonkeyPatch) -> None:
# def _fake_proxy_steerable(
# frontend: FakeSocket, backend: FakeSocket, capture: Optional[FakeSocket] = None
# ):
# global parameters
# parameters = frontend, backend, capture
# raise zmq.ContextTerminated

# monkeypatch.setattr("zmq.proxy_steerable", _fake_proxy_steerable)


# class Test_pub_sub_proxy:
# def test_default_config(self, fake_proxy_steerable):
# pub_sub_proxy(FakeContext()) # type: ignore
# global parameters
# f, b, c = parameters
# assert f.addr == f"tcp://*:{PROXY_SENDING_PORT}"
# assert b.addr == f"tcp://*:{PROXY_RECEIVING_PORT}"
# assert c is None

# def test_event_set_for_successful_binding(self, fake_proxy_steerable):
# event = threading.Event()
# pub_sub_proxy(FakeContext(), event=event) # type: ignore
# assert event.is_set()

# @pytest.mark.parametrize(
# "pub, sub",
# (
# ("localhost", "remote"),
# ("remote", "localhost"),
# ("a", "b"),
# ),
# )
# def test_remote_configuration(self, pub: str, sub: str, fake_proxy_steerable):
# pub_sub_proxy(FakeContext(), sub=sub, pub=pub) # type: ignore
# global parameters
# f, b, c = parameters
# assert f.addr == f"tcp://{pub}:{PROXY_RECEIVING_PORT}"
# assert b.addr == f"tcp://{sub}:{PROXY_SENDING_PORT}"

# def test_capture(self, fake_proxy_steerable):
# pub_sub_proxy(context=FakeContext(), captured=True) # type: ignore
# global parameters
# f, b, c = parameters # type: ignore
# c: FakeSocket
# assert c.addr == "inproc://capture"
# assert c.socket_type == zmq.PUB


# def test_start_proxy():
# context = start_proxy()
# # assert no error is raised
# context.term()


# @pytest.mark.skip(reason="Hangs infinitely")
# def test_start_proxy_fails_if_already_started():
# # arrange
# context = start_proxy()
# with pytest.raises(TimeoutError):
# start_proxy()
# # assert no error is raised
# context.term()