Skip to content

Fixed Refactor Issues #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions predix/admin/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import predix.admin.logstash
import predix.admin.cache
import predix.admin.dbaas
import predix.admin.eventhub


class Manifest(predix.app.Manifest):
Expand Down
4 changes: 2 additions & 2 deletions predix/admin/eventhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import predix.config
import predix.security.uaa
import predix.admin.service
import predix.data.eventhub
import predix.data.eventhub.client


class EventHub(object):
Expand All @@ -14,7 +14,7 @@ class EventHub(object):
def __init__(self, plan_name=None, name=None, uaa=None, *args, **kwargs):
self.service_name = 'predix-event-hub'
self.plan_name = plan_name or 'Tiered'
self.use_class = predix.data.eventhub.Eventhub
self.use_class = predix.data.eventhub.client.Eventhub

self.service = predix.admin.service.PredixService(self.service_name,
self.plan_name, name=name, uaa=uaa)
Expand Down
76 changes: 10 additions & 66 deletions predix/data/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def __init__(self,
self._ws = None
self._channel = None
self._run_health_checker = True
self.zone_id = self._get_zone_id()
self.service = predix.service.Service(self.zone_id)

if publish_config is not None:
# make the channel
if publish_config.protocol == PublisherConfig.Protocol.GRPC:
Expand All @@ -58,6 +61,12 @@ def shutdown(self):
if self.subscriber is not None:
self.subscriber.shutdown()

def _get_zone_id(self):
if 'VCAP_SERVICES' in os.environ:
services = json.loads(os.getenv('VCAP_SERVICES'))
return services['predix-event-hub'][0]['credentials']['publish']['zone-http-header-value']
return self.get_service_env_value('zone_id')

def _get_host(self):
if 'VCAP_SERVICES' in os.environ:
services = json.loads(os.getenv('VCAP_SERVICES'))
Expand Down Expand Up @@ -124,69 +133,4 @@ def _health_check_thread(self):
time.sleep(30)
return

class GrpcManager:
"""
Class for managing GRPC calls by turing the generators grpc uses into function calls
This allows the sdk to man in the middle the messages
"""
def __init__(self, stub_call, on_msg_callback, metadata, tx_stream=True, initial_message=None):
"""
:param stub_call: the call on the grpc stub to build the generator on
:param on_msg_callback: the callback to pass any received functions on
:param metadata: metadata to attach to the stub call
"""
self._tx_stream = tx_stream
self._stub_call = stub_call
self._on_msg_callback = on_msg_callback
self._metadata = metadata
self._initial_message = initial_message
self._grpc_rx_thread = threading.Thread(target=self._grpc_rx_receiver)
self._grpc_rx_thread.daemon = True
self._grpc_rx_thread.start()
self._grpc_tx_queue = []
self._run_generator = True
time.sleep(1)

def send_message(self, tx_message):
"""
Add a message onto the tx queue to be sent on the stub
:param tx_message:
:return: None
"""
self._grpc_tx_queue.append(tx_message)

def _grpc_rx_receiver(self):
"""
Blocking Function that opens the stubs generator and pass any messages onto the callback
:return: None
"""
logging.debug("grpc rx stream metadata: " + str(self._metadata))
if self._tx_stream:
if self._initial_message is not None:
self.send_message(self._initial_message)
msgs = self._stub_call(request_iterator=self._grpc_tx_generator(), metadata=self._metadata)
else:
msgs = self._stub_call(self._initial_message, metadata=self._metadata)

for m in msgs:
self._on_msg_callback(m)

def stop_generator(self):
"""
Call this to close the generator
:return:
"""
logging.debug('stopping generator')
self._run_generator = False

def _grpc_tx_generator(self):
"""
the generator taking and messages added to the grpc_tx_queue
and yield them to grpc
:return: grpc messages
"""
while self._run_generator:
while len(self._grpc_tx_queue) != 0:
yield self._grpc_tx_queue.pop(0)
return


70 changes: 70 additions & 0 deletions predix/data/eventhub/grpc_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import threading
import time
import logging

class GrpcManager:
"""
Class for managing GRPC calls by turing the generators grpc uses into function calls
This allows the sdk to man in the middle the messages
"""
def __init__(self, stub_call, on_msg_callback, metadata, tx_stream=True, initial_message=None):
"""
:param stub_call: the call on the grpc stub to build the generator on
:param on_msg_callback: the callback to pass any received functions on
:param metadata: metadata to attach to the stub call
"""
self._tx_stream = tx_stream
self._stub_call = stub_call
self._on_msg_callback = on_msg_callback
self._metadata = metadata
self._initial_message = initial_message
self._grpc_rx_thread = threading.Thread(target=self._grpc_rx_receiver)
self._grpc_rx_thread.daemon = True
self._grpc_rx_thread.start()
self._grpc_tx_queue = []
self._run_generator = True
time.sleep(1)

def send_message(self, tx_message):
"""
Add a message onto the tx queue to be sent on the stub
:param tx_message:
:return: None
"""
self._grpc_tx_queue.append(tx_message)

def _grpc_rx_receiver(self):
"""
Blocking Function that opens the stubs generator and pass any messages onto the callback
:return: None
"""
logging.debug("grpc rx stream metadata: " + str(self._metadata))
if self._tx_stream:
if self._initial_message is not None:
self.send_message(self._initial_message)
msgs = self._stub_call(request_iterator=self._grpc_tx_generator(), metadata=self._metadata)
else:
msgs = self._stub_call(self._initial_message, metadata=self._metadata)

for m in msgs:
self._on_msg_callback(m)

def stop_generator(self):
"""
Call this to close the generator
:return:
"""
logging.debug('stopping generator')
self._run_generator = False

def _grpc_tx_generator(self):
"""
the generator taking and messages added to the grpc_tx_queue
and yield them to grpc
:return: grpc messages
"""
while self._run_generator:
while len(self._grpc_tx_queue) != 0:
yield self._grpc_tx_queue.pop(0)
return

5 changes: 2 additions & 3 deletions predix/data/eventhub/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

import time

from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc
from predix.data.eventhub.client import Eventhub
from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc, grpc_manager


class PublisherConfig:
Expand Down Expand Up @@ -292,7 +291,7 @@ def _init_grpc_publisher(self):
:return: None
"""
self._stub = EventHub_pb2_grpc.PublisherStub(channel=self._channel)
self.grpc_manager = Eventhub.GrpcManager(stub_call=self._stub.send,
self.grpc_manager = grpc_manager.GrpcManager(stub_call=self._stub.send,
on_msg_callback=self._publisher_callback,
metadata=self._generate_publish_headers().items())

Expand Down
5 changes: 2 additions & 3 deletions predix/data/eventhub/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc
from predix.data.eventhub.client import Eventhub
from predix.data.eventhub import EventHub_pb2, EventHub_pb2_grpc, grpc_manager


class SubscribeConfig:
Expand Down Expand Up @@ -68,7 +67,7 @@ def __init__(self, eventhub_client, config, channel):
self._rx_messages = []
self.active = True
self.run_subscribe_generator = True
self.grpc_manager = Eventhub.GrpcManager(stub_call=stub_call,
self.grpc_manager = grpc_manager.GrpcManager(stub_call=stub_call,
on_msg_callback=self._subscriber_callback,
metadata=self._generate_subscribe_headers(),
tx_stream=tx_stream,
Expand Down