Skip to content

Commit

Permalink
Merge pull request #11 from ArcetriAdaptiveOptics/process_monitor
Browse files Browse the repository at this point in the history
Generic ProcessMonitorRunner in plico, instead of having a separate one in each sub-package.

The separate ones were almost identical, but with small differences (bugfixes etc) that were often not propagated to each other.
In addition, the "port" configuration key of the processMonitor section was ignored, and instead hard-coded in Constants, it is now read from configuration file.
I added a new optional keyword "server_config_prefix", that sets the server configuration header: if the prefix is "camera", the sections will be "camera1", "camera2" etc. If not set, a default value is used if provided.
  • Loading branch information
alfiopuglisi authored Jul 23, 2024
2 parents 935aa06 + 25e3765 commit 27d94e5
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 1 deletion.
2 changes: 1 addition & 1 deletion plico/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
VERSION = (0, 29, 0)
VERSION = (0, 30, 0)

__version__ = '.'.join(map(str, VERSION))
173 changes: 173 additions & 0 deletions plico/utils/process_monitor_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#!/usr/bin/env python

import time
import sys
import signal
import os
import subprocess
import psutil
from plico.utils.base_runner import BaseRunner
from plico.utils.decorator import override
from plico.utils.logger import Logger
from plico.types.server_info import ServerInfo


# Windows old versions
if not hasattr(os, 'EX_OK'):
os.EX_OK = 0


def RUNNING_MESSAGE(server_name):
'''Return a running message customized for the managed server name'''
return f'Monitor of {server_name} processes is running'


class ProcessMonitorRunner(BaseRunner):

def __init__(self, server_process_name,
runner_config_section='processMonitor',
default_server_config_prefix=None):
BaseRunner.__init__(self)
self._my_config_section = runner_config_section
self._server_process_name = server_process_name
self._default_server_config_prefix = default_server_config_prefix

INITIALIZED_LATER = None
self._prefix = INITIALIZED_LATER
self._logger= INITIALIZED_LATER
self._processes= []
self._timeToDie= False

def _determineInstalledBinaryDir(self):
try:
self._binFolder= self._configuration.getValue(
self._my_config_section,
'binaries_installation_directory')
except KeyError:
self._binFolder= None

def _logRunning(self):
self._logger.notice(RUNNING_MESSAGE(self._server_process_name))
sys.stdout.flush()

def _setSignalIntHandler(self):
signal.signal(signal.SIGINT, self._signalHandling)

def _signalHandling(self, signalNumber, stackFrame):
self._logger.notice("Received signal %d (%s)" %
(signalNumber, str(stackFrame)))
if signalNumber == signal.SIGINT:
self._timeToDie= True

def _terminateAll(self):

def on_terminate(proc):
self._logger.notice(
"process {} terminated with exit code {}".
format(proc, proc.returncode))

self._logger.notice("Terminating all subprocesses using psutil")
self._logger.notice("My pid %d" % os.getpid())
parent = psutil.Process(os.getpid())
processes = parent.children(recursive=True)
for process in processes:
try:
self._logger.notice(
"Killing pid %d %s" % (process.pid, process.cmdline()))
process.send_signal(signal.SIGTERM)
except Exception as e:
self._logger.error("Failed killing process %s: %s" %
(str(process), str(e)))
_, alive = psutil.wait_procs(processes,
timeout=10,
callback=on_terminate)
if alive:
for p in alive:
self._logger.notice(
"process %s survived SIGTERM; giving up" % str(p))

self._logger.notice("terminated all")

def serverInfo(self):
sections = self._configuration.numberedSectionList(prefix=self._prefix)
info = []
for section in sections:
name = self._configuration.getValue(section, 'name')
host = self._configuration.getValue(section, 'host')
port = self._configuration.getValue(section, 'port')
controller_info = ServerInfo(name, 0, host, port)
info.append(controller_info)
return info

def _spawnController(self, name, section):
if self._binFolder:
cmd= [os.path.join(self._binFolder, name)]
else:
cmd= [name]
cmd += [self._configuration._filename, section]
self._logger.notice("controller cmd is %s" % cmd)
controller= subprocess.Popen(cmd)
self._processes.append(controller)
return controller

def _setup(self):
self._logger= Logger.of(self.name)
self._setSignalIntHandler()
self._logger.notice(f"Creating process {self.name}")
self._determineInstalledBinaryDir()

# Get the prefix for servers in configuration file, mandatory
try:
self._prefix = self._configuration.getValue(self._my_config_section,
'server_config_prefix')
except KeyError:
if not self._default_server_config_prefix:
self._logger.error('Key "server_config_prefix" missing from process monitor configuration'
' and no default given')
raise
else:
self._prefix = self._default_server_config_prefix

# Get the spawn delay, default = 1 second
try:
delay = self._configuration.getValue(self._my_config_section,
'spawn_delay', getfloat=True)
except KeyError:
self._logger.warn('Key "spawn_delay" missing from process monitor configuration, using default delay = 1 second')
delay = 1

# Get the process monitor network port, mandatory
try:
port = self._configuration.getValue(self._my_config_section,
'port', getint=True)
except KeyError:
self._logger.error('Key "port" missing from process monitor configuration')
raise

sections = self._configuration.numberedSectionList(prefix=self._prefix)
for section in sections:
self._spawnController(self._server_process_name, section)
time.sleep(delay)
self._replySocket = self.rpc().replySocket(port)

def _handleRequest(self):
'''Handler for serverInfo'''
self.rpc().handleRequest(self, self._replySocket, multi=True)

def _runLoop(self):
self._logRunning()
while self._timeToDie is False:
self._handleRequest()
time.sleep(0.1)
self._terminateAll()

@override
def run(self):
self._setup()
self._runLoop()
return os.EX_OK

@override
def terminate(self, signal, frame):
self._logger.notice("Terminating..")
self._terminateAll()
Empty file added test/integration/__init__.py
Empty file.
1 change: 1 addition & 0 deletions test/integration/calib/calib.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
just a stub for calibrations
17 changes: 17 additions & 0 deletions test/integration/conffiles/plico.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

[processMonitor]
name= Monitor of plico processes
host= localhost
port= 5030
binaries_installation_directory= test/integration/tmp/apps/bin
server_config_prefix = dummy

[global]
app_name= inaf.arcetri.ao.plico
app_author= INAF Arcetri Adaptive Optics
python_package_name= plico
force_log_dir= test/integration/tmp/log
force_calib_folder_dest= test/integration/tmp/calib



136 changes: 136 additions & 0 deletions test/integration/integration_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import os
import sys
import subprocess
import shutil
import unittest
import logging
from functools import wraps

from test.test_helper import TestHelper, Poller, MessageInFileProbe
from plico.utils.logger import Logger
from plico.utils.configuration import Configuration
from plico.utils.process_monitor_runner import RUNNING_MESSAGE



runner_main = '''#!/usr/bin/env python
import sys
from plico.utils.process_monitor_runner import ProcessMonitorRunner
if __name__ == '__main__':
runner = ProcessMonitorRunner(server_process_name='plico',
runner_config_section='processMonitor')
sys.exit(runner.start(sys.argv))
'''


def _dumpEnterAndExit(enterMessage, exitMessage, f, self, *args, **kwds):
doDump = True
if doDump:
print(enterMessage)
res = f(self, *args, **kwds)
if doDump:
print(exitMessage)
return res


def dumpEnterAndExit(enterMessage, exitMessage):

def wrapperFunc(f):

@wraps(f)
def wrapper(self, *args, **kwds):
return _dumpEnterAndExit(enterMessage, exitMessage,
f, self, *args, **kwds)

return wrapper

return wrapperFunc


@unittest.skipIf(sys.platform == "win32",
"Integration test doesn't run on Windows. Fix it!")
class IntegrationTest(unittest.TestCase):

TEST_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)),
"./tmp/")
LOG_DIR = os.path.join(TEST_DIR, "log")
CONF_FILE = 'test/integration/conffiles/plico.conf'
CALIB_FOLDER = 'test/integration/calib'
CONF_SECTION = 'processMonitor'
RUNNING_MESSAGE = RUNNING_MESSAGE(server_name='plico')
SERVER_LOG_PATH = os.path.join(LOG_DIR, "%s.log" % CONF_SECTION)
SERVER_PREFIX = 'test_server'
BIN_DIR = os.path.join(TEST_DIR, "apps", "bin")
RUN_FILE = os.path.join(BIN_DIR, 'run_integration_test.py')
SOURCE_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)),
"../..")

def setUp(self):
self._setUpBasicLogging()
self.server = None
self._wasSuccessful = False

self._removeTestFolderIfItExists()
self._makeTestDir()
self.configuration = Configuration()
self.configuration.load(self.CONF_FILE)

calibrationRootDir = self.configuration.calibrationRootDir()
self._setUpCalibrationTempFolder(calibrationRootDir)
print("Setup completed")

def _setUpBasicLogging(self):
logging.basicConfig(level=logging.DEBUG)
self._logger = Logger.of('Integration Test')

def _makeTestDir(self):
os.makedirs(self.TEST_DIR)
os.makedirs(self.LOG_DIR)
os.makedirs(self.BIN_DIR)

def _setUpCalibrationTempFolder(self, calibTempFolder):
shutil.copytree(self.CALIB_FOLDER,
calibTempFolder)

def _removeTestFolderIfItExists(self):
if os.path.exists(self.TEST_DIR):
shutil.rmtree(self.TEST_DIR)

@dumpEnterAndExit("tearing down", "teared down")
def tearDown(self):
TestHelper.dumpFileToStdout(self.SERVER_LOG_PATH)

if self.server is not None:
TestHelper.terminateSubprocess(self.server)

if self._wasSuccessful:
self._removeTestFolderIfItExists()

@dumpEnterAndExit("creating starter scripts", "starter scripts created")
def _createStarterScripts(self):
with open(self.RUN_FILE, 'w') as f:
f.write(runner_main)
if not sys.platform == "win32":
subprocess.call(f'chmod +x "{self.RUN_FILE}"', shell=True)

@dumpEnterAndExit("starting processes", "processes started")
def _startProcesses(self):
serverLog = open(self.SERVER_LOG_PATH, "wb")
self.server = subprocess.Popen(
[self.RUN_FILE,
self.CONF_FILE,
self.CONF_SECTION],
stdout=serverLog, stderr=serverLog)
Poller(5).check(MessageInFileProbe(
self.RUNNING_MESSAGE, self.SERVER_LOG_PATH))

def test_main(self):
self._createStarterScripts()
self._startProcesses()
self._wasSuccessful = True


if __name__ == "__main__":
unittest.main()

0 comments on commit 27d94e5

Please sign in to comment.