From b47b157ecdfadc4ceab7f395af36391a454b705c Mon Sep 17 00:00:00 2001 From: walkor Date: Wed, 28 Oct 2020 11:49:15 +0800 Subject: [PATCH] Reconnection optimization --- src/Client.php | 55 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/src/Client.php b/src/Client.php index 77f8216..96ae162 100644 --- a/src/Client.php +++ b/src/Client.php @@ -213,6 +213,11 @@ class Client */ protected $_db = 0; + /** + * @var string|array + */ + protected $_auth = null; + /** * @var bool */ @@ -248,6 +253,11 @@ class Client */ protected $_subscribe = false; + /** + * @var bool + */ + protected $_firstConnect = true; + /** * Client constructor. * @param $address @@ -323,14 +333,22 @@ public function connect() Timer::del($this->_reconnectTimer); $this->_reconnectTimer = null; } + if ($this->_db) { - $this->select($this->_db); + $this->_queue = \array_merge([[['SELECT', $this->_db], time(), null]], $this->_queue); + } + + if ($this->_auth) { + $this->_queue = \array_merge([[['AUTH', $this->_auth], time(), null]], $this->_queue); } + + $this->_firstConnect = false; + $this->_connection->onError = function ($code, $msg) { echo new \Exception("Workerman Redis Connection Error $code $msg"); }; - $this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this); $this->process(); + $this->_firstConnect && $this->_connectionCallback && \call_user_func($this->_connectionCallback, true, $this); }; $time_start = microtime(true); @@ -343,7 +361,7 @@ public function connect() echo $exception; return; } - \call_user_func($this->_connectionCallback, false, $this); + $this->_firstConnect && \call_user_func($this->_connectionCallback, false, $this); }; $this->_connection->onClose = function () use ($time_start) { @@ -417,7 +435,7 @@ public function connect() } $this->closeConnection(); $this->_error = "Workerman Redis Connection to {$this->_address} timeout ({$timeout} seconds)"; - if ($this->_connectionCallback) { + if ($this->_firstConnect && $this->_connectionCallback) { \call_user_func($this->_connectionCallback, false, $this); } else { echo $this->_error . "\n"; @@ -435,8 +453,8 @@ public function process() if (!$this->_connection || $this->_waiting || empty($this->_queue) || $this->_subscribe) { return; } - - $queue = current($this->_queue); + \reset($this->_queue); + $queue = \current($this->_queue); if ($queue[0][0] === 'SUBSCRIBE' || $queue[0][0] === 'PSUBSCRIBE') { $this->_subscribe = true; } @@ -509,8 +527,29 @@ public function pSubscribe($patterns, $cb) */ public function select($db, $cb = null) { - $this->_db = $db; - $this->_queue[] = [['SELECT', $db], time(), $cb]; + $format = function ($result) use ($db) { + $this->_db = $db; + return $result; + }; + $cb = $cb ? $cb : function(){}; + $this->_queue[] = [['SELECT', $db], time(), $cb, $format]; + $this->process(); + } + + /** + * auth + * + * @param string|array $auth + * @param null $cb + */ + public function auth($auth, $cb = null) + { + $format = function ($result) use ($auth) { + $this->_auth = $auth; + return $result; + }; + $cb = $cb ? $cb : function(){}; + $this->_queue[] = [['AUTH', $auth], time(), $cb, $format]; $this->process(); }