diff --git a/AppController/djinn.rb b/AppController/djinn.rb
index d47099023c..bb191413bb 100644
--- a/AppController/djinn.rb
+++ b/AppController/djinn.rb
@@ -3431,8 +3431,8 @@ def start_search2_role
 def stop_search2_role
   # Stop Solr
   Djinn.log_debug('Stopping SOLR on this node.')
-  Djinn.log_run('systemctl stop solr')
-  Djinn.log_run('systemctl disable solr')
+  Djinn.log_run('systemctl stop appscale-solr')
+  Djinn.log_run('systemctl disable appscale-solr')
   Djinn.log_debug('Done stopping SOLR.')
 end
 
diff --git a/Hermes/appscale/hermes/constants.py b/Hermes/appscale/hermes/constants.py
index c8e314c381..8472e3b186 100644
--- a/Hermes/appscale/hermes/constants.py
+++ b/Hermes/appscale/hermes/constants.py
@@ -14,11 +14,6 @@
 # Stats produced less than X seconds ago are considered current
 ACCEPTABLE_STATS_AGE = 10
 
-# The ZooKeeper location for storing Hermes configurations
-NODES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/nodes'
-PROCESSES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/processes'
-PROXIES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/proxies'
-
 
 class _MissedValue(object):
   """
@@ -43,10 +38,6 @@ def __repr__(self):
 
 SECRET_HEADER = 'Appscale-Secret'
 
-class HTTP_Codes(object):
-  """ A class with HTTP status codes. """
-  HTTP_OK = 200
-  HTTP_BAD_REQUEST = 400
-  HTTP_DENIED = 403
-  HTTP_INTERNAL_ERROR = 500
-  HTTP_NOT_IMPLEMENTED = 501
+class SubprocessError(Exception):
+  """ Indicates that a subprocess command failed. """
+  pass
diff --git a/Hermes/appscale/hermes/handlers.py b/Hermes/appscale/hermes/handlers.py
index 510cff2d25..a310e90803 100644
--- a/Hermes/appscale/hermes/handlers.py
+++ b/Hermes/appscale/hermes/handlers.py
@@ -138,7 +138,8 @@ async def __call__(self, request):
       snapshot = await snapshot
       self.cached_snapshot = snapshot
 
-    return web.json_response(stats_to_dict(snapshot, include_lists))
+    return web.json_response(stats_to_dict(snapshot, include_lists),
+                             content_type='application/json')
 
 
 class ClusterStatsHandler:
@@ -215,10 +216,13 @@ async def __call__(self, request):
       for node_ip, snapshot in new_snapshots_dict.items()
     }
 
-    return web.json_response({
-      "stats": rendered_snapshots,
-      "failures": failures
-    })
+    return web.json_response(
+      {
+        "stats": rendered_snapshots,
+        "failures": failures
+      },
+      content_type='application/json'
+    )
 
 
 def not_found(reason):
diff --git a/Hermes/appscale/hermes/helper.py b/Hermes/appscale/hermes/helper.py
index 1ff7bcf29c..36c5573700 100644
--- a/Hermes/appscale/hermes/helper.py
+++ b/Hermes/appscale/hermes/helper.py
@@ -1,34 +1,34 @@
 """ Helper functions for Hermes operations. """
-import errno
-import os
+import asyncio
+import logging
 
-class JSONTags(object):
-  """ A class containing all JSON tags used for Hermes functionality. """
-  ALL_STATS = 'all_stats'
-  BUCKET_NAME = 'bucket_name'
-  BODY = 'body'
-  DEPLOYMENT_ID = 'deployment_id'
-  ERROR = 'error'
-  OBJECT_NAME = 'object_name'
-  REASON = 'reason'
-  STATUS = 'status'
-  STORAGE = 'storage'
-  SUCCESS = 'success'
-  TASK_ID = 'task_id'
-  TIMESTAMP = 'timestamp'
-  TYPE = 'type'
-  UNREACHABLE = 'unreachable'
+from appscale.hermes.constants import SubprocessError
 
-def ensure_directory(dir_path):
-  """ Ensures that the directory exists.
+logger = logging.getLogger(__name__)
+
+
+async def subprocess(command, timeout):
+  process = await asyncio.create_subprocess_shell(
+    command,
+    stdout=asyncio.subprocess.PIPE,
+    stderr=asyncio.subprocess.PIPE
+  )
+  logger.debug('Started subprocess `{}` (pid: {})'
+               .format(command, process.pid))
 
-  Args:
-    dir_path: A str representing the directory path.
-  """
   try:
-    os.makedirs(dir_path)
-  except OSError as os_error:
-    if os_error.errno == errno.EEXIST and os.path.isdir(dir_path):
-      pass
-    else:
-      raise
+    # Wait for the subprocess to finish
+    stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
+  except asyncio.TimeoutError:
+    raise SubprocessError('Timed out waiting for subprocess `{}` (pid: {})'
+                          .format(command, process.pid))
+
+  output = stdout.decode()
+  error = stderr.decode()
+  if error:
+    logger.warning(error)
+  if process.returncode != 0:
+    raise SubprocessError('Subprocess failed with return code {} ({})'
+                          .format(process.returncode, error))
+
+  return output, error
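For reference, a minimal usage sketch of the new `helper.subprocess` coroutine (the caller below is hypothetical and not part of this change; the `uptime` command and 5-second timeout are only illustrative):

import asyncio

from appscale.hermes import helper
from appscale.hermes.constants import SubprocessError


async def show_uptime():
  # helper.subprocess raises SubprocessError on timeout or non-zero exit.
  try:
    output, error = await helper.subprocess('uptime', timeout=5)
    print(output)
  except SubprocessError as err:
    print('uptime failed: {}'.format(err))

asyncio.get_event_loop().run_until_complete(show_uptime())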
diff --git a/Hermes/appscale/hermes/hermes_server.py b/Hermes/appscale/hermes/hermes_server.py
index 25add17214..b1e85e3751 100644
--- a/Hermes/appscale/hermes/hermes_server.py
+++ b/Hermes/appscale/hermes/hermes_server.py
@@ -25,6 +25,7 @@
 from appscale.hermes.producers.rabbitmq_stats import PushQueueStatsSource
 from appscale.hermes.producers.rabbitmq_stats import RabbitMQStatsSource
 from appscale.hermes.producers.taskqueue_stats import TaskqueueStatsSource
+from appscale.hermes.resources.resource_handlers import processes
 
 logger = logging.getLogger(__name__)
 
@@ -143,12 +144,17 @@ def main():
 
   app = web.Application(middlewares=[verify_secret_middleware])
 
+  # Add routes for old-style structured statistics.
   route_items = []
   route_items += get_local_stats_api_routes(is_lb, is_tq, is_db)
   route_items += get_cluster_stats_api_routes(is_master)
   for route, handler in route_items:
     app.router.add_get(route, handler)
 
+  # Add routes for the new resources API.
+  app.router.add_get('/v2/processes', processes.list_local)
+  app.router.add_get('/v2/processes/_cluster', processes.list_cluster)
+
   logger.info("Starting Hermes on port: {}.".format(args.port))
   web.run_app(app, port=args.port, access_log=logger,
               access_log_format='%a "%r" %s %bB %Tfs "%{User-Agent}i"')
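The new routes sit behind the same secret-checking middleware as the old stats API, so a client has to send the deployment secret. A minimal sketch of querying the local endpoint (hypothetical client code; it assumes it runs on a node where appscale_info can resolve the secret and that Hermes listens on constants.HERMES_PORT):

import asyncio

import aiohttp

from appscale.common import appscale_info
from appscale.hermes import constants


async def fetch_local_processes():
  # The middleware expects the deployment secret in the Appscale-Secret header.
  headers = {constants.SECRET_HEADER: appscale_info.get_secret()}
  url = 'http://127.0.0.1:{}/v2/processes'.format(constants.HERMES_PORT)
  async with aiohttp.ClientSession() as session:
    async with session.get(url, headers=headers) as resp:
      return await resp.json()


result = asyncio.get_event_loop().run_until_complete(fetch_local_processes())
print('{} processes, {} failures'
      .format(len(result['entities']), len(result['failures'])))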
""" start = time.time() - - process = await asyncio.create_subprocess_shell( - NODETOOL_STATUS_COMMAND, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - logger.info('Started subprocess `{}` (pid: {})' - .format(NODETOOL_STATUS_COMMAND, process.pid)) - - try: - # Wait for the subprocess to finish - stdout, stderr = await asyncio.wait_for( - process.communicate(), NODETOOL_STATUS_TIMEOUT - ) - except asyncio.TimeoutError: - raise NodetoolStatusError( - 'Timed out waiting for subprocess `{}` (pid: {})' - .format(NODETOOL_STATUS_COMMAND, process.pid) - ) - - output = stdout.decode() - error = stderr.decode() - if error: - logger.warning(error) - if process.returncode != 0: - raise NodetoolStatusError('Subprocess failed with return code {} ({})' - .format(process.returncode, error)) - + output, error = await helper.subprocess(NODETOOL_STATUS_COMMAND, + NODETOOL_STATUS_TIMEOUT) known_db_nodes = set(appscale_info.get_db_ips()) nodes = [] shown_nodes = set() @@ -180,7 +150,7 @@ async def get_current(cls): shown_nodes.add(address) else: - raise NodetoolStatusError( + raise SubprocessError( '`{}` output does not contain expected header. Actual output:\n{}' .format(NODETOOL_STATUS_COMMAND, output) ) diff --git a/Hermes/appscale/hermes/producers/proxy_stats.py b/Hermes/appscale/hermes/producers/proxy_stats.py index 7a6764e22d..eb4b9a63b0 100644 --- a/Hermes/appscale/hermes/producers/proxy_stats.py +++ b/Hermes/appscale/hermes/producers/proxy_stats.py @@ -752,9 +752,13 @@ async def get_current(): proxy_stats_list = [] for haproxy_process_name, info in HAPROXY_PROCESSES.items(): logger.debug("Processing {} haproxy stats".format(haproxy_process_name)) - proxy_stats_list += await get_stats_from_one_haproxy( - info['socket'], info['configs'], net_connections - ) + try: + proxy_stats_list += await get_stats_from_one_haproxy( + info['socket'], info['configs'], net_connections + ) + except IOError as error: + logger.warning("Failed to read {} haproxy stats ({})" + .format(haproxy_process_name, error)) stats = ProxiesStatsSnapshot( utc_timestamp=time.mktime(datetime.now().timetuple()), diff --git a/Hermes/appscale/hermes/resources/__init__.py b/Hermes/appscale/hermes/resources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Hermes/appscale/hermes/resources/process.py b/Hermes/appscale/hermes/resources/process.py new file mode 100644 index 0000000000..553c7bd1fc --- /dev/null +++ b/Hermes/appscale/hermes/resources/process.py @@ -0,0 +1,338 @@ +import logging +import time +import re + +import attr +import psutil + +from appscale.common import appscale_info +from appscale.hermes import helper +from appscale.hermes.constants import SubprocessError + + +logger = logging.getLogger(__name__) + +APPSCALE_PROCESS_TAG = 'appscale' +SERVICE_NAME_PATTERN = re.compile( + r'(appscale-)?(?P[^@]+)(@(?P[^.]+))?.service' +) +PID_SLICE_LINE_PATTERN = re.compile( + r'(?P\d+) /sys/fs/cgroup/systemd/appscale\.slice/appscale-' + r'(?P[^\.]+)\.slice/' +) + + +@attr.s(cmp=False, hash=False, slots=True) +class Process(object): + """ + A container for all parameters representing process state at + a specific moment of time. + """ + # A global dict containing previous processes state. + # It is used for computing *_1h_diff attributes. 
+  PREVIOUS_STATE = {}
+
+  utc_timestamp = attr.ib(default=None)
+  host = attr.ib(default=None)
+
+  long_pid = attr.ib(default=None)
+  pid = attr.ib(default=None)
+  ppid = attr.ib(default=None)
+  create_time = attr.ib(default=None)
+  status = attr.ib(default=None)
+  username = attr.ib(default=None)
+  cwd = attr.ib(default=None)
+  name = attr.ib(default=None)
+  exe = attr.ib(default=None)
+  cmdline = attr.ib(default=None)
+
+  own_tags = attr.ib(default=None)  # Tags related to the process.
+  all_tags = attr.ib(default=None)  # Own tags + ancestors' tags.
+
+  cpu_user = attr.ib(default=None)
+  cpu_system = attr.ib(default=None)
+  cpu_percent = attr.ib(default=None)
+  cpu_user_1h_diff = attr.ib(default=None)
+  cpu_system_1h_diff = attr.ib(default=None)
+
+  memory_resident = attr.ib(default=None)
+  memory_virtual = attr.ib(default=None)
+  memory_shared = attr.ib(default=None)
+
+  disk_io_read_count = attr.ib(default=None)
+  disk_io_write_count = attr.ib(default=None)
+  disk_io_read_bytes = attr.ib(default=None)
+  disk_io_write_bytes = attr.ib(default=None)
+  disk_io_read_count_1h_diff = attr.ib(default=None)
+  disk_io_write_count_1h_diff = attr.ib(default=None)
+  disk_io_read_bytes_1h_diff = attr.ib(default=None)
+  disk_io_write_bytes_1h_diff = attr.ib(default=None)
+
+  threads_num = attr.ib(default=None)
+  file_descriptors_num = attr.ib(default=None)
+
+  ctx_switches_voluntary = attr.ib(default=None)
+  ctx_switches_involuntary = attr.ib(default=None)
+  ctx_switches_voluntary_1h_diff = attr.ib(default=None)
+  ctx_switches_involuntary_1h_diff = attr.ib(default=None)
+
+  sample_time_diff = attr.ib(default=None)
+
+
+PROCESS_ATTRS = (
+  'pid', 'ppid', 'name', 'cwd', 'exe', 'cmdline', 'status', 'username',
+  'cpu_times', 'cpu_percent', 'memory_info', 'io_counters', 'num_threads',
+  'num_fds', 'num_ctx_switches', 'create_time'
+)
+
+
+async def list_resource():
+  """ A coroutine which prepares a list of Process entities
+  and converts them to dictionaries.
+
+  Returns:
+    A tuple (list of dict representation of Process, empty list of failures).
+  """
+  processes = [attr.asdict(process) for process in (await list_processes())]
+  failed = []
+  return processes, failed
+
+
+async def list_processes():
+  """ Function for building a list of Process entities.
+
+  Returns:
+    A list of Processes.
+  """
+  start_time = time.time()
+  host = appscale_info.get_private_ip()
+
+  # Get dict with known processes (<pid>: <list of tags>)
+  known_processes = await get_known_processes()
+  # Iterate through all processes and initialize most of their info.
+  pid_to_process = {
+    '{}_{}_{}'.format(host, process.pid, int(process.create_time() * 1000)):
+      init_process_info(process, known_processes)
+    for process in psutil.process_iter(attrs=PROCESS_ATTRS, ad_value=None)
+  }
+
+  def list_ancestors_tags(ppid):
+    """ A recursive function for collecting ancestors' tags.
+
+    Args:
+      ppid: An int - parent PID.
+    Returns:
+      A list of ancestors' tags.
+ """ + parent_process = pid_to_process.get(ppid) + if not parent_process: + return [] + if parent_process.ppid in [0, 1, 2]: # Skip common root processes + return parent_process.own_tags + return parent_process.own_tags + list_ancestors_tags(parent_process.ppid) + + # Set the rest of information about processes state + for long_pid, p in pid_to_process.items(): + # Set unique process identifier + p.long_pid = long_pid + # and *_1h_diff attributes + prev = Process.PREVIOUS_STATE.get(p.long_pid) + if prev: + # Compute one hour difference coefficient + diff_coef = 60 * 60 / (start_time - prev.utc_timestamp) + # Set diff attributes + p.cpu_user_1h_diff = ( + (p.cpu_user - prev.cpu_user) * diff_coef + ) + p.cpu_system_1h_diff = ( + (p.cpu_system - prev.cpu_system) * diff_coef + ) + if p.disk_io_read_count is not None: + p.disk_io_read_count_1h_diff = ( + (p.disk_io_read_count - prev.disk_io_read_count) * diff_coef + ) + p.disk_io_write_count_1h_diff = ( + (p.disk_io_write_count - prev.disk_io_write_count) * diff_coef + ) + p.disk_io_read_bytes_1h_diff = ( + (p.disk_io_read_bytes - prev.disk_io_read_bytes) * diff_coef + ) + p.disk_io_write_bytes_1h_diff = ( + (p.disk_io_write_bytes - prev.disk_io_write_bytes) * diff_coef + ) + p.ctx_switches_voluntary_1h_diff = ( + (p.ctx_switches_voluntary - prev.ctx_switches_voluntary) * diff_coef + ) + p.ctx_switches_involuntary_1h_diff = ( + (p.ctx_switches_involuntary - prev.ctx_switches_involuntary) * diff_coef + ) + + p.utc_timestamp = start_time + p.host = host + p.all_tags += list_ancestors_tags(p.ppid) + + processes = pid_to_process.values() + logger.info( + "Prepared info about {} processes in {:.3f}s." + .format(len(processes), time.time() - start_time) + ) + Process.PREVIOUS_STATE = pid_to_process + return processes + + +def init_process_info(psutil_process, known_processes): + """ Initializes Process entity accoring to information in psutil process + and known appscale processes. + + Args: + psutil_process: An instance of psutil.Process. + known_processes: A dict - tags for known processes (: ). + Returns: + An instance of Process. 
+ """ + process = Process() + + process_info = psutil_process.info + cpu_times = process_info['cpu_times'] + memory_info = process_info['memory_info'] + io_counters = process_info['io_counters'] + ctx_switches = process_info['num_ctx_switches'] + + # Fill psutil process attributes: + process.pid = process_info['pid'] + process.ppid = process_info['ppid'] + process.create_time = process_info['create_time'] + process.status = process_info['status'] + process.username = process_info['username'] + process.cwd = process_info['cwd'] + process.name = process_info['name'] + process.exe = process_info['exe'] + process.cmdline = process_info['cmdline'] + process.own_tags = known_processes.get(psutil_process.pid, [process.name]) + process.all_tags = process.own_tags[::] + process.cpu_user = cpu_times.user + process.cpu_system = cpu_times.system + process.cpu_percent = process_info['cpu_percent'] + process.memory_resident = memory_info.rss + process.memory_virtual = memory_info.vms + process.memory_shared = memory_info.shared + if io_counters: + process.disk_io_read_count = io_counters.read_count + process.disk_io_write_count = io_counters.write_count + process.disk_io_read_bytes = io_counters.read_bytes + process.disk_io_write_bytes = io_counters.write_bytes + process.threads_num = process_info['num_threads'] + process.file_descriptors_num = process_info['num_fds'] + process.ctx_switches_voluntary = ctx_switches.voluntary + process.ctx_switches_involuntary = ctx_switches.involuntary + return process + + +async def get_known_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for appscale-related processes using systemd-provided information. + + Returns: + A dict containing tags for known processes (: ). + """ + service_processes = await identify_appscale_service_processes() + slice_processes = await identify_appscale_slice_processes() + known_processes = service_processes + known_processes.update(slice_processes) + return known_processes + + +async def identify_appscale_service_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for appscale-related processes which are run as service. + + Returns: + A dict containing tags for known processes (: ). + """ + known_processes = {} + for service in await identify_appscale_services(): + try: + # Get Main PID for each service + show_cmd = 'systemctl show --property MainPID --value {}'.format(service) + output, error = await helper.subprocess(show_cmd, timeout=5) + except SubprocessError as err: + logger.warning('Failed to get Main PID for {} ({})'.format(service, err)) + continue + output = output.strip(' \t\n') + if output.isdigit() and output != '0': + pid = int(output) + process_tags = [APPSCALE_PROCESS_TAG] + # Sample service names are: + # appscale-instance-run@testapp_default_v1_1570022208920-20000.service + # appscale-memcached.service + match = SERVICE_NAME_PATTERN.match(service) + if not match: + logger.warning('Could not parse service name "{}"'.format(service)) + continue + before_at = match.group('before_at') + after_at = match.group('after_at') + process_tags.append(before_at) + if after_at: + for part in after_at.split('_'): + process_tags.append('_{}'.format(part)) + known_processes[pid] = process_tags + return known_processes + + +async def identify_appscale_services(): + """ Lists all appscale-related services. + + Returns: + A list of service names. 
+ """ + dependencies_cmd = ('cat /lib/systemd/system/appscale-*.target ' + '| grep -E "^After=.*\.service$" | cut -d "=" -f 2') + try: + # Detect appscale dependency services + output, error = await helper.subprocess(dependencies_cmd, timeout=5) + services = output.strip().split('\n') + except SubprocessError as err: + logger.warning('Failed to detect appscale dependency services ' + 'by running `{}` ({})'.format(dependencies_cmd, err)) + services = [] + + services_cmd = ('systemctl --no-legend list-units "appscale-*.service" ' + '| cut -d " " -f 1') + try: + # Detect appscale own services + output, error = await helper.subprocess(services_cmd, timeout=5) + services += output.strip().split('\n') + except SubprocessError as err: + logger.warning('Failed to detect appscale own services ' + 'by running `{}` ({})'.format(services_cmd, err)) + return services + + +async def identify_appscale_slice_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for processes running in appscale-slice. + + Returns: + A dict containing tags for known processes (: ). + """ + slice_processes = ( + 'for slice in /sys/fs/cgroup/systemd/appscale.slice/appscale-*.slice/;' + ' do sed -e "s|\$| ${slice}|" ${slice}/cgroup.procs ; done' + ) + try: + # Detect appscale own services + output, error = await helper.subprocess(slice_processes, timeout=5) + except SubprocessError as err: + logger.warning('Failed to detect appscale-slice processes ' + 'by running {} ({})'.format(slice_processes, err)) + return {} + detected_pids = {} + lines = output.strip(' \t\n').split('\n') + for line in lines: + match = PID_SLICE_LINE_PATTERN.match(line) + if not match: + logger.warning('Could not parse PID-slice line "{}"'.format(line)) + continue + pid = int(match.group('pid')) + detected_pids[pid] = [APPSCALE_PROCESS_TAG, match.group('name')] + return detected_pids diff --git a/Hermes/appscale/hermes/resources/resource_handlers.py b/Hermes/appscale/hermes/resources/resource_handlers.py new file mode 100644 index 0000000000..0db87a223c --- /dev/null +++ b/Hermes/appscale/hermes/resources/resource_handlers.py @@ -0,0 +1,257 @@ +""" Implementation of stats sources for cluster stats. """ +import asyncio +import inspect +import json +import logging + +import aiohttp +from aiohttp import web + +from appscale.common import appscale_info +from appscale.hermes import constants +from appscale.hermes.resources import process + +logger = logging.getLogger(__name__) + +# Do not run more than 100 concurrent requests to remote hermes servers. +max_concurrency = asyncio.Semaphore(100) + +# To avoid unnecessary JSON decoding and encoding, +# when listing a resource from a remote hermes, +# entities and failures are sent as two separate JSON lists connected by: +BODY_CONNECTOR = b'\n\n\xff\xff\xff\xff\n\n' + + +class HermesError(aiohttp.ClientError): + """ Represents an error while listing resource from local/remote Hermes. """ + def __init__(self, host, message): + self.host = host + self.message = message + super().__init__(message) + + +class ResourceHandler(object): + """ + A class implementing HTTP handlers for listing a monitored resource. + + It provides two public handler methods: + - list_local(request) # For listing local resource + - list_cluster(request) # For listing resource on many nodes + """ + def __init__(self, default_ips_getter, resource_name, local_source): + """ Initialised instance of ResourceHandler. + + Args: + default_ips_getter: A callable - should return a list of cluster nodes + to query resource from. 
diff --git a/Hermes/appscale/hermes/resources/resource_handlers.py b/Hermes/appscale/hermes/resources/resource_handlers.py
new file mode 100644
index 0000000000..0db87a223c
--- /dev/null
+++ b/Hermes/appscale/hermes/resources/resource_handlers.py
@@ -0,0 +1,257 @@
+""" HTTP handlers for listing monitored resources locally and cluster-wide. """
+import asyncio
+import inspect
+import json
+import logging
+
+import aiohttp
+from aiohttp import web
+
+from appscale.common import appscale_info
+from appscale.hermes import constants
+from appscale.hermes.resources import process
+
+logger = logging.getLogger(__name__)
+
+# Do not run more than 100 concurrent requests to remote hermes servers.
+max_concurrency = asyncio.Semaphore(100)
+
+# To avoid unnecessary JSON decoding and encoding,
+# when listing a resource from a remote hermes,
+# entities and failures are sent as two separate JSON lists connected by:
+BODY_CONNECTOR = b'\n\n\xff\xff\xff\xff\n\n'
+
+
+class HermesError(aiohttp.ClientError):
+  """ Represents an error while listing resource from local/remote Hermes. """
+  def __init__(self, host, message):
+    self.host = host
+    self.message = message
+    super().__init__(message)
+
+
+class ResourceHandler(object):
+  """
+  A class implementing HTTP handlers for listing a monitored resource.
+
+  It provides two public handler methods:
+    - list_local(request)    # For listing local resource
+    - list_cluster(request)  # For listing resource on many nodes
+  """
+  def __init__(self, default_ips_getter, resource_name, local_source):
+    """ Initialises an instance of ResourceHandler.
+
+    Args:
+      default_ips_getter: A callable - should return a list of cluster nodes
+        to query resource from.
+      resource_name: A str - name of resource (should match name in route).
+      local_source: A callable (optionally async) - should return a tuple
+        of two lists: (entity_dicts, failure_strings).
+    """
+    self.default_ips_getter = default_ips_getter
+    self.resource_name = resource_name
+    self.local_source = local_source
+    self.private_ip = appscale_info.get_private_ip()
+
+  async def list_local(self, request):
+    """ A handler method to be assigned to route
+    'GET /v2/<resource_name>'.
+    It accepts one optional query argument: 'return-as-2-json-objects=yes',
+    if it is passed, entities and failures are returned as two JSON objects
+    connected by BODY_CONNECTOR.
+
+    Args:
+      request: An instance of aiohttp.web.Request.
+    Returns:
+      An instance of aiohttp.web.Response.
+    """
+    entities, failures = await self._call_local_source()
+
+    if request.query.get('return-as-2-json-objects', 'no') == 'yes':
+      # Return body used for joining entities without decoding JSON.
+      body = b'%(entities)b %(connector)b %(failures)b' % {
+        b'entities': json.dumps(entities).encode(),
+        b'connector': BODY_CONNECTOR,
+        b'failures': json.dumps(failures).encode()
+      }
+      return web.Response(body=body)
+    else:
+      # Return a regular JSON body.
+      body = b'{"entities": %(entities)b, "failures": %(failures)b}' % {
+        b'entities': json.dumps(entities).encode(),
+        b'failures': json.dumps(failures).encode()
+      }
+      return web.Response(body=body, content_type='application/json')
+
+  async def list_cluster(self, request):
+    """ A handler method to be assigned to route
+    'GET /v2/<resource_name>/_cluster'.
+    It accepts an optional JSON body containing a list of locations to collect
+    resource entities from, e.g.:
+    {"locations": ["10.0.2.15", "10.0.2.16", "10.0.2.17"]}
+
+    If the body is missing,
+    locations returned by self.default_ips_getter() will be used.
+
+    Args:
+      request: An instance of aiohttp.web.Request.
+    Returns:
+      An instance of aiohttp.web.Response.
+    """
+    if request.has_body:
+      try:
+        locations = (await request.json())['locations']
+      except (ValueError, TypeError, KeyError) as err:
+        reason = 'JSON body should contain "locations" attr ({}).'.format(err)
+        return web.Response(status=400, reason=reason)
+    else:
+      locations = self.default_ips_getter()
+
+    joined_entities_json, failures = await self._list_resource(locations)
+    # As joined_entities_json is already a valid JSON array,
+    # we're rendering the final JSON body manually.
+    body = b'{"entities": %(entities)b, "failures": %(failures)b}' % {
+      b'entities': joined_entities_json,
+      b'failures': json.dumps(failures).encode()
+    }
+    return web.Response(body=body, content_type='application/json')
+
+  async def _list_resource(self, hermes_locations):
+    """ Asynchronously collects the full list of resource entities
+    from remote and local nodes.
+
+    Args:
+      hermes_locations: A list of strings - hermes locations as <ip>[:<port>].
+    Returns:
+      A tuple (JSON-encoded list of resource entities, list of failures).
+    """
+    entities_json_list = []
+    failures = []
+
+    async def process_node_result(hermes_location):
+      """ Retrieves entities and failures from a particular hermes server
+      and appends results to local lists.
+
+      Args:
+        hermes_location: A string - hermes location as <ip>[:<port>].
+ """ + try: + entities_json, node_failures = await self._get_from_node(hermes_location) + if entities_json: + entities_json_list.append(entities_json) + if node_failures: + failures.extend(node_failures) + except HermesError as err: + failures.append({'host': err.host, 'message': err.message}) + + # Do multiple requests asynchronously and wait for all results + async with max_concurrency: + await asyncio.gather(*[ + process_node_result(location) for location in hermes_locations + ]) + + logger.info('Fetched {name} from {nodes} nodes.' + .format(name=self.resource_name, nodes=len(entities_json_list))) + + # Join raw JSON lists of entities to a single big list. + # This way we avoid extra JSON decoding and encoding. + joined_entities_json = b',\n\n'.join([ + raw_bytes.strip(b'[] ') for raw_bytes in entities_json_list + ]) + return b'[%b]' % joined_entities_json, failures + + async def _get_from_node(self, hermes_location): + """ Retrieves entities and failures from a particular hermes server + (local or remote). + + Args: + hermes_location: A string - hermes locations as [:]. + Returns: + A tuple (JSON encoded list of resource entities, list of failures). + """ + if hermes_location.split(':')[0] == self.private_ip: + # List local resource entities (and failures). + try: + entities, failures = await self._call_local_source() + entities_json = json.dumps(entities).encode() + return entities_json, failures + except Exception as err: + logger.error('Failed to prepare local stats: {err}'.format(err=err)) + raise HermesError(host=hermes_location, message=str(err)) + else: + # List remote resource entities (and failures). + entities_json, failures = await self._fetch_remote(hermes_location) + return entities_json, failures + + async def _fetch_remote(self, hermes_location): + """ Fetches resource entities from a single remote node. + + Args: + hermes_location: a string - remote hermes location as [:]. + Returns: + A tuple (JSON encoded list of resource entities, list of failures). + """ + # Security header + headers = {constants.SECRET_HEADER: appscale_info.get_secret()} + + # Determine host and port of remote hermes + if ':' in hermes_location: + host, port = hermes_location.split(':') + else: + host = hermes_location + port = constants.HERMES_PORT + + url = 'http://{host}:{port}/v2/{resource}'.format( + host=host, port=port, resource=self.resource_name + ) + try: + # Do HTTP call to remote hermes requesting body in two parts. + async with aiohttp.ClientSession() as session: + awaitable_get = session.get( + url, headers=headers, params={'return-as-2-json-objects': 'yes'}, + timeout=constants.REMOTE_REQUEST_TIMEOUT + ) + async with awaitable_get as resp: + if resp.status >= 400: + # Handler client error + err_message = 'HTTP {}: {}'.format(resp.status, resp.reason) + resp_text = await resp.text() + if resp_text: + err_message += '. {}'.format(resp_text) + logger.error("Failed to get {} ({})".format(url, err_message)) + raise HermesError(host=host, message=err_message) + + # Read body without decoding JSON + body = await resp.read() + entities_json, failures_json = body.split(BODY_CONNECTOR) + failures = json.loads(failures_json.decode()) + return entities_json, failures + + except aiohttp.ClientError as err: + # Handle server error + logger.error("Failed to get {} ({})".format(url, err)) + raise HermesError(host=host, message=str(err)) + + async def _call_local_source(self): + """ A wrapper method for retrieving local resource entities and failures. 
diff --git a/Hermes/setup.py b/Hermes/setup.py
index 6ea60cccb8..3fae13b2bf 100644
--- a/Hermes/setup.py
+++ b/Hermes/setup.py
@@ -2,7 +2,7 @@
 setup(
   name='appscale-hermes',
-  version='0.4.0',
+  version='0.5.0',
   description='AppScale module which provides statistics API.',
   author='AppScale Systems, Inc.',
   url='https://github.com/AppScale/appscale',
@@ -12,7 +12,7 @@
   install_requires=[
     'appscale-common',
     'appscale-admin',
-    'psutil==5.6.3',
+    'psutil==5.6',
     'attrs==19.1.0',
     'mock==2.0.0',
     'aiohttp==2.3.9'
@@ -24,9 +24,12 @@
     'Programming Language :: Python :: 3.5'
   ],
   namespace_packages=['appscale'],
-  packages=['appscale',
-            'appscale.hermes',
-            'appscale.hermes.producers'],
+  packages=[
+    'appscale',
+    'appscale.hermes',
+    'appscale.hermes.producers',
+    'appscale.hermes.resources'
+  ],
   entry_points={'console_scripts': [
     'appscale-hermes=appscale.hermes.hermes_server:main'
   ]}
diff --git a/Hermes/tests/producers/__init__.py b/Hermes/tests/producers/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/Hermes/tests/test-data/haproxy-stats-mapping.csv b/Hermes/tests/producers/test-data/haproxy-stats-mapping.csv
similarity index 100%
rename from Hermes/tests/test-data/haproxy-stats-mapping.csv
rename to Hermes/tests/producers/test-data/haproxy-stats-mapping.csv
diff --git a/Hermes/tests/test-data/haproxy-stats-v1.4.csv b/Hermes/tests/producers/test-data/haproxy-stats-v1.4.csv
similarity index 100%
rename from Hermes/tests/test-data/haproxy-stats-v1.4.csv
rename to Hermes/tests/producers/test-data/haproxy-stats-v1.4.csv
diff --git a/Hermes/tests/test-data/haproxy-stats-v1.5.csv b/Hermes/tests/producers/test-data/haproxy-stats-v1.5.csv
similarity index 100%
rename from Hermes/tests/test-data/haproxy-stats-v1.5.csv
rename to Hermes/tests/producers/test-data/haproxy-stats-v1.5.csv
diff --git a/Hermes/tests/test-data/node-stats.json b/Hermes/tests/producers/test-data/node-stats.json
similarity index 100%
rename from Hermes/tests/test-data/node-stats.json
rename to Hermes/tests/producers/test-data/node-stats.json
diff --git a/Hermes/tests/test-data/processes-stats.json b/Hermes/tests/producers/test-data/processes-stats.json
similarity index 100%
rename from Hermes/tests/test-data/processes-stats.json
rename to Hermes/tests/producers/test-data/processes-stats.json
diff --git a/Hermes/tests/test-data/proxies-stats.json b/Hermes/tests/producers/test-data/proxies-stats.json
similarity index 100%
rename from Hermes/tests/test-data/proxies-stats.json
rename to Hermes/tests/producers/test-data/proxies-stats.json
diff --git a/Hermes/tests/test-data/taskqueue-stats.json b/Hermes/tests/producers/test-data/taskqueue-stats.json
similarity index 100%
rename from Hermes/tests/test-data/taskqueue-stats.json
rename to Hermes/tests/producers/test-data/taskqueue-stats.json
diff --git a/Hermes/tests/test_cassandra.py b/Hermes/tests/producers/test_cassandra.py
similarity index 100%
rename from Hermes/tests/test_cassandra.py
rename to Hermes/tests/producers/test_cassandra.py
diff --git a/Hermes/tests/test_cluster_stats.py b/Hermes/tests/producers/test_cluster_stats.py
similarity index 100%
rename from Hermes/tests/test_cluster_stats.py
rename to Hermes/tests/producers/test_cluster_stats.py
diff --git a/Hermes/tests/test_node.py b/Hermes/tests/producers/test_node.py
similarity index 100%
rename from Hermes/tests/test_node.py
rename to Hermes/tests/producers/test_node.py
diff --git a/Hermes/tests/test_process.py b/Hermes/tests/producers/test_process.py
similarity index 100%
rename from Hermes/tests/test_process.py
rename to Hermes/tests/producers/test_process.py
diff --git a/Hermes/tests/test_proxy.py b/Hermes/tests/producers/test_proxy.py
similarity index 100%
rename from Hermes/tests/test_proxy.py
rename to Hermes/tests/producers/test_proxy.py
diff --git a/Hermes/tests/test_taskqueue.py b/Hermes/tests/producers/test_taskqueue.py
similarity index 100%
rename from Hermes/tests/test_taskqueue.py
rename to Hermes/tests/producers/test_taskqueue.py
diff --git a/Hermes/tests/resources/__init__.py b/Hermes/tests/resources/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/Hermes/tests/resources/test_process.py b/Hermes/tests/resources/test_process.py
new file mode 100644
index 0000000000..2737fa9fbd
--- /dev/null
+++ b/Hermes/tests/resources/test_process.py
@@ -0,0 +1,212 @@
+import asyncio
+
+import psutil
+import pytest
+from mock import patch, MagicMock
+
+from appscale.hermes.resources import process
+
+
+def future(value=None):
+  future_obj = asyncio.Future()
+  future_obj.set_result(value)
+  return future_obj
+
+
+# cat /lib/systemd/system/appscale-*.target
+# | grep -E "^After=.*\.service$" | cut -d "=" -f 2
+APPSCALE_TARGETS = b"""
+ejabberd.service
+nginx.service
+rabbitmq-server.service
+zookeeper.service
+"""
+
+# systemctl --no-legend list-units "appscale-*.service" | cut -d " " -f 1
+APPSCALE_SERVICES = b"""
+appscale-blobstore.service
+appscale-cassandra.service
+appscale-controller.service
+appscale-groomer.service
+appscale-haproxy@app.service
+appscale-haproxy@service.service
+appscale-hermes.service
+appscale-infrastructure@basic.service
+appscale-instance-manager.service
+appscale-instance-run@testapp_mod1_v1_1570022208920-20000.service
+appscale-memcached.service
+appscale-transaction-groomer.service
+appscale-uaserver.service
+"""
+
+# systemctl show --property MainPID --value <service>
+SERVICE_PID_MAP = {
+  'ejabberd.service': b'9021',
+  'nginx.service': b'9022',
+  'rabbitmq-server.service': b'9023',
+  'zookeeper.service': b'9024',
+  'appscale-blobstore.service': b'10025',
+  'appscale-cassandra.service': b'10026',
+  'appscale-controller.service': b'10027',
+  'appscale-groomer.service': b'10028',
+  'appscale-haproxy@app.service': b'10029',
+  'appscale-haproxy@service.service': b'10030',
+  'appscale-hermes.service': b'10031',
+  'appscale-infrastructure@basic.service': b'10032',
+  'appscale-instance-manager.service': b'10033',
+  'appscale-instance-run@testapp_mod1_v1_1570022208920-20000.service': b'10034',
+  'appscale-memcached.service': b'10035',
+  'appscale-transaction-groomer.service': b'10036',
+  'appscale-uaserver.service': b'10037',
+}
+
+# for slice in /sys/fs/cgroup/systemd/appscale.slice/appscale-*.slice/; do
+#   sed -e "s|\$| ${slice}|" ${slice}/cgroup.procs
+# done
+APPSCALE_SLICE_PIDS = b"""
+11038 /sys/fs/cgroup/systemd/appscale.slice/appscale-datastore.slice/
+11039 /sys/fs/cgroup/systemd/appscale.slice/appscale-datastore.slice/
+11040 /sys/fs/cgroup/systemd/appscale.slice/appscale-search.slice/
+"""
+
+
+@pytest.mark.asyncio
+async def test_get_known_processes():
+  subprocess_mocks = []
+
+  # Mock `cat /lib/systemd/system/appscale-*.target | ...` output
+  targets_mock = MagicMock(returncode=0)
+  stdout, stderr = APPSCALE_TARGETS, b''
+  targets_mock.communicate.return_value = future((stdout, stderr))
+  subprocess_mocks.append(('cat /lib/systemd/', targets_mock))
+
+  # Mock `systemctl --no-legend list-units "appscale-*.service" | ...` output
+  list_units_mock = MagicMock(returncode=0)
+  stdout, stderr = APPSCALE_SERVICES, b''
+  list_units_mock.communicate.return_value = future((stdout, stderr))
+  subprocess_mocks.append(('systemctl --no-legend list-units', list_units_mock))
+
+  # Mock `systemctl show --property MainPID --value <service>` output
+  for service, pid in SERVICE_PID_MAP.items():
+    show_mainpid_mock = MagicMock(returncode=0)
+    stdout, stderr = pid, b''
+    show_mainpid_mock.communicate.return_value = future((stdout, stderr))
+    subprocess_mocks.append(('--value {}'.format(service), show_mainpid_mock))
+
+  # Mock `for slice in /sys/fs/cgroup/systemd/appscale.slice/... ; do...` output
+  appscale_slice_pids_mock = MagicMock(returncode=0)
+  stdout, stderr = APPSCALE_SLICE_PIDS, b''
+  appscale_slice_pids_mock.communicate.return_value = future((stdout, stderr))
+  subprocess_mocks.append(('for slice in /sys/fs/', appscale_slice_pids_mock))
+
+  def fake_subprocess_shell(command, **kwargs):
+    for matcher, command_mock in subprocess_mocks:
+      if matcher in command:
+        return future(command_mock)
+    assert False, 'Unexpected command "{}"'.format(command)
+
+  subprocess_patcher = patch(
+    'asyncio.create_subprocess_shell',
+    side_effect=fake_subprocess_shell
+  )
+
+  # ^^^ ALL INPUTS ARE SPECIFIED (or mocked) ^^^
+  with subprocess_patcher:
+    # Calling method under test
+    known_processes = await process.get_known_processes()
+
+  # ASSERTING EXPECTATIONS
+  assert known_processes == {
+    9021: ['appscale', 'ejabberd'],
+    9022: ['appscale', 'nginx'],
+    9023: ['appscale', 'rabbitmq-server'],
+    9024: ['appscale', 'zookeeper'],
+    10025: ['appscale', 'blobstore'],
+    10026: ['appscale', 'cassandra'],
+    10027: ['appscale', 'controller'],
+    10028: ['appscale', 'groomer'],
+    10029: ['appscale', 'haproxy', '_app'],
+    10030: ['appscale', 'haproxy', '_service'],
+    10031: ['appscale', 'hermes'],
+    10032: ['appscale', 'infrastructure', '_basic'],
+    10033: ['appscale', 'instance-manager'],
+    10034: ['appscale', 'instance-run', '_testapp', '_mod1', '_v1', '_1570022208920-20000'],
+    10035: ['appscale', 'memcached'],
+    10036: ['appscale', 'transaction-groomer'],
+    10037: ['appscale', 'uaserver'],
+    11038: ['appscale', 'datastore'],
+    11039: ['appscale', 'datastore'],
+    11040: ['appscale', 'search'],
+  }
+
+
+@pytest.mark.asyncio
+async def test_init_process_info():
+  # Get info about current process
+  psutil_process = psutil.Process()
+  proc_info = psutil_process.as_dict(process.PROCESS_ATTRS)
+  psutil_process.info = proc_info
+
+  my_pid = psutil_process.pid
+  # Call function under test
+  process_ = process.init_process_info(psutil_process, {my_pid: ['test-tag']})
+
+  # Check if attributes were assigned properly
+  assert process_.pid == proc_info['pid']
+  assert process_.ppid == proc_info['ppid']
+  assert process_.create_time == proc_info['create_time']
+  assert process_.status == proc_info['status']
+  assert process_.username == proc_info['username']
+  assert process_.cwd == proc_info['cwd']
+  assert process_.name == proc_info['name']
+  assert process_.exe == proc_info['exe']
+  assert process_.cmdline == proc_info['cmdline']
+  assert process_.own_tags == ['test-tag']
+  assert process_.all_tags == ['test-tag']
+  assert process_.cpu_user == proc_info['cpu_times'].user
+  assert process_.cpu_system == proc_info['cpu_times'].system
+  assert process_.cpu_user_1h_diff is None
+  assert process_.cpu_system_1h_diff is None
+  assert process_.cpu_percent == proc_info['cpu_percent']
+  assert process_.memory_resident == proc_info['memory_info'].rss
+  assert process_.memory_virtual == proc_info['memory_info'].vms
+  assert process_.memory_shared == proc_info['memory_info'].shared
+  assert process_.disk_io_read_count == proc_info['io_counters'].read_count
+  assert process_.disk_io_write_count == proc_info['io_counters'].write_count
+  assert process_.disk_io_read_bytes == proc_info['io_counters'].read_bytes
+  assert process_.disk_io_write_bytes == proc_info['io_counters'].write_bytes
+  assert process_.disk_io_read_count_1h_diff is None
+  assert process_.disk_io_write_count_1h_diff is None
+  assert process_.disk_io_read_bytes_1h_diff is None
+  assert process_.disk_io_write_bytes_1h_diff is None
+  assert process_.threads_num == proc_info['num_threads']
+  assert process_.file_descriptors_num == proc_info['num_fds']
+  assert (
+    process_.ctx_switches_voluntary
+    == proc_info['num_ctx_switches'].voluntary
+  )
+  assert (
+    process_.ctx_switches_involuntary
+    == proc_info['num_ctx_switches'].involuntary
+  )
+  assert process_.ctx_switches_voluntary_1h_diff is None
+  assert process_.ctx_switches_involuntary_1h_diff is None
+
+
+@pytest.mark.asyncio
+async def test_list_processes():
+  # Mock `monit status` with empty output
+  monit_mock = MagicMock(returncode=0)
+  monit_mock.communicate.return_value = future((b'', b''))
+  # Mock `systemctl status solr.service` with empty output
+  systemctl_mock = MagicMock(returncode=0)
+  systemctl_mock.communicate.return_value = future((b'', b''))
+  # Mock ServiceManager.get_state result
+  state_mock = [
+    MagicMock(process=MagicMock(pid=9850), type='datastore'),
+    MagicMock(process=MagicMock(pid=9851), type='datastore'),
+  ]
+  # Mock psutil.process_iter
+  fake_processes = [
+    MagicMock()
+  ]
diff --git a/Hermes/tests/resources/test_resource_handler.py b/Hermes/tests/resources/test_resource_handler.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/SearchService2/solr-management/solr.service b/SearchService2/solr-management/appscale-solr.service
similarity index 94%
rename from SearchService2/solr-management/solr.service
rename to SearchService2/solr-management/appscale-solr.service
index c2da9de983..3217724654 100644
--- a/SearchService2/solr-management/solr.service
+++ b/SearchService2/solr-management/appscale-solr.service
@@ -1,4 +1,4 @@
-# /etc/systemd/system/solr.service
+# /etc/systemd/system/appscale-solr.service
 [Unit]
 Description=Apache SOLR
 After=syslog.target network.target remote-fs.target nss-lookup.target
diff --git a/SearchService2/solr-management/ensure_solr_running.sh b/SearchService2/solr-management/ensure_solr_running.sh
index 6554e0d886..086bbfb460 100755
--- a/SearchService2/solr-management/ensure_solr_running.sh
+++ b/SearchService2/solr-management/ensure_solr_running.sh
@@ -63,24 +63,24 @@ export PRIVATE_IP
 envsubst '$SOLR_HEAP $ZK_HOST $PRIVATE_IP' \
   < "${SOLR_MANAGEMENT_DIR}/solr.in.sh" > "/tmp/solr.in.sh"
 envsubst '$MEMORY_LOW $MEMORY_HIGH $MEMORY_MAX'\
-  < "${SOLR_MANAGEMENT_DIR}/solr.service" > "/tmp/solr.service"
+  < "${SOLR_MANAGEMENT_DIR}/appscale-solr.service" > "/tmp/appscale-solr.service"
 if cmp -s "/tmp/solr.in.sh" "/etc/default/solr.in.sh" \
-&& cmp -s "/tmp/solr.service" "/etc/systemd/system/solr.service"
+&& cmp -s "/tmp/appscale-solr.service" "/etc/systemd/system/appscale-solr.service"
 then
   echo "/etc/default/solr.in.sh has no changes."
-  echo "/etc/systemd/system/solr.service has no changes."
+  echo "/etc/systemd/system/appscale-solr.service has no changes."
   echo "Making sure Solr is running."
-  sudo systemctl enable solr
-  sudo systemctl start solr
+  sudo systemctl enable appscale-solr
+  sudo systemctl start appscale-solr
 else
   echo "Copying new solr.in.sh to /etc/default/solr.in.sh"
   sudo cp "/tmp/solr.in.sh" "/etc/default/solr.in.sh"
-  echo "Copying new solr.service to /etc/systemd/system/solr.service"
-  sudo cp "/tmp/solr.service" "/etc/systemd/system/solr.service"
+  echo "Copying new appscale-solr.service to /etc/systemd/system/appscale-solr.service"
+  sudo cp "/tmp/appscale-solr.service" "/etc/systemd/system/appscale-solr.service"
   echo "Making sure Solr is restarted."
   sudo systemctl daemon-reload
-  sudo systemctl enable solr
-  sudo systemctl restart solr
+  sudo systemctl enable appscale-solr
+  sudo systemctl restart appscale-solr
 fi
 
 echo "Making sure appscale-specific config set is uploaded to zookeeper."
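End to end, the cluster route accepts an optional JSON body that limits which nodes are polled. A hypothetical client sketch (the node IPs are illustrative, and the request is assumed to run on a deployment node with access to the secret):

import asyncio
import json

import aiohttp

from appscale.common import appscale_info
from appscale.hermes import constants


async def fetch_cluster_processes():
  headers = {constants.SECRET_HEADER: appscale_info.get_secret()}
  url = 'http://127.0.0.1:{}/v2/processes/_cluster'.format(constants.HERMES_PORT)
  # Limit collection to two specific nodes instead of default_ips_getter().
  body = json.dumps({'locations': ['10.0.2.15', '10.0.2.16']})
  async with aiohttp.ClientSession() as session:
    async with session.get(url, headers=headers, data=body) as resp:
      return await resp.json()


result = asyncio.get_event_loop().run_until_complete(fetch_cluster_processes())
print('{} processes, {} failures'
      .format(len(result['entities']), len(result['failures'])))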