Skip to content

Commit

Permalink
Merge pull request #92 from jacquelinegarrahan/bug-fixes
Browse files Browse the repository at this point in the history
Add event for model execution shutdown
  • Loading branch information
jacquelinegarrahan authored Feb 9, 2022
2 parents b110f65 + bb029aa commit ccd3429
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions lume_epics/epics_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import os
from typing import Dict, Mapping, Union, List
from threading import Thread
from threading import Thread, Event
from queue import Full, Empty

import pcaspy
Expand Down Expand Up @@ -142,11 +142,13 @@ def __init__(
self._running_indicator = multiprocessing.Value("b", False)
self._process_exit_events = []

# event for shutdown on model execution exceptions
self._model_exec_exit_event = Event()

# we use the running marker to make sure pvs + ca don't just keep adding queue elements
self.comm_thread = Thread(
target=self.run_comm_thread,
kwargs={
"model_kwargs": model_kwargs,
"in_queue": self.in_queue,
"out_queues": self.out_queues,
"running_indicator": self._running_indicator,
Expand Down Expand Up @@ -216,7 +218,6 @@ def run_comm_thread(
self,
*,
running_indicator: multiprocessing.Value,
model_kwargs={},
in_queue: multiprocessing.Queue = None,
out_queues: Dict[str, multiprocessing.Queue] = None,
):
Expand All @@ -225,8 +226,6 @@ def run_comm_thread(
Arguments:
model_class: Model class to be executed.
model_kwargs (dict): Dictionary of model keyword arguments.
in_queue (multiprocessing.Queue):
out_queues (Dict[str: multiprocessing.Queue]): Maps protocol to output assignment queue.
Expand Down Expand Up @@ -270,17 +269,22 @@ def run_comm_thread(
queue.put({"input_variables": inputs})

model_input = list(self.input_variables.values())
predicted_output = model.evaluate(model_input)

for protocol, queue in out_queues.items():
outputs = [
var
for var in predicted_output
if var.name in self._pva_fields
or self._epics_config[var.name]["protocol"]
in [protocol, "both"]
]
queue.put({"output_variables": outputs}, timeout=0.1)
try:
predicted_output = model.evaluate(model_input)

for protocol, queue in out_queues.items():
outputs = [
var
for var in predicted_output
if var.name in self._pva_fields
or self._epics_config[var.name]["protocol"]
in [protocol, "both"]
]
queue.put({"output_variables": outputs}, timeout=0.1)
except Exception as e:
print(e)
self._model_exec_exit_event.set()

running_indicator.value = False

Expand Down Expand Up @@ -312,7 +316,11 @@ def start(self, monitor: bool = True) -> None:
if monitor:
try:
while not any(
[exit_event.is_set() for exit_event in self._process_exit_events]
[
exit_event.is_set()
for exit_event in self._process_exit_events
+ [self._model_exec_exit_event]
]
):
time.sleep(0.1)

Expand Down

0 comments on commit ccd3429

Please sign in to comment.