Skip to content
This repository has been archived by the owner on Nov 29, 2022. It is now read-only.

Commit

Permalink
Make send_push non blocking.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhgg committed Dec 6, 2016
1 parent 5df0a28 commit c20e3cb
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 8 deletions.
2 changes: 1 addition & 1 deletion py/examples/client_send_request.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from loqui.client import LoquiClient

client = LoquiClient(('localhost', 4001))
print client.send_request('hello world').data
print len(client.send_request('hello world'))
4 changes: 2 additions & 2 deletions py/examples/echo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def handle_request(self, request, session):
if i and i % 50000 == 0:
session.close()

return request.data
return 'm' * 1024

def handle_push(self, push, session):
# print 'psuh'
print 'pushed', push.data
return


Expand Down
12 changes: 12 additions & 0 deletions py/examples/push_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from loqui.client import LoquiClient

client = LoquiClient(('localhost', 4001))
for i in xrange(100):
client.send_push('hello world %i' % i)

assert client.send_request('oh hi') == 100

for i in xrange(100):
client.send_push('hello world %i' % i)

assert client.send_request('oh hi') == 200
22 changes: 22 additions & 0 deletions py/examples/push_server_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from loqui.server import LoquiServer

sessions = {}


class Server(LoquiServer):
def handle_request(self, request, session):
return sessions[session]

def handle_push(self, push, session):
sessions[session] += 1

def handle_new_session(self, session):
sessions[session] = 0

def handle_session_gone(self, session):
sessions.pop(session, None)


if __name__ == '__main__':
s = Server(('localhost', 4001))
s.serve_forever()
37 changes: 34 additions & 3 deletions py/loqui/client.pyx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Queue
import socket

import gevent
Expand All @@ -14,16 +15,20 @@ cdef class LoquiClient:
cdef LoquiSocketSession _session
cdef object _session_lock
cdef object _push_handler
cdef object _connect_greenlet
cdef object _push_queue
cdef tuple _address
cdef int _connect_timeout
cdef Backoff _backoff

def __init__(self, address, push_handler=None, connect_timeout=5):
def __init__(self, address, push_handler=None, connect_timeout=5, max_push_queue_len=5000):
self._session = None
self._session_lock = RLock()
self._address = address
self._connect_timeout = connect_timeout
self._push_handler = push_handler
self._connect_greenlet = None
self._push_queue = Queue.deque(maxlen=max_push_queue_len)
self._backoff = Backoff(min_delay=0.25, max_delay=2)

cpdef set_push_handler(self, object push_handler):
Expand All @@ -38,6 +43,24 @@ cdef class LoquiClient:
self._backoff.fail()
self._session = None

cdef inline is_connected(self):
return self._session is not None and not self._session.defunct() and self._session.is_ready()

cdef inline _connect_async(self):
if not self.is_connected():
if self._connect_greenlet is None:
self._connect_greenlet = gevent.spawn(self._connect_in_greenlet)

return False

return True

def _connect_in_greenlet(self):
try:
self.connect()
finally:
self._connect_greenlet = None

cdef connect(self):
if self._session is not None:
if self._session.defunct():
Expand All @@ -63,6 +86,11 @@ cdef class LoquiClient:
self._session = LoquiSocketSession(sock, ENCODERS, on_push=self._push_handler)
self._backoff.succeed()
logging.info('[Loqui] Connected to %s:%s', self._address[0], self._address[1])
self._session.await_ready()

if self.is_connected():
while self._push_queue:
self._session.send_push(self._push_queue.popleft())

except socket.error:
logging.exception('[Loqui] Connection to %s:%s failed.', self._address[0], self._address[1])
Expand All @@ -81,8 +109,11 @@ cdef class LoquiClient:
raise

cpdef send_push(self, push_data):
self.connect()
self._session.send_push(push_data)
if not self._connect_async():
self._push_queue.append(push_data)

else:
self._session.send_push(push_data)

def handle_new_socket(self, socket):
pass
Expand Down
14 changes: 12 additions & 2 deletions py/loqui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,21 @@ def _handle_connection(self, sock, addr):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setblocking(False)
session = LoquiSocketSession(sock, ENCODERS, False, self.handle_request, self.handle_push)
session.join()
print 'connection from', addr, 'done'

self.handle_new_session(session)
try:
session.join()
finally:
self.handle_session_gone(session)

def handle_request(self, request, session):
pass

def handle_push(self, push, session):
pass

def handle_new_session(self, session):
pass

def handle_session_gone(self, session):
pass
2 changes: 2 additions & 0 deletions py/loqui/socket_session.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ cdef class LoquiSocketSession:
bint via_remote_goaway=?)

cdef bint defunct(self)
cdef bint await_ready(self) except 1
cdef bint is_ready(self)

cdef _resume_sending(self)
cpdef _close_timeout(self)
Expand Down
7 changes: 7 additions & 0 deletions py/loqui/socket_session.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ cdef class LoquiSocketSession:
if isinstance(request, AsyncResult):
request.set_exception(close_exception)

cdef bint await_ready(self) except 1:
if not self._is_ready:
self._ready_event.wait()

cdef bint is_ready(self):
return self._is_ready

cdef _encode_data(self, object data):
if not self._is_ready:
self._ready_event.wait()
Expand Down

0 comments on commit c20e3cb

Please sign in to comment.