Skip to content

Commit

Permalink
Set process priority and CPU affinity of service as in config.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehpor committed Sep 9, 2024
1 parent 2460bb7 commit 48e0a74
Showing 1 changed file with 55 additions and 1 deletion.
56 changes: 55 additions & 1 deletion catkit2/testbed/testbed.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,34 @@
import zmq
import numpy as np

from ..catkit_bindings import LogForwarder, Server, ServiceState, DataStream, get_timestamp, is_alive_state, Client
from ..catkit_bindings import LogForwarder, Server, ServiceState, DataStream, get_timestamp, is_alive_state, Client, get_host_name
from .logging import *

from ..proto import testbed_pb2 as testbed_proto
from ..proto import service_pb2 as service_proto


SERVICE_LIVELINESS = 5


if sys.platform == 'win32':
NICE_VALUES = {
'idle': psutil.IDLE_PRIORITY_CLASS,
'below_normal': psutil.BELOW_NORMAL_PRIORITY_CLASS,
'normal': psutil.NORMAL_PRIORITY_CLASS,
'above_normal': psutil.ABOVE_NORMAL_PRIORITY_CLASS,
'high': psutil.HIGH_PRIORITY_CLASS
}
else:
NICE_VALUES = {
'idle': 20,
'below_normal': 10,
'normal': 0,
'above_normal': -10,
'high': -20
}


def get_unused_port(num_ports=1):
'''Get port numbers that are unused.
Expand Down Expand Up @@ -169,6 +189,8 @@ def __init__(self, port, is_simulated, config):
self.host = '127.0.0.1'
self.port = port

self.host_name = get_host_name()

self.logging_ingress_port = 0
self.logging_egress_port = 0
self.data_logging_ingress_port = 0
Expand Down Expand Up @@ -620,6 +642,38 @@ def start_service(self, service_id):

self.launched_processes.append(process)

# Set CPU affinity
if 'cpu_affinity' in self.config['testbed']:
affinity_config = self.config['testbed']['cpu_affinity']

# Only use affinity if there is an entry for our host name.
if self.host_name in affinity_config:
default_affinity = affinity_config[self.host_name].get('default')

affinity = affinity_config[self.host_name].get(service_id, default_affinity)

# Only set affinity if there was one for this service or if there was a default.
if affinity:
self.services[service_id].process.cpu_affinity(affinity)

self.log.debug(f'with CPU affinity to {affinity}.')

# Set process priority
if 'process_priority' in self.config['testbed']:
priority_config = self.config['testbed']['process_priority']

# Only use priority if there is an entry for our host name.
if self.host_name in priority_config:
default_priority = priority_config[self.host_name].get('default', None)

priority = priority_config[self.host_name].get(service_id, default_priority)

# Only set priority if there was one for this service or if there was a default.
if priority:
self.services[service_id].process.nice(NICE_VALUES[priority])

self.log.debug(f'with priority {priority}.')

self.log.info(f'Started service "{service_id}" with type "{service_type}".')

# Start the dependencies. This is not required but will speed things up.
Expand Down

0 comments on commit 48e0a74

Please sign in to comment.