Skip to content
This repository was archived by the owner on Apr 3, 2019. It is now read-only.

Do not block when connecting socket #62

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions tornadoredis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def on_disconnect(self):
raise ConnectionError("Socket closed on remote end")

#### connection
@gen.engine
def connect(self):
if not self.connection.connected():
pool = self._connection_pool
Expand All @@ -329,7 +330,7 @@ def connect(self):
self.connection = pool.get_connection(event_handler_ref=self)
self.connection.ready_callbacks = old_conn.ready_callbacks
else:
self.connection.connect()
yield gen.Task(self.connection.connect)

@gen.engine
def disconnect(self, callback=None):
Expand Down Expand Up @@ -402,7 +403,7 @@ def execute_command(self, cmd, *args, **kwargs):
while n_tries > 0:
n_tries -= 1
if not self.connection.connected():
self.connection.connect()
yield gen.Task(self.connection.connect)

if not self.subscribed and not self.connection.ready():
yield gen.Task(self.connection.wait_until_ready)
Expand Down Expand Up @@ -1281,7 +1282,7 @@ def execute(self, callback=None):
yield gen.Task(self.select, self.selected_db)

if not self.connection.connected():
self.connection.connect()
yield gen.Task(self.connection.connect)

if not self.connection.ready():
yield gen.Task(self.connection.wait_until_ready)
Expand Down
21 changes: 12 additions & 9 deletions tornadoredis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,28 @@ def wait_until_ready(self, callback=None):
else:
callback()

def connect(self):
def connect(self, callback=None):
if not self._stream:
try:
if self.unix_socket_path:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect(self.unix_socket_path)
endpoint = self.unix_socket_path
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
endpoint = (self.host, self.port)
sock.settimeout(self.timeout)
self._stream = IOStream(sock, io_loop=self._io_loop)
self._stream.set_close_callback(self.on_stream_close)
self.info['db'] = 0
self.info['pass'] = None
self._stream.connect(endpoint, partial(self._on_connect, callback))
except socket.error as e:
raise ConnectionError(str(e))
self.fire_event('on_connect')

def _on_connect(self, callback):
self.info['db'] = 0
self.info['pass'] = None
self._stream.set_close_callback(self.on_stream_close)
callback()
self.fire_event('on_connect')

def on_stream_close(self):
if self._stream:
Expand Down