From 008769847cd1cdf5f871c73d15817e20e02453b1 Mon Sep 17 00:00:00 2001 From: William Gowell Date: Thu, 12 Apr 2018 14:59:51 -0700 Subject: [PATCH 1/4] fixed refactoring issues --- predix/admin/app.py | 1 + predix/admin/eventhub.py | 4 +- predix/data/eventhub/client.py | 76 ++++-------------------------- predix/data/eventhub/publisher.py | 5 +- predix/data/eventhub/subscriber.py | 5 +- predix/data/timeseries.py | 5 +- 6 files changed, 20 insertions(+), 76 deletions(-) diff --git a/predix/admin/app.py b/predix/admin/app.py index 210994b..c1f7162 100644 --- a/predix/admin/app.py +++ b/predix/admin/app.py @@ -12,6 +12,7 @@ import predix.admin.logstash import predix.admin.cache import predix.admin.dbaas +import predix.admin.eventhub class Manifest(predix.app.Manifest): diff --git a/predix/admin/eventhub.py b/predix/admin/eventhub.py index fec596d..61fb414 100644 --- a/predix/admin/eventhub.py +++ b/predix/admin/eventhub.py @@ -3,7 +3,7 @@ import predix.config import predix.security.uaa import predix.admin.service -import predix.data.eventhub +import predix.data.eventhub.client class EventHub(object): @@ -14,7 +14,7 @@ class EventHub(object): def __init__(self, plan_name=None, name=None, uaa=None, *args, **kwargs): self.service_name = 'predix-event-hub' self.plan_name = plan_name or 'Tiered' - self.use_class = predix.data.eventhub.Eventhub + self.use_class = predix.data.eventhub.client.Eventhub self.service = predix.admin.service.PredixService(self.service_name, self.plan_name, name=name, uaa=uaa) diff --git a/predix/data/eventhub/client.py b/predix/data/eventhub/client.py index 61c6348..9363317 100644 --- a/predix/data/eventhub/client.py +++ b/predix/data/eventhub/client.py @@ -35,6 +35,9 @@ def __init__(self, self._ws = None self._channel = None self._run_health_checker = True + self.zone_id = self._get_zone_id() + self.service = predix.service.Service(self.zone_id) + if publish_config is not None: # make the channel if publish_config.protocol == PublisherConfig.Protocol.GRPC: @@ -58,6 +61,12 @@ def shutdown(self): if self.subscriber is not None: self.subscriber.shutdown() + def _get_zone_id(self): + if 'VCAP_SERVICES' in os.environ: + services = json.loads(os.getenv('VCAP_SERVICES')) + return services['predix-event-hub'][0]['credentials']['publish']['zone-http-header-value'] + return self.get_service_env_value('zone_id') + def _get_host(self): if 'VCAP_SERVICES' in os.environ: services = json.loads(os.getenv('VCAP_SERVICES')) @@ -124,69 +133,4 @@ def _health_check_thread(self): time.sleep(30) return - class GrpcManager: - """ - Class for managing GRPC calls by turing the generators grpc uses into function calls - This allows the sdk to man in the middle the messages - """ - def __init__(self, stub_call, on_msg_callback, metadata, tx_stream=True, initial_message=None): - """ - :param stub_call: the call on the grpc stub to build the generator on - :param on_msg_callback: the callback to pass any received functions on - :param metadata: metadata to attach to the stub call - """ - self._tx_stream = tx_stream - self._stub_call = stub_call - self._on_msg_callback = on_msg_callback - self._metadata = metadata - self._initial_message = initial_message - self._grpc_rx_thread = threading.Thread(target=self._grpc_rx_receiver) - self._grpc_rx_thread.daemon = True - self._grpc_rx_thread.start() - self._grpc_tx_queue = [] - self._run_generator = True - time.sleep(1) - - def send_message(self, tx_message): - """ - Add a message onto the tx queue to be sent on the stub - :param tx_message: - :return: None - """ - self._grpc_tx_queue.append(tx_message) - - def _grpc_rx_receiver(self): - """ - Blocking Function that opens the stubs generator and pass any messages onto the callback - :return: None - """ - logging.debug("grpc rx stream metadata: " + str(self._metadata)) - if self._tx_stream: - if self._initial_message is not None: - self.send_message(self._initial_message) - msgs = self._stub_call(request_iterator=self._grpc_tx_generator(), metadata=self._metadata) - else: - msgs = self._stub_call(self._initial_message, metadata=self._metadata) - - for m in msgs: - self._on_msg_callback(m) - - def stop_generator(self): - """ - Call this to close the generator - :return: - """ - logging.debug('stopping generator') - self._run_generator = False - - def _grpc_tx_generator(self): - """ - the generator taking and messages added to the grpc_tx_queue - and yield them to grpc - :return: grpc messages - """ - while self._run_generator: - while len(self._grpc_tx_queue) != 0: - yield self._grpc_tx_queue.pop(0) - return - + \ No newline at end of file diff --git a/predix/data/eventhub/publisher.py b/predix/data/eventhub/publisher.py index 528733f..2419c21 100644 --- a/predix/data/eventhub/publisher.py +++ b/predix/data/eventhub/publisher.py @@ -5,8 +5,7 @@ import time -from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc -from predix.data.eventhub.client import Eventhub +from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc, grpc_manager class PublisherConfig: @@ -292,7 +291,7 @@ def _init_grpc_publisher(self): :return: None """ self._stub = EventHub_pb2_grpc.PublisherStub(channel=self._channel) - self.grpc_manager = Eventhub.GrpcManager(stub_call=self._stub.send, + self.grpc_manager = grpc_manager.GrpcManager(stub_call=self._stub.send, on_msg_callback=self._publisher_callback, metadata=self._generate_publish_headers().items()) diff --git a/predix/data/eventhub/subscriber.py b/predix/data/eventhub/subscriber.py index 385d4c6..f8944e4 100644 --- a/predix/data/eventhub/subscriber.py +++ b/predix/data/eventhub/subscriber.py @@ -1,5 +1,4 @@ -from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc -from predix.data.eventhub.client import Eventhub +from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc, grpc_manager class SubscribeConfig: @@ -68,7 +67,7 @@ def __init__(self, eventhub_client, config, channel): self._rx_messages = [] self.active = True self.run_subscribe_generator = True - self.grpc_manager = Eventhub.GrpcManager(stub_call=stub_call, + self.grpc_manager = grpc_manager.GrpcManager(stub_call=stub_call, on_msg_callback=self._subscriber_callback, metadata=self._generate_subscribe_headers(), tx_stream=tx_stream, diff --git a/predix/data/timeseries.py b/predix/data/timeseries.py index 440fb48..699c61b 100644 --- a/predix/data/timeseries.py +++ b/predix/data/timeseries.py @@ -11,6 +11,7 @@ import predix.service + class TimeSeries(object): """ Client library for working with the Time Series service. @@ -407,8 +408,8 @@ def _send_to_timeseries(self, message): try: ws = self._get_websocket() ws.send(json.dumps(message)) - result = ws.recv() - except (websocket.WebSocketConnectionClosedException, Exception) as e: + result = ws.recv() + except (WebSocketConnectionClosedException, Exception) as e: logging.debug("Connection failed, will try again.") logging.debug(e) From 7ca03a87292bf905435081d16632944e947eeadb Mon Sep 17 00:00:00 2001 From: William Gowell Date: Thu, 12 Apr 2018 15:47:55 -0700 Subject: [PATCH 2/4] added grpc manager --- predix/data/eventhub/grpc_manager.py | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 predix/data/eventhub/grpc_manager.py diff --git a/predix/data/eventhub/grpc_manager.py b/predix/data/eventhub/grpc_manager.py new file mode 100644 index 0000000..88bceed --- /dev/null +++ b/predix/data/eventhub/grpc_manager.py @@ -0,0 +1,70 @@ +import threading +import time +import logging + +class GrpcManager: + """ + Class for managing GRPC calls by turing the generators grpc uses into function calls + This allows the sdk to man in the middle the messages + """ + def __init__(self, stub_call, on_msg_callback, metadata, tx_stream=True, initial_message=None): + """ + :param stub_call: the call on the grpc stub to build the generator on + :param on_msg_callback: the callback to pass any received functions on + :param metadata: metadata to attach to the stub call + """ + self._tx_stream = tx_stream + self._stub_call = stub_call + self._on_msg_callback = on_msg_callback + self._metadata = metadata + self._initial_message = initial_message + self._grpc_rx_thread = threading.Thread(target=self._grpc_rx_receiver) + self._grpc_rx_thread.daemon = True + self._grpc_rx_thread.start() + self._grpc_tx_queue = [] + self._run_generator = True + time.sleep(1) + + def send_message(self, tx_message): + """ + Add a message onto the tx queue to be sent on the stub + :param tx_message: + :return: None + """ + self._grpc_tx_queue.append(tx_message) + + def _grpc_rx_receiver(self): + """ + Blocking Function that opens the stubs generator and pass any messages onto the callback + :return: None + """ + logging.debug("grpc rx stream metadata: " + str(self._metadata)) + if self._tx_stream: + if self._initial_message is not None: + self.send_message(self._initial_message) + msgs = self._stub_call(request_iterator=self._grpc_tx_generator(), metadata=self._metadata) + else: + msgs = self._stub_call(self._initial_message, metadata=self._metadata) + + for m in msgs: + self._on_msg_callback(m) + + def stop_generator(self): + """ + Call this to close the generator + :return: + """ + logging.debug('stopping generator') + self._run_generator = False + + def _grpc_tx_generator(self): + """ + the generator taking and messages added to the grpc_tx_queue + and yield them to grpc + :return: grpc messages + """ + while self._run_generator: + while len(self._grpc_tx_queue) != 0: + yield self._grpc_tx_queue.pop(0) + return + From 211a0e38aef0960e1cbede20c56006451f7bbb47 Mon Sep 17 00:00:00 2001 From: William Gowell Date: Thu, 12 Apr 2018 16:05:46 -0700 Subject: [PATCH 3/4] reverted change in TS --- predix/data/timeseries.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/predix/data/timeseries.py b/predix/data/timeseries.py index 699c61b..54faeec 100644 --- a/predix/data/timeseries.py +++ b/predix/data/timeseries.py @@ -11,7 +11,6 @@ import predix.service - class TimeSeries(object): """ Client library for working with the Time Series service. @@ -409,7 +408,7 @@ def _send_to_timeseries(self, message): ws = self._get_websocket() ws.send(json.dumps(message)) result = ws.recv() - except (WebSocketConnectionClosedException, Exception) as e: + except (websocket.WebSocketConnectionClosedException, Exception) as e: logging.debug("Connection failed, will try again.") logging.debug(e) From 3507a07705289377dab3d9375f95c10102a94fb0 Mon Sep 17 00:00:00 2001 From: William Gowell Date: Thu, 12 Apr 2018 16:06:22 -0700 Subject: [PATCH 4/4] reverted change in TS --- predix/data/timeseries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/predix/data/timeseries.py b/predix/data/timeseries.py index 54faeec..440fb48 100644 --- a/predix/data/timeseries.py +++ b/predix/data/timeseries.py @@ -407,7 +407,7 @@ def _send_to_timeseries(self, message): try: ws = self._get_websocket() ws.send(json.dumps(message)) - result = ws.recv() + result = ws.recv() except (websocket.WebSocketConnectionClosedException, Exception) as e: logging.debug("Connection failed, will try again.") logging.debug(e)