Skip to content

Commit

Permalink
Create and test with mock-localite
Browse files Browse the repository at this point in the history
  • Loading branch information
agricolab committed Dec 9, 2019
1 parent acd0ebb commit 0bae814
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 109 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# [run]
# omit =

149 changes: 49 additions & 100 deletions localite/flow/loc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@
import pylsl
import threading
import time
from typing import List
from pylsl import local_clock
from localite.flow.payload import Queue, get_from_queue, put_in_queue, Payload


ignored_localite_messages = [
{"pointer_status": "BLOCKED"},
{"reference_status": "BLOCKED"},
{"coil_1_status": "BLOCKED"},
{"coil_0_status": "BLOCKED"},
]

# %%
class localiteClient:
"""
Expand Down Expand Up @@ -41,60 +50,49 @@ def close(self):
self.socket.close()
del self.socket

def write(self, data):
self.socket.sendall(data.encode("ascii"))
def write(self, msg: str):
self.socket.sendall(msg.encode("ascii"))
return self

def read_byte(self, counter, buffer):
"""read next byte from the TCP/IP bytestream and decode as ASCII"""
if counter is None:
counter = 0
char = self.socket.recv(1).decode("ASCII")
buffer.append(char)
counter += {"{": 1, "}": -1}.get(char, 0)
return counter, buffer

def read(self):
"parse the message until it is a valid json"
counter = None
buffer = []
while counter is not 0:
counter, buffer = self.read_byte(counter, buffer)
response = "".join(buffer)
return self.decode(response)
def read(self) -> dict:
"parse the message"
msg = bytearray(b" ")
while True:
try:
prt = self.socket.recv(1)
msg += prt
msg = json.loads(msg.decode("ascii"))
return msg
except json.JSONDecodeError as e: # pragma no cover
pass
except Exception as e: # pragma no cover
print("locCLIENT:READ:", e)
return None

def listen(self):
self.connect()
msg = self.read()
self.close()
return msg

def decode(self, msg: str, index=0):
# msg = msg.replace("reason", '"reason"') # catches and interface error
try:
decoded = json.loads(msg)
except json.JSONDecodeError:
print("JSONDecodeError for:", msg)

key = list(decoded.keys())[index]
val = decoded[key]
return key, val

def send(self, msg: str):
self.connect()
self.write(msg)
self.close()

def request(self, msg="coil_0_amplitude"):
def request(self, msg: str = '{"get":"coil_0_amplitude"}'):
self.connect()
msg = '{"get":"' + msg + '"}'
self.write(msg)
key = val = ""
_, expected = self.decode(msg)
expected = json.loads(msg)["get"]
while key != expected:
key, val = self.read()
answer = self.read()
key = list(answer.keys())[0]
val = answer[key]
val = None if val == "NONE" else val
print("locCLIENT:RECV", key, val)
self.close()
return None if val == "NONE" else val
return json.dumps({key: val})


class LOC(threading.Thread):
Expand All @@ -106,90 +104,41 @@ def __init__(self, outbox: Queue, inbox: Queue, host: str, port: int = 6666):
self.port = port
self.is_running = threading.Event()

def await_running(self):
def await_running(self): # pragma no cover
while not self.is_running.is_set():
pass

def run(self):
self.is_running.set()
client = localiteClient(host=self.host, port=self.port)
print("Starting LOC")
while self.is_running.is_set():
payload = get_from_queue(self.inbox)
if payload is None:
continue
if payload.fmt == "cmd":
msg = client.listen()
print("LOC:MSG", msg)
if msg in ignored_localite_messages:
continue
else:
pl = Payload("mrk", msg, local_clock())
put_in_queue(pl, self.outbox)

elif payload.fmt == "cmd":
if payload.msg == "poison-pill":
self.is_running.clear()
break
elif payload.fmt == "loc":
answer = None
if "get" in payload.msg:
dec = json.loads(payload.msg)
if "get" in dec.keys():
answer = client.request(payload.msg)
print("LOC:REQU", payload.msg)
else:
client.send(payload.msg)
print("LOC:SENT", payload.msg)
if answer is not None:
print("LOC:RECV:", answer)
pl = Payload("mrk", answer, local_clock())
put_in_queue(pl, self.outbox)

print("Shutting LOC down")


# ------------------------------------------------------------------------------


class Mock(threading.Thread):
def __init__(self, host: str = "127.0.0.1", port: int = 6666):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.is_running = threading.Event()

def await_running(self):
while not self.is_running.is_set():
pass

@staticmethod
def read_msg(client: socket.socket) -> dict:
"parse the message"
msg = bytearray(b" ")
while True:
try:
prt = client.recv(1)
msg += prt
msg = json.loads(msg.decode("ascii"))
return msg
except json.JSONDecodeError as e: # pragma no cover
pass
except Exception as e:
print(e)
return None

def run(self):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind((self.host, self.port))
listener.listen(1) # one unaccepted client is allowed
self.is_running.set()
while self.is_running.is_set():
try:
client, address = listener.accept()
msg = self.read_msg(client)
if msg is None:
continue
if "cmd" in msg.keys() and "poison-pill" in msg.values():
self.is_running.clear()
break
else:
client.send(json.dumps(msg))
except Exception as e:
print(e)
finally:
client.shutdown(socket.SHUT_WR)
client.close()
print("Shutting LOC-MOCK down")

def kill(self):
client = localiteClient(self.host, self.port)
msg = {"cmd": "poison-pill"}
msg = json.dumps(msg)
client.send(msg)
155 changes: 155 additions & 0 deletions test/flow/mock_localite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import socket
import json
import pylsl
import threading
import time
from typing import List
from pylsl import local_clock
from localite.flow.payload import Queue, get_from_queue, put_in_queue, Payload
from localite.flow.loc import localiteClient, ignored_localite_messages


class Mock(threading.Thread):
def __init__(self, host: str = "127.0.0.1", port: int = 6666):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.is_running = threading.Event()

def await_running(self):
while not self.is_running.is_set(): # pragma no cover
pass

@staticmethod
def read_msg(client: socket.socket) -> dict:
"parse the message"
t0 = time.time()
client.settimeout(0.1)
msg = b" "
while True:
try:
prt = client.recv(1)
msg += prt
msg = json.loads(msg.decode("ascii"))
return msg
except json.JSONDecodeError as e: # pragma no cover
pass
except socket.timeout:
return None
except Exception as e: # pragma no cover
print("MOCK:READ_MSG:", e)
return None
return None

@staticmethod
def append(outqueue: List[dict], is_running: threading.Event):
cnt = 0
from queue import Full

while is_running.is_set():
time.sleep(1)
cnt += 1
if cnt % 5 == 0:
msg = {"coil_0_position": "None"}
else:

msg = ignored_localite_messages[cnt]
try:
outqueue.put_nowait(msg)
except Full:
outqueue.get()
outqueue.task_done()
outqueue.put(msg)
print("MOCK:APP", outqueue.unfinished_tasks)

def run(self):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind((self.host, self.port))
listener.settimeout(1)
listener.listen(1) # one unaccepted client is allowed
outqueue = Queue(maxsize=10)
outqueue.put({"coil_0_position": "None"})
self.is_running.set()
appender = threading.Thread(
target=self.append, args=(outqueue, self.is_running,)
)
appender.start()
print("Starting MOCK")
while self.is_running.is_set():
try:
client = None
client, address = listener.accept()
print("MOCK:CLIENT", address)
msg = self.read_msg(client)
if msg is not None:
print("MOCK:RECV", msg)
if "cmd" in msg.keys() and "poison-pill" in msg.values():
self.is_running.clear()
break
if "get" in msg.keys():
key = msg["get"]
# this client is not the localiteClient! but a simple socket
outqueue.put({key: "answer"})

# always send a message, if there is none queued, wait
# until one is available
while outqueue.unfinished_tasks == 0:
time.sleep(0.01)
if client is not None:
item = outqueue.get_nowait()
outqueue.task_done()
print("MRK:REM", item, outqueue.unfinished_tasks)
msg = json.dumps(item).encode("ascii")
client.sendall(msg)
client.close()
except socket.timeout:
client = None
except (
ConnectionError,
ConnectionAbortedError,
ConnectionResetError,
ConnectionRefusedError,
):
client = None
except Exception as e: # pragma no cover
print("MOCK:RUN", str(e))

time.sleep(0.001)
print("Shutting MOCK down")

def kill(self):
client = localiteClient(self.host, self.port)
msg = {"cmd": "poison-pill"}
msg = json.dumps(msg)
client.send(msg)


# if __name__ == "__main__":
# host = "127.0.0.1"
# port = 6666
# mock = Mock(host=host, port=port)
# mock.start()
# mock.await_running()
# client = localiteClient("134.2.117.146", port)

# inbox = Queue()
# outbox = Queue()
# loc = LOC(host=host, port=port, inbox=inbox, outbox=outbox)
# loc.start()
# loc.await_running()

# def listen(host, port):

# while True:
# try:
# item = get_from_queue(loc.outbox)
# if item is None:
# time.sleep(0.001)
# else:
# print(item)
# except Exception as e:
# print(e)

# t = threading.Thread(target=listen, args=(host, port,))
# t.start()
Loading

0 comments on commit 0bae814

Please sign in to comment.