Skip to content

Commit

Permalink
Merge pull request #4179 from StackStorm/sensor_container_single_sens…
Browse files Browse the repository at this point in the history
…or_mode

Add single sensor mode to the sensor container service / process
  • Loading branch information
Kami authored Jun 20, 2018
2 parents f56a54b + 342a5ac commit da6b04f
Show file tree
Hide file tree
Showing 26 changed files with 270 additions and 73 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ matrix:
python: 2.7
- env: TASK="compilepy3 ci-py3-unit" CACHE_NAME=py3
python: 3.6
- env: TASK="ci-py3-integration" CACHE_NAME=py3
python: 3.6
addons:
apt:
sources:
Expand Down Expand Up @@ -55,7 +57,7 @@ before_install:
- sudo pip install --upgrade "virtualenv==15.1.0"

install:
- if [ "${TASK}" = 'compilepy3 ci-py3-unit' ]; then pip install "tox==3.0.0"; else make requirements; fi
- if [ "${TASK}" = 'compilepy3 ci-py3-unit' ] || [ "${TASK}" = 'ci-py3-integration' ]; then pip install "tox==3.0.0"; else make requirements; fi
- if [ "${TASK}" = 'ci-unit' ] || [ "${TASK}" = 'ci-integration' ]; then pip install codecov; fi
- if [ "${TASK}" = 'ci-unit' ] || [ "${TASK}" = 'ci-integration' ] || [ "${TASK}" = 'compilepy3 ci-py3-unit' ]; then sudo .circle/add-itest-user.sh; fi

Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,14 @@ ci-py3-unit:
@echo
@echo "==================== ci-py3-unit ===================="
@echo
tox -e py36 -vv
tox -e py36-unit -vv

.PHONY: ci-py3-integration
ci-py3-integration:
@echo
@echo "==================== ci-py3-integration ===================="
@echo
tox -e py36-integration -vv

.PHONY: .rst-check
.rst-check:
Expand Down
2 changes: 2 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ draft = http://json-schema.org/draft-04/schema#
[sensorcontainer]
# Provider of sensor node partition config.
partition_provider = {'name': 'default'}
# Run in a single sensor mode where parent process exits when a sensor crashes / dies. This is useful in environments where partitioning, sensor process life cycle and failover is handled by a 3rd party service such as kubernetes.
single_sensor_mode = False
# location of the logging.conf file
logging = conf/logging.sensorcontainer.conf
# name of the sensor node.
Expand Down
1 change: 1 addition & 0 deletions conf/st2.tests.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ sleep_delay = 0.1
[content]
system_packs_base_path =
packs_base_paths = st2tests/st2tests/fixtures/packs/
system_runners_base_path = contrib/runners

[syslog]
host = 127.0.0.1
Expand Down
1 change: 1 addition & 0 deletions conf/st2.tests1.conf
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ base_path = /tmp
[content]
system_packs_base_path =
packs_base_paths = st2tests/st2tests/fixtures/packs_1/
system_runners_base_path = contrib/runners

[syslog]
host = 127.0.0.1
Expand Down
Empty file removed contrib/hello_st2/config.yaml
Empty file.
9 changes: 6 additions & 3 deletions st2common/st2common/constants/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ['SCHEDULER_ENABLED_LOG_LINE', 'SCHEDULER_DISABLED_LOG_LINE']
__all__ = [
'SCHEDULER_ENABLED_LOG_LINE',
'SCHEDULER_DISABLED_LOG_LINE'
]


# Integration tests look for these loglines to validate scheduler enable/disable
SCHEDULER_ENABLED_LOG_LINE = 'Scheduler is enabled.'
SCHEDULER_DISABLED_LOG_LINE = 'Scheduler is disabled.'
SCHEDULER_ENABLED_LOG_LINE = b'Scheduler is enabled.'
SCHEDULER_DISABLED_LOG_LINE = b'Scheduler is disabled.'
5 changes: 5 additions & 0 deletions st2common/st2common/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ def __call__(self, req):
detail = 'Failed to parse request body: %s' % str(e)
raise exc.HTTPBadRequest(detail=detail)

# Special case for Python 3
if six.PY3 and content_type == 'text/plain' and isinstance(data, six.binary_type):
# Convert bytes to text type (string / unicode)
data = data.decode('utf-8')

try:
CustomValidator(schema, resolver=self.spec_resolver).validate(data)
except (jsonschema.ValidationError, ValueError) as e:
Expand Down
15 changes: 9 additions & 6 deletions st2common/tests/integration/test_register_content_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
# limitations under the License.

from __future__ import absolute_import

import os
import sys
import glob

from st2tests.base import IntegrationTestCase
Expand All @@ -27,7 +29,7 @@
SCRIPT_PATH = os.path.join(BASE_DIR, '../../bin/st2-register-content')
SCRIPT_PATH = os.path.abspath(SCRIPT_PATH)

BASE_CMD_ARGS = [SCRIPT_PATH, '--config-file=conf/st2.tests.conf', '-v']
BASE_CMD_ARGS = [sys.executable, SCRIPT_PATH, '--config-file=conf/st2.tests.conf', '-v']
BASE_REGISTER_ACTIONS_CMD_ARGS = BASE_CMD_ARGS + ['--register-actions']

PACKS_PATH = get_fixtures_packs_base_path()
Expand Down Expand Up @@ -111,13 +113,14 @@ def test_register_from_packs_doesnt_throw_on_missing_pack_resource_folder(self):

# Note: We want to use a different config which sets fixtures/packs_1/
# dir as packs_base_paths
cmd = [SCRIPT_PATH, '--config-file=conf/st2.tests1.conf', '-v', '--register-sensors']
cmd = [sys.executable, SCRIPT_PATH, '--config-file=conf/st2.tests1.conf', '-v',
'--register-sensors']
exit_code, _, stderr = run_command(cmd=cmd)
self.assertTrue('Registered 0 sensors.' in stderr)
self.assertTrue('Registered 0 sensors.' in stderr, 'Actual stderr: %s' % (stderr))
self.assertEqual(exit_code, 0)

cmd = [SCRIPT_PATH, '--config-file=conf/st2.tests1.conf', '-v', '--register-all',
'--register-no-fail-on-failure']
cmd = [sys.executable, SCRIPT_PATH, '--config-file=conf/st2.tests1.conf', '-v',
'--register-all', '--register-no-fail-on-failure']
exit_code, _, stderr = run_command(cmd=cmd)
self.assertTrue('Registered 0 actions.' in stderr)
self.assertTrue('Registered 0 sensors.' in stderr)
Expand All @@ -129,7 +132,7 @@ def test_register_all_and_register_setup_virtualenvs(self):
cmd = BASE_CMD_ARGS + ['--register-all', '--register-setup-virtualenvs',
'--register-no-fail-on-failure']
exit_code, stdout, stderr = run_command(cmd=cmd)
self.assertTrue('Registering actions' in stderr)
self.assertTrue('Registering actions' in stderr, 'Actual stderr: %s' % (stderr))
self.assertTrue('Registering rules' in stderr)
self.assertTrue('Setup virtualenv for %s pack(s)' % (PACKS_COUNT) in stderr)
self.assertEqual(exit_code, 0)
Expand Down
9 changes: 7 additions & 2 deletions st2debug/st2debug/cmd/submit_debug_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def create_archive(self):

# Prepend temp_dir_path to OUTPUT_PATHS
output_paths = {}
for key, path in OUTPUT_PATHS.iteritems():
for key, path in six.iteritems(OUTPUT_PATHS):
output_paths[key] = os.path.join(self._temp_dir_path, path)

# 2. Moves all the files to the temporary directory
Expand Down Expand Up @@ -489,7 +489,12 @@ def format_output_filename(cmd):
:return: Formatted filename.
:rtype: ``str``
"""
return cmd.translate(None, """ !@#$%^&*()[]{};:,./<>?\|`~=+"'""")
if six.PY3:
cmd = cmd.translate(cmd.maketrans('', '', """ !@#$%^&*()[]{};:,./<>?\|`~=+"'"""))
else:
cmd = cmd.translate(None, """ !@#$%^&*()[]{};:,./<>?\|`~=+"'""")

return cmd

@staticmethod
def get_system_information():
Expand Down
3 changes: 0 additions & 3 deletions st2debug/tests/integration/fixtures/configs/st2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ logging = st2api/conf/logging.conf
username = ponies
password = ponies

[messaging]
url = ponies

[sensorcontainer]
logging = st2reactor/conf/logging.sensorcontainer.conf

Expand Down
4 changes: 2 additions & 2 deletions st2exporter/st2exporter/exporter/dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# limitations under the License.

import os
import Queue

import eventlet
from six.moves import queue

from st2common import log as logging
from st2exporter.exporter.file_writer import TextFileWriter
Expand Down Expand Up @@ -94,7 +94,7 @@ def _get_batch(self):
for _ in range(self._batch_size):
try:
item = self._queue.get(block=False)
except Queue.Empty:
except queue.Empty:
break
else:
executions_to_write.append(item)
Expand Down
5 changes: 2 additions & 3 deletions st2exporter/st2exporter/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import Queue

import eventlet
from six.moves import queue
from kombu import Connection
from oslo_config import cfg

Expand Down Expand Up @@ -47,7 +46,7 @@ class ExecutionsExporter(consumers.MessageHandler):

def __init__(self, connection, queues):
super(ExecutionsExporter, self).__init__(connection, queues)
self.pending_executions = Queue.Queue()
self.pending_executions = queue.Queue()
self._dumper = Dumper(queue=self.pending_executions,
export_dir=cfg.CONF.exporter.dump_dir)
self._consumer_thread = None
Expand Down
5 changes: 3 additions & 2 deletions st2exporter/tests/integration/test_dumper_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

import datetime
import os
import Queue

import mock
import six

from six.moves import queue

from st2common.models.api.execution import ActionExecutionAPI
from st2common.persistence.marker import DumperMarker
from st2common.util import isotime
Expand Down Expand Up @@ -47,7 +48,7 @@ class TestDumper(DbTestCase):
execution_apis.append(ActionExecutionAPI(**execution))

def get_queue(self):
executions_queue = Queue.Queue()
executions_queue = queue.Queue()

for execution in self.execution_apis:
executions_queue.put(execution)
Expand Down
2 changes: 1 addition & 1 deletion st2exporter/tests/integration/test_export_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_bootstrap(self):

@mock.patch.object(os.path, 'exists', mock.MagicMock(return_value=True))
def test_process(self):
some_execution = self.saved_executions.values()[5]
some_execution = list(self.saved_executions.values())[5]
exec_exporter = ExecutionsExporter(None, None)
self.assertEqual(exec_exporter.pending_executions.qsize(), 0)
exec_exporter.process(some_execution)
Expand Down
6 changes: 3 additions & 3 deletions st2exporter/tests/unit/test_dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

import datetime
import os
import Queue

import eventlet
import mock
from six.moves import queue

from st2common.models.api.execution import ActionExecutionAPI
from st2common.util import isotime
Expand Down Expand Up @@ -48,7 +48,7 @@ class TestDumper(EventletTestCase):
execution_apis.append(ActionExecutionAPI(**execution))

def get_queue(self):
executions_queue = Queue.Queue()
executions_queue = queue.Queue()

for execution in self.execution_apis:
executions_queue.put(execution)
Expand Down Expand Up @@ -88,7 +88,7 @@ def test_get_file_name(self):

@mock.patch.object(os.path, 'exists', mock.MagicMock(return_value=True))
def test_write_to_disk_empty_queue(self):
dumper = Dumper(queue=Queue.Queue(),
dumper = Dumper(queue=queue.Queue(),
export_dir='/tmp',
file_prefix='st2-stuff-', file_format='json')
# We just make sure this doesn't blow up.
Expand Down
2 changes: 1 addition & 1 deletion st2exporter/tests/unit/test_json_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TestJsonConverter(unittest2.TestCase):
fixtures_dict=DESCENDANTS_FIXTURES)

def test_convert(self):
executions_list = self.loaded_fixtures['executions'].values()
executions_list = list(self.loaded_fixtures['executions'].values())
converter = JsonConverter()
converted_doc = converter.convert(executions_list)
self.assertTrue(type(converted_doc), 'string')
Expand Down
14 changes: 13 additions & 1 deletion st2reactor/st2reactor/cmd/sensormanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
# limitations under the License.

from __future__ import absolute_import

import os
import sys

from oslo_config import cfg

from st2common import log as logging
from st2common.logging.misc import get_logger_name_for_module
from st2common.service_setup import setup as common_setup
Expand Down Expand Up @@ -50,8 +53,17 @@ def _teardown():
def main():
try:
_setup()

single_sensor_mode = (cfg.CONF.single_sensor_mode or
cfg.CONF.sensorcontainer.single_sensor_mode)

if single_sensor_mode and not cfg.CONF.sensor_ref:
raise ValueError('--sensor-ref argument must be provided when running in single '
'sensor mode')

sensors_partitioner = get_sensors_partitioner()
container_manager = SensorContainerManager(sensors_partitioner=sensors_partitioner)
container_manager = SensorContainerManager(sensors_partitioner=sensors_partitioner,
single_sensor_mode=single_sensor_mode)
return container_manager.run_sensors()
except SystemExit as exit_code:
return exit_code
Expand Down
34 changes: 26 additions & 8 deletions st2reactor/st2reactor/container/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,27 @@

LOG = logging.getLogger(__name__)

__all__ = [
'SensorContainerManager'
]


class SensorContainerManager(object):

def __init__(self, sensors_partitioner):
def __init__(self, sensors_partitioner, single_sensor_mode=False):
if not sensors_partitioner:
raise ValueError('sensors_partitioner should be non-None.')

self._sensors_partitioner = sensors_partitioner
self._single_sensor_mode = single_sensor_mode

self._sensor_container = None
self._container_thread = None

self._sensors_watcher = SensorWatcher(create_handler=self._handle_create_sensor,
update_handler=self._handle_update_sensor,
delete_handler=self._handle_delete_sensor,
queue_suffix='sensor_container')
self._container_thread = None
if not sensors_partitioner:
raise ValueError('sensors_partitioner should be non-None.')
self._sensors_partitioner = sensors_partitioner

def run_sensors(self):
"""
Expand All @@ -57,14 +65,22 @@ def run_sensors(self):

LOG.info('(PID:%s) SensorContainer started.', os.getpid())
self._setup_sigterm_handler()
self._spin_container_and_wait(sensors_to_run)

exit_code = self._spin_container_and_wait(sensors_to_run)
return exit_code

def _spin_container_and_wait(self, sensors):
exit_code = 0

try:
self._sensor_container = ProcessSensorContainer(sensors=sensors)
self._sensor_container = ProcessSensorContainer(
sensors=sensors,
single_sensor_mode=self._single_sensor_mode)
self._container_thread = eventlet.spawn(self._sensor_container.run)

LOG.debug('Starting sensor CUD watcher...')
self._sensors_watcher.start()

exit_code = self._container_thread.wait()
LOG.error('Process container quit with exit_code %d.', exit_code)
LOG.error('(PID:%s) SensorContainer stopped.', os.getpid())
Expand All @@ -78,7 +94,9 @@ def _spin_container_and_wait(self, sensors):
eventlet.kill(self._container_thread)
self._container_thread = None

return 0
return exit_code

return exit_code

def _setup_sigterm_handler(self):

Expand Down
Loading

0 comments on commit da6b04f

Please sign in to comment.