Processes stats v2 and new Hermes API concept #3180

Open · wants to merge 9 commits into master
4 changes: 2 additions & 2 deletions AppController/djinn.rb
@@ -3431,8 +3431,8 @@ def start_search2_role
def stop_search2_role
# Stop Solr
Djinn.log_debug('Stopping SOLR on this node.')
-Djinn.log_run('systemctl stop solr')
-Djinn.log_run('systemctl disable solr')
+Djinn.log_run('systemctl stop appscale-solr')
+Djinn.log_run('systemctl disable appscale-solr')
Djinn.log_debug('Done stopping SOLR.')
end

15 changes: 3 additions & 12 deletions Hermes/appscale/hermes/constants.py
@@ -14,11 +14,6 @@
# Stats which were produce less than X seconds ago is considered as current
ACCEPTABLE_STATS_AGE = 10

-# The ZooKeeper location for storing Hermes configurations
-NODES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/nodes'
-PROCESSES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/processes'
-PROXIES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/proxies'


class _MissedValue(object):
"""
@@ -43,10 +38,6 @@ def __repr__(self):
SECRET_HEADER = 'Appscale-Secret'


-class HTTP_Codes(object):
-  """ A class with HTTP status codes. """
-  HTTP_OK = 200
-  HTTP_BAD_REQUEST = 400
-  HTTP_DENIED = 403
-  HTTP_INTERNAL_ERROR = 500
-  HTTP_NOT_IMPLEMENTED = 501
+class SubprocessError(Exception):
+  """ Indicates that subcommand failed. """
+  pass
14 changes: 9 additions & 5 deletions Hermes/appscale/hermes/handlers.py
@@ -138,7 +138,8 @@ async def __call__(self, request):
snapshot = await snapshot
self.cached_snapshot = snapshot

-return web.json_response(stats_to_dict(snapshot, include_lists))
+return web.json_response(stats_to_dict(snapshot, include_lists),
+                         content_type='application/json')


class ClusterStatsHandler:
@@ -215,10 +216,13 @@ async def __call__(self, request):
for node_ip, snapshot in new_snapshots_dict.items()
}

-return web.json_response({
-  "stats": rendered_snapshots,
-  "failures": failures
-})
+return web.json_response(
+  {
+    "stats": rendered_snapshots,
+    "failures": failures
+  },
+  content_type='application/json'
+)


def not_found(reason):
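With the content type now set explicitly, a cluster stats client can treat the response as JSON with the `stats`/`failures` shape assembled above, where `stats` maps node IP to a snapshot and `failures` maps node IP to the collection error. A minimal client sketch follows; the port, route path, and secret value are assumptions for illustration, not taken from this diff:

```python
import asyncio

import aiohttp

# Port and route are assumptions; the real routes come from
# get_cluster_stats_api_routes() registered on the master node.
HERMES_URL = 'http://localhost:4378/stats/cluster/nodes'


async def fetch_cluster_node_stats(secret):
  # Header name taken from SECRET_HEADER in constants.py.
  headers = {'Appscale-Secret': secret}
  async with aiohttp.ClientSession(headers=headers) as session:
    async with session.get(HERMES_URL) as resp:
      # resp.json() relies on the explicit application/json content type.
      body = await resp.json()
  # "stats": node IP -> stats snapshot; "failures": node IP -> error message.
  return body['stats'], body['failures']


if __name__ == '__main__':
  asyncio.get_event_loop().run_until_complete(fetch_cluster_node_stats('secret'))
```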
58 changes: 29 additions & 29 deletions Hermes/appscale/hermes/helper.py
@@ -1,34 +1,34 @@
""" Helper functions for Hermes operations. """
import errno
import os
import asyncio
import logging

class JSONTags(object):
""" A class containing all JSON tags used for Hermes functionality. """
ALL_STATS = 'all_stats'
BUCKET_NAME = 'bucket_name'
BODY = 'body'
DEPLOYMENT_ID = 'deployment_id'
ERROR = 'error'
OBJECT_NAME = 'object_name'
REASON = 'reason'
STATUS = 'status'
STORAGE = 'storage'
SUCCESS = 'success'
TASK_ID = 'task_id'
TIMESTAMP = 'timestamp'
TYPE = 'type'
UNREACHABLE = 'unreachable'
from appscale.hermes.constants import SubprocessError

def ensure_directory(dir_path):
""" Ensures that the directory exists.
logger = logging.getLogger(__name__)


async def subprocess(command, timeout):
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
logger.debug('Started subprocess `{}` (pid: {})'
.format(command, process.pid))

Args:
dir_path: A str representing the directory path.
"""
try:
os.makedirs(dir_path)
except OSError as os_error:
if os_error.errno == errno.EEXIST and os.path.isdir(dir_path):
pass
else:
raise
# Wait for the subprocess to finish
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
except asyncio.TimeoutError:
raise SubprocessError('Timed out waiting for subprocess `{}` (pid: {})'
.format(command, process.pid))

output = stdout.decode()
error = stderr.decode()
if error:
logger.warning(error)
if process.returncode != 0:
raise SubprocessError('Subprocess failed with return code {} ({})'
.format(process.returncode, error))

return output, error
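The new shared `helper.subprocess` coroutine centralizes shell-command handling for stats producers; the `cassandra_stats.py` change below switches to it. A minimal usage sketch, where the command string, timeout value, and function name are illustrative placeholders rather than part of this PR:

```python
import asyncio
import logging

from appscale.hermes import helper
from appscale.hermes.constants import SubprocessError

logger = logging.getLogger(__name__)


async def sample_producer():
  """ Illustrative producer that shells out through the shared helper. """
  try:
    # Returns decoded stdout and stderr; raises SubprocessError on timeout
    # or on a non-zero return code.
    output, error = await helper.subprocess('nodetool status', timeout=60)
  except SubprocessError as err:
    # Callers decide how a failed or timed-out command should be reported.
    logger.warning('Command failed: {}'.format(err))
    return None
  return output


if __name__ == '__main__':
  asyncio.get_event_loop().run_until_complete(sample_producer())
```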
6 changes: 6 additions & 0 deletions Hermes/appscale/hermes/hermes_server.py
@@ -25,6 +25,7 @@
from appscale.hermes.producers.rabbitmq_stats import PushQueueStatsSource
from appscale.hermes.producers.rabbitmq_stats import RabbitMQStatsSource
from appscale.hermes.producers.taskqueue_stats import TaskqueueStatsSource
+from appscale.hermes.resources.resource_handlers import processes

logger = logging.getLogger(__name__)

@@ -143,12 +144,17 @@ def main():

app = web.Application(middlewares=[verify_secret_middleware])

+# Add routes for old style structured statistics.
route_items = []
route_items += get_local_stats_api_routes(is_lb, is_tq, is_db)
route_items += get_cluster_stats_api_routes(is_master)
for route, handler in route_items:
app.router.add_get(route, handler)

+# Add routes for new resources API.
+app.router.add_get('/v2/processes', processes.list_local)
+app.router.add_get('/v2/processes/_cluster', processes.list_cluster)

logger.info("Starting Hermes on port: {}.".format(args.port))
web.run_app(app, port=args.port, access_log=logger,
access_log_format='%a "%r" %s %bB %Tfs "%{User-Agent}i"')
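The new `/v2/processes` routes point at `processes.list_local` and `processes.list_cluster` in the new `resources` package, whose contents did not load in this diff view. Purely as a hypothetical sketch of what an aiohttp handler behind `/v2/processes` could look like, the `psutil` usage and field names below are assumptions, not the PR's actual implementation:

```python
import psutil
from aiohttp import web


async def list_local(request):
  """ Hypothetical handler returning one JSON object per local process. """
  processes = [
    {
      'pid': proc.info['pid'],
      'name': proc.info['name'],
      'cpu_percent': proc.info['cpu_percent'],
    }
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent'])
  ]
  return web.json_response(processes, content_type='application/json')
```

A cluster variant would presumably fan the same request out to every node and merge the results, mirroring the ClusterStatsHandler pattern in handlers.py.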
40 changes: 5 additions & 35 deletions Hermes/appscale/hermes/producers/cassandra_stats.py
@@ -1,12 +1,13 @@
""" Fetches `nodetool status` info. """
-import asyncio
import logging
import re
import time

import attr

from appscale.common import appscale_info
+from appscale.hermes import helper
+from appscale.hermes.constants import SubprocessError
from appscale.hermes.converter import Meta, include_list_name

# The endpoint used for retrieving queue stats.
@@ -16,11 +17,6 @@
logger = logging.getLogger(__name__)


-class NodetoolStatusError(Exception):
-  """ Indicates that `nodetool status` command failed. """
-  pass


@include_list_name('cassandra.node')
@attr.s(cmp=False, hash=False, slots=True, frozen=True)
class CassandraNodeStats(object):
@@ -99,34 +95,8 @@ async def get_current(cls):
An instance of CassandraStatsSnapshot.
"""
start = time.time()

-process = await asyncio.create_subprocess_shell(
-  NODETOOL_STATUS_COMMAND,
-  stdout=asyncio.subprocess.PIPE,
-  stderr=asyncio.subprocess.PIPE
-)
-logger.info('Started subprocess `{}` (pid: {})'
-            .format(NODETOOL_STATUS_COMMAND, process.pid))
-
-try:
-  # Wait for the subprocess to finish
-  stdout, stderr = await asyncio.wait_for(
-    process.communicate(), NODETOOL_STATUS_TIMEOUT
-  )
-except asyncio.TimeoutError:
-  raise NodetoolStatusError(
-    'Timed out waiting for subprocess `{}` (pid: {})'
-    .format(NODETOOL_STATUS_COMMAND, process.pid)
-  )
-
-output = stdout.decode()
-error = stderr.decode()
-if error:
-  logger.warning(error)
-if process.returncode != 0:
-  raise NodetoolStatusError('Subprocess failed with return code {} ({})'
-                            .format(process.returncode, error))

+output, error = await helper.subprocess(NODETOOL_STATUS_COMMAND,
+                                        NODETOOL_STATUS_TIMEOUT)
known_db_nodes = set(appscale_info.get_db_ips())
nodes = []
shown_nodes = set()
@@ -180,7 +150,7 @@ async def get_current(cls):
shown_nodes.add(address)

else:
-raise NodetoolStatusError(
+raise SubprocessError(
'`{}` output does not contain expected header. Actual output:\n{}'
.format(NODETOOL_STATUS_COMMAND, output)
)
10 changes: 7 additions & 3 deletions Hermes/appscale/hermes/producers/proxy_stats.py
@@ -752,9 +752,13 @@ async def get_current():
proxy_stats_list = []
for haproxy_process_name, info in HAPROXY_PROCESSES.items():
logger.debug("Processing {} haproxy stats".format(haproxy_process_name))
-proxy_stats_list += await get_stats_from_one_haproxy(
-  info['socket'], info['configs'], net_connections
-)
+try:
+  proxy_stats_list += await get_stats_from_one_haproxy(
+    info['socket'], info['configs'], net_connections
+  )
+except IOError as error:
+  logger.warning("Failed to read {} haproxy stats ({})"
+                 .format(haproxy_process_name, error))

stats = ProxiesStatsSnapshot(
utc_timestamp=time.mktime(datetime.now().timetuple()),