Skip to content

Commit

Permalink
Merged in feat/refactor-kinesis-utils (pull request localstack#25)
Browse files Browse the repository at this point in the history
Refactoring: move kinesis utils to separate file
  • Loading branch information
whummer committed Dec 10, 2016
2 parents 5139494 + c2a4134 commit 72e149e
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 46 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ make web

## Change Log

* v0.1.16: Minor restructuring and refactoring (create separate kinesis_util.py)
* v0.1.14: Fix AWS tokens when creating Elasticsearch client
* v0.1.11: Add startup/initialization notification for KCL process
* v0.1.10: Bump version of amazon_kclpy to 1.4.1
Expand Down
5 changes: 3 additions & 2 deletions localstack/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,17 @@ def stop(self, quiet=False):


class ShellCommandThread (FuncThread):
def __init__(self, cmd, params={}, outfile=None, env_vars={}, quiet=True):
def __init__(self, cmd, params={}, outfile=None, env_vars={}, stdin=False, quiet=True):
self.cmd = cmd
self.process = None
self.outfile = outfile
self.stdin = stdin
self.env_vars = env_vars
FuncThread.__init__(self, self.run_cmd, params, quiet=quiet)

def run_cmd(self, params):
try:
self.process = run(self.cmd, async=True, outfile=self.outfile, env_vars=self.env_vars)
self.process = run(self.cmd, async=True, stdin=self.stdin, outfile=self.outfile, env_vars=self.env_vars)
if self.outfile:
self.process.wait()
else:
Expand Down
44 changes: 1 addition & 43 deletions localstack/utils/kinesis/kinesis_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sh import tail
from localstack.utils.common import *
from localstack.utils.kinesis import kclipy_helper
from localstack.utils.kinesis.kinesis_util import EventFileReaderThread
from localstack.constants import *
from localstack.utils.common import ShellCommandThread, FuncThread
from localstack.utils.aws import aws_stack
Expand Down Expand Up @@ -176,49 +177,6 @@ def stop(self, quiet=True):
self.running = False


class EventFileReaderThread(FuncThread):
def __init__(self, events_file, callback, ready_mutex=None):
FuncThread.__init__(self, self.retrieve_loop, None)
self.running = True
self.events_file = events_file
self.callback = callback
self.ready_mutex = ready_mutex

def retrieve_loop(self, params):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.events_file)
sock.listen(1)
if self.ready_mutex:
self.ready_mutex.release()
while self.running:
try:
conn, client_addr = sock.accept()
thread = FuncThread(self.handle_connection, conn)
thread.start()
except Exception, e:
LOGGER.error('Error dispatching client request: %s %s' % (e, traceback.format_exc()))
sock.close()

def handle_connection(self, conn):
socket_file = conn.makefile()
while self.running:
line = socket_file.readline()[:-1]
if line == '':
# end of socket input stream
break
else:
try:
records = json.loads(line)
self.callback(records)
except Exception, e:
LOGGER.warning("Unable to process JSON line: '%s': %s. Callback: %s" %
(truncate(line), traceback.format_exc(), self.callback))
conn.close()

def stop(self, quiet=True):
self.running = False


class KclLogListener(object):
def __init__(self, regex='.*'):
self.regex = regex
Expand Down
51 changes: 51 additions & 0 deletions localstack/utils/kinesis/kinesis_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json
import socket
import traceback
import logging
from localstack.utils.common import FuncThread

# set up local logger
LOGGER = logging.getLogger(__name__)


class EventFileReaderThread(FuncThread):
def __init__(self, events_file, callback, ready_mutex=None):
FuncThread.__init__(self, self.retrieve_loop, None)
self.running = True
self.events_file = events_file
self.callback = callback
self.ready_mutex = ready_mutex

def retrieve_loop(self, params):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.events_file)
sock.listen(1)
if self.ready_mutex:
self.ready_mutex.release()
while self.running:
try:
conn, client_addr = sock.accept()
thread = FuncThread(self.handle_connection, conn)
thread.start()
except Exception, e:
LOGGER.error('Error dispatching client request: %s %s' % (e, traceback.format_exc()))
sock.close()

def handle_connection(self, conn):
socket_file = conn.makefile()
while self.running:
line = socket_file.readline()[:-1]
if line == '':
# end of socket input stream
break
else:
try:
records = json.loads(line)
self.callback(records)
except Exception, e:
LOGGER.warning("Unable to process JSON line: '%s': %s. Callback: %s" %
(truncate(line), traceback.format_exc(), self.callback))
conn.close()

def stop(self, quiet=True):
self.running = False
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def run(self):

setup(
name='localstack',
version='0.1.15',
version='0.1.16',
description='Provides an easy-to-use test/mocking framework for developing Cloud applications',
author='Waldemar Hummer (Atlassian)',
author_email='[email protected]',
Expand Down

0 comments on commit 72e149e

Please sign in to comment.