From 33008204dd56ec038ab5725615c9bfda0a685846 Mon Sep 17 00:00:00 2001 From: Andrey Pokhilko Date: Mon, 25 Oct 2021 16:19:06 +0300 Subject: [PATCH] SIGHUP handler (#154) * Implement signal handler * flake8 * Fix the keep-alive issue * Release 0.13.17 --- docs/Changelog.md | 6 ++- docs/index.md | 2 + mockintosh/__init__.py | 28 ++++++---- mockintosh/management.py | 44 +++++----------- mockintosh/res/version.txt | 2 +- mockintosh/servers.py | 54 +++++++++++++++++--- mockintosh/services/asynchronous/_looping.py | 16 +++++- 7 files changed, 101 insertions(+), 51 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 00daf68a6..0e517a4d3 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -1,5 +1,9 @@ # Changelog +## v0.13.17 - 2021-10-25 + +- handle SUGHUP to reload configuration from disk and restart + ## v0.13.16 - 2021-10-08 - add Windows Installer to release assets @@ -181,8 +185,6 @@ - allow enabling multiple tags + allow response to trigger tag up/down => state machine for complex scenarios ## Other -- Have SIGHUP handler - - Nicer logging of requests, with special option to enable it. - Nicer formatted error pages for known errors, explaining the problem diff --git a/docs/index.md b/docs/index.md index 52d88dd88..3225b5686 100644 --- a/docs/index.md +++ b/docs/index.md @@ -169,6 +169,8 @@ enabled in startup time, e.g. `mockintosh --enable-tags first,second` Using `--sample-config` will cause Mockintosh to write the example configuration file into specified location. +_Note: sending SIGHUP to Mockintosh's process will cause it to re-read configuration file and restart the server._ + ### OpenAPI Specification to Mockintosh Config Conversion (_experimental_) _Note: This feature is experimental. One-to-one transpilation of OAS documents is not guaranteed._ diff --git a/mockintosh/__init__.py b/mockintosh/__init__.py index 4562c49bb..214f2de48 100644 --- a/mockintosh/__init__.py +++ b/mockintosh/__init__.py @@ -90,7 +90,7 @@ def run( tags: list = [], load_override: Union[dict, None] = None ): - queue, _ = start_render_queue() + queue, render_thread = start_render_queue() if address: # pragma: no cover logging.info('Bind address: %s', address) @@ -111,8 +111,22 @@ def run( logging.exception('Mock server loading error:') with _nostderr(): raise + + prev_handler = signal.getsignal(signal.SIGHUP) + do_restart = [False] # mutable + + def sighup_handler(num, frame): + logging.info("Received SIGHUP") + http_server.stop() + render_thread.kill() + signal.signal(signal.SIGHUP, prev_handler) + do_restart[0] = True + + signal.signal(signal.SIGHUP, sighup_handler) http_server.run() + return do_restart[0] + def _gracefully_exit(num, frame): atexit._run_exitfuncs() @@ -271,15 +285,9 @@ def initiate(argv=None): logging.info("%s v%s is starting...", PROGRAM.capitalize(), __version__) if not cov_no_run: # pragma: no cover - run( - source, - debug=debug_mode, - interceptors=interceptors, - address=address, - services_list=services_list, - tags=tags, - load_override=load_override - ) + while run(source, debug=debug_mode, interceptors=interceptors, address=address, + services_list=services_list, tags=tags, load_override=load_override): + logging.info("Restarting...") def demo_run(): diff --git a/mockintosh/management.py b/mockintosh/management.py index 2ca998c09..573cf0603 100644 --- a/mockintosh/management.py +++ b/mockintosh/management.py @@ -15,12 +15,14 @@ import threading from typing import ( Union, - Tuple + Tuple, Optional, Awaitable ) from collections import OrderedDict from urllib.parse import parse_qs, unquote import yaml +import yaml.scanner +import yaml.parser from yaml.representer import Representer import jsonschema import tornado.web @@ -29,13 +31,8 @@ import mockintosh from mockintosh.constants import PROGRAM -from mockintosh.config import ( - ConfigService, - ConfigExternalFilePath -) -from mockintosh.services.http import ( - HttpService -) +from mockintosh.config import ConfigExternalFilePath +from mockintosh.services.http import HttpService from mockintosh.builders import ConfigRootBuilder from mockintosh.handlers import GenericHandler from mockintosh.helpers import _safe_path_split, _b64encode, _urlsplit @@ -46,14 +43,8 @@ AsyncProducerDatasetLoopEnd, InternalResourcePathCheckError ) -from mockintosh.services.asynchronous import ( - AsyncService, - AsyncActor, - AsyncProducer, - AsyncConsumer, - AsyncConsumerGroup -) -from mockintosh.services.asynchronous._looping import run_loops as async_run_loops +from mockintosh.services.asynchronous import AsyncService, AsyncProducer, AsyncConsumer +from mockintosh.services.asynchronous._looping import run_loops as async_run_loops, stop_loops from mockintosh.replicas import Request, Response POST_CONFIG_RESTRICTED_FIELDS = ('port', 'hostname', 'ssl', 'sslCertFile', 'sslKeyFile') @@ -109,7 +100,6 @@ def _reset_iterators(app): class ManagementBaseHandler(tornado.web.RequestHandler): - def write(self, chunk: Union[str, bytes, dict]) -> None: if self._finished: # pragma: no cover raise RuntimeError("Cannot write() after finish()") @@ -131,6 +121,9 @@ def _log(self) -> None: if logging.DEBUG >= logging.root.level: self.application.log_request(self) + def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: + pass + class ManagementRootHandler(ManagementBaseHandler): @@ -166,21 +159,10 @@ async def post(self): if not self.check_restricted_fields(service, i): return - for actor in AsyncActor.actors: - actor.stop = True - - for consumer_group in AsyncConsumerGroup.groups: - consumer_group.stop = True + stop_loops() + self.http_server.clear_lists() definition.stats.services = [] - AsyncService.services = [] - AsyncActor.actors = [] - AsyncProducer.producers = [] - AsyncConsumer.consumers = [] - AsyncConsumerGroup.groups = [] - HttpService.services = [] - ConfigService.services = [] - ConfigExternalFilePath.files = [] definition.services, definition.config_root = definition.analyze(data) for service in HttpService.services: @@ -298,7 +280,7 @@ async def get(self): self.write(self.logs.json()) async def post(self): - enabled = not self.get_body_argument('enable', default=True) in ('false', 'False', '0') + enabled = not self.get_body_argument('enable', default='True') in ('false', 'False', '0') for service in self.logs.services: service.enabled = enabled self.set_status(204) diff --git a/mockintosh/res/version.txt b/mockintosh/res/version.txt index c00d941f4..da252327b 100644 --- a/mockintosh/res/version.txt +++ b/mockintosh/res/version.txt @@ -1 +1 @@ -0.13.16 \ No newline at end of file +0.13.17 \ No newline at end of file diff --git a/mockintosh/servers.py b/mockintosh/servers.py index e1daef445..b8b00b331 100644 --- a/mockintosh/servers.py +++ b/mockintosh/servers.py @@ -22,6 +22,7 @@ import tornado.web from tornado.routing import Rule, RuleRouter, HostMatches +from mockintosh.config import ConfigService, ConfigExternalFilePath from mockintosh.definition import Definition from mockintosh.exceptions import CertificateLoadingError from mockintosh.handlers import GenericHandler @@ -49,7 +50,7 @@ ManagementServiceTagHandler, UnhandledData ) -from mockintosh.services.asynchronous._looping import run_loops as async_run_loops +from mockintosh.services.asynchronous._looping import run_loops as async_run_loops, stop_loops as async_stop_loops from mockintosh.services.http import ( HttpService, HttpPath, @@ -75,8 +76,16 @@ def get_server( def serve(self): raise NotImplementedError + @abstractmethod + def stop(self): + raise NotImplementedError + class TornadoImpl(Impl): + def __init__(self) -> None: + super().__init__() + self.ioloop = None + self.servers = [] def get_server( self, @@ -89,14 +98,32 @@ def get_server( else: server = tornado.web.HTTPServer(router) + self.servers.append(server) return server def serve(self) -> None: + self.ioloop = tornado.ioloop.IOLoop.current() + logging.debug("Starting ioloop: %s", self.ioloop) try: - tornado.ioloop.IOLoop.current().start() + self.ioloop.start() except KeyboardInterrupt: logging.debug("Shutdown: %s", traceback.format_exc()) + logging.debug("IOLoop has completed") + + def stop(self): + logging.debug("Stopping servers...") + for server in self.servers: + logging.debug("Stopping: %s", server) + self.ioloop.add_callback(server.close_all_connections) + server.stop() + + logging.debug("Stopping IOLoop...") + logging.debug("%s", self.ioloop) + self.ioloop.add_callback_from_signal(self.ioloop.stop) + + logging.debug("TornadoImpl is stopped") + class _Listener: def __init__(self, hostname: Union[str, None], port: int, address: Union[str, None]): @@ -191,19 +218,20 @@ def load_service(self, service: HttpService, rules: list, ssl: bool, ssl_options http_path_list, management_root = self.prepare_app(service) app = self.make_app(service, http_path_list, self.globals, debug=self.debug, management_root=management_root) self._apps.apps[service.internal_http_service_id] = app + address_str = self.address if self.address else 'localhost' self._apps.listeners[service.internal_http_service_id] = _Listener( service.hostname, service.port, - self.address if self.address else 'localhost' + address_str ) if service.hostname is None: server = self.impl.get_server(app, ssl, ssl_options) + logging.debug('Will listen: %s:%d', address_str, service.port) server.listen(service.port, address=self.address) - logging.debug('Will listen port number: %d', service.port) self.services_log.append('Serving at %s://%s:%s%s' % ( protocol, - self.address if self.address else 'localhost', + address_str, service.port, ' the mock for %r' % service.get_name_or_empty() )) @@ -250,8 +278,8 @@ def load(self) -> None: if rules: router = RuleRouter(rules) server = self.impl.get_server(router, ssl, ssl_options) + logging.debug('Listening on port: %s:%d', self.address, service.port) server.listen(services[0].port, address=self.address) - logging.debug('Will listen port number: %d', service.port) self.load_management_api() @@ -538,6 +566,7 @@ def load_management_api(self) -> None: ) ) ]) + logging.debug("Listening on port %s:%s", self.address, config_management.port) server = self.impl.get_server(app, ssl, ssl_options) server.listen(config_management.port, address=self.address) self.services_log.append('Serving management UI+API at %s://%s:%s' % ( @@ -545,3 +574,16 @@ def load_management_api(self) -> None: self.address if self.address else 'localhost', config_management.port )) + + def clear_lists(self): + HttpService.services = [] + ConfigService.services = [] + ConfigExternalFilePath.files = [] + + def stop(self): + logging.info("Stopping server...") + self.impl.stop() + logging.debug("Stoppping async actor threads") + async_stop_loops() + self.clear_lists() + logging.debug("Done shutdown") diff --git a/mockintosh/services/asynchronous/_looping.py b/mockintosh/services/asynchronous/_looping.py index f66a43bb9..f6ff45be2 100644 --- a/mockintosh/services/asynchronous/_looping.py +++ b/mockintosh/services/asynchronous/_looping.py @@ -9,7 +9,7 @@ import sys import threading -from mockintosh.services.asynchronous import AsyncService, AsyncConsumerGroup +from mockintosh.services.asynchronous import AsyncService, AsyncConsumerGroup, AsyncActor, AsyncProducer, AsyncConsumer from mockintosh.services.asynchronous.kafka import KafkaConsumerGroup # noqa: F401 from mockintosh.services.asynchronous.amqp import AmqpConsumerGroup # noqa: F401 from mockintosh.services.asynchronous.redis import RedisConsumerGroup # noqa: F401 @@ -49,3 +49,17 @@ def run_loops(): t = threading.Thread(target=consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() + + +def stop_loops(): + for actor in AsyncActor.actors: + actor.stop = True + + for consumer_group in AsyncConsumerGroup.groups: + consumer_group.stop = True + + AsyncService.services = [] + AsyncActor.actors = [] + AsyncProducer.producers = [] + AsyncConsumer.consumers = [] + AsyncConsumerGroup.groups = []