Skip to content

Commit

Permalink
STY: formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
dmgav committed Apr 20, 2024
1 parent 0b95e22 commit cd429f6
Showing 1 changed file with 33 additions and 118 deletions.
151 changes: 33 additions & 118 deletions src/bluesky/tests/test_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ def test_proxy_script():
assert p.returncode == 0


def test_zmq_basic(RE, hw):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.

def _start_proxy_proc():
def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()
Expand All @@ -35,39 +32,53 @@ def start_proxy(start_event):
assert proxy_start_event.is_set()
time.sleep(0.2)

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.

p = Publisher("127.0.0.1:5567") # noqa
RE.subscribe(p)
return proxy_proc

# COMPONENT 3
# Run a RemoteDispatcher on another separate process. Pass the documents
# it receives over a Queue to this process, so we can count them for our
# test.

def _start_dispatcher_proc(prefix=None):
def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568")
kwargs = {"prefix": prefix} if prefix else {}
d = RemoteDispatcher("127.0.0.1:5568", **kwargs)
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc = multiprocess.Process(
target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event)
)
dispatcher_proc.start()
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)

return dispatcher_proc, queue


def test_zmq_basic(RE, hw):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.
proxy_proc = _start_proxy_proc()

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
p = Publisher("127.0.0.1:5567") # noqa
RE.subscribe(p)

# COMPONENT 3
# Run a RemoteDispatcher on another separate process. Pass the documents
# it receives over a Queue to this process, so we can count them for our
# test.
dispatcher_proc, queue = _start_dispatcher_proc()

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
# RemoteDispatcher over 5568. The RemoteDispatcher will push them into
Expand Down Expand Up @@ -153,49 +164,17 @@ def test_zmq_RD_ports_spec(host):
def test_zmq_no_RE_basic(RE):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.

def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)
proxy_proc = _start_proxy_proc()

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.

p = Publisher("127.0.0.1:5567") # noqa

# COMPONENT 3
# Run a RemoteDispatcher on another separate process. Pass the documents
# it receives over a Queue to this process, so we can count them for our
# test.

def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)
dispatcher_proc, queue = _start_dispatcher_proc()

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down Expand Up @@ -234,16 +213,7 @@ def test_zmq_no_RE_newserializer(RE):

# COMPONENT 1
# Run a 0MQ proxy on a separate process.
def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)
proxy_proc = _start_proxy_proc()

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
Expand All @@ -253,27 +223,7 @@ def start_proxy(start_event):
# Run a RemoteDispatcher on another separate process. Pass the documents
# it receives over a Queue to this process, so we can count them for our
# test.
def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)
dispatcher_proc, queue = _start_dispatcher_proc()

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down Expand Up @@ -310,16 +260,7 @@ def local_cb(name, doc):
def test_zmq_prefix(RE, hw):
# COMPONENT 1
# Run a 0MQ proxy on a separate process.
def start_proxy(start_event):
start_event.set()
Proxy(5567, 5568).start()

proxy_start_event = multiprocess.Event()
proxy_proc = multiprocess.Process(target=start_proxy, args=(proxy_start_event,), daemon=True)
proxy_proc.start()
proxy_start_event.wait(timeout=5)
assert proxy_start_event.is_set()
time.sleep(0.2)
proxy_proc = _start_proxy_proc()

# COMPONENT 2
# Run a Publisher and a RunEngine in this main process.
Expand All @@ -332,33 +273,7 @@ def start_proxy(start_event):
# Run a RemoteDispatcher on another separate process. Pass the documents
# it receives over a Queue to this process, so we can count them for our
# test.

def make_and_start_dispatcher(queue, start_event):
def put_in_queue(name, doc):
print("putting ", name, "in queue")
start_event.set()
queue.put((name, doc))

d = RemoteDispatcher("127.0.0.1:5568", prefix=b"sb")
d.subscribe(put_in_queue)
print("REMOTE IS READY TO START")
d.loop.call_later(9, d.stop)
start_event.set()
d.start()


dispatcher_start_event = multiprocess.Event()
queue = multiprocess.Queue()
dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue, dispatcher_start_event))
dispatcher_proc.start()
dispatcher_start_event.wait(timeout=5)
assert dispatcher_start_event.is_set()
time.sleep(0.2)

# queue = multiprocess.Queue()
# dispatcher_proc = multiprocess.Process(target=make_and_start_dispatcher, daemon=True, args=(queue,))
# dispatcher_proc.start()
# time.sleep(5) # As above, give this plenty of time to start.
dispatcher_proc, queue = _start_dispatcher_proc(prefix=b"sb")

# Generate two documents. The Publisher will send them to the proxy
# device over 5567, and the proxy will send them to the
Expand Down

0 comments on commit cd429f6

Please sign in to comment.