diff --git a/README.md b/README.md index 7e9207da033e8..2a7bc1e2b5143 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ make web ## Change Log +* v0.1.16: Minor restructuring and refactoring (create separate kinesis_util.py) * v0.1.14: Fix AWS tokens when creating Elasticsearch client * v0.1.11: Add startup/initialization notification for KCL process * v0.1.10: Bump version of amazon_kclpy to 1.4.1 diff --git a/localstack/utils/common.py b/localstack/utils/common.py index 2c58792f78fe6..80f224e58f153 100644 --- a/localstack/utils/common.py +++ b/localstack/utils/common.py @@ -49,16 +49,17 @@ def stop(self, quiet=False): class ShellCommandThread (FuncThread): - def __init__(self, cmd, params={}, outfile=None, env_vars={}, quiet=True): + def __init__(self, cmd, params={}, outfile=None, env_vars={}, stdin=False, quiet=True): self.cmd = cmd self.process = None self.outfile = outfile + self.stdin = stdin self.env_vars = env_vars FuncThread.__init__(self, self.run_cmd, params, quiet=quiet) def run_cmd(self, params): try: - self.process = run(self.cmd, async=True, outfile=self.outfile, env_vars=self.env_vars) + self.process = run(self.cmd, async=True, stdin=self.stdin, outfile=self.outfile, env_vars=self.env_vars) if self.outfile: self.process.wait() else: diff --git a/localstack/utils/kinesis/kinesis_connector.py b/localstack/utils/kinesis/kinesis_connector.py index cfe2175e8f141..d82a938dfdd84 100755 --- a/localstack/utils/kinesis/kinesis_connector.py +++ b/localstack/utils/kinesis/kinesis_connector.py @@ -17,6 +17,7 @@ from sh import tail from localstack.utils.common import * from localstack.utils.kinesis import kclipy_helper +from localstack.utils.kinesis.kinesis_util import EventFileReaderThread from localstack.constants import * from localstack.utils.common import ShellCommandThread, FuncThread from localstack.utils.aws import aws_stack @@ -176,49 +177,6 @@ def stop(self, quiet=True): self.running = False -class EventFileReaderThread(FuncThread): - def __init__(self, events_file, callback, ready_mutex=None): - FuncThread.__init__(self, self.retrieve_loop, None) - self.running = True - self.events_file = events_file - self.callback = callback - self.ready_mutex = ready_mutex - - def retrieve_loop(self, params): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(self.events_file) - sock.listen(1) - if self.ready_mutex: - self.ready_mutex.release() - while self.running: - try: - conn, client_addr = sock.accept() - thread = FuncThread(self.handle_connection, conn) - thread.start() - except Exception, e: - LOGGER.error('Error dispatching client request: %s %s' % (e, traceback.format_exc())) - sock.close() - - def handle_connection(self, conn): - socket_file = conn.makefile() - while self.running: - line = socket_file.readline()[:-1] - if line == '': - # end of socket input stream - break - else: - try: - records = json.loads(line) - self.callback(records) - except Exception, e: - LOGGER.warning("Unable to process JSON line: '%s': %s. Callback: %s" % - (truncate(line), traceback.format_exc(), self.callback)) - conn.close() - - def stop(self, quiet=True): - self.running = False - - class KclLogListener(object): def __init__(self, regex='.*'): self.regex = regex diff --git a/localstack/utils/kinesis/kinesis_util.py b/localstack/utils/kinesis/kinesis_util.py new file mode 100644 index 0000000000000..f30766039e1cd --- /dev/null +++ b/localstack/utils/kinesis/kinesis_util.py @@ -0,0 +1,51 @@ +import json +import socket +import traceback +import logging +from localstack.utils.common import FuncThread + +# set up local logger +LOGGER = logging.getLogger(__name__) + + +class EventFileReaderThread(FuncThread): + def __init__(self, events_file, callback, ready_mutex=None): + FuncThread.__init__(self, self.retrieve_loop, None) + self.running = True + self.events_file = events_file + self.callback = callback + self.ready_mutex = ready_mutex + + def retrieve_loop(self, params): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(self.events_file) + sock.listen(1) + if self.ready_mutex: + self.ready_mutex.release() + while self.running: + try: + conn, client_addr = sock.accept() + thread = FuncThread(self.handle_connection, conn) + thread.start() + except Exception, e: + LOGGER.error('Error dispatching client request: %s %s' % (e, traceback.format_exc())) + sock.close() + + def handle_connection(self, conn): + socket_file = conn.makefile() + while self.running: + line = socket_file.readline()[:-1] + if line == '': + # end of socket input stream + break + else: + try: + records = json.loads(line) + self.callback(records) + except Exception, e: + LOGGER.warning("Unable to process JSON line: '%s': %s. Callback: %s" % + (truncate(line), traceback.format_exc(), self.callback)) + conn.close() + + def stop(self, quiet=True): + self.running = False diff --git a/setup.py b/setup.py index 024bc173ebec6..2f615d2bb29a8 100755 --- a/setup.py +++ b/setup.py @@ -76,7 +76,7 @@ def run(self): setup( name='localstack', - version='0.1.15', + version='0.1.16', description='Provides an easy-to-use test/mocking framework for developing Cloud applications', author='Waldemar Hummer (Atlassian)', author_email='waldemar.hummer@gmail.com',