diff --git a/plico/__version__.py b/plico/__version__.py index 29e50be..deab738 100644 --- a/plico/__version__.py +++ b/plico/__version__.py @@ -1,3 +1,3 @@ -VERSION = (0, 29, 0) +VERSION = (0, 30, 0) __version__ = '.'.join(map(str, VERSION)) diff --git a/plico/utils/process_monitor_runner.py b/plico/utils/process_monitor_runner.py new file mode 100644 index 0000000..c9f9b16 --- /dev/null +++ b/plico/utils/process_monitor_runner.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python + +import time +import sys +import signal +import os +import subprocess +import psutil +from plico.utils.base_runner import BaseRunner +from plico.utils.decorator import override +from plico.utils.logger import Logger +from plico.types.server_info import ServerInfo + + +# Windows old versions +if not hasattr(os, 'EX_OK'): + os.EX_OK = 0 + + +def RUNNING_MESSAGE(server_name): + '''Return a running message customized for the managed server name''' + return f'Monitor of {server_name} processes is running' + + +class ProcessMonitorRunner(BaseRunner): + + def __init__(self, server_process_name, + runner_config_section='processMonitor', + default_server_config_prefix=None): + BaseRunner.__init__(self) + self._my_config_section = runner_config_section + self._server_process_name = server_process_name + self._default_server_config_prefix = default_server_config_prefix + + INITIALIZED_LATER = None + self._prefix = INITIALIZED_LATER + self._logger= INITIALIZED_LATER + self._processes= [] + self._timeToDie= False + + def _determineInstalledBinaryDir(self): + try: + self._binFolder= self._configuration.getValue( + self._my_config_section, + 'binaries_installation_directory') + except KeyError: + self._binFolder= None + + def _logRunning(self): + self._logger.notice(RUNNING_MESSAGE(self._server_process_name)) + sys.stdout.flush() + + def _setSignalIntHandler(self): + signal.signal(signal.SIGINT, self._signalHandling) + + def _signalHandling(self, signalNumber, stackFrame): + self._logger.notice("Received signal %d (%s)" % + (signalNumber, str(stackFrame))) + if signalNumber == signal.SIGINT: + self._timeToDie= True + + def _terminateAll(self): + + def on_terminate(proc): + self._logger.notice( + "process {} terminated with exit code {}". + format(proc, proc.returncode)) + + self._logger.notice("Terminating all subprocesses using psutil") + self._logger.notice("My pid %d" % os.getpid()) + parent = psutil.Process(os.getpid()) + processes = parent.children(recursive=True) + for process in processes: + try: + self._logger.notice( + "Killing pid %d %s" % (process.pid, process.cmdline())) + process.send_signal(signal.SIGTERM) + except Exception as e: + self._logger.error("Failed killing process %s: %s" % + (str(process), str(e))) + _, alive = psutil.wait_procs(processes, + timeout=10, + callback=on_terminate) + if alive: + for p in alive: + self._logger.notice( + "process %s survived SIGTERM; giving up" % str(p)) + + self._logger.notice("terminated all") + + def serverInfo(self): + sections = self._configuration.numberedSectionList(prefix=self._prefix) + info = [] + for section in sections: + name = self._configuration.getValue(section, 'name') + host = self._configuration.getValue(section, 'host') + port = self._configuration.getValue(section, 'port') + controller_info = ServerInfo(name, 0, host, port) + info.append(controller_info) + return info + + def _spawnController(self, name, section): + if self._binFolder: + cmd= [os.path.join(self._binFolder, name)] + else: + cmd= [name] + cmd += [self._configuration._filename, section] + self._logger.notice("controller cmd is %s" % cmd) + controller= subprocess.Popen(cmd) + self._processes.append(controller) + return controller + + def _setup(self): + self._logger= Logger.of(self.name) + self._setSignalIntHandler() + self._logger.notice(f"Creating process {self.name}") + self._determineInstalledBinaryDir() + + # Get the prefix for servers in configuration file, mandatory + try: + self._prefix = self._configuration.getValue(self._my_config_section, + 'server_config_prefix') + except KeyError: + if not self._default_server_config_prefix: + self._logger.error('Key "server_config_prefix" missing from process monitor configuration' + ' and no default given') + raise + else: + self._prefix = self._default_server_config_prefix + + # Get the spawn delay, default = 1 second + try: + delay = self._configuration.getValue(self._my_config_section, + 'spawn_delay', getfloat=True) + except KeyError: + self._logger.warn('Key "spawn_delay" missing from process monitor configuration, using default delay = 1 second') + delay = 1 + + # Get the process monitor network port, mandatory + try: + port = self._configuration.getValue(self._my_config_section, + 'port', getint=True) + except KeyError: + self._logger.error('Key "port" missing from process monitor configuration') + raise + + sections = self._configuration.numberedSectionList(prefix=self._prefix) + for section in sections: + self._spawnController(self._server_process_name, section) + time.sleep(delay) + self._replySocket = self.rpc().replySocket(port) + + def _handleRequest(self): + '''Handler for serverInfo''' + self.rpc().handleRequest(self, self._replySocket, multi=True) + + def _runLoop(self): + self._logRunning() + while self._timeToDie is False: + self._handleRequest() + time.sleep(0.1) + self._terminateAll() + + @override + def run(self): + self._setup() + self._runLoop() + return os.EX_OK + + @override + def terminate(self, signal, frame): + self._logger.notice("Terminating..") + self._terminateAll() diff --git a/test/integration/__init__.py b/test/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/integration/calib/calib.txt b/test/integration/calib/calib.txt new file mode 100644 index 0000000..8e7503f --- /dev/null +++ b/test/integration/calib/calib.txt @@ -0,0 +1 @@ +just a stub for calibrations \ No newline at end of file diff --git a/test/integration/conffiles/plico.conf b/test/integration/conffiles/plico.conf new file mode 100644 index 0000000..9c707f1 --- /dev/null +++ b/test/integration/conffiles/plico.conf @@ -0,0 +1,17 @@ + +[processMonitor] +name= Monitor of plico processes +host= localhost +port= 5030 +binaries_installation_directory= test/integration/tmp/apps/bin +server_config_prefix = dummy + +[global] +app_name= inaf.arcetri.ao.plico +app_author= INAF Arcetri Adaptive Optics +python_package_name= plico +force_log_dir= test/integration/tmp/log +force_calib_folder_dest= test/integration/tmp/calib + + + diff --git a/test/integration/integration_test.py b/test/integration/integration_test.py new file mode 100644 index 0000000..efd2447 --- /dev/null +++ b/test/integration/integration_test.py @@ -0,0 +1,136 @@ +import os +import sys +import subprocess +import shutil +import unittest +import logging +from functools import wraps + +from test.test_helper import TestHelper, Poller, MessageInFileProbe +from plico.utils.logger import Logger +from plico.utils.configuration import Configuration +from plico.utils.process_monitor_runner import RUNNING_MESSAGE + + + +runner_main = '''#!/usr/bin/env python +import sys +from plico.utils.process_monitor_runner import ProcessMonitorRunner + +if __name__ == '__main__': + runner = ProcessMonitorRunner(server_process_name='plico', + runner_config_section='processMonitor') + sys.exit(runner.start(sys.argv)) +''' + + +def _dumpEnterAndExit(enterMessage, exitMessage, f, self, *args, **kwds): + doDump = True + if doDump: + print(enterMessage) + res = f(self, *args, **kwds) + if doDump: + print(exitMessage) + return res + + +def dumpEnterAndExit(enterMessage, exitMessage): + + def wrapperFunc(f): + + @wraps(f) + def wrapper(self, *args, **kwds): + return _dumpEnterAndExit(enterMessage, exitMessage, + f, self, *args, **kwds) + + return wrapper + + return wrapperFunc + + +@unittest.skipIf(sys.platform == "win32", + "Integration test doesn't run on Windows. Fix it!") +class IntegrationTest(unittest.TestCase): + + TEST_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), + "./tmp/") + LOG_DIR = os.path.join(TEST_DIR, "log") + CONF_FILE = 'test/integration/conffiles/plico.conf' + CALIB_FOLDER = 'test/integration/calib' + CONF_SECTION = 'processMonitor' + RUNNING_MESSAGE = RUNNING_MESSAGE(server_name='plico') + SERVER_LOG_PATH = os.path.join(LOG_DIR, "%s.log" % CONF_SECTION) + SERVER_PREFIX = 'test_server' + BIN_DIR = os.path.join(TEST_DIR, "apps", "bin") + RUN_FILE = os.path.join(BIN_DIR, 'run_integration_test.py') + SOURCE_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), + "../..") + + def setUp(self): + self._setUpBasicLogging() + self.server = None + self._wasSuccessful = False + + self._removeTestFolderIfItExists() + self._makeTestDir() + self.configuration = Configuration() + self.configuration.load(self.CONF_FILE) + + calibrationRootDir = self.configuration.calibrationRootDir() + self._setUpCalibrationTempFolder(calibrationRootDir) + print("Setup completed") + + def _setUpBasicLogging(self): + logging.basicConfig(level=logging.DEBUG) + self._logger = Logger.of('Integration Test') + + def _makeTestDir(self): + os.makedirs(self.TEST_DIR) + os.makedirs(self.LOG_DIR) + os.makedirs(self.BIN_DIR) + + def _setUpCalibrationTempFolder(self, calibTempFolder): + shutil.copytree(self.CALIB_FOLDER, + calibTempFolder) + + def _removeTestFolderIfItExists(self): + if os.path.exists(self.TEST_DIR): + shutil.rmtree(self.TEST_DIR) + + @dumpEnterAndExit("tearing down", "teared down") + def tearDown(self): + TestHelper.dumpFileToStdout(self.SERVER_LOG_PATH) + + if self.server is not None: + TestHelper.terminateSubprocess(self.server) + + if self._wasSuccessful: + self._removeTestFolderIfItExists() + + @dumpEnterAndExit("creating starter scripts", "starter scripts created") + def _createStarterScripts(self): + with open(self.RUN_FILE, 'w') as f: + f.write(runner_main) + if not sys.platform == "win32": + subprocess.call(f'chmod +x "{self.RUN_FILE}"', shell=True) + + @dumpEnterAndExit("starting processes", "processes started") + def _startProcesses(self): + serverLog = open(self.SERVER_LOG_PATH, "wb") + self.server = subprocess.Popen( + [self.RUN_FILE, + self.CONF_FILE, + self.CONF_SECTION], + stdout=serverLog, stderr=serverLog) + Poller(5).check(MessageInFileProbe( + self.RUNNING_MESSAGE, self.SERVER_LOG_PATH)) + + def test_main(self): + self._createStarterScripts() + self._startProcesses() + self._wasSuccessful = True + + +if __name__ == "__main__": + unittest.main() +