Skip to content

Commit

Permalink
Manage multiple connection in parallel
Browse files Browse the repository at this point in the history
 - Send back socket if already connected
 - wait for connection to be up if connecting
  • Loading branch information
Vinceveve committed Aug 24, 2017
1 parent c1b22c6 commit 08daeca
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/Rxnet/EventStore/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class EventStore
const POSITION_START = 0;
const POSITION_END = -1;
const POSITION_LATEST = 999999;

const CONNECTED = 1;
const CONNECTING = 0;
const DISCONNECTED = -1;
/** @var LoopInterface */
protected $loop;
/** @var Dns */
Expand All @@ -75,6 +79,8 @@ class EventStore
protected $readBufferDisposable;
/** @var array */
protected $dsn;
/** @var int */
protected $state = self::DISCONNECTED;

/**
* EventStore constructor.
Expand All @@ -93,6 +99,10 @@ public function __construct(LoopInterface $loop = null, Dns $dns = null, Tcp $tc
$this->connector = $tcp ?: new Tcp($this->loop);
}

public function isConnected() {
return $this->state === self::CONNECTED;
}

/**
* @param string $dsn tcp://user:password@host:port
* @param int $connectTimeout in milliseconds
Expand All @@ -101,6 +111,23 @@ public function __construct(LoopInterface $loop = null, Dns $dns = null, Tcp $tc
*/
public function connect($dsn = 'tcp://admin:[email protected]:1113', $connectTimeout = 1000, $heartbeatTimeout = 5000)
{

if($this->state === self::CONNECTING) {
while($this->state === self::CONNECTING) {
$this->loop->tick();
}
return Observable::create(function(ObserverInterface $observer) {
$this->connectionSubject->subscribe($observer);
$this->connectionSubject->onNext(new Event('/eventstore/connected'));
});
}
if($this->state === self::CONNECTED) {
return Observable::create(function(ObserverInterface $observer) {
$this->connectionSubject->subscribe($observer);
$this->connectionSubject->onNext(new Event('/eventstore/connected'));
});
}
$this->state = self::CONNECTING;
// connector compatibility
$connectTimeout = ($connectTimeout > 0) ? $connectTimeout / 1000 : 0;
$this->heartbeatTimeout = $heartbeatTimeout;
Expand Down Expand Up @@ -146,6 +173,8 @@ function ($ip) {
$this->heartBeatDisposable = $this->heartbeat();

$this->connectionSubject->onNext(new Event('/eventstore/connected'));
$this->state = self::CONNECTED;

}), new EventLoopScheduler($this->loop));

$this->connectionSubject->subscribe($observer);
Expand Down Expand Up @@ -220,6 +249,7 @@ protected function heartbeat()
->filter(
function (SocketMessage $message) use (&$called) {
$called = microtime(true);
echo '#';
return $message->getMessageType()->getType() === MessageType::HEARTBEAT_REQUEST;
}
)
Expand Down

0 comments on commit 08daeca

Please sign in to comment.