Skip to content

Commit

Permalink
Reconnection optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
walkor committed Oct 28, 2020
1 parent 2df76a1 commit b47b157
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ class Client
*/
protected $_db = 0;

/**
* @var string|array
*/
protected $_auth = null;

/**
* @var bool
*/
Expand Down Expand Up @@ -248,6 +253,11 @@ class Client
*/
protected $_subscribe = false;

/**
* @var bool
*/
protected $_firstConnect = true;

/**
* Client constructor.
* @param $address
Expand Down Expand Up @@ -323,14 +333,22 @@ public function connect()
Timer::del($this->_reconnectTimer);
$this->_reconnectTimer = null;
}

if ($this->_db) {
$this->select($this->_db);
$this->_queue = \array_merge([[['SELECT', $this->_db], time(), null]], $this->_queue);
}

if ($this->_auth) {
$this->_queue = \array_merge([[['AUTH', $this->_auth], time(), null]], $this->_queue);
}

$this->_firstConnect = false;

$this->_connection->onError = function ($code, $msg) {
echo new \Exception("Workerman Redis Connection Error $code $msg");
};
$this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this);
$this->process();
$this->_firstConnect && $this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this);
};

$time_start = microtime(true);
Expand All @@ -343,7 +361,7 @@ public function connect()
echo $exception;
return;
}
\call_user_func($this->_connectionCallback, false, $this);
$this->_firstConnect && \call_user_func($this->_connectionCallback, false, $this);
};

$this->_connection->onClose = function () use ($time_start) {
Expand Down Expand Up @@ -417,7 +435,7 @@ public function connect()
}
$this->closeConnection();
$this->_error = "Workerman Redis Connection to {$this->_address} timeout ({$timeout} seconds)";
if ($this->_connectionCallback) {
if ($this->_firstConnect && $this->_connectionCallback) {
\call_user_func($this->_connectionCallback, false, $this);
} else {
echo $this->_error . "\n";
Expand All @@ -435,8 +453,8 @@ public function process()
if (!$this->_connection || $this->_waiting || empty($this->_queue) || $this->_subscribe) {
return;
}

$queue = current($this->_queue);
\reset($this->_queue);
$queue = \current($this->_queue);
if ($queue[0][0] === 'SUBSCRIBE' || $queue[0][0] === 'PSUBSCRIBE') {
$this->_subscribe = true;
}
Expand Down Expand Up @@ -509,8 +527,29 @@ public function pSubscribe($patterns, $cb)
*/
public function select($db, $cb = null)
{
$this->_db = $db;
$this->_queue[] = [['SELECT', $db], time(), $cb];
$format = function ($result) use ($db) {
$this->_db = $db;
return $result;
};
$cb = $cb ? $cb : function(){};
$this->_queue[] = [['SELECT', $db], time(), $cb, $format];
$this->process();
}

/**
* auth
*
* @param string|array $auth
* @param null $cb
*/
public function auth($auth, $cb = null)
{
$format = function ($result) use ($auth) {
$this->_auth = $auth;
return $result;
};
$cb = $cb ? $cb : function(){};
$this->_queue[] = [['AUTH', $auth], time(), $cb, $format];
$this->process();
}

Expand Down

0 comments on commit b47b157

Please sign in to comment.