diff --git a/docs/sphinx.yaml b/docs/sphinx.yaml index e48e53b8..c70f6e78 100644 --- a/docs/sphinx.yaml +++ b/docs/sphinx.yaml @@ -7,6 +7,7 @@ subtrees: - file: user/tutorials/index entries: - file: user/tutorials/build + - file: user/tutorials/control - file: user/tutorials/deploy - file: user/tutorials/run - file: user/tutorials/tools diff --git a/docs/user/index.md b/docs/user/index.md index 48abb976..a2f0c5f5 100644 --- a/docs/user/index.md +++ b/docs/user/index.md @@ -10,6 +10,7 @@ - [Build odin-data](./tutorials/build) - [Run the example](./tutorials/run) - [Create an odin-data deployment](./tutorials/deploy) +- [Control an odin-data instance](./tutorials/control) +++ diff --git a/docs/user/tutorials/control.md b/docs/user/tutorials/control.md new file mode 100644 index 00000000..07e27c35 --- /dev/null +++ b/docs/user/tutorials/control.md @@ -0,0 +1,318 @@ +# Control + +Both the frameReceiver and frameProcessor can, if given the appropriate starting config, +run an acquisition and shut down when it is complete. This is made possible mainly to +provide an example. The primary use case is to run the applications up in some useful +default state and then use the control interfaces to configure and runacquisitions, +interogate status and eventually shut down. + +The control interfaces to the applications are ZMQ sockets, as defined by the start up +parameters. These control interfaces should be integrated into a wider application, such +as an odin-control server, or a web page for convenient control of the applications. For +debugging and development, minimal support for this is provided by the +`odin_zmq_client.py` script. For example, if we [run](run) the `test/dummy_example` +frameProcessor and frameReceiver, we can interrogate them. + +## Status + +We can ask the frameReceiver and frameProcessor for their entire `status` + +``````{dropdown} All Status +```bash +(venv) $ python -m odin_zmq_client status +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "status", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "id": 1, + "msg_type": "ack", + "msg_val": "status", + "params": { + "buffers": { + "empty": 292, + "mapped": 0, + "total": 292 + }, + "decoder": { + "name": "DummyUDPFrameDecoder", + "packets_dropped": 0, + "packets_lost": 0, + "packets_received": 0, + "status_get_count": 1385 + }, + "frames": { + "dropped": 0, + "received": 0, + "released": 0, + "timedout": 0 + }, + "status": { + "buffer_manager_configured": true, + "configuration_complete": true, + "decoder_configured": true, + "ipc_configured": true, + "rx_thread_configured": true + } + }, + "timestamp": "2022-04-10T19:11:09.503441" +} + +(venv) $ python -m odin_zmq_client status --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "status", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "id": 1, + "msg_type": "ack", + "msg_val": "status", + "params": { + "dummy": { + "packets_lost": 0, + "timing": { + "last_process": 0, + "max_process": 0, + "mean_process": 0 + } + }, + "hdf": { + "acquisition_id": "test_1", + "file_name": "test_1_000001.h5", + "file_path": "/tmp", + "frames_max": 10, + "frames_processed": 0, + "frames_written": 0, + "processes": 1, + "rank": 0, + "timeout_active": false, + "timing": { + "last_close": 0, + "last_create": 638, + "last_flush": 0, + "last_process": 0, + "last_write": 0, + "max_close": 0, + "max_create": 638, + "max_flush": 0, + "max_process": 0, + "max_write": 0, + "mean_close": 0, + "mean_create": 638, + "mean_flush": 0, + "mean_process": 0, + "mean_write": 0 + }, + "writing": true + }, + "plugins": { + "names": [ + "dummy", + "hdf" + ] + }, + "shared_memory": { + "configured": true + } + }, + "timestamp": "2022-04-10T19:12:27.639587" +} +``` +`````` + +## Specific Status + +We can ask the frameReceiver for a specific `status` parameter, e.g. the number of +dropped frames: + +``````{dropdown} Frames Dropped +```bash +(venv) $ python -m odin_zmq_client status frames/dropped +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "status", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "frames/dropped": 0, + "id": 1, + "msg_type": "ack", + "msg_val": "status", + "timestamp": "2022-04-10T19:13:38.743083" +} +``` +`````` + +Or ask the frameProcessor how many frames it has written to the current file: + +``````{dropdown} Frames Written +```bash +(venv) $ python -m odin_zmq_client status hdf/frames_written --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "status", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "hdf/frames_written": 0, + "id": 1, + "msg_type": "ack", + "msg_val": "status", + "timestamp": "2022-04-10T19:15:17.187217" +} +``` +`````` + +## Configure + +We can read and write `configure` parameters: + +``````{dropdown} Configure +```bash +(venv) $ python -m odin_zmq_client configure hdf/process/rank --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "request_configuration", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "hdf/process/rank": 0, + "id": 1, + "msg_type": "ack", + "msg_val": "request_configuration", + "timestamp": "2022-04-10T19:26:44.105480" +} + +(venv) $ python -m odin_zmq_client configure hdf/process/rank 2 --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "configure", + "params": {"hdf": {"process": {"rank": 2}}}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "params": {}, + "msg_type": "ack", + "msg_val": "configure", + "id": 1, + "timestamp": "2022-04-10T19:33:56.171776" +} + +(venv) $ python -m odin_zmq_client configure hdf/process/rank --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "request_configuration", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "hdf/process/rank": 2, + "id": 1, + "msg_type": "ack", + "msg_val": "request_configuration", + "timestamp": "2022-04-10T19:33:59.933050" +} +``` +`````` + +## Commands + +Some configure parameters are effectively commands, i.e. they can only be written to and +not read back. Some may have a corresponding status parameter, though. For example to +configure the FileWriterPlugin to start and stop writing: + +``````{dropdown} Command +```bash +(venv) $ python -m odin_zmq_client status hdf/writing --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "status", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "hdf/writing": true, + "id": 1, + "msg_type": "ack", + "msg_val": "status", + "timestamp": "2022-04-10T19:37:35.032422" +} + +(venv) $ python -m odin_zmq_client configure hdf/write false --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "configure", + "params": {"hdf": {"write": false}}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "params": {}, + "msg_type": "ack", + "msg_val": "configure", + "id": 1, + "timestamp": "2022-04-10T19:38:27.425760" +} + +(venv) $ python -m odin_zmq_client status hdf/writing --port 5004 +Request: +{ + "msg_type": "cmd", + "id": 1, + "msg_val": "status", + "params": {}, + "timestamp": "2022-03-20T18:47:58.440432" +} + +Response: +{ + "hdf/writing": false, + "id": 1, + "msg_type": "ack", + "msg_val": "status", + "timestamp": "2022-04-10T19:38:50.759214" +} +``` +`````` diff --git a/python/src/odin_data/client.py b/python/src/odin_data/client.py index c674352b..ed02d89f 100644 --- a/python/src/odin_data/client.py +++ b/python/src/odin_data/client.py @@ -1,7 +1,7 @@ """Simple command-line client for interacting with and debugging odin-data applications. -This module implements a simple command-line client that can be used to interact with and debug -odin-data applications via their control interface. +This module implements a simple command-line client that can be used to interact with +and debug odin-data applications via their control interface. Tim Nicholls, STFC Detector Systems Software Group """ @@ -39,10 +39,12 @@ def __init__(self): # create console handler and set level to debug ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.DEBUG) + ch.setLevel(self.args.log_level) # create formatter - formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s - %(message)s') + formatter = logging.Formatter( + "%(asctime)s %(levelname)s %(name)s - %(message)s" + ) # add formatter to ch ch.setFormatter(formatter) @@ -59,19 +61,51 @@ def __init__(self): def _parse_arguments(self, prog_name=sys.argv[0]): """Parse arguments from the command line.""" - parser = argparse.ArgumentParser(prog=prog_name, description='odin-data application client') - parser.add_argument('--ctrl', type=str, default='tcp://127.0.0.1:5000', - dest='ctrl_endpoint', - help='Specify the IPC control channel endpoint URL') - parser.add_argument('--config', type=argparse.FileType('r'), dest='config_file', nargs='?', - default=None, const=sys.stdin, - help='Specify JSON configuration file to send as configure command') - parser.add_argument('--request-config', action='store_true', - help='Request configuration from odin-data application') - parser.add_argument('--status', action='store_true', - help='Request a status report from the odin-data application') - parser.add_argument('--shutdown', action='store_true', - help='Instruct the odin-data application to shut down') + parser = argparse.ArgumentParser( + prog=prog_name, description="odin-data application client" + ) + parser.add_argument( + "--ctrl", + type=str, + default="tcp://127.0.0.1:5000", + dest="ctrl_endpoint", + help="Specify the IPC control channel endpoint URL", + ) + parser.add_argument( + "--config", + type=argparse.FileType("r"), + dest="config_file", + nargs="?", + default=None, + const=sys.stdin, + help="Specify JSON configuration file to send as configure command", + ) + parser.add_argument( + "--request-config", + action="store_true", + help="Request configuration from odin-data application", + ) + parser.add_argument( + "--status", + action="store_true", + help="Request a status report from the odin-data application", + ) + parser.add_argument( + "--shutdown", + action="store_true", + help="Instruct the odin-data application to shut down", + ) + # parser.add_argument( + # "params", + # default="", + # help="Filter tree of parameters in reply", + # ) + parser.add_argument( + "--log-level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="CRITICAL", + help="Set the logging level", + ) args = parser.parse_args() return args @@ -79,37 +113,42 @@ def _parse_arguments(self, prog_name=sys.argv[0]): def _next_msg_id(self): """Return the next IPC message ID to use.""" self._msg_id += 1 - return (self._msg_id) + return self._msg_id def run(self): """Run the odin-data client.""" self.logger.info("odin-data client starting up") - self.logger.debug("Control IPC channel has identity {}".format(self.ctrl_channel.identity)) + self.logger.debug( + "Control IPC channel has identity {}".format(self.ctrl_channel.identity) + ) if self.args.config_file is not None: - self.do_config_cmd(self.args.config_file) + reply = self.do_config_cmd(self.args.config_file) if self.args.request_config: - self.do_request_config_cmd() + reply = self.do_request_config_cmd() if self.args.status: - self.do_status_cmd() + reply = self.do_status_cmd() if self.args.shutdown: - self.do_shutdown_cmd() + reply = self.do_shutdown_cmd() + + print(reply) def do_config_cmd(self, config_file): - """Send a configure command to odin-data with the specified JSON configuration file.""" + """Send a configure command populated from a JSON configuration file""" try: config_params = json.load(config_file) - config_msg = IpcMessage('cmd', 'configure', id=self._next_msg_id()) + config_msg = IpcMessage("cmd", "configure", id=self._next_msg_id()) for param, value in config_params.items(): config_msg.set_param(param, value) self.logger.info( - "Sending configure command to the odin-data application with specified parameters" + "Sending configure command to the odin-data application" + " with specified parameters" ) self.ctrl_channel.send(config_msg.encode()) self.await_response() @@ -119,32 +158,36 @@ def do_config_cmd(self, config_file): def do_status_cmd(self): """Send a status command to odin-data.""" - status_msg = IpcMessage('cmd', 'status', id=self._next_msg_id()) + status_msg = IpcMessage("cmd", "status", id=self._next_msg_id()) self.logger.info("Sending status request to the odin-data application") self.ctrl_channel.send(status_msg.encode()) - self.await_response() + return self.await_response() def do_request_config_cmd(self): """Send a request configurtion command to odin-data.""" - status_msg = IpcMessage('cmd', 'request_configuration', id=self._next_msg_id()) + status_msg = IpcMessage("cmd", "request_configuration", id=self._next_msg_id()) self.logger.info("Sending configuration request to the odin-data application") self.ctrl_channel.send(status_msg.encode()) - self.await_response() + return self.await_response() def do_shutdown_cmd(self): """Send a shutdown command to odin-data.""" - shutdown_msg = IpcMessage('cmd', 'shutdown', id=self._next_msg_id()) + shutdown_msg = IpcMessage("cmd", "shutdown", id=self._next_msg_id()) self.logger.info("Sending shutdown command to the odin-data application") self.ctrl_channel.send(shutdown_msg.encode()) - self.await_response() + return self.await_response() + + # def _send_request(self, msg_val: str) -> str: def await_response(self, timeout_ms=1000): """Await a response to a client command.""" - pollevts = self.ctrl_channel.poll(1000) + pollevts = self.ctrl_channel.poll(timeout_ms) if pollevts == IpcChannel.POLLIN: reply = IpcMessage(from_str=self.ctrl_channel.recv()) self.logger.info("Got response: {}".format(reply)) + return reply + def main(): """Run the odin-data client.""" diff --git a/python/src/odin_data/odin_zmq_client.py b/python/src/odin_data/odin_zmq_client.py new file mode 100644 index 00000000..63093ea9 --- /dev/null +++ b/python/src/odin_data/odin_zmq_client.py @@ -0,0 +1,141 @@ +from enum import Enum +import json +from typing import Any, List + +import typer +import zmq + + +MESSAGE_TEMPLATE = """{{\ +\"msg_type\": \"cmd\", \ +\"id\": 1, \ +\"msg_val\": \"{msg_val}\", \ +\"params\": {params}, \ +\"timestamp\": \"2022-03-20T18:47:58.440432\"\ +}} +""" +CONFIGURE = "configure" +REQUEST_CONFIGURATION = "request_configuration" +PARAMS = "params" + +class MsgVal(str, Enum): + status = "status" + configure = CONFIGURE + request_configuration = REQUEST_CONFIGURATION + request_version = "request_version" + + @classmethod + @property + def choices(cls) -> List[str]: + return cls.__members__.values() + + +def dumps(json_str: str) -> dict: + return json.dumps(json_str, sort_keys=True, indent=4) + + +class OdinZMQClient: + def __init__(self, endpoint: str) -> None: + self.context = zmq.Context() + self.control_socket = self.context.socket(zmq.DEALER) + self.control_socket.connect(f"tcp://{endpoint}") + + def __del__(self): + self.control_socket.close(linger=1000) + self.context.term() + + def send_request(self, request_type: str, params: dict = None) -> str: + if params is None: + params = dict() + + request = MESSAGE_TEMPLATE.format( + msg_val=request_type, params=json.dumps(params) + ) + + print(f"Request:\n{request}") + self.control_socket.send_string(request) + response = self.control_socket.recv_json() + + return response + + def configure(self, parameter_path: str, value: Any) -> str: + stem = parameter_path[:-1] + leaf = parameter_path[-1] + + params = {} + node = params + for field in stem: + if isinstance(node, dict): + node[field] = {} + node = node[field] + node[leaf] = value + + return self.send_request(CONFIGURE, params) + + +def json_value(value: str) -> Any: + """Parse value based on formatting user provided""" + if value is None: + return None + + try: + # int, float, list + value = json.loads(value) + except json.JSONDecodeError: + # str + pass + + return value + + +app = typer.Typer() + +@app.command() +def main( + ip: str = typer.Option("127.0.0.1", help="IP address of server"), + port: int = typer.Option(5000, help="Port of server"), + msg_val: str = typer.Argument(None, help="msg_val of request"), + parameter: str = typer.Argument( + None, help="Parameter subtree to get or full parameter path to set" + ), + value: str = typer.Argument(None, callback=json_value, help="Value to set"), +): + if msg_val == CONFIGURE and None in (parameter, value): + # Allow `configure` without a value as a shorthand for `request_configuration` + msg_val = MsgVal.request_configuration + if value is not None and msg_val != CONFIGURE: + raise ValueError("Value only valid for configure") + + client = OdinZMQClient(f"{ip}:{port}") + + if parameter: + parameter_path = parameter.strip("/").split("/") + else: + parameter_path = [] + + if msg_val: + if msg_val == CONFIGURE: + response = client.configure(parameter_path, value) + print(f"Response:\n{response}") + else: + response = client.send_request(msg_val) + if parameter_path: + # Filter output to subtree + for node in parameter_path: + try: + response[PARAMS] = response[PARAMS][node] + except Exception: + raise ValueError(f"Invalid path {parameter}") + + # Insert value with full parameter path as key + response[parameter] = response.pop(PARAMS) + + print(f"Response:\n{dumps(response)}") + else: + for msg_val in MsgVal.choices: + response = client.send_request(msg_val) + print(f"Response:\n{dumps(response)}") + + +if __name__ == "__main__": + typer.run(main)