Skip to content

Commit

Permalink
Fix timeout when server shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinceveve committed Aug 24, 2017
1 parent 34ce86d commit 95f208c
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/Rxnet/EventStore/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
use Rx\Scheduler\EventLoopScheduler;
use Rx\Subject\ReplaySubject;
use Rx\Subject\Subject;
use function Rxnet\await;
use Rxnet\Connector\Tcp;
use Rxnet\Dns\Dns;
use Rxnet\Event\ConnectorEvent;
Expand Down Expand Up @@ -72,7 +70,7 @@ class EventStore
/** @var DisposableInterface */
protected $heartBeatDisposable;
/** @var int */
protected $heartBeatRate;
protected $heartbeatTimeout;
/** @var DisposableInterface */
protected $readBufferDisposable;
/** @var array */
Expand All @@ -98,14 +96,14 @@ public function __construct(LoopInterface $loop = null, Dns $dns = null, Tcp $tc
/**
* @param string $dsn tcp://user:password@host:port
* @param int $connectTimeout in milliseconds
* @param int $heartBeatRate in milliseconds
* @param int $heartbeatTimeout in milliseconds
* @return Observable\AnonymousObservable
*/
public function connect($dsn = 'tcp://admin:[email protected]:1113', $connectTimeout = 1000, $heartBeatRate = 5000)
public function connect($dsn = 'tcp://admin:[email protected]:1113', $connectTimeout = 1000, $heartbeatTimeout = 5000)
{
// connector compatibility
$connectTimeout = ($connectTimeout > 0) ? $connectTimeout / 1000 : 0;
$this->heartBeatRate = $heartBeatRate;
$this->heartbeatTimeout = $heartbeatTimeout;

if (false === stripos($dsn, '://')) {
$dsn = 'tcp://' . $dsn;
Expand Down Expand Up @@ -205,16 +203,29 @@ protected function reconnect($host, $port)
*/
protected function heartbeat()
{
$timeout = $this->heartbeatTimeout/1000;
$called = microtime(true);

Observable::interval($this->heartbeatTimeout)
->doOnNext(function () use (&$called, &$timeout) {
$now = microtime(true);
if (($now - $called) > $timeout) {
throw new \Exception("timeout");
}
})
->subscribeCallback(null, [$this->connectionSubject, 'onError'], null, new EventLoopScheduler($this->loop));


return $this->readBuffer
->timeout($this->heartBeatRate)
->filter(
function (SocketMessage $message) {
return $message->getMessageType()->getType() === MessageType::HEARTBEAT_REQUEST;
}
)
->subscribe(
new CallbackObserver(
function (SocketMessage $message) {
function (SocketMessage $message) use (&$called) {
$called = microtime(true);
$this->writer->composeAndWrite(MessageType::HEARTBEAT_RESPONSE, null, $message->getCorrelationID());
},
[$this->connectionSubject, 'onError']
Expand All @@ -233,7 +244,7 @@ function (SocketMessage $message) {
*/
public function write($streamId, $events, $expectedVersion = -2, $requireMaster = false)
{
if($events instanceof NewEventInterface) {
if ($events instanceof NewEventInterface) {
$events = [$events];
}
if (!$events) {
Expand Down Expand Up @@ -304,10 +315,9 @@ public function startTransaction($streamId, $expectedVersion = -2, $requireMaste
*/
public function catchUpSubscription($streamId, $startFrom = self::POSITION_START, $resolveLink = false)
{
if($startFrom === self::POSITION_END) {
if ($startFrom === self::POSITION_END) {
$observable = $this->readEvent($streamId, $startFrom);
}
else {
} else {
$observable = $this->readEventsForward($streamId, $startFrom, self::POSITION_LATEST, $resolveLink);
}
return $observable->concat($this->volatileSubscription($streamId, $resolveLink));
Expand Down Expand Up @@ -413,7 +423,7 @@ public function persistentSubscription($streamID, $group)
function (PersistentSubscriptionStreamEventAppeared $eventAppeared) use ($correlationID, $group) {
$record = $eventAppeared->getEvent()->getEvent();
$link = $eventAppeared->getEvent()->getLink();
if(!$record) {
if (!$record) {
// TODO ugly patch investigate why
$record = $link;
}
Expand Down

0 comments on commit 95f208c

Please sign in to comment.