Skip to content

Commit

Permalink
Merge branch 'feature/mllp-addition'
Browse files Browse the repository at this point in the history
  • Loading branch information
acv committed Nov 17, 2024
2 parents 7731ed1 + 5173c05 commit 277287a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 8 deletions.
91 changes: 89 additions & 2 deletions src/hl7lw/mllp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,50 @@
START_BYTE = b'\x0B'
END_BYTES = b'\x1C\x0D'
BUFSIZE = 4096
MAX_MESSAGE_SIZE = 1 * 1024 * 1024 # 1 MB is probably reasonable.


class MllpClient:
"""
MllpClient provides a simple API for a client to talk to an server, whether an
integration engine, a RIS, a HIS, a PACS, whatever it is.
The main assumption made is that messages will be sent one at a time, over a
single connection. Encryption is not currently supported.
There is a 1MB limit for the messages out of the box to control the memory usage,
this can be changed by setting `hl7lw.mllp.MAX_MESSAGE_SIZE` to another value.
The basic usage goes like:
```
c = MllpClient()
c.connect(host="127.0.0.1", port=1234)
c.send(message)
ack = c.recv()
c.close()
```
"""
def __init__(self) -> None:
self.socket: Optional[socket.socket] = None
self.connected: bool = False
self.host: Optional[str] = None
self.port: Optional[int] = None
self.buffer: bytes = b''

def close(self):
def is_connected(self) -> bool:
"""
Used to check if connected.
"""
return self.connected

def close(self) -> None:
"""
Close the connection, also resets the internal state.
Raises an `MllpConnectionError` if called on a closed connection.
"""
if self.connected:
self.connected = False
self.buffer = b''
Expand All @@ -26,6 +59,17 @@ def close(self):
raise MllpConnectionError("Not connected!")

def connect(self, host: str, port: int) -> None:
"""
Connect to an `host` and `port`. Host will be resolved and it resolves to
multiple IPv4 or IPv6 addresses, they will all be tried before giving up,
see `socked.create_connection()` for details.
If already connected, the previous connection is closed and the new connection
is then opened.
Network related exceptions will get wrapped into an `MllpConnectionError` and
then returned.
"""
self.host = host
self.port = port
if self.connected:
Expand All @@ -42,6 +86,20 @@ def connect(self, host: str, port: int) -> None:
self.connected = True

def send(self, message: bytes, auto_reconnect: bool = True) -> None:
"""
Send a `message` over the connection.
If not connected and the `auto_reconnect` option is enabled, `connect()` will be
called with the last connection details used. If `connect()` had yet to be called,
no connection details will be available and an `MllpConnectionError` will be
raised instead.
Should sending fail with an exception, the connection will be closed before the
exception is re-raised as an `MllpConnectionError`. There's no way to know how
much of the message was sent.
MLLP framing is handled by the method.
"""
if not self.connected:
if auto_reconnect:
if self.host is None or self.port is None:
Expand All @@ -58,6 +116,27 @@ def send(self, message: bytes, auto_reconnect: bool = True) -> None:
raise MllpConnectionError("Failed to send message to client.") from e

def recv(self) -> bytes:
"""
Receive a message from the connection. If there's multiple messages to
be read, this method needs to be called repeatedly. The method will block
until a message is available.
If the connection is not connected, an `MllpConnectionError` will be raised.
If there's any network error, an `MllpConnectionError` will be raised.
If a message being read exceeds the maximum size, an `MllpConnectionError`
will be raised.
Whenever an exception is raised, the connection will also be closed if it
wasn't closed already.
WARNING: You are responsible to use this method to consume the ACKs from
the other system. If you do not consume the ACKs, eventually the network
buffers will fill and the other side will block trying to send a ACK. If
you write an application that loops sending messages and it keeps stalling
it might be that you are not consumming the ACKs!
"""
if not self.connected:
# No point in connecting. Clients aren't normally polling in MLLP.
# Maybe if asynch ACKs are used? But this client implementation really
Expand All @@ -69,21 +148,29 @@ def recv(self) -> bytes:
self.buffer = b''
while True:
try:
# This is slow for very large messages.
buffer += self.socket.recv(BUFSIZE)
except Exception as e:
self.connected = False
self.socket.close()
raise MllpConnectionError("Failed to read from socket, closing it.") from e
start = buffer.find(START_BYTE)
if start == -1:
# This only happens if buffer is only junk, so get rid of it to minimize
# memory footprint.
buffer = b''
else:
elif start > 0:
# START_BYTE at index 0 is expected normal when message is split
buffer = buffer[start:]
end = buffer.find(END_BYTES)
if end != -1:
message = buffer[:end]
self.buffer = buffer[end:]
return message[1:] # Discard leading START_BYTE
if len(buffer) > MAX_MESSAGE_SIZE:
self.connected = False
self.socket.close()
raise MllpConnectionError(f"Maximum messages size {MAX_MESSAGE_SIZE} exceeded!")


class MllpServer:
Expand Down
28 changes: 22 additions & 6 deletions tests/test_mllp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
from unittest.mock import call
import src.hl7lw.mllp
from src.hl7lw.mllp import MllpClient, MllpServer, START_BYTE, END_BYTES
from src.hl7lw.exceptions import MllpConnectionError

Expand All @@ -11,7 +12,7 @@ def test_client_connect(mocker) -> None:
c.connect(host='test', port=1234)
mock_create_connection.assert_called_once_with(('test', 1234))
assert c.socket is sentinel_socket
assert c.connected
assert c.is_connected()
assert c.host == 'test'
assert c.port == 1234

Expand All @@ -25,7 +26,7 @@ def test_client_connect_twice(mocker) -> None:
assert mock_create_connection.call_count == 2
assert mock_socket.close.called
assert mock_socket.close.call_count == 1
assert c.connected
assert c.is_connected()
assert c.host == 'test2'
assert c.port == 1234

Expand Down Expand Up @@ -76,6 +77,21 @@ def test_get_message_no_connected(mocker) -> None:
c.recv()


def test_get_message_too_big(mocker) -> None:
c = MllpClient()
mock_socket = mocker.patch('socket.socket')
junk = [START_BYTE]
l = b"A" * 1024
for i in range(1100):
# 1.1MB of junk.
junk.append(l)
mock_socket.recv.side_effect = junk
mocker.patch("socket.create_connection", return_value=mock_socket)
c.connect(host='test', port=1234)
with pytest.raises(MllpConnectionError, match=r'^Maximum messages size.*'):
c.recv()


def test_get_message_fragmented(mocker, trivial_a08: bytes) -> None:
c = MllpClient()
mock_socket = mocker.patch('socket.socket')
Expand Down Expand Up @@ -128,10 +144,10 @@ def test_send_message_not_connected_default(mocker, trivial_a08: bytes) -> None:
mock_socket = mocker.patch('socket.socket')
mocker.patch("socket.create_connection", return_value=mock_socket)
c.connect(host='test', port=1234)
assert c.connected == True
assert c.is_connected()
c.close()
assert mock_socket.close.called
assert c.connected == False
assert not c.is_connected()
c.send(trivial_a08)
assert mock_socket.sendall.call_args == call(START_BYTE + trivial_a08 + END_BYTES)

Expand All @@ -147,10 +163,10 @@ def test_send_message_not_connected_no_auto_reconnect(mocker, trivial_a08: bytes
mock_socket = mocker.patch('socket.socket')
mocker.patch("socket.create_connection", return_value=mock_socket)
c.connect(host='test', port=1234)
assert c.connected == True
assert c.is_connected()
c.close()
assert mock_socket.close.called
assert c.connected == False
assert not c.is_connected()
with pytest.raises(MllpConnectionError, match=r'^Not connected!'):
c.send(trivial_a08, auto_reconnect=False)

Expand Down

0 comments on commit 277287a

Please sign in to comment.