From 0bae814de0c3cf75c6efb56805af3b953d8ee9d2 Mon Sep 17 00:00:00 2001 From: agricolab Date: Mon, 9 Dec 2019 18:37:25 +0100 Subject: [PATCH] Create and test with mock-localite --- .coveragerc | 3 + localite/flow/loc.py | 149 ++++++++++++----------------------- test/flow/mock_localite.py | 155 +++++++++++++++++++++++++++++++++++++ test/flow/test_loc.py | 55 ++++++++++--- 4 files changed, 253 insertions(+), 109 deletions(-) create mode 100644 .coveragerc create mode 100644 test/flow/mock_localite.py diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..73032d1 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +# [run] +# omit = + diff --git a/localite/flow/loc.py b/localite/flow/loc.py index bf3a60b..2bb2f6b 100644 --- a/localite/flow/loc.py +++ b/localite/flow/loc.py @@ -3,9 +3,18 @@ import pylsl import threading import time +from typing import List from pylsl import local_clock from localite.flow.payload import Queue, get_from_queue, put_in_queue, Payload + +ignored_localite_messages = [ + {"pointer_status": "BLOCKED"}, + {"reference_status": "BLOCKED"}, + {"coil_1_status": "BLOCKED"}, + {"coil_0_status": "BLOCKED"}, +] + # %% class localiteClient: """ @@ -41,27 +50,24 @@ def close(self): self.socket.close() del self.socket - def write(self, data): - self.socket.sendall(data.encode("ascii")) + def write(self, msg: str): + self.socket.sendall(msg.encode("ascii")) return self - def read_byte(self, counter, buffer): - """read next byte from the TCP/IP bytestream and decode as ASCII""" - if counter is None: - counter = 0 - char = self.socket.recv(1).decode("ASCII") - buffer.append(char) - counter += {"{": 1, "}": -1}.get(char, 0) - return counter, buffer - - def read(self): - "parse the message until it is a valid json" - counter = None - buffer = [] - while counter is not 0: - counter, buffer = self.read_byte(counter, buffer) - response = "".join(buffer) - return self.decode(response) + def read(self) -> dict: + "parse the message" + msg = bytearray(b" ") + while True: + try: + prt = self.socket.recv(1) + msg += prt + msg = json.loads(msg.decode("ascii")) + return msg + except json.JSONDecodeError as e: # pragma no cover + pass + except Exception as e: # pragma no cover + print("locCLIENT:READ:", e) + return None def listen(self): self.connect() @@ -69,32 +75,24 @@ def listen(self): self.close() return msg - def decode(self, msg: str, index=0): - # msg = msg.replace("reason", '"reason"') # catches and interface error - try: - decoded = json.loads(msg) - except json.JSONDecodeError: - print("JSONDecodeError for:", msg) - - key = list(decoded.keys())[index] - val = decoded[key] - return key, val - def send(self, msg: str): self.connect() self.write(msg) self.close() - def request(self, msg="coil_0_amplitude"): + def request(self, msg: str = '{"get":"coil_0_amplitude"}'): self.connect() - msg = '{"get":"' + msg + '"}' self.write(msg) key = val = "" - _, expected = self.decode(msg) + expected = json.loads(msg)["get"] while key != expected: - key, val = self.read() + answer = self.read() + key = list(answer.keys())[0] + val = answer[key] + val = None if val == "NONE" else val + print("locCLIENT:RECV", key, val) self.close() - return None if val == "NONE" else val + return json.dumps({key: val}) class LOC(threading.Thread): @@ -106,90 +104,41 @@ def __init__(self, outbox: Queue, inbox: Queue, host: str, port: int = 6666): self.port = port self.is_running = threading.Event() - def await_running(self): + def await_running(self): # pragma no cover while not self.is_running.is_set(): pass def run(self): self.is_running.set() client = localiteClient(host=self.host, port=self.port) + print("Starting LOC") while self.is_running.is_set(): payload = get_from_queue(self.inbox) if payload is None: - continue - if payload.fmt == "cmd": + msg = client.listen() + print("LOC:MSG", msg) + if msg in ignored_localite_messages: + continue + else: + pl = Payload("mrk", msg, local_clock()) + put_in_queue(pl, self.outbox) + + elif payload.fmt == "cmd": if payload.msg == "poison-pill": self.is_running.clear() break elif payload.fmt == "loc": answer = None - if "get" in payload.msg: + dec = json.loads(payload.msg) + if "get" in dec.keys(): answer = client.request(payload.msg) + print("LOC:REQU", payload.msg) else: client.send(payload.msg) + print("LOC:SENT", payload.msg) if answer is not None: + print("LOC:RECV:", answer) pl = Payload("mrk", answer, local_clock()) put_in_queue(pl, self.outbox) print("Shutting LOC down") - - -# ------------------------------------------------------------------------------ - - -class Mock(threading.Thread): - def __init__(self, host: str = "127.0.0.1", port: int = 6666): - threading.Thread.__init__(self) - self.host = host - self.port = port - self.is_running = threading.Event() - - def await_running(self): - while not self.is_running.is_set(): - pass - - @staticmethod - def read_msg(client: socket.socket) -> dict: - "parse the message" - msg = bytearray(b" ") - while True: - try: - prt = client.recv(1) - msg += prt - msg = json.loads(msg.decode("ascii")) - return msg - except json.JSONDecodeError as e: # pragma no cover - pass - except Exception as e: - print(e) - return None - - def run(self): - listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind((self.host, self.port)) - listener.listen(1) # one unaccepted client is allowed - self.is_running.set() - while self.is_running.is_set(): - try: - client, address = listener.accept() - msg = self.read_msg(client) - if msg is None: - continue - if "cmd" in msg.keys() and "poison-pill" in msg.values(): - self.is_running.clear() - break - else: - client.send(json.dumps(msg)) - except Exception as e: - print(e) - finally: - client.shutdown(socket.SHUT_WR) - client.close() - print("Shutting LOC-MOCK down") - - def kill(self): - client = localiteClient(self.host, self.port) - msg = {"cmd": "poison-pill"} - msg = json.dumps(msg) - client.send(msg) diff --git a/test/flow/mock_localite.py b/test/flow/mock_localite.py new file mode 100644 index 0000000..228476b --- /dev/null +++ b/test/flow/mock_localite.py @@ -0,0 +1,155 @@ +import socket +import json +import pylsl +import threading +import time +from typing import List +from pylsl import local_clock +from localite.flow.payload import Queue, get_from_queue, put_in_queue, Payload +from localite.flow.loc import localiteClient, ignored_localite_messages + + +class Mock(threading.Thread): + def __init__(self, host: str = "127.0.0.1", port: int = 6666): + threading.Thread.__init__(self) + self.host = host + self.port = port + self.is_running = threading.Event() + + def await_running(self): + while not self.is_running.is_set(): # pragma no cover + pass + + @staticmethod + def read_msg(client: socket.socket) -> dict: + "parse the message" + t0 = time.time() + client.settimeout(0.1) + msg = b" " + while True: + try: + prt = client.recv(1) + msg += prt + msg = json.loads(msg.decode("ascii")) + return msg + except json.JSONDecodeError as e: # pragma no cover + pass + except socket.timeout: + return None + except Exception as e: # pragma no cover + print("MOCK:READ_MSG:", e) + return None + return None + + @staticmethod + def append(outqueue: List[dict], is_running: threading.Event): + cnt = 0 + from queue import Full + + while is_running.is_set(): + time.sleep(1) + cnt += 1 + if cnt % 5 == 0: + msg = {"coil_0_position": "None"} + else: + + msg = ignored_localite_messages[cnt] + try: + outqueue.put_nowait(msg) + except Full: + outqueue.get() + outqueue.task_done() + outqueue.put(msg) + print("MOCK:APP", outqueue.unfinished_tasks) + + def run(self): + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind((self.host, self.port)) + listener.settimeout(1) + listener.listen(1) # one unaccepted client is allowed + outqueue = Queue(maxsize=10) + outqueue.put({"coil_0_position": "None"}) + self.is_running.set() + appender = threading.Thread( + target=self.append, args=(outqueue, self.is_running,) + ) + appender.start() + print("Starting MOCK") + while self.is_running.is_set(): + try: + client = None + client, address = listener.accept() + print("MOCK:CLIENT", address) + msg = self.read_msg(client) + if msg is not None: + print("MOCK:RECV", msg) + if "cmd" in msg.keys() and "poison-pill" in msg.values(): + self.is_running.clear() + break + if "get" in msg.keys(): + key = msg["get"] + # this client is not the localiteClient! but a simple socket + outqueue.put({key: "answer"}) + + # always send a message, if there is none queued, wait + # until one is available + while outqueue.unfinished_tasks == 0: + time.sleep(0.01) + if client is not None: + item = outqueue.get_nowait() + outqueue.task_done() + print("MRK:REM", item, outqueue.unfinished_tasks) + msg = json.dumps(item).encode("ascii") + client.sendall(msg) + client.close() + except socket.timeout: + client = None + except ( + ConnectionError, + ConnectionAbortedError, + ConnectionResetError, + ConnectionRefusedError, + ): + client = None + except Exception as e: # pragma no cover + print("MOCK:RUN", str(e)) + + time.sleep(0.001) + print("Shutting MOCK down") + + def kill(self): + client = localiteClient(self.host, self.port) + msg = {"cmd": "poison-pill"} + msg = json.dumps(msg) + client.send(msg) + + +# if __name__ == "__main__": +# host = "127.0.0.1" +# port = 6666 +# mock = Mock(host=host, port=port) +# mock.start() +# mock.await_running() +# client = localiteClient("134.2.117.146", port) + +# inbox = Queue() +# outbox = Queue() +# loc = LOC(host=host, port=port, inbox=inbox, outbox=outbox) +# loc.start() +# loc.await_running() + +# def listen(host, port): + +# while True: +# try: +# item = get_from_queue(loc.outbox) +# if item is None: +# time.sleep(0.001) +# else: +# print(item) +# except Exception as e: +# print(e) + +# t = threading.Thread(target=listen, args=(host, port,)) +# t.start() diff --git a/test/flow/test_loc.py b/test/flow/test_loc.py index 3491587..212658c 100644 --- a/test/flow/test_loc.py +++ b/test/flow/test_loc.py @@ -1,34 +1,71 @@ from pytest import fixture -from localite.flow.loc import LOC, Mock -from localite.flow.payload import Queue +from .mock_localite import Mock +from localite.flow.loc import LOC, localiteClient, json +from localite.flow.payload import Queue, Payload, put_in_queue, get_from_queue import time host = "127.0.0.1" port = 6666 -@fixture +@fixture(scope="module") def mock(): mock = Mock(host=host, port=port) mock.start() mock.await_running() yield mock + # shut down in less than 7s mock.kill() t0 = time.time() d = 0 - while mock.is_running.is_set() and d < 5: + while mock.is_running.is_set() and d < 7: d = time.time() - t0 - assert d < 6 assert not mock.is_running.is_set() -@fixture -def loc(capsys): +@fixture(scope="module") +def loc(mock): inbox = Queue() outbox = Queue() - loc = LOC(host=host, port=port, inbox=inbox, outbox=outbox,) + loc = LOC(host=host, port=port, inbox=inbox, outbox=outbox) + loc.start() + loc.await_running() yield loc + # shut down in less than 7s + pl = Payload("cmd", "poison-pill", 12345) + put_in_queue(pl, loc.inbox) + t0 = time.time() + d = 0 + while loc.is_running.is_set() and d < 7: + d = time.time() - t0 + assert not loc.is_running.is_set() -def test_mock(mock): +def test_mock_running(mock): assert mock.is_running.is_set() + + +def test_loc_running(loc): + assert loc.is_running.is_set() + + +def test_get(loc, mock, capsys): + payload = Payload("loc", '{"get": "property"}', 12345) + put_in_queue(payload, loc.inbox) + time.sleep(0.5) + recv = [] + while loc.outbox.unfinished_tasks: + pl = get_from_queue(loc.outbox) + recv.append(pl) + if "property" in pl.msg: + break + + assert pl.fmt == "mrk" + assert pl.msg == '{"property": "answer"}' + assert json.loads(pl.msg) == {"property": "answer"} + + +def test_set(loc, mock, capsys): + pl = Payload("loc", '{"set": "test"}', 12345) + put_in_queue(pl, loc.inbox) +