Skip to content

Commit

Permalink
Debug connect on some latest php versions
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinceveve committed Aug 2, 2017
1 parent 9566465 commit 24d7cb9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
14 changes: 6 additions & 8 deletions src/Rxnet/EventStore/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public function connect($dsn = 'tcp://admin:changeit@localhost:1113', $connectTi
}
$this->dsn = $parsedDsn;
// What you should observe if you want to auto reconnect
$this->connectionSubject = new ReplaySubject(1, 1);
$this->connectionSubject = new Subject();
$this->connector->setTimeout($connectTimeout);
return Observable::create(function (ObserverInterface $observer) {
$this->dns
Expand All @@ -133,7 +133,7 @@ public function connect($dsn = 'tcp://admin:changeit@localhost:1113', $connectTi
function ($ip) {
return $this->connector->connect($ip, $this->dsn['port']);
})
->flatMap(function (ConnectorEvent $connectorEvent) {
->subscribe(new CallbackObserver(function (ConnectorEvent $connectorEvent) {
// send all data to our read buffer
$this->stream = new BufferedStream($connectorEvent->getStream()->getSocket(), $this->loop);
$this->readBufferDisposable = $this->stream->subscribe($this->readBuffer);
Expand All @@ -146,12 +146,10 @@ function ($ip) {
// start heartbeat listener
$this->heartBeatDisposable = $this->heartbeat();

// Replay subject will do the magic
$this->connectionSubject->onNext(new Event('/eventstore/connected'));
// Forward internal errors to the connect result
return $this->connectionSubject;
})
->subscribe($observer, new EventLoopScheduler($this->loop));
}), new EventLoopScheduler($this->loop));

$this->connectionSubject->subscribe($observer);

return new CallbackDisposable(function () {
if ($this->readBufferDisposable instanceof DisposableInterface) {
Expand Down Expand Up @@ -257,7 +255,7 @@ public function write($streamId, $events, $expectedVersion = -2, $requireMaster
$readDisposable = $this->readBuffer->waitFor($correlationID, 1)
->subscribe($observer);

return new CallbackDisposable(function() use($writeDisposable, $readDisposable) {
return new CallbackDisposable(function () use ($writeDisposable, $readDisposable) {
$readDisposable->dispose();
$writeDisposable->dispose();
});
Expand Down
6 changes: 6 additions & 0 deletions usage/write.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
use Rxnet\EventStore\NewEvent\JsonEvent;

require '../vendor/autoload.php';
//$loop = new \Rxnet\Loop\LibEvLoop();
//EventLoop::setLoop($loop);

echo 'connecting';
$eventStore = new \Rxnet\EventStore\EventStore();
\Rxnet\await($eventStore->connect());

echo "connected \n";

\Rx\Observable::interval(10)
->flatMap(
function ($i) use ($eventStore) {
echo '.';
$event = new JsonEvent('/truc/chose', ['i' => $i]);
return $eventStore->write('domain-test-1.fr', [$event]);
}
Expand Down

0 comments on commit 24d7cb9

Please sign in to comment.