Skip to content

Commit

Permalink
Propagate write commands through the generic adapter system (WIP).
Browse files Browse the repository at this point in the history
  • Loading branch information
ajgdls authored and GDYendell committed Jul 15, 2024
1 parent 2f93f7a commit 9f99c4e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 7 deletions.
4 changes: 2 additions & 2 deletions python/src/odin_data/control/frame_processor_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ def get(self, path, request):
status_code = 200
content_type = "application/json"

logging.error("{}".format(path))
logging.debug("Get Request: {}".format(path))
try:
response = self._fp_adapter._controller.get(path, wants_metadata(request))
logging.error("{}".format(response))
logging.debug("Get Response: {}".format(response))
except ParameterTreeError as param_error:
response = {
"response": "OdinDatatAdapter GET error: {}".format(param_error)
Expand Down
60 changes: 59 additions & 1 deletion python/src/odin_data/control/frame_processor_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, name):
{},
),
},
#'write': (lambda: 1, self.queue_write, {})
"write": (lambda: False, self.execute_write, {})
},
},
mutable=True,
Expand All @@ -63,11 +63,69 @@ def initialize(self, odin_adapter_frs, odin_adapter_fps):
self._odin_adapter_frs = odin_adapter_frs
self._odin_adapter_fps = odin_adapter_fps

self._odin_adapter_fps._controller.merge_external_tree("config", self._fp_params)

# Set up the rank of the individual FP applications
# This must be called after the adapters have started
self._thread = threading.Thread(target=self.init_rank)
self._thread.start()

def execute_write(self, value):
# Queue the write command
logging.error("Writing command: {}".format(value))

if value:
# Before attempting to write files, make some simple error checks

# Check if we have a valid buffer status from the FR adapter

# TODO: Need to check FR buffer status
# valid, reason = self.check_fr_status()
# if not valid:
# raise RuntimeError(reason)

# Check the file prefix is not empty
if str(self._file_prefix) == '':
raise RuntimeError("File prefix must not be empty")

# First setup the rank for the frameProcessor applications
self.setup_rank()

processes = self._odin_adapter_fps._controller.get("count", False)["count"]
try:
for rank in range(processes):
# Setup the number of processes and the rank for each client
config = {
'hdf': {
'frames': self._frames
}
}
logging.error("Sending config to FP odin adapter %i: %s", rank, config)
self._odin_adapter_fps._controller.put(f"{rank}/config", config)
config = {
'hdf': {
'acquisition_id': self._acquisition_id,
'file': {
'path': str(self._file_path),
'name': str(self._file_prefix),
'extension': str(self._file_extension)
}
}
}
logging.error("Sending config to FP odin adapter %i: %s", rank, config)
self._odin_adapter_fps._controller.put(f"{rank}/config", config)
except Exception as err:
logging.debug("Failed to send rank information to FP applications")
logging.error("Error: %s", err)
try:
config = {'hdf': {'write': value}}
for rank in range(processes):
logging.error("Sending config to FP odin adapter %i: %s", rank, config)
self._odin_adapter_fps._controller.put(f"{rank}/config", config)
except Exception as err:
logging.debug("Failed to send write command to FP applications")
logging.error("Error: %s", err)

def init_rank(self):
# Send the setup rank after allowing time for the
# other adapters to run up.
Expand Down
17 changes: 13 additions & 4 deletions python/src/odin_data/control/odin_data_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,38 @@ def __init__(self, name, endpoints, update_interval=0.5):
self._clients.append(IpcTornadoClient(ep["ip_address"], ep["port"]))
self._client_connections.append(False)

_tree = {
self._tree = {
"api": (lambda: self._api, None, {}),
"module": (lambda: self._name, None, {}),
"endpoints": [],
"count": (lambda: len(self._clients), None, {}),
"update_interval": (lambda: self._update_interval, None, {}),
}
for idx, endpoint in enumerate(self._endpoints):
_tree["endpoints"].append(
self._tree["endpoints"].append(
# Note the default here binds unique variables into each closure
{k: (lambda v=v: v, None, {}) for k, v in endpoint.items()}
)
for idx, _client in enumerate(self._clients):
_tree[str(idx)] = {
self._tree[str(idx)] = {
"status": {"error": (lambda: self._error, None, {})},
"config": {},
}
# TODO: Consider renaming this
self._params = ParameterTree(_tree, mutable=True)
self._params = ParameterTree(self._tree, mutable=True)

# Create the status loop handling thread
self._status_running = True
self._status_lock = threading.Lock()
self._status_thread = threading.Thread(target=self.update_loop)
self._status_thread.start()

def merge_external_tree(self, path, tree):
# First we need to insert the new parameter tree
self._tree[path] = tree
# Next, we must re-build the complete parameter tree
self._params = ParameterTree(self._tree, mutable=True)

@property
def first_update(self):
return self._first_update
Expand Down Expand Up @@ -136,6 +142,9 @@ def update_loop(self):
f"{index}/status", client.parameters["status"]
)
if "config" in client.parameters:
if "hdf" in client.parameters["config"]:
# Hard code a write option
client.parameters["config"]["hdf"]["write"] = False
self._params.replace(
f"{index}/config", client.parameters["config"]
)
Expand Down

0 comments on commit 9f99c4e

Please sign in to comment.