Skip to content

Commit

Permalink
Improve systemd services recognition
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanleonov committed Oct 4, 2019
1 parent 53d8b92 commit 377a7a3
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 166 deletions.
4 changes: 2 additions & 2 deletions AppController/djinn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3431,8 +3431,8 @@ def start_search2_role
def stop_search2_role
# Stop Solr
Djinn.log_debug('Stopping SOLR on this node.')
Djinn.log_run('systemctl stop solr')
Djinn.log_run('systemctl disable solr')
Djinn.log_run('systemctl stop appscale-solr')
Djinn.log_run('systemctl disable appscale-solr')
Djinn.log_debug('Done stopping SOLR.')
end

Expand Down
10 changes: 7 additions & 3 deletions Hermes/appscale/hermes/producers/proxy_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,13 @@ async def get_current():
proxy_stats_list = []
for haproxy_process_name, info in HAPROXY_PROCESSES.items():
logger.debug("Processing {} haproxy stats".format(haproxy_process_name))
proxy_stats_list += await get_stats_from_one_haproxy(
info['socket'], info['configs'], net_connections
)
try:
proxy_stats_list += await get_stats_from_one_haproxy(
info['socket'], info['configs'], net_connections
)
except IOError as error:
logger.warning("Failed to read {} haproxy stats ({})"
.format(haproxy_process_name, error))

stats = ProxiesStatsSnapshot(
utc_timestamp=time.mktime(datetime.now().timetuple()),
Expand Down
178 changes: 118 additions & 60 deletions Hermes/appscale/hermes/resources/process.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import logging
import re
import time
import re

import attr
import psutil

from appscale.admin.service_manager import ServiceManager
from appscale.common import appscale_info
from appscale.hermes import helper
from appscale.hermes.constants import SubprocessError

from appscale.hermes.unified_service_names import (
find_service_by_monit_name
)

logger = logging.getLogger(__name__)

APPSCALE_PROCESS_TAG = 'appscale'
SERVICE_NAME_PATTERN = re.compile(
r'(appscale-)?(?P<before_at>[^@]+)(@(?P<after_at>[^.]+))?.service'
)
PID_SLICE_LINE_PATTERN = re.compile(
r'(?P<pid>\d+) /sys/fs/cgroup/systemd/appscale\.slice/appscale-'
r'(?P<name>[^\.]+)\.slice/'
)


@attr.s(cmp=False, hash=False, slots=True)
Expand Down Expand Up @@ -76,12 +79,6 @@ class Process(object):
sample_time_diff = attr.ib(default=None)


MONIT_PROCESS_PATTERN = re.compile(
r"^Process \'(?P<name>[^']+)\' *\n"
r"(^ .*\n)*?"
r"^ pid +(?P<pid>\d+)\n",
re.MULTILINE
)
PROCESS_ATTRS = (
'pid', 'ppid', 'name', 'cwd', 'exe', 'cmdline', 'status', 'username',
'cpu_times', 'cpu_percent', 'memory_info', 'io_counters', 'num_threads',
Expand Down Expand Up @@ -183,55 +180,6 @@ def list_ancestors_tags(ppid):
return processes


async def get_known_processes():
""" Gets tags (e.g.: appscale, taskqueue, datastore, ...)
for appscale-related processes using all AppScale supervisors
(monit, ServiceManager and systemd).
Returns:
A dict containing tags for known processes (<PID>: <a list of tags>).
"""
known_processes = {}

try:
# Detect processes supervised by monit
output, error = await helper.subprocess('monit status', timeout=5)
except SubprocessError as err:
logger.warning('Failed to run `monit status` ({})'.format(err))
output = ''
for match in MONIT_PROCESS_PATTERN.finditer(output):
monit_name = match.group('name')
pid = int(match.group('pid'))
service = find_service_by_monit_name(monit_name)
application_id = service.get_application_id_by_monit_name(monit_name)
tags = [APPSCALE_PROCESS_TAG, service.name, monit_name]
if application_id:
tags.append('app___{}'.format(application_id))
known_processes[pid] = tags

# Detect processes supervised by AppScale ServiceManager
for server in ServiceManager.get_state():
known_processes[server.process.pid] = [APPSCALE_PROCESS_TAG, server.type]

try:
# Detect processes supervised by systemd
output, error = await helper.subprocess(
'systemctl status solr.service '
'| grep \'Main PID\' | awk \'{ print $3 }\'',
timeout=5
)
except SubprocessError as err:
logger.warning('Failed to run `systemctl status solr.service` ({})'
.format(err))
output = ''
if output.isdigit() and output != '0':
solr_pid = int(output)
known_processes[solr_pid] = [
APPSCALE_PROCESS_TAG, 'solr'
]
return known_processes


def init_process_info(psutil_process, known_processes):
""" Initializes Process entity accoring to information in psutil process
and known appscale processes.
Expand Down Expand Up @@ -278,3 +226,113 @@ def init_process_info(psutil_process, known_processes):
process.ctx_switches_voluntary = ctx_switches.voluntary
process.ctx_switches_involuntary = ctx_switches.involuntary
return process


async def get_known_processes():
""" Gets tags (e.g.: appscale, taskqueue, datastore, ...)
for appscale-related processes using systemd-provided information.
Returns:
A dict containing tags for known processes (<PID>: <a list of tags>).
"""
service_processes = await identify_appscale_service_processes()
slice_processes = await identify_appscale_slice_processes()
known_processes = service_processes
known_processes.update(slice_processes)
return known_processes


async def identify_appscale_service_processes():
""" Gets tags (e.g.: appscale, taskqueue, datastore, ...)
for appscale-related processes which are run as service.
Returns:
A dict containing tags for known processes (<PID>: <a list of tags>).
"""
known_processes = {}
for service in await identify_appscale_services():
try:
# Get Main PID for each service
show_cmd = 'systemctl show --property MainPID --value {}'.format(service)
output, error = await helper.subprocess(show_cmd, timeout=5)
except SubprocessError as err:
logger.warning('Failed to get Main PID for {} ({})'.format(service, err))
continue
output = output.strip(' \t\n')
if output.isdigit() and output != '0':
pid = int(output)
process_tags = [APPSCALE_PROCESS_TAG]
# Sample service names are:
# appscale-instance-run@testapp_default_v1_1570022208920-20000.service
# appscale-memcached.service
match = SERVICE_NAME_PATTERN.match(service)
if not match:
logger.warning('Could not parse service name "{}"'.format(service))
continue
before_at = match.group('before_at')
after_at = match.group('after_at')
process_tags.append(before_at)
if after_at:
for part in after_at.split('_'):
process_tags.append('_{}'.format(part))
known_processes[pid] = process_tags
return known_processes


async def identify_appscale_services():
""" Lists all appscale-related services.
Returns:
A list of service names.
"""
dependencies_cmd = ('cat /lib/systemd/system/appscale-*.target '
'| grep -E "^After=.*\.service$" | cut -d "=" -f 2')
try:
# Detect appscale dependency services
output, error = await helper.subprocess(dependencies_cmd, timeout=5)
services = output.strip().split('\n')
except SubprocessError as err:
logger.warning('Failed to detect appscale dependency services '
'by running `{}` ({})'.format(dependencies_cmd, err))
services = []

services_cmd = ('systemctl --no-legend list-units "appscale-*.service" '
'| cut -d " " -f 1')
try:
# Detect appscale own services
output, error = await helper.subprocess(services_cmd, timeout=5)
services += output.strip().split('\n')
except SubprocessError as err:
logger.warning('Failed to detect appscale own services '
'by running `{}` ({})'.format(services_cmd, err))
return services


async def identify_appscale_slice_processes():
""" Gets tags (e.g.: appscale, taskqueue, datastore, ...)
for processes running in appscale-slice.
Returns:
A dict containing tags for known processes (<PID>: <a list of tags>).
"""
slice_processes = (
'for slice in /sys/fs/cgroup/systemd/appscale.slice/appscale-*.slice/;'
' do sed -e "s|\$| ${slice}|" ${slice}/cgroup.procs ; done'
)
try:
# Detect appscale own services
output, error = await helper.subprocess(slice_processes, timeout=5)
except SubprocessError as err:
logger.warning('Failed to detect appscale-slice processes '
'by running {} ({})'.format(slice_processes, err))
return {}
detected_pids = {}
lines = output.strip(' \t\n').split('\n')
for line in lines:
match = PID_SLICE_LINE_PATTERN.match(line)
if not match:
logger.warning('Could not parse PID-slice line "{}"'.format(line))
continue
pid = int(match.group('pid'))
detected_pids[pid] = [APPSCALE_PROCESS_TAG, match.group('name')]
return detected_pids
Loading

0 comments on commit 377a7a3

Please sign in to comment.