From d7fdf7f385d5b91e812cec2d3af4b16520d3fd81 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Mon, 15 Jul 2024 10:39:03 +0200 Subject: [PATCH 01/10] Added BaseProcessMonitorRunner --- plico/utils/base_process_monitor_runner.py | 141 ++++++++++++++ .../utils/base_process_monitor_runner_test.py | 177 ++++++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 plico/utils/base_process_monitor_runner.py create mode 100644 test/utils/base_process_monitor_runner_test.py diff --git a/plico/utils/base_process_monitor_runner.py b/plico/utils/base_process_monitor_runner.py new file mode 100644 index 0000000..1978f13 --- /dev/null +++ b/plico/utils/base_process_monitor_runner.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python + +import time +import sys +import signal +import os +import subprocess +import psutil +from plico.utils.base_runner import BaseRunner +from plico.utils.decorator import override +from plico.utils.logger import Logger +from plico.types.server_info import ServerInfo + + +class BaseProcessMonitorRunner(BaseRunner): + + + def __init__(self, name, server_config_prefix, runner_config_section, server_process_name, process_monitor_port): + BaseRunner.__init__(self) + + self._name = name + self._prefix = server_config_prefix + self._my_config_section = runner_config_section + self._my_port = process_monitor_port + self._server_process_name = server_process_name + self._logger= None + self._processes= [] + self._timeToDie= False + self._RUNNING_MESSAGE = f"{name} is running." + + def _determineInstalledBinaryDir(self): + try: + self._binFolder= self._configuration.getValue( + self._my_config_section, + 'binaries_installation_directory') + except KeyError: + self._binFolder= None + + def _logRunning(self): + self._logger.notice(self._RUNNING_MESSAGE) + sys.stdout.flush() + + def _setSignalIntHandler(self): + signal.signal(signal.SIGINT, self._signalHandling) + + def _signalHandling(self, signalNumber, stackFrame): + self._logger.notice("Received signal %d (%s)" % + (signalNumber, str(stackFrame))) + if signalNumber == signal.SIGINT: + self._timeToDie= True + + def _terminateAll(self): + + def on_terminate(proc): + self._logger.notice( + "process {} terminated with exit code {}". + format(proc, proc.returncode)) + + self._logger.notice("Terminating all subprocesses using psutil") + self._logger.notice("My pid %d" % os.getpid()) + parent = psutil.Process(os.getpid()) + processes = parent.children(recursive=True) + for process in processes: + try: + self._logger.notice( + "Killing pid %d %s" % (process.pid, process.cmdline())) + process.send_signal(signal.SIGTERM) + except Exception as e: + self._logger.error("Failed killing process %s: %s" % + (str(process), str(e))) + _, alive = psutil.wait_procs(processes, + timeout=10, + callback=on_terminate) + if alive: + for p in alive: + self._logger.notice( + "process %s survived SIGTERM; giving up" % str(p)) + + self._logger.notice("terminated all") + + def serverInfo(self): + sections = self._configuration.numberedSectionList(prefix=self._prefix) + info = [] + for section in sections: + name = self._configuration.getValue(section, 'name') + host = self._configuration.getValue(section, 'host') + port = self._configuration.getValue(section, 'port') + controller_info = ServerInfo(name, 0, host, port) + info.append(controller_info) + return info + + def _spawnController(self, name, section): + if self._binFolder: + cmd= [os.path.join(self._binFolder, name)] + else: + cmd= [name] + cmd += [self._configuration._filename, section] + self._logger.notice("MirrorController cmd is %s" % cmd) + mirrorController= subprocess.Popen(cmd) + self._processes.append(mirrorController) + return mirrorController + + def _setup(self): + self._logger= Logger.of(self._name) + self._setSignalIntHandler() + self._logger.notice(f"Creating process {self._name}") + self._determineInstalledBinaryDir() + sections = self._configuration.numberedSectionList(prefix=self._prefix) + try: + delay = self._configuration.getValue(self._my_config_section, + 'spawn_delay', getfloat=True) + except KeyError as e: + print(e) + delay = 0 + for section in sections: + self._spawnController(self._server_process_name, section) + time.sleep(delay) + self._replySocket = self.rpc().replySocket(self._my_port) + + def _handleRequest(self): + '''Handler for serverInfo''' + self.rpc().handleRequest(self, self._replySocket, multi=True) + + def _runLoop(self): + self._logRunning() + while self._timeToDie is False: + self._handleRequest() + time.sleep(1) + self._terminateAll() + + @override + def run(self): + self._setup() + self._runLoop() + return os.EX_OK + + @override + def terminate(self, signal, frame): + self._logger.notice("Terminating..") + self._terminateAll() + diff --git a/test/utils/base_process_monitor_runner_test.py b/test/utils/base_process_monitor_runner_test.py new file mode 100644 index 0000000..3f1e490 --- /dev/null +++ b/test/utils/base_process_monitor_runner_test.py @@ -0,0 +1,177 @@ +import os +import sys +import subprocess +import shutil +import unittest +import logging +import numpy as np +from test.test_helper import TestHelper, Poller, MessageInFileProbe, \ + ExecutionProbe +from plico.utils.base_process_monitor_runner import BaseProcessMonitorRunner + +from plico.utils.logger import Logger +from plico.utils.configuration import Configuration +from plico_motor_server.utils.starter_script_creator import \ + StarterScriptCreator +from plico_motor_server.utils.process_startup_helper import \ + ProcessStartUpHelper + +from functools import wraps + +CONF_STRING = ''' +[test_server1] +foo = 'bar1' + +[test_server2] +foo = 'bar2' + +[processMonitor] +spawn_delay = 2 + +[global] +app_name= inaf.arcetri.ao.plico +app_author= INAF Arcetri Adaptive Optics +python_package_name=plico +''' + +class TestRunner(BaseProcessMonitorRunner): + def __init__(self): + super.__init__(name='TestRunner', + server_config_prefix='test_server', + runner_config_section='runner', + server_process_name='test_server', + process_monitor_port=8000) + + +def _dumpEnterAndExit(enterMessage, exitMessage, f, self, *args, **kwds): + doDump = True + if doDump: + print(enterMessage) + res = f(self, *args, **kwds) + if doDump: + print(exitMessage) + return res + + +def dumpEnterAndExit(enterMessage, exitMessage): + + def wrapperFunc(f): + + @wraps(f) + def wrapper(self, *args, **kwds): + return _dumpEnterAndExit(enterMessage, exitMessage, + f, self, *args, **kwds) + + return wrapper + + return wrapperFunc + + +@unittest.skipIf(sys.platform == "win32", + "Integration test doesn't run on Windows. Fix it!") +class IntegrationTest(unittest.TestCase): + + TEST_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), + "./tmp/") + LOG_DIR = os.path.join(TEST_DIR, "log") + CALIB_FOLDER = 'test/integration/calib' + CONF_SECTION = 'processMonitor' + SERVER_LOG_PATH = os.path.join(LOG_DIR, "%s.log" % CONF_SECTION) + SERVER_PREFIX = 'test_server' + BIN_DIR = os.path.join(TEST_DIR, "apps", "bin") + CONF_DIR = os.path.join(TEST_DIR, "conf") + CONF_FILE = os.path.join(CONF_DIR, 'plico_test_runner.conf') + + SOURCE_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), + "../..") + + def setUp(self): + self._setUpBasicLogging() + self.server = None + self._wasSuccessful = False + + self._removeTestFolderIfItExists() + self._makeTestDir() + self._writeConfFile() + self.configuration = Configuration() + self.configuration.load(self.CONF_FILE) + + calibrationRootDir = self.configuration.calibrationRootDir() + self._setUpCalibrationTempFolder(calibrationRootDir) + print("Setup completed") + + def _setUpBasicLogging(self): + logging.basicConfig(level=logging.DEBUG) + self._logger = Logger.of('Integration Test') + + def _makeTestDir(self): + os.makedirs(self.TEST_DIR) + os.makedirs(self.LOG_DIR) + os.makedirs(self.BIN_DIR) + os.makedirs(self.CONF_DIR) + + def _writeConfFile(self): + with open(self.CONF_FILE, 'w') as f: + f.write(CONF_STRING) + + def _setUpCalibrationTempFolder(self, calibTempFolder): + shutil.copytree(self.CALIB_FOLDER, + calibTempFolder) + + def _removeTestFolderIfItExists(self): + if os.path.exists(self.TEST_DIR): + shutil.rmtree(self.TEST_DIR) + + @dumpEnterAndExit("tearing down", "teared down") + def tearDown(self): + TestHelper.dumpFileToStdout(self.SERVER_LOG_PATH) + + if self.server is not None: + TestHelper.terminateSubprocess(self.server) + + if self._wasSuccessful: + self._removeTestFolderIfItExists() + + @dumpEnterAndExit("creating starter scripts", "starter scripts created") + def _createStarterScripts(self): + ssc = StarterScriptCreator() + ssc.setInstallationBinDir(self.BIN_DIR) + ssc.setPythonPath(self.SOURCE_DIR) + ssc.setConfigFileDestination('$1') # Allow config file to be a script parameter + ssc.installExecutables() + + @dumpEnterAndExit("starting processes", "processes started") + def _startProcesses(self): + psh = ProcessStartUpHelper() + serverLog = open(os.path.join(self.LOG_DIR, "server.out"), "wb") + self.server = subprocess.Popen( + [psh.processProcessMonitorStartUpScriptPath(), + self.CONF_FILE, + self.CONF_SECTION], + stdout=serverLog, stderr=serverLog) + Poller(5).check(MessageInFileProbe( + TestRunner.RUNNING_MESSAGE, self.SERVER_LOG_PATH)) + + def _testProcessesActuallyStarted(self): + controllerLogFile = os.path.join( + self.LOG_DIR, + '%s%d.log' % (SERVER_PREFIX, 1)) + Poller(5).check(MessageInFileProbe( + TestRunner.RUNNING_MESSAGE, controllerLogFile)) + controller2LogFile = os.path.join( + self.LOG_DIR, + '%s%d.log' % (SERVER_PREFIX, 2)) + Poller(5).check(MessageInFileProbe( + TestRunner.RUNNING_MESSAGE, controller2LogFile)) + + def test_main(self): + self._buildClients() + self._createStarterScripts() + self._startProcesses() + self._testProcessesActuallyStarted() + self._wasSuccessful = True + + +if __name__ == "__main__": + unittest.main() + From c56b10276fe01c2ffa77de020cc8053692c2cee0 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Thu, 18 Jul 2024 17:35:35 +0200 Subject: [PATCH 02/10] Added test for base process monitor runner --- test/integration/__init__.py | 0 .../base_process_monitor_runner_test.py | 86 ++++++------------- test/integration/calib/calib.txt | 1 + test/integration/conffiles/plico.conf | 16 ++++ 4 files changed, 44 insertions(+), 59 deletions(-) create mode 100644 test/integration/__init__.py rename test/{utils => integration}/base_process_monitor_runner_test.py (63%) create mode 100644 test/integration/calib/calib.txt create mode 100644 test/integration/conffiles/plico.conf diff --git a/test/integration/__init__.py b/test/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/utils/base_process_monitor_runner_test.py b/test/integration/base_process_monitor_runner_test.py similarity index 63% rename from test/utils/base_process_monitor_runner_test.py rename to test/integration/base_process_monitor_runner_test.py index 3f1e490..fc2b4e9 100644 --- a/test/utils/base_process_monitor_runner_test.py +++ b/test/integration/base_process_monitor_runner_test.py @@ -4,43 +4,33 @@ import shutil import unittest import logging -import numpy as np -from test.test_helper import TestHelper, Poller, MessageInFileProbe, \ - ExecutionProbe -from plico.utils.base_process_monitor_runner import BaseProcessMonitorRunner +from test.test_helper import TestHelper, Poller, MessageInFileProbe from plico.utils.logger import Logger from plico.utils.configuration import Configuration -from plico_motor_server.utils.starter_script_creator import \ - StarterScriptCreator -from plico_motor_server.utils.process_startup_helper import \ - ProcessStartUpHelper - -from functools import wraps -CONF_STRING = ''' -[test_server1] -foo = 'bar1' -[test_server2] -foo = 'bar2' +from functools import wraps -[processMonitor] -spawn_delay = 2 -[global] -app_name= inaf.arcetri.ao.plico -app_author= INAF Arcetri Adaptive Optics -python_package_name=plico -''' +runner_main = '''#!/usr/bin/env python +import sys +from plico.utils.base_process_monitor_runner import BaseProcessMonitorRunner class TestRunner(BaseProcessMonitorRunner): + + RUNNING_MESSAGE = 'TestRunner is running' def __init__(self): - super.__init__(name='TestRunner', - server_config_prefix='test_server', - runner_config_section='runner', - server_process_name='test_server', - process_monitor_port=8000) + super().__init__(name='TestRunner', + server_config_prefix='test_server', + runner_config_section='processMonitor', + server_process_name='test_server', + process_monitor_port=8000) + +if __name__ == '__main__': + runner = TestRunner() + sys.exit(runner.start(sys.argv)) +''' def _dumpEnterAndExit(enterMessage, exitMessage, f, self, *args, **kwds): @@ -74,14 +64,14 @@ class IntegrationTest(unittest.TestCase): TEST_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "./tmp/") LOG_DIR = os.path.join(TEST_DIR, "log") + CONF_FILE = 'test/integration/conffiles/plico.conf' CALIB_FOLDER = 'test/integration/calib' CONF_SECTION = 'processMonitor' + RUNNING_MESSAGE = 'TestRunner is running' SERVER_LOG_PATH = os.path.join(LOG_DIR, "%s.log" % CONF_SECTION) SERVER_PREFIX = 'test_server' BIN_DIR = os.path.join(TEST_DIR, "apps", "bin") - CONF_DIR = os.path.join(TEST_DIR, "conf") - CONF_FILE = os.path.join(CONF_DIR, 'plico_test_runner.conf') - + RUN_FILE = os.path.join(BIN_DIR, 'run_integration_test.py') SOURCE_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../..") @@ -92,7 +82,6 @@ def setUp(self): self._removeTestFolderIfItExists() self._makeTestDir() - self._writeConfFile() self.configuration = Configuration() self.configuration.load(self.CONF_FILE) @@ -108,11 +97,6 @@ def _makeTestDir(self): os.makedirs(self.TEST_DIR) os.makedirs(self.LOG_DIR) os.makedirs(self.BIN_DIR) - os.makedirs(self.CONF_DIR) - - def _writeConfFile(self): - with open(self.CONF_FILE, 'w') as f: - f.write(CONF_STRING) def _setUpCalibrationTempFolder(self, calibTempFolder): shutil.copytree(self.CALIB_FOLDER, @@ -134,41 +118,25 @@ def tearDown(self): @dumpEnterAndExit("creating starter scripts", "starter scripts created") def _createStarterScripts(self): - ssc = StarterScriptCreator() - ssc.setInstallationBinDir(self.BIN_DIR) - ssc.setPythonPath(self.SOURCE_DIR) - ssc.setConfigFileDestination('$1') # Allow config file to be a script parameter - ssc.installExecutables() + with open(self.RUN_FILE, 'w') as f: + f.write(runner_main) + if not sys.platform == "win32": + subprocess.call(f'chmod +x "{self.RUN_FILE}"', shell=True) @dumpEnterAndExit("starting processes", "processes started") def _startProcesses(self): - psh = ProcessStartUpHelper() - serverLog = open(os.path.join(self.LOG_DIR, "server.out"), "wb") + serverLog = open(self.SERVER_LOG_PATH, "wb") self.server = subprocess.Popen( - [psh.processProcessMonitorStartUpScriptPath(), + [self.RUN_FILE, self.CONF_FILE, self.CONF_SECTION], stdout=serverLog, stderr=serverLog) Poller(5).check(MessageInFileProbe( - TestRunner.RUNNING_MESSAGE, self.SERVER_LOG_PATH)) - - def _testProcessesActuallyStarted(self): - controllerLogFile = os.path.join( - self.LOG_DIR, - '%s%d.log' % (SERVER_PREFIX, 1)) - Poller(5).check(MessageInFileProbe( - TestRunner.RUNNING_MESSAGE, controllerLogFile)) - controller2LogFile = os.path.join( - self.LOG_DIR, - '%s%d.log' % (SERVER_PREFIX, 2)) - Poller(5).check(MessageInFileProbe( - TestRunner.RUNNING_MESSAGE, controller2LogFile)) + self.RUNNING_MESSAGE, self.SERVER_LOG_PATH)) def test_main(self): - self._buildClients() self._createStarterScripts() self._startProcesses() - self._testProcessesActuallyStarted() self._wasSuccessful = True diff --git a/test/integration/calib/calib.txt b/test/integration/calib/calib.txt new file mode 100644 index 0000000..8e7503f --- /dev/null +++ b/test/integration/calib/calib.txt @@ -0,0 +1 @@ +just a stub for calibrations \ No newline at end of file diff --git a/test/integration/conffiles/plico.conf b/test/integration/conffiles/plico.conf new file mode 100644 index 0000000..10e9104 --- /dev/null +++ b/test/integration/conffiles/plico.conf @@ -0,0 +1,16 @@ + +[processMonitor] +name= Monitor of plico processes +host= localhost +port= 5030 +binaries_installation_directory= test/integration/tmp/apps/bin + +[global] +app_name= inaf.arcetri.ao.plico +app_author= INAF Arcetri Adaptive Optics +python_package_name= plico +force_log_dir= test/integration/tmp/log +force_calib_folder_dest= test/integration/tmp/calib + + + From ef1100fab6c66e8ba43fc1241f131d26a966cbb7 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Thu, 18 Jul 2024 17:36:09 +0200 Subject: [PATCH 03/10] Version 0.30.0 --- plico/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plico/__version__.py b/plico/__version__.py index 29e50be..deab738 100644 --- a/plico/__version__.py +++ b/plico/__version__.py @@ -1,3 +1,3 @@ -VERSION = (0, 29, 0) +VERSION = (0, 30, 0) __version__ = '.'.join(map(str, VERSION)) From 976e907185ab10d5b68561a9c7a2e0b698c6f4e6 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Thu, 18 Jul 2024 17:54:53 +0200 Subject: [PATCH 04/10] Reduced request handling loop period to 0.1s --- plico/utils/base_process_monitor_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plico/utils/base_process_monitor_runner.py b/plico/utils/base_process_monitor_runner.py index 1978f13..75ea90b 100644 --- a/plico/utils/base_process_monitor_runner.py +++ b/plico/utils/base_process_monitor_runner.py @@ -125,7 +125,7 @@ def _runLoop(self): self._logRunning() while self._timeToDie is False: self._handleRequest() - time.sleep(1) + time.sleep(0.1) self._terminateAll() @override From 9bdc7bc04685fa1696498c99c75827ecf3733ed4 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Fri, 19 Jul 2024 11:37:59 +0200 Subject: [PATCH 05/10] ProcessMonitor uses port from configuration file --- plico/utils/base_process_monitor_runner.py | 13 ++++++++++--- .../integration/base_process_monitor_runner_test.py | 4 +--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/plico/utils/base_process_monitor_runner.py b/plico/utils/base_process_monitor_runner.py index 75ea90b..1728383 100644 --- a/plico/utils/base_process_monitor_runner.py +++ b/plico/utils/base_process_monitor_runner.py @@ -15,13 +15,12 @@ class BaseProcessMonitorRunner(BaseRunner): - def __init__(self, name, server_config_prefix, runner_config_section, server_process_name, process_monitor_port): + def __init__(self, name, server_config_prefix, runner_config_section, server_process_name): BaseRunner.__init__(self) self._name = name self._prefix = server_config_prefix self._my_config_section = runner_config_section - self._my_port = process_monitor_port self._server_process_name = server_process_name self._logger= None self._processes= [] @@ -112,10 +111,18 @@ def _setup(self): except KeyError as e: print(e) delay = 0 + + try: + port = self._configuration.getValue(self._my_config_section, + 'port', getint=True) + except KeyError: + self._logger.error('Key "port" missing from process monitor configuration') + raise + for section in sections: self._spawnController(self._server_process_name, section) time.sleep(delay) - self._replySocket = self.rpc().replySocket(self._my_port) + self._replySocket = self.rpc().replySocket(port) def _handleRequest(self): '''Handler for serverInfo''' diff --git a/test/integration/base_process_monitor_runner_test.py b/test/integration/base_process_monitor_runner_test.py index fc2b4e9..aa6b627 100644 --- a/test/integration/base_process_monitor_runner_test.py +++ b/test/integration/base_process_monitor_runner_test.py @@ -19,13 +19,11 @@ class TestRunner(BaseProcessMonitorRunner): - RUNNING_MESSAGE = 'TestRunner is running' def __init__(self): super().__init__(name='TestRunner', server_config_prefix='test_server', runner_config_section='processMonitor', - server_process_name='test_server', - process_monitor_port=8000) + server_process_name='test_server') if __name__ == '__main__': runner = TestRunner() From 32ac123e849164b9dff19c75b1a4370db2ca5cb0 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Fri, 19 Jul 2024 11:51:02 +0200 Subject: [PATCH 06/10] ProcessMonitor uses name from configuration file --- plico/utils/base_process_monitor_runner.py | 11 +++++------ test/integration/base_process_monitor_runner_test.py | 7 ++++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/plico/utils/base_process_monitor_runner.py b/plico/utils/base_process_monitor_runner.py index 1728383..7d76a6a 100644 --- a/plico/utils/base_process_monitor_runner.py +++ b/plico/utils/base_process_monitor_runner.py @@ -14,18 +14,17 @@ class BaseProcessMonitorRunner(BaseRunner): + RUNNING_MESSAGE = 'Running as BaseProcessMonitorRunner, something is wrong' - def __init__(self, name, server_config_prefix, runner_config_section, server_process_name): + def __init__(self, runner_config_section, server_config_prefix, server_process_name): BaseRunner.__init__(self) - self._name = name self._prefix = server_config_prefix self._my_config_section = runner_config_section self._server_process_name = server_process_name self._logger= None self._processes= [] self._timeToDie= False - self._RUNNING_MESSAGE = f"{name} is running." def _determineInstalledBinaryDir(self): try: @@ -36,7 +35,7 @@ def _determineInstalledBinaryDir(self): self._binFolder= None def _logRunning(self): - self._logger.notice(self._RUNNING_MESSAGE) + self._logger.notice(self.RUNNING_MESSAGE) sys.stdout.flush() def _setSignalIntHandler(self): @@ -100,9 +99,9 @@ def _spawnController(self, name, section): return mirrorController def _setup(self): - self._logger= Logger.of(self._name) + self._logger= Logger.of(self.name) self._setSignalIntHandler() - self._logger.notice(f"Creating process {self._name}") + self._logger.notice(f"Creating process {self.name}") self._determineInstalledBinaryDir() sections = self._configuration.numberedSectionList(prefix=self._prefix) try: diff --git a/test/integration/base_process_monitor_runner_test.py b/test/integration/base_process_monitor_runner_test.py index aa6b627..f010e5d 100644 --- a/test/integration/base_process_monitor_runner_test.py +++ b/test/integration/base_process_monitor_runner_test.py @@ -19,9 +19,10 @@ class TestRunner(BaseProcessMonitorRunner): + RUNNING_MESSAGE = 'Monitor of plico processes is running' + def __init__(self): - super().__init__(name='TestRunner', - server_config_prefix='test_server', + super().__init__(server_config_prefix='test_server', runner_config_section='processMonitor', server_process_name='test_server') @@ -65,7 +66,7 @@ class IntegrationTest(unittest.TestCase): CONF_FILE = 'test/integration/conffiles/plico.conf' CALIB_FOLDER = 'test/integration/calib' CONF_SECTION = 'processMonitor' - RUNNING_MESSAGE = 'TestRunner is running' + RUNNING_MESSAGE = 'Monitor of plico processes is running' SERVER_LOG_PATH = os.path.join(LOG_DIR, "%s.log" % CONF_SECTION) SERVER_PREFIX = 'test_server' BIN_DIR = os.path.join(TEST_DIR, "apps", "bin") From 5e0582a6e6a975ac2c6518bee26e8e6a4dc278ae Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Fri, 19 Jul 2024 15:04:19 +0200 Subject: [PATCH 07/10] Process monitor server prefix from config. RUNNING_MESSAGE defined with an abstract method so that derived classes can override it. --- plico/utils/base_process_monitor_runner.py | 49 ++++++++++++++----- .../base_process_monitor_runner_test.py | 9 ++-- test/integration/conffiles/plico.conf | 1 + 3 files changed, 40 insertions(+), 19 deletions(-) diff --git a/plico/utils/base_process_monitor_runner.py b/plico/utils/base_process_monitor_runner.py index 7d76a6a..1b09071 100644 --- a/plico/utils/base_process_monitor_runner.py +++ b/plico/utils/base_process_monitor_runner.py @@ -5,6 +5,7 @@ import signal import os import subprocess +from abc import ABC, abstractmethod import psutil from plico.utils.base_runner import BaseRunner from plico.utils.decorator import override @@ -12,17 +13,28 @@ from plico.types.server_info import ServerInfo -class BaseProcessMonitorRunner(BaseRunner): +class BaseProcessMonitorRunner(BaseRunner, ABC): - RUNNING_MESSAGE = 'Running as BaseProcessMonitorRunner, something is wrong' + @classmethod + def RUNNING_MESSAGE(cls): + '''Returns a running message customized for the managed server name''' + return 'Monitor of ' + cls.server_process_name() + ' processes is running' - def __init__(self, runner_config_section, server_config_prefix, server_process_name): - BaseRunner.__init__(self) + @classmethod + @abstractmethod + def server_process_name(cls): + '''Returns the managed server name. + Implemented in the class so that integration tests can read it + without having to instantiate a dummy instance''' + pass - self._prefix = server_config_prefix + def __init__(self, runner_config_section='processMonitor'): + BaseRunner.__init__(self) self._my_config_section = runner_config_section - self._server_process_name = server_process_name - self._logger= None + + INITIALIZED_LATER = None + self._prefix = INITIALIZED_LATER + self._logger= INITIALIZED_LATER self._processes= [] self._timeToDie= False @@ -35,7 +47,7 @@ def _determineInstalledBinaryDir(self): self._binFolder= None def _logRunning(self): - self._logger.notice(self.RUNNING_MESSAGE) + self._logger.notice(self.RUNNING_MESSAGE()) sys.stdout.flush() def _setSignalIntHandler(self): @@ -103,14 +115,24 @@ def _setup(self): self._setSignalIntHandler() self._logger.notice(f"Creating process {self.name}") self._determineInstalledBinaryDir() - sections = self._configuration.numberedSectionList(prefix=self._prefix) + + # Get the prefix for servers in configuration file, mandatory + try: + self._prefix = self._configuration.getValue(self._my_config_section, + 'server_config_prefix') + except KeyError: + self._logger.error('Key "server_config_prefix" missing from process monitor configuration') + raise + + # Get the spawn delay, default = 1 second try: delay = self._configuration.getValue(self._my_config_section, 'spawn_delay', getfloat=True) - except KeyError as e: - print(e) - delay = 0 + except KeyError: + self._logger.warn('Key "spawn_delay" missing from process monitor configuration, using default delay = 1 second') + delay = 1 + # Get the process monitor network port, mandatory try: port = self._configuration.getValue(self._my_config_section, 'port', getint=True) @@ -118,8 +140,9 @@ def _setup(self): self._logger.error('Key "port" missing from process monitor configuration') raise + sections = self._configuration.numberedSectionList(prefix=self._prefix) for section in sections: - self._spawnController(self._server_process_name, section) + self._spawnController(self.server_process_name(), section) time.sleep(delay) self._replySocket = self.rpc().replySocket(port) diff --git a/test/integration/base_process_monitor_runner_test.py b/test/integration/base_process_monitor_runner_test.py index f010e5d..0a1a7ca 100644 --- a/test/integration/base_process_monitor_runner_test.py +++ b/test/integration/base_process_monitor_runner_test.py @@ -19,12 +19,9 @@ class TestRunner(BaseProcessMonitorRunner): - RUNNING_MESSAGE = 'Monitor of plico processes is running' - - def __init__(self): - super().__init__(server_config_prefix='test_server', - runner_config_section='processMonitor', - server_process_name='test_server') + @classmethod + def server_process_name(cls): + return 'plico' if __name__ == '__main__': runner = TestRunner() diff --git a/test/integration/conffiles/plico.conf b/test/integration/conffiles/plico.conf index 10e9104..9c707f1 100644 --- a/test/integration/conffiles/plico.conf +++ b/test/integration/conffiles/plico.conf @@ -4,6 +4,7 @@ name= Monitor of plico processes host= localhost port= 5030 binaries_installation_directory= test/integration/tmp/apps/bin +server_config_prefix = dummy [global] app_name= inaf.arcetri.ao.plico From d4b6b3545eb9c9ad8e042834b1a3f92165f97f3e Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Fri, 19 Jul 2024 15:14:54 +0200 Subject: [PATCH 08/10] ProcessMonitorRunner does not need to be inherited --- ...or_runner.py => process_monitor_runner.py} | 24 +++++++------------ ...tor_runner_test.py => integration_test.py} | 19 ++++++--------- 2 files changed, 15 insertions(+), 28 deletions(-) rename plico/utils/{base_process_monitor_runner.py => process_monitor_runner.py} (87%) rename test/integration/{base_process_monitor_runner_test.py => integration_test.py} (92%) diff --git a/plico/utils/base_process_monitor_runner.py b/plico/utils/process_monitor_runner.py similarity index 87% rename from plico/utils/base_process_monitor_runner.py rename to plico/utils/process_monitor_runner.py index 1b09071..b1b24fd 100644 --- a/plico/utils/base_process_monitor_runner.py +++ b/plico/utils/process_monitor_runner.py @@ -5,7 +5,6 @@ import signal import os import subprocess -from abc import ABC, abstractmethod import psutil from plico.utils.base_runner import BaseRunner from plico.utils.decorator import override @@ -13,24 +12,17 @@ from plico.types.server_info import ServerInfo -class BaseProcessMonitorRunner(BaseRunner, ABC): +def RUNNING_MESSAGE(server_name): + '''Return a running message customized for the managed server name''' + return f'Monitor of {server_name} processes is running' - @classmethod - def RUNNING_MESSAGE(cls): - '''Returns a running message customized for the managed server name''' - return 'Monitor of ' + cls.server_process_name() + ' processes is running' - @classmethod - @abstractmethod - def server_process_name(cls): - '''Returns the managed server name. - Implemented in the class so that integration tests can read it - without having to instantiate a dummy instance''' - pass +class ProcessMonitorRunner(BaseRunner): - def __init__(self, runner_config_section='processMonitor'): + def __init__(self, server_process_name, runner_config_section='processMonitor'): BaseRunner.__init__(self) self._my_config_section = runner_config_section + self._server_process_name = server_process_name INITIALIZED_LATER = None self._prefix = INITIALIZED_LATER @@ -47,7 +39,7 @@ def _determineInstalledBinaryDir(self): self._binFolder= None def _logRunning(self): - self._logger.notice(self.RUNNING_MESSAGE()) + self._logger.notice(RUNNING_MESSAGE(self._server_process_name)) sys.stdout.flush() def _setSignalIntHandler(self): @@ -142,7 +134,7 @@ def _setup(self): sections = self._configuration.numberedSectionList(prefix=self._prefix) for section in sections: - self._spawnController(self.server_process_name(), section) + self._spawnController(self._server_process_name, section) time.sleep(delay) self._replySocket = self.rpc().replySocket(port) diff --git a/test/integration/base_process_monitor_runner_test.py b/test/integration/integration_test.py similarity index 92% rename from test/integration/base_process_monitor_runner_test.py rename to test/integration/integration_test.py index 0a1a7ca..efd2447 100644 --- a/test/integration/base_process_monitor_runner_test.py +++ b/test/integration/integration_test.py @@ -4,27 +4,22 @@ import shutil import unittest import logging -from test.test_helper import TestHelper, Poller, MessageInFileProbe +from functools import wraps +from test.test_helper import TestHelper, Poller, MessageInFileProbe from plico.utils.logger import Logger from plico.utils.configuration import Configuration +from plico.utils.process_monitor_runner import RUNNING_MESSAGE -from functools import wraps - runner_main = '''#!/usr/bin/env python import sys -from plico.utils.base_process_monitor_runner import BaseProcessMonitorRunner - -class TestRunner(BaseProcessMonitorRunner): - - @classmethod - def server_process_name(cls): - return 'plico' +from plico.utils.process_monitor_runner import ProcessMonitorRunner if __name__ == '__main__': - runner = TestRunner() + runner = ProcessMonitorRunner(server_process_name='plico', + runner_config_section='processMonitor') sys.exit(runner.start(sys.argv)) ''' @@ -63,7 +58,7 @@ class IntegrationTest(unittest.TestCase): CONF_FILE = 'test/integration/conffiles/plico.conf' CALIB_FOLDER = 'test/integration/calib' CONF_SECTION = 'processMonitor' - RUNNING_MESSAGE = 'Monitor of plico processes is running' + RUNNING_MESSAGE = RUNNING_MESSAGE(server_name='plico') SERVER_LOG_PATH = os.path.join(LOG_DIR, "%s.log" % CONF_SECTION) SERVER_PREFIX = 'test_server' BIN_DIR = os.path.join(TEST_DIR, "apps", "bin") From 285e9cc2d458c3a1f8b95fd90e78c4b477eeb7ec Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Fri, 19 Jul 2024 15:41:21 +0200 Subject: [PATCH 09/10] Workaround for Windows missing os.EX_OK bug --- plico/utils/process_monitor_runner.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/plico/utils/process_monitor_runner.py b/plico/utils/process_monitor_runner.py index b1b24fd..eff7214 100644 --- a/plico/utils/process_monitor_runner.py +++ b/plico/utils/process_monitor_runner.py @@ -12,6 +12,11 @@ from plico.types.server_info import ServerInfo +# Windows old versions +if not hasattr(os, 'EX_OK'): + os.EX_OK = 0 + + def RUNNING_MESSAGE(server_name): '''Return a running message customized for the managed server name''' return f'Monitor of {server_name} processes is running' @@ -97,10 +102,10 @@ def _spawnController(self, name, section): else: cmd= [name] cmd += [self._configuration._filename, section] - self._logger.notice("MirrorController cmd is %s" % cmd) - mirrorController= subprocess.Popen(cmd) - self._processes.append(mirrorController) - return mirrorController + self._logger.notice("controller cmd is %s" % cmd) + controller= subprocess.Popen(cmd) + self._processes.append(controller) + return controller def _setup(self): self._logger= Logger.of(self.name) @@ -159,4 +164,3 @@ def run(self): def terminate(self, signal, frame): self._logger.notice("Terminating..") self._terminateAll() - From 25e37650353acb60e19cd9ab3a915f5cb33f4033 Mon Sep 17 00:00:00 2001 From: Alfio Puglisi Date: Tue, 23 Jul 2024 15:10:34 +0200 Subject: [PATCH 10/10] Added default value for server config prefix --- plico/utils/process_monitor_runner.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/plico/utils/process_monitor_runner.py b/plico/utils/process_monitor_runner.py index eff7214..c9f9b16 100644 --- a/plico/utils/process_monitor_runner.py +++ b/plico/utils/process_monitor_runner.py @@ -24,10 +24,13 @@ def RUNNING_MESSAGE(server_name): class ProcessMonitorRunner(BaseRunner): - def __init__(self, server_process_name, runner_config_section='processMonitor'): + def __init__(self, server_process_name, + runner_config_section='processMonitor', + default_server_config_prefix=None): BaseRunner.__init__(self) self._my_config_section = runner_config_section self._server_process_name = server_process_name + self._default_server_config_prefix = default_server_config_prefix INITIALIZED_LATER = None self._prefix = INITIALIZED_LATER @@ -118,8 +121,12 @@ def _setup(self): self._prefix = self._configuration.getValue(self._my_config_section, 'server_config_prefix') except KeyError: - self._logger.error('Key "server_config_prefix" missing from process monitor configuration') - raise + if not self._default_server_config_prefix: + self._logger.error('Key "server_config_prefix" missing from process monitor configuration' + ' and no default given') + raise + else: + self._prefix = self._default_server_config_prefix # Get the spawn delay, default = 1 second try: