Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time function #119

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
Expand Down
7 changes: 4 additions & 3 deletions mite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import random
from pkg_resources import get_distribution, DistributionNotFound

from .scenario import time_function # noqa: F401
from .exceptions import MiteError # noqa: F401
from .context import Context
import mite.utils
Expand All @@ -29,7 +30,7 @@ def test_context(extensions=('http',), **config):
return c


class ensure_separation_from_callable:
class _ensure_separation_from_callable:
def __init__(self, sep_callable, loop=None):
self._sep_callable = sep_callable
self._loop = loop
Expand Down Expand Up @@ -65,7 +66,7 @@ def ensure_fixed_separation(separation, loop=None):
def fixed_separation():
return separation

return ensure_separation_from_callable(fixed_separation, loop=loop)
return _ensure_separation_from_callable(fixed_separation, loop=loop)


def ensure_average_separation(mean_separation, plus_minus=None, loop=None):
Expand All @@ -88,4 +89,4 @@ def ensure_average_separation(mean_separation, plus_minus=None, loop=None):
def average_separation():
return mean_separation + (random.random() * plus_minus * 2) - plus_minus

return ensure_separation_from_callable(average_separation, loop=loop)
return _ensure_separation_from_callable(average_separation, loop=loop)
119 changes: 3 additions & 116 deletions mite/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,21 @@
import os
import sys
import threading
from urllib.request import Request as UrlLibRequest
from urllib.request import urlopen

import docopt
import msgpack
import ujson

import uvloop

from .cli.common import (_create_config_manager, _create_runner,
_create_scenario_manager)
from .cli.common import _create_runner, _create_sender
from .cli.controller import controller
from .cli.duplicator import duplicator
from .cli.stats import stats
from .cli.test import journey_cmd, scenario_cmd
from .collector import Collector
from .controller import Controller
from .har_to_mite import har_convert_to_mite
from .recorder import Recorder
from .utils import _msg_backend_module, spec_import
from .utils import _msg_backend_module
from .web import app, prometheus_metrics


Expand All @@ -98,13 +94,6 @@ def _recorder_receiver(opts):
return receiver


def _create_sender(opts):
socket = opts['--message-socket']
sender = _msg_backend_module(opts).Sender()
sender.connect(socket)
return sender


def _create_prometheus_exporter_receiver(opts):
socket = opts['--stats-out-socket']
receiver = _msg_backend_module(opts).Receiver()
Expand All @@ -117,11 +106,6 @@ def _create_runner_transport(opts):
return _msg_backend_module(opts).RunnerTransport(socket)


def _create_controller_server(opts):
socket = opts['--controller-socket']
return _msg_backend_module(opts).ControllerServer(socket)


logger = logging.getLogger(__name__)


Expand All @@ -147,103 +131,6 @@ def _start_web_in_thread(opts):
t.start()


def _controller_log_start(scenario_spec, logging_url):
if not logging_url.endswith("/"):
logging_url += "/"

# The design decision has been made to do this logging synchronously
# rather than using the usual mite data pipeline, because we want to make
# sure the log is nailed down before we start doing any test activity.
url = logging_url + "start"
logger.info(f"Logging test start to {url}")
resp = urlopen(
UrlLibRequest(
url,
data=ujson.dumps(
{
'testname': scenario_spec,
# TODO: log other properties as well,
# like the endpoint URLs we are
# hitting.
}
).encode(),
method="POST",
)
)
logger.debug("Logging test start complete")
if resp.status == 200:
return ujson.loads(resp.read())['newid']
else:
logger.warning(
f"Could not complete test start logging; status was {resp.status_code}"
)


def _controller_log_end(logging_id, logging_url):
if logging_id is None:
return

if not logging_url.endswith("/"):
logging_url += "/"

url = logging_url + "end"
logger.info(f"Logging test end to {url}")
resp = urlopen(UrlLibRequest(url, data=ujson.dumps({'id': logging_id}).encode()))
if resp.status != 204:
logger.warning(
f"Could not complete test end logging; status was {resp.status_code}"
)
logger.debug("Logging test end complete")


def controller(opts):
config_manager = _create_config_manager(opts)
scenario_spec = opts['SCENARIO_SPEC']
scenarios_fn = spec_import(scenario_spec)
scenario_manager = _create_scenario_manager(opts)
try:
scenarios = scenarios_fn(config_manager)
except TypeError:
scenarios = scenarios_fn()
for journey_spec, datapool, volumemodel in scenarios:
scenario_manager.add_scenario(journey_spec, datapool, volumemodel)
controller = Controller(scenario_spec, scenario_manager, config_manager)
server = _create_controller_server(opts)
sender = _create_sender(opts)
loop = asyncio.get_event_loop()
logging_id = None
logging_url = opts["--logging-webhook"]
if logging_url is None:
try:
logging_url = os.environ["MITE_LOGGING_URL"]
except KeyError:
pass
if logging_url is not None:
logging_id = _controller_log_start(scenario_spec, logging_url)

async def controller_report():
while True:
if controller.should_stop():
return
await asyncio.sleep(1)
controller.report(sender.send)

try:
loop.run_until_complete(
asyncio.gather(
controller_report(), server.run(controller, controller.should_stop)
)
)
except KeyboardInterrupt:
# TODO: kill runners, do other shutdown tasks
logging.info("Received interrupt signal, shutting down")
finally:
_controller_log_end(logging_id, logging_url)
# TODO: cancel all loop tasks? Something must be done to stop this
# from hanging
loop.close()


def runner(opts):
transport = _create_runner_transport(opts)
sender = _create_sender(opts)
Expand Down
12 changes: 10 additions & 2 deletions mite/cli/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ..config import ConfigManager
from ..runner import Runner
from ..scenario import ScenarioManager
from ..utils import spec_import
from ..utils import spec_import, _msg_backend_module


def _create_config_manager(opts):
Expand All @@ -15,8 +15,9 @@ def _create_config_manager(opts):
return config_manager


def _create_scenario_manager(opts):
def _create_scenario_manager(spec, opts):
return ScenarioManager(
spec=spec,
start_delay=float(opts['--delay-start-seconds']),
period=float(opts['--max-loop-delay']),
spawn_rate=int(opts['--spawn-rate']),
Expand All @@ -37,3 +38,10 @@ def _create_runner(opts, transport, msg_sender):
max_work=max_work,
debug=opts['--debugging'],
)


def _create_sender(opts):
socket = opts['--message-socket']
sender = _msg_backend_module(opts).Sender()
sender.connect(socket)
return sender
97 changes: 97 additions & 0 deletions mite/cli/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import asyncio
import logging
from functools import partial

from ..controller import Controller
from ..utils import _msg_backend_module, spec_import
from .common import _create_config_manager, _create_scenario_manager, _create_sender


def _create_controller_server(opts):
socket = opts['--controller-socket']
return _msg_backend_module(opts).ControllerServer(socket)


async def _run_time_function(time_fn, start_event, end_event):
if time_fn is not None:
await time_fn(start_event, end_event)
if not end_event.is_set():
logging.error(
"The time function exited before the scenario ended, which seems like a bug"
)
else:
start_event.set()
await end_event.wait()


async def _send_controller_report(sender, controller_obj):
while True:
if controller_obj.should_stop():
return
await asyncio.sleep(1)
controller_obj.report(sender.send)


async def _run_controller(server, controller_obj, start_event, end_event):
await start_event.wait()
await server.run(controller_obj, controller_obj.should_stop)
end_event.set()


def _run(
scenario_spec,
opts,
scenarios_fn,
server,
sender,
get_controller=Controller,
extra_tasks=(),
):
config_manager = _create_config_manager(opts)
scenario_manager = _create_scenario_manager(scenario_spec, opts)
try:
scenarios = scenarios_fn(config_manager)
except TypeError:
scenarios = scenarios_fn()
for journey_spec, datapool, volumemodel in scenarios:
scenario_manager.add_scenario(journey_spec, datapool, volumemodel)
controller_object = get_controller(scenario_manager, config_manager)

time_fn = getattr(scenarios_fn, "_mite_time_function", None)
if time_fn is not None:
time_fn = partial(time_fn, scenario_spec, config_manager)

loop = asyncio.get_event_loop()
start_event = asyncio.Event()
end_event = asyncio.Event()

time_fn_task = loop.create_task(_run_time_function(time_fn, start_event, end_event))

try:
loop.run_until_complete(
asyncio.gather(
_send_controller_report(sender, controller_object),
_run_controller(server, controller_object, start_event, end_event),
time_fn_task,
*extra_tasks,
)
)
except KeyboardInterrupt:
# TODO: kill runners, do other shutdown tasks
logging.info("Received interrupt signal, shutting down")
if not end_event.is_set():
time_fn_task.cancel()
loop.run_until_complete(time_fn_task)
finally:
# TODO: cancel all loop tasks? Something must be done to stop this
# from hanging
loop.close()


def controller(opts):
server = _create_controller_server(opts)
sender = _create_sender(opts)
scenario_spec = opts['SCENARIO_SPEC']
scenarios_fn = spec_import(scenario_spec)

_run(scenario_spec, opts, scenarios_fn, server, sender)
Loading