diff --git a/catkit2/testbed/testbed.py b/catkit2/testbed/testbed.py index 3d6cb591..e88c7571 100644 --- a/catkit2/testbed/testbed.py +++ b/catkit2/testbed/testbed.py @@ -11,14 +11,34 @@ import zmq import numpy as np -from ..catkit_bindings import LogForwarder, Server, ServiceState, DataStream, get_timestamp, is_alive_state, Client +from ..catkit_bindings import LogForwarder, Server, ServiceState, DataStream, get_timestamp, is_alive_state, Client, get_host_name from .logging import * from ..proto import testbed_pb2 as testbed_proto from ..proto import service_pb2 as service_proto + SERVICE_LIVELINESS = 5 + +if sys.platform == 'win32': + NICE_VALUES = { + 'idle': psutil.IDLE_PRIORITY_CLASS, + 'below_normal': psutil.BELOW_NORMAL_PRIORITY_CLASS, + 'normal': psutil.NORMAL_PRIORITY_CLASS, + 'above_normal': psutil.ABOVE_NORMAL_PRIORITY_CLASS, + 'high': psutil.HIGH_PRIORITY_CLASS + } +else: + NICE_VALUES = { + 'idle': 20, + 'below_normal': 10, + 'normal': 0, + 'above_normal': -10, + 'high': -20 + } + + def get_unused_port(num_ports=1): '''Get port numbers that are unused. @@ -169,6 +189,8 @@ def __init__(self, port, is_simulated, config): self.host = '127.0.0.1' self.port = port + self.host_name = get_host_name() + self.logging_ingress_port = 0 self.logging_egress_port = 0 self.data_logging_ingress_port = 0 @@ -620,6 +642,38 @@ def start_service(self, service_id): self.launched_processes.append(process) + # Set CPU affinity + if 'cpu_affinity' in self.config['testbed']: + affinity_config = self.config['testbed']['cpu_affinity'] + + # Only use affinity if there is an entry for our host name. + if self.host_name in affinity_config: + default_affinity = affinity_config[self.host_name].get('default') + + affinity = affinity_config[self.host_name].get(service_id, default_affinity) + + # Only set affinity if there was one for this service or if there was a default. + if affinity: + self.services[service_id].process.cpu_affinity(affinity) + + self.log.debug(f'with CPU affinity to {affinity}.') + + # Set process priority + if 'process_priority' in self.config['testbed']: + priority_config = self.config['testbed']['process_priority'] + + # Only use priority if there is an entry for our host name. + if self.host_name in priority_config: + default_priority = priority_config[self.host_name].get('default', None) + + priority = priority_config[self.host_name].get(service_id, default_priority) + + # Only set priority if there was one for this service or if there was a default. + if priority: + self.services[service_id].process.nice(NICE_VALUES[priority]) + + self.log.debug(f'with priority {priority}.') + self.log.info(f'Started service "{service_id}" with type "{service_type}".') # Start the dependencies. This is not required but will speed things up.